Aggregate 5 - Immutable aggregate¶
This example shows how to define and use your own immutable aggregate base class.
Base classes¶
In this example, the base classes DomainEvent
and Aggregate
are defined independently of the library.
The DomainEvent
class is defined as a “frozen” Python dataclass
.
@dataclass(frozen=True)
class DomainEvent:
originator_id: UUID
originator_version: int
timestamp: datetime
@staticmethod
def create_timestamp() -> datetime:
return datetime.now(tz=timezone.utc)
The Aggregate
base class in this example is also defined as a “frozen” Python
dataclass
. This has implications for the aggregate command methods, which must
return the events that they trigger.
@dataclass(frozen=True)
class Aggregate:
id: UUID
version: int
created_on: datetime
modified_on: datetime
def trigger_event(
self,
event_class: type[DomainEvent],
**kwargs: Any,
) -> DomainEvent:
kwargs = kwargs.copy()
kwargs.update(
originator_id=self.id,
originator_version=self.version + 1,
timestamp=event_class.create_timestamp(),
)
return event_class(**kwargs)
@classmethod
def projector(
cls,
aggregate: Self | None,
events: Iterable[DomainEvent],
) -> Self | None:
for event in events:
aggregate = cls.mutate(event, aggregate)
return aggregate
@singledispatchmethod[Any]
@staticmethod
def mutate(event: DomainEvent, aggregate: TAggregate | None) -> TAggregate | None:
"""Mutates aggregate with event."""
@dataclass(frozen=True)
class Snapshot(DomainEvent):
state: dict[str, Any]
@classmethod
def take(cls, aggregate: Aggregate) -> Aggregate.Snapshot:
return Aggregate.Snapshot(
originator_id=aggregate.id,
originator_version=aggregate.version,
timestamp=DomainEvent.create_timestamp(),
state=aggregate.__dict__,
)
It defines a trigger_event()
method, which can be called by
aggregate command methods, and which does the common work of constructing an event object with an incremented
version number and a new timestamp.
It defines a projector()
class method which reconstructs an aggregate
object by iterating over events, calling the aggregate class’s mutate()
method for each event.
It also defines a Snapshot
class which is a
DomainEvent
that can carry the state of an aggregate,
and which has a take()
method that can
construct a snapshot object from an aggregate object.
Domain model¶
The Dog
aggregate class is defined as immutable frozen data class
that extends the aggregate base class. The aggregate event classes, Registered
and
TrickAdded
, are explicitly defined as nested subclasses.
The Dog
aggregate class defines a
mutate()
method, which evolves aggregate state by constructing a new
instance of the aggregate class each time it is called, according to the type of event it is called with. Support
for reconstructing an aggregate object from a snapshot object is included in this method.
@dataclass(frozen=True)
class Dog(Aggregate):
name: str
tricks: tuple[str, ...]
@dataclass(frozen=True)
class Registered(DomainEvent):
name: str
@dataclass(frozen=True)
class TrickAdded(DomainEvent):
trick: str
@staticmethod
def register(name: str) -> tuple[Dog, DomainEvent]:
event = Dog.Registered(
originator_id=uuid4(),
originator_version=1,
timestamp=DomainEvent.create_timestamp(),
name=name,
)
dog = Dog.mutate(event, None)
return dog, event
def add_trick(self, trick: str) -> tuple[Dog, DomainEvent]:
event = self.trigger_event(Dog.TrickAdded, trick=trick)
dog = Dog.mutate(event, self)
return dog, event
@singledispatchmethod["Dog"]
@staticmethod
def mutate(event: DomainEvent, aggregate: Dog | None) -> Dog: # noqa: ARG004
"""Mutates aggregate with event."""
msg = f"Event type not supported: {type(event)}"
raise ProgrammingError(msg)
@mutate.register
@staticmethod
def _(event: Dog.Registered, _: Dog | None) -> Dog:
return Dog(
id=event.originator_id,
version=event.originator_version,
created_on=event.timestamp,
modified_on=event.timestamp,
name=event.name,
tricks=(),
)
@mutate.register
@staticmethod
def _(event: Dog.TrickAdded, aggregate: Dog | None) -> Dog:
assert aggregate is not None
return Dog(
id=aggregate.id,
version=event.originator_version,
created_on=aggregate.created_on,
modified_on=event.timestamp,
name=aggregate.name,
tricks=(*aggregate.tricks, event.trick),
)
@mutate.register
@staticmethod
def _(event: Dog.Snapshot, _: Dog | None) -> Dog:
return Dog(
id=event.state["id"],
version=event.state["version"],
created_on=event.state["created_on"],
modified_on=event.state["modified_on"],
name=event.state["name"],
tricks=tuple(event.state["tricks"]), # comes back from JSON as a list
)
Application¶
The DogSchool
application in this example uses the library’s
Application
class. It must receive the new events that are returned
by the aggregate command methods, and pass them to its save()
method. The aggregate projector function must also be supplied when reconstructing an aggregate from the
repository, and when taking snapshots.
class DogSchool(Application[UUID]):
is_snapshotting_enabled = True
def register_dog(self, name: str) -> UUID:
dog, event = Dog.register(name)
self.save(event)
return dog.id
def add_trick(self, dog_id: UUID, trick: str) -> None:
dog = self.repository.get(dog_id, projector_func=Dog.projector)
dog, event = dog.add_trick(trick)
self.save(event)
def get_dog(self, dog_id: UUID) -> dict[str, Any]:
dog = self.repository.get(dog_id, projector_func=Dog.projector)
return {"name": dog.name, "tricks": dog.tricks}
Test case¶
The TestDogSchool
test case shows how the
DogSchool
application can be used.
class TestDogSchool(TestCase):
def test_dog_school(self) -> None:
# Construct application object.
school = DogSchool()
# Evolve application state.
dog_id = school.register_dog("Fido")
school.add_trick(dog_id, "roll over")
school.add_trick(dog_id, "play dead")
# Query application state.
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead")
# Select notifications.
notifications = school.notification_log.select(start=1, limit=10)
assert len(notifications) == 3
# Take snapshot.
school.take_snapshot(dog_id, version=3, projector_func=Dog.projector)
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead")
# Continue with snapshotted aggregate.
school.add_trick(dog_id, "fetch ball")
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead", "fetch ball")
Code reference¶
- class examples.aggregate5.baseclasses.DomainEvent(originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')[source]¶
Bases:
object
- originator_id: UUID¶
- originator_version: int¶
- timestamp: datetime¶
- class examples.aggregate5.baseclasses.Aggregate(id: 'UUID', version: 'int', created_on: 'datetime', modified_on: 'datetime')[source]¶
Bases:
object
- id: UUID¶
- version: int¶
- created_on: datetime¶
- modified_on: datetime¶
- trigger_event(event_class: type[DomainEvent], **kwargs: Any) DomainEvent [source]¶
- classmethod projector(aggregate: Self | None, events: Iterable[DomainEvent]) Self | None [source]¶
- class examples.aggregate5.domainmodel.Dog(id: 'UUID', version: 'int', created_on: 'datetime', modified_on: 'datetime', name: 'str', tricks: 'tuple[str, ...]')[source]¶
Bases:
Aggregate
- name: str¶
- tricks: tuple[str, ...]¶
- class Registered(originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', name: 'str')[source]¶
Bases:
DomainEvent
- name: str¶
- class TrickAdded(originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', trick: 'str')[source]¶
Bases:
DomainEvent
- trick: str¶
- static register(name: str) tuple[Dog, DomainEvent] [source]¶
- add_trick(trick: str) tuple[Dog, DomainEvent] [source]¶
- class examples.aggregate5.application.DogSchool(env: Mapping[str, str] | None = None)[source]¶
Bases:
Application
[UUID
]- is_snapshotting_enabled: bool = True¶
- name = 'DogSchool'¶