Aggregate 4 - Custom base classes¶
This example shows another variation of the Dog
aggregate class used
in the tutorial and module docs.
In contrast with the previous examples, this example does not use the
library Aggregate
class. Instead, it defines its own Aggregate
and
DomainEvent
base classes. Similar to the previous examples, the Aggregate
class is a normal (mutable) Python class and the DomainEvent
class is a
frozen Python data class. A projector()
class method is defined
on the Aggregate
class.
The aggregate event classes are explicitly defined, and the command method bodies explicitly trigger events. Like the previous example, this example uses a separate function to apply events to the current aggregate state.
As in the previous example, the application code simply uses the aggregate class as if it were a normal Python object class. However, the aggregate projector function must be supplied when getting an aggregate from the repository and when taking snapshots.
Domain model¶
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar, cast
from uuid import UUID, uuid4
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Snapshot
@dataclass(frozen=True)
class DomainEvent:
originator_version: int
originator_id: UUID
timestamp: datetime
@staticmethod
def create_timestamp() -> datetime:
return datetime.now(tz=timezone.utc)
TAggregate = TypeVar("TAggregate", bound="Aggregate")
class Aggregate:
id: UUID
version: int
created_on: datetime
def __init__(self, event: DomainEvent):
self.id = event.originator_id
self.version = event.originator_version
self.created_on = event.timestamp
def trigger_event(
self,
event_class: Type[DomainEvent],
**kwargs: Any,
) -> None:
kwargs = kwargs.copy()
kwargs.update(
originator_id=self.id,
originator_version=self.version + 1,
timestamp=event_class.create_timestamp(),
)
new_event = event_class(**kwargs)
self.apply(new_event)
self.pending_events.append(new_event)
@singledispatchmethod
def apply(self, event: DomainEvent) -> None:
"""Applies event to aggregate."""
def collect_events(self) -> List[DomainEvent]:
events, self.pending_events = self.pending_events, []
return events
@classmethod
def projector(
cls: Type[TAggregate],
_: Optional[TAggregate],
events: Iterable[DomainEvent],
) -> Optional[TAggregate]:
aggregate = object.__new__(cls)
for event in events:
aggregate.apply(event)
return aggregate
@property
def pending_events(self) -> List[DomainEvent]:
return type(self).__pending_events[id(self)]
@pending_events.setter
def pending_events(self, pending_events: List[DomainEvent]) -> None:
type(self).__pending_events[id(self)] = pending_events
__pending_events: Dict[int, List[DomainEvent]] = defaultdict(list)
def __del__(self) -> None:
try:
type(self).__pending_events.pop(id(self))
except KeyError:
pass
class Dog(Aggregate):
@dataclass(frozen=True)
class Registered(DomainEvent):
name: str
@dataclass(frozen=True)
class TrickAdded(DomainEvent):
trick: str
@classmethod
def register(cls, name: str) -> "Dog":
event = cls.Registered(
originator_id=uuid4(),
originator_version=1,
timestamp=DomainEvent.create_timestamp(),
name=name,
)
dog = cast(Dog, cls.projector(None, [event]))
dog.pending_events.append(event)
return dog
def add_trick(self, trick: str) -> None:
self.trigger_event(self.TrickAdded, trick=trick)
@singledispatchmethod
def apply(self, event: DomainEvent) -> None:
"""Applies event to aggregate."""
@apply.register(Registered)
def _(self, event: Registered) -> None:
super().__init__(event)
self.name = event.name
self.tricks: List[str] = []
@apply.register(TrickAdded)
def _(self, event: TrickAdded) -> None:
self.tricks.append(event.trick)
self.version = event.originator_version
@apply.register(Snapshot)
def _(self, event: Snapshot) -> None:
self.__dict__.update(event.state)
Application¶
from __future__ import annotations
from typing import Any, Dict
from uuid import UUID
from eventsourcing.application import Application
from eventsourcing.examples.aggregate4.domainmodel import Dog
class DogSchool(Application):
is_snapshotting_enabled = True
def register_dog(self, name: str) -> UUID:
dog = Dog.register(name)
self.save(dog)
return dog.id
def add_trick(self, dog_id: UUID, trick: str) -> None:
dog = self.repository.get(dog_id, projector_func=Dog.projector)
dog.add_trick(trick)
self.save(dog)
def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
dog = self.repository.get(dog_id, projector_func=Dog.projector)
return {"name": dog.name, "tricks": tuple(dog.tricks)}
Test case¶
from __future__ import annotations
from unittest import TestCase
from eventsourcing.examples.aggregate4.application import DogSchool
from eventsourcing.examples.aggregate4.domainmodel import Dog
class TestDogSchool(TestCase):
def test_dog_school(self) -> None:
# Construct application object.
school = DogSchool()
# Evolve application state.
dog_id = school.register_dog("Fido")
school.add_trick(dog_id, "roll over")
school.add_trick(dog_id, "play dead")
# Query application state.
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead")
# Select notifications.
notifications = school.notification_log.select(start=1, limit=10)
assert len(notifications) == 3
# Take snapshot.
school.take_snapshot(dog_id, version=3, projector_func=Dog.projector)
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead")
# Continue with snapshotted aggregate.
school.add_trick(dog_id, "fetch ball")
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead", "fetch ball")