Aggregate 8 - Pydantic with declarative syntax

This example shows another variation of the Dog aggregate class used in the tutorial and module docs.

Similar to the first example, the aggregate is expressed using the library’s declarative syntax. And similar to the previous example, the model events are defined using Pydantic.

The application class in this example uses the persistence classes PydanticMapper and OrjsonTranscoder, just like the previous example. Pydantic is responsible for converting domain model objects to object types that orjson can serialise, and for reconstructing model objects from JSON objects that have been deserialised by orjson. The application class also uses the custom Snapshot class, which is defined as a Pydantic model.

One advantage of using Pydantic here is that any custom value objects will be automatically reconstructed without needing to define the transcoding classes that would be needed when using the library’s default JSONTranscoder. This is demonstrated in the example below with the Trick class, which is used in both aggregate events and aggregate state, and which is reconstructed from serialised string values, representing only the name of the trick, from both recorded aggregate events and from recorded snapshots.

Domain model

from __future__ import annotations

from datetime import datetime
from typing import List
from uuid import UUID

from pydantic import BaseModel, Extra

from eventsourcing.domain import (
    Aggregate as BaseAggregate,
    CanInitAggregate,
    CanMutateAggregate,
    CanSnapshotAggregate,
    event,
)


class DomainEvent(BaseModel):
    originator_id: UUID
    originator_version: int
    timestamp: datetime

    class Config:
        frozen = True


class Aggregate(BaseAggregate):
    class Event(DomainEvent, CanMutateAggregate):
        pass

    class Created(Event, CanInitAggregate):
        originator_topic: str


class SnapshotState(BaseModel):
    class Config:
        extra = Extra.allow


class AggregateSnapshot(DomainEvent, CanSnapshotAggregate):
    topic: str
    state: SnapshotState


class Trick(BaseModel):
    name: str


class DogState(SnapshotState):
    name: str
    tricks: List[Trick]


class Dog(Aggregate):
    class Snapshot(AggregateSnapshot):
        state: DogState

    @event("Registered")
    def __init__(self, name: str) -> None:
        self.name = name
        self.tricks: List[Trick] = []

    @event("TrickAdded")
    def add_trick(self, trick: Trick) -> None:
        self.tricks.append(trick)

Application

from __future__ import annotations

from typing import Any, Dict
from uuid import UUID

from eventsourcing.application import Application
from eventsourcing.examples.aggregate8.domainmodel import Dog, Trick
from eventsourcing.examples.aggregate8.persistence import (
    OrjsonTranscoder,
    PydanticMapper,
)
from eventsourcing.persistence import Mapper, Transcoder


class DogSchool(Application):
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        dog = Dog(name)
        self.save(dog)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog: Dog = self.repository.get(dog_id)
        dog.add_trick(Trick(name=trick))
        self.save(dog)

    def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
        dog: Dog = self.repository.get(dog_id)
        return {
            "name": dog.name,
            "tricks": tuple(dog.tricks),
            "created_on": dog.created_on,
            "modified_on": dog.modified_on,
        }

    def construct_mapper(self) -> Mapper:
        return self.factory.mapper(
            transcoder=self.construct_transcoder(),
            mapper_class=PydanticMapper,
        )

    def construct_transcoder(self) -> Transcoder:
        return OrjsonTranscoder()

Persistence

from __future__ import annotations

from typing import Any, Dict, cast

import orjson
from pydantic import BaseModel

from eventsourcing.domain import DomainEventProtocol
from eventsourcing.persistence import (
    Mapper,
    ProgrammingError,
    StoredEvent,
    Transcoder,
    Transcoding,
)
from eventsourcing.utils import get_topic, resolve_topic


class PydanticMapper(Mapper):
    def to_stored_event(self, domain_event: DomainEventProtocol) -> StoredEvent:
        topic = get_topic(domain_event.__class__)
        event_state = cast(BaseModel, domain_event).model_dump()
        stored_state = self.transcoder.encode(event_state)
        if self.compressor:
            stored_state = self.compressor.compress(stored_state)
        if self.cipher:
            stored_state = self.cipher.encrypt(stored_state)
        return StoredEvent(
            originator_id=domain_event.originator_id,
            originator_version=domain_event.originator_version,
            topic=topic,
            state=stored_state,
        )

    def to_domain_event(self, stored: StoredEvent) -> DomainEventProtocol:
        stored_state = stored.state
        if self.cipher:
            stored_state = self.cipher.decrypt(stored_state)
        if self.compressor:
            stored_state = self.compressor.decompress(stored_state)
        event_state: Dict[str, Any] = self.transcoder.decode(stored_state)
        cls = resolve_topic(stored.topic)
        return cls(**event_state)


class OrjsonTranscoder(Transcoder):
    def encode(self, obj: Any) -> bytes:
        return orjson.dumps(obj)

    def decode(self, data: bytes) -> Any:
        return orjson.loads(data)

    def register(self, transcoding: Transcoding) -> None:
        raise ProgrammingError("Please use Pydantic BaseModel")  # pragma: no cover

Test case

from __future__ import annotations

from typing import Tuple
from unittest import TestCase

from eventsourcing.examples.aggregate8.application import DogSchool
from eventsourcing.examples.aggregate8.domainmodel import Trick


class TestDogSchool(TestCase):
    def test_dog_school(self) -> None:
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqualTricks(dog["tricks"], ("roll over", "play dead"))

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        assert len(notifications) == 3

        # Take snapshot.
        school.take_snapshot(dog_id, version=3)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqualTricks(dog["tricks"], ("roll over", "play dead"))

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqualTricks(dog["tricks"], ("roll over", "play dead", "fetch ball"))

    def assertEqualTricks(
        self, actual: Tuple[Trick, ...], expected: Tuple[str, ...]
    ) -> None:
        self.assertEqual(len(actual), len(expected))
        for i, trick in enumerate(actual):
            self.assertIsInstance(trick, Trick)
            self.assertEqual(trick.name, expected[i])