Source code for eventsourcing.infrastructure.sqlalchemy.factory

from eventsourcing.infrastructure.base import DEFAULT_PIPELINE_ID
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.factory import InfrastructureFactory
from eventsourcing.infrastructure.sequenceditem import StoredEvent
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
from eventsourcing.infrastructure.sqlalchemy.datastore import (
    SQLAlchemyDatastore,
    SQLAlchemySettings,
)
from eventsourcing.infrastructure.sqlalchemy.manager import SQLAlchemyRecordManager
from eventsourcing.infrastructure.sqlalchemy.records import (
    IntegerSequencedWithIDRecord,
    SnapshotRecord,
    StoredEventRecord,
    TimestampSequencedNoIDRecord,
    NotificationTrackingRecord,
)


[docs]class SQLAlchemyInfrastructureFactory(InfrastructureFactory): """ Infrastructure factory for SQLAlchemy infrastructure. """ record_manager_class = SQLAlchemyRecordManager integer_sequenced_record_class = IntegerSequencedWithIDRecord timestamp_sequenced_record_class = TimestampSequencedNoIDRecord snapshot_record_class = SnapshotRecord tracking_record_class = NotificationTrackingRecord
[docs] def __init__( self, session, uri=None, pool_size=None, tracking_record_class=None, *args, **kwargs ): super(SQLAlchemyInfrastructureFactory, self).__init__(*args, **kwargs) self.session = session self.uri = uri self.pool_size = pool_size self._tracking_record_class = tracking_record_class
[docs] def construct_integer_sequenced_record_manager(self, **kwargs): """ Constructs SQLAlchemy record manager. :return: An SQLAlchemy record manager. :rtype: SQLAlchemyRecordManager """ tracking_record_class = ( self._tracking_record_class or self.tracking_record_class ) return super( SQLAlchemyInfrastructureFactory, self ).construct_integer_sequenced_record_manager( tracking_record_class=tracking_record_class, **kwargs )
[docs] def construct_record_manager(self, record_class, **kwargs): """ Constructs SQLAlchemy record manager. :return: An SQLAlchemy record manager. :rtype: SQLAlchemyRecordManager """ return super(SQLAlchemyInfrastructureFactory, self).construct_record_manager( record_class, session=self.session, **kwargs )
[docs] def construct_datastore(self): """ Constructs SQLAlchemy datastore. :rtype: SQLAlchemyDatastore """ datastore = SQLAlchemyDatastore( settings=SQLAlchemySettings(uri=self.uri, pool_size=self.pool_size), session=self.session, ) self.session = datastore.session return datastore
def construct_sqlalchemy_eventstore( session, sequenced_item_class=None, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=None, json_decoder_class=None, cipher=None, record_class=None, contiguous_record_ids=False, application_name=None, pipeline_id=DEFAULT_PIPELINE_ID, ): sequenced_item_class = sequenced_item_class or StoredEvent sequenced_item_mapper = SequencedItemMapper( sequenced_item_class=sequenced_item_class, sequence_id_attr_name=sequence_id_attr_name, position_attr_name=position_attr_name, json_encoder_class=json_encoder_class, json_decoder_class=json_decoder_class, cipher=cipher, ) factory = SQLAlchemyInfrastructureFactory( session=session, integer_sequenced_record_class=record_class or StoredEventRecord, sequenced_item_class=sequenced_item_class, contiguous_record_ids=contiguous_record_ids, application_name=application_name, pipeline_id=pipeline_id, ) record_manager = factory.construct_integer_sequenced_record_manager() event_store = EventStore( record_manager=record_manager, sequenced_item_mapper=sequenced_item_mapper ) return event_store