Aggregate 5 - Immutable aggregate

This example shows how to define and use your own immutable aggregate base class.

Base classes

In this example, the base classes DomainEvent and Aggregate are defined independently of the library.

The DomainEvent class is defined as a “frozen” Python dataclass.

@dataclass(frozen=True, kw_only=True)
class DomainEvent:
    originator_id: UUID
    originator_version: int
    timestamp: datetime = field(default_factory=datetime_now_with_tzinfo)
    metadata: dict[str, str] = field(default_factory=get_metadata_from_context)
    event_id: UUID = field(default_factory=uuid4)

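The Aggregate base class in this example is also defined as a “frozen” Python dataclass. Because instances cannot be modified in place, the aggregate command methods cannot update the aggregate or collect new events on it. They must instead return the events that they trigger, along with the new aggregate instance.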

@dataclass(frozen=True)
class Aggregate:
    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime

    def trigger_event(
        self,
        event_class: type[DomainEvent],
        **kwargs: Any,
    ) -> DomainEvent:
        kwargs = kwargs.copy()
        kwargs.update(
            originator_id=self.id,
            originator_version=self.version + 1,
        )
        return event_class(**kwargs)

    @classmethod
    def projector(
        cls,
        aggregate: Self | None,
        events: Iterable[DomainEvent],
    ) -> Self | None:
        for event in events:
            aggregate = cls.mutate(event, aggregate)
        return aggregate

    @singledispatchmethod[Any]
    @staticmethod
    def mutate(event: DomainEvent, aggregate: TAggregate | None) -> TAggregate | None:
        """Mutates aggregate with event."""

    @dataclass(frozen=True)
    class Snapshot(DomainEvent):
        state: dict[str, Any]

        @classmethod
        def take(cls, aggregate: Aggregate) -> Aggregate.Snapshot:
            return Aggregate.Snapshot(
                originator_id=aggregate.id,
                originator_version=aggregate.version,
                state=aggregate.__dict__,
            )

The Aggregate base class defines a trigger_event() method, which can be called by aggregate command methods. It does the common work of constructing an event object with the aggregate's ID and an incremented version number. The event's timestamp is supplied by the default factory on the DomainEvent class.

It defines a projector() class method which reconstructs an aggregate object by iterating over events, calling the aggregate class’s mutate() method for each event.
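The projector described above is, in effect, a left fold over the events. The sketch below illustrates that idea with hypothetical Counter and Added classes that are not part of the example; it only assumes that mutate() returns a new state object for each event.

```python
from __future__ import annotations

from dataclasses import dataclass
from functools import reduce
from typing import Iterable


@dataclass(frozen=True)
class Added:
    amount: int


@dataclass(frozen=True)
class Counter:
    version: int
    total: int


def mutate(event: Added, state: Counter | None) -> Counter:
    # Build a new state object instead of changing the old one.
    if state is None:
        return Counter(version=1, total=event.amount)
    return Counter(version=state.version + 1, total=state.total + event.amount)


def projector(state: Counter | None, events: Iterable[Added]) -> Counter | None:
    for event in events:
        state = mutate(event, state)
    return state


events = [Added(2), Added(3), Added(5)]

# Replay every event, starting from nothing.
full = projector(None, events)

# Resume from an intermediate state and replay only the later events.
partial = projector(None, events[:2])
resumed = projector(partial, events[2:])

# The loop is equivalent to a reduce over the events.
folded = reduce(lambda state, event: mutate(event, state), events, None)
```

Because the projector accepts an initial state, replaying can start from any previously computed state rather than from None.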

The Aggregate base class also defines a Snapshot class, which is a DomainEvent that can carry the state of an aggregate. Its take() class method constructs a snapshot object from an aggregate object.
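The following sketch shows, under stated assumptions, what capturing and restoring state in this way can look like. The Pet class is hypothetical, and the JSON round trip stands in for whatever serialisation the event store applies to the snapshot state; UUID and datetime fields are left out because the standard json module does not encode them by default.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Pet:
    name: str
    tricks: tuple[str, ...]


pet = Pet(name="Fido", tricks=("roll over", "play dead"))

# The instance attributes of a dataclass without __slots__ live in __dict__.
# Copying the dict keeps the captured state independent of the instance.
state = dict(vars(pet))

# Simulate storing and loading the state as JSON.
loaded = json.loads(json.dumps(state))

# JSON has no tuple type, so the tuple comes back as a list
# and has to be converted when the object is rebuilt.
restored = Pet(name=loaded["name"], tricks=tuple(loaded["tricks"]))
```

This is why a snapshot handler may need to convert individual values back to their original types when it reconstructs the aggregate.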

Domain model

The Dog aggregate class is defined as an immutable frozen data class that extends the aggregate base class. The aggregate event classes, Registered and TrickAdded, are explicitly defined as nested subclasses of DomainEvent.

The Dog aggregate class defines a mutate() method, which evolves aggregate state by constructing a new instance of the aggregate class each time it is called, according to the type of event it is called with. Support for reconstructing an aggregate object from a snapshot object is included in this method.

@dataclass(frozen=True)
class Dog(Aggregate):
    name: str
    tricks: tuple[str, ...]

    @dataclass(frozen=True)
    class Registered(DomainEvent):
        name: str

    @dataclass(frozen=True)
    class TrickAdded(DomainEvent):
        trick: str

    @staticmethod
    def register(name: str) -> tuple[Dog, DomainEvent]:
        event = Dog.Registered(
            originator_id=uuid4(),
            originator_version=1,
            name=name,
        )
        dog = Dog.mutate(event, None)
        return dog, event

    def add_trick(self, trick: str) -> tuple[Dog, DomainEvent]:
        event = self.trigger_event(Dog.TrickAdded, trick=trick)
        dog = Dog.mutate(event, self)
        return dog, event

    @singledispatchmethod
    @staticmethod
    def mutate(event: DomainEvent, aggregate: Dog | None) -> Dog:  # noqa: ARG004
        """Mutates aggregate with event."""
        msg = f"Event type not supported: {type(event)}"
        raise ProgrammingError(msg)

    @mutate.register
    @staticmethod
    def _(event: Dog.Registered, _: Dog | None) -> Dog:
        return Dog(
            id=event.originator_id,
            version=event.originator_version,
            created_on=event.timestamp,
            modified_on=event.timestamp,
            name=event.name,
            tricks=(),
        )

    @mutate.register
    @staticmethod
    def _(event: Dog.TrickAdded, aggregate: Dog | None) -> Dog:
        assert aggregate is not None
        return Dog(
            id=aggregate.id,
            version=event.originator_version,
            created_on=aggregate.created_on,
            modified_on=event.timestamp,
            name=aggregate.name,
            tricks=(*aggregate.tricks, event.trick),
        )

    @mutate.register
    @staticmethod
    def _(event: Dog.Snapshot, _: Dog | None) -> Dog:
        return Dog(
            id=event.state["id"],
            version=event.state["version"],
            created_on=event.state["created_on"],
            modified_on=event.state["modified_on"],
            name=event.state["name"],
            tricks=tuple(event.state["tricks"]),  # comes back from JSON as a list
        )
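The dispatch-by-event-type pattern used above can be tried in isolation. The sketch below is a simplified illustration rather than the example's code: it uses module-level functions with functools.singledispatch instead of singledispatchmethod, registers handlers by passing the event class explicitly, and uses cut-down hypothetical versions of the Dog, Registered and TrickAdded classes.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from functools import singledispatch


@dataclass(frozen=True)
class Registered:
    name: str


@dataclass(frozen=True)
class TrickAdded:
    trick: str


@dataclass(frozen=True)
class Dog:
    version: int
    name: str
    tricks: tuple[str, ...] = ()


@singledispatch
def mutate(event: object, dog: Dog | None) -> Dog:
    # Fallback for event types that have no registered handler.
    raise TypeError(f"Event type not supported: {type(event)}")


@mutate.register(Registered)
def _(event: Registered, dog: Dog | None) -> Dog:
    # The first event creates the initial state.
    return Dog(version=1, name=event.name)


@mutate.register(TrickAdded)
def _(event: TrickAdded, dog: Dog | None) -> Dog:
    assert dog is not None
    # replace() copies the frozen instance with the given fields changed.
    return replace(dog, version=dog.version + 1, tricks=(*dog.tricks, event.trick))


first = mutate(Registered("Fido"), None)
second = mutate(TrickAdded("roll over"), first)
```

Each call returns a new Dog, so the earlier instance (first) is left unchanged and can still be inspected or compared.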

Application

The DogSchool application in this example uses the library’s Application class. Because the aggregate is immutable, the application's command methods must receive the new events that are returned by the aggregate command methods, and pass them to the application's save() method. The aggregate projector function must also be supplied when reconstructing an aggregate from the repository, and when taking snapshots.

class DogSchool(Application):
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        dog, event = Dog.register(name)
        self.save(event)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        dog, event = dog.add_trick(trick)
        self.save(event)

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        return {"name": dog.name, "tricks": dog.tricks}
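The calling convention used by these application methods can be illustrated without the library. In the sketch below, InMemoryLog is a hypothetical stand-in for an event store and is not the library's API; the Dog and TrickAdded classes are reduced to the fields needed to show the pattern.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class TrickAdded:
    version: int
    trick: str


@dataclass(frozen=True)
class Dog:
    version: int
    tricks: tuple[str, ...] = ()

    def add_trick(self, trick: str) -> tuple[Dog, TrickAdded]:
        # Return both the new state and the event that produced it.
        event = TrickAdded(version=self.version + 1, trick=trick)
        new_dog = replace(self, version=event.version, tricks=(*self.tricks, trick))
        return new_dog, event


class InMemoryLog:
    """Hypothetical stand-in for an event store."""

    def __init__(self) -> None:
        self.events: list[TrickAdded] = []

    def save(self, *events: TrickAdded) -> None:
        self.events.extend(events)


log = InMemoryLog()
original = Dog(version=1)

# The caller rebinds the name to the new instance and saves the event itself.
dog, event = original.add_trick("roll over")
log.save(event)
dog, event = dog.add_trick("play dead")
log.save(event)
```

If the caller discards the returned event, nothing is recorded, since the immutable aggregate keeps no list of pending events of its own.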

Test case

The TestDogSchool test case shows how the DogSchool application can be used.

class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        with put_metadata_in_context({"user_id": "user-1"}):
            dog_id = school.register_dog("Fido")
            school.add_trick(dog_id, "roll over")
            school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        with put_metadata_in_context({"user_id": "admin-1"}):
            school.take_snapshot(dog_id, version=3, projector_func=Dog.projector)

        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Continue with snapshotted aggregate.
        with put_metadata_in_context({"user_id": "user-1"}):
            school.add_trick(dog_id, "fetch ball")

        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead", "fetch ball")

        # Check metadata on events.
        events = list(school.events.get(dog_id))
        assert len(events) > 0
        for event in events:
            assert event.metadata.get("user_id") == "user-1"

        # Check metadata on snapshots.
        assert school.snapshots is not None
        snapshots = list(school.snapshots.get(dog_id))
        assert len(snapshots) > 0
        for snapshot in snapshots:
            assert snapshot.metadata.get("user_id") == "admin-1"
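The test relies on events picking up metadata from the surrounding context when they are constructed. The implementation of put_metadata_in_context and get_metadata_from_context is not shown here, so the sketch below is only one possible way such helpers could work, using the standard contextvars module. The names metadata_context and current_metadata are hypothetical.

```python
from __future__ import annotations

from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Iterator

# Holds the metadata for the current context; unset by default.
_metadata: ContextVar[dict[str, str]] = ContextVar("metadata")


def current_metadata() -> dict[str, str]:
    # Return a copy so that events never share one mutable dict.
    return dict(_metadata.get({}))


@contextmanager
def metadata_context(metadata: dict[str, str]) -> Iterator[None]:
    token = _metadata.set(dict(metadata))
    try:
        yield
    finally:
        # Restore whatever was set before entering the block.
        _metadata.reset(token)


@dataclass(frozen=True)
class Event:
    name: str
    # The default factory runs at construction time, inside the active context.
    metadata: dict[str, str] = field(default_factory=current_metadata)


with metadata_context({"user_id": "user-1"}):
    inside = Event("registered")

outside = Event("registered")
```

With this approach the domain model never passes metadata explicitly; whichever code wraps the application call decides what is attached to the events.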

Code reference

class examples.aggregate5.baseclasses.DomainEvent(*, originator_id: UUID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>)

Bases: object

originator_id: UUID
originator_version: int
timestamp: datetime
metadata: dict[str, str]
event_id: UUID

class examples.aggregate5.baseclasses.Aggregate(id: UUID, version: int, created_on: datetime, modified_on: datetime)

Bases: object

id: UUID
version: int
created_on: datetime
modified_on: datetime
trigger_event(event_class: type[DomainEvent], **kwargs: Any) -> DomainEvent
classmethod projector(aggregate: Self | None, events: Iterable[DomainEvent]) -> Self | None
static mutate(event: DomainEvent, aggregate: TAggregate | None) -> TAggregate | None

Mutates aggregate with event.

class Snapshot(state: dict[str, Any], *, originator_id: UUID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>)

Bases: DomainEvent

state: dict[str, Any]
classmethod take(aggregate: Aggregate) -> Snapshot

class examples.aggregate5.domainmodel.Dog(id: UUID, version: int, created_on: datetime, modified_on: datetime, name: str, tricks: tuple[str, ...])

Bases: Aggregate

name: str
tricks: tuple[str, ...]

class Registered(name: str, *, originator_id: UUID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>)

Bases: DomainEvent

name: str

class TrickAdded(trick: str, *, originator_id: UUID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>)

Bases: DomainEvent

trick: str

static register(name: str) -> tuple[Dog, DomainEvent]
add_trick(trick: str) -> tuple[Dog, DomainEvent]
static mutate(event: DomainEvent, aggregate: Dog | None) -> Dog
static mutate(event: Registered, _: Dog | None) -> Dog
static mutate(event: TrickAdded, aggregate: Dog | None) -> Dog
static mutate(event: Snapshot, _: Dog | None) -> Dog

Mutates aggregate with event.

class examples.aggregate5.application.DogSchool(env: Mapping[str, str] | None = None)

Bases: Application

is_snapshotting_enabled: bool = True
register_dog(name: str) -> UUID
add_trick(dog_id: UUID, trick: str) -> None
get_dog(dog_id: UUID) -> dict[str, Any]
aggregate_id_type

alias of UUID

name = 'DogSchool'

class examples.aggregate5.test_application.TestDogSchool(methodName='runTest')

Bases: TestCase

test_dog_school() -> None