Source code for examples.aggregate5.baseclasses

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, TypeVar

from eventsourcing.dispatch import singledispatchmethod

if TYPE_CHECKING:
    from collections.abc import Iterable
    from uuid import UUID

    from typing_extensions import Self


[docs] @dataclass(frozen=True) class DomainEvent: originator_id: UUID originator_version: int timestamp: datetime
[docs] @staticmethod def create_timestamp() -> datetime: return datetime.now(tz=timezone.utc)
TAggregate = TypeVar("TAggregate", bound="Aggregate")
[docs] @dataclass(frozen=True) class Aggregate: id: UUID version: int created_on: datetime modified_on: datetime
[docs] def trigger_event( self, event_class: type[DomainEvent], **kwargs: Any, ) -> DomainEvent: kwargs = kwargs.copy() kwargs.update( originator_id=self.id, originator_version=self.version + 1, timestamp=event_class.create_timestamp(), ) return event_class(**kwargs)
[docs] @classmethod def projector( cls, aggregate: Self | None, events: Iterable[DomainEvent], ) -> Self | None: for event in events: aggregate = cls.mutate(event, aggregate) return aggregate
[docs] @singledispatchmethod[Any] @staticmethod def mutate(event: DomainEvent, aggregate: TAggregate | None) -> TAggregate | None: """Mutates aggregate with event."""
[docs] @dataclass(frozen=True) class Snapshot(DomainEvent): state: dict[str, Any]
[docs] @classmethod def take(cls, aggregate: Aggregate) -> Aggregate.Snapshot: return Aggregate.Snapshot( originator_id=aggregate.id, originator_version=aggregate.version, timestamp=DomainEvent.create_timestamp(), state=aggregate.__dict__, )