Aggregate 7 - Pydantic and orjson¶
This example shows how to use Pydantic to define immutable aggregate and event classes.
The main advantage of using Pydantic here is that custom value objects used in the domain model are serialised and deserialised automatically, with no need to define custom transcoding classes.
This approach is demonstrated in the example below with the Trick class, which is used in both aggregate events and aggregate state. Trick objects are reconstructed from their serialised form, which holds only the name of the trick, both from recorded aggregate events and from recorded snapshots.
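As a rough illustration of that round trip, the sketch below defines standalone copies of an event class and the Trick value object, dumps an event to JSON, and rebuilds it. It assumes Pydantic v2 and uses the standard library's json module in place of orjson; the class definitions are simplified stand-ins for the ones shown later on this page.

```python
import json
from datetime import datetime, timezone
from uuid import UUID, uuid4

from pydantic import BaseModel, ConfigDict


class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class Trick(Immutable):
    name: str


class TrickAdded(Immutable):
    originator_id: UUID
    originator_version: int
    timestamp: datetime
    trick: Trick


event = TrickAdded(
    originator_id=uuid4(),
    originator_version=2,
    timestamp=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    trick=Trick(name="roll over"),
)

# mode="json" converts the UUID, the datetime and the nested Trick
# into types that a JSON encoder can handle.
state = event.model_dump(mode="json")
stored = json.dumps(state).encode("utf8")

# Constructing the event from the decoded dict lets Pydantic validation
# rebuild the UUID, datetime and Trick objects.
restored = TrickAdded(**json.loads(stored))
```

No transcoding class for Trick is involved at any point: the field annotation on the event class is what tells Pydantic how to rebuild the value object.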
Pydantic mapper and orjson transcoder¶
The application class in this example uses a mapper that supports Pydantic and a transcoder that uses orjson.
The PydanticMapper class is a mapper that supports Pydantic. It is responsible for converting domain model objects to object types that orjson can serialise, and for reconstructing model objects from JSON objects that have been deserialised by orjson.
class PydanticMapper(Mapper[UUID]):
    def to_stored_event(self, domain_event: DomainEventProtocol[UUID]) -> StoredEvent:
        topic = get_topic(domain_event.__class__)
        # Dump the Pydantic model to JSON-compatible Python types.
        event_state = cast(BaseModel, domain_event).model_dump(mode="json")
        stored_state = self.transcoder.encode(event_state)
        # Compress before encrypting.
        if self.compressor:
            stored_state = self.compressor.compress(stored_state)
        if self.cipher:
            stored_state = self.cipher.encrypt(stored_state)
        return StoredEvent(
            originator_id=domain_event.originator_id,
            originator_version=domain_event.originator_version,
            topic=topic,
            state=stored_state,
        )

    def to_domain_event(self, stored_event: StoredEvent) -> DomainEventProtocol[UUID]:
        stored_state = stored_event.state
        # Reverse the steps above: decrypt, then decompress, then decode.
        if self.cipher:
            stored_state = self.cipher.decrypt(stored_state)
        if self.compressor:
            stored_state = self.compressor.decompress(stored_state)
        event_state: dict[str, Any] = self.transcoder.decode(stored_state)
        # Pydantic validation reconstructs typed field values from the decoded state.
        cls = resolve_topic(stored_event.topic)
        return cls(**event_state)
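The order of operations in the mapper above matters: state is encoded, then compressed, then encrypted, and reading reverses those steps. The following is a minimal standard-library sketch of that pipeline. The XorCipher class and the two helper functions are hypothetical names invented for illustration, json and zlib stand in for the configured transcoder and compressor, and the toy cipher is not secure.

```python
import json
import zlib
from typing import Any, Dict


class XorCipher:
    """Toy stand-in for a real cipher. NOT secure; for illustration only."""

    def __init__(self, key: bytes) -> None:
        self.key = key

    def encrypt(self, data: bytes) -> bytes:
        return bytes(b ^ self.key[i % len(self.key)] for i, b in enumerate(data))

    def decrypt(self, data: bytes) -> bytes:
        # XOR with the same key is its own inverse.
        return self.encrypt(data)


def to_stored_state(event_state: Dict[str, Any], cipher: XorCipher) -> bytes:
    data = json.dumps(event_state).encode("utf8")  # 1. encode
    data = zlib.compress(data)  # 2. compress
    return cipher.encrypt(data)  # 3. encrypt


def to_event_state(stored_state: bytes, cipher: XorCipher) -> Dict[str, Any]:
    data = cipher.decrypt(stored_state)  # 1. decrypt
    data = zlib.decompress(data)  # 2. decompress
    return json.loads(data)  # 3. decode


cipher = XorCipher(b"not-a-real-key")
state = {"name": "Fido", "tricks": [{"name": "roll over"}]}
stored = to_stored_state(state, cipher)
recovered = to_event_state(stored, cipher)
```

Compressing before encrypting is the usual choice because well-encrypted bytes look random and compress poorly.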
The OrjsonTranscoder class is a transcoder that uses orjson, possibly the fastest JSON transcoder available in Python.
class OrjsonTranscoder(Transcoder):
    def encode(self, obj: Any) -> bytes:
        return orjson.dumps(obj)

    def decode(self, data: bytes) -> Any:
        return orjson.loads(data)
The PydanticApplication class is a subclass of the library’s Application class which is configured to use PydanticMapper and OrjsonTranscoder.
class PydanticApplication(Application[UUID]):
    env: ClassVar[dict[str, str]] = {
        "TRANSCODER_TOPIC": get_topic(OrjsonTranscoder),
        "MAPPER_TOPIC": get_topic(PydanticMapper),
    }
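The env values above are topic strings of the form "module:QualifiedName", as can be seen in the code reference at the end of this page. The sketch below shows one way such strings could be produced and resolved using only the standard library. The functions get_topic_sketch and resolve_topic_sketch are hypothetical simplifications written for this illustration, not the library's own get_topic and resolve_topic.

```python
import importlib
from collections import OrderedDict
from typing import Any


def get_topic_sketch(obj: Any) -> str:
    """Build a 'module:QualifiedName' string for a class or function."""
    return f"{obj.__module__}:{obj.__qualname__}"


def resolve_topic_sketch(topic: str) -> Any:
    """Import the module named before the colon, then walk the qualified name."""
    module_name, _, qualname = topic.partition(":")
    obj: Any = importlib.import_module(module_name)
    for part in qualname.split("."):
        obj = getattr(obj, part)
    return obj


topic = get_topic_sketch(OrderedDict)
resolved = resolve_topic_sketch(topic)
```

Storing a string rather than the class itself is what allows the mapper and transcoder to be selected through environment-style configuration.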
Pydantic model for immutable aggregate¶
The code below shows how to define base classes for immutable aggregates that use Pydantic.
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar
from uuid import UUID

from pydantic import BaseModel, ConfigDict

from eventsourcing.domain import datetime_now_with_tzinfo
from eventsourcing.utils import get_topic

if TYPE_CHECKING:
    from collections.abc import Iterable


class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class DomainEvent(Immutable):
    originator_id: UUID
    originator_version: int
    timestamp: datetime


class Aggregate(Immutable):
    id: UUID
    version: int
    created_on: datetime
    modified_on: datetime


class Snapshot(DomainEvent):
    topic: str
    state: dict[str, Any]

    @classmethod
    def take(cls, aggregate: Aggregate) -> Snapshot:
        return Snapshot(
            originator_id=aggregate.id,
            originator_version=aggregate.version,
            timestamp=datetime_now_with_tzinfo(),
            topic=get_topic(type(aggregate)),
            state=aggregate.model_dump(),
        )


TAggregate = TypeVar("TAggregate", bound=Aggregate)
MutatorFunction = Callable[..., Optional[TAggregate]]


def aggregate_projector(
    mutator: MutatorFunction[TAggregate],
) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]:
    def project_aggregate(
        aggregate: TAggregate | None, events: Iterable[DomainEvent]
    ) -> TAggregate | None:
        for event in events:
            aggregate = mutator(event, aggregate)
        return aggregate

    return project_aggregate
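The Immutable base class above relies on two Pydantic settings: frozen=True rejects attribute assignment, and extra="forbid" rejects unknown fields. A small sketch of both behaviours follows, assuming Pydantic v2. The difficulty keyword is a made-up field used only to trigger the error, and model_copy is shown as one possible way to derive a changed copy, whereas the example's own mutator functions construct new objects explicitly.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class Trick(Immutable):
    name: str


trick = Trick(name="roll over")

# Assigning to a field of a frozen model raises a validation error.
try:
    trick.name = "play dead"
except ValidationError:
    frozen_rejected = True
else:
    frozen_rejected = False

# Passing a field that the model does not declare is also rejected.
try:
    Trick(name="roll over", difficulty=3)
except ValidationError:
    extra_rejected = True
else:
    extra_rejected = False

# A changed copy leaves the original untouched.
renamed = trick.model_copy(update={"name": "play dead"})
```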
Domain model¶
The code below shows how to define an immutable aggregate in a functional style, using the Pydantic module for immutable aggregates.
from __future__ import annotations

from functools import singledispatch
from uuid import uuid4

from eventsourcing.domain import datetime_now_with_tzinfo
from examples.aggregate7.immutablemodel import (
    Aggregate,
    DomainEvent,
    Immutable,
    Snapshot,
    aggregate_projector,
)


class Trick(Immutable):
    name: str


class Dog(Aggregate):
    name: str
    tricks: tuple[Trick, ...]


class DogRegistered(DomainEvent):
    name: str


class TrickAdded(DomainEvent):
    trick: Trick


def register_dog(name: str) -> DomainEvent:
    return DogRegistered(
        originator_id=uuid4(),
        originator_version=1,
        timestamp=datetime_now_with_tzinfo(),
        name=name,
    )


def add_trick(dog: Dog, trick: Trick) -> DomainEvent:
    return TrickAdded(
        originator_id=dog.id,
        originator_version=dog.version + 1,
        timestamp=datetime_now_with_tzinfo(),
        trick=trick,
    )


@singledispatch
def mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None:
    """Mutates aggregate with event."""


@mutate_dog.register
def _(event: DogRegistered, _: None) -> Dog:
    return Dog(
        id=event.originator_id,
        version=event.originator_version,
        created_on=event.timestamp,
        modified_on=event.timestamp,
        name=event.name,
        tricks=(),
    )


@mutate_dog.register
def _(event: TrickAdded, dog: Dog) -> Dog:
    return Dog(
        id=dog.id,
        version=event.originator_version,
        created_on=dog.created_on,
        modified_on=event.timestamp,
        name=dog.name,
        tricks=(*dog.tricks, event.trick),
    )


@mutate_dog.register
def _(event: Snapshot, _: None) -> Dog:
    return Dog(
        id=event.state["id"],
        version=event.state["version"],
        created_on=event.state["created_on"],
        modified_on=event.state["modified_on"],
        name=event.state["name"],
        tricks=event.state["tricks"],
    )


project_dog = aggregate_projector(mutate_dog)
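To make the projector pattern above concrete, here is a dependency-free sketch of the same idea: a singledispatch mutator that returns a new state object for each event, and a projector that folds a sequence of events into the current state. It uses frozen dataclasses rather than Pydantic so that it runs with the standard library alone, and the class and function names are simplified stand-ins rather than the example's own.

```python
from dataclasses import dataclass
from functools import singledispatch
from typing import Iterable, Optional, Tuple


@dataclass(frozen=True)
class Registered:
    name: str


@dataclass(frozen=True)
class TrickAdded:
    trick: str


@dataclass(frozen=True)
class Dog:
    name: str
    tricks: Tuple[str, ...]


@singledispatch
def mutate(event: object, dog: Optional[Dog]) -> Optional[Dog]:
    # Fallback for event types that have no registered handler.
    raise NotImplementedError(type(event))


@mutate.register(Registered)
def _(event: Registered, dog: None) -> Dog:
    return Dog(name=event.name, tricks=())


@mutate.register(TrickAdded)
def _(event: TrickAdded, dog: Dog) -> Dog:
    # Build a new Dog instead of modifying the existing one.
    return Dog(name=dog.name, tricks=(*dog.tricks, event.trick))


def project(dog: Optional[Dog], events: Iterable[object]) -> Optional[Dog]:
    for event in events:
        dog = mutate(event, dog)
    return dog


events = [Registered("Fido"), TrickAdded("roll over"), TrickAdded("play dead")]
dog = project(None, events)

# Resuming from an earlier state gives the same result as a full replay,
# which is the property that snapshotting relies on.
partial = project(None, events[:2])
resumed = project(partial, events[2:])
```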
Application¶
The DogSchool application in this example extends PydanticApplication. Its command methods receive the new events that are returned by the aggregate command functions and pass them to the application's save() method. The aggregate projector function must also be supplied when reconstructing an aggregate from the repository, and when taking snapshots.
class DogSchool(PydanticApplication):
    is_snapshotting_enabled = True
    snapshot_class = Snapshot

    def register_dog(self, name: str) -> UUID:
        event = register_dog(name)
        self.save(event)
        return event.originator_id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        self.save(add_trick(dog, Trick(name=trick)))

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        return {
            "name": dog.name,
            "tricks": tuple([t.name for t in dog.tricks]),
            "created_on": dog.created_on,
            "modified_on": dog.modified_on,
        }
Test case¶
The TestDogSchool test case shows how the DogSchool application can be used.
class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=project_dog)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)
Code reference¶
- class examples.aggregate7.immutablemodel.Immutable[source]¶
Bases: BaseModel
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_fields: ClassVar[Dict[str, FieldInfo]] = {}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class examples.aggregate7.immutablemodel.DomainEvent(*, originator_id: UUID, originator_version: int, timestamp: datetime)[source]¶
Bases: Immutable
- originator_id: UUID¶
- originator_version: int¶
- timestamp: datetime¶
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class examples.aggregate7.immutablemodel.Aggregate(*, id: UUID, version: int, created_on: datetime, modified_on: datetime)[source]¶
Bases: Immutable
- id: UUID¶
- version: int¶
- created_on: datetime¶
- modified_on: datetime¶
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'created_on': FieldInfo(annotation=datetime, required=True), 'id': FieldInfo(annotation=UUID, required=True), 'modified_on': FieldInfo(annotation=datetime, required=True), 'version': FieldInfo(annotation=int, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class examples.aggregate7.immutablemodel.Snapshot(*, originator_id: UUID, originator_version: int, timestamp: datetime, topic: str, state: dict[str, Any])[source]¶
Bases: DomainEvent
- topic: str¶
- state: dict[str, Any]¶
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'state': FieldInfo(annotation=dict[str, Any], required=True), 'timestamp': FieldInfo(annotation=datetime, required=True), 'topic': FieldInfo(annotation=str, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- examples.aggregate7.immutablemodel.aggregate_projector(mutator: MutatorFunction[TAggregate]) → Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None][source]¶
- class examples.aggregate7.orjsonpydantic.PydanticMapper(transcoder: Transcoder, compressor: Compressor | None = None, cipher: Cipher | None = None)[source]¶
Bases: Mapper[UUID]
- to_stored_event(domain_event: DomainEventProtocol[UUID]) → StoredEvent[source]¶
Converts the given domain event to a StoredEvent object.
- to_domain_event(stored_event: StoredEvent) → DomainEventProtocol[UUID][source]¶
Converts the given StoredEvent to a domain event object.
- class examples.aggregate7.orjsonpydantic.OrjsonTranscoder[source]¶
Bases: Transcoder
- class examples.aggregate7.orjsonpydantic.PydanticApplication(env: Mapping[str, str] | None = None)[source]¶
Bases: Application[UUID]
- env: ClassVar[dict[str, str]] = {'MAPPER_TOPIC': 'examples.aggregate7.orjsonpydantic:PydanticMapper', 'TRANSCODER_TOPIC': 'examples.aggregate7.orjsonpydantic:OrjsonTranscoder'}¶
- name = 'PydanticApplication'¶
- class examples.aggregate7.domainmodel.Trick(*, name: str)[source]¶
Bases: Immutable
- name: str¶
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class examples.aggregate7.domainmodel.Dog(*, id: UUID, version: int, created_on: datetime, modified_on: datetime, name: str, tricks: tuple[Trick, ...])[source]¶
Bases: Aggregate
- name: str¶
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'created_on': FieldInfo(annotation=datetime, required=True), 'id': FieldInfo(annotation=UUID, required=True), 'modified_on': FieldInfo(annotation=datetime, required=True), 'name': FieldInfo(annotation=str, required=True), 'tricks': FieldInfo(annotation=tuple[Trick, ...], required=True), 'version': FieldInfo(annotation=int, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class examples.aggregate7.domainmodel.DogRegistered(*, originator_id: UUID, originator_version: int, timestamp: datetime, name: str)[source]¶
Bases: DomainEvent
- name: str¶
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True), 'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class examples.aggregate7.domainmodel.TrickAdded(*, originator_id: UUID, originator_version: int, timestamp: datetime, trick: Trick)[source]¶
Bases: DomainEvent
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'originator_id': FieldInfo(annotation=UUID, required=True), 'originator_version': FieldInfo(annotation=int, required=True), 'timestamp': FieldInfo(annotation=datetime, required=True), 'trick': FieldInfo(annotation=Trick, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- examples.aggregate7.domainmodel.register_dog(name: str) → DomainEvent[source]¶
- examples.aggregate7.domainmodel.add_trick(dog: Dog, trick: Trick) → DomainEvent[source]¶
- examples.aggregate7.domainmodel.mutate_dog(_: DomainEvent, __: Dog | None) → Dog | None[source]¶
- examples.aggregate7.domainmodel.mutate_dog(event: DogRegistered, _: None) → Dog
- examples.aggregate7.domainmodel.mutate_dog(event: TrickAdded, dog: Dog) → Dog
- examples.aggregate7.domainmodel.mutate_dog(event: Snapshot, _: None) → Dog
Mutates aggregate with event.
- class examples.aggregate7.application.DogSchool(env: Mapping[str, str] | None = None)[source]¶
Bases: PydanticApplication
- is_snapshotting_enabled: bool = True¶
- name = 'DogSchool'¶