Aggregate 8 - Pydantic with declarative syntax
This example shows another variation of the `Dog` aggregate class used in the tutorial and module docs.
Similar to the first example, the aggregate is expressed using the library’s declarative syntax. And similar to the previous example, the model events are defined using Pydantic. To demonstrate both the declarative and explicit syntax, one aggregate event class is defined implicitly from the method signature with the event class name defined in the command method decorator, and the other event class is defined explicitly and referenced in the command method decorator.
Similar to the previous example, the application class in this example
uses the persistence classes `PydanticMapper` and `OrjsonTranscoder`.
Pydantic is responsible for converting domain model objects to object types
that orjson can serialise, and for reconstructing model objects from JSON
objects that have been deserialised by orjson. The application class also
uses the custom `Snapshot` class, which is defined as a Pydantic model.
One advantage of using Pydantic here is that any custom value objects
will be automatically reconstructed without needing to define the
transcoding classes that would be needed when using the library’s
default `JSONTranscoder`.
Domain model
from datetime import datetime
from typing import Any, Dict, List
from uuid import UUID
from pydantic import BaseModel
from eventsourcing.domain import (
Aggregate as BaseAggregate,
CanInitAggregate,
CanMutateAggregate,
CanSnapshotAggregate,
event,
)
class DomainEvent(BaseModel):
    """Pydantic base model for the event and snapshot classes of this example.

    Declares the three fields that ``Aggregate.Event`` and ``Snapshot``
    inherit: the originator ID, the originator version and a timestamp.
    """

    # ID and version are copied into the StoredEvent by PydanticMapper.
    originator_id: UUID
    originator_version: int
    timestamp: datetime

    class Config:
        # Pydantic (v1-style) model config: instances reject attribute
        # assignment after construction, so events are effectively immutable.
        allow_mutation = False
class Aggregate(BaseAggregate):
    """Aggregate base class whose nested event classes are Pydantic models."""

    class Event(DomainEvent, CanMutateAggregate):
        """Base aggregate event.

        Combines the Pydantic fields of ``DomainEvent`` with the library's
        ``CanMutateAggregate`` mix-in. Adds no fields of its own.
        """

    class Created(Event, CanInitAggregate):
        """Aggregate event that also mixes in ``CanInitAggregate``.

        Adds the ``originator_topic`` string field.
        NOTE(review): presumably the topic of the aggregate class to be
        constructed -- confirm against the library documentation.
        """

        originator_topic: str
class Snapshot(DomainEvent, CanSnapshotAggregate):
    """Pydantic model for aggregate snapshots.

    Extends the common ``DomainEvent`` fields with a ``topic`` string and a
    ``state`` dictionary, and mixes in the library's ``CanSnapshotAggregate``.
    The ``DogSchool`` application sets this class as its ``snapshot_class``.
    """

    topic: str
    # Free-form mapping of string keys to values of any type.
    state: Dict[str, Any]
class Dog(Aggregate):
    """Dog aggregate with a name and a list of learned tricks.

    Shows both ways of declaring an event with the ``@event`` decorator:
    by name (``"Registered"``) and by explicit class (``TrickAdded``).
    """

    # The event class is named in the decorator rather than written out;
    # it is defined implicitly from this method's signature.
    @event("Registered")
    def __init__(self, name: str) -> None:
        """Set the dog's name and start with an empty list of tricks."""
        self.name = name
        self.tricks: List[str] = []

    # Explicitly defined event class; it must appear before the decorator
    # below that references it.
    class TrickAdded(Aggregate.Event):
        """Event carrying the name of one trick."""

        trick: str

    @event(TrickAdded)
    def add_trick(self, trick: str) -> None:
        """Append ``trick`` to the dog's list of tricks."""
        self.tricks.append(trick)
Application
from typing import Any, Dict
from uuid import UUID
from eventsourcing.application import Application
from eventsourcing.examples.aggregate8.domainmodel import Dog, Snapshot
from eventsourcing.examples.aggregate8.persistence import (
OrjsonTranscoder,
PydanticMapper,
)
from eventsourcing.persistence import Mapper, Transcoder
class DogSchool(Application):
    """Application that registers dogs and records their tricks.

    Persistence is wired to ``PydanticMapper`` and ``OrjsonTranscoder``,
    and snapshots are represented by the Pydantic ``Snapshot`` class.
    """

    # Settings passed to the library through the application environment.
    # NOTE(review): going by the key names these enable an aggregate cache of
    # up to 50 entries, disable deep copies from that cache, and enable
    # snapshotting -- confirm exact semantics against the library docs.
    env = {
        "AGGREGATE_CACHE_MAXSIZE": "50",
        "DEEPCOPY_FROM_AGGREGATE_CACHE": "n",
        "IS_SNAPSHOTTING_ENABLED": "y",
    }
    snapshot_class = Snapshot

    def register_dog(self, name: str) -> UUID:
        """Create a ``Dog`` called ``name``, save it, and return its ID."""
        new_dog = Dog(name)
        self.save(new_dog)
        return new_dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        """Fetch the dog with ``dog_id``, add ``trick`` to it, and save it."""
        target: Dog = self.repository.get(dog_id)
        target.add_trick(trick)
        self.save(target)

    def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
        """Return the name and tricks of the dog with ``dog_id``.

        The tricks are returned as a tuple, i.e. a copy of the aggregate's
        list, so callers cannot modify the aggregate through the result.
        """
        found: Dog = self.repository.get(dog_id)
        tricks = tuple(found.tricks)
        return {"name": found.name, "tricks": tricks}

    def construct_mapper(self) -> Mapper:
        """Build the mapper via the factory, using ``PydanticMapper``."""
        transcoder = self.construct_transcoder()
        return self.factory.mapper(
            transcoder=transcoder,
            mapper_class=PydanticMapper,
        )

    def construct_transcoder(self) -> Transcoder:
        """Return a new ``OrjsonTranscoder``."""
        return OrjsonTranscoder()
Persistence
from typing import Any, Dict, cast
import orjson
from pydantic import BaseModel
from eventsourcing.domain import DomainEventProtocol
from eventsourcing.persistence import (
Mapper,
ProgrammingError,
StoredEvent,
Transcoder,
Transcoding,
)
from eventsourcing.utils import get_topic, resolve_topic
class PydanticMapper(Mapper):
    """Mapper that converts between Pydantic domain events and stored events."""

    def to_stored_event(self, domain_event: DomainEventProtocol) -> StoredEvent:
        """Convert ``domain_event`` into a ``StoredEvent``.

        The event is dumped to a dict with Pydantic's ``.dict()``, encoded by
        the transcoder, then optionally compressed and, after that,
        optionally encrypted.
        """
        event_topic = get_topic(domain_event.__class__)
        # The cast only informs the type checker; the event is expected to
        # be a Pydantic model at runtime.
        as_dict = cast(BaseModel, domain_event).dict()
        payload = self.transcoder.encode(as_dict)
        if self.compressor:
            payload = self.compressor.compress(payload)
        if self.cipher:
            payload = self.cipher.encrypt(payload)
        return StoredEvent(
            originator_id=domain_event.originator_id,
            originator_version=domain_event.originator_version,
            topic=event_topic,
            state=payload,
        )

    def to_domain_event(self, stored: StoredEvent) -> DomainEventProtocol:
        """Rebuild a domain event object from ``stored``.

        Reverses ``to_stored_event``: decrypt first, then decompress, then
        decode, and finally construct the class resolved from the stored
        topic with the decoded values as keyword arguments.
        """
        payload = stored.state
        if self.cipher:
            payload = self.cipher.decrypt(payload)
        if self.compressor:
            payload = self.compressor.decompress(payload)
        event_kwargs: Dict[str, Any] = self.transcoder.decode(payload)
        event_class = resolve_topic(stored.topic)
        return event_class(**event_kwargs)
class OrjsonTranscoder(Transcoder):
    """Transcoder that serialises and deserialises with ``orjson``."""

    def encode(self, obj: Any) -> bytes:
        """Serialise ``obj`` to bytes with ``orjson.dumps``."""
        encoded = orjson.dumps(obj)
        return encoded

    def decode(self, data: bytes) -> Any:
        """Deserialise ``data`` with ``orjson.loads``."""
        decoded = orjson.loads(data)
        return decoded

    def register(self, transcoding: Transcoding) -> None:
        """Always raise ``ProgrammingError``.

        Custom transcodings are not supported by this transcoder; the error
        message directs users to Pydantic ``BaseModel`` instead.
        """
        raise ProgrammingError("Please use Pydantic BaseModel")  # pragma: no cover
Test case
from unittest import TestCase
from eventsourcing.examples.aggregate8.application import DogSchool
class TestDogSchool(TestCase):
    """End-to-end test of the ``DogSchool`` application."""

    def test_dog_school(self) -> None:
        """Register a dog, add tricks, take a snapshot, then add another trick."""
        # Build the application under test.
        app = DogSchool()

        # Register one dog and teach it two tricks.
        fido_id = app.register_dog("Fido")
        for trick in ("roll over", "play dead"):
            app.add_trick(fido_id, trick)

        # The query result reflects everything recorded so far.
        details = app.get_dog(fido_id)
        assert details["name"] == "Fido"
        assert details["tricks"] == ("roll over", "play dead")

        # One registration plus two tricks gives three notifications.
        recorded = app.notification_log.select(start=1, limit=10)
        assert len(recorded) == 3

        # Snapshot at version 3; the visible state must be unchanged.
        app.take_snapshot(fido_id, version=3)
        details = app.get_dog(fido_id)
        assert details["name"] == "Fido"
        assert details["tricks"] == ("roll over", "play dead")

        # The aggregate keeps evolving after the snapshot was taken.
        app.add_trick(fido_id, "fetch ball")
        details = app.get_dog(fido_id)
        assert details["name"] == "Fido"
        assert details["tricks"] == ("roll over", "play dead", "fetch ball")