Aggregate 5 - Immutable aggregate
This example shows another variation of the `Dog` aggregate class used in the tutorial and module docs.
As in the previous example, this example does not use the library’s `Aggregate` class. Instead, it defines its own `Aggregate` and `DomainEvent` base classes. In contrast to the previous examples, the aggregate is defined as a frozen data class so that it is an immutable object. This has implications for the aggregate command methods, which must return the events that they trigger.
The `Dog` aggregate is an immutable frozen data class, but it is otherwise similar to the previous example. It explicitly defines event classes, and it explicitly triggers events in command methods. However, it has a `mutate()` method which evolves aggregate state by constructing a new instance of the aggregate class for each event.
The application code in this example must receive the new events that are triggered when calling the aggregate command methods, and pass them to the `save()` method. The aggregate projector function must also be supplied when getting an aggregate from the repository and when taking snapshots.
Domain model
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, Optional, Tuple, Type, TypeVar
from uuid import UUID, uuid4
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Snapshot
@dataclass(frozen=True)
class DomainEvent:
    """Immutable base class for domain events.

    Each event records the ID of the aggregate it belongs to, a version
    number, and the time at which the event was created.
    """

    originator_id: UUID
    originator_version: int
    timestamp: datetime

    @staticmethod
    def create_timestamp() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.now(tz=timezone.utc)
# Type variable so that ``projector`` is typed in terms of the concrete subclass.
TAggregate = TypeVar("TAggregate", bound="Aggregate")


@dataclass(frozen=True)
class Aggregate:
    """Immutable base class for event-sourced aggregates.

    Instances are frozen, so state is never changed in place. New state is
    obtained by passing events to ``mutate``, and ``projector`` folds a
    sequence of events into a final aggregate.
    """

    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime

    def trigger_event(
        self,
        event_class: Type[DomainEvent],
        **kwargs: Any,
    ) -> DomainEvent:
        """Construct the next event for this aggregate.

        :param event_class: the event class to instantiate.
        :param kwargs: event-specific attribute values.
        :return: a new ``event_class`` instance whose ``originator_id`` is
            this aggregate's ID, whose ``originator_version`` is this
            aggregate's version plus one, and whose ``timestamp`` comes
            from ``event_class.create_timestamp()``.
        """
        # ``**kwargs`` is already a fresh dict owned by this call, so it can
        # be updated directly. The originator fields always take precedence
        # over any same-named values supplied by the caller.
        kwargs.update(
            originator_id=self.id,
            originator_version=self.version + 1,
            timestamp=event_class.create_timestamp(),
        )
        return event_class(**kwargs)

    @classmethod
    def projector(
        cls: Type[TAggregate],
        aggregate: Optional[TAggregate],
        events: Iterable[DomainEvent],
    ) -> Optional[TAggregate]:
        """Apply ``events`` in order, starting from ``aggregate``.

        :param aggregate: the initial state, or ``None`` when there is none.
        :param events: the events to apply, in order.
        :return: the result of passing each event, with the state so far,
            through ``cls.mutate``; the initial ``aggregate`` is returned
            unchanged when ``events`` is empty.
        """
        for event in events:
            aggregate = cls.mutate(event, aggregate)
        return aggregate

    @singledispatchmethod
    @staticmethod
    def mutate(event: DomainEvent, aggregate: Any) -> Any:
        """Mutates aggregate with event.

        This base implementation has no body and so returns ``None``.
        """
@dataclass(frozen=True)
class Dog(Aggregate):
    """Immutable dog aggregate with a name and a tuple of tricks.

    Command methods do not modify the instance. They return a pair of the
    new ``Dog`` and the event that was triggered.
    """

    name: str
    tricks: Tuple[str, ...]

    @dataclass(frozen=True)
    class Registered(DomainEvent):
        """Event recording that a dog was registered with a name."""

        name: str

    @dataclass(frozen=True)
    class TrickAdded(DomainEvent):
        """Event recording that a trick was added to a dog."""

        trick: str

    @staticmethod
    def register(name: str) -> Tuple[Dog, DomainEvent]:
        """Register a new dog.

        :param name: the name of the dog.
        :return: the new ``Dog`` and the ``Registered`` event that created it.
        """
        # There is no aggregate yet, so the first event is constructed
        # directly with a new ID and version 1.
        event = Dog.Registered(
            originator_id=uuid4(),
            originator_version=1,
            timestamp=DomainEvent.create_timestamp(),
            name=name,
        )
        dog = Dog.mutate(event, None)
        return dog, event

    def add_trick(self, trick: str) -> Tuple[Dog, DomainEvent]:
        """Add a trick to this dog.

        :param trick: the trick to add.
        :return: a new ``Dog`` that includes the trick, and the
            ``TrickAdded`` event that was triggered.
        """
        event = self.trigger_event(Dog.TrickAdded, trick=trick)
        dog = Dog.mutate(event, self)
        return dog, event

    @singledispatchmethod
    @classmethod
    def mutate(cls, event: DomainEvent, aggregate: Optional[Dog]) -> Optional[Dog]:
        """Mutates aggregate with event.

        This fallback has no body and returns ``None``; handlers for
        specific event types are registered below.
        """

    @mutate.register
    @classmethod
    def _(cls, event: Dog.Registered, _: Optional[Dog]) -> Dog:
        """Build the initial ``Dog`` from a ``Registered`` event."""
        return Dog(
            id=event.originator_id,
            version=event.originator_version,
            created_on=event.timestamp,
            modified_on=event.timestamp,
            name=event.name,
            tricks=(),
        )

    @mutate.register
    @classmethod
    def _(cls, event: Dog.TrickAdded, aggregate: Optional[Dog]) -> Dog:
        """Return a new ``Dog`` with the event's trick appended."""
        # A trick can only be added to an existing dog.
        assert aggregate is not None
        return Dog(
            id=aggregate.id,
            version=event.originator_version,
            created_on=aggregate.created_on,
            modified_on=event.timestamp,
            name=aggregate.name,
            tricks=aggregate.tricks + (event.trick,),
        )

    @mutate.register
    @classmethod
    def _(cls, event: Snapshot, _: Optional[Dog]) -> Dog:
        """Rebuild a ``Dog`` from the state held by a ``Snapshot``."""
        return Dog(
            id=event.state["id"],
            version=event.state["version"],
            created_on=event.state["created_on"],
            modified_on=event.state["modified_on"],
            name=event.state["name"],
            tricks=tuple(event.state["tricks"]),  # comes back from JSON as a list
        )
Application
from __future__ import annotations
from typing import Any, Dict
from uuid import UUID
from eventsourcing.application import Application
from eventsourcing.examples.aggregate5.domainmodel import Dog
class DogSchool(Application):
    """Application for registering dogs and adding tricks to them.

    The ``Dog`` aggregate is immutable, so each command method takes the
    event returned by the aggregate and passes it to ``save()``.
    """

    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        """Register a new dog and return its ID."""
        dog, event = Dog.register(name)
        self.save(event)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        """Add a trick to the dog identified by ``dog_id``."""
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        # Only the event is saved; the new Dog instance is not needed here.
        _, event = dog.add_trick(trick)
        self.save(event)

    def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
        """Return the name and tricks of the dog identified by ``dog_id``."""
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        return {"name": dog.name, "tricks": dog.tricks}
Test case
from __future__ import annotations
from unittest import TestCase
from eventsourcing.examples.aggregate5.application import DogSchool
from eventsourcing.examples.aggregate5.domainmodel import Dog
class TestDogSchool(TestCase):
    """Exercises the DogSchool application, including snapshotting."""

    def test_dog_school(self) -> None:
        """Register a dog, add tricks, take a snapshot, then add another trick."""
        # TestCase assertion methods are used instead of bare ``assert``
        # statements so the checks still run under ``python -O``.

        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        self.assertEqual(len(notifications), 3)

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=Dog.projector)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))