Aggregate 5 - Immutable aggregate

This example shows another variation of the Dog aggregate class used in the tutorial and module docs.

Like in the previous example, this example also does not use the library’s Aggregate class. Instead, it defines its own Aggregate and DomainEvent base classes. In contrast to the previous examples, the aggregate is defined as a frozen data class so that it is an immutable object. This has implications for the aggregate command methods, which must return the events that they trigger.

The Dog aggregate is an immutable frozen data class, but it is otherwise similar to the previous example. It explicitly defines event classes. And it explicitly triggers events in command methods. However, it has a mutate() method which evolves aggregate state by constructing a new instance of the aggregate class for each event.

The application code in this example must receive the new events that are triggered when calling the aggregate command methods, and pass them to the save() method. The aggregate projector function must also be supplied when getting an aggregate from the repository and when taking snapshots.

Domain model

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, Optional, Tuple, Type, TypeVar
from uuid import UUID, uuid4

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Snapshot


@dataclass(frozen=True)
class DomainEvent:
    originator_id: UUID
    originator_version: int
    timestamp: datetime

    @staticmethod
    def create_timestamp() -> datetime:
        return datetime.now(tz=timezone.utc)


TAggregate = TypeVar("TAggregate", bound="Aggregate")


@dataclass(frozen=True)
class Aggregate:
    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime

    def trigger_event(
        self,
        event_class: Type[DomainEvent],
        **kwargs: Any,
    ) -> DomainEvent:
        kwargs = kwargs.copy()
        kwargs.update(
            originator_id=self.id,
            originator_version=self.version + 1,
            timestamp=event_class.create_timestamp(),
        )
        return event_class(**kwargs)

    @classmethod
    def projector(
        cls: Type[TAggregate],
        aggregate: Optional[TAggregate],
        events: Iterable[DomainEvent],
    ) -> Optional[TAggregate]:
        for event in events:
            aggregate = cls.mutate(event, aggregate)
        return aggregate

    @singledispatchmethod
    @staticmethod
    def mutate(event: DomainEvent, aggregate: Any) -> Any:
        """Mutates aggregate with event."""


@dataclass(frozen=True)
class Dog(Aggregate):
    name: str
    tricks: Tuple[str, ...]

    @dataclass(frozen=True)
    class Registered(DomainEvent):
        name: str

    @dataclass(frozen=True)
    class TrickAdded(DomainEvent):
        trick: str

    @staticmethod
    def register(name: str) -> Tuple[Dog, DomainEvent]:
        event = Dog.Registered(
            originator_id=uuid4(),
            originator_version=1,
            timestamp=DomainEvent.create_timestamp(),
            name=name,
        )
        dog = Dog.mutate(event, None)
        return dog, event

    def add_trick(self, trick: str) -> Tuple[Dog, DomainEvent]:
        event = self.trigger_event(Dog.TrickAdded, trick=trick)
        dog = Dog.mutate(event, self)
        return dog, event

    @singledispatchmethod
    @classmethod
    def mutate(cls, event: DomainEvent, aggregate: Optional[Dog]) -> Optional[Dog]:
        """Mutates aggregate with event."""

    @mutate.register
    @classmethod
    def _(cls, event: Dog.Registered, _: Optional[Dog]) -> Dog:
        return Dog(
            id=event.originator_id,
            version=event.originator_version,
            created_on=event.timestamp,
            modified_on=event.timestamp,
            name=event.name,
            tricks=tuple(),
        )

    @mutate.register
    @classmethod
    def _(cls, event: Dog.TrickAdded, aggregate: Optional[Dog]) -> Dog:
        assert aggregate is not None
        return Dog(
            id=aggregate.id,
            version=event.originator_version,
            created_on=aggregate.created_on,
            modified_on=event.timestamp,
            name=aggregate.name,
            tricks=aggregate.tricks + (event.trick,),
        )

    @mutate.register
    @classmethod
    def _(cls, event: Snapshot, _: Optional[Dog]) -> Dog:
        return Dog(
            id=event.state["id"],
            version=event.state["version"],
            created_on=event.state["created_on"],
            modified_on=event.state["modified_on"],
            name=event.state["name"],
            tricks=tuple(event.state["tricks"]),  # comes back from JSON as a list
        )

Application

from __future__ import annotations

from typing import Any, Dict
from uuid import UUID

from eventsourcing.application import Application
from eventsourcing.examples.aggregate5.domainmodel import Dog


class DogSchool(Application):
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        dog, event = Dog.register(name)
        self.save(event)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        dog, event = dog.add_trick(trick)
        self.save(event)

    def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        return {"name": dog.name, "tricks": dog.tricks}

Test case

from __future__ import annotations

from unittest import TestCase

from eventsourcing.examples.aggregate5.application import DogSchool
from eventsourcing.examples.aggregate5.domainmodel import Dog


class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=Dog.projector)
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead", "fetch ball")