Source code for eventsourcing.infrastructure.factory

from json import JSONDecoder, JSONEncoder
from typing import Any, Generic, NamedTuple, Optional, Type

from eventsourcing.infrastructure.base import (
    DEFAULT_PIPELINE_ID,
    AbstractEventStore,
    AbstractRecordManager,
)
from eventsourcing.infrastructure.datastore import AbstractDatastore
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.sequenceditem import SequencedItem
from eventsourcing.infrastructure.sequenceditemmapper import (
    AbstractSequencedItemMapper,
    SequencedItemMapper,
)
from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.whitehead import TEvent


[docs]class InfrastructureFactory(Generic[TEvent]): """ Base class for infrastructure factories. """ record_manager_class: Optional[Type[AbstractRecordManager]] = None sequenced_item_class: Type[NamedTuple] = SequencedItem # type: ignore sequenced_item_mapper_class: Type[AbstractSequencedItemMapper] = SequencedItemMapper integer_sequenced_record_class: Optional[type] = None integer_sequenced_noid_record_class: Optional[type] = None timestamp_sequenced_record_class: Optional[type] = None snapshot_record_class: Optional[type] = None json_decoder_class: Optional[Type[JSONDecoder]] = None json_encoder_class: Optional[Type[JSONEncoder]] = None event_store_class: Optional[Type[AbstractEventStore]] = None
[docs] def __init__( self, record_manager_class: Optional[Type[AbstractRecordManager]] = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, event_store_class: Optional[Type[AbstractEventStore]] = None, sequenced_item_mapper_class: Optional[Type[AbstractSequencedItemMapper]] = None, json_encoder_class: Optional[Type[JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[JSONDecoder]] = None, integer_sequenced_record_class: Optional[type] = None, timestamp_sequenced_record_class: Optional[type] = None, snapshot_record_class: Optional[type] = None, contiguous_record_ids: bool = False, application_name: Optional[str] = None, pipeline_id: int = DEFAULT_PIPELINE_ID, ): self.record_manager_class = ( record_manager_class or type(self).record_manager_class ) self.event_store_class = event_store_class or type(self).event_store_class self.sequenced_item_class = ( sequenced_item_class or type(self).sequenced_item_class ) self.sequenced_item_mapper_class = ( sequenced_item_mapper_class or type(self).sequenced_item_mapper_class ) self.json_encoder_class = json_encoder_class or type(self).json_encoder_class self.sort_keys = sort_keys self.json_decoder_class = json_decoder_class or type(self).json_decoder_class self._integer_sequenced_record_class = integer_sequenced_record_class self._timestamp_sequenced_record_class = timestamp_sequenced_record_class self._snapshot_record_class = snapshot_record_class self.contiguous_record_ids = contiguous_record_ids self.application_name = application_name self.pipeline_id = pipeline_id
[docs] def construct_integer_sequenced_record_manager( self, integer_sequenced_record_class=None, **kwargs: Any ) -> AbstractRecordManager: """ Constructs an integer sequenced record manager. """ integer_sequenced_record_class = ( integer_sequenced_record_class or self._integer_sequenced_record_class or self.integer_sequenced_record_class ) return self.construct_record_manager( integer_sequenced_record_class, **kwargs )
[docs] def construct_timestamp_sequenced_record_manager( self, **kwargs: Any ) -> AbstractRecordManager: """ Constructs a timestamp sequenced record manager. """ timestamp_sequenced_record_class = ( self._timestamp_sequenced_record_class or self.timestamp_sequenced_record_class ) return self.construct_record_manager(timestamp_sequenced_record_class, **kwargs)
[docs] def construct_snapshot_record_manager(self, **kwargs: Any) -> AbstractRecordManager: """ Constructs a snapshot record manager. """ snapshot_record_class = ( self._snapshot_record_class or self.snapshot_record_class ) return self.construct_record_manager(snapshot_record_class, **kwargs)
[docs] def construct_record_manager( self, record_class: Optional[type], sequenced_item_class: Optional[Type[NamedTuple]] = None, **kwargs: Any ) -> AbstractRecordManager: """ Constructs an record manager. """ assert self.record_manager_class is not None return self.record_manager_class( sequenced_item_class=sequenced_item_class or self.sequenced_item_class, record_class=record_class, contiguous_record_ids=self.contiguous_record_ids, application_name=self.application_name, pipeline_id=self.pipeline_id, **kwargs )
[docs] def construct_sequenced_item_mapper( self, cipher: Optional[AESCipher], compressor: Any, ) -> AbstractSequencedItemMapper: """ Constructs sequenced item mapper object. :returns: Sequenced item mapper object. :rtype: eventsourcing.infrastructure.sequenceditemmapper .AbstractSequencedItemMapper """ return self.sequenced_item_mapper_class( sequenced_item_class=self.sequenced_item_class, cipher=cipher, compressor=compressor, # sequence_id_attr_name=sequence_id_attr_name, # position_attr_name=position_attr_name, json_encoder_class=self.json_encoder_class, sort_keys=self.sort_keys, json_decoder_class=self.json_decoder_class, )
[docs] def construct_integer_sequenced_event_store( self, cipher: Optional[AESCipher], compressor: Any, ) -> AbstractEventStore: """ Constructs an integer sequenced event store. """ sequenced_item_mapper = self.construct_sequenced_item_mapper(cipher, compressor) record_manager = self.construct_integer_sequenced_record_manager() return (self.event_store_class or EventStore)( record_manager=record_manager, event_mapper=sequenced_item_mapper )
[docs] def construct_datastore(self) -> Optional[AbstractDatastore]: """ Constructs datastore object. :returns: Concrete datastore object object. """ return None