from __future__ import annotations
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Aggregate
[docs]
class Dog(Aggregate):
[docs]
class Event(Aggregate.Event):
[docs]
def apply(self, aggregate: Dog) -> None:
aggregate.apply(self)
[docs]
class Registered(Event, Aggregate.Created):
name: str
[docs]
class TrickAdded(Event):
trick: str
[docs]
@classmethod
def register(cls, name: str) -> Dog:
return cls._create(cls.Registered, name=name)
[docs]
def add_trick(self, trick: str) -> None:
self.trigger_event(self.TrickAdded, trick=trick)
[docs]
@singledispatchmethod
def apply(self, event: Event) -> None:
"""Applies event to aggregate."""
@apply.register
def _(self, event: Dog.Registered) -> None:
self.name = event.name
self.tricks: list[str] = []
@apply.register
def _(self, event: Dog.TrickAdded) -> None:
self.tricks.append(event.trick)