Aggregate 7 - Pydantic immutable¶
This example shows how to use Pydantic to define immutable aggregate and event classes.
The main advantage of using Pydantic is that any custom value objects
used in the domain model are serialised and deserialised automatically,
without also having to define custom transcoding classes.
Pydantic is also considerably faster at serialisation and deserialisation than
the json package in the Python Standard Library.
This approach is demonstrated with the Trick class, which is used in both
aggregate events and aggregate state. Trick objects are reconstructed from
serialised string values, which represent only the name of the trick, when reading
both recorded aggregate events and recorded snapshots.
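As a rough illustration of that round trip, the sketch below uses Pydantic directly, without the library's mapper. The Immutable class here is a local copy of the configuration used in this example, and the TrickAdded class is a simplified, hypothetical event that carries only the value object and no event metadata.

```python
from pydantic import BaseModel, ConfigDict


class Immutable(BaseModel):
    # Local copy of the frozen, strict configuration used in this example.
    model_config = ConfigDict(extra="forbid", frozen=True)


class Trick(Immutable):
    name: str


class TrickAdded(Immutable):
    # Hypothetical, simplified event: only the value object, no metadata.
    trick: Trick


event = TrickAdded(trick=Trick(name="roll over"))

# Serialise to JSON text, then rebuild the event from that text.
serialised = event.model_dump_json()
restored = TrickAdded.model_validate_json(serialised)

# The nested value comes back as a Trick instance, with no custom transcoding.
assert isinstance(restored.trick, Trick)
assert restored == event
```

No transcoding class is registered for Trick anywhere in the sketch; the field annotation alone tells Pydantic how to rebuild it.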
Pydantic immutable model¶
The library’s eventsourcing.pydantic.immutablemodel module defines base classes for immutable domain events
and aggregates that use Pydantic.
class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class DomainEvent(Immutable, Generic[TAggregateID]):
    originator_id: TAggregateID
    originator_version: int
    timestamp: datetime = Field(default_factory=datetime_now_with_tzinfo)
    metadata: dict[str, str] = Field(default_factory=get_metadata_from_context)
    event_id: UUID = Field(default_factory=uuid4)


class Aggregate(Immutable, Generic[TAggregateID]):
    id: TAggregateID
    version: int
    created_on: datetime
    modified_on: datetime
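The two settings in model_config can be seen in isolation in the following sketch, which assumes Pydantic v2. The Immutable class is a local copy of the one above, the Trick class matches the one defined later in this example, and the difficulty field is made up purely to trigger the error.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class Immutable(BaseModel):
    model_config = ConfigDict(extra="forbid", frozen=True)


class Trick(Immutable):
    name: str


trick = Trick(name="roll over")

# frozen=True: assigning to a field of an existing object is rejected.
try:
    trick.name = "play dead"
    assignment_rejected = False
except ValidationError:
    assignment_rejected = True

# extra="forbid": fields not declared on the model are rejected.
try:
    Trick(name="roll over", difficulty=3)  # "difficulty" is a made-up field
    extra_rejected = False
except ValidationError:
    extra_rejected = True
```

Because objects cannot be changed in place, a new state is always expressed by constructing a new object, which is what the mutator functions in this example do.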
Also included is a generic function for building an immutable aggregate projector function from an immutable aggregate mutator function.
def aggregate_projector(
    mutator: MutatorFunction[TAggregate],
) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]:
    def project_aggregate(
        aggregate: TAggregate | None, events: Iterable[DomainEvent]
    ) -> TAggregate | None:
        for event in events:
            aggregate = mutator(event, aggregate)
        return aggregate

    return project_aggregate
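To make the folding behaviour concrete, here is a minimal sketch that uses only the standard library. The Deposited event, the Account state and the mutate_account function are hypothetical names invented for illustration, and frozen dataclasses stand in for the Pydantic models. The projector has the same shape as the function above, without the generic types.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class Deposited:
    # Hypothetical event, used only for this illustration.
    amount: int


@dataclass(frozen=True)
class Account:
    # Hypothetical immutable aggregate state.
    balance: int
    version: int


def mutate_account(event: Deposited, account: Optional[Account]) -> Optional[Account]:
    # Return a new state object; the previous one is never modified.
    if account is None:
        return Account(balance=event.amount, version=1)
    return Account(balance=account.balance + event.amount, version=account.version + 1)


def aggregate_projector(
    mutator: Callable[[Deposited, Optional[Account]], Optional[Account]],
) -> Callable[[Optional[Account], Iterable[Deposited]], Optional[Account]]:
    def project_aggregate(
        aggregate: Optional[Account], events: Iterable[Deposited]
    ) -> Optional[Account]:
        for event in events:
            aggregate = mutator(event, aggregate)
        return aggregate

    return project_aggregate


project_account = aggregate_projector(mutate_account)
events = [Deposited(10), Deposited(5), Deposited(7)]

# Replaying everything from nothing ...
full = project_account(None, events)

# ... gives the same result as resuming from an intermediate state.
partial = project_account(None, events[:2])
resumed = project_account(partial, events[2:])

# The projector is equivalent to a left fold over the events.
folded = reduce(lambda state, event: mutate_account(event, state), events, None)
```

Resuming from an intermediate state is what makes snapshots useful: the initial aggregate argument can be a state rebuilt from a snapshot, and only the later events need to be applied.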
Pydantic mapper¶
The PydanticMapper class is a
mapper that supports Pydantic. It is responsible for serialising and
deserialising Pydantic immutable model objects.
class PydanticMapper(Mapper[TAggregateID]):
    def to_stored_event(
        self, domain_event: DomainEventProtocol[TAggregateID]
    ) -> StoredEvent:
        topic = get_topic(domain_event.__class__)
        assert isinstance(domain_event, DomainEvent)
        stored_state = domain_event.model_dump_json().encode()
        if self.compressor:
            stored_state = self.compressor.compress(stored_state)
        if self.cipher:
            stored_state = self.cipher.encrypt(stored_state)
        return StoredEvent(
            originator_id=domain_event.originator_id,
            originator_version=domain_event.originator_version,
            topic=topic,
            state=stored_state,
        )

    def to_domain_event(
        self, stored_event: StoredEvent
    ) -> DomainEventProtocol[TAggregateID]:
        stored_state = stored_event.state
        if self.cipher:
            stored_state = self.cipher.decrypt(stored_state)
        if self.compressor:
            stored_state = self.compressor.decompress(stored_state)
        cls = resolve_topic(stored_event.topic)
        assert issubclass(cls, DomainEvent)
        return cls.model_validate_json(stored_state.decode())
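Note the order of the optional steps in the mapper: state is compressed and then encrypted when writing, and decrypted and then decompressed when reading. The sketch below reproduces only that ordering with the standard library. The XorCipher class is a hypothetical toy that is not secure and merely stands in for a configured cipher, and zlib stands in for a configured compressor.

```python
import json
import zlib


class XorCipher:
    """Toy stand-in for a real cipher. NOT secure, for illustration only."""

    def __init__(self, key: bytes) -> None:
        self.key = key

    def encrypt(self, data: bytes) -> bytes:
        return bytes(b ^ self.key[i % len(self.key)] for i, b in enumerate(data))

    # XOR with the same key is its own inverse.
    decrypt = encrypt


cipher = XorCipher(b"not-a-real-key")
state = json.dumps({"trick": {"name": "roll over"}}).encode()

# Writing: compress first, then encrypt.
stored = cipher.encrypt(zlib.compress(state))

# Reading: undo the steps in reverse order (decrypt, then decompress).
recovered = zlib.decompress(cipher.decrypt(stored))
```

One plausible reason for compressing before encrypting is that well-encrypted data looks random and therefore compresses poorly, so reversing the order would lose most of the benefit of compression.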
Pydantic application¶
The PydanticApplication class
is configured to use the Pydantic mapper. It is a subclass of the
library’s Application class.
class PydanticApplication(Application[TAggregateID]):
    def construct_mapper(self) -> Mapper[TAggregateID]:
        return PydanticMapper(
            transcoder=NullTranscoder(),
            cipher=self.factory.cipher(),
            compressor=self.factory.compressor(),
        )
Domain model¶
The code below shows how to define an immutable Dog aggregate in
a functional style, using the Pydantic immutable model.
from __future__ import annotations

from functools import singledispatch
from uuid import UUID, uuid4

from eventsourcing.pydantic.immutablemodel import (
    Aggregate,
    DomainEvent,
    Immutable,
    Snapshot,
    aggregate_projector,
)


class Trick(Immutable):
    name: str


class Dog(Aggregate[UUID]):
    name: str
    tricks: tuple[Trick, ...]


class DogRegistered(DomainEvent[UUID]):
    name: str


class TrickAdded(DomainEvent[UUID]):
    trick: Trick


def register_dog(name: str) -> DomainEvent[UUID]:
    return DogRegistered(
        originator_id=uuid4(),
        originator_version=1,
        name=name,
    )


def add_trick(dog: Dog, trick: Trick) -> DomainEvent:
    return TrickAdded(
        originator_id=dog.id,
        originator_version=dog.version + 1,
        trick=trick,
    )


@singledispatch
def mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None:
    """Mutates aggregate with event."""


@mutate_dog.register
def _(event: DogRegistered, _: None) -> Dog:
    return Dog(
        id=event.originator_id,
        version=event.originator_version,
        created_on=event.timestamp,
        modified_on=event.timestamp,
        name=event.name,
        tricks=(),
    )


@mutate_dog.register
def _(event: TrickAdded, dog: Dog) -> Dog:
    return Dog(
        id=dog.id,
        version=event.originator_version,
        created_on=dog.created_on,
        modified_on=event.timestamp,
        name=dog.name,
        tricks=(*dog.tricks, event.trick),
    )


@mutate_dog.register
def _(event: Snapshot, _: None) -> Dog:
    return Dog(
        id=event.state["id"],
        version=event.state["version"],
        created_on=event.state["created_on"],
        modified_on=event.state["modified_on"],
        name=event.state["name"],
        tricks=event.state["tricks"],
    )


project_dog = aggregate_projector(mutate_dog)
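The mutator above relies on functools.singledispatch, which selects a handler from the type of the first argument only. The annotation on the second parameter (None or Dog) documents what each handler expects but plays no part in dispatch. The following standard-library sketch shows this with hypothetical Registered, Renamed and Pet classes. Unlike the mutate_dog fallback, which returns None, the fallback here raises an error, which is a design choice of this sketch and not of the example.

```python
from dataclasses import dataclass
from functools import singledispatch
from typing import Optional


@dataclass(frozen=True)
class Registered:
    name: str


@dataclass(frozen=True)
class Renamed:
    name: str


@dataclass(frozen=True)
class Pet:
    name: str
    version: int


@singledispatch
def mutate_pet(event: object, pet: Optional[Pet]) -> Optional[Pet]:
    # Fallback for event types that have no registered handler.
    raise NotImplementedError(f"Unsupported event: {type(event).__name__}")


@mutate_pet.register
def _(event: Registered, _: None) -> Pet:
    # Chosen because the first argument is a Registered event.
    return Pet(name=event.name, version=1)


@mutate_pet.register
def _(event: Renamed, pet: Pet) -> Pet:
    # Chosen because the first argument is a Renamed event.
    return Pet(name=event.name, version=pet.version + 1)


pet = mutate_pet(Registered(name="Fido"), None)
renamed = mutate_pet(Renamed(name="Rex"), pet)

try:
    mutate_pet("not an event", renamed)
    unsupported_rejected = False
except NotImplementedError:
    unsupported_rejected = True
```

Raising in the fallback makes an unhandled event type fail loudly; returning None, as the example's fallback does, would instead make the projector carry on with no aggregate state.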
Application¶
The DogSchool application in this example uses the
Pydantic application. Its command methods must take the new events returned
by the aggregate command functions and pass them to the save()
method. The aggregate projector function must also be supplied when reconstructing an aggregate from the
repository, and when taking snapshots.
from typing import Any
from uuid import UUID

from eventsourcing.pydantic.application import PydanticApplication
from eventsourcing.pydantic.immutablemodel import Snapshot
from examples.aggregate7.domainmodel import (
    Trick,
    add_trick,
    project_dog,
    register_dog,
)


class DogSchool(PydanticApplication):
    is_snapshotting_enabled = True
    snapshot_class = Snapshot[UUID]

    def register_dog(self, name: str) -> UUID:
        event = register_dog(name)
        self.save(event)
        return event.originator_id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        self.save(add_trick(dog, Trick(name=trick)))

    def get_dog(self, dog_id: UUID) -> dict[str, Any]:
        dog = self.repository.get(dog_id, projector_func=project_dog)
        return {
            "name": dog.name,
            "tricks": tuple(t.name for t in dog.tricks),
            "created_on": dog.created_on,
            "modified_on": dog.modified_on,
        }
Test case¶
The TestDogSchool test case shows how the
DogSchool application can be used.
class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        with put_metadata_in_context({"user_id": "user-1"}):
            dog_id = school.register_dog("Fido")
            school.add_trick(dog_id, "roll over")
            school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        with put_metadata_in_context({"user_id": "admin-1"}):
            school.take_snapshot(dog_id, version=3, projector_func=project_dog)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Continue with snapshotted aggregate.
        with put_metadata_in_context({"user_id": "user-1"}):
            school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))
        self.assertIsInstance(dog["created_on"], datetime)
        self.assertIsInstance(dog["modified_on"], datetime)

        # Check metadata on events.
        events = list(school.events.get(dog_id))
        assert len(events) > 0
        for event in events:
            assert event.metadata.get("user_id") == "user-1"

        # Check metadata on snapshots.
        assert school.snapshots is not None
        snapshots = list(school.snapshots.get(dog_id))
        assert len(snapshots) > 0
        for snapshot in snapshots:
            assert snapshot.metadata.get("user_id") == "admin-1"
Code reference¶
- class eventsourcing.pydantic.immutablemodel.Immutable¶
  Bases: BaseModel
  - model_config = {'extra': 'forbid', 'frozen': True}¶
    Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class eventsourcing.pydantic.immutablemodel.DomainEvent(*, originator_id: TAggregateID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>)¶
  Bases: Immutable, Generic[TAggregateID]
  - originator_id: TAggregateID¶
  - originator_version: int¶
  - timestamp: datetime¶
  - metadata: dict[str, str]¶
  - event_id: UUID¶
  - model_config = {'extra': 'forbid', 'frozen': True}¶
    Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class eventsourcing.pydantic.immutablemodel.Aggregate(*, id: TAggregateID, version: int, created_on: datetime, modified_on: datetime)¶
  Bases: Immutable, Generic[TAggregateID]
  - id: TAggregateID¶
  - version: int¶
  - created_on: datetime¶
  - modified_on: datetime¶
  - model_config = {'extra': 'forbid', 'frozen': True}¶
    Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class eventsourcing.pydantic.immutablemodel.Snapshot(*, originator_id: TAggregateID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, topic: str, state: dict[str, Any])¶
  Bases: DomainEvent
  - topic: str¶
  - state: dict[str, Any]¶
  - model_config = {'extra': 'forbid', 'frozen': True}¶
    Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- eventsourcing.pydantic.immutablemodel.aggregate_projector(mutator: MutatorFunction[TAggregate]) -> Callable[[TAggregate | None, Iterable[DomainEvent]], TAggregate | None]¶
- class eventsourcing.pydantic.mapper.PydanticMapper(transcoder: Transcoder, compressor: Compressor | None = None, cipher: Cipher | None = None)¶
  Bases: Mapper[TAggregateID]
  - to_stored_event(domain_event: DomainEventProtocol[TAggregateID]) -> StoredEvent¶
    Converts the given domain event to a StoredEvent object.
  - to_domain_event(stored_event: StoredEvent) -> DomainEventProtocol[TAggregateID]¶
    Converts the given StoredEvent to a domain event object.
- class eventsourcing.pydantic.application.PydanticApplication(env: Mapping[str, str] | None = None)¶
  Bases: Application[TAggregateID]
  - aggregate_id_type¶
    alias of UUID
  - name = 'PydanticApplication'¶
- class examples.aggregate7.domainmodel.Trick(*, name: str)¶
  Bases: Immutable
  - name: str¶
  - model_config = {'extra': 'forbid', 'frozen': True}¶
    Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class examples.aggregate7.domainmodel.Dog(*, id: UUID, version: int, created_on: datetime, modified_on: datetime, name: str, tricks: tuple[Trick, ...])¶
  Bases: Aggregate[UUID]
  - name: str¶
  - model_config = {'extra': 'forbid', 'frozen': True}¶
    Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class examples.aggregate7.domainmodel.DogRegistered(*, originator_id: UUID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, name: str)¶
  Bases: DomainEvent[UUID]
  - name: str¶
  - model_config = {'extra': 'forbid', 'frozen': True}¶
    Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class examples.aggregate7.domainmodel.TrickAdded(*, originator_id: UUID, originator_version: int, timestamp: datetime = <factory>, metadata: dict[str, str] = <factory>, event_id: UUID = <factory>, trick: Trick)¶
  Bases: DomainEvent[UUID]
  - model_config = {'extra': 'forbid', 'frozen': True}¶
    Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- examples.aggregate7.domainmodel.register_dog(name: str) -> DomainEvent[UUID]¶
- examples.aggregate7.domainmodel.add_trick(dog: Dog, trick: Trick) -> DomainEvent¶
- examples.aggregate7.domainmodel.mutate_dog(_: DomainEvent, __: Dog | None) -> Dog | None¶
- examples.aggregate7.domainmodel.mutate_dog(event: DogRegistered, _: None) -> Dog
- examples.aggregate7.domainmodel.mutate_dog(event: TrickAdded, dog: Dog) -> Dog
- examples.aggregate7.domainmodel.mutate_dog(event: Snapshot, _: None) -> Dog
  Mutates aggregate with event.