Source code for eventsourcing.infrastructure.sequenceditemmapper

from abc import ABC, abstractmethod
from json import JSONDecodeError
from typing import Any, Dict, Generic, NamedTuple, Optional, Tuple, Type

from eventsourcing.infrastructure.sequenceditem import (
    SequencedItem,
    SequencedItemFieldNames,
)
from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.utils.topic import get_topic, reconstruct_object, resolve_topic
from eventsourcing.utils.transcoding import ObjectJSONDecoder, ObjectJSONEncoder
from eventsourcing.whitehead import TEvent


class AbstractSequencedItemMapper(Generic[TEvent], ABC):
    """
    Interface for mappers that convert between domain events and
    sequenced items, and that transcode event state as JSON.
    """

    def __init__(self, **kwargs: Any):
        """
        Initialises mapper.

        Keyword arguments are accepted but not used by this base class.
        """

    @abstractmethod
    def item_from_event(self, domain_event: TEvent) -> NamedTuple:
        """
        Builds a sequenced item from the given domain event.

        :param domain_event: The domain event to be mapped.
        :return: A sequenced item (named tuple).
        """

    @abstractmethod
    def event_from_item(self, sequenced_item: NamedTuple) -> TEvent:
        """
        Builds a domain event from the given sequenced item.

        :param sequenced_item: The sequenced item to be mapped.
        :return: A domain event.
        """

    @abstractmethod
    def json_dumps(self, o: object) -> bytes:
        """
        Serialises the given object as JSON.

        :param o: The object to encode.
        :return: The encoded JSON, as bytes.
        """

    @abstractmethod
    def json_loads(self, s: str) -> object:
        """
        Deserialises the given JSON string.

        :param s: The JSON document to decode.
        :return: The decoded object.
        """

    @abstractmethod
    def event_from_topic_and_state(self, topic: str, state: bytes) -> TEvent:
        """
        Resolves topic to an event class, decodes state, and
        constructs an event.

        :param topic: Topic identifying the event class.
        :param state: Encoded event state.
        :return: A domain event.
        """

    @abstractmethod
    def event_from_notification(self, notification):
        """
        Reconstructs domain event from an event notification.

        :param notification: The event notification.
        :return: A domain event.
        """
class SequencedItemMapper(AbstractSequencedItemMapper[TEvent]):
    """
    Uses JSON to transcode domain events.

    Event state is serialised with the JSON encoder, then optionally
    compressed, then optionally encrypted. Reading an item reverses
    those steps in the opposite order.
    """

    def __init__(
        self,
        sequenced_item_class: Optional[Type[NamedTuple]] = None,
        sequence_id_attr_name: Optional[str] = None,
        position_attr_name: Optional[str] = None,
        json_encoder_class: Optional[Type[ObjectJSONEncoder]] = None,
        sort_keys: bool = False,
        json_decoder_class: Optional[Type[ObjectJSONDecoder]] = None,
        cipher: Optional[AESCipher] = None,
        compressor: Any = None,
        other_attr_names: Tuple[str, ...] = (),
    ):
        """
        Initialises mapper.

        :param sequenced_item_class: Named tuple class used to construct
            sequenced items. Defaults to ``SequencedItem``.
        :param sequence_id_attr_name: Name of the event attribute holding
            the sequence ID. Defaults to the item class's sequence ID
            field name.
        :param position_attr_name: Name of the event attribute holding the
            position. Defaults to the item class's position field name.
        :param json_encoder_class: Encoder class. Defaults to
            ``ObjectJSONEncoder``.
        :param sort_keys: Passed to the encoder class constructor.
        :param json_decoder_class: Decoder class. Defaults to
            ``ObjectJSONDecoder``.
        :param cipher: Optional object whose ``encrypt()`` and ``decrypt()``
            methods are applied to the serialised state.
        :param compressor: Optional object whose ``compress()`` and
            ``decompress()`` methods are applied to the serialised state.
        :param other_attr_names: Names of extra event attributes appended
            to the item args. Defaults to the item class's other field
            names.
        """
        if sequenced_item_class is not None:
            self.sequenced_item_class = sequenced_item_class
        else:
            self.sequenced_item_class = SequencedItem  # type: ignore
        self.json_encoder_class = json_encoder_class or ObjectJSONEncoder
        self.json_encoder = self.json_encoder_class(sort_keys=sort_keys)
        self.json_decoder_class = json_decoder_class or ObjectJSONDecoder
        self.json_decoder = self.json_decoder_class()
        self.cipher = cipher
        self.compressor = compressor
        self.field_names = SequencedItemFieldNames(self.sequenced_item_class)
        self.sequence_id_attr_name = (
            sequence_id_attr_name or self.field_names.sequence_id
        )
        self.position_attr_name = position_attr_name or self.field_names.position
        self.other_attr_names = other_attr_names or self.field_names.other_names

    def item_from_event(self, domain_event: TEvent) -> NamedTuple:
        """
        Constructs a sequenced item from a domain event.
        """
        item_args = self.construct_item_args(domain_event)
        return self.construct_sequenced_item(item_args)

    def construct_item_args(self, domain_event: TEvent) -> Tuple:
        """
        Constructs attributes of a sequenced item from the given domain event.

        :param domain_event: The domain event to be mapped.
        :return: Tuple ``(sequence_id, position, topic, state)`` followed
            by the values of the 'other' attributes.
        :raises KeyError: If the sequence ID attribute is not in the
            event's instance ``__dict__``.
        :raises AttributeError: If one of the 'other' attributes is missing.
        """
        # Get the sequence ID (read from the instance __dict__, so it
        # must be set on the event object itself).
        sequence_id = domain_event.__dict__[self.sequence_id_attr_name]

        # Get the position in the sequence (None if the attribute is absent).
        position = getattr(domain_event, self.position_attr_name, None)

        # Get topic and data.
        topic, state = self.get_item_topic_and_state(
            domain_event.__class__, domain_event.__dict__
        )

        # Get the 'other' args.
        # - these are meant to be derivative of the other attributes,
        #   to populate database fields, and shouldn't affect the hash.
        other_args = tuple(
            getattr(domain_event, name) for name in self.other_attr_names
        )

        return (sequence_id, position, topic, state) + other_args

    def get_item_topic_and_state(
        self, domain_event_class: type, event_attrs: Dict[str, Any]
    ) -> Tuple[str, bytes]:
        """
        Returns the topic of the event class and the encoded event state.

        :param domain_event_class: Class of the domain event.
        :param event_attrs: Attributes of the domain event.
        :return: Tuple of topic and state bytes (serialised, then
            compressed and encrypted if so configured).
        """
        # Get the topic from the event class.
        topic = get_topic(domain_event_class)

        # Serialise the event attributes.
        statebytes = self.json_dumps(event_attrs)

        # Compress plaintext bytes (before encrypting).
        if self.compressor:
            # Zlib reduces length by about 25% to 50%.
            statebytes = self.compressor.compress(statebytes)

        # Encrypt serialised state.
        if self.cipher:
            # Increases length by about 10%.
            statebytes = self.cipher.encrypt(statebytes)

        return topic, statebytes

    def json_dumps(self, o: object) -> bytes:
        """
        Encodes given object as JSON, using the mapper's JSON encoder.
        """
        return self.json_encoder.encode(o)

    def construct_sequenced_item(self, item_args: Tuple) -> NamedTuple:
        """
        Constructs a sequenced item by passing the given args positionally
        to the sequenced item class.
        """
        return self.sequenced_item_class(*item_args)

    def event_from_item(self, sequenced_item: NamedTuple) -> TEvent:
        """
        Reconstructs domain event from stored event topic and
        event attrs. Used in the event store when getting domain events.

        :param sequenced_item: Instance of the mapper's sequenced item class.
        :return: A domain event.
        """
        # NOTE: this type check is an assertion, so it is skipped when
        # Python runs with optimisation enabled (-O).
        assert isinstance(sequenced_item, self.sequenced_item_class), (
            self.sequenced_item_class,
            type(sequenced_item),
        )

        # Get the topic and state.
        topic = getattr(sequenced_item, self.field_names.topic)
        state = getattr(sequenced_item, self.field_names.state)

        return self.event_from_topic_and_state(topic, state)

    def event_from_topic_and_state(self, topic: str, state: bytes) -> TEvent:
        """
        Resolves topic to an event class, decodes state, and
        constructs an event.
        """
        domain_event_class, event_attrs = self.get_event_class_and_attrs(topic, state)

        # Reconstruct domain event object.
        return reconstruct_object(domain_event_class, event_attrs)

    def get_event_class_and_attrs(
        self, topic: str, state: bytes
    ) -> Tuple[Type[TEvent], Dict]:
        """
        Resolves topic to an event class and decodes state to event attrs.

        :param topic: Topic identifying the event class.
        :param state: Encoded event state.
        :return: Tuple of event class and decoded event attributes.
        :raises ValueError: If the decoded state is not valid JSON.
        """
        # Resolve topic to event class.
        domain_event_class: Type[TEvent] = resolve_topic(topic)

        # Decrypt state (encryption was applied last when encoding).
        if self.cipher:
            state = self.cipher.decrypt(state)

        # Decompress plaintext bytes.
        if self.compressor:
            state = self.compressor.decompress(state)

        # Decode unicode bytes.
        statestr = state.decode("utf8")

        # Deserialize JSON.
        event_attrs: Dict = self.json_loads(statestr)

        # Return instance class and attribute values.
        return domain_event_class, event_attrs

    def json_loads(self, s: str) -> Dict:
        """
        Decodes given JSON as object, using the mapper's JSON decoder.

        :param s: The JSON document to decode.
        :return: The decoded object.
        :raises ValueError: If the string cannot be decoded as JSON. The
            decoder's ``JSONDecodeError`` is attached as the cause.
        """
        try:
            return self.json_decoder.decode(s)
        except JSONDecodeError as err:
            # Chain explicitly so the decoder's position details are
            # reported as the direct cause of this error.
            raise ValueError("Couldn't load JSON string: {}".format(s)) from err

    def event_from_notification(self, notification) -> TEvent:
        """
        Reconstructs domain event from an event notification.

        :param notification: The event notification. It is indexed by the
            item class's topic and state field names.
        :return: A domain event.
        """
        return self.event_from_topic_and_state(
            topic=notification[self.field_names.topic],
            state=notification[self.field_names.state],
        )