Aggregate 11 - String IDs¶
This example shows an aggregate that uses the library’s declarative syntax for aggregates, as described in the tutorial and module docs, but with arbitrary string IDs. Many users of KurrentDB, for example, prefer to prefix stream names with the name of a stream category. This example shows how this style can be adopted when using this library.
Domain model¶
The Dog
class in this example uses the library’s
aggregate base class and the event decorator
to define aggregate event classes from command method signatures. The event class names
are given as the argument to the event decorator. The event attributes are defined automatically
by the decorator to match the command method arguments. The bodies of the command methods are used
to evolve the state of an aggregate instance, both when a new event is triggered and when an aggregate
is reconstructed from stored events.
from __future__ import annotations
import uuid
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from eventsourcing.domain import (
BaseAggregate,
CanInitAggregate,
CanMutateAggregate,
CanSnapshotAggregate,
MetaDomainEvent,
event,
)
if TYPE_CHECKING:
from datetime import datetime
@dataclass(frozen=True)
class DomainEvent(metaclass=MetaDomainEvent):
    """Frozen base class for domain events whose originator ID is a string."""

    originator_id: str
    originator_version: int
    timestamp: datetime

    def __post_init__(self) -> None:
        """Check that ``originator_id`` is a ``str``.

        Raises:
            TypeError: If ``originator_id`` is not an instance of ``str``.
        """
        # Dataclass field annotations are not checked at runtime, so the
        # type of the ID is verified explicitly here.
        if isinstance(self.originator_id, str):
            return
        msg = (
            f"{type(self)} "
            f"was initialized with a non-str originator_id: "
            f"{self.originator_id!r}"
        )
        raise TypeError(msg)
@dataclass(frozen=True)
class Snapshot(DomainEvent, CanSnapshotAggregate[str]):
    """Snapshot event class for aggregates that use string IDs."""

    # Adds two fields to those inherited from DomainEvent.
    topic: str
    state: dict[str, Any]
class Aggregate(BaseAggregate[str]):
    """Aggregate base class for this example, parameterised with ``str`` IDs."""

    @dataclass(frozen=True)
    class Event(DomainEvent, CanMutateAggregate[str]):
        """Base aggregate event class; declares no fields of its own."""

    @dataclass(frozen=True)
    class Created(Event, CanInitAggregate[str]):
        """Initial aggregate event, which also carries ``originator_topic``."""

        originator_topic: str
class Dog(Aggregate):
    """Dog aggregate identified by a ``dog-`` prefixed string ID."""

    # NOTE(review): presumably makes aggregate version numbering start at
    # zero -- confirm against the aggregate base class.
    INITIAL_VERSION = 0

    @staticmethod
    def create_id() -> str:
        """Return a new ID: ``dog-`` followed by a random version 4 UUID."""
        return f"dog-{uuid.uuid4()}"

    @event("Registered")
    def __init__(self, name: str) -> None:
        """Set the dog's name and start with an empty list of tricks."""
        self.name = name
        self.tricks: list[str] = []

    @event("TrickAdded")
    def add_trick(self, trick: str) -> None:
        """Append ``trick`` to the dog's list of tricks."""
        self.tricks.append(trick)
Application¶
The DogSchool
application class in this example uses the
library’s application base class. It fully encapsulates the
Dog
aggregate, defining command and query methods
that use the event-sourced aggregate class as if it were a normal Python object class.
from __future__ import annotations
from typing import Any
from eventsourcing.application import Application
from examples.aggregate11.domainmodel import Dog, Snapshot
class DogSchool(Application[str]):
    """Application providing command and query methods for ``Dog`` aggregates."""

    # Snapshotting is switched on and uses the example's own Snapshot class.
    is_snapshotting_enabled = True
    snapshot_class = Snapshot

    def register_dog(self, name: str) -> str:
        """Create a dog called ``name``, save it, and return its ID."""
        new_dog = Dog(name)
        self.save(new_dog)
        return new_dog.id

    def add_trick(self, dog_id: str, trick: str) -> None:
        """Fetch the dog with ``dog_id``, add ``trick`` to it, and save it."""
        pupil: Dog = self.repository.get(dog_id)
        pupil.add_trick(trick)
        self.save(pupil)

    def get_dog(self, dog_id: str) -> dict[str, Any]:
        """Return a dict holding the dog's name and a tuple of its tricks."""
        pupil: Dog = self.repository.get(dog_id)
        details: dict[str, Any] = {
            "name": pupil.name,
            "tricks": tuple(pupil.tricks),
        }
        return details
Test case¶
The TestDogSchool
test case shows how the
DogSchool
application can be used. It demonstrates that
arbitrary strings can be used as aggregate IDs, with both the POPO and SQLite persistence modules,
and of course with KurrentDB.
class TestDogSchool(TestCase):
    """Run the same ``DogSchool`` scenario under several environments."""

    def setUp(self) -> None:
        """Give each test a fresh, empty environment mapping."""
        self.env: dict[str, str] = {}

    def test_dog_school(self) -> None:
        """Register a dog, teach it tricks, snapshot it, then teach one more."""
        first_two_tricks = ("roll over", "play dead")
        all_three_tricks = ("roll over", "play dead", "fetch ball")

        # Build the application from the environment chosen by the caller.
        school = DogSchool(self.env)
        # Remember where the notification log stood before this test wrote.
        start_id = school.recorder.max_notification_id()

        # Commands: register a dog and add two tricks.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query: both tricks are reported, in order.
        details = school.get_dog(dog_id)
        self.assertEqual("Fido", details["name"])
        self.assertEqual(first_two_tricks, details["tricks"])

        # The three commands above produced three new notifications.
        new_notifications = school.notification_log.select(
            start=start_id, limit=10, inclusive_of_start=False
        )
        self.assertEqual(3, len(new_notifications))

        # Snapshot the dog; the query result must be unchanged.
        school.take_snapshot(dog_id, version=3)
        details = school.get_dog(dog_id)
        self.assertEqual("Fido", details["name"])
        self.assertEqual(first_two_tricks, details["tricks"])

        # Keep using the aggregate after the snapshot was taken.
        school.add_trick(dog_id, "fetch ball")
        details = school.get_dog(dog_id)
        self.assertEqual("Fido", details["name"])
        self.assertEqual(all_three_tricks, details["tricks"])

    def test_dog_school_with_sqlite(self) -> None:
        """Repeat the scenario using an in-memory SQLite database."""
        self.env["PERSISTENCE_MODULE"] = "eventsourcing.sqlite"
        self.env["SQLITE_DBNAME"] = ":memory:"
        self.test_dog_school()

    @skipIf("eventsourcing_kurrentdb" not in sys.modules, "KurrentDB not installed")
    def test_dog_school_with_kurrentdb(self) -> None:
        """Repeat the scenario using KurrentDB; skipped if it is not loaded."""
        self.env["PERSISTENCE_MODULE"] = "eventsourcing_kurrentdb"
        self.env["KURRENTDB_URI"] = "esdb://localhost:2113?Tls=false"
        self.test_dog_school()
Code reference¶
- class examples.aggregate11.domainmodel.DomainEvent(originator_id: 'str', originator_version: 'int', timestamp: 'datetime')[source]¶
Bases:
object
- originator_id: str¶
- originator_version: int¶
- timestamp: datetime¶
- __init__(originator_id: str, originator_version: int, timestamp: datetime) None ¶
- class examples.aggregate11.domainmodel.Snapshot(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')[source]¶
Bases: DomainEvent, CanSnapshotAggregate[str]
- topic: str¶
- state: dict[str, Any]¶
- __init__(originator_id: str, originator_version: int, timestamp: datetime, topic: str, state: dict[str, Any]) None ¶
- originator_id_type¶
alias of
str
- class examples.aggregate11.domainmodel.Aggregate(*args: Any, **kwargs: Any)[source]¶
Bases: BaseAggregate[str]
- class Event(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')[source]¶
Bases: DomainEvent, CanMutateAggregate[str]
- __init__(originator_id: str, originator_version: int, timestamp: datetime) None ¶
- originator_id_type¶
alias of
str
- originator_id: str¶
String ID identifying the aggregate to which the event belongs.
- originator_version: int¶
Integer identifying the version of the aggregate when the event occurred.
- timestamp: datetime¶
Timezone-aware
datetime
object representing when an event occurred.
- class Created(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')[source]¶
Bases: Event, CanInitAggregate[str]
- originator_topic: str¶
String describing the path to an aggregate class.
- __init__(originator_id: str, originator_version: int, timestamp: datetime, originator_topic: str) None ¶
- originator_id_type¶
alias of
str
- class examples.aggregate11.domainmodel.Dog(*args: Any, **kwargs: Any)[source]¶
Bases:
Aggregate
- INITIAL_VERSION: int = 0¶
- class Created(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')¶
-
- __init__(originator_id: str, originator_version: int, timestamp: datetime, originator_topic: str) None ¶
- originator_id_type¶
alias of
str
- class Event(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')¶
Bases:
Event
- __init__(originator_id: str, originator_version: int, timestamp: datetime) None ¶
- originator_id_type¶
alias of
str
- class Registered(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime', originator_topic: 'str')¶
-
- __init__(originator_id: str, originator_version: int, timestamp: datetime, originator_topic: str, name: str) None ¶
- originator_id_type¶
alias of
str
- name: str¶
- class TrickAdded(self, originator_id: 'str', originator_version: 'int', timestamp: 'datetime')¶
Bases: DecoratorEvent, Event
- __init__(originator_id: str, originator_version: int, timestamp: datetime, trick: str) None ¶
- trick: str¶
- class examples.aggregate11.application.DogSchool(env: Mapping[str, str] | None = None)[source]¶
Bases: Application[str]
- is_snapshotting_enabled: bool = True¶
- name = 'DogSchool'¶