Aggregate 8 - Pydantic with declarative syntax

This example shows how to use Pydantic with the library’s declarative syntax.

Similar to example 1, aggregates are expressed using the library’s declarative syntax. This is the most concise way of defining an event-sourced aggregate.

Similar to example 7, domain event and custom value objects are defined using Pydantic. The main advantage of using Pydantic here is that any custom value objects used in the domain model will be automatically serialised and deserialised, without needing also to define custom transcoding classes.

Pydantic model for mutable aggregate

The code below shows how to define base classes for mutable aggregates that use Pydantic.

from __future__ import annotations

import typing
from datetime import datetime
from typing import Any
from uuid import UUID, uuid4

from pydantic import ConfigDict, TypeAdapter

from eventsourcing.domain import (
    BaseAggregate,
    CanInitAggregate,
    CanMutateAggregate,
    CanSnapshotAggregate,
)
from examples.aggregate7.immutablemodel import DomainEvent, Immutable

datetime_adapter = TypeAdapter(datetime)


class SnapshotState(Immutable):
    model_config = ConfigDict(extra="allow")

    def __init__(self, **kwargs: Any) -> None:
        for key in ["_created_on", "_modified_on"]:
            kwargs[key] = datetime_adapter.validate_python(kwargs[key])
        super().__init__(**kwargs)


class AggregateSnapshot(DomainEvent, CanSnapshotAggregate[UUID]):
    topic: str
    state: Any

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        type_of_snapshot_state = typing.get_type_hints(cls)["state"]
        try:
            assert issubclass(
                type_of_snapshot_state, SnapshotState
            ), type_of_snapshot_state
        except (TypeError, AssertionError) as e:
            msg = (
                f"Subclass of {SnapshotState}"
                f" is required as the annotated type of 'state' on "
                f"{cls}, got: {type_of_snapshot_state}"
            )
            raise TypeError(msg) from e


class Aggregate(BaseAggregate[UUID]):
    @staticmethod
    def create_id(*_: Any, **__: Any) -> UUID:
        """Returns a new aggregate ID."""
        return uuid4()

    class Event(DomainEvent, CanMutateAggregate[UUID]):
        pass

    class Created(Event, CanInitAggregate[UUID]):
        originator_topic: str

Domain model

The code below shows how to define a mutable aggregate with the library’s declarative syntax, using the Pydantic module for mutable aggregates

from __future__ import annotations

from eventsourcing.domain import event
from examples.aggregate7.immutablemodel import Immutable
from examples.aggregate8.mutablemodel import Aggregate, AggregateSnapshot, SnapshotState


class Trick(Immutable):
    name: str


class DogSnapshotState(SnapshotState):
    name: str
    tricks: list[Trick]


class Dog(Aggregate):
    class Snapshot(AggregateSnapshot):
        state: DogSnapshotState

    @event("Registered")
    def __init__(self, name: str) -> None:
        self.name = name
        self.tricks: list[Trick] = []

    @event("TrickAdded")
    def add_trick(self, trick: Trick) -> None:
        self.tricks.append(trick)

Application

The DogSchool application in this example uses the PydanticApplication class from example 7.

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from examples.aggregate7.orjsonpydantic import PydanticApplication
from examples.aggregate8.domainmodel import Dog, Trick

if TYPE_CHECKING:
    from uuid import UUID


class DogSchool(PydanticApplication):
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        dog = Dog(name)
        self.save(dog)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog: Dog = self.repository.get(dog_id)
        dog.add_trick(Trick(name=trick))
        self.save(dog)

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        dog: Dog = self.repository.get(dog_id)
        return {
            "name": dog.name,
            "tricks": tuple([t.name for t in dog.tricks]),
            "created_on": dog.created_on,
            "modified_on": dog.modified_on,
        }

Test case

The TestDogSchool test case shows how the DogSchool application can be used.

from __future__ import annotations

from datetime import datetime
from unittest import TestCase

from examples.aggregate8.application import DogSchool


class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        school.take_snapshot(dog_id, version=3)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

Code reference

class examples.aggregate8.mutablemodel.SnapshotState(**kwargs: Any)[source]

Bases: Immutable

model_config: ClassVar[ConfigDict] = {'extra': 'allow', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_fields: ClassVar[Dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

class examples.aggregate8.mutablemodel.AggregateSnapshot(*, originator_id: UUID, originator_version: int, topic: str, state: Any, timestamp: datetime)[source]

Bases: DomainEvent, CanSnapshotAggregate[UUID]

topic: str
state: Any
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'state': FieldInfo(annotation=Any, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True), 'topic': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

originator_id_type

alias of UUID

class examples.aggregate8.mutablemodel.Aggregate(*args: Any, **kwargs: Any)[source]

Bases: BaseAggregate[UUID]

static create_id(*_: Any, **__: Any) UUID[source]

Returns a new aggregate ID.

class Event(*, originator_id: UUID, originator_version: int, timestamp: datetime)[source]

Bases: DomainEvent, CanMutateAggregate[UUID]

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

originator_id_type

alias of UUID

class Created(*, originator_id: UUID, originator_version: int, timestamp: datetime, originator_topic: str)[source]

Bases: Event, CanInitAggregate[UUID]

originator_topic: str

String describing the path to an aggregate class.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_topic': FieldInfo(annotation=str, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

originator_id_type

alias of UUID

class examples.aggregate8.domainmodel.Trick(*, name: str)[source]

Bases: Immutable

name: str
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

class examples.aggregate8.domainmodel.DogSnapshotState(*, name: str, tricks: list[Trick], **kwargs: Any)[source]

Bases: SnapshotState

name: str
tricks: list[Trick]
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'allow', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True), 'tricks': FieldInfo(annotation=list[Trick], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

class examples.aggregate8.domainmodel.Dog(*args: Any, **kwargs: Any)[source]

Bases: Aggregate

class Snapshot(*, originator_id: UUID, originator_version: int, topic: str, state: DogSnapshotState, timestamp: datetime)[source]

Bases: AggregateSnapshot

state: DogSnapshotState
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'state': FieldInfo(annotation=DogSnapshotState, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True), 'topic': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

originator_id_type

alias of UUID

add_trick = <eventsourcing.domain.UnboundCommandMethodDecorator object>[source]
class Created(*, originator_id: UUID, originator_version: int, timestamp: datetime, originator_topic: str)

Bases: Event, Created

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_topic': FieldInfo(annotation=str, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

originator_id_type

alias of UUID

class Event(*, originator_id: UUID, originator_version: int, timestamp: datetime)

Bases: Event

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

originator_id_type

alias of UUID

class Registered(*, originator_id: UUID, originator_version: int, timestamp: datetime, originator_topic: str, name: str)

Bases: Created, Event

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True), 'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_topic': FieldInfo(annotation=str, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

originator_id_type

alias of UUID

name: str
class TrickAdded(*, originator_id: UUID, originator_version: int, timestamp: datetime, trick: Trick)

Bases: DecoratorEvent, Event

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True), 'trick': FieldInfo(annotation=Trick, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

trick: Trick
class examples.aggregate8.application.DogSchool(env: Mapping[str, str] | None = None)[source]

Bases: PydanticApplication

is_snapshotting_enabled: bool = True
register_dog(name: str) UUID[source]
add_trick(dog_id: UUID, trick: str) None[source]
get_dog(dog_id: UUID) dict[str, Any][source]
name = 'DogSchool'
class examples.aggregate8.test_application.TestDogSchool(methodName='runTest')[source]

Bases: TestCase

test_dog_school() None[source]