Aggregate 4 - Custom base classes

This example shows another variation of the Dog aggregate class used in the tutorial and module docs.

In contrast with the previous examples, this example does not use the library Aggregate class. Instead, it defines its own Aggregate and DomainEvent base classes. Similar to the previous examples, the Aggregate class is a normal (mutable) Python class and the DomainEvent class is a frozen Python dataclass. A projector() class method is defined on the Aggregate class.

The aggregate event classes are explicitly defined, and the command method bodies explicitly trigger events. Like the previous example, this example uses a separate function to apply events to the current aggregate state.

As in the previous example, the application code simply uses the aggregate class as if it were a normal Python object class. However, the aggregate projector function must be supplied when getting an aggregate from the repository and when taking snapshots.

Domain model

from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar, cast
from uuid import UUID, uuid4

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import Snapshot

class DomainEvent:
    originator_version: int
    originator_id: UUID
    timestamp: datetime

    def create_timestamp() -> datetime:

TAggregate = TypeVar("TAggregate", bound="Aggregate")

class Aggregate:
    id: UUID
    version: int
    created_on: datetime

    def __init__(self, event: DomainEvent): = event.originator_id
        self.version = event.originator_version
        self.created_on = event.timestamp

    def trigger_event(
        event_class: Type[DomainEvent],
        **kwargs: Any,
    ) -> None:
        kwargs = kwargs.copy()
            originator_version=self.version + 1,
        new_event = event_class(**kwargs)

    def apply(self, event: DomainEvent) -> None:
        """Applies event to aggregate."""

    def collect_events(self) -> List[DomainEvent]:
        events, self.pending_events = self.pending_events, []
        return events

    def projector(
        cls: Type[TAggregate],
        _: Optional[TAggregate],
        events: Iterable[DomainEvent],
    ) -> Optional[TAggregate]:
        aggregate = object.__new__(cls)
        for event in events:
        return aggregate

    def pending_events(self) -> List[DomainEvent]:
        return type(self).__pending_events[id(self)]

    def pending_events(self, pending_events: List[DomainEvent]) -> None:
        type(self).__pending_events[id(self)] = pending_events

    __pending_events: Dict[int, List[DomainEvent]] = defaultdict(list)

    def __del__(self) -> None:
        except KeyError:

class Dog(Aggregate):
    class Registered(DomainEvent):
        name: str

    class TrickAdded(DomainEvent):
        trick: str

    def register(cls, name: str) -> "Dog":
        event = cls.Registered(
        dog = cast(Dog, cls.projector(None, [event]))
        return dog

    def add_trick(self, trick: str) -> None:
        self.trigger_event(self.TrickAdded, trick=trick)

    def apply(self, event: DomainEvent) -> None:
        """Applies event to aggregate."""

    def _(self, event: Registered) -> None:
        super().__init__(event) =
        self.tricks: List[str] = []

    def _(self, event: TrickAdded) -> None:
        self.version = event.originator_version

    def _(self, event: Snapshot) -> None:


from typing import Any, Dict
from uuid import UUID

from eventsourcing.application import Application
from eventsourcing.examples.aggregate4.domainmodel import Dog

class DogSchool(Application):
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        dog = Dog.register(name)

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)

    def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
        dog = self.repository.get(dog_id, projector_func=Dog.projector)
        return {"name":, "tricks": tuple(dog.tricks)}

Test case

from unittest import TestCase

from eventsourcing.examples.aggregate4.application import DogSchool
from eventsourcing.examples.aggregate4.domainmodel import Dog

class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Select notifications.
        notifications =, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=Dog.projector)
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead")

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        assert dog["name"] == "Fido"
        assert dog["tricks"] == ("roll over", "play dead", "fetch ball")