Aggregate 4 - Custom base classes

This example shows another variation of the Dog aggregate class used in the tutorial and module docs.

In contrast with the previous examples, this example does not use the library Aggregate class. Instead, it defines its own Aggregate and DomainEvent base classes. Similar to the previous examples, the Aggregate class is a normal (mutable) Python class and the DomainEvent class is a frozen Python data class. A projector() class method is defined on the Aggregate class.

The aggregate event classes are explicitly defined, and the command method bodies explicitly trigger events. Like the previous example, this example uses a separate function to apply events to the current aggregate state.

As in the previous example, the application code simply uses the aggregate class as if it were a normal Python object class. However, the aggregate projector function must be supplied when getting an aggregate from the repository and when taking snapshots.

Domain model

from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar, cast
from uuid import UUID, uuid4

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Snapshot


@dataclass(frozen=True)
class DomainEvent:
    originator_version: int
    originator_id: UUID
    timestamp: datetime

    @staticmethod
    def create_timestamp() -> datetime:
        return datetime.now(tz=timezone.utc)


TAggregate = TypeVar("TAggregate", bound="Aggregate")


class Aggregate:
    id: UUID
    version: int
    created_on: datetime

    def __init__(self, event: DomainEvent):
        self.id = event.originator_id
        self.version = event.originator_version
        self.created_on = event.timestamp

    def trigger_event(
        self,
        event_class: Type[DomainEvent],
        **kwargs: Any,
    ) -> None:
        kwargs = kwargs.copy()
        kwargs.update(
            originator_id=self.id,
            originator_version=self.version + 1,
            timestamp=event_class.create_timestamp(),
        )
        new_event = event_class(**kwargs)
        self.apply(new_event)
        self.pending_events.append(new_event)

    @singledispatchmethod
    def apply(self, event: DomainEvent) -> None:
        """Applies event to aggregate."""

    def collect_events(self) -> List[DomainEvent]:
        events, self.pending_events = self.pending_events, []
        return events

    @classmethod
    def projector(
        cls: Type[TAggregate],
        _: Optional[TAggregate],
        events: Iterable[DomainEvent],
    ) -> Optional[TAggregate]:
        aggregate = object.__new__(cls)
        for event in events:
            aggregate.apply(event)
        return aggregate

    @property
    def pending_events(self) -> List[DomainEvent]:
        return type(self).__pending_events[id(self)]

    @pending_events.setter
    def pending_events(self, pending_events: List[DomainEvent]) -> None:
        type(self).__pending_events[id(self)] = pending_events

    __pending_events: Dict[int, List[DomainEvent]] = defaultdict(list)

    def __del__(self) -> None:
        try:
            type(self).__pending_events.pop(id(self))
        except KeyError:
            pass


class Dog(Aggregate):
    @dataclass(frozen=True)
    class Registered(DomainEvent):
        name: str

    @dataclass(frozen=True)
    class TrickAdded(DomainEvent):
        trick: str

    @classmethod
    def register(cls, name: str) -> "Dog":
        event = cls.Registered(
            originator_id=uuid4(),
            originator_version=1,
            timestamp=DomainEvent.create_timestamp(),
            name=name,
        )
        dog = cast(Dog, cls.projector(None, [event]))
        dog.pending_events.append(event)
        return dog

    def add_trick(self, trick: str) -> None:
        self.trigger_event(self.TrickAdded, trick=trick)

    @singledispatchmethod
    def apply(self, event: DomainEvent) -> None:
        """Applies event to aggregate."""

    @apply.register(Registered)
    def _(self, event: Registered) -> None:
        super().__init__(event)
        self.name = event.name
        self.tricks: List[str] = []

    @apply.register(TrickAdded)
    def _(self, event: TrickAdded) -> None:
        self.tricks.append(event.trick)
        self.version = event.originator_version

    @apply.register(Snapshot)
    def _(self, event: Snapshot) -> None:
        self.__dict__.update(event.state)

Application

from __future__ import annotations

from typing import Any, Dict
from uuid import UUID

from eventsourcing.application import Application
from eventsourcing.examples.aggregate4.domainmodel import Dog


class DogSchool(Application):
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        dog = Dog.register(name)
        self.save(dog)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        dog.add_trick(trick)
        self.save(dog)

    def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        return {"name": dog.name, "tricks": tuple(dog.tricks)}

Test case

from __future__ import annotations

from unittest import TestCase

from eventsourcing.examples.aggregate4.application import DogSchool
from eventsourcing.examples.aggregate4.domainmodel import Dog


class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=Dog.projector)
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead", "fetch ball")