from __future__ import annotations
import os
from typing import TYPE_CHECKING, Any, Generic, cast
from eventsourcing.dcb.domain import (
EnduringObject,
InitialDecision,
Perspective,
Selector,
TDecision,
TGroup,
TPerspective,
TSlice,
)
from eventsourcing.dcb.persistence import (
DCBEventStore,
DCBInfrastructureFactory,
DCBMapper,
NotFoundError,
)
from eventsourcing.utils import Environment, EnvType, resolve_topic
if TYPE_CHECKING:
from collections.abc import Mapping
from types import TracebackType
from typing_extensions import Self
[docs]
class DCBApplication:
name = "DCBApplication"
env: Mapping[str, str] = {"PERSISTENCE_MODULE": "eventsourcing.dcb.popo"}
def __init_subclass__(cls, **kwargs: Any) -> None:
if "name" not in cls.__dict__:
cls.name = cls.__name__
[docs]
def __init__(self, env: EnvType | None = None):
self.env = self.construct_env(self.name, env)
self.factory = DCBInfrastructureFactory.construct(self.env)
self.recorder = self.factory.dcb_recorder()
if "MAPPER_TOPIC" in self.env:
# Only need a mapper, event store, and repository
# if we are using the higher-level abstractions.
self.mapper = cast(
DCBMapper[Any], resolve_topic(self.env["MAPPER_TOPIC"])()
)
assert isinstance(self.mapper, DCBMapper)
self.events = DCBEventStore(self.mapper, self.recorder)
self.repository = DCBRepository(self.events)
[docs]
def construct_env(self, name: str, env: EnvType | None = None) -> Environment:
"""Constructs environment from which application will be configured."""
_env = dict(type(self).env)
_env.update(os.environ)
if env is not None:
_env.update(env)
return Environment(name, _env)
[docs]
def do(self, s: TSlice) -> TSlice:
"""
Advances and executes a slice, then saves new decisions.
"""
if type(s).do_projection:
s = self.repository.advance(s)
s.execute()
if s.new_decisions:
self.repository.save(s)
return s
[docs]
def close(self) -> None:
self.factory.close()
def __enter__(self) -> Self:
self.factory.__enter__()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.close()
self.factory.__exit__(exc_type, exc_val, exc_tb)
[docs]
class DCBRepository(Generic[TDecision]):
[docs]
def __init__(self, eventstore: DCBEventStore[TDecision]):
self.eventstore = eventstore
[docs]
def save(self, p: Perspective[TDecision]) -> int:
return self.eventstore.append(
events=p.collect_events(),
cb=p.consistency_boundary(),
after=p.last_known_position,
)
[docs]
def get(
self,
enduring_object_id: str,
) -> EnduringObject[TDecision]:
cb = [Selector(tags=[enduring_object_id])]
events = self.eventstore.read(*cb)
obj: EnduringObject[TDecision] | None = None
for event in events:
obj = event.decision.mutate(obj)
if obj is None:
raise NotFoundError
obj.last_known_position = events.head
return obj
[docs]
def get_many(
self,
*enduring_object_ids: str,
) -> list[EnduringObject[TDecision] | None]:
cb = [
Selector(tags=[enduring_object_id])
for enduring_object_id in enduring_object_ids
]
tagged_decisions = self.eventstore.read(cb)
objs: dict[str, EnduringObject[TDecision] | None] = dict.fromkeys(
enduring_object_ids
)
for tagged in tagged_decisions:
for tag in tagged.tags:
obj = objs.get(tag)
if not isinstance(tagged.decision, InitialDecision) and not obj:
continue
obj = tagged.decision.mutate(obj)
objs[tag] = obj
for obj in objs.values():
if obj is not None:
obj.last_known_position = tagged_decisions.head
return list(objs.values())
[docs]
def get_group(self, cls: type[TGroup], *enduring_object_ids: str) -> TGroup:
enduring_objects = self.get_many(*enduring_object_ids)
perspective = cls(*enduring_objects)
last_known_positions = [
o.last_known_position
for o in enduring_objects
if o and o.last_known_position
]
perspective.last_known_position = (
max(last_known_positions) if last_known_positions else None
)
return perspective
[docs]
def advance(self, p: TPerspective) -> TPerspective:
events = self.eventstore.read(
cb=p.consistency_boundary(),
after=p.last_known_position,
)
for event in events:
event.decision.mutate(p)
p.last_known_position = events.head
return p