Aggregate 7 - Pydantic immutable

This example shows how to use Pydantic to define immutable aggregate and event classes.

The main advantage of using Pydantic is that custom value objects used in the domain model are serialised and deserialised automatically, with no need to define custom transcoding classes. Pydantic is also considerably faster at serialisation and deserialisation than the Python Standard Library’s json package.

This approach is demonstrated with the Trick class, which is used in both aggregate events and aggregate state. Trick objects are reconstructed from serialised string values, representing only the name of the trick, both from recorded aggregate events and from recorded snapshots.
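The round trip described above can be sketched with Pydantic alone. This is a minimal illustration, not the library's code: the TrickAdded class here is a simplified stand-in that omits the originator fields, and Immutable is redefined locally with the same configuration as shown in this example.

```python
import json

from pydantic import BaseModel, ConfigDict


class Immutable(BaseModel):
    # Same configuration as the Immutable base class shown in this example.
    model_config = ConfigDict(extra="forbid", frozen=True)


class Trick(Immutable):
    name: str


class TrickAdded(Immutable):
    # Simplified stand-in for the example's event (assumption: no originator fields).
    trick: Trick


event = TrickAdded(trick=Trick(name="roll over"))

# Serialise: the nested value object becomes a plain JSON object.
data = event.model_dump_json()
payload = json.loads(data)

# Deserialise: the nested JSON object is validated back into a Trick instance.
copy = TrickAdded.model_validate_json(data)
```

No transcoding class is defined for Trick in this sketch; the type annotation on the trick field is what tells Pydantic how to rebuild the value object.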

Pydantic immutable model

The library’s eventsourcing.pydantic.immutablemodel module defines base classes for immutable domain events and aggregates that use Pydantic.

class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class DomainEvent(Immutable, Generic[TAggregateID]):
    originator_id: TAggregateID
    originator_version: int
    timestamp: datetime = Field(default_factory=datetime_now_with_tzinfo)
    metadata: dict[str, str] = Field(default_factory=get_metadata_from_context)
    event_id: UUID = Field(default_factory=uuid4)


class Aggregate(Immutable, Generic[TAggregateID]):
    id: TAggregateID
    version: int
    created_on: datetime
    modified_on: datetime
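To see what the two settings in model_config do in practice, here is a small sketch using Pydantic directly. The Point model is hypothetical and exists only for illustration; it is not part of the library or of this example.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class Point(Immutable):  # hypothetical model, for illustration only
    x: int
    y: int


p = Point(x=1, y=2)

# frozen=True: attribute assignment on an instance is rejected.
try:
    p.x = 10
    assignment_rejected = False
except ValidationError:
    assignment_rejected = True

# extra="forbid": unknown fields are rejected when the object is constructed.
try:
    Point(x=1, y=2, z=3)
    extra_rejected = False
except ValidationError:
    extra_rejected = True

# "Changing" an immutable object means building a new one from the old one.
q = p.model_copy(update={"x": 10})
```

This is why the mutator functions in the domain model below return a freshly constructed aggregate rather than modifying the one they were given.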

Also included is a generic function for building an immutable aggregate projector function from an immutable aggregate mutator function.

def aggregate_projector(
    mutator: MutatorFunction[TAggregate],
) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]:
    def project_aggregate(
        aggregate: TAggregate | None, events: Iterable[DomainEvent]
    ) -> TAggregate | None:
        for event in events:
            aggregate = mutator(event, aggregate)
        return aggregate

    return project_aggregate
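The projector is a left fold of a mutator over a sequence of events. The following standard-library sketch shows the same shape with a hypothetical Counter aggregate and Incremented event (both invented for illustration), including the case where projection resumes from an existing state, as happens when a snapshot is available.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class Counter:  # hypothetical immutable aggregate
    value: int


@dataclass(frozen=True)
class Incremented:  # hypothetical immutable event
    amount: int


def mutate_counter(event: Incremented, counter: Optional[Counter]) -> Optional[Counter]:
    # Return a new state object; the previous one is never modified.
    previous = counter.value if counter is not None else 0
    return Counter(value=previous + event.amount)


Mutator = Callable[[Incremented, Optional[Counter]], Optional[Counter]]
Projector = Callable[[Optional[Counter], Iterable[Incremented]], Optional[Counter]]


def projector(mutator: Mutator) -> Projector:
    def project(state: Optional[Counter], events: Iterable[Incremented]) -> Optional[Counter]:
        # Apply each event in order, threading the state through the mutator.
        for event in events:
            state = mutator(event, state)
        return state

    return project


project_counter = projector(mutate_counter)

# Replay all events from nothing.
from_scratch = project_counter(None, [Incremented(1), Incremented(2), Incremented(3)])

# Resume from an earlier state and apply only the later event.
resumed = project_counter(Counter(value=3), [Incremented(3)])
```

Both calls arrive at the same final state, which is the property that makes snapshotting safe with this style of projector.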

Pydantic mapper

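The PydanticMapper class is a mapper that supports Pydantic. It is responsible for serialising and deserialising Pydantic immutable model objects. When storing an event, it dumps the model to JSON and then applies optional compression and encryption, in that order; when reading, it decrypts, decompresses, and validates the JSON against the event class resolved from the stored topic.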

class PydanticMapper(Mapper[TAggregateID]):
    def to_stored_event(
        self, domain_event: DomainEventProtocol[TAggregateID]
    ) -> StoredEvent:
        topic = get_topic(domain_event.__class__)
        assert isinstance(domain_event, DomainEvent)
        stored_state = domain_event.model_dump_json().encode()
        if self.compressor:
            stored_state = self.compressor.compress(stored_state)
        if self.cipher:
            stored_state = self.cipher.encrypt(stored_state)
        return StoredEvent(
            originator_id=domain_event.originator_id,
            originator_version=domain_event.originator_version,
            topic=topic,
            state=stored_state,
        )

    def to_domain_event(
        self, stored_event: StoredEvent
    ) -> DomainEventProtocol[TAggregateID]:
        stored_state = stored_event.state
        if self.cipher:
            stored_state = self.cipher.decrypt(stored_state)
        if self.compressor:
            stored_state = self.compressor.decompress(stored_state)
        cls = resolve_topic(stored_event.topic)
        assert issubclass(cls, DomainEvent)
        return cls.model_validate_json(stored_state.decode())
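The order of the steps matters: data is compressed before it is encrypted, and the read path undoes the steps in reverse. The sketch below reproduces that ordering with the standard library only. The XorCipher class is a hypothetical toy stand-in for a real cipher and is not secure; zlib stands in for whatever compressor is configured.

```python
import json
import zlib


class XorCipher:
    """Toy stand-in for a cipher (hypothetical, NOT secure)."""

    def __init__(self, key: bytes) -> None:
        self.key = key

    def encrypt(self, data: bytes) -> bytes:
        return bytes(b ^ self.key[i % len(self.key)] for i, b in enumerate(data))

    def decrypt(self, data: bytes) -> bytes:
        # XOR with the same key is its own inverse.
        return self.encrypt(data)


cipher = XorCipher(b"secret")


def encode(state: dict) -> bytes:
    data = json.dumps(state).encode()  # serialise
    data = zlib.compress(data)         # then compress
    return cipher.encrypt(data)        # then encrypt


def decode(stored: bytes) -> dict:
    data = cipher.decrypt(stored)      # decrypt first
    data = zlib.decompress(data)       # then decompress
    return json.loads(data.decode())   # then deserialise


original = {"trick": {"name": "roll over"}}
stored = encode(original)
restored = decode(stored)
```

Compressing first is the usual choice because well-encrypted bytes look random and compress poorly.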

Pydantic application

The PydanticApplication class is configured to use the Pydantic mapper. It is a subclass of the library’s Application class.

class PydanticApplication(Application[TAggregateID]):
    def construct_mapper(self) -> Mapper[TAggregateID]:
        return PydanticMapper(
            transcoder=NullTranscoder(),
            cipher=self.factory.cipher(),
            compressor=self.factory.compressor(),
        )

Domain model

The code below shows how to define an immutable Dog aggregate in a functional style, using the Pydantic immutable model.

from __future__ import annotations

from functools import singledispatch
from uuid import UUID, uuid4

from eventsourcing.pydantic.immutablemodel import (
    Aggregate,
    DomainEvent,
    Immutable,
    Snapshot,
    aggregate_projector,
)


class Trick(Immutable):
    name: str


class Dog(Aggregate[UUID]):
    name: str
    tricks: tuple[Trick, ...]


class DogRegistered(DomainEvent[UUID]):
    name: str


class TrickAdded(DomainEvent[UUID]):
    trick: Trick


def register_dog(name: str) -> DomainEvent[UUID]:
    return DogRegistered(
        originator_id=uuid4(),
        originator_version=1,
        name=name,
    )


def add_trick(dog: Dog, trick: Trick) -> DomainEvent:
    return TrickAdded(
        originator_id=dog.id,
        originator_version=dog.version + 1,
        trick=trick,
    )


@singledispatch
def mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None:
    """Mutates aggregate with event."""


@mutate_dog.register
def _(event: DogRegistered, _: None) -> Dog:
    return Dog(
        id=event.originator_id,
        version=event.originator_version,
        created_on=event.timestamp,
        modified_on=event.timestamp,
        name=event.name,
        tricks=(),
    )


@mutate_dog.register
def _(event: TrickAdded, dog: Dog) -> Dog:
    return Dog(
        id=dog.id,
        version=event.originator_version,
        created_on=dog.created_on,
        modified_on=event.timestamp,
        name=dog.name,
        tricks=(*dog.tricks, event.trick),
    )


@mutate_dog.register
def _(event: Snapshot, _: None) -> Dog:
    return Dog(
        id=event.state["id"],
        version=event.state["version"],
        created_on=event.state["created_on"],
        modified_on=event.state["modified_on"],
        name=event.state["name"],
        tricks=event.state["tricks"],
    )


project_dog = aggregate_projector(mutate_dog)
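The snapshot mutator passes the values in event.state straight to the Dog constructor. The sketch below suggests why that can work, assuming the state arrives as plain JSON-compatible values (strings, numbers, lists, dicts) after deserialisation. The Dog class here is a flattened stand-in for the example's aggregate, and the state dictionary is invented for illustration.

```python
from datetime import datetime
from typing import Any
from uuid import UUID

from pydantic import BaseModel, ConfigDict


class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class Trick(Immutable):
    name: str


class Dog(Immutable):
    # Flattened stand-in for the example's Dog aggregate (assumption).
    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime
    name: str
    tricks: tuple[Trick, ...]


# Hypothetical snapshot state, as it might look after a JSON round trip.
state: dict[str, Any] = {
    "id": "b2723fe2-c01a-40d2-875e-a3aac6a09ff5",
    "version": 3,
    "created_on": "2024-01-01T12:00:00Z",
    "modified_on": "2024-01-02T12:00:00Z",
    "name": "Fido",
    "tricks": [{"name": "roll over"}, {"name": "play dead"}],
}

# Pydantic validates each value against the field annotations while constructing.
dog = Dog(
    id=state["id"],
    version=state["version"],
    created_on=state["created_on"],
    modified_on=state["modified_on"],
    name=state["name"],
    tricks=state["tricks"],
)
```

In this sketch the string identifier becomes a UUID, the ISO 8601 strings become datetime objects, and the list of dicts becomes a tuple of Trick objects, all driven by the annotations on Dog.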

Application

The DogSchool application in this example uses the Pydantic application. Its command methods must collect the new events returned by the aggregate command functions and pass them to its save() method. The aggregate projector function must also be supplied when reconstructing an aggregate from the repository, and when taking snapshots.

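from typing import Any
from uuid import UUID

from eventsourcing.pydantic.application import PydanticApplication
from eventsourcing.pydantic.immutablemodel import Snapshot
from examples.aggregate7.domainmodel import (
    Trick,
    add_trick,
    project_dog,
    register_dog,
)


class DogSchool(PydanticApplication):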
    is_snapshotting_enabled = True
    snapshot_class = Snapshot[UUID]

    def register_dog(self, name: str) -> UUID:
        event = register_dog(name)
        self.save(event)
        return event.originator_id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        self.save(add_trick(dog, Trick(name=trick)))

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        return {
            "name": dog.name,
            "tricks": tuple(t.name for t in dog.tricks),
            "created_on": dog.created_on,
            "modified_on": dog.modified_on,
        }

Test case

The TestDogSchool test case shows how the DogSchool application can be used.

class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        with put_metadata_in_context({"user_id": "user-1"}):
            dog_id = school.register_dog("Fido")
            school.add_trick(dog_id, "roll over")
            school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        with put_metadata_in_context({"user_id": "admin-1"}):
            school.take_snapshot(dog_id, version=3, projector_func=project_dog)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Continue with snapshotted aggregate.
        with put_metadata_in_context({"user_id": "user-1"}):
            school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Check metadata on events.
        events = list(school.events.get(dog_id))
        assert len(events) > 0
        for event in events:
            assert event.metadata.get("user_id") == "user-1"

        # Check metadata on snapshots.
        assert school.snapshots is not None
        snapshots = list(school.snapshots.get(dog_id))
        assert len(snapshots) > 0
        for snapshot in snapshots:
            assert snapshot.metadata.get("user_id") == "admin-1"

Code reference

class eventsourcing.pydantic.immutablemodel.Immutable

Bases: BaseModel

model_config = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class eventsourcing.pydantic.immutablemodel.DomainEvent(*, originator_id: TAggregateID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>)

Bases: Immutable, Generic[TAggregateID]

originator_id: TAggregateID
originator_version: int
timestamp: datetime
metadata: dict[str, str]
event_id: UUID
model_config = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class eventsourcing.pydantic.immutablemodel.Aggregate(*, id: TAggregateID, version: int, created_on: datetime, modified_on: datetime)

Bases: Immutable, Generic[TAggregateID]

id: TAggregateID
version: int
created_on: datetime
modified_on: datetime
model_config = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class eventsourcing.pydantic.immutablemodel.Snapshot(*, originator_id: TAggregateID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, topic: str, state: dict[str, Any])

Bases: DomainEvent

topic: str
state: dict[str, Any]
classmethod take(aggregate: Aggregate) -> Snapshot
model_config = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

eventsourcing.pydantic.immutablemodel.aggregate_projector(mutator: MutatorFunction[TAggregate]) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]

class eventsourcing.pydantic.mapper.PydanticMapper(transcoder: Transcoder, compressor: Compressor | None = None, cipher: Cipher | None = None)

Bases: Mapper[TAggregateID]

to_stored_event(domain_event: DomainEventProtocol[TAggregateID]) -> StoredEvent

Converts the given domain event to a StoredEvent object.

to_domain_event(stored_event: StoredEvent) -> DomainEventProtocol[TAggregateID]

Converts the given StoredEvent to a domain event object.

class eventsourcing.pydantic.application.PydanticApplication(env: Mapping[str, str] | None = None)

Bases: Application[TAggregateID]

construct_mapper() -> Mapper[TAggregateID]

Constructs a Mapper for use by the application.

aggregate_id_type

alias of UUID

name = 'PydanticApplication'

class examples.aggregate7.domainmodel.Trick(*, name: str)

Bases: Immutable

name: str
model_config = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class examples.aggregate7.domainmodel.Dog(*, id: UUID, version: int, created_on: datetime, modified_on: datetime, name: str, tricks: tuple[Trick, ...])

Bases: Aggregate[UUID]

name: str
tricks: tuple[Trick, ...]
model_config = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class examples.aggregate7.domainmodel.DogRegistered(*, originator_id: UUID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, name: str)

Bases: DomainEvent[UUID]

name: str
model_config = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class examples.aggregate7.domainmodel.TrickAdded(*, originator_id: UUID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, trick: Trick)

Bases: DomainEvent[UUID]

trick: Trick
model_config = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

examples.aggregate7.domainmodel.register_dog(name: str) -> DomainEvent[UUID]

examples.aggregate7.domainmodel.add_trick(dog: Dog, trick: Trick) -> DomainEvent

examples.aggregate7.domainmodel.mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None
examples.aggregate7.domainmodel.mutate_dog(event: DogRegistered, _: None) -> Dog
examples.aggregate7.domainmodel.mutate_dog(event: TrickAdded, dog: Dog) -> Dog
examples.aggregate7.domainmodel.mutate_dog(event: Snapshot, _: None) -> Dog

Mutates aggregate with event.

class examples.aggregate7.application.DogSchool(env: Mapping[str, str] | None = None)

Bases: PydanticApplication

is_snapshotting_enabled: bool = True
snapshot_class

alias of Snapshot[UUID]

register_dog(name: str) -> UUID
add_trick(dog_id: UUID, trick: str) -> None
get_dog(dog_id: UUID) -> dict[str, Any]
aggregate_id_type

alias of UUID

name = 'DogSchool'

class examples.aggregate7.test_application.TestDogSchool(methodName='runTest')

Bases: TestCase

test_dog_school() -> None