Aggregate 5 - Immutable aggregate

This example shows how to define and use your own immutable aggregate base class.

Base classes

In this example, the base classes DomainEvent and Aggregate are defined independently of the library.

The DomainEvent class is defined as a “frozen” Python dataclass.

@dataclass(frozen=True)
class DomainEvent:
    originator_id: UUID
    originator_version: int
    timestamp: datetime

    @staticmethod
    def create_timestamp() -> datetime:
        return datetime.now(tz=timezone.utc)
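The effect of freezing the dataclass can be seen in a minimal sketch. It repeats only the fields of DomainEvent shown above and assumes nothing beyond the standard library: assigning to a field of an existing event object is rejected.

```python
from dataclasses import FrozenInstanceError, dataclass
from datetime import datetime, timezone
from uuid import UUID, uuid4


@dataclass(frozen=True)
class DomainEvent:
    originator_id: UUID
    originator_version: int
    timestamp: datetime


event = DomainEvent(
    originator_id=uuid4(),
    originator_version=1,
    timestamp=datetime.now(tz=timezone.utc),
)

# Assigning to a field of a frozen dataclass raises FrozenInstanceError.
try:
    event.originator_version = 2  # type: ignore[misc]
except FrozenInstanceError:
    rejected = True
else:
    rejected = False
```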

The Aggregate base class in this example is also defined as a “frozen” Python dataclass. Because a frozen instance cannot be modified in place, the aggregate command methods must return the events that they trigger.

@dataclass(frozen=True)
class Aggregate:
    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime

    def trigger_event(
        self,
        event_class: type[DomainEvent],
        **kwargs: Any,
    ) -> DomainEvent:
        kwargs = kwargs.copy()
        kwargs.update(
            originator_id=self.id,
            originator_version=self.version + 1,
            timestamp=event_class.create_timestamp(),
        )
        return event_class(**kwargs)

    @classmethod
    def projector(
        cls,
        aggregate: Self | None,
        events: Iterable[DomainEvent],
    ) -> Self | None:
        for event in events:
            aggregate = cls.mutate(event, aggregate)
        return aggregate

    @singledispatchmethod[Any]
    @staticmethod
    def mutate(event: DomainEvent, aggregate: TAggregate | None) -> TAggregate | None:
        """Mutates aggregate with event."""

    @dataclass(frozen=True)
    class Snapshot(DomainEvent):
        state: dict[str, Any]

        @classmethod
        def take(cls, aggregate: Aggregate) -> Aggregate.Snapshot:
            return Aggregate.Snapshot(
                originator_id=aggregate.id,
                originator_version=aggregate.version,
                timestamp=DomainEvent.create_timestamp(),
                state=aggregate.__dict__,
            )

The Aggregate base class defines a trigger_event() method, which can be called by aggregate command methods, and which does the common work of constructing an event object with an incremented version number and a new timestamp.
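As a rough illustration of what trigger_event() does, the sketch below uses a condensed copy of the two base classes and a hypothetical Renamed event class that is not part of the example. The new event carries the aggregate's ID and the next version number, and the aggregate itself is left unchanged.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from uuid import UUID, uuid4


@dataclass(frozen=True)
class DomainEvent:
    originator_id: UUID
    originator_version: int
    timestamp: datetime

    @staticmethod
    def create_timestamp() -> datetime:
        return datetime.now(tz=timezone.utc)


@dataclass(frozen=True)
class Aggregate:
    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime

    def trigger_event(
        self, event_class: type[DomainEvent], **kwargs: Any
    ) -> DomainEvent:
        # Fill in the common event attributes from the aggregate's state.
        return event_class(
            originator_id=self.id,
            originator_version=self.version + 1,
            timestamp=event_class.create_timestamp(),
            **kwargs,
        )


# Hypothetical event class, used only for this illustration.
@dataclass(frozen=True)
class Renamed(DomainEvent):
    name: str


now = DomainEvent.create_timestamp()
aggregate = Aggregate(id=uuid4(), version=1, created_on=now, modified_on=now)
event = aggregate.trigger_event(Renamed, name="Rex")
```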

The base class also defines a projector() class method, which reconstructs an aggregate object by iterating over events and calling the aggregate class’s mutate() method for each event.
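The projector is essentially a left fold over the events. The following sketch shows the idea with simplified stand-ins: the Registered, TrickAdded and Dog classes here omit IDs, versions and timestamps, and the module-level mutate() function uses functools.singledispatch rather than the singledispatchmethod used in the example.

```python
from dataclasses import dataclass
from functools import singledispatch
from typing import Iterable, Optional, Tuple


@dataclass(frozen=True)
class Registered:
    name: str


@dataclass(frozen=True)
class TrickAdded:
    trick: str


@dataclass(frozen=True)
class Dog:
    name: str
    tricks: Tuple[str, ...]


@singledispatch
def mutate(event: object, dog: Optional[Dog]) -> Optional[Dog]:
    # Fallback for unregistered event types (the example raises
    # ProgrammingError here; TypeError is used to stay stdlib-only).
    raise TypeError(f"Event type not supported: {type(event)}")


@mutate.register(Registered)
def _(event: Registered, dog: Optional[Dog]) -> Optional[Dog]:
    # The first event constructs the initial state.
    return Dog(name=event.name, tricks=())


@mutate.register(TrickAdded)
def _(event: TrickAdded, dog: Optional[Dog]) -> Optional[Dog]:
    # Later events build a new state from the previous one.
    assert dog is not None
    return Dog(name=dog.name, tricks=(*dog.tricks, event.trick))


def projector(dog: Optional[Dog], events: Iterable[object]) -> Optional[Dog]:
    # Fold the events into the state, one event at a time.
    for event in events:
        dog = mutate(event, dog)
    return dog


events = [Registered("Fido"), TrickAdded("roll over"), TrickAdded("play dead")]
dog = projector(None, events)

# Projection can also continue from an existing state.
later = projector(dog, [TrickAdded("fetch ball")])
```

Passing None as the initial state corresponds to reconstructing an aggregate from its first event, and passing an existing state corresponds to continuing from a previously reconstructed aggregate.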

Finally, the base class defines a Snapshot class, which is a DomainEvent that can carry the state of an aggregate, and which has a take() method that constructs a snapshot object from an aggregate object.
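The snapshot round trip can be sketched as follows. This is a simplified illustration, not the library's mechanism: Dog and Snapshot are cut-down stand-ins, restore() is a hypothetical helper, and the JSON round trip is an assumption about how the state might be stored. It shows why a tuple field needs converting back after deserialisation.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass(frozen=True)
class Dog:
    name: str
    tricks: Tuple[str, ...]
    version: int


@dataclass(frozen=True)
class Snapshot:
    originator_version: int
    state: Dict[str, Any]

    @classmethod
    def take(cls, dog: Dog) -> "Snapshot":
        # Copy the instance attributes into a plain dict.
        return cls(originator_version=dog.version, state=dict(dog.__dict__))


def restore(snapshot: Snapshot) -> Dog:
    # Hypothetical helper: rebuild the aggregate from the snapshot state.
    return Dog(
        name=snapshot.state["name"],
        tricks=tuple(snapshot.state["tricks"]),  # JSON gives back a list
        version=snapshot.state["version"],
    )


dog = Dog(name="Fido", tricks=("roll over", "play dead"), version=3)
snapshot = Snapshot.take(dog)

# Simulate storing and loading the snapshot state as JSON.
stored = json.loads(json.dumps(snapshot.state))
restored = restore(
    Snapshot(originator_version=snapshot.originator_version, state=stored)
)
```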

Domain model

The Dog aggregate class is defined as an immutable frozen dataclass that extends the Aggregate base class. The aggregate event classes, Registered and TrickAdded, are explicitly defined as nested subclasses of DomainEvent.

The Dog aggregate class defines a mutate() method, which evolves aggregate state by constructing a new instance of the aggregate class each time it is called, according to the type of event it is called with. Support for reconstructing an aggregate object from a snapshot object is included in this method.

@dataclass(frozen=True)
class Dog(Aggregate):
    name: str
    tricks: tuple[str, ...]

    @dataclass(frozen=True)
    class Registered(DomainEvent):
        name: str

    @dataclass(frozen=True)
    class TrickAdded(DomainEvent):
        trick: str

    @staticmethod
    def register(name: str) -> tuple[Dog, DomainEvent]:
        event = Dog.Registered(
            originator_id=uuid4(),
            originator_version=1,
            timestamp=DomainEvent.create_timestamp(),
            name=name,
        )
        dog = Dog.mutate(event, None)
        return dog, event

    def add_trick(self, trick: str) -> tuple[Dog, DomainEvent]:
        event = self.trigger_event(Dog.TrickAdded, trick=trick)
        dog = Dog.mutate(event, self)
        return dog, event

    @singledispatchmethod["Dog"]
    @staticmethod
    def mutate(event: DomainEvent, aggregate: Dog | None) -> Dog:  # noqa: ARG004
        """Mutates aggregate with event."""
        msg = f"Event type not supported: {type(event)}"
        raise ProgrammingError(msg)

    @mutate.register
    @staticmethod
    def _(event: Dog.Registered, _: Dog | None) -> Dog:
        return Dog(
            id=event.originator_id,
            version=event.originator_version,
            created_on=event.timestamp,
            modified_on=event.timestamp,
            name=event.name,
            tricks=(),
        )

    @mutate.register
    @staticmethod
    def _(event: Dog.TrickAdded, aggregate: Dog | None) -> Dog:
        assert aggregate is not None
        return Dog(
            id=aggregate.id,
            version=event.originator_version,
            created_on=aggregate.created_on,
            modified_on=event.timestamp,
            name=aggregate.name,
            tricks=(*aggregate.tricks, event.trick),
        )

    @mutate.register
    @staticmethod
    def _(event: Dog.Snapshot, _: Dog | None) -> Dog:
        return Dog(
            id=event.state["id"],
            version=event.state["version"],
            created_on=event.state["created_on"],
            modified_on=event.state["modified_on"],
            name=event.state["name"],
            tricks=tuple(event.state["tricks"]),  # comes back from JSON as a list
        )
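The mutate() methods above spell out every field when constructing the next Dog instance. As an aside, and not something the example itself uses, the standard library's dataclasses.replace() offers a shorter way to derive a new frozen instance from an existing one. A minimal sketch, with a cut-down Dog class as a stand-in:

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class Dog:
    name: str
    version: int
    tricks: Tuple[str, ...]


before = Dog(name="Fido", version=1, tricks=())

# replace() returns a new instance; unchanged fields are copied over.
after = replace(before, version=2, tricks=(*before.tricks, "roll over"))
```

Spelling out each field, as the example does, makes it explicit where every value comes from, while replace() is less verbose when only a few fields change.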

Application

The DogSchool application in this example uses the library’s Application class. Its command methods must receive the new events that are returned by the aggregate command methods, and pass them to the application’s save() method. The aggregate projector function must also be supplied when reconstructing an aggregate from the repository, and when taking snapshots.

class DogSchool(Application[UUID]):
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        dog, event = Dog.register(name)
        self.save(event)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        dog, event = dog.add_trick(trick)
        self.save(event)

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        return {"name": dog.name, "tricks": dog.tricks}
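The flow of events through the application can be approximated without the library. In the sketch below, InMemoryStore is a hypothetical stand-in for the application's save() method and repository, not the library's Application class, and the event and aggregate classes are simplified. It shows the same pattern: the returned event is saved, and the state is rebuilt by passing a projector function when getting the aggregate.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
from uuid import UUID, uuid4


@dataclass(frozen=True)
class Registered:
    originator_id: UUID
    name: str


@dataclass(frozen=True)
class TrickAdded:
    originator_id: UUID
    trick: str


@dataclass(frozen=True)
class Dog:
    id: UUID
    name: str
    tricks: Tuple[str, ...]


def projector(dog: Optional[Dog], events: Iterable[Any]) -> Optional[Dog]:
    # Rebuild the state by applying each event in order.
    for event in events:
        if isinstance(event, Registered):
            dog = Dog(id=event.originator_id, name=event.name, tricks=())
        elif isinstance(event, TrickAdded):
            assert dog is not None
            dog = Dog(id=dog.id, name=dog.name, tricks=(*dog.tricks, event.trick))
    return dog


class InMemoryStore:
    """Hypothetical stand-in for the application's save() and repository."""

    def __init__(self) -> None:
        self._events: Dict[UUID, List[Any]] = {}

    def save(self, *events: Any) -> None:
        # Append each event to the sequence for its aggregate.
        for event in events:
            self._events.setdefault(event.originator_id, []).append(event)

    def get(
        self,
        dog_id: UUID,
        projector_func: Callable[[Optional[Dog], Iterable[Any]], Optional[Dog]],
    ) -> Dog:
        dog = projector_func(None, self._events.get(dog_id, []))
        if dog is None:
            raise KeyError(dog_id)
        return dog


store = InMemoryStore()
dog_id = uuid4()
store.save(Registered(originator_id=dog_id, name="Fido"))

dog = store.get(dog_id, projector_func=projector)
store.save(TrickAdded(originator_id=dog.id, trick="roll over"))

dog = store.get(dog_id, projector_func=projector)
```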

Test case

The TestDogSchool test case shows how the DogSchool application can be used.

class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=Dog.projector)
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead", "fetch ball")

Code reference

class examples.aggregate5.baseclasses.DomainEvent(originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime')

Bases: object

originator_id: UUID
originator_version: int
timestamp: datetime
static create_timestamp() -> datetime
class examples.aggregate5.baseclasses.Aggregate(id: 'UUID', version: 'int', created_on: 'datetime', modified_on: 'datetime')

Bases: object

id: UUID
version: int
created_on: datetime
modified_on: datetime
trigger_event(event_class: type[DomainEvent], **kwargs: Any) -> DomainEvent
classmethod projector(aggregate: Self | None, events: Iterable[DomainEvent]) -> Self | None
mutate(event: DomainEvent, aggregate: TAggregate | None) -> TAggregate | None

Mutates aggregate with event.

class Snapshot(originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', state: 'dict[str, Any]')

Bases: DomainEvent

state: dict[str, Any]
classmethod take(aggregate: Aggregate) -> Snapshot
class examples.aggregate5.domainmodel.Dog(id: 'UUID', version: 'int', created_on: 'datetime', modified_on: 'datetime', name: 'str', tricks: 'tuple[str, ...]')

Bases: Aggregate

name: str
tricks: tuple[str, ...]
class Registered(originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', name: 'str')

Bases: DomainEvent

name: str
class TrickAdded(originator_id: 'UUID', originator_version: 'int', timestamp: 'datetime', trick: 'str')

Bases: DomainEvent

trick: str
static register(name: str) -> tuple[Dog, DomainEvent]
add_trick(trick: str) -> tuple[Dog, DomainEvent]
mutate(event: DomainEvent, aggregate: Dog | None) -> Dog
mutate(event: Registered, _: Dog | None) -> Dog
mutate(event: TrickAdded, aggregate: Dog | None) -> Dog
mutate(event: Snapshot, _: Dog | None) -> Dog

Mutates aggregate with event.

class examples.aggregate5.application.DogSchool(env: Mapping[str, str] | None = None)

Bases: Application[UUID]

is_snapshotting_enabled: bool = True
register_dog(name: str) -> UUID
add_trick(dog_id: UUID, trick: str) -> None
get_dog(dog_id: UUID) -> dict[str, Any]
name = 'DogSchool'
class examples.aggregate5.test_application.TestDogSchool(methodName='runTest')

Bases: TestCase

test_dog_school() -> None