Aggregate 4 - Custom base classes
This example shows another variation of the `Dog` aggregate class used in the tutorial and module docs.
In contrast with the previous examples, this example does not use the library `Aggregate` class. Instead, it defines its own `Aggregate` and `DomainEvent` base classes. Similar to the previous examples, the `Aggregate` class is a normal (mutable) Python class and the `DomainEvent` class is a frozen Python dataclass. A `projector()` class method is defined on the `Aggregate` class.
The aggregate event classes are explicitly defined, and the command method bodies explicitly trigger events. Like the previous example, this example uses a separate function to apply events to the current aggregate state.
As in the previous example, the application code simply uses the aggregate class as if it were a normal Python object class. However, the aggregate projector function must be supplied when getting an aggregate from the repository and when taking snapshots.
Domain model
from __future__ import annotations
import sys
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar
from uuid import UUID, uuid4
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Snapshot
@dataclass(frozen=True)
class DomainEvent:
    """Immutable base class for the events recorded against an aggregate.

    Every event carries the identity of the aggregate it belongs to, the
    version of the aggregate that the event represents, and the time at
    which the event was created.
    """

    originator_version: int
    originator_id: UUID
    timestamp: datetime

    @staticmethod
    def create_timestamp() -> datetime:
        """Return the current time as a timezone-aware datetime in UTC."""
        utc = timezone.utc
        return datetime.now(utc)
# Type variable bound to Aggregate, so that class methods can be annotated as
# returning an instance of whichever Aggregate subclass they are called on.
TAggregate = TypeVar("TAggregate", bound="Aggregate")
class Aggregate:
    """Custom base class for event-sourced aggregates.

    Subclasses are expected to provide an ``apply()`` method that mutates
    the aggregate according to a given event. New events are created with
    ``trigger_event()``, held as pending events, and handed over to the
    caller by ``collect_events()``.

    Pending events are kept in a class-level mapping keyed by ``id(self)``
    rather than in the instance ``__dict__``.
    NOTE(review): presumably this keeps pending events out of any state
    that is captured from ``__dict__`` (e.g. snapshots) - confirm.
    """

    id: UUID
    version: int
    created_on: datetime

    def __init__(self, event: DomainEvent):
        """Initialise identity, version and creation time from ``event``."""
        self.id = event.originator_id
        self.version = event.originator_version
        self.created_on = event.timestamp

    def trigger_event(
        self,
        event_class: Type[DomainEvent],
        **kwargs: Any,
    ) -> None:
        """Create, apply and remember a new event for this aggregate.

        :param event_class: class of the event to construct.
        :param kwargs: event-specific attribute values; any values given for
            ``originator_id``, ``originator_version`` or ``timestamp`` are
            overridden with values derived from the aggregate.
        """
        # ``kwargs`` is already a fresh dict owned by this call, so it can
        # be updated in place without affecting the caller.
        kwargs.update(
            originator_id=self.id,
            originator_version=self.version + 1,
            timestamp=event_class.create_timestamp(),
        )
        new_event = event_class(**kwargs)
        self.apply(new_event)
        self._pending_events.append(new_event)

    # Declared here for type checkers only; concrete aggregate classes are
    # expected to define ``apply`` themselves.
    if sys.version_info >= (3, 8):  # pragma: no cover
        apply: singledispatchmethod[None]
    else:  # pragma: no cover
        apply: singledispatchmethod

    def collect_events(self) -> List[DomainEvent]:
        """Return the pending events and reset the pending list to empty."""
        events, self._pending_events = self._pending_events, []
        return events

    @classmethod
    def projector(
        cls: Type[TAggregate],
        aggregate: Optional[TAggregate],
        events: Iterable[DomainEvent],
    ) -> Optional[TAggregate]:
        """Project ``events`` onto an aggregate and return the result.

        :param aggregate: aggregate to continue from, or ``None`` to build
            a new one from the first event.
        :param events: events to apply, in order.
        :returns: the resulting aggregate, or ``None`` when no aggregate was
            given and there were no events to apply.
        """
        for event in events:
            if aggregate is None:
                # Bypass __init__ (subclasses may take different arguments);
                # the first applied event is responsible for initialising
                # the aggregate's state.
                aggregate = object.__new__(cls)
            aggregate.apply(event)
        return aggregate

    # Pending events of all live aggregates, keyed by id() of the instance.
    __pending_events: Dict[int, List[DomainEvent]] = defaultdict(list)

    @property
    def _pending_events(self) -> List[DomainEvent]:
        """The list of events triggered but not yet collected."""
        return type(self).__pending_events[id(self)]

    @_pending_events.setter
    def _pending_events(self, pending_events: List[DomainEvent]) -> None:
        type(self).__pending_events[id(self)] = pending_events

    def __del__(self) -> None:
        """Drop this instance's entry from the pending events mapping."""
        # There is no entry if no event was ever triggered or read, hence
        # the default value.
        type(self).__pending_events.pop(id(self), None)
class Dog(Aggregate):
    """A dog that has a name and can learn tricks.

    The event classes are defined explicitly, the command methods trigger
    them explicitly, and ``apply()`` dispatches on the type of the event to
    mutate the aggregate.
    """

    @dataclass(frozen=True)
    class Registered(DomainEvent):
        """The dog was registered under a name."""

        name: str

    @dataclass(frozen=True)
    class TrickAdded(DomainEvent):
        """The dog learned a trick."""

        trick: str

    def __init__(self, name: str) -> None:
        """Register a new dog by creating and applying a ``Registered`` event."""
        registered = self.Registered(
            originator_id=uuid4(),
            originator_version=1,
            timestamp=DomainEvent.create_timestamp(),
            name=name,
        )
        self.apply(registered)
        self._pending_events.append(registered)

    def add_trick(self, trick: str) -> None:
        """Record that the dog has learned ``trick``."""
        self.trigger_event(self.TrickAdded, trick=trick)

    @singledispatchmethod
    def apply(self, event: DomainEvent) -> None:
        """Mutate the aggregate according to the type of ``event``."""

    @apply.register(Registered)
    def _(self, event: Registered) -> None:
        # The first event initialises the base attributes and the dog's state.
        super().__init__(event)
        self.name = event.name
        self.tricks: List[str] = []

    @apply.register(TrickAdded)
    def _(self, event: TrickAdded) -> None:
        self.tricks.append(event.trick)
        self.version = event.originator_version

    @apply.register(Snapshot)
    def _(self, event: Snapshot) -> None:
        # Restore the instance attributes from the snapshot's state.
        self.__dict__.update(event.state)
Application
from typing import Any, Dict
from uuid import UUID
from eventsourcing.application import Application
from eventsourcing.examples.aggregate4.domainmodel import Dog
class DogSchool(Application):
    """Application for registering dogs and teaching them tricks.

    Because ``Dog`` does not use the library's aggregate base class, its own
    ``Dog.projector`` is supplied whenever a dog is retrieved from the
    repository.
    """

    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        """Create and save a new dog called ``name``, returning its ID."""
        new_dog = Dog(name)
        self.save(new_dog)
        return new_dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        """Add ``trick`` to the dog identified by ``dog_id`` and save it."""
        pupil = self.repository.get(dog_id, projector_func=Dog.projector)
        pupil.add_trick(trick)
        self.save(pupil)

    def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
        """Return the name and the tricks (as a tuple) of the given dog."""
        found = self.repository.get(dog_id, projector_func=Dog.projector)
        details: Dict[str, Any] = {}
        details["name"] = found.name
        details["tricks"] = tuple(found.tricks)
        return details
Test case
from unittest import TestCase
from eventsourcing.examples.aggregate4.application import DogSchool
from eventsourcing.examples.aggregate4.domainmodel import Dog
class TestDogSchool(TestCase):
    """Exercises the DogSchool application before and after a snapshot."""

    def test_dog_school(self) -> None:
        """Register a dog, add tricks, take a snapshot, then add another trick."""
        # Construct the application object.
        app = DogSchool()

        # Evolve the application state.
        fido_id = app.register_dog("Fido")
        app.add_trick(fido_id, "roll over")
        app.add_trick(fido_id, "play dead")

        # Query the application state.
        details = app.get_dog(fido_id)
        assert details["name"] == "Fido"
        assert details["tricks"] == ("roll over", "play dead")

        # One notification is expected for each of the three events so far.
        recorded = app.notification_log.select(start=1, limit=10)
        assert len(recorded) == 3

        # Take a snapshot at version 3; the state read back is unchanged.
        app.take_snapshot(fido_id, version=3, projector_func=Dog.projector)
        details = app.get_dog(fido_id)
        assert details["name"] == "Fido"
        assert details["tricks"] == ("roll over", "play dead")

        # Continue with the snapshotted aggregate.
        app.add_trick(fido_id, "fetch ball")
        details = app.get_dog(fido_id)
        assert details["name"] == "Fido"
        assert details["tricks"] == ("roll over", "play dead", "fetch ball")