Source code for examples.aggregate3.domainmodel

from __future__ import annotations

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Aggregate


class Dog(Aggregate):
    """Event-sourced aggregate holding a dog's name and its list of tricks.

    State is changed only by the ``apply`` handlers below: each domain event
    hands itself back to the aggregate (see ``Dog.Event.apply``), and
    ``Dog.apply`` selects the handler registered for that event's type.
    """

    class Event(Aggregate.Event):
        """Base class for all ``Dog`` events; delegates mutation to the aggregate."""

        def apply(self, aggregate: Dog) -> None:
            """Forward this event to ``aggregate.apply`` so the aggregate mutates itself."""
            aggregate.apply(self)

    class Registered(Event, Aggregate.Created):
        """Creation event carrying the name the dog was registered with."""

        name: str

    class TrickAdded(Event):
        """Event carrying a single trick to be added to the dog."""

        trick: str

    @classmethod
    def register(cls, name: str) -> Dog:
        """Create a new ``Dog`` through a ``Registered`` event carrying ``name``."""
        return cls._create(cls.Registered, name=name)

    def add_trick(self, trick: str) -> None:
        """Trigger a ``TrickAdded`` event carrying ``trick``."""
        self.trigger_event(self.TrickAdded, trick=trick)

    @singledispatchmethod
    def apply(self, event: Event) -> None:
        """Apply ``event`` to this aggregate.

        This is the dispatch fallback and deliberately has no body beyond
        this docstring: an event type with no registered handler below
        leaves the attributes managed by this class untouched.
        """

    # The handlers are registered by the annotated type of ``event``.
    # NOTE(review): ``Dog.Registered`` / ``Dog.TrickAdded`` are referenced while
    # ``Dog`` is still being defined; this presumably relies on the
    # eventsourcing ``singledispatchmethod`` resolving the annotation lazily
    # (the stdlib one would not) - confirm before swapping implementations.
    @apply.register
    def _(self, event: Dog.Registered) -> None:
        """Initialise state from the creation event: set the name, start with no tricks."""
        self.name = event.name
        self.tricks: list[str] = []

    @apply.register
    def _(self, event: Dog.TrickAdded) -> None:
        """Append the event's trick to ``self.tricks``."""
        self.tricks.append(event.trick)