Aggregate 4 - Custom base classes¶
This example shows how to define and use your own mutable aggregate base class.
Base classes¶
In this example, the base classes DomainEvent
and Aggregate
are defined independently of the library.
The DomainEvent
class is defined as a
“frozen” Python dataclass
.
@dataclass(frozen=True)
class DomainEvent:
originator_version: int
originator_id: UUID
timestamp: datetime
The Aggregate
base class in this example is coded to
have the common aspects of mutable aggregate objects. It is coded to conform with the library’s
protocol classes CollectEventsProtocol
and MutableAggregateProtocol
so that it can be used with the library’s Application
class.
@dataclass
class Aggregate:
id: UUID
version: int
created_on: datetime
modified_on: datetime
_pending_events: list[DomainEvent]
@dataclass(frozen=True)
class Snapshot(DomainEvent):
topic: str
state: dict[str, Any]
@classmethod
def take(
cls,
aggregate: Aggregate,
) -> Aggregate.Snapshot:
aggregate_state = dict(aggregate.__dict__)
aggregate_state.pop("_pending_events")
return Aggregate.Snapshot(
originator_id=aggregate.id,
originator_version=aggregate.version,
timestamp=datetime_now_with_tzinfo(),
topic=get_topic(type(aggregate)),
state=aggregate_state,
)
def trigger_event(
self,
event_class: type[DomainEvent],
**kwargs: Any,
) -> None:
kwargs = kwargs.copy()
kwargs.update(
originator_id=self.id,
originator_version=self.version + 1,
timestamp=datetime_now_with_tzinfo(),
)
new_event = event_class(**kwargs)
self.apply_event(new_event)
self.append_event(new_event)
def append_event(self, *events: DomainEvent) -> None:
self._pending_events.extend(events)
def collect_events(self) -> list[DomainEvent]:
events, self._pending_events = self._pending_events, []
return events
@singledispatchmethod
def apply_event(self, event: DomainEvent) -> None:
msg = f"For {type(event).__qualname__}"
raise NotImplementedError(msg)
@apply_event.register(Snapshot)
def _(self, event: Snapshot) -> None:
self.__dict__.update(event.state)
@classmethod
def project_events(
cls,
_: Self | None,
events: Iterable[DomainEvent],
) -> Self:
aggregate: Self = Aggregate.__new__(cls)
for event in events:
aggregate.apply_event(event)
return aggregate
def __new__(cls, *args: Any, **kwargs: Any) -> Self:
aggregate = super().__new__(cls, *args, **kwargs)
aggregate._pending_events = []
return aggregate
It has a Snapshot
class, which
has a take()
method that
can create a snapshot of an aggregate object.
It has a trigger_event()
method, which
constructs new domain event objects, applies them to the aggregate, and appends them to an
internal list of “pending” events”.
It has a collect_events()
method, which
drains the internal list of new “pending” events”, so that they can be recorded.
Like in example 3, it has an
apply()
method which is
decorated with the @singledispatchmethod
decorator. A method that supports the Snapshot
class
is registered, so that an aggregate can be reconstructed from a snapshot.
It also has a projector()
class method, which
can reconstruct an aggregate from a list of domain events.
Domain model¶
The Dog
class in this example does not use the
library’s aggregate base class. It is expressed using the independent base classes
DomainEvent
and Aggregate
defined above.
The aggregate event classes are explicitly defined, and the command method bodies explicitly trigger events.
@dataclass
class Dog(Aggregate):
name: str
tricks: list[str]
@dataclass(frozen=True)
class Registered(DomainEvent):
name: str
@dataclass(frozen=True)
class TrickAdded(DomainEvent):
trick: str
@classmethod
def register(cls, name: str) -> Dog:
event = cls.Registered(
originator_id=uuid4(),
originator_version=1,
timestamp=datetime_now_with_tzinfo(),
name=name,
)
dog = cls.project_events(None, [event])
dog.append_event(event)
return dog
def add_trick(self, trick: str) -> None:
self.trigger_event(self.TrickAdded, trick=trick)
@singledispatchmethod
def apply_event(self, event: DomainEvent) -> None:
super().apply_event(event)
@apply_event.register(Registered)
def _(self, event: Registered) -> None:
self.id = event.originator_id
self.version = event.originator_version
self.created_on = event.timestamp
self.modified_on = event.timestamp
self.name = event.name
self.tricks = []
@apply_event.register(TrickAdded)
def _(self, event: TrickAdded) -> None:
self.tricks.append(event.trick)
self.version = event.originator_version
self.modified_on = event.timestamp
Like in example 3, it has an
apply()
method which is
decorated with the @singledispatchmethod
decorator. Methods that support aggregate events classes are registered.
Application¶
As in the previous examples, the DogSchool
application class simply uses the aggregate class as if it were a normal Python object
class. However, the aggregate projector function must be supplied when getting an
aggregate from the repository and when taking snapshots.
class DogSchool(Application[UUID]):
is_snapshotting_enabled = True
def register_dog(self, name: str) -> UUID:
dog = Dog.register(name)
self.save(dog)
return dog.id
def add_trick(self, dog_id: UUID, trick: str) -> None:
dog: Dog = self.repository.get(dog_id, projector_func=Dog.project_events)
dog.add_trick(trick)
self.save(dog)
def get_dog(self, dog_id: UUID) -> dict[str, Any]:
dog: Dog = self.repository.get(dog_id, projector_func=Dog.project_events)
return {
"name": dog.name,
"tricks": tuple(dog.tricks),
"created_on": dog.created_on,
"modified_on": dog.modified_on,
}
Test case¶
The TestDogSchool
test case shows how the
DogSchool
application can be used.
class TestDogSchool(TestCase):
def test_dog_school(self) -> None:
# Construct application object.
school = DogSchool()
# Evolve application state.
dog_id = school.register_dog("Fido")
# Query application state.
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ()
assert dog["created_on"] == dog["modified_on"]
school.add_trick(dog_id, "roll over")
school.add_trick(dog_id, "play dead")
# Query application state.
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead")
assert dog["created_on"] < dog["modified_on"]
# Select notifications.
notifications = school.notification_log.select(start=1, limit=10)
assert len(notifications) == 3
# Take snapshot.
school.take_snapshot(dog_id, version=3, projector_func=Dog.project_events)
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead")
assert dog["created_on"] < dog["modified_on"]
# Continue with snapshotted aggregate.
school.add_trick(dog_id, "fetch ball")
dog = school.get_dog(dog_id)
assert dog["name"] == "Fido"
assert dog["tricks"] == ("roll over", "play dead", "fetch ball")
assert dog["created_on"] < dog["modified_on"]
Code reference¶
- class examples.aggregate4.baseclasses.DomainEvent(originator_version: 'int', originator_id: 'UUID', timestamp: 'datetime')[source]¶
Bases:
object
- originator_version: int¶
- originator_id: UUID¶
- timestamp: datetime¶
- class examples.aggregate4.baseclasses.Aggregate(*args: 'Any', **kwargs: 'Any')[source]¶
Bases:
object
- id: UUID¶
- version: int¶
- created_on: datetime¶
- modified_on: datetime¶
- class Snapshot(originator_version: 'int', originator_id: 'UUID', timestamp: 'datetime', topic: 'str', state: 'dict[str, Any]')[source]¶
Bases:
DomainEvent
- topic: str¶
- state: dict[str, Any]¶
- trigger_event(event_class: type[DomainEvent], **kwargs: Any) None [source]¶
- append_event(*events: DomainEvent) None [source]¶
- collect_events() list[DomainEvent] [source]¶
- apply_event(event: DomainEvent) None [source]¶
- apply_event(event: Snapshot) None
- classmethod project_events(_: Self | None, events: Iterable[DomainEvent]) Self [source]¶
- class examples.aggregate4.domainmodel.Dog(id: 'UUID', version: 'int', created_on: 'datetime', modified_on: 'datetime', _pending_events: 'list[DomainEvent]', name: 'str', tricks: 'list[str]')[source]¶
Bases:
Aggregate
- name: str¶
- tricks: list[str]¶
- class Registered(originator_version: 'int', originator_id: 'UUID', timestamp: 'datetime', name: 'str')[source]¶
Bases:
DomainEvent
- name: str¶
- class TrickAdded(originator_version: 'int', originator_id: 'UUID', timestamp: 'datetime', trick: 'str')[source]¶
Bases:
DomainEvent
- trick: str¶
- apply_event(event: DomainEvent) None [source]¶
- apply_event(event: Registered) None
- apply_event(event: TrickAdded) None
- class examples.aggregate4.application.DogSchool(env: Mapping[str, str] | None = None)[source]¶
Bases:
Application
[UUID
]- is_snapshotting_enabled: bool = True¶
- name = 'DogSchool'¶