Aggregate 9 - msgspec structs

This example shows how to use msgspec structs to define immutable aggregate and event classes.

As with Pydantic, the main advantage of using msgspec here is that any custom value objects used in the domain model are serialised and deserialised automatically, without the need to define custom transcoding classes. This is demonstrated in the example below by the Trick class, which is used in both aggregate events and aggregate state. Trick objects are reconstructed from serialised string values, representing only the name of the trick, both from recorded aggregate events and from recorded snapshots.

The advantage of msgspec structs over Pydantic v2 is performance. The tables below show the relative performance of msgspec, Pydantic v2, and the Python Standard Library (json) when mapping between domain events and stored events. The benchmarks were run with pytest-benchmark.

Encoding domain events to stored events

| Name     | Encode time (ns) | OPS (Kops/s) |
|----------|------------------|--------------|
| msgspec  | 1121 (1.0x)      | 862 (1.0x)   |
| pydantic | 2688 (2.40x)     | 352 (0.41x)  |
| json     | 5083 (4.54x)     | 184 (0.21x)  |

Decoding stored events to domain events

| Name     | Decode time (ns) | OPS (Kops/s) |
|----------|------------------|--------------|
| msgspec  | 679 (1.0x)       | 1416 (1.0x)  |
| pydantic | 2750 (4.05x)     | 346 (0.24x)  |
| json     | 3208 (4.72x)     | 296 (0.21x)  |

Msgspec mapper

The MsgspecMapper class is a mapper that supports msgspec structs. It is responsible for converting domain model objects to Python bytes objects, and for reconstructing model objects from Python bytes objects.

class MsgspecMapper(Mapper[UUID]):
    """Mapper that converts between msgspec-struct domain events and stored events.

    Events are serialised to JSON bytes with ``msgspec.json``. When a
    compressor and/or cipher is set on the mapper, the bytes are compressed
    and then encrypted on the way out, and decrypted and then decompressed
    on the way back in.
    """

    def to_stored_event(self, domain_event: DomainEventProtocol[UUID]) -> StoredEvent:
        """Serialise ``domain_event`` into a ``StoredEvent``.

        The stored topic is derived from the event's class, so that
        ``to_domain_event`` can later resolve the class to decode into.
        """
        event_topic = get_topic(domain_event.__class__)
        payload = msgspec.json.encode(domain_event)
        # Compress first, then encrypt; to_domain_event reverses this order.
        if self.compressor:
            payload = self.compressor.compress(payload)
        if self.cipher:
            payload = self.cipher.encrypt(payload)
        return StoredEvent(
            originator_id=domain_event.originator_id,
            originator_version=domain_event.originator_version,
            topic=event_topic,
            state=payload,
        )

    def to_domain_event(self, stored_event: StoredEvent) -> DomainEventProtocol[UUID]:
        """Reconstruct a domain event from ``stored_event``.

        The event class is resolved from the stored topic and the state is
        decoded as JSON directly into an instance of that class.
        """
        payload = stored_event.state
        # Undo the transformations of to_stored_event in reverse order.
        if self.cipher:
            payload = self.cipher.decrypt(payload)
        if self.compressor:
            payload = self.compressor.decompress(payload)
        event_class = resolve_topic(stored_event.topic)
        return msgspec.json.decode(payload, type=event_class)

The MsgspecApplication class is a subclass of the library’s Application class which is configured to use MsgspecMapper.

class MsgspecApplication(Application[UUID]):
    """Application base class configured to use ``MsgspecMapper``."""

    # Environment settings holding the topics of the mapper and transcoder
    # classes. NOTE(review): NullTranscoder is defined outside this excerpt;
    # presumably it is a placeholder because MsgspecMapper does the
    # serialisation itself - confirm against its definition.
    env: ClassVar[dict[str, str]] = {
        "MAPPER_TOPIC": get_topic(MsgspecMapper),
        "TRANSCODER_TOPIC": get_topic(NullTranscoder),
    }

Msgspec model for immutable aggregate

The code below shows how to define base classes for immutable aggregates that use msgspec structs.

from __future__ import annotations

from datetime import datetime  # noqa: TC003
from typing import TYPE_CHECKING, Callable, Optional, TypeVar
from uuid import UUID  # noqa: TC003

import msgspec

from eventsourcing.domain import datetime_now_with_tzinfo
from eventsourcing.utils import get_topic

if TYPE_CHECKING:
    from collections.abc import Iterable


class Immutable(msgspec.Struct, frozen=True):
    """Common base class for the frozen msgspec structs of this model."""
    pass


class DomainEvent(Immutable, frozen=True):
    """Base class for frozen domain events."""

    # ID of the aggregate the event belongs to.
    originator_id: UUID
    # Version of the aggregate that results from the event.
    originator_version: int
    # When the event was created.
    timestamp: datetime


class Aggregate(Immutable, frozen=True):
    """Base class for frozen aggregate state."""

    # Identifies the aggregate; used as the originator_id of its snapshots.
    id: UUID
    # Current version; used as the originator_version of its snapshots.
    version: int
    # Timestamps recording creation and most recent modification.
    created_on: datetime
    modified_on: datetime


class Snapshot(DomainEvent, frozen=True):
    """Domain event that records the full state of an aggregate."""

    # Topic of the aggregate class whose state was recorded.
    topic: str
    # The aggregate, serialised as JSON bytes by msgspec.
    state: bytes

    @classmethod
    def take(cls, aggregate: Aggregate) -> Snapshot:
        """Create a snapshot of ``aggregate`` as it is now.

        The snapshot carries the aggregate's ID and version, the current
        time, the topic of the aggregate's class, and the aggregate encoded
        as JSON bytes.

        The snapshot is constructed with ``cls`` rather than a hard-coded
        ``Snapshot``, so calling ``take`` on a subclass returns an instance
        of that subclass.
        """
        return cls(
            originator_id=aggregate.id,
            originator_version=aggregate.version,
            timestamp=datetime_now_with_tzinfo(),
            topic=get_topic(type(aggregate)),
            state=msgspec.json.encode(aggregate),
        )


# Type variable for concrete aggregate classes (Aggregate or a subclass).
TAggregate = TypeVar("TAggregate", bound=Aggregate)

# Type of a mutator function: a callable that returns the next aggregate
# state, or None. The argument list is left unconstrained ("...");
# aggregate_projector below calls it as mutator(event, aggregate).
MutatorFunction = Callable[..., Optional[TAggregate]]


def aggregate_projector(
    mutator: MutatorFunction[TAggregate],
) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]:
    """Build a projector function around ``mutator``.

    The returned function takes an initial aggregate (or ``None``) and an
    iterable of events, applies ``mutator(event, aggregate)`` to each event
    in turn, and returns whatever the last call produced. With no events,
    the initial aggregate is returned unchanged.
    """

    def project_aggregate(
        aggregate: TAggregate | None, events: Iterable[DomainEvent]
    ) -> TAggregate | None:
        current = aggregate
        for domain_event in events:
            # Each event replaces the state with the mutator's result.
            current = mutator(domain_event, current)
        return current

    return project_aggregate

Domain model

The code below shows how to define an immutable aggregate in a functional style, using the msgspec-based base classes for immutable aggregates defined above.

from __future__ import annotations

from functools import singledispatch
from uuid import uuid4

import msgspec.json

from eventsourcing.domain import datetime_now_with_tzinfo
from examples.aggregate9.immutablemodel import (
    Aggregate,
    DomainEvent,
    Immutable,
    Snapshot,
    aggregate_projector,
)


class Trick(Immutable, frozen=True):
    """Frozen value object holding the name of a trick."""

    name: str


class Dog(Aggregate, frozen=True):
    """Frozen aggregate state of a dog."""

    name: str
    # Tricks the dog has learned; each TrickAdded event appends one at the end.
    tricks: tuple[Trick, ...]


class DogRegistered(DomainEvent, frozen=True):
    """Event recording that a dog was registered under the given name."""

    name: str


class TrickAdded(DomainEvent, frozen=True):
    """Event recording that a trick was added to a dog."""

    trick: Trick


def register_dog(name: str) -> DomainEvent:
    """Return the event that registers a new dog called ``name``.

    The event is given a freshly generated aggregate ID, version 1 and the
    current time as its timestamp.
    """
    new_dog_id = uuid4()
    registered_at = datetime_now_with_tzinfo()
    return DogRegistered(
        originator_id=new_dog_id,
        originator_version=1,
        timestamp=registered_at,
        name=name,
    )


def add_trick(dog: Dog, trick: Trick) -> DomainEvent:
    """Return the event that adds ``trick`` to ``dog``.

    The event targets the dog's ID and carries the dog's current version
    plus one; the dog itself is not modified.
    """
    dog_id = dog.id
    next_version = dog.version + 1
    return TrickAdded(
        originator_id=dog_id,
        originator_version=next_version,
        timestamp=datetime_now_with_tzinfo(),
        trick=trick,
    )


@singledispatch
def mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None:
    """Mutates aggregate with event.

    This is the fallback used when no handler is registered for the type of
    the event; it ignores both arguments and returns ``None``. Handlers for
    specific event types are attached with ``mutate_dog.register``.
    """
    return None


@mutate_dog.register
def _(event: DogRegistered, _: None) -> Dog:
    """Create the initial Dog state from a DogRegistered event."""
    # A new dog is created and last modified at the same moment.
    registered_at = event.timestamp
    no_tricks: tuple[Trick, ...] = ()
    return Dog(
        id=event.originator_id,
        version=event.originator_version,
        created_on=registered_at,
        modified_on=registered_at,
        name=event.name,
        tricks=no_tricks,
    )


@mutate_dog.register
def _(event: TrickAdded, dog: Dog) -> Dog:
    """Return a new Dog with the event's trick appended to its tricks.

    Identity, name and creation time are carried over from ``dog``; the
    version and modification time are taken from the event.
    """
    tricks_after = (*dog.tricks, event.trick)
    return Dog(
        id=dog.id,
        version=event.originator_version,
        created_on=dog.created_on,
        modified_on=event.timestamp,
        name=dog.name,
        tricks=tricks_after,
    )


@mutate_dog.register
def _(event: Snapshot, _: None) -> Dog:
    """Rebuild a Dog by decoding the JSON state held in a snapshot."""
    recorded_state = event.state
    return msgspec.json.decode(recorded_state, type=Dog)


# Projector for Dog aggregates: applies mutate_dog to each event in turn.
project_dog = aggregate_projector(mutate_dog)

Application

The DogSchool application in this example uses the MsgspecApplication. It must receive the new events that are returned by the aggregate command functions, and pass them to its save() method. The aggregate projector function must also be supplied when reconstructing an aggregate from the repository, and when taking snapshots.

class DogSchool(MsgspecApplication):
    """Application for registering dogs and adding tricks to them.

    Each command calls a module-level command function, which returns a new
    event, and saves that event. Dogs are reconstructed from the repository
    using the ``project_dog`` projector.
    """

    is_snapshotting_enabled = True
    snapshot_class = Snapshot

    def register_dog(self, name: str) -> UUID:
        """Register a new dog called ``name`` and return its ID."""
        event = register_dog(name)
        self.save(event)
        return event.originator_id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        """Add the trick named ``trick`` to the dog identified by ``dog_id``."""
        dog = self.repository.get(dog_id, projector_func=project_dog)
        self.save(add_trick(dog, Trick(name=trick)))

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        """Return the current state of a dog as a plain dict.

        The dict has the keys ``id``, ``name``, ``tricks`` (a tuple of trick
        names), ``created_on`` and ``modified_on``.
        """
        dog = self.repository.get(dog_id, projector_func=project_dog)
        return {
            "id": dog.id,
            "name": dog.name,
            # Generator expression: no intermediate list is needed.
            "tricks": tuple(t.name for t in dog.tricks),
            "created_on": dog.created_on,
            "modified_on": dog.modified_on,
        }

Test case

The TestDogSchool test case shows how the DogSchool application can be used.

class TestDogSchool(TestCase):
    """Exercises the DogSchool application, including snapshotting."""

    def test_dog_school(self) -> None:
        """Register a dog, add tricks, take a snapshot, then add another trick."""
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["id"], UUID)
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        # Use a TestCase assertion: a bare assert is stripped under -O.
        self.assertEqual(len(notifications), 3)

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=project_dog)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

Code reference

class examples.aggregate9.immutablemodel.Immutable[source]

Bases: Struct

class examples.aggregate9.immutablemodel.DomainEvent(originator_id: UUID, originator_version: int, timestamp: datetime)[source]

Bases: Immutable

originator_id: UUID
originator_version: int
timestamp: datetime
class examples.aggregate9.immutablemodel.Aggregate(id: UUID, version: int, created_on: datetime, modified_on: datetime)[source]

Bases: Immutable

id: UUID
version: int
created_on: datetime
modified_on: datetime
class examples.aggregate9.immutablemodel.Snapshot(originator_id: UUID, originator_version: int, timestamp: datetime, topic: str, state: bytes)[source]

Bases: DomainEvent

topic: str
state: bytes
classmethod take(aggregate: Aggregate) Snapshot[source]
examples.aggregate9.immutablemodel.aggregate_projector(mutator: MutatorFunction[TAggregate]) Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None][source]
class examples.aggregate9.msgspecstructs.MsgspecMapper(transcoder: Transcoder, compressor: Compressor | None = None, cipher: Cipher | None = None)[source]

Bases: Mapper[UUID]

to_stored_event(domain_event: DomainEventProtocol[UUID]) StoredEvent[source]

Converts the given domain event to a StoredEvent object.

to_domain_event(stored_event: StoredEvent) DomainEventProtocol[UUID][source]

Converts the given StoredEvent to a domain event object.

class examples.aggregate9.msgspecstructs.NullTranscoder[source]

Bases: Transcoder

encode(obj: Any) bytes[source]

Encodes obj as bytes.

decode(data: bytes) Any[source]

Decodes obj from bytes.

class examples.aggregate9.msgspecstructs.MsgspecApplication(env: Mapping[str, str] | None = None)[source]

Bases: Application[UUID]

env: ClassVar[dict[str, str]] = {'MAPPER_TOPIC': 'examples.aggregate9.msgspecstructs:MsgspecMapper', 'TRANSCODER_TOPIC': 'examples.aggregate9.msgspecstructs:NullTranscoder'}
name = 'MsgspecApplication'
class examples.aggregate9.domainmodel.Trick(name: str)[source]

Bases: Immutable

name: str
class examples.aggregate9.domainmodel.Dog(id: UUID, version: int, created_on: datetime, modified_on: datetime, name: str, tricks: tuple[Trick, ...])[source]

Bases: Aggregate

name: str
tricks: tuple[Trick, ...]
class examples.aggregate9.domainmodel.DogRegistered(originator_id: UUID, originator_version: int, timestamp: datetime, name: str)[source]

Bases: DomainEvent

name: str
class examples.aggregate9.domainmodel.TrickAdded(originator_id: UUID, originator_version: int, timestamp: datetime, trick: Trick)[source]

Bases: DomainEvent

trick: Trick
examples.aggregate9.domainmodel.register_dog(name: str) DomainEvent[source]
examples.aggregate9.domainmodel.add_trick(dog: Dog, trick: Trick) DomainEvent[source]
examples.aggregate9.domainmodel.mutate_dog(_: DomainEvent, __: Dog | None) Dog | None[source]
examples.aggregate9.domainmodel.mutate_dog(event: DogRegistered, _: None) Dog
examples.aggregate9.domainmodel.mutate_dog(event: TrickAdded, dog: Dog) Dog
examples.aggregate9.domainmodel.mutate_dog(event: Snapshot, _: None) Dog

Mutates aggregate with event.

class examples.aggregate9.application.DogSchool(env: Mapping[str, str] | None = None)[source]

Bases: MsgspecApplication

is_snapshotting_enabled: bool = True
snapshot_class

alias of Snapshot

register_dog(name: str) UUID[source]
add_trick(dog_id: UUID, trick: str) None[source]
get_dog(dog_id: UUID) dict[str, Any][source]
name = 'DogSchool'
class examples.aggregate9.test_application.TestDogSchool(methodName='runTest')[source]

Bases: TestCase

test_dog_school() None[source]