Source code for examples.aggregate10.mutablemodel

from __future__ import annotations

import typing
from datetime import datetime  # noqa: TC003
from typing import TYPE_CHECKING, Any, cast
from uuid import UUID, uuid4

from eventsourcing.domain import (
    BaseAggregate,
    CanInitAggregate,
    CanMutateAggregate,
    CanSnapshotAggregate,
    datetime_now_with_tzinfo,
)
from eventsourcing.utils import get_topic, resolve_topic
from examples.aggregate9.immutablemodel import DomainEvent, Immutable

if TYPE_CHECKING:
    from eventsourcing.domain import MutableOrImmutableAggregate


[docs] class SnapshotState(Immutable, frozen=True): created_on: datetime modified_on: datetime
[docs] class AggregateSnapshot(DomainEvent, CanSnapshotAggregate[UUID], frozen=True): topic: str state: Any
[docs] @classmethod def take(cls, aggregate: MutableOrImmutableAggregate[UUID]) -> AggregateSnapshot: type_of_snapshot_state = typing.get_type_hints(cls)["state"] aggregate_state = dict(aggregate.__dict__) aggregate_state.pop("_id") aggregate_state.pop("_version") aggregate_state["created_on"] = aggregate_state.pop("_created_on") aggregate_state["modified_on"] = aggregate_state.pop("_modified_on") aggregate_state.pop("_pending_events") snapshot_state = type_of_snapshot_state(**aggregate_state) return cls( originator_id=aggregate.id, originator_version=aggregate.version, timestamp=datetime_now_with_tzinfo(), topic=get_topic(type(aggregate)), state=snapshot_state, )
[docs] def mutate(self, _: None) -> Aggregate: """Reconstructs the snapshotted :class:`Aggregate` object.""" cls = cast("type[Aggregate]", resolve_topic(self.topic)) aggregate_state: dict[str, Any] = { key: getattr(self.state, key) for key in type(self.state).__struct_fields__ } aggregate_state["_id"] = self.originator_id aggregate_state["_version"] = self.originator_version aggregate_state["_created_on"] = self.state.created_on aggregate_state["_modified_on"] = self.state.modified_on aggregate_state["_version"] = self.originator_version aggregate_state["_pending_events"] = [] aggregate = object.__new__(cls) aggregate.__dict__.update(aggregate_state) return aggregate
[docs] class AggregateEvent(DomainEvent, CanMutateAggregate[UUID], frozen=True): def _as_dict(self) -> dict[str, Any]: return {key: getattr(self, key) for key in self.__struct_fields__}
[docs] class Aggregate(BaseAggregate[UUID]):
[docs] @classmethod def create_id(cls, *_: Any, **__: Any) -> UUID: return uuid4()
[docs] class Event(AggregateEvent, frozen=True): pass
[docs] class Created(Event, CanInitAggregate[UUID], frozen=True): originator_topic: str