Source code for examples.aggregate7.immutablemodel

from __future__ import annotations

from collections.abc import Callable
from datetime import datetime
from typing import TYPE_CHECKING, Any, TypeVar
from uuid import UUID

from pydantic import BaseModel, ConfigDict

from eventsourcing.domain import datetime_now_with_tzinfo
from eventsourcing.utils import get_topic

if TYPE_CHECKING:
    from collections.abc import Iterable


[docs] class Immutable(BaseModel): model_config = ConfigDict(extra="forbid", frozen=True)
[docs] class DomainEvent(Immutable): originator_id: UUID originator_version: int timestamp: datetime
[docs] class Aggregate(Immutable): id: UUID version: int created_on: datetime modified_on: datetime
[docs] class Snapshot(DomainEvent): topic: str state: dict[str, Any]
[docs] @classmethod def take(cls, aggregate: Aggregate) -> Snapshot: return Snapshot( originator_id=aggregate.id, originator_version=aggregate.version, timestamp=datetime_now_with_tzinfo(), topic=get_topic(type(aggregate)), state=aggregate.model_dump(), )
TAggregate = TypeVar("TAggregate", bound=Aggregate) MutatorFunction = Callable[..., TAggregate | None]
[docs] def aggregate_projector( mutator: MutatorFunction[TAggregate], ) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]: def project_aggregate( aggregate: TAggregate | None, events: Iterable[DomainEvent] ) -> TAggregate | None: for event in events: aggregate = mutator(event, aggregate) return aggregate return project_aggregate