Aggregate 11 - String IDs

This example shows an aggregate that uses the library’s declarative syntax for aggregates, as described in the tutorial and module docs, but with arbitrary string IDs. Many users of KurrentDB, for example, prefer to prefix stream names with the name of a stream category. This example shows how this style can be adopted when using this library.

Domain model

The Dog class in this example uses the library’s aggregate base class and the event decorator to define aggregate event classes from command method signatures. The event class names are given as the argument to the event decorator. The event attributes are defined automatically by the decorator to match the command method arguments. The bodies of the command methods are used to evolve the state of an aggregate instance, both when a new event is triggered and when an aggregate is reconstructed from stored events.

from __future__ import annotations

import uuid
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

from eventsourcing.domain import (
    BaseAggregate,
    CanInitAggregate,
    CanMutateAggregate,
    CanSnapshotAggregate,
    MetaDomainEvent,
    event,
)

if TYPE_CHECKING:
    from datetime import datetime


@dataclass(frozen=True)
class DomainEvent(metaclass=MetaDomainEvent):
    """Immutable base class for domain events that carry a string originator ID.

    Construction fails with ``TypeError`` when ``originator_id`` is not a ``str``.
    """

    originator_id: str
    originator_version: int
    timestamp: datetime

    def __post_init__(self) -> None:
        """Reject an ``originator_id`` that is not a ``str`` instance.

        Raises:
            TypeError: if ``originator_id`` is not a ``str``.
        """
        # Guard clause: a valid ID needs no further work.
        if isinstance(self.originator_id, str):
            return
        message = (
            f"{type(self)} "
            f"was initialized with a non-str originator_id: "
            f"{self.originator_id!r}"
        )
        raise TypeError(message)


@dataclass(frozen=True)
class Snapshot(DomainEvent, CanSnapshotAggregate[str]):
    """Snapshot event type for aggregates identified by string IDs.

    Combines the string-ID ``DomainEvent`` fields with the library's
    ``CanSnapshotAggregate[str]`` behaviour.
    """

    # NOTE(review): presumably the topic (import path) of the snapshotted
    # aggregate class — confirm against CanSnapshotAggregate.
    topic: str
    # NOTE(review): presumably the captured attribute values of the
    # aggregate — confirm against CanSnapshotAggregate.
    state: dict[str, Any]


class Aggregate(BaseAggregate[str]):
    """Aggregate base class parameterised for string IDs (``BaseAggregate[str]``).

    Its nested event classes build on the string-ID ``DomainEvent`` defined
    in this module.
    """

    @dataclass(frozen=True)
    class Event(DomainEvent, CanMutateAggregate[str]):
        # Base event class for this aggregate; it declares no fields beyond
        # those inherited from DomainEvent.
        pass

    @dataclass(frozen=True)
    class Created(Event, CanInitAggregate[str]):
        # String describing the path to an aggregate class.
        originator_topic: str


class Dog(Aggregate):
    """Dog aggregate whose IDs are strings prefixed with ``"dog-"``.

    The ``event`` decorator names the event class for each command method:
    ``Registered`` for construction and ``TrickAdded`` for ``add_trick``.
    """

    INITIAL_VERSION = 0

    @staticmethod
    def create_id() -> str:
        """Return a new ID: ``"dog-"`` followed by a random UUID4 string."""
        return f"dog-{uuid.uuid4()}"

    @event("Registered")
    def __init__(self, name: str) -> None:
        """Set the dog's name and start with an empty list of tricks."""
        self.name = name
        self.tricks: list[str] = []

    @event("TrickAdded")
    def add_trick(self, trick: str) -> None:
        """Append ``trick`` to the dog's list of tricks."""
        self.tricks.append(trick)

Application

The DogSchool application class in this example uses the library’s application base class. It fully encapsulates the Dog aggregate, defining command and query methods that use the event-sourced aggregate class as if it were a normal Python object class.

from __future__ import annotations

from typing import Any

from eventsourcing.application import Application
from examples.aggregate11.domainmodel import Dog, Snapshot


class DogSchool(Application[str]):
    """Application that manages ``Dog`` aggregates by their string IDs.

    Snapshotting is enabled and uses the string-ID ``Snapshot`` class.
    """

    is_snapshotting_enabled = True
    snapshot_class = Snapshot

    def register_dog(self, name: str) -> str:
        """Create and save a new dog called ``name``; return its ID."""
        new_dog = Dog(name)
        self.save(new_dog)
        return new_dog.id

    def add_trick(self, dog_id: str, trick: str) -> None:
        """Load the dog with ``dog_id``, add ``trick`` to it, and save it."""
        pupil: Dog = self.repository.get(dog_id)
        pupil.add_trick(trick)
        self.save(pupil)

    def get_dog(self, dog_id: str) -> dict[str, Any]:
        """Return the name and tricks (as a tuple) of the dog with ``dog_id``."""
        pupil: Dog = self.repository.get(dog_id)
        name, tricks = pupil.name, tuple(pupil.tricks)
        return {"name": name, "tricks": tricks}

Test case

The TestDogSchool test case shows how the DogSchool application can be used. It demonstrates that arbitrary strings can be used as aggregate IDs, with both the POPO and SQLite persistence modules, and of course with KurrentDB.

class TestDogSchool(TestCase):
    """Exercise ``DogSchool`` with string aggregate IDs on several backends."""

    def setUp(self) -> None:
        """Start each test with an empty environment mapping."""
        self.env: dict[str, str] = {}

    def _assert_dog(
        self, app: DogSchool, dog_id: str, expected_tricks: tuple[str, ...]
    ) -> None:
        """Check that the dog is called "Fido" and has exactly the given tricks."""
        details = app.get_dog(dog_id)
        self.assertEqual("Fido", details["name"])
        self.assertEqual(expected_tricks, details["tricks"])

    def test_dog_school(self) -> None:
        """Register a dog, add tricks, read notifications, snapshot, continue."""
        # Build the application from whatever environment the test prepared.
        app = DogSchool(self.env)

        # Remember the log position so only this test's notifications count.
        last_seen_id = app.recorder.max_notification_id()

        # Change application state: one registration plus two tricks.
        fido_id = app.register_dog("Fido")
        for trick in ("roll over", "play dead"):
            app.add_trick(fido_id, trick)

        # Read the state back.
        self._assert_dog(app, fido_id, ("roll over", "play dead"))

        # Exactly three new notifications should follow the remembered position.
        new_notifications = app.notification_log.select(
            start=last_seen_id, limit=10, inclusive_of_start=False
        )
        self.assertEqual(3, len(new_notifications))

        # Snapshot the aggregate and confirm the state is unchanged.
        app.take_snapshot(fido_id, version=3)
        self._assert_dog(app, fido_id, ("roll over", "play dead"))

        # Keep evolving the aggregate after it has been snapshotted.
        app.add_trick(fido_id, "fetch ball")
        self._assert_dog(app, fido_id, ("roll over", "play dead", "fetch ball"))

    def test_dog_school_with_sqlite(self) -> None:
        """Repeat the scenario against an in-memory SQLite database."""
        self.env.update(
            {
                "PERSISTENCE_MODULE": "eventsourcing.sqlite",
                "SQLITE_DBNAME": ":memory:",
            }
        )
        self.test_dog_school()

    @skipIf("eventsourcing_kurrentdb" not in sys.modules, "KurrentDB not installed")
    def test_dog_school_with_kurrentdb(self) -> None:
        """Repeat the scenario against KurrentDB when its module is loaded."""
        self.env.update(
            {
                "PERSISTENCE_MODULE": "eventsourcing_kurrentdb",
                "KURRENTDB_URI": "esdb://localhost:2113?Tls=false",
            }
        )
        self.test_dog_school()

Code reference

class examples.aggregate11.domainmodel.DomainEvent(originator_id: 'str', originator_version: 'int', timestamp: 'datetime')[source]

Bases: object

originator_id: str
originator_version: int
timestamp: datetime
__init__(originator_id: str, originator_version: int, timestamp: datetime) None
class examples.aggregate11.domainmodel.Snapshot(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')[source]

Bases: DomainEvent, CanSnapshotAggregate[str]

topic: str
state: dict[str, Any]
__init__(originator_id: str, originator_version: int, timestamp: datetime, topic: str, state: dict[str, Any]) None
originator_id_type

alias of str

class examples.aggregate11.domainmodel.Aggregate(*args: Any, **kwargs: Any)[source]

Bases: BaseAggregate[str]

class Event(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')[source]

Bases: DomainEvent, CanMutateAggregate[str]

__init__(originator_id: str, originator_version: int, timestamp: datetime) None
originator_id_type

alias of str

originator_id: str

String ID identifying the aggregate to which the event belongs.

originator_version: int

Integer identifying the version of the aggregate when the event occurred.

timestamp: datetime

Timezone-aware datetime object representing when an event occurred.

class Created(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')[source]

Bases: Event, CanInitAggregate[str]

originator_topic: str

String describing the path to an aggregate class.

__init__(originator_id: str, originator_version: int, timestamp: datetime, originator_topic: str) None
originator_id_type

alias of str

class examples.aggregate11.domainmodel.Dog(*args: Any, **kwargs: Any)[source]

Bases: Aggregate

INITIAL_VERSION: int = 0
static create_id() str[source]

Returns a new aggregate ID.

__init__(name: str) None[source]
add_trick = <eventsourcing.domain.UnboundCommandMethodDecorator object>[source]
class Created(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')

Bases: Event, Created

__init__(originator_id: str, originator_version: int, timestamp: datetime, originator_topic: str) None
originator_id_type

alias of str

class Event(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')

Bases: Event

__init__(originator_id: str, originator_version: int, timestamp: datetime) None
originator_id_type

alias of str

class Registered(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime', originator_topic: 'str')

Bases: Created, Event

__init__(originator_id: str, originator_version: int, timestamp: datetime, originator_topic: str, name: str) None
originator_id_type

alias of str

name: str
class TrickAdded(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')

Bases: DecoratorEvent, Event

__init__(originator_id: str, originator_version: int, timestamp: datetime, trick: str) None
trick: str
class examples.aggregate11.application.DogSchool(env: Mapping[str, str] | None = None)[source]

Bases: Application[str]

is_snapshotting_enabled: bool = True
snapshot_class

alias of Snapshot

register_dog(name: str) str[source]
add_trick(dog_id: str, trick: str) None[source]
get_dog(dog_id: str) dict[str, Any][source]
name = 'DogSchool'
class examples.aggregate11.test_application.TestDogSchool(methodName='runTest')[source]

Bases: TestCase

setUp() None[source]

Hook method for setting up the test fixture before exercising it.

test_dog_school() None[source]
test_dog_school_with_sqlite() None[source]
test_dog_school_with_kurrentdb() None[source]