Aggregate 7 - Pydantic and orjson

This example shows the Dog class used in the tutorial and module docs.

As in the previous example, the model is expressed in a functional style and does not use the library's Aggregate class. Unlike the previous example, the aggregate and event classes are defined as immutable Pydantic models rather than as frozen Python dataclasses.

The application class in this example uses the persistence classes PydanticMapper and OrjsonTranscoder. Pydantic is responsible for converting domain model objects to object types that orjson can serialise, and for reconstructing model objects from JSON objects that have been deserialised by orjson. The application class also uses the custom Snapshot class, which is defined as a Pydantic model.

Domain model

from __future__ import annotations

from datetime import datetime, timezone
from functools import singledispatch
from time import monotonic
from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar
from uuid import UUID, uuid4

from pydantic import BaseModel

from eventsourcing.application import MutatorFunction, ProjectorFunction
from eventsourcing.utils import get_topic


class DomainEvent(BaseModel):
    """Base Pydantic model for the events in this example.

    Carries the ID of the aggregate the event belongs to, the aggregate
    version the event results in, and the time the event was created.
    """

    originator_id: UUID
    originator_version: int
    timestamp: datetime

    class Config:
        # Pydantic setting that rejects attribute assignment, so event
        # objects are immutable once constructed.
        allow_mutation = False


def create_timestamp() -> datetime:
    """Return the current wall-clock time as a timezone-aware UTC datetime.

    The monotonic clock must not be used here: its reference point is
    undefined, so converting its reading to a datetime yields an arbitrary
    date (typically shortly after the Unix epoch) rather than the real time.
    """
    return datetime.now(timezone.utc)


class Aggregate(BaseModel):
    """Base Pydantic model for aggregate state in this example.

    Holds the aggregate's ID, its current version, and its creation time.
    """

    id: UUID
    version: int
    created_on: datetime

    class Config:
        # Pydantic setting that rejects attribute assignment; a new state is
        # expressed by constructing a new aggregate object.
        allow_mutation = False


class Snapshot(DomainEvent):
    """Event that records the full state of an aggregate at one version.

    ``topic`` identifies the aggregate class and ``state`` holds the
    aggregate's fields as a plain dictionary.
    """

    topic: str
    state: Dict[str, Any]

    class Config:
        allow_mutation = False

    @classmethod
    def take(cls, aggregate: Aggregate) -> Snapshot:
        """Build a snapshot describing the given aggregate as it is now."""
        snapshot_fields: Dict[str, Any] = {
            "originator_id": aggregate.id,
            "originator_version": aggregate.version,
            "timestamp": create_timestamp(),
            "topic": get_topic(type(aggregate)),
            "state": aggregate.dict(),
        }
        return Snapshot(**snapshot_fields)


# Type variable standing for any concrete subclass of Aggregate; used to keep
# the mutator and projector signatures generic over the aggregate type.
TAggregate = TypeVar("TAggregate", bound=Aggregate)


def aggregate_projector(
    mutator: MutatorFunction[TAggregate],
) -> ProjectorFunction[TAggregate]:
    """Turn a single-event mutator into a function that folds many events.

    The returned function starts from the given aggregate (which may be
    ``None``), feeds each event in turn to ``mutator`` together with the
    state produced so far, and returns the final state.
    """

    def project_aggregate(
        aggregate: Optional[TAggregate], events: Iterable[DomainEvent]
    ) -> Optional[TAggregate]:
        state = aggregate
        for domain_event in events:
            # Each call receives the state returned by the previous call.
            state = mutator(domain_event, state)
        return state

    return project_aggregate


class Dog(Aggregate):
    """Aggregate state for a dog: its name and the tricks it has been taught."""

    name: str
    # Immutable sequence of trick names; a new tuple is built for each added trick.
    tricks: Tuple[str, ...]


class DogRegistered(DomainEvent):
    """Event created when a dog is registered; carries the dog's name."""

    name: str


class TrickAdded(DomainEvent):
    """Event created when a trick is added to a dog; carries the trick name."""

    trick: str


def register_dog(name: str) -> DomainEvent:
    """Create the first event for a brand-new dog with the given name.

    A fresh random UUID becomes the dog's ID and the event is version 1.
    """
    new_dog_id = uuid4()
    first_version = 1
    return DogRegistered(
        originator_id=new_dog_id,
        originator_version=first_version,
        timestamp=create_timestamp(),
        name=name,
    )


def add_trick(dog: Dog, trick: str) -> DomainEvent:
    """Create the event that records ``trick`` being added to ``dog``.

    The event is numbered one past the dog's current version.
    """
    next_version = dog.version + 1
    return TrickAdded(
        originator_id=dog.id,
        originator_version=next_version,
        timestamp=create_timestamp(),
        trick=trick,
    )


@singledispatch
def mutate_dog(event: DomainEvent, dog: Optional[Dog]) -> Optional[Dog]:
    """Apply ``event`` to ``dog`` and return the resulting state.

    This is the single-dispatch fallback, used when no implementation is
    registered for the event's type; it returns ``None``.
    """
    return None


@mutate_dog.register
def _(event: DogRegistered, _: None) -> Dog:
    """Create the initial Dog state from its registration event."""
    initial_state = dict(
        id=event.originator_id,
        version=event.originator_version,
        created_on=event.timestamp,
        name=event.name,
        tricks=(),  # a newly registered dog knows no tricks
    )
    return Dog(**initial_state)


@mutate_dog.register
def _(event: TrickAdded, dog: Dog) -> Dog:
    """Return a new Dog state with the event's trick appended.

    The version advances to the event's version; the ID, name and creation
    time are carried over unchanged from the previous state.
    """
    return Dog(
        id=dog.id,
        version=event.originator_version,
        # The creation time is fixed at registration. Using the event's
        # timestamp here would overwrite it every time a trick is added.
        created_on=dog.created_on,
        name=dog.name,
        tricks=(*dog.tricks, event.trick),
    )


@mutate_dog.register
def _(event: Snapshot, _: None) -> Dog:
    """Rebuild a Dog from the state dictionary stored in a snapshot."""
    saved = event.state
    return Dog(
        id=saved["id"],
        version=saved["version"],
        created_on=saved["created_on"],
        name=saved["name"],
        tricks=saved["tricks"],
    )


# Projector for Dog aggregates: applies mutate_dog to each event in a sequence,
# starting from an optional initial state, and returns the resulting state.
project_dog = aggregate_projector(mutate_dog)

Application

from typing import Any, Dict
from uuid import UUID

from eventsourcing.application import Application
from eventsourcing.examples.aggregate7.domainmodel import (
    Snapshot,
    add_trick,
    project_dog,
    register_dog,
)
from eventsourcing.examples.aggregate7.persistence import (
    OrjsonTranscoder,
    PydanticMapper,
)
from eventsourcing.persistence import Mapper, Transcoder


class DogSchool(Application):
    """Application that registers dogs and records the tricks they learn.

    Events are converted with ``PydanticMapper`` and serialised with
    ``OrjsonTranscoder``; snapshots use the example's own ``Snapshot`` class.
    """

    snapshot_class = Snapshot
    is_snapshotting_enabled = True

    def register_dog(self, name: str) -> UUID:
        """Register a new dog, store the event, and return the dog's ID."""
        registered = register_dog(name)
        self.save(registered)
        return registered.originator_id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        """Load the dog with the given ID and store an event adding ``trick``."""
        dog = self.repository.get(dog_id, projector_func=project_dog)
        trick_added = add_trick(dog, trick)
        self.save(trick_added)

    def get_dog(self, dog_id: UUID) -> Dict[str, Any]:
        """Return the name and tricks of the dog with the given ID."""
        dog = self.repository.get(dog_id, projector_func=project_dog)
        details: Dict[str, Any] = {
            "name": dog.name,
            "tricks": dog.tricks,
        }
        return details

    def construct_mapper(self) -> Mapper:
        """Build the mapper, using PydanticMapper and this app's transcoder."""
        transcoder = self.construct_transcoder()
        return self.factory.mapper(
            transcoder=transcoder,
            mapper_class=PydanticMapper,
        )

    def construct_transcoder(self) -> Transcoder:
        """Return the orjson-based transcoder used to serialise event state."""
        return OrjsonTranscoder()

Persistence

from typing import Any, Dict, cast

import orjson
from pydantic import BaseModel

from eventsourcing.domain import DomainEventProtocol
from eventsourcing.persistence import (
    Mapper,
    ProgrammingError,
    StoredEvent,
    Transcoder,
    Transcoding,
)
from eventsourcing.utils import get_topic, resolve_topic


class PydanticMapper(Mapper):
    """Mapper that converts between Pydantic event models and stored events."""

    def to_stored_event(self, domain_event: DomainEventProtocol) -> StoredEvent:
        """Serialise a Pydantic event model into a StoredEvent.

        The model is dumped to a dict, encoded by the transcoder, then
        compressed and encrypted if a compressor or cipher is configured.
        """
        event_topic = get_topic(domain_event.__class__)
        model = cast(BaseModel, domain_event)
        payload = self.transcoder.encode(model.dict())
        if self.compressor:
            payload = self.compressor.compress(payload)  # pragma: no cover
        if self.cipher:
            payload = self.cipher.encrypt(payload)  # pragma: no cover
        return StoredEvent(
            originator_id=domain_event.originator_id,
            originator_version=domain_event.originator_version,
            topic=event_topic,
            state=payload,
        )

    def to_domain_event(self, stored: StoredEvent) -> DomainEventProtocol:
        """Reconstruct a Pydantic event model from a StoredEvent.

        Reverses ``to_stored_event``: decrypt, decompress, decode, then
        construct the class named by the stored topic from the decoded fields.
        """
        payload = stored.state
        if self.cipher:
            payload = self.cipher.decrypt(payload)  # pragma: no cover
        if self.compressor:
            payload = self.compressor.decompress(payload)  # pragma: no cover
        fields: Dict[str, Any] = self.transcoder.decode(payload)
        event_class = resolve_topic(stored.topic)
        return event_class(**fields)


class OrjsonTranscoder(Transcoder):
    """Transcoder that delegates JSON serialisation to orjson."""

    def encode(self, obj: Any) -> bytes:
        """Serialise ``obj`` to JSON bytes with orjson."""
        encoded: bytes = orjson.dumps(obj)
        return encoded

    def decode(self, data: bytes) -> Any:
        """Deserialise JSON bytes into Python objects with orjson."""
        decoded = orjson.loads(data)
        return decoded

    def register(self, transcoding: Transcoding) -> None:
        """Reject custom transcodings; this transcoder does not accept them."""
        raise ProgrammingError("Please use Pydantic BaseModel")  # pragma: no cover

Test case

from unittest import TestCase

from eventsourcing.examples.aggregate7.application import DogSchool
from eventsourcing.examples.aggregate7.domainmodel import project_dog


class TestDogSchool(TestCase):
    """End-to-end test of the DogSchool application, including snapshotting."""

    def test_dog_school(self) -> None:
        """Register a dog, add tricks, take a snapshot, then add another trick.

        TestCase assertion methods are used instead of bare ``assert`` so the
        checks still run under ``python -O`` and failures report both values.
        """
        # Construct application object.
        school = DogSchool()

        # Evolve application state.
        dog_id = school.register_dog("Fido")
        school.add_trick(dog_id, "roll over")
        school.add_trick(dog_id, "play dead")

        # Query application state.
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))

        # Select notifications.
        notifications = school.notification_log.select(start=1, limit=10)
        self.assertEqual(len(notifications), 3)

        # Take snapshot.
        school.take_snapshot(dog_id, version=3, projector_func=project_dog)
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead"))

        # Continue with snapshotted aggregate.
        school.add_trick(dog_id, "fetch ball")
        dog = school.get_dog(dog_id)
        self.assertEqual(dog["name"], "Fido")
        self.assertEqual(dog["tricks"], ("roll over", "play dead", "fetch ball"))