Aggregate 7 - Pydantic and orjson
This example shows the `Dog` class used in the tutorial and module docs.
Like the previous example, the model is expressed in a functional
style and does not use the library's `Aggregate` class.
In contrast to the previous example, the aggregate and event classes are
defined as immutable Pydantic models, rather than as frozen Python dataclasses.
The application class in this example uses the persistence classes
`PydanticMapper` and `OrjsonTranscoder`. Pydantic is responsible
for converting domain model objects to object types that orjson can
serialise, and for reconstructing model objects from JSON objects
that have been deserialised by orjson. The application class also
uses the custom `Snapshot` class, which is defined as a Pydantic model.
Domain model
from __future__ import annotations
from datetime import datetime, timezone
from functools import singledispatch
from time import monotonic
from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar
from uuid import UUID, uuid4
from pydantic import BaseModel
from eventsourcing.application import MutatorFunction, ProjectorFunction
from eventsourcing.utils import get_topic
class DomainEvent(BaseModel):
    """Base Pydantic model for the domain events in this example."""

    # ID of the aggregate the event belongs to.
    originator_id: UUID
    # Aggregate version associated with the event (1 for the first event).
    originator_version: int
    # When the event was created.
    timestamp: datetime

    class Config:
        # Pydantic v1-style setting: disallow attribute assignment on instances.
        allow_mutation = False
def create_timestamp() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``.

    The wall clock is used rather than ``time.monotonic()``: the monotonic
    clock counts from an unspecified reference point (often system boot), so
    converting its reading with ``datetime.fromtimestamp`` yields a date near
    the Unix epoch instead of the moment the event was created.
    """
    return datetime.now(timezone.utc)
class Aggregate(BaseModel):
    """Base Pydantic model for the aggregates in this example."""

    # Aggregate ID.
    id: UUID
    # Current version of the aggregate.
    version: int
    # Timestamp recorded for the aggregate's creation.
    created_on: datetime

    class Config:
        # Pydantic v1-style setting: disallow attribute assignment on instances.
        allow_mutation = False
class Snapshot(DomainEvent):
    """Event-shaped record of an aggregate's state at a particular version.

    ``topic`` is the topic string of the aggregate's class and ``state`` is
    the aggregate's fields as a plain dictionary.
    """

    topic: str
    state: Dict[str, Any]

    class Config:
        # Pydantic v1-style setting: disallow attribute assignment on instances.
        allow_mutation = False

    @classmethod
    def take(cls, aggregate: Aggregate) -> Snapshot:
        """Create a snapshot of ``aggregate``.

        The snapshot carries the aggregate's ID and version, a fresh
        timestamp, the topic of the aggregate's class, and the result of
        ``aggregate.dict()`` as its state.

        The instance is built with ``cls`` rather than the hard-coded
        ``Snapshot`` name, so calling ``take`` on a subclass returns an
        instance of that subclass.
        """
        return cls(
            originator_id=aggregate.id,
            originator_version=aggregate.version,
            timestamp=create_timestamp(),
            topic=get_topic(type(aggregate)),
            state=aggregate.dict(),
        )
# Type variable for aggregate types; bound to ``Aggregate`` so generic
# signatures only accept ``Aggregate`` or its subclasses.
TAggregate = TypeVar("TAggregate", bound=Aggregate)
def aggregate_projector(
    mutator: MutatorFunction[TAggregate],
) -> ProjectorFunction[TAggregate]:
    """Wrap ``mutator`` in a function that applies a whole sequence of events.

    The returned function takes an initial aggregate (or ``None``) and an
    iterable of events, calls ``mutator(event, aggregate)`` once per event in
    iteration order, and returns whatever the last call produced. With no
    events the initial aggregate is returned unchanged.
    """

    def project_aggregate(
        aggregate: Optional[TAggregate], events: Iterable[DomainEvent]
    ) -> Optional[TAggregate]:
        # Left fold: each mutator result becomes the input for the next event.
        state = aggregate
        for domain_event in events:
            state = mutator(domain_event, state)
        return state

    return project_aggregate
class Dog(Aggregate):
    """Aggregate model for a dog: a name and the tricks it has."""

    # The dog's name.
    name: str
    # The dog's tricks, as an immutable tuple of strings.
    tricks: Tuple[str, ...]
class DogRegistered(DomainEvent):
    """Event created by ``register_dog``; it is the first event (version 1) of a dog."""

    # Name the dog was registered with.
    name: str
class TrickAdded(DomainEvent):
    """Event created by ``add_trick`` for an existing dog."""

    # The trick that was added.
    trick: str
def register_dog(name: str) -> DomainEvent:
    """Create the ``DogRegistered`` event for a new dog called ``name``.

    A new random UUID is generated for the dog and the event is given
    version 1.
    """
    new_dog_id = uuid4()
    created_at = create_timestamp()
    return DogRegistered(
        name=name,
        originator_id=new_dog_id,
        originator_version=1,
        timestamp=created_at,
    )
def add_trick(dog: Dog, trick: str) -> DomainEvent:
    """Create the ``TrickAdded`` event that adds ``trick`` to ``dog``.

    The event targets the dog's ID and takes the version immediately after
    the dog's current version.
    """
    dog_id = dog.id
    next_version = dog.version + 1
    created_at = create_timestamp()
    return TrickAdded(
        trick=trick,
        originator_id=dog_id,
        originator_version=next_version,
        timestamp=created_at,
    )
@singledispatch
def mutate_dog(event: DomainEvent, dog: Optional[Dog]) -> Optional[Dog]:
    """Apply ``event`` to ``dog`` and return the resulting aggregate.

    Dispatch is on the type of ``event``. This base implementation is the
    fallback used when no handler is registered for the event's type; it
    does nothing and returns ``None``.
    """
    return None
@mutate_dog.register
def _(event: DogRegistered, _: None) -> Dog:
    """Create the initial ``Dog`` from its ``DogRegistered`` event.

    The second argument is ignored. The new dog takes its ID, version,
    creation time and name from the event and starts with no tricks.
    """
    no_tricks: Tuple[str, ...] = ()
    return Dog(
        name=event.name,
        tricks=no_tricks,
        id=event.originator_id,
        version=event.originator_version,
        created_on=event.timestamp,
    )
@mutate_dog.register
def _(event: TrickAdded, dog: Dog) -> Dog:
    """Return a new ``Dog`` with the event's trick appended.

    The models are immutable, so a fresh ``Dog`` is constructed. The ID, name
    and creation time are carried over from ``dog``; the version is taken
    from the event.
    """
    return Dog(
        id=dog.id,
        version=event.originator_version,
        # Keep the original creation time: adding a trick must not change
        # when the dog was created.
        created_on=dog.created_on,
        name=dog.name,
        tricks=dog.tricks + (event.trick,),
    )
@mutate_dog.register
def _(event: Snapshot, _: None) -> Dog:
    """Rebuild a ``Dog`` from the state dictionary held by a snapshot.

    The second argument is ignored. Each ``Dog`` field is read from the
    key of the same name in ``event.state``.
    """
    recorded = event.state
    return Dog(
        id=recorded["id"],
        version=recorded["version"],
        created_on=recorded["created_on"],
        name=recorded["name"],
        tricks=recorded["tricks"],
    )
# Projector for dogs: applies ``mutate_dog`` to each event of a sequence in turn.
project_dog = aggregate_projector(mutate_dog)
Application
from typing import Any, Dict
from uuid import UUID
from eventsourcing.application import Application
from eventsourcing.examples.aggregate7.domainmodel import (
Snapshot,
add_trick,
project_dog,
register_dog,
)
from eventsourcing.examples.aggregate7.persistence import (
OrjsonTranscoder,
PydanticMapper,
)
from eventsourcing.persistence import Mapper, Transcoder
class DogSchool(Application):
    """Application for registering dogs, adding tricks and querying dogs.

    Snapshotting is enabled and uses the custom ``Snapshot`` class. Events
    are mapped with ``PydanticMapper`` and transcoded with
    ``OrjsonTranscoder``.
    """

    is_snapshotting_enabled = True
    snapshot_class = Snapshot

    def register_dog(self, name: str) -> UUID:
        """Save the event produced by ``register_dog(name)`` and return its originator ID."""
        registered = register_dog(name)
        self.save(registered)
        return registered.originator_id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        """Load the dog with ``dog_id`` and save the event that adds ``trick`` to it."""
        current = self.repository.get(dog_id, projector_func=project_dog)
        trick_added = add_trick(current, trick)
        self.save(trick_added)

    def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
        """Return the name and tricks of the dog with ``dog_id`` as a dictionary."""
        found = self.repository.get(dog_id, projector_func=project_dog)
        details: Dict[str, Any] = {}
        details["name"] = found.name
        details["tricks"] = found.tricks
        return details

    def construct_mapper(self) -> Mapper:
        """Build the mapper via the factory, using ``PydanticMapper`` and this application's transcoder."""
        build_mapper = self.factory.mapper
        return build_mapper(
            transcoder=self.construct_transcoder(),
            mapper_class=PydanticMapper,
        )

    def construct_transcoder(self) -> Transcoder:
        """Return a new ``OrjsonTranscoder``."""
        transcoder = OrjsonTranscoder()
        return transcoder
Persistence
from typing import Any, Dict, cast
import orjson
from pydantic import BaseModel
from eventsourcing.domain import DomainEventProtocol
from eventsourcing.persistence import (
Mapper,
ProgrammingError,
StoredEvent,
Transcoder,
Transcoding,
)
from eventsourcing.utils import get_topic, resolve_topic
class PydanticMapper(Mapper):
    """Mapper that converts between Pydantic event models and stored events."""

    def to_stored_event(self, domain_event: DomainEventProtocol) -> StoredEvent:
        """Convert ``domain_event`` into a ``StoredEvent``.

        The event's ``dict()`` is encoded by the transcoder, then compressed
        and encrypted when a compressor or cipher is configured.
        """
        event_topic = get_topic(domain_event.__class__)
        model = cast(BaseModel, domain_event)
        payload = self.transcoder.encode(model.dict())
        if self.compressor:
            payload = self.compressor.compress(payload)  # pragma: no cover
        if self.cipher:
            payload = self.cipher.encrypt(payload)  # pragma: no cover
        return StoredEvent(
            originator_id=domain_event.originator_id,
            originator_version=domain_event.originator_version,
            topic=event_topic,
            state=payload,
        )

    def to_domain_event(self, stored: StoredEvent) -> DomainEventProtocol:
        """Reconstruct a domain event from ``stored``.

        Reverses ``to_stored_event``: decrypt, decompress, decode, then
        instantiate the class resolved from the stored topic with the decoded
        fields as keyword arguments.
        """
        payload = stored.state
        if self.cipher:
            payload = self.cipher.decrypt(payload)  # pragma: no cover
        if self.compressor:
            payload = self.compressor.decompress(payload)  # pragma: no cover
        fields: Dict[str, Any] = self.transcoder.decode(payload)
        event_class = resolve_topic(stored.topic)
        return event_class(**fields)
class OrjsonTranscoder(Transcoder):
    """Transcoder that delegates to ``orjson.dumps`` and ``orjson.loads``."""

    def encode(self, obj: Any) -> bytes:
        """Serialise ``obj`` with ``orjson.dumps``."""
        serialised = orjson.dumps(obj)
        return serialised

    def decode(self, data: bytes) -> Any:
        """Deserialise ``data`` with ``orjson.loads``."""
        deserialised = orjson.loads(data)
        return deserialised

    def register(self, transcoding: Transcoding) -> None:
        """Reject custom transcodings; this transcoder does not accept them."""
        raise ProgrammingError("Please use Pydantic BaseModel")  # pragma: no cover
Test case
from unittest import TestCase
from eventsourcing.examples.aggregate7.application import DogSchool
from eventsourcing.examples.aggregate7.domainmodel import project_dog
class TestDogSchool(TestCase):
    """End-to-end test of the ``DogSchool`` application."""

    def test_dog_school(self) -> None:
        """Register a dog, add tricks, take a snapshot, then add another trick."""
        # Build the application under test.
        app = DogSchool()

        # Record a registration followed by two tricks.
        fido_id = app.register_dog("Fido")
        for trick in ("roll over", "play dead"):
            app.add_trick(fido_id, trick)

        # Read the state back.
        details = app.get_dog(fido_id)
        assert details["name"] == "Fido"
        assert details["tricks"] == ("roll over", "play dead")

        # Three events were saved, so three notifications are expected.
        recorded = app.notification_log.select(start=1, limit=10)
        assert len(recorded) == 3

        # Snapshot the dog at version 3; the queried state must be unchanged.
        app.take_snapshot(fido_id, version=3, projector_func=project_dog)
        details = app.get_dog(fido_id)
        assert details["name"] == "Fido"
        assert details["tricks"] == ("roll over", "play dead")

        # Keep evolving the dog after the snapshot was taken.
        app.add_trick(fido_id, "fetch ball")
        details = app.get_dog(fido_id)
        assert details["name"] == "Fido"
        assert details["tricks"] == ("roll over", "play dead", "fetch ball")