Source code for examples.aggregate7.orjsonpydantic

from __future__ import annotations

from typing import TYPE_CHECKING, Any, ClassVar, cast
from uuid import UUID

import orjson
from pydantic import BaseModel

from eventsourcing.application import Application
from eventsourcing.persistence import Mapper, StoredEvent, Transcoder
from eventsourcing.utils import get_topic, resolve_topic

if TYPE_CHECKING:
    from eventsourcing.domain import DomainEventProtocol


[docs] class PydanticMapper(Mapper[UUID]):
[docs] def to_stored_event(self, domain_event: DomainEventProtocol[UUID]) -> StoredEvent: topic = get_topic(domain_event.__class__) event_state = cast(BaseModel, domain_event).model_dump(mode="json") stored_state = self.transcoder.encode(event_state) if self.compressor: stored_state = self.compressor.compress(stored_state) if self.cipher: stored_state = self.cipher.encrypt(stored_state) return StoredEvent( originator_id=domain_event.originator_id, originator_version=domain_event.originator_version, topic=topic, state=stored_state, )
[docs] def to_domain_event(self, stored_event: StoredEvent) -> DomainEventProtocol[UUID]: stored_state = stored_event.state if self.cipher: stored_state = self.cipher.decrypt(stored_state) if self.compressor: stored_state = self.compressor.decompress(stored_state) event_state: dict[str, Any] = self.transcoder.decode(stored_state) cls = resolve_topic(stored_event.topic) return cls(**event_state)
[docs] class OrjsonTranscoder(Transcoder):
[docs] def encode(self, obj: Any) -> bytes: return orjson.dumps(obj)
[docs] def decode(self, data: bytes) -> Any: return orjson.loads(data)
[docs] class PydanticApplication(Application[UUID]): env: ClassVar[dict[str, str]] = { "TRANSCODER_TOPIC": get_topic(OrjsonTranscoder), "MAPPER_TOPIC": get_topic(PydanticMapper), }