Aggregate 11 - String IDs¶
This example shows an aggregate that uses the library’s declarative syntax for aggregates, as described in the tutorial and module docs, but with arbitrary string IDs. Many users of KurrentDB, for example, prefer to prefix stream names with the name of a stream category. This example shows how this style can be adopted when using this library.
Domain model¶
If you want to use string IDs, the important thing is to derive aggregate and event classes that
use str values for the aggregate IDs.
This example uses the library’s generic base class to
define an Aggregate class that expects string IDs.
The Dog aggregate class is derived from this. The
important thing is to specify str as the type argument of the generic base class, and
to define event classes that will work with this kind of aggregate.
The Dog then uses the event decorator to define
aggregate event classes from command method signatures. Each event class name is given as an argument to an event
decorator. The event attributes are then defined automatically by the decorator to match the command method arguments.
The bodies of the command methods are used to evolve the state of an aggregate instance, both when
a new event is triggered and when an aggregate is reconstructed from stored events.
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from uuid import UUID, uuid4
from eventsourcing.domain import (
BaseAggregate,
CanInitAggregate,
CanMutateAggregate,
CanSnapshotAggregate,
MetaDomainEvent,
datetime_now_with_tzinfo,
event,
get_metadata_from_context,
)
if TYPE_CHECKING:
from datetime import datetime
@dataclass(frozen=True, kw_only=True)
class DomainEvent(metaclass=MetaDomainEvent):
originator_id: str
originator_version: int
timestamp: datetime = field(default_factory=datetime_now_with_tzinfo)
metadata: dict[str, str] = field(default_factory=get_metadata_from_context)
event_id: UUID = field(default_factory=uuid4)
def __post_init__(self) -> None:
if not isinstance(self.originator_id, str):
msg = (
f"{type(self)} "
f"was initialized with a non-str originator_id: "
f"{self.originator_id!r}"
)
raise TypeError(msg)
@dataclass(frozen=True, kw_only=True)
class Snapshot(DomainEvent, CanSnapshotAggregate[str]):
topic: str
state: dict[str, Any]
class Aggregate(BaseAggregate[str]):
@dataclass(frozen=True)
class Event(DomainEvent, CanMutateAggregate[str]):
pass
@dataclass(frozen=True, kw_only=True)
class Created(Event, CanInitAggregate[str]):
originator_topic: str
class Dog(Aggregate):
INITIAL_VERSION = 0
@staticmethod
def create_id() -> str:
return "dog-" + str(uuid.uuid4())
@event("Registered")
def __init__(self, name: str) -> None:
self.name = name
self.tricks: list[str] = []
@event("TrickAdded")
def add_trick(self, trick: str) -> None:
self.tricks.append(trick)
Application¶
The DogSchool application class in this example uses the
library’s application base class.
The important thing is to specify str as the type argument of the generic base class, which
will condition the persistence modules to respect the string IDs, and also setup the static typing
so you can validate your code with a static type checker such as Mypy or Pyright.
The DogSchool application class fully encapsulates the
Dog aggregate, defining command and query methods
that use the event-sourced aggregate class as if it were a normal Python object class.
from __future__ import annotations
from typing import Any
from eventsourcing.application import Application
from examples.aggregate11.domainmodel import Dog, Snapshot
class DogSchool(Application[str]):
is_snapshotting_enabled = True
snapshot_class = Snapshot
def register_dog(self, name: str) -> str:
dog = Dog(name)
self.save(dog)
return dog.id
def add_trick(self, dog_id: str, trick: str) -> None:
dog: Dog = self.repository.get(dog_id)
dog.add_trick(trick)
self.save(dog)
def get_dog(self, dog_id: str) -> dict[str, Any]:
dog: Dog = self.repository.get(dog_id)
return {"name": dog.name, "tricks": tuple(dog.tricks)}
Test case¶
The TestDogSchool test case shows how the
DogSchool application can be used. It demonstrates
arbitrary string can be used as aggregate IDs, with both the POPO and SQLite persistence modules,
and of course with KurrentDB.
class TestDogSchool(TestCase):
def setUp(self) -> None:
self.env: dict[str, str] = {}
def test_dog_school(self) -> None:
# Construct application object.
school = DogSchool(self.env)
max_notification_id = school.recorder.max_notification_id()
# Evolve application state.
dog_id = school.register_dog("Fido")
school.add_trick(dog_id, "roll over")
school.add_trick(dog_id, "play dead")
# Query application state.
dog = school.get_dog(dog_id)
self.assertEqual("Fido", dog["name"])
self.assertEqual(("roll over", "play dead"), dog["tricks"])
# Select notifications.
notifications = school.notification_log.select(
start=max_notification_id, limit=10, inclusive_of_start=False
)
self.assertEqual(3, len(notifications))
# Take snapshot.
school.take_snapshot(dog_id, version=3)
dog = school.get_dog(dog_id)
self.assertEqual("Fido", dog["name"])
self.assertEqual(("roll over", "play dead"), dog["tricks"])
# Continue with snapshotted aggregate.
school.add_trick(dog_id, "fetch ball")
dog = school.get_dog(dog_id)
self.assertEqual("Fido", dog["name"])
self.assertEqual(("roll over", "play dead", "fetch ball"), dog["tricks"])
def test_dog_school_with_sqlite(self) -> None:
self.env["PERSISTENCE_MODULE"] = "eventsourcing.sqlite"
self.env["SQLITE_DBNAME"] = ":memory:"
self.env["ORIGINATOR_ID_TYPE"] = "text"
self.test_dog_school()
def test_dog_school_with_postgres(self) -> None:
self.env["PERSISTENCE_MODULE"] = "eventsourcing.postgres"
self.env["POSTGRES_DBNAME"] = "eventsourcing"
self.env["POSTGRES_HOST"] = "127.0.0.1"
self.env["POSTGRES_USER"] = "eventsourcing"
self.env["POSTGRES_PASSWORD"] = "eventsourcing" # noqa: S105
self.env["ORIGINATOR_ID_TYPE"] = "text"
self.test_dog_school()
@skipIf("eventsourcing_kurrentdb" not in sys.modules, "KurrentDB not installed")
def test_dog_school_with_kurrentdb(self) -> None:
self.env["PERSISTENCE_MODULE"] = "eventsourcing_kurrentdb"
self.env["KURRENTDB_URI"] = "esdb://localhost:2113?Tls=false"
self.test_dog_school()
Code reference¶
- class examples.aggregate11.domainmodel.DomainEvent(*, originator_id: 'str', originator_version: 'int', timestamp: 'datetime' = <factory>, metadata: 'dict[str, str]'=<factory>, event_id: 'UUID' = <factory>)[source]¶
Bases:
object- originator_id: str¶
- originator_version: int¶
- timestamp: datetime¶
- metadata: dict[str, str]¶
- event_id: UUID¶
- __init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str]=<factory>, event_id: UUID = <factory>) None¶
- class examples.aggregate11.domainmodel.Snapshot(self, *, originator_id: 'str', originator_version: 'int', timestamp: 'datetime' = <factory>, metadata: 'dict[str, str]'=<factory>, event_id: 'UUID' = <factory>)[source]¶
Bases:
DomainEvent,CanSnapshotAggregate[str]- topic: str¶
- state: dict[str, Any]¶
- __init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, topic: str, state: dict[str, Any]) None¶
- originator_id_type¶
alias of
str
- class examples.aggregate11.domainmodel.Aggregate(*args: Any, **kwargs: Any)[source]¶
Bases:
BaseAggregate[str]- class Event(self, *, originator_id: 'str', originator_version: 'int', timestamp: 'datetime' = <factory>, metadata: 'dict[str, str]'=<factory>, event_id: 'UUID' = <factory>)[source]¶
Bases:
DomainEvent,CanMutateAggregate[str]- __init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str]=<factory>, event_id: UUID = <factory>) None¶
- originator_id_type¶
alias of
str
- originator_id: str¶
UUID identifying an aggregate to which the event belongs.
- originator_version: int¶
Integer identifying the version of the aggregate when the event occurred.
- timestamp: datetime¶
Timezone-aware
datetimeobject representing when an event occurred.
- metadata: dict[str, str]¶
Event metadata.
- event_id: UUID¶
Event identifier.
- class Created(self, *, originator_id: 'str', originator_version: 'int', timestamp: 'datetime' = <factory>, metadata: 'dict[str, str]'=<factory>, event_id: 'UUID' = <factory>)[source]¶
Bases:
Event,CanInitAggregate[str]- originator_topic: str¶
String describing the path to an aggregate class.
- __init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str]=<factory>, event_id: UUID = <factory>, originator_topic: str) None¶
- originator_id_type¶
alias of
str
- class examples.aggregate11.domainmodel.Dog(*args: Any, **kwargs: Any)[source]¶
Bases:
Aggregate- INITIAL_VERSION: int = 0¶
- tricks: list[str]¶
- class Created(self, *, originator_id: 'str', originator_version: 'int', timestamp: 'datetime' = <factory>, metadata: 'dict[str, str]'=<factory>, event_id: 'UUID' = <factory>)¶
-
- __init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str]=<factory>, event_id: UUID = <factory>, originator_topic: str) None¶
- originator_id_type¶
alias of
str
- class Event(self, *, originator_id: 'str', originator_version: 'int', timestamp: 'datetime' = <factory>, metadata: 'dict[str, str]'=<factory>, event_id: 'UUID' = <factory>)¶
Bases:
Event- __init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str]=<factory>, event_id: UUID = <factory>) None¶
- originator_id_type¶
alias of
str
- class Registered(self, *, originator_id: 'str', originator_version: 'int', timestamp: 'datetime' = <factory>, metadata: 'dict[str, str]'=<factory>, event_id: 'UUID' = <factory>, originator_topic: 'str')¶
-
- __init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str]=<factory>, event_id: UUID = <factory>, originator_topic: str, name: str) None¶
- originator_id_type¶
alias of
str
- name: str¶
- class TrickAdded(self, *, originator_id: 'str', originator_version: 'int', timestamp: 'datetime' = <factory>, metadata: 'dict[str, str]'=<factory>, event_id: 'UUID' = <factory>)¶
Bases:
DecoratedFuncCaller,Event- __init__(*, originator_id: str, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str]=<factory>, event_id: UUID = <factory>, trick: str) None¶
- originator_id_type¶
alias of
str
- trick: str¶
- class examples.aggregate11.application.DogSchool(env: Mapping[str, str] | None = None)[source]¶
Bases:
Application[str]- is_snapshotting_enabled: bool = True¶
- aggregate_id_type¶
alias of
str
- name = 'DogSchool'¶