Aggregate 9 - msgspec structs¶
This example shows how to use msgspec structs to define immutable aggregate and event classes.
Like with Pydantic, the main advantage of using msgspec here is that any custom value objects
used in the domain model will be automatically serialised and deserialised, without needing
also to define custom transcoding classes. This is demonstrated in the
example below with the Trick
class, which is used
in both aggregate events and aggregate state, and which is reconstructed from serialised string
values, representing only the name of the trick, from both recorded aggregate events and from
recorded snapshots.
The advantage of msgspec structs over Pydantic v2 is performance. The tables below show relative performance of msgspec, Pydantic v2, and the Python Standard Library for mapping between domain events and stored events. The benchmarks were done with pytest-benchmark.
Name |
Encode time (ns) |
OPS (Kops/s) |
---|---|---|
msgspec |
1121 (1.0x) |
862 (1.0x) |
pydantic |
2688 (2.40x) |
352 (0.41x) |
json |
5083 (4.54x) |
184 (0.21x) |
Name |
Decode time (ns) |
OPS (Kops/s) |
---|---|---|
msgspec |
679 (1.0x) |
1416 (1.0x) |
pydantic |
2750 (4.05x) |
346 (0.24x) |
json |
3208 (4.72x) |
296 (0.21x) |
Msgspec mapper¶
The MsgspecMapper
class is a mapper that supports
msgspec structs. It is responsible for converting domain model objects to Python bytes objects, and for
reconstructing model objects from Python bytes objects.
class MsgspecMapper(Mapper[UUID]):
def to_stored_event(self, domain_event: DomainEventProtocol[UUID]) -> StoredEvent:
topic = get_topic(domain_event.__class__)
stored_state = msgspec.json.encode(domain_event)
if self.compressor:
stored_state = self.compressor.compress(stored_state)
if self.cipher:
stored_state = self.cipher.encrypt(stored_state)
return StoredEvent(
originator_id=domain_event.originator_id,
originator_version=domain_event.originator_version,
topic=topic,
state=stored_state,
)
def to_domain_event(self, stored_event: StoredEvent) -> DomainEventProtocol[UUID]:
stored_state = stored_event.state
if self.cipher:
stored_state = self.cipher.decrypt(stored_state)
if self.compressor:
stored_state = self.compressor.decompress(stored_state)
cls = resolve_topic(stored_event.topic)
return msgspec.json.decode(stored_state, type=cls)
The MsgspecApplication
class is a
subclass of the library’s Application
class
which is configured to use MsgspecMapper
.
class MsgspecApplication(Application[UUID]):
env: ClassVar[dict[str, str]] = {
"MAPPER_TOPIC": get_topic(MsgspecMapper),
"TRANSCODER_TOPIC": get_topic(NullTranscoder),
}
Msgspec model for immutable aggregate¶
The code below shows how to define base classes for immutable aggregates that use msgspec structs.
from __future__ import annotations
from datetime import datetime # noqa: TC003
from typing import TYPE_CHECKING, Callable, Optional, TypeVar
from uuid import UUID # noqa: TC003
import msgspec
from eventsourcing.domain import datetime_now_with_tzinfo
from eventsourcing.utils import get_topic
if TYPE_CHECKING:
from collections.abc import Iterable
class Immutable(msgspec.Struct, frozen=True):
pass
class DomainEvent(Immutable, frozen=True):
originator_id: UUID
originator_version: int
timestamp: datetime
class Aggregate(Immutable, frozen=True):
id: UUID
version: int
created_on: datetime
modified_on: datetime
class Snapshot(DomainEvent, frozen=True):
topic: str
state: bytes
@classmethod
def take(cls, aggregate: Aggregate) -> Snapshot:
return Snapshot(
originator_id=aggregate.id,
originator_version=aggregate.version,
timestamp=datetime_now_with_tzinfo(),
topic=get_topic(type(aggregate)),
state=msgspec.json.encode(aggregate),
)
TAggregate = TypeVar("TAggregate", bound=Aggregate)
MutatorFunction = Callable[..., Optional[TAggregate]]
def aggregate_projector(
mutator: MutatorFunction[TAggregate],
) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]:
def project_aggregate(
aggregate: TAggregate | None, events: Iterable[DomainEvent]
) -> TAggregate | None:
for event in events:
aggregate = mutator(event, aggregate)
return aggregate
return project_aggregate
Domain model¶
The code below shows how to define an immutable aggregate in a functional style, using the msgspec module for immutable aggregates
from __future__ import annotations
from functools import singledispatch
from uuid import uuid4
import msgspec.json
from eventsourcing.domain import datetime_now_with_tzinfo
from examples.aggregate9.immutablemodel import (
Aggregate,
DomainEvent,
Immutable,
Snapshot,
aggregate_projector,
)
class Trick(Immutable, frozen=True):
name: str
class Dog(Aggregate, frozen=True):
name: str
tricks: tuple[Trick, ...]
class DogRegistered(DomainEvent, frozen=True):
name: str
class TrickAdded(DomainEvent, frozen=True):
trick: Trick
def register_dog(name: str) -> DomainEvent:
return DogRegistered(
originator_id=uuid4(),
originator_version=1,
timestamp=datetime_now_with_tzinfo(),
name=name,
)
def add_trick(dog: Dog, trick: Trick) -> DomainEvent:
return TrickAdded(
originator_id=dog.id,
originator_version=dog.version + 1,
timestamp=datetime_now_with_tzinfo(),
trick=trick,
)
@singledispatch
def mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None:
"""Mutates aggregate with event."""
@mutate_dog.register
def _(event: DogRegistered, _: None) -> Dog:
return Dog(
id=event.originator_id,
version=event.originator_version,
created_on=event.timestamp,
modified_on=event.timestamp,
name=event.name,
tricks=(),
)
@mutate_dog.register
def _(event: TrickAdded, dog: Dog) -> Dog:
return Dog(
id=dog.id,
version=event.originator_version,
created_on=dog.created_on,
modified_on=event.timestamp,
name=dog.name,
tricks=(*dog.tricks, event.trick),
)
@mutate_dog.register
def _(event: Snapshot, _: None) -> Dog:
return msgspec.json.decode(event.state, type=Dog)
project_dog = aggregate_projector(mutate_dog)
Application¶
The DogSchool
application in this example uses the
MsgspecApplication
. It must receive the new events that are returned
by the aggregate command methods, and pass them to its save()
method. The aggregate projector function must also be supplied when reconstructing an aggregate from the
repository, and when taking snapshots.
class DogSchool(MsgspecApplication):
is_snapshotting_enabled = True
snapshot_class = Snapshot
def register_dog(self, name: str) -> UUID:
event = register_dog(name)
self.save(event)
return event.originator_id
def add_trick(self, dog_id: UUID, trick: str) -> None:
dog = self.repository.get(dog_id, projector_func=project_dog)
self.save(add_trick(dog, Trick(name=trick)))
def get_dog(self, dog_id: UUID) -> dict[str, Any]:
dog = self.repository.get(dog_id, projector_func=project_dog)
return {
"id": dog.id,
"name": dog.name,
"tricks": tuple([t.name for t in dog.tricks]),
"created_on": dog.created_on,
"modified_on": dog.modified_on,
}
Test case¶
The TestDogSchool
test case shows how the
DogSchool
application can be used.
class TestDogSchool(TestCase):
def test_dog_school(self) -> None:
# Construct application object.
school = DogSchool()
# Evolve application state.
dog_id = school.register_dog("Fido")
school.add_trick(dog_id, "roll over")
school.add_trick(dog_id, "play dead")
# Query application state.
dog = school.get_dog(dog_id)
self.assertEqual(dog["name"], "Fido")
self.assertEqual(dog["tricks"], ("roll over", "play dead"))
self.assertIsInstance(dog["id"], UUID)
self.assertIsInstance(dog["created_on"], datetime)
self.assertIsInstance(dog["modified_on"], datetime)
# Select notifications.
notifications = school.notification_log.select(start=1, limit=10)
assert len(notifications) == 3
# Take snapshot.
school.take_snapshot(dog_id, version=3, projector_func=project_dog)
dog = school.get_dog(dog_id)
self.assertEqual(dog["name"], "Fido")
self.assertEqual(dog["tricks"], ("roll over", "play dead"))
self.assertIsInstance(dog["created_on"], datetime)
self.assertIsInstance(dog["modified_on"], datetime)
# Continue with snapshotted aggregate.
school.add_trick(dog_id, "fetch ball")
dog = school.get_dog(dog_id)
self.assertEqual(dog["name"], "Fido")
self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))
self.assertIsInstance(dog["created_on"], datetime)
self.assertIsInstance(dog["modified_on"], datetime)
Code reference¶
- class examples.aggregate9.immutablemodel.DomainEvent(originator_id: UUID, originator_version: int, timestamp: datetime)[source]¶
Bases:
Immutable
- originator_id: UUID¶
- originator_version: int¶
- timestamp: datetime¶
- class examples.aggregate9.immutablemodel.Aggregate(id: UUID, version: int, created_on: datetime, modified_on: datetime)[source]¶
Bases:
Immutable
- id: UUID¶
- version: int¶
- created_on: datetime¶
- modified_on: datetime¶
- class examples.aggregate9.immutablemodel.Snapshot(originator_id: UUID, originator_version: int, timestamp: datetime, topic: str, state: bytes)[source]¶
Bases:
DomainEvent
- topic: str¶
- state: bytes¶
- examples.aggregate9.immutablemodel.aggregate_projector(mutator: MutatorFunction[TAggregate]) Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None] [source]¶
- class examples.aggregate9.msgspecstructs.MsgspecMapper(transcoder: Transcoder, compressor: Compressor | None = None, cipher: Cipher | None = None)[source]¶
Bases:
Mapper
[UUID
]- to_stored_event(domain_event: DomainEventProtocol[UUID]) StoredEvent [source]¶
Converts the given domain event to a
StoredEvent
object.
- to_domain_event(stored_event: StoredEvent) DomainEventProtocol[UUID] [source]¶
Converts the given
StoredEvent
to a domain event object.
- class examples.aggregate9.msgspecstructs.NullTranscoder[source]¶
Bases:
Transcoder
- class examples.aggregate9.msgspecstructs.MsgspecApplication(env: Mapping[str, str] | None = None)[source]¶
Bases:
Application
[UUID
]- env: ClassVar[dict[str, str]] = {'MAPPER_TOPIC': 'examples.aggregate9.msgspecstructs:MsgspecMapper', 'TRANSCODER_TOPIC': 'examples.aggregate9.msgspecstructs:NullTranscoder'}¶
- name = 'MsgspecApplication'¶
- class examples.aggregate9.domainmodel.Dog(id: UUID, version: int, created_on: datetime, modified_on: datetime, name: str, tricks: tuple[Trick, ...])[source]¶
Bases:
Aggregate
- name: str¶
- class examples.aggregate9.domainmodel.DogRegistered(originator_id: UUID, originator_version: int, timestamp: datetime, name: str)[source]¶
Bases:
DomainEvent
- name: str¶
- class examples.aggregate9.domainmodel.TrickAdded(originator_id: UUID, originator_version: int, timestamp: datetime, trick: Trick)[source]¶
Bases:
DomainEvent
- examples.aggregate9.domainmodel.register_dog(name: str) DomainEvent [source]¶
- examples.aggregate9.domainmodel.add_trick(dog: Dog, trick: Trick) DomainEvent [source]¶
- examples.aggregate9.domainmodel.mutate_dog(_: DomainEvent, __: Dog | None) Dog | None [source]¶
- examples.aggregate9.domainmodel.mutate_dog(event: DogRegistered, _: None) Dog
- examples.aggregate9.domainmodel.mutate_dog(event: TrickAdded, dog: Dog) Dog
- examples.aggregate9.domainmodel.mutate_dog(event: Snapshot, _: None) Dog
Mutates aggregate with event.
- class examples.aggregate9.application.DogSchool(env: Mapping[str, str] | None = None)[source]¶
Bases:
MsgspecApplication
- is_snapshotting_enabled: bool = True¶
- name = 'DogSchool'¶