Aggregate 10 - msgspec with declarative syntax

This example shows how to use msgspec with the library’s declarative syntax.

Similar to example 1, aggregates are expressed using the library’s declarative syntax. This is the most concise way of defining an event-sourced aggregate.

Similar to example 9, domain event and custom value objects are defined using msgspec. The main advantage of using msgspec here is that any custom value objects used in the domain model will be automatically serialised and deserialised, without needing also to define custom transcoding classes. The advantage of msgspec structs over Pydantic v2 is performance.

msgspec model for mutable aggregate

The code below shows how to define base classes for mutable aggregates that use msgspec.

from __future__ import annotations

import typing
from datetime import datetime  # noqa: TC003
from typing import TYPE_CHECKING, Any, cast
from uuid import UUID, uuid4

from eventsourcing.domain import (
    BaseAggregate,
    CanInitAggregate,
    CanMutateAggregate,
    CanSnapshotAggregate,
    datetime_now_with_tzinfo,
)
from eventsourcing.utils import get_topic, resolve_topic
from examples.aggregate9.immutablemodel import DomainEvent, Immutable

if TYPE_CHECKING:
    from eventsourcing.domain import MutableOrImmutableAggregate


class SnapshotState(Immutable, frozen=True):
    created_on: datetime
    modified_on: datetime


class AggregateSnapshot(DomainEvent, CanSnapshotAggregate[UUID], frozen=True):
    topic: str
    state: Any

    @classmethod
    def take(cls, aggregate: MutableOrImmutableAggregate[UUID]) -> AggregateSnapshot:
        type_of_snapshot_state = typing.get_type_hints(cls)["state"]
        aggregate_state = dict(aggregate.__dict__)
        aggregate_state.pop("_id")
        aggregate_state.pop("_version")
        aggregate_state["created_on"] = aggregate_state.pop("_created_on")
        aggregate_state["modified_on"] = aggregate_state.pop("_modified_on")
        aggregate_state.pop("_pending_events")
        snapshot_state = type_of_snapshot_state(**aggregate_state)
        return cls(
            originator_id=aggregate.id,
            originator_version=aggregate.version,
            timestamp=datetime_now_with_tzinfo(),
            topic=get_topic(type(aggregate)),
            state=snapshot_state,
        )

    def mutate(self, _: None) -> Aggregate:
        """Reconstructs the snapshotted :class:`Aggregate` object."""
        cls = cast("type[Aggregate]", resolve_topic(self.topic))
        aggregate_state: dict[str, Any] = {
            key: getattr(self.state, key) for key in type(self.state).__struct_fields__
        }
        aggregate_state["_id"] = self.originator_id
        aggregate_state["_version"] = self.originator_version
        aggregate_state["_created_on"] = self.state.created_on
        aggregate_state["_modified_on"] = self.state.modified_on
        aggregate_state["_version"] = self.originator_version
        aggregate_state["_pending_events"] = []
        aggregate = object.__new__(cls)
        aggregate.__dict__.update(aggregate_state)
        return aggregate


class AggregateEvent(DomainEvent, CanMutateAggregate[UUID], frozen=True):
    def _as_dict(self) -> dict[str, Any]:
        return {key: getattr(self, key) for key in self.__struct_fields__}


class Aggregate(BaseAggregate[UUID]):
    @classmethod
    def create_id(cls, *_: Any, **__: Any) -> UUID:
        return uuid4()

    class Event(AggregateEvent, frozen=True):
        pass

    class Created(Event, CanInitAggregate[UUID], frozen=True):
        originator_topic: str

Domain model

The code below shows how to define a mutable aggregate with the library’s declarative syntax, using the msgspec module for mutable aggregates

from __future__ import annotations

from eventsourcing.domain import event
from examples.aggregate9.immutablemodel import Immutable
from examples.aggregate10.mutablemodel import (
    Aggregate,
    AggregateSnapshot,
    SnapshotState,
)


class Trick(Immutable, frozen=True):
    name: str


class DogSnapshotState(SnapshotState, frozen=True):
    name: str
    tricks: list[Trick]


class Dog(Aggregate):
    class Snapshot(AggregateSnapshot, frozen=True):
        state: DogSnapshotState

    @event("Registered")
    def __init__(self, name: str) -> None:
        self.name = name
        self.tricks: list[Trick] = []

    @event("TrickAdded")
    def add_trick(self, trick: Trick) -> None:
        self.tricks.append(trick)

Application

The DogSchool application in this example uses the MsgspecApplication class from example 9.

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from examples.aggregate9.msgspecstructs import MsgspecApplication
from examples.aggregate10.domainmodel import Dog, Trick

if TYPE_CHECKING:
    from uuid import UUID


class DogSchool(MsgspecApplication):
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        dog = Dog(name)
        self.save(dog)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog: Dog = self.repository.get(dog_id)
        dog.add_trick(Trick(name=trick))
        self.save(dog)

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        dog: Dog = self.repository.get(dog_id)
        return {
            "name": dog.name,
            "tricks": tuple([t.name for t in dog.tricks]),
            "created_on": dog.created_on,
            "modified_on": dog.modified_on,
        }

Test case

The TestDogSchool test case shows how the DogSchool application can be used.

from __future__ import annotations

from datetime import datetime
from unittest import TestCase

from examples.aggregate10.application import DogSchool


class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        school.take_snapshot(dog_id, version=3)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

Code reference

class examples.aggregate10.mutablemodel.SnapshotState(created_on: datetime, modified_on: datetime)[source]

Bases: Immutable

created_on: datetime
modified_on: datetime
class examples.aggregate10.mutablemodel.AggregateSnapshot(originator_id: UUID, originator_version: int, timestamp: datetime, topic: str, state: Any)[source]

Bases: DomainEvent, CanSnapshotAggregate[UUID]

topic: str
state: Any
classmethod take(aggregate: MutableOrImmutableAggregate[UUID]) AggregateSnapshot[source]

Creates a snapshot of the given Aggregate object.

mutate(_: None) Aggregate[source]

Reconstructs the snapshotted Aggregate object.

originator_id_type

alias of UUID

class examples.aggregate10.mutablemodel.AggregateEvent(originator_id: UUID, originator_version: int, timestamp: datetime)[source]

Bases: DomainEvent, CanMutateAggregate[UUID]

originator_id_type

alias of UUID

class examples.aggregate10.mutablemodel.Aggregate(*args: Any, **kwargs: Any)[source]

Bases: BaseAggregate[UUID]

classmethod create_id(*_: Any, **__: Any) UUID[source]

Returns a new aggregate ID.

class Event(originator_id: UUID, originator_version: int, timestamp: datetime)[source]

Bases: AggregateEvent

originator_id_type

alias of UUID

class Created(originator_id: UUID, originator_version: int, timestamp: datetime, originator_topic: str)[source]

Bases: Event, CanInitAggregate[UUID]

originator_topic: str

String describing the path to an aggregate class.

originator_id_type

alias of UUID

class examples.aggregate10.domainmodel.Trick(name: str)[source]

Bases: Immutable

name: str
class examples.aggregate10.domainmodel.DogSnapshotState(created_on: datetime, modified_on: datetime, name: str, tricks: list[Trick])[source]

Bases: SnapshotState

name: str
tricks: list[Trick]
class examples.aggregate10.domainmodel.Dog(*args: Any, **kwargs: Any)[source]

Bases: Aggregate

add_trick = <eventsourcing.domain.UnboundCommandMethodDecorator object>[source]
class Event(originator_id: UUID, originator_version: int, timestamp: datetime)

Bases: Event

originator_id_type

alias of UUID

class TrickAdded(originator_id: UUID, originator_version: int, timestamp: datetime, trick: Trick)

Bases: DecoratorEvent, Event

trick: Trick
class examples.aggregate10.application.DogSchool(env: Mapping[str, str] | None = None)[source]

Bases: MsgspecApplication

is_snapshotting_enabled: bool = True
register_dog(name: str) UUID[source]
add_trick(dog_id: UUID, trick: str) None[source]
get_dog(dog_id: UUID) dict[str, Any][source]
name = 'DogSchool'
class examples.aggregate10.test_application.TestDogSchool(methodName='runTest')[source]

Bases: TestCase

test_dog_school() None[source]