from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar
from eventsourcing.domain import datetime_now_with_tzinfo
if TYPE_CHECKING:
from collections.abc import Iterable
from datetime import datetime
from uuid import UUID
[docs]
@dataclass(frozen=True)
class DomainEvent:
    """Immutable base record shared by all domain events in this module.

    Instances are frozen dataclasses: the fields are set once through the
    generated constructor and cannot be reassigned afterwards.
    """

    # Identifier of the object the event belongs to; ``Snapshot.take``
    # fills this from ``aggregate.id``.
    originator_id: UUID

    # Version number associated with the event; ``Snapshot.take`` fills
    # this from ``aggregate.version``.
    originator_version: int

    # Point in time recorded on the event.
    timestamp: datetime
[docs]
@dataclass(frozen=True)
class Aggregate:
    """Immutable base record for aggregates in this module.

    Instances are frozen dataclasses, so a field cannot be reassigned on an
    existing instance once it has been constructed.
    """

    # Identifier of the aggregate; copied into ``Snapshot.originator_id``
    # by ``Snapshot.take``.
    id: UUID

    # Version number of the aggregate; copied into
    # ``Snapshot.originator_version`` by ``Snapshot.take``.
    version: int

    # Presumably the time the aggregate was first created -- the value is
    # supplied by whoever constructs the instance.
    created_on: datetime

    # Presumably the time of the most recent change -- likewise supplied by
    # whoever constructs the instance.
    modified_on: datetime
[docs]
@dataclass(frozen=True)
class Snapshot(DomainEvent):
    """Domain event that records the attribute values of an aggregate.

    In addition to the fields inherited from ``DomainEvent``, a snapshot
    carries ``state``: a mapping of attribute names to attribute values.
    """

    # Attribute names mapped to the values they had when the snapshot was
    # taken.
    state: dict[str, Any]

    @classmethod
    def take(cls, aggregate: Aggregate) -> Snapshot:
        """Create a snapshot of ``aggregate``.

        Args:
            aggregate: The aggregate whose attributes are recorded.

        Returns:
            A ``Snapshot`` whose ``originator_id`` and ``originator_version``
            are the aggregate's ``id`` and ``version``, whose ``timestamp``
            is the value returned by ``datetime_now_with_tzinfo()``, and
            whose ``state`` is a shallow copy of the aggregate's instance
            attributes. A plain ``Snapshot`` is constructed even when this
            method is called on a subclass.
        """
        return Snapshot(
            originator_id=aggregate.id,
            originator_version=aggregate.version,
            timestamp=datetime_now_with_tzinfo(),
            # Copy instead of aliasing the live ``__dict__``: otherwise a
            # write to ``snapshot.state`` would silently change the
            # attributes of the (frozen) aggregate it was taken from.
            state=dict(aggregate.__dict__),
        )
# Type variable standing for a concrete aggregate class; the bound restricts
# it to Aggregate and its subclasses.
TAggregate = TypeVar("TAggregate", bound=Aggregate)
# Generic alias for a callable taking any arguments and returning either a
# TAggregate or None. aggregate_projector() below invokes such a callable as
# mutator(event, aggregate).
# NOTE(review): this is a runtime expression, not an annotation, so it is not
# deferred by `from __future__ import annotations`; Optional[...] is
# presumably used instead of `TAggregate | None` for that reason -- confirm
# against the supported Python versions.
MutatorFunction = Callable[..., Optional[TAggregate]]
[docs]
def aggregate_projector(
    mutator: MutatorFunction[TAggregate],
) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]:
    """Build a projector function around ``mutator``.

    Args:
        mutator: Callable invoked as ``mutator(event, aggregate)`` for each
            event; its return value becomes the aggregate passed to the
            next invocation.

    Returns:
        A function ``project_aggregate(aggregate, events)`` that applies
        ``mutator`` to every event in iteration order and returns the last
        result, or the initial ``aggregate`` unchanged when ``events``
        yields nothing.
    """

    def project_aggregate(
        aggregate: TAggregate | None, events: Iterable[DomainEvent]
    ) -> TAggregate | None:
        """Fold ``events`` into ``aggregate`` using the enclosed mutator."""
        # Thread the running result through the mutator, starting from the
        # aggregate supplied by the caller (which may be None).
        current = aggregate
        for domain_event in events:
            current = mutator(domain_event, current)
        return current

    return project_aggregate