Source code for examples.aggregate7.domainmodel

from __future__ import annotations

from functools import singledispatch
from uuid import uuid4

from eventsourcing.domain import datetime_now_with_tzinfo
from examples.aggregate7.immutablemodel import (
    Aggregate,
    DomainEvent,
    Immutable,
    Snapshot,
    aggregate_projector,
)


class Trick(Immutable):
    """A trick, described only by its name."""

    name: str
class Dog(Aggregate):
    """State of a dog aggregate: its name and the tricks added so far."""

    name: str
    tricks: tuple[Trick, ...]
class DogRegistered(DomainEvent):
    """Domain event carrying the name a dog was registered with."""

    name: str
class TrickAdded(DomainEvent):
    """Domain event carrying the trick that was added to a dog."""

    trick: Trick
def register_dog(name: str) -> DomainEvent:
    """Return a ``DogRegistered`` event for a new dog called ``name``.

    The event is given a freshly generated UUID as its originator ID,
    originator version 1, and a timestamp obtained from
    ``datetime_now_with_tzinfo()``.
    """
    # Generate the ID before taking the timestamp, as the original call did.
    new_dog_id = uuid4()
    registered_at = datetime_now_with_tzinfo()
    return DogRegistered(
        originator_id=new_dog_id,
        originator_version=1,
        timestamp=registered_at,
        name=name,
    )
def add_trick(dog: Dog, trick: Trick) -> DomainEvent:
    """Return a ``TrickAdded`` event recording ``trick`` for ``dog``.

    The event reuses the dog's ID as its originator ID and is versioned one
    past the dog's current version. ``dog`` itself is not modified here.
    """
    originator_id = dog.id
    next_version = dog.version + 1
    added_at = datetime_now_with_tzinfo()
    return TrickAdded(
        originator_id=originator_id,
        originator_version=next_version,
        timestamp=added_at,
        trick=trick,
    )
@singledispatch
def mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None:
    """Apply an event to a dog, dispatching on the event's type.

    This is the fallback used when no handler has been registered for the
    event's type: both arguments are ignored and ``None`` is returned.
    """
    return None
@mutate_dog.register
def _(event: DogRegistered, _: None) -> Dog:
    """Build the initial ``Dog`` from its ``DogRegistered`` event."""
    # Creation and last-modification times both come from the same event.
    registered_at = event.timestamp
    return Dog(
        id=event.originator_id,
        version=event.originator_version,
        created_on=registered_at,
        modified_on=registered_at,
        name=event.name,
        tricks=(),
    )


@mutate_dog.register
def _(event: TrickAdded, dog: Dog) -> Dog:
    """Return a new ``Dog`` with the event's trick appended to its tricks."""
    # Unpack rather than concatenate so any iterable of tricks is accepted.
    extended_tricks = (*dog.tricks, event.trick)
    return Dog(
        id=dog.id,
        version=event.originator_version,
        created_on=dog.created_on,
        modified_on=event.timestamp,
        name=dog.name,
        tricks=extended_tricks,
    )


@mutate_dog.register
def _(event: Snapshot, _: None) -> Dog:
    """Rebuild a ``Dog`` from the ``state`` mapping of a snapshot event."""
    state = event.state
    return Dog(
        id=state["id"],
        version=state["version"],
        created_on=state["created_on"],
        modified_on=state["modified_on"],
        name=state["name"],
        tricks=state["tricks"],
    )


# Projector built from the mutator by aggregate_projector.
project_dog = aggregate_projector(mutate_dog)