Aggregate 7 - Pydantic and orjson

This example shows how to use Pydantic to define immutable aggregate and event classes.

The main advantage of using Pydantic here is that any custom value objects used in the domain model are serialised and deserialised automatically, without also needing to define custom transcoding classes.

This is demonstrated below with the Trick class, which is used in both aggregate events and aggregate state. Trick objects are reconstructed from serialised values that carry only the name of the trick, both when reading recorded aggregate events and when reading recorded snapshots.
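
As a rough illustration of this point, the sketch below round-trips a simplified event through JSON-compatible types using only Pydantic. The TrickAdded class here is a cut-down stand-in that omits the originator and timestamp fields, and the variable names are illustrative rather than taken from the example code.

```python
from pydantic import BaseModel, ConfigDict


class Immutable(BaseModel):
    # Same configuration as the Immutable base class used in this example.
    model_config = ConfigDict(extra="forbid", frozen=True)


class Trick(Immutable):
    name: str


class TrickAdded(Immutable):
    # Simplified event: the originator and timestamp fields are omitted here.
    trick: Trick


event = TrickAdded(trick=Trick(name="roll over"))

# Dump to JSON-compatible Python types; the nested Trick becomes a plain dict.
state = event.model_dump(mode="json")

# Reconstruct the event; Pydantic rebuilds the nested Trick from the dict.
copy = TrickAdded(**state)
```

No transcoding class is registered for Trick in this sketch. Pydantic derives both directions of the conversion from the type annotation on the trick field.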

Pydantic mapper and orjson transcoder

The application class in this example uses a mapper that supports Pydantic and a transcoder that uses orjson.

The PydanticMapper class converts domain model objects to object types that orjson can serialise, and reconstructs domain model objects from the JSON objects that orjson has deserialised.

class PydanticMapper(Mapper[UUID]):
    def to_stored_event(self, domain_event: DomainEventProtocol[UUID]) -> StoredEvent:
        topic = get_topic(domain_event.__class__)
        # Dump the Pydantic model to JSON-compatible Python types.
        event_state = cast(BaseModel, domain_event).model_dump(mode="json")
        stored_state = self.transcoder.encode(event_state)
        # Compress first, then encrypt.
        if self.compressor:
            stored_state = self.compressor.compress(stored_state)
        if self.cipher:
            stored_state = self.cipher.encrypt(stored_state)
        return StoredEvent(
            originator_id=domain_event.originator_id,
            originator_version=domain_event.originator_version,
            topic=topic,
            state=stored_state,
        )

    def to_domain_event(self, stored_event: StoredEvent) -> DomainEventProtocol[UUID]:
        stored_state = stored_event.state
        # Reverse the steps above: decrypt, then decompress, then decode.
        if self.cipher:
            stored_state = self.cipher.decrypt(stored_state)
        if self.compressor:
            stored_state = self.compressor.decompress(stored_state)
        event_state: dict[str, Any] = self.transcoder.decode(stored_state)
        # Pydantic validates the decoded values when the event is constructed.
        cls = resolve_topic(stored_event.topic)
        return cls(**event_state)
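
The order of operations in the mapper matters: state is encoded, then compressed, then encrypted, and reading reverses those steps. The sketch below mirrors that ordering using only the standard library. It rests on several assumptions: json plays the role of the transcoder, zlib the role of the compressor, and XorCipher is a hypothetical toy cipher that offers no real security.

```python
import json
import zlib
from typing import Any


class XorCipher:
    """Toy stand-in for a cipher (NOT secure), used only to show ordering."""

    def __init__(self, key: bytes) -> None:
        self.key = key

    def encrypt(self, data: bytes) -> bytes:
        return bytes(b ^ self.key[i % len(self.key)] for i, b in enumerate(data))

    # XOR is its own inverse, so decrypting repeats the same operation.
    decrypt = encrypt


def to_stored_state(event_state: dict[str, Any], cipher: XorCipher) -> bytes:
    data = json.dumps(event_state).encode("utf8")  # stands in for transcoder.encode()
    data = zlib.compress(data)  # stands in for compressor.compress()
    return cipher.encrypt(data)  # stands in for cipher.encrypt()


def from_stored_state(stored_state: bytes, cipher: XorCipher) -> dict[str, Any]:
    data = cipher.decrypt(stored_state)  # undo the last step first
    data = zlib.decompress(data)
    return json.loads(data)


cipher = XorCipher(b"secret")
event_state = {"originator_version": 2, "trick": {"name": "roll over"}}
stored_state = to_stored_state(event_state, cipher)
```

Calling from_stored_state(stored_state, cipher) returns a dict equal to event_state. Swapping the decrypt and decompress steps would fail, because zlib would be handed encrypted bytes.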

The OrjsonTranscoder class is a transcoder that uses orjson, which is possibly the fastest JSON library available for Python.

class OrjsonTranscoder(Transcoder):
    def encode(self, obj: Any) -> bytes:
        return orjson.dumps(obj)

    def decode(self, data: bytes) -> Any:
        return orjson.loads(data)

The PydanticApplication class is a subclass of the library’s Application class which is configured to use PydanticMapper and OrjsonTranscoder.

class PydanticApplication(Application[UUID]):
    env: ClassVar[dict[str, str]] = {
        "TRANSCODER_TOPIC": get_topic(OrjsonTranscoder),
        "MAPPER_TOPIC": get_topic(PydanticMapper),
    }

Pydantic model for immutable aggregate

The code below shows how to define base classes for immutable aggregates that use Pydantic.

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar
from uuid import UUID

from pydantic import BaseModel, ConfigDict

from eventsourcing.domain import datetime_now_with_tzinfo
from eventsourcing.utils import get_topic

if TYPE_CHECKING:
    from collections.abc import Iterable


class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class DomainEvent(Immutable):
    originator_id: UUID
    originator_version: int
    timestamp: datetime


class Aggregate(Immutable):
    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime


class Snapshot(DomainEvent):
    topic: str
    state: dict[str, Any]

    @classmethod
    def take(cls, aggregate: Aggregate) -> Snapshot:
        return Snapshot(
            originator_id=aggregate.id,
            originator_version=aggregate.version,
            timestamp=datetime_now_with_tzinfo(),
            topic=get_topic(type(aggregate)),
            state=aggregate.model_dump(),
        )


TAggregate = TypeVar("TAggregate", bound=Aggregate)

MutatorFunction = Callable[..., Optional[TAggregate]]


def aggregate_projector(
    mutator: MutatorFunction[TAggregate],
) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]:
    def project_aggregate(
        aggregate: TAggregate | None, events: Iterable[DomainEvent]
    ) -> TAggregate | None:
        for event in events:
            aggregate = mutator(event, aggregate)
        return aggregate

    return project_aggregate
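
To see what the projector returned by aggregate_projector() does, the following sketch applies the same fold to a hypothetical Counter aggregate and Incremented event. Frozen dataclasses stand in for the Pydantic base classes so that the sketch runs with the standard library alone. Counter, Incremented and mutate_counter are illustrative names, not part of the example.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class Counter:
    # Hypothetical aggregate state, standing in for a Pydantic Aggregate.
    version: int
    total: int


@dataclass(frozen=True)
class Incremented:
    # Hypothetical event, standing in for a Pydantic DomainEvent.
    originator_version: int
    amount: int


def mutate_counter(event: Incremented, counter: Optional[Counter]) -> Counter:
    # Return a new state object rather than modifying the old one.
    previous = counter.total if counter is not None else 0
    return Counter(version=event.originator_version, total=previous + event.amount)


def aggregate_projector(
    mutator: Callable[[Incremented, Optional[Counter]], Optional[Counter]],
) -> Callable[[Optional[Counter], Iterable[Incremented]], Optional[Counter]]:
    # Same shape as the aggregate_projector() function shown above.
    def project_aggregate(
        aggregate: Optional[Counter], events: Iterable[Incremented]
    ) -> Optional[Counter]:
        for event in events:
            aggregate = mutator(event, aggregate)
        return aggregate

    return project_aggregate


project_counter = aggregate_projector(mutate_counter)

events = [Incremented(1, 5), Incremented(2, 3), Incremented(3, 4)]

# Replay all events starting from nothing.
full = project_counter(None, events)

# Or resume from an earlier state, as happens after loading a snapshot.
earlier = project_counter(None, events[:2])
resumed = project_counter(earlier, events[2:])
```

Both paths arrive at the same state, which is why a projector can start either from None or from a previously reconstructed aggregate.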

Domain model

The code below shows how to define an immutable aggregate in a functional style, using the Pydantic base classes for immutable aggregates defined above.

from __future__ import annotations

from functools import singledispatch
from uuid import uuid4

from eventsourcing.domain import datetime_now_with_tzinfo
from examples.aggregate7.immutablemodel import (
    Aggregate,
    DomainEvent,
    Immutable,
    Snapshot,
    aggregate_projector,
)


class Trick(Immutable):
    name: str


class Dog(Aggregate):
    name: str
    tricks: tuple[Trick, ...]


class DogRegistered(DomainEvent):
    name: str


class TrickAdded(DomainEvent):
    trick: Trick


def register_dog(name: str) -> DomainEvent:
    return DogRegistered(
        originator_id=uuid4(),
        originator_version=1,
        timestamp=datetime_now_with_tzinfo(),
        name=name,
    )


def add_trick(dog: Dog, trick: Trick) -> DomainEvent:
    return TrickAdded(
        originator_id=dog.id,
        originator_version=dog.version + 1,
        timestamp=datetime_now_with_tzinfo(),
        trick=trick,
    )


@singledispatch
def mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None:
    """Mutates aggregate with event."""


@mutate_dog.register
def _(event: DogRegistered, _: None) -> Dog:
    return Dog(
        id=event.originator_id,
        version=event.originator_version,
        created_on=event.timestamp,
        modified_on=event.timestamp,
        name=event.name,
        tricks=(),
    )


@mutate_dog.register
def _(event: TrickAdded, dog: Dog) -> Dog:
    return Dog(
        id=dog.id,
        version=event.originator_version,
        created_on=dog.created_on,
        modified_on=event.timestamp,
        name=dog.name,
        tricks=(*dog.tricks, event.trick),
    )


@mutate_dog.register
def _(event: Snapshot, _: None) -> Dog:
    return Dog(
        id=event.state["id"],
        version=event.state["version"],
        created_on=event.state["created_on"],
        modified_on=event.state["modified_on"],
        name=event.state["name"],
        tricks=event.state["tricks"],
    )


project_dog = aggregate_projector(mutate_dog)
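
The Snapshot mutator above passes values from event.state straight to the Dog constructor. That works because, after a JSON round trip, Pydantic validation converts strings back to UUID and datetime objects and rebuilds the nested Trick objects. The sketch below illustrates this with a condensed Dog class that declares all fields directly. It uses the standard json module in place of orjson and skips the Snapshot event itself, so it approximates the real flow rather than copying it.

```python
import json
from datetime import datetime, timezone
from uuid import UUID, uuid4

from pydantic import BaseModel, ConfigDict


class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class Trick(Immutable):
    name: str


class Dog(Immutable):
    # Condensed version of the Dog aggregate, declared as one class.
    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime
    name: str
    tricks: tuple[Trick, ...]


now = datetime.now(timezone.utc)
dog = Dog(
    id=uuid4(),
    version=2,
    created_on=now,
    modified_on=now,
    name="Fido",
    tricks=(Trick(name="roll over"),),
)

# Simulate storing and loading snapshot state: after a JSON round trip the
# UUID and datetimes are strings, and the tuple of tricks is a list of dicts.
state = json.loads(json.dumps(dog.model_dump(mode="json")))

# Pydantic validation coerces the JSON values back to the annotated types.
restored = Dog(**state)
```

The same coercion applies to each keyword argument in the Snapshot mutator, so no explicit parsing of UUID or datetime strings is needed there.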

Application

The DogSchool application in this example extends PydanticApplication. Its command methods must pass the new events returned by the aggregate command functions to the application's save() method. The aggregate projector function must also be supplied when reconstructing an aggregate from the repository, and when taking snapshots.

from typing import Any
from uuid import UUID

from examples.aggregate7.domainmodel import Trick, add_trick, project_dog, register_dog
from examples.aggregate7.immutablemodel import Snapshot
from examples.aggregate7.orjsonpydantic import PydanticApplication


class DogSchool(PydanticApplication):
    is_snapshotting_enabled = True
    snapshot_class = Snapshot

    def register_dog(self, name: str) -> UUID:
        event = register_dog(name)
        self.save(event)
        return event.originator_id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        self.save(add_trick(dog, Trick(name=trick)))

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        return {
            "name": dog.name,
            "tricks": tuple(t.name for t in dog.tricks),
            "created_on": dog.created_on,
            "modified_on": dog.modified_on,
        }

Test case

The TestDogSchool test case shows how the DogSchool application can be used.

from datetime import datetime
from unittest import TestCase

from examples.aggregate7.application import DogSchool
from examples.aggregate7.domainmodel import project_dog


class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        self.assertEqual(len(notifications), 3)

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=project_dog)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

Code reference

class examples.aggregate7.immutablemodel.Immutable

Bases: BaseModel

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_fields: ClassVar[Dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

class examples.aggregate7.immutablemodel.DomainEvent(*, originator_id: UUID, originator_version: int, timestamp: datetime)

Bases: Immutable

originator_id: UUID
originator_version: int
timestamp: datetime
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}
model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True)}

class examples.aggregate7.immutablemodel.Aggregate(*, id: UUID, version: int, created_on: datetime, modified_on: datetime)

Bases: Immutable

id: UUID
version: int
created_on: datetime
modified_on: datetime
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}
model_fields: ClassVar[Dict[str, FieldInfo]] = {'created_on': FieldInfo(annotation=datetime, required=True), 'id': FieldInfo(annotation=UUID, required=True), 'modified_on': FieldInfo(annotation=datetime, required=True), 'version': FieldInfo(annotation=int, required=True)}

class examples.aggregate7.immutablemodel.Snapshot(*, originator_id: UUID, originator_version: int, timestamp: datetime, topic: str, state: dict[str, Any])

Bases: DomainEvent

topic: str
state: dict[str, Any]
classmethod take(aggregate: Aggregate) -> Snapshot
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}
model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'state': FieldInfo(annotation=dict[str, Any], required=True), 'timestamp': FieldInfo(annotation=datetime, required=True), 'topic': FieldInfo(annotation=str, required=True)}

examples.aggregate7.immutablemodel.aggregate_projector(mutator: MutatorFunction[TAggregate]) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]

class examples.aggregate7.orjsonpydantic.PydanticMapper(transcoder: Transcoder, compressor: Compressor | None = None, cipher: Cipher | None = None)

Bases: Mapper[UUID]

to_stored_event(domain_event: DomainEventProtocol[UUID]) -> StoredEvent

Converts the given domain event to a StoredEvent object.

to_domain_event(stored_event: StoredEvent) -> DomainEventProtocol[UUID]

Converts the given StoredEvent to a domain event object.

class examples.aggregate7.orjsonpydantic.OrjsonTranscoder

Bases: Transcoder

encode(obj: Any) -> bytes

Encodes obj as bytes.

decode(data: bytes) -> Any

Decodes obj from bytes.

class examples.aggregate7.orjsonpydantic.PydanticApplication(env: Mapping[str, str] | None = None)

Bases: Application[UUID]

env: ClassVar[dict[str, str]] = {'MAPPER_TOPIC': 'examples.aggregate7.orjsonpydantic:PydanticMapper', 'TRANSCODER_TOPIC': 'examples.aggregate7.orjsonpydantic:OrjsonTranscoder'}
name = 'PydanticApplication'

class examples.aggregate7.domainmodel.Trick(*, name: str)

Bases: Immutable

name: str
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}
model_fields: ClassVar[Dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True)}

class examples.aggregate7.domainmodel.Dog(*, id: UUID, version: int, created_on: datetime, modified_on: datetime, name: str, tricks: tuple[Trick, ...])

Bases: Aggregate

name: str
tricks: tuple[Trick, ...]
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}
model_fields: ClassVar[Dict[str, FieldInfo]] = {'created_on': FieldInfo(annotation=datetime, required=True), 'id': FieldInfo(annotation=UUID, required=True), 'modified_on': FieldInfo(annotation=datetime, required=True), 'name': FieldInfo(annotation=str, required=True), 'tricks': FieldInfo(annotation=tuple[Trick, ...], required=True), 'version': FieldInfo(annotation=int, required=True)}

class examples.aggregate7.domainmodel.DogRegistered(*, originator_id: UUID, originator_version: int, timestamp: datetime, name: str)

Bases: DomainEvent

name: str
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}
model_fields: ClassVar[Dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True), 'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True)}

class examples.aggregate7.domainmodel.TrickAdded(*, originator_id: UUID, originator_version: int, timestamp: datetime, trick: Trick)

Bases: DomainEvent

trick: Trick
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}
model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True), 'trick': FieldInfo(annotation=Trick, required=True)}

examples.aggregate7.domainmodel.register_dog(name: str) -> DomainEvent
examples.aggregate7.domainmodel.add_trick(dog: Dog, trick: Trick) -> DomainEvent
examples.aggregate7.domainmodel.mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None
examples.aggregate7.domainmodel.mutate_dog(event: DogRegistered, _: None) -> Dog
examples.aggregate7.domainmodel.mutate_dog(event: TrickAdded, dog: Dog) -> Dog
examples.aggregate7.domainmodel.mutate_dog(event: Snapshot, _: None) -> Dog

Mutates aggregate with event.

class examples.aggregate7.application.DogSchool(env: Mapping[str, str] | None = None)

Bases: PydanticApplication

is_snapshotting_enabled: bool = True
snapshot_class

alias of Snapshot

register_dog(name: str) -> UUID
add_trick(dog_id: UUID, trick: str) -> None
get_dog(dog_id: UUID) -> dict[str, Any]
name = 'DogSchool'

class examples.aggregate7.test_application.TestDogSchool(methodName='runTest')

Bases: TestCase

test_dog_school() -> None