Aggregate 5 - Immutable aggregate¶
This example shows the Dog
class used in the tutorial and module docs.
This example also does not use the library Aggregate
class. Instead, it
defines its own Aggregate
and DomainEvent
base classes. In contrast
to the previous examples, both the event and the aggregate classes are
defined as frozen dataclasses.
The Dog
aggregate is a frozen dataclass, but it otherwise similar to the
previous example. It explicitly defines event classes. And it explicitly
triggers events in command methods. However, in this example the aggregate
state is evolved by constructing a new instance of the aggregate class.
In contrast to the previous examples, the application code in this example must receive the new aggregate instance and the new domain events that are returned from the aggregate command methods. The aggregate projector function must also be supplied when getting an aggregate from the repository and when taking snapshots.
Domain model¶
from dataclasses import dataclass
from datetime import datetime, timezone
from time import monotonic
from typing import Any, Iterable, List, Optional, Tuple, Type, TypeVar, cast
from uuid import UUID, uuid4
from eventsourcing.domain import Snapshot
@dataclass(frozen=True)
class DomainEvent:
originator_id: UUID
originator_version: int
timestamp: datetime
@staticmethod
def create_timestamp() -> datetime:
return datetime.fromtimestamp(monotonic(), timezone.utc)
TAggregate = TypeVar("TAggregate", bound="Aggregate")
@dataclass(frozen=True)
class Aggregate:
id: UUID
version: int
created_on: datetime
def trigger_event(
self: TAggregate,
event_class: Type[DomainEvent],
**kwargs: Any,
) -> DomainEvent:
kwargs = kwargs.copy()
kwargs.update(
originator_id=self.id,
originator_version=self.version + 1,
timestamp=event_class.create_timestamp(),
)
return event_class(**kwargs)
@classmethod
def projector(
cls: Type[TAggregate],
aggregate: Optional[TAggregate],
events: Iterable[DomainEvent],
) -> Optional[TAggregate]:
for event in events:
aggregate = cls.mutate(event, aggregate)
return aggregate
@classmethod
def mutate(cls, event: DomainEvent, aggregate: Any) -> Any:
"""Mutates aggregate with event."""
@dataclass(frozen=True)
class Dog(Aggregate):
name: str
tricks: Tuple[str, ...]
@dataclass(frozen=True)
class Registered(DomainEvent):
name: str
@classmethod
def register(cls, name: str) -> Tuple["Dog", List[DomainEvent]]:
event = cls.Registered(
originator_id=uuid4(),
originator_version=1,
timestamp=DomainEvent.create_timestamp(),
name=name,
)
return cast(Dog, cls.mutate(event, None)), [event]
@dataclass(frozen=True)
class TrickAdded(DomainEvent):
trick: str
def add_trick(self, trick: str) -> Tuple["Dog", List[DomainEvent]]:
event = self.trigger_event(self.TrickAdded, trick=trick)
return cast(Dog, type(self).mutate(event, self)), [event]
@classmethod
def mutate(cls, event: DomainEvent, aggregate: Optional["Dog"]) -> Optional["Dog"]:
"""Mutates aggregate with event."""
if isinstance(event, cls.Registered):
return Dog(
id=event.originator_id,
version=event.originator_version,
created_on=event.timestamp,
name=event.name,
tricks=tuple(),
)
elif isinstance(event, cls.TrickAdded):
assert aggregate is not None
return Dog(
id=aggregate.id,
version=event.originator_version,
created_on=event.timestamp,
name=aggregate.name,
tricks=aggregate.tricks + (event.trick,),
)
elif isinstance(event, Snapshot):
return Dog(
id=event.state["id"],
version=event.state["version"],
created_on=event.state["created_on"],
name=event.state["name"],
tricks=tuple(event.state["tricks"]), # comes back from JSON as a list
)
else:
raise NotImplementedError(event) # pragma: no cover
Application¶
from typing import Any, Dict
from uuid import UUID
from eventsourcing.application import Application
from eventsourcing.examples.aggregate5.domainmodel import Dog
class DogSchool(Application):
is_snapshotting_enabled = True
def register_dog(self, name: str) -> UUID:
dog, events = Dog.register(name)
self.save(*events)
return dog.id
def add_trick(self, dog_id: UUID, trick: str) -> None:
dog = self.repository.get(dog_id, projector_func=Dog.projector)
dog, events = dog.add_trick(trick)
self.save(*events)
def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
dog = self.repository.get(dog_id, projector_func=Dog.projector)
return {"name": dog.name, "tricks": dog.tricks}
Test case¶
from unittest import TestCase
from eventsourcing.examples.aggregate5.application import DogSchool
from eventsourcing.examples.aggregate5.domainmodel import Dog
class TestDogSchool(TestCase):
def test_dog_school(self) -> None:
# Construct application object.
school = DogSchool()
# Evolve application state.
dog_id = school.register_dog("Fido")
school.add_trick(dog_id, "roll over")
school.add_trick(dog_id, "play dead")
# Query application state.
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead")
# Select notifications.
notifications = school.notification_log.select(start=1, limit=10)
assert len(notifications) == 3
# Take snapshot.
school.take_snapshot(dog_id, version=3, projector_func=Dog.projector)
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead")
# Continue with snapshotted aggregate.
school.add_trick(dog_id, "fetch ball")
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead", "fetch ball")