Source code for examples.aggregate4.baseclasses

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, TypeVar

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import datetime_now_with_tzinfo
from eventsourcing.utils import get_topic

if TYPE_CHECKING:
    from collections.abc import Iterable
    from datetime import datetime
    from uuid import UUID

    from typing_extensions import Self

TAggregate = TypeVar("TAggregate", bound="Aggregate")


[docs] @dataclass(frozen=True) class DomainEvent: originator_version: int originator_id: UUID timestamp: datetime
[docs] @dataclass class Aggregate: id: UUID version: int created_on: datetime modified_on: datetime _pending_events: list[DomainEvent]
[docs] @dataclass(frozen=True) class Snapshot(DomainEvent): topic: str state: dict[str, Any]
[docs] @classmethod def take( cls, aggregate: Aggregate, ) -> Aggregate.Snapshot: aggregate_state = dict(aggregate.__dict__) aggregate_state.pop("_pending_events") return Aggregate.Snapshot( originator_id=aggregate.id, originator_version=aggregate.version, timestamp=datetime_now_with_tzinfo(), topic=get_topic(type(aggregate)), state=aggregate_state, )
[docs] def trigger_event( self, event_class: type[DomainEvent], **kwargs: Any, ) -> None: kwargs = kwargs.copy() kwargs.update( originator_id=self.id, originator_version=self.version + 1, timestamp=datetime_now_with_tzinfo(), ) new_event = event_class(**kwargs) self.apply_event(new_event) self.append_event(new_event)
[docs] def append_event(self, *events: DomainEvent) -> None: self._pending_events.extend(events)
[docs] def collect_events(self) -> list[DomainEvent]: events, self._pending_events = self._pending_events, [] return events
[docs] @singledispatchmethod def apply_event(self, event: DomainEvent) -> None: msg = f"For {type(event).__qualname__}" raise NotImplementedError(msg)
@apply_event.register(Snapshot) def _(self, event: Snapshot) -> None: self.__dict__.update(event.state)
[docs] @classmethod def project_events( cls, _: Self | None, events: Iterable[DomainEvent], ) -> Self: aggregate: Self = Aggregate.__new__(cls) for event in events: aggregate.apply_event(event) return aggregate
def __new__(cls, *args: Any, **kwargs: Any) -> Self: aggregate = super().__new__(cls, *args, **kwargs) aggregate._pending_events = [] return aggregate