Aggregate 11 - String IDs

This example shows an aggregate that uses the library’s declarative syntax for aggregates, as described in the tutorial and module docs, but with arbitrary string IDs rather than UUIDs. Many users of KurrentDB, for example, prefer to prefix stream names with the name of a stream category. This example shows how that style can be adopted when using this library.
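As a minimal sketch of the category-prefix convention mentioned above, the following stdlib-only helpers build and split IDs of the form "category-uuid". The names make_stream_id and split_stream_id are hypothetical and are not part of the library. The sketch assumes that the category name contains no hyphen, so that the category is everything before the first hyphen.

```python
from __future__ import annotations

from uuid import UUID, uuid4


def make_stream_id(category: str, unique_part: UUID | None = None) -> str:
    """Return an ID such as 'dog-<uuid>' for the given category."""
    # Assumption: the category is the text before the first hyphen,
    # so the category name itself must not contain one.
    if not category or "-" in category:
        msg = f"Invalid category name: {category!r}"
        raise ValueError(msg)
    if unique_part is None:
        unique_part = uuid4()
    return f"{category}-{unique_part}"


def split_stream_id(stream_id: str) -> tuple[str, str]:
    """Split 'dog-<uuid>' into ('dog', '<uuid>')."""
    category, separator, unique_part = stream_id.partition("-")
    if not (category and separator and unique_part):
        msg = f"Not a category-prefixed ID: {stream_id!r}"
        raise ValueError(msg)
    return category, unique_part


# A freshly generated ID always carries its category as a prefix.
dog_id = make_stream_id("dog")
assert split_stream_id(dog_id)[0] == "dog"
```

The Dog.create_id() method in the domain model below produces IDs with the same shape by concatenating "dog-" with a new UUID.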

Domain model

If you want to use string IDs, the important thing is to derive aggregate and event classes that use str values for the aggregate IDs.
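Because dataclass annotations are not enforced at runtime, an event class that is meant to carry str IDs can check the value explicitly. The following is a simplified, stdlib-only sketch of such a check. SketchEvent and is_accepted are hypothetical names used for illustration only, and the class omits the other event attributes.

```python
from dataclasses import dataclass
from uuid import uuid4


@dataclass(frozen=True)
class SketchEvent:
    originator_id: str
    originator_version: int

    def __post_init__(self) -> None:
        # Annotations alone do not stop a UUID from being passed in,
        # so reject anything that is not a str.
        if not isinstance(self.originator_id, str):
            msg = f"non-str originator_id: {self.originator_id!r}"
            raise TypeError(msg)


def is_accepted(originator_id: object) -> bool:
    """Return True if SketchEvent accepts the given ID value."""
    try:
        SketchEvent(
            originator_id=originator_id,  # type: ignore[arg-type]
            originator_version=0,
        )
    except TypeError:
        return False
    return True


# A prefixed string is accepted, a raw UUID object is not.
assert is_accepted("dog-" + str(uuid4()))
assert not is_accepted(uuid4())
```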

This example uses the library’s generic base class, BaseAggregate, to define an Aggregate class that expects string IDs. The Dog aggregate class is derived from this. The key step is to specify str as the type argument of the generic base class, and to define event classes (DomainEvent, Snapshot, Event and Created) that will work with this kind of aggregate.

The Dog class then uses the event decorator to define aggregate event classes from command method signatures. Each event class name is given as an argument to the event decorator, and the event attributes are defined automatically by the decorator to match the command method arguments. The bodies of the command methods are used to evolve the state of an aggregate instance, both when a new event is triggered and when an aggregate is reconstructed from stored events.

from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from uuid import UUID, uuid4

from eventsourcing.domain import (
    BaseAggregate,
    CanInitAggregate,
    CanMutateAggregate,
    CanSnapshotAggregate,
    MetaDomainEvent,
    datetime_now_with_tzinfo,
    event,
    get_metadata_from_context,
)

if TYPE_CHECKING:
    from datetime import datetime


@dataclass(frozen=True, kw_only=True)
class DomainEvent(metaclass=MetaDomainEvent):
    originator_id: str
    originator_version: int
    timestamp: datetime = field(default_factory=datetime_now_with_tzinfo)
    metadata: dict[str, str] = field(default_factory=get_metadata_from_context)
    event_id: UUID = field(default_factory=uuid4)

    def __post_init__(self) -> None:
        if not isinstance(self.originator_id, str):
            msg = (
                f"{type(self)} "
                f"was initialized with a non-str originator_id: "
                f"{self.originator_id!r}"
            )
            raise TypeError(msg)


@dataclass(frozen=True, kw_only=True)
class Snapshot(DomainEvent, CanSnapshotAggregate[str]):
    topic: str
    state: dict[str, Any]


class Aggregate(BaseAggregate[str]):
    @dataclass(frozen=True)
    class Event(DomainEvent, CanMutateAggregate[str]):
        pass

    @dataclass(frozen=True, kw_only=True)
    class Created(Event, CanInitAggregate[str]):
        originator_topic: str


class Dog(Aggregate):
    INITIAL_VERSION = 0

    @staticmethod
    def create_id() -> str:
        return "dog-" + str(uuid.uuid4())

    @event("Registered")
    def __init__(self, name: str) -> None:
        self.name = name
        self.tricks: list[str] = []

    @event("TrickAdded")
    def add_trick(self, trick: str) -> None:
        self.tricks.append(trick)
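To illustrate the idea of deriving event attributes from a command method signature, here is a minimal stdlib-only sketch. It is not the library's implementation of the event decorator. The helpers event_class_from_method and apply_event, and the SketchDog class, are hypothetical names, and the sketch leaves out the originator ID, versioning, timestamps and metadata.

```python
import inspect
from dataclasses import asdict, make_dataclass
from typing import Any, Callable


def event_class_from_method(name: str, method: Callable[..., Any]) -> type:
    """Build a frozen dataclass whose fields mirror the method's arguments."""
    # Skip the first parameter (self); the rest become event attributes.
    params = list(inspect.signature(method).parameters.values())[1:]
    fields = [
        (p.name, Any if p.annotation is inspect.Parameter.empty else p.annotation)
        for p in params
    ]
    return make_dataclass(name, fields, frozen=True)


def apply_event(event_obj: Any, aggregate: Any, method: Callable[..., Any]) -> None:
    """Evolve the aggregate by running the method body with the event's values."""
    method(aggregate, **asdict(event_obj))


class SketchDog:
    def __init__(self, name: str) -> None:
        self.name = name
        self.tricks: list[str] = []

    def add_trick(self, trick: str) -> None:
        self.tricks.append(trick)


TrickAdded = event_class_from_method("TrickAdded", SketchDog.add_trick)

# Replaying the recorded events through the same method body
# reconstructs the aggregate state.
stored = [TrickAdded(trick="roll over"), TrickAdded(trick="play dead")]
dog = SketchDog("Fido")
for stored_event in stored:
    apply_event(stored_event, dog, SketchDog.add_trick)
```

The same method body serves both purposes described above: it runs when a command triggers a new event, and again when stored events are replayed.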

Application

The DogSchool application class in this example uses the library’s application base class.

The important thing is to specify str as the type argument of the generic Application base class. This conditions the persistence modules to respect the string IDs, and also sets up the static typing so that you can validate your code with a static type checker such as Mypy or Pyright.

The DogSchool application class fully encapsulates the Dog aggregate, defining command and query methods that use the event-sourced aggregate class as if it were a normal Python object class.

from __future__ import annotations

from typing import Any

from eventsourcing.application import Application
from examples.aggregate11.domainmodel import Dog, Snapshot


class DogSchool(Application[str]):
    is_snapshotting_enabled = True
    snapshot_class = Snapshot

    def register_dog(self, name: str) -> str:
        dog = Dog(name)
        self.save(dog)
        return dog.id

    def add_trick(self, dog_id: str, trick: str) -> None:
        dog: Dog = self.repository.get(dog_id)
        dog.add_trick(trick)
        self.save(dog)

    def get_dog(self, dog_id: str) -> dict[str, Any]:
        dog: Dog = self.repository.get(dog_id)
        return {"name": dog.name, "tricks": tuple(dog.tricks)}
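The code reference at the end of this page lists aggregate_id_type as an alias of str for DogSchool. How the library derives that value is not shown in this example. The following stdlib-only sketch shows one way a generic base class could record the type argument given by a subclass. SketchApplication and SketchDogSchool are hypothetical names, and the UUID default is an assumption made for the sketch.

```python
from __future__ import annotations

from typing import Any, Generic, TypeVar, get_args, get_origin
from uuid import UUID

TID = TypeVar("TID")


class SketchApplication(Generic[TID]):
    # Assumed default for the sketch when no type argument is given.
    aggregate_id_type: type = UUID

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Look for a base written as SketchApplication[SomeType] and
        # remember SomeType on the subclass.
        for base in getattr(cls, "__orig_bases__", ()):
            if get_origin(base) is SketchApplication:
                (arg,) = get_args(base)
                if isinstance(arg, type):
                    cls.aggregate_id_type = arg


class SketchDogSchool(SketchApplication[str]):
    pass


# The subclass records str, while the base class keeps its default.
assert SketchDogSchool.aggregate_id_type is str
assert SketchApplication.aggregate_id_type is UUID
```

A class attribute like this gives runtime code, such as a persistence layer, something concrete to inspect, since type arguments are otherwise only visible to static type checkers.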

Test case

The TestDogSchool test case shows how the DogSchool application can be used. It demonstrates that arbitrary strings can be used as aggregate IDs with the POPO, SQLite and PostgreSQL persistence modules, and with KurrentDB.

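import sys
from unittest import TestCase, skipIf

from examples.aggregate11.application import DogSchool


class TestDogSchool(TestCase):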
    def setUp(self) -> None:
        self.env: dict[str, str] = {}

    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool(self.env)

        max_notification_id = school.recorder.max_notification_id()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual("Fido", dog["name"])
        self.assertEqual(("roll over", "play dead"), dog["tricks"])

        # Select notifications.
        notifications = school.notification_log.select(
            start=max_notification_id, limit=10, inclusive_of_start=False
        )
        self.assertEqual(3, len(notifications))

        # Take snapshot.
        school.take_snapshot(dog_id, version=3)
        dog = school.get_dog(dog_id)
        self.assertEqual("Fido", dog["name"])
        self.assertEqual(("roll over", "play dead"), dog["tricks"])

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual("Fido", dog["name"])
        self.assertEqual(("roll over", "play dead", "fetch ball"), dog["tricks"])

    def test_dog_school_with_sqlite(self) -> None:
        self.env["PERSISTENCE_MODULE"] = "eventsourcing.sqlite"
        self.env["SQLITE_DBNAME"] = ":memory:"
        self.env["ORIGINATOR_ID_TYPE"] = "text"
        self.test_dog_school()

    def test_dog_school_with_postgres(self) -> None:
        self.env["PERSISTENCE_MODULE"] = "eventsourcing.postgres"
        self.env["POSTGRES_DBNAME"] = "eventsourcing"
        self.env["POSTGRES_HOST"] = "127.0.0.1"
        self.env["POSTGRES_USER"] = "eventsourcing"
        self.env["POSTGRES_PASSWORD"] = "eventsourcing"  # noqa: S105
        self.env["ORIGINATOR_ID_TYPE"] = "text"
        self.test_dog_school()

    @skipIf("eventsourcing_kurrentdb" not in sys.modules, "KurrentDB not installed")
    def test_dog_school_with_kurrentdb(self) -> None:
        self.env["PERSISTENCE_MODULE"] = "eventsourcing_kurrentdb"
        self.env["KURRENTDB_URI"] = "esdb://localhost:2113?Tls=false"
        self.test_dog_school()
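The environment settings used by the test methods above can be gathered into one place. The sketch below copies the values from the tests without adding any new settings. BACKEND_SETTINGS and env_for are hypothetical names. Note that the SQLite and PostgreSQL tests set ORIGINATOR_ID_TYPE to "text", whereas the default (POPO) and KurrentDB tests do not set it.

```python
from __future__ import annotations

# Values copied from the test methods above. "popo" stands for the
# default case, in which the test passes an empty environment.
BACKEND_SETTINGS: dict[str, dict[str, str]] = {
    "popo": {},
    "sqlite": {
        "PERSISTENCE_MODULE": "eventsourcing.sqlite",
        "SQLITE_DBNAME": ":memory:",
        "ORIGINATOR_ID_TYPE": "text",
    },
    "postgres": {
        "PERSISTENCE_MODULE": "eventsourcing.postgres",
        "POSTGRES_DBNAME": "eventsourcing",
        "POSTGRES_HOST": "127.0.0.1",
        "POSTGRES_USER": "eventsourcing",
        "POSTGRES_PASSWORD": "eventsourcing",  # test credentials only
        "ORIGINATOR_ID_TYPE": "text",
    },
    "kurrentdb": {
        "PERSISTENCE_MODULE": "eventsourcing_kurrentdb",
        "KURRENTDB_URI": "esdb://localhost:2113?Tls=false",
    },
}


def env_for(backend: str) -> dict[str, str]:
    """Return a fresh copy of the environment for the named backend."""
    try:
        return dict(BACKEND_SETTINGS[backend])
    except KeyError:
        msg = f"Unknown backend: {backend!r}"
        raise ValueError(msg) from None


# Only the SQL-backed configurations in the tests set the ID column type.
sql_backends = [
    name for name, env in BACKEND_SETTINGS.items() if "ORIGINATOR_ID_TYPE" in env
]
assert sql_backends == ["sqlite", "postgres"]
```

A dictionary returned by env_for could then be passed to the application constructor in the same way as self.env in the test case, for example DogSchool(env_for("sqlite")).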

Code reference

class examples.aggregate11.domainmodel.DomainEvent(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>)

Bases: object

originator_id: str
originator_version: int
timestamp: datetime
metadata: dict[str, str]
event_id: UUID
__init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>) -> None

class examples.aggregate11.domainmodel.Snapshot(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, topic: str, state: dict[str, Any])

Bases: DomainEvent, CanSnapshotAggregate[str]

topic: str
state: dict[str, Any]
__init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, topic: str, state: dict[str, Any]) -> None
originator_id_type

alias of str

class examples.aggregate11.domainmodel.Aggregate(*args: Any, **kwargs: Any)

Bases: BaseAggregate[str]

class Event(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>)

Bases: DomainEvent, CanMutateAggregate[str]

__init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>) -> None
originator_id_type

alias of str

originator_id: str

String identifying the aggregate to which the event belongs.

originator_version: int

Integer identifying the version of the aggregate when the event occurred.

timestamp: datetime

Timezone-aware datetime object representing when the event occurred.

metadata: dict[str, str]

Event metadata.

event_id: UUID

Event identifier.

class Created(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, originator_topic: str)

Bases: Event, CanInitAggregate[str]

originator_topic: str

String describing the path to an aggregate class.

__init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, originator_topic: str) -> None
originator_id_type

alias of str

class examples.aggregate11.domainmodel.Dog(*args: Any, **kwargs: Any)

Bases: Aggregate

INITIAL_VERSION: int = 0
static create_id() -> str

Returns a new aggregate ID.

__init__(name: str) -> None
tricks: list[str]
add_trick(trick: str) -> None

class Created(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, originator_topic: str)

Bases: Event, Created

__init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, originator_topic: str) -> None
originator_id_type

alias of str

class Event(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>)

Bases: Event

__init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>) -> None
originator_id_type

alias of str

class Registered(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, originator_topic: str, name: str)

Bases: Created, Event

__init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, originator_topic: str, name: str) -> None
originator_id_type

alias of str

name: str

class TrickAdded(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, trick: str)

Bases: DecoratedFuncCaller, Event

__init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, trick: str) -> None
originator_id_type

alias of str

trick: str
class examples.aggregate11.application.DogSchool(env: Mapping[str, str] | None = None)

Bases: Application[str]

is_snapshotting_enabled: bool = True
snapshot_class

alias of Snapshot

register_dog(name: str) -> str
add_trick(dog_id: str, trick: str) -> None
get_dog(dog_id: str) -> dict[str, Any]
aggregate_id_type

alias of str

name = 'DogSchool'

class examples.aggregate11.test_application.TestDogSchool(methodName='runTest')

Bases: TestCase

setUp() -> None

Hook method for setting up the test fixture before exercising it.

test_dog_school() -> None
test_dog_school_with_sqlite() -> None
test_dog_school_with_postgres() -> None
test_dog_school_with_kurrentdb() -> None