Source code for eventsourcing.dcb.application

from __future__ import annotations

import os
from collections import defaultdict
from typing import TYPE_CHECKING, Any, cast

from typing_extensions import TypeVar

from eventsourcing.dcb.domain import (
    EnduringObject,
    Perspective,
    Selector,
    TDecision,
    TGroup,
    TPerspective,
    TSlice,
)
from eventsourcing.dcb.persistence import (
    DCBEventStore,
    DCBInfrastructureFactory,
    DCBMapper,
    NotFoundError,
)
from eventsourcing.utils import Environment, EnvType, resolve_topic

if TYPE_CHECKING:
    from collections.abc import Mapping, Sequence
    from types import TracebackType
    from typing import Self


[docs] class DCBApplication: name = "DCBApplication" env: Mapping[str, str] = {"PERSISTENCE_MODULE": "eventsourcing.dcb.popo"} def __init_subclass__(cls, **kwargs: Any) -> None: if "name" not in cls.__dict__: cls.name = cls.__name__
[docs] def __init__(self, env: EnvType | None = None): self.env = self.construct_env(self.name, env) self.factory = DCBInfrastructureFactory.construct(self.env) self.recorder = self.factory.dcb_recorder() if "MAPPER_TOPIC" in self.env: # Only need a mapper, event store, and repository # if we are using the higher-level abstractions. self.mapper = cast(DCBMapper, resolve_topic(self.env["MAPPER_TOPIC"])()) assert isinstance(self.mapper, DCBMapper) self.events = DCBEventStore(self.mapper, self.recorder) self.repository = DCBRepository(self.events)
[docs] def construct_env(self, name: str, env: EnvType | None = None) -> Environment: """Constructs environment from which application will be configured.""" _env = dict(type(self).env) _env.update(os.environ) if env is not None: _env.update(env) return Environment(name, _env)
[docs] def do(self, s: TSlice) -> TSlice: """ Advances and executes a slice, then saves new decisions. """ if type(s).do_projection: s = self.repository.advance(s) s.execute() if s.new_decisions: self.repository.save(s) return s
[docs] def close(self) -> None: self.factory.close()
def __enter__(self) -> Self: self.factory.__enter__() return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: self.close() self.factory.__exit__(exc_type, exc_val, exc_tb)
TEnduringObject = TypeVar("TEnduringObject", bound=EnduringObject[Any, Any])
[docs] class DCBRepository:
[docs] def __init__(self, eventstore: DCBEventStore): self.eventstore = eventstore
[docs] def save(self, p: Perspective[TDecision]) -> int: return self.eventstore.append( events=p.collect_events(), cb=p.consistency_boundary(), after=p.last_known_position, )
[docs] def get( self, enduring_object_id: str, enduring_object_cls: type[TEnduringObject] ) -> TEnduringObject: cb = [Selector(tags=[enduring_object_id])] events = self.eventstore.read(*cb) obj: TEnduringObject | None = enduring_object_cls.__new__(enduring_object_cls) count_events = 0 for event in events: count_events += 1 obj = event.decision.mutate(obj) if count_events == 0 or obj is None: raise NotFoundError obj.last_known_position = events.head return obj
[docs] def get_many( self, ids: Sequence[str], *, classes: Sequence[type[EnduringObject[Any, Any]]] = (), cls: type[EnduringObject[Any, Any]] | None = None, ) -> list[EnduringObject[Any] | None]: if len(classes) == 0: assert cls is not None classes = [cls] * len(ids) cb = [Selector(tags=[id_]) for id_ in ids] objs: dict[str, EnduringObject[Any, Any] | None] = { id_: cls.__new__(cls) for (id_, cls) in zip(ids, classes, strict=True) } event_counts: dict[str, int] = defaultdict(int) read_response = self.eventstore.read(cb) for tagged in read_response: for tag in tagged.tags: event_counts[tag] += 1 obj = objs.get(tag) if obj is not None: objs[tag] = tagged.decision.mutate(obj) for id_ in ids: obj = objs.get(id_) if obj is None or event_counts[id_] == 0: objs[id_] = None else: obj.last_known_position = read_response.head return list(objs.values())
[docs] def get_group(self, cls: type[TGroup], *enduring_object_ids: str) -> TGroup: enduring_objects = self.get_many( ids=enduring_object_ids, classes=cls.classes, ) group = cls(*enduring_objects) last_known_positions = [ o.last_known_position for o in enduring_objects if o and o.last_known_position ] group.last_known_position = ( max(last_known_positions) if last_known_positions else None ) return group
[docs] def advance(self, p: TPerspective) -> TPerspective: events = self.eventstore.read( cb=p.consistency_boundary(), after=p.last_known_position, ) for event in events: event.decision.mutate(p) p.last_known_position = events.head return p