Source code for examples.aggregate6.baseclasses

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, TypeVar

from eventsourcing.domain import datetime_now_with_tzinfo

if TYPE_CHECKING:
    from collections.abc import Iterable
    from datetime import datetime
    from uuid import UUID


[docs] @dataclass(frozen=True) class DomainEvent: originator_id: UUID originator_version: int timestamp: datetime
[docs] @dataclass(frozen=True) class Aggregate: id: UUID version: int created_on: datetime modified_on: datetime
[docs] @dataclass(frozen=True) class Snapshot(DomainEvent): state: dict[str, Any]
[docs] @classmethod def take(cls, aggregate: Aggregate) -> Snapshot: return Snapshot( originator_id=aggregate.id, originator_version=aggregate.version, timestamp=datetime_now_with_tzinfo(), state=aggregate.__dict__, )
TAggregate = TypeVar("TAggregate", bound=Aggregate) MutatorFunction = Callable[..., TAggregate | None]
[docs] def aggregate_projector( mutator: MutatorFunction[TAggregate], ) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]: def project_aggregate( aggregate: TAggregate | None, events: Iterable[DomainEvent] ) -> TAggregate | None: for event in events: aggregate = mutator(event, aggregate) return aggregate return project_aggregate