Source code for eventsourcing.dcb.msgpack

from __future__ import annotations

from typing import Any, TypeVar

import msgspec

from eventsourcing.dcb import api, domain, persistence
from eventsourcing.utils import get_topic, resolve_topic


[docs] class Decision(msgspec.Struct, domain.Decision):
[docs] def as_dict(self) -> dict[str, Any]: return {key: getattr(self, key) for key in self.__struct_fields__}
TDecision = TypeVar("TDecision", bound=Decision)
[docs] class MessagePackMapper(persistence.DCBMapper[Decision]):
[docs] def to_dcb_event(self, event: domain.Tagged[TDecision]) -> api.DCBEvent: return api.DCBEvent( type=get_topic(type(event.decision)), data=msgspec.msgpack.encode(event.decision), tags=event.tags, )
[docs] def to_domain_event(self, event: api.DCBEvent) -> domain.Tagged[Decision]: return domain.Tagged( tags=event.tags, decision=msgspec.msgpack.decode( event.data, type=resolve_topic(event.type), ), )
[docs] class InitialDecision(Decision, domain.InitialDecision): originator_topic: str