Aggregate 10 - msgspec with declarative syntax¶
This example shows how to use msgspec with the library’s declarative syntax.
Similar to example 1, aggregates are expressed using the library’s declarative syntax. This is the most concise way of defining an event-sourced aggregate.
Similar to example 9, domain event and custom value objects are defined using msgspec. The main advantage of using msgspec here is that any custom value objects used in the domain model will be automatically serialised and deserialised, without needing also to define custom transcoding classes. The advantage of msgspec structs over Pydantic v2 is performance.
msgspec model for mutable aggregate¶
The code below shows how to define base classes for mutable aggregates that use msgspec.
from __future__ import annotations
import typing
from datetime import datetime # noqa: TC003
from typing import TYPE_CHECKING, Any, cast
from uuid import UUID, uuid4
from eventsourcing.domain import (
BaseAggregate,
CanInitAggregate,
CanMutateAggregate,
CanSnapshotAggregate,
datetime_now_with_tzinfo,
)
from eventsourcing.utils import get_topic, resolve_topic
from examples.aggregate9.immutablemodel import DomainEvent, Immutable
if TYPE_CHECKING:
from eventsourcing.domain import MutableOrImmutableAggregate
class SnapshotState(Immutable, frozen=True):
created_on: datetime
modified_on: datetime
class AggregateSnapshot(DomainEvent, CanSnapshotAggregate[UUID], frozen=True):
topic: str
state: Any
@classmethod
def take(cls, aggregate: MutableOrImmutableAggregate[UUID]) -> AggregateSnapshot:
type_of_snapshot_state = typing.get_type_hints(cls)["state"]
aggregate_state = dict(aggregate.__dict__)
aggregate_state.pop("_id")
aggregate_state.pop("_version")
aggregate_state["created_on"] = aggregate_state.pop("_created_on")
aggregate_state["modified_on"] = aggregate_state.pop("_modified_on")
aggregate_state.pop("_pending_events")
snapshot_state = type_of_snapshot_state(**aggregate_state)
return cls(
originator_id=aggregate.id,
originator_version=aggregate.version,
timestamp=datetime_now_with_tzinfo(),
topic=get_topic(type(aggregate)),
state=snapshot_state,
)
def mutate(self, _: None) -> Aggregate:
"""Reconstructs the snapshotted :class:`Aggregate` object."""
cls = cast("type[Aggregate]", resolve_topic(self.topic))
aggregate_state: dict[str, Any] = {
key: getattr(self.state, key) for key in type(self.state).__struct_fields__
}
aggregate_state["_id"] = self.originator_id
aggregate_state["_version"] = self.originator_version
aggregate_state["_created_on"] = self.state.created_on
aggregate_state["_modified_on"] = self.state.modified_on
aggregate_state["_version"] = self.originator_version
aggregate_state["_pending_events"] = []
aggregate = object.__new__(cls)
aggregate.__dict__.update(aggregate_state)
return aggregate
class AggregateEvent(DomainEvent, CanMutateAggregate[UUID], frozen=True):
def _as_dict(self) -> dict[str, Any]:
return {key: getattr(self, key) for key in self.__struct_fields__}
class Aggregate(BaseAggregate[UUID]):
@classmethod
def create_id(cls, *_: Any, **__: Any) -> UUID:
return uuid4()
class Event(AggregateEvent, frozen=True):
pass
class Created(Event, CanInitAggregate[UUID], frozen=True):
originator_topic: str
Domain model¶
The code below shows how to define a mutable aggregate with the library’s declarative syntax, using the msgspec module for mutable aggregates
from __future__ import annotations
from eventsourcing.domain import event
from examples.aggregate9.immutablemodel import Immutable
from examples.aggregate10.mutablemodel import (
Aggregate,
AggregateSnapshot,
SnapshotState,
)
class Trick(Immutable, frozen=True):
name: str
class DogSnapshotState(SnapshotState, frozen=True):
name: str
tricks: list[Trick]
class Dog(Aggregate):
class Snapshot(AggregateSnapshot, frozen=True):
state: DogSnapshotState
@event("Registered")
def __init__(self, name: str) -> None:
self.name = name
self.tricks: list[Trick] = []
@event("TrickAdded")
def add_trick(self, trick: Trick) -> None:
self.tricks.append(trick)
Application¶
The DogSchool
application in this example uses the
MsgspecApplication
class
from example 9.
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from examples.aggregate9.msgspecstructs import MsgspecApplication
from examples.aggregate10.domainmodel import Dog, Trick
if TYPE_CHECKING:
from uuid import UUID
class DogSchool(MsgspecApplication):
is_snapshotting_enabled = True
def register_dog(self, name: str) -> UUID:
dog = Dog(name)
self.save(dog)
return dog.id
def add_trick(self, dog_id: UUID, trick: str) -> None:
dog: Dog = self.repository.get(dog_id)
dog.add_trick(Trick(name=trick))
self.save(dog)
def get_dog(self, dog_id: UUID) -> dict[str, Any]:
dog: Dog = self.repository.get(dog_id)
return {
"name": dog.name,
"tricks": tuple([t.name for t in dog.tricks]),
"created_on": dog.created_on,
"modified_on": dog.modified_on,
}
Test case¶
The TestDogSchool
test case shows how the
DogSchool
application can be used.
from __future__ import annotations
from datetime import datetime
from unittest import TestCase
from examples.aggregate10.application import DogSchool
class TestDogSchool(TestCase):
def test_dog_school(self) -> None:
# Construct application object.
school = DogSchool()
# Evolve application state.
dog_id = school.register_dog("Fido")
school.add_trick(dog_id, "roll over")
school.add_trick(dog_id, "play dead")
# Query application state.
dog = school.get_dog(dog_id)
self.assertEqual(dog["name"], "Fido")
self.assertEqual(dog["tricks"], ("roll over", "play dead"))
self.assertIsInstance(dog["created_on"], datetime)
self.assertIsInstance(dog["modified_on"], datetime)
# Select notifications.
notifications = school.notification_log.select(start=1, limit=10)
assert len(notifications) == 3
# Take snapshot.
school.take_snapshot(dog_id, version=3)
dog = school.get_dog(dog_id)
self.assertEqual(dog["name"], "Fido")
self.assertEqual(dog["tricks"], ("roll over", "play dead"))
self.assertIsInstance(dog["created_on"], datetime)
self.assertIsInstance(dog["modified_on"], datetime)
# Continue with snapshotted aggregate.
school.add_trick(dog_id, "fetch ball")
dog = school.get_dog(dog_id)
self.assertEqual(dog["name"], "Fido")
self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))
self.assertIsInstance(dog["created_on"], datetime)
self.assertIsInstance(dog["modified_on"], datetime)
Code reference¶
- class examples.aggregate10.mutablemodel.SnapshotState(created_on: datetime, modified_on: datetime)[source]¶
Bases:
Immutable
- created_on: datetime¶
- modified_on: datetime¶
- class examples.aggregate10.mutablemodel.AggregateSnapshot(originator_id: UUID, originator_version: int, timestamp: datetime, topic: str, state: Any)[source]¶
Bases:
DomainEvent
,CanSnapshotAggregate
[UUID
]- topic: str¶
- state: Any¶
- classmethod take(aggregate: MutableOrImmutableAggregate[UUID]) AggregateSnapshot [source]¶
Creates a snapshot of the given
Aggregate
object.
- originator_id_type¶
alias of
UUID
- class examples.aggregate10.mutablemodel.AggregateEvent(originator_id: UUID, originator_version: int, timestamp: datetime)[source]¶
Bases:
DomainEvent
,CanMutateAggregate
[UUID
]- originator_id_type¶
alias of
UUID
- class examples.aggregate10.mutablemodel.Aggregate(*args: Any, **kwargs: Any)[source]¶
Bases:
BaseAggregate
[UUID
]- class Event(originator_id: UUID, originator_version: int, timestamp: datetime)[source]¶
Bases:
AggregateEvent
- originator_id_type¶
alias of
UUID
- class examples.aggregate10.domainmodel.DogSnapshotState(created_on: datetime, modified_on: datetime, name: str, tricks: list[Trick])[source]¶
Bases:
SnapshotState
- name: str¶
- class examples.aggregate10.domainmodel.Dog(*args: Any, **kwargs: Any)[source]¶
Bases:
Aggregate
- class examples.aggregate10.application.DogSchool(env: Mapping[str, str] | None = None)[source]¶
Bases:
MsgspecApplication
- is_snapshotting_enabled: bool = True¶
- name = 'DogSchool'¶