Aggregate 4 - Custom base classes

This example shows how to define and use your own mutable aggregate base class.

Base classes

In this example, the base classes DomainEvent and Aggregate are defined independently of the library.

The DomainEvent class is defined as a “frozen” Python dataclass.

@dataclass(frozen=True)
class DomainEvent:
    originator_version: int
    originator_id: UUID
    timestamp: datetime

The Aggregate base class in this example is coded to have the common aspects of mutable aggregate objects. It is coded to conform with the library’s protocol classes CollectEventsProtocol and MutableAggregateProtocol so that it can be used with the library’s Application class.

@dataclass
class Aggregate:
    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime
    _pending_events: list[DomainEvent]

    @dataclass(frozen=True)
    class Snapshot(DomainEvent):
        topic: str
        state: dict[str, Any]

        @classmethod
        def take(
            cls,
            aggregate: Aggregate,
        ) -> Aggregate.Snapshot:
            aggregate_state = dict(aggregate.__dict__)
            aggregate_state.pop("_pending_events")
            return Aggregate.Snapshot(
                originator_id=aggregate.id,
                originator_version=aggregate.version,
                timestamp=datetime_now_with_tzinfo(),
                topic=get_topic(type(aggregate)),
                state=aggregate_state,
            )

    def trigger_event(
        self,
        event_class: type[DomainEvent],
        **kwargs: Any,
    ) -> None:
        kwargs = kwargs.copy()
        kwargs.update(
            originator_id=self.id,
            originator_version=self.version + 1,
            timestamp=datetime_now_with_tzinfo(),
        )
        new_event = event_class(**kwargs)
        self.apply_event(new_event)
        self.append_event(new_event)

    def append_event(self, *events: DomainEvent) -> None:
        self._pending_events.extend(events)

    def collect_events(self) -> list[DomainEvent]:
        events, self._pending_events = self._pending_events, []
        return events

    @singledispatchmethod
    def apply_event(self, event: DomainEvent) -> None:
        msg = f"For {type(event).__qualname__}"
        raise NotImplementedError(msg)

    @apply_event.register(Snapshot)
    def _(self, event: Snapshot) -> None:
        self.__dict__.update(event.state)

    @classmethod
    def project_events(
        cls,
        _: Self | None,
        events: Iterable[DomainEvent],
    ) -> Self:
        aggregate: Self = Aggregate.__new__(cls)
        for event in events:
            aggregate.apply_event(event)
        return aggregate

    def __new__(cls, *args: Any, **kwargs: Any) -> Self:
        aggregate = super().__new__(cls, *args, **kwargs)
        aggregate._pending_events = []
        return aggregate

It has a Snapshot class, which has a take() method that can create a snapshot of an aggregate object.

It has a trigger_event() method, which constructs new domain event objects, applies them to the aggregate, and appends them to an internal list of “pending” events”.

It has a collect_events() method, which drains the internal list of new “pending” events”, so that they can be recorded.

Like in example 3, it has an apply() method which is decorated with the @singledispatchmethod decorator. A method that supports the Snapshot class is registered, so that an aggregate can be reconstructed from a snapshot.

It also has a projector() class method, which can reconstruct an aggregate from a list of domain events.

Domain model

The Dog class in this example does not use the library’s aggregate base class. It is expressed using the independent base classes DomainEvent and Aggregate defined above.

The aggregate event classes are explicitly defined, and the command method bodies explicitly trigger events.

@dataclass
class Dog(Aggregate):
    name: str
    tricks: list[str]

    @dataclass(frozen=True)
    class Registered(DomainEvent):
        name: str

    @dataclass(frozen=True)
    class TrickAdded(DomainEvent):
        trick: str

    @classmethod
    def register(cls, name: str) -> Dog:
        event = cls.Registered(
            originator_id=uuid4(),
            originator_version=1,
            timestamp=datetime_now_with_tzinfo(),
            name=name,
        )
        dog = cls.project_events(None, [event])
        dog.append_event(event)
        return dog

    def add_trick(self, trick: str) -> None:
        self.trigger_event(self.TrickAdded, trick=trick)

    @singledispatchmethod
    def apply_event(self, event: DomainEvent) -> None:
        super().apply_event(event)

    @apply_event.register(Registered)
    def _(self, event: Registered) -> None:
        self.id = event.originator_id
        self.version = event.originator_version
        self.created_on = event.timestamp
        self.modified_on = event.timestamp
        self.name = event.name
        self.tricks = []

    @apply_event.register(TrickAdded)
    def _(self, event: TrickAdded) -> None:
        self.tricks.append(event.trick)
        self.version = event.originator_version
        self.modified_on = event.timestamp

Like in example 3, it has an apply() method which is decorated with the @singledispatchmethod decorator. Methods that support aggregate events classes are registered.

Application

As in the previous examples, the DogSchool application class simply uses the aggregate class as if it were a normal Python object class. However, the aggregate projector function must be supplied when getting an aggregate from the repository and when taking snapshots.

class DogSchool(Application[UUID]):
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        dog = Dog.register(name)
        self.save(dog)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog: Dog = self.repository.get(dog_id, projector_func=Dog.project_events)
        dog.add_trick(trick)
        self.save(dog)

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        dog: Dog = self.repository.get(dog_id, projector_func=Dog.project_events)
        return {
            "name": dog.name,
            "tricks": tuple(dog.tricks),
            "created_on": dog.created_on,
            "modified_on": dog.modified_on,
        }

Test case

The TestDogSchool test case shows how the DogSchool application can be used.

class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")

        # Query application state.
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ()
        assert dog["created_on"] == dog["modified_on"]

        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")
        assert dog["created_on"] < dog["modified_on"]

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=Dog.project_events)
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")
        assert dog["created_on"] < dog["modified_on"]

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead", "fetch ball")
        assert dog["created_on"] < dog["modified_on"]

Code reference

class examples.aggregate4.baseclasses.DomainEvent(originator_version: 'int', originator_id: 'UUID', timestamp: 'datetime')[source]

Bases: object

originator_version: int
originator_id: UUID
timestamp: datetime
class examples.aggregate4.baseclasses.Aggregate(*args: 'Any', **kwargs: 'Any')[source]

Bases: object

id: UUID
version: int
created_on: datetime
modified_on: datetime
class Snapshot(originator_version: 'int', originator_id: 'UUID', timestamp: 'datetime', topic: 'str', state: 'dict[str, Any]')[source]

Bases: DomainEvent

topic: str
state: dict[str, Any]
classmethod take(aggregate: Aggregate) Snapshot[source]
trigger_event(event_class: type[DomainEvent], **kwargs: Any) None[source]
append_event(*events: DomainEvent) None[source]
collect_events() list[DomainEvent][source]
apply_event(event: DomainEvent) None[source]
apply_event(event: Snapshot) None
classmethod project_events(_: Self | None, events: Iterable[DomainEvent]) Self[source]
class examples.aggregate4.domainmodel.Dog(id: 'UUID', version: 'int', created_on: 'datetime', modified_on: 'datetime', _pending_events: 'list[DomainEvent]', name: 'str', tricks: 'list[str]')[source]

Bases: Aggregate

name: str
tricks: list[str]
class Registered(originator_version: 'int', originator_id: 'UUID', timestamp: 'datetime', name: 'str')[source]

Bases: DomainEvent

name: str
class TrickAdded(originator_version: 'int', originator_id: 'UUID', timestamp: 'datetime', trick: 'str')[source]

Bases: DomainEvent

trick: str
classmethod register(name: str) Dog[source]
add_trick(trick: str) None[source]
apply_event(event: DomainEvent) None[source]
apply_event(event: Registered) None
apply_event(event: TrickAdded) None
class examples.aggregate4.application.DogSchool(env: Mapping[str, str] | None = None)[source]

Bases: Application[UUID]

is_snapshotting_enabled: bool = True
register_dog(name: str) UUID[source]
add_trick(dog_id: UUID, trick: str) None[source]
get_dog(dog_id: UUID) dict[str, Any][source]
name = 'DogSchool'
class examples.aggregate4.test_application.TestDogSchool(methodName='runTest')[source]

Bases: TestCase

test_dog_school() None[source]