# Source code for eventsourcing.infrastructure.sqlalchemy.factory

from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.factory import InfrastructureFactory
from eventsourcing.infrastructure.sequenceditem import StoredEvent
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemyDatastore, SQLAlchemySettings
from eventsourcing.infrastructure.sqlalchemy.manager import SQLAlchemyRecordManager, TrackingRecordManager
from eventsourcing.infrastructure.sqlalchemy.records import IntegerSequencedWithIDRecord, SnapshotRecord, \
    StoredEventRecord, TimestampSequencedNoIDRecord


class SQLAlchemyInfrastructureFactory(InfrastructureFactory):
    """
    Infrastructure factory that hands an SQLAlchemy session to the
    record managers and datastore it constructs.
    """

    # Class-level defaults; presumably consulted by the base
    # InfrastructureFactory when it builds record managers -- verify
    # against the base class.
    record_manager_class = SQLAlchemyRecordManager
    integer_sequenced_record_class = IntegerSequencedWithIDRecord
    timestamp_sequenced_record_class = TimestampSequencedNoIDRecord
    snapshot_record_class = SnapshotRecord
    tracking_record_manager_class = TrackingRecordManager

    def __init__(self, session, uri=None, pool_size=5, *args, **kwargs):
        """
        Store the SQLAlchemy-specific settings on the factory.

        :param session: Session object passed on to record managers and
            to the datastore.
        :param uri: Value passed as ``uri`` to ``SQLAlchemySettings`` by
            :meth:`construct_datastore` (default ``None``).
        :param pool_size: Value passed as ``pool_size`` to
            ``SQLAlchemySettings`` by :meth:`construct_datastore`
            (default ``5``).
        :param args: Remaining positional arguments, forwarded unchanged
            to the base class initialiser.
        :param kwargs: Remaining keyword arguments, forwarded unchanged
            to the base class initialiser.
        """
        # The base class is initialised first, with everything that is not
        # SQLAlchemy-specific.
        super(SQLAlchemyInfrastructureFactory, self).__init__(*args, **kwargs)
        self.session = session
        self.uri = uri
        self.pool_size = pool_size

    def construct_record_manager(self, **kwargs):
        """
        Delegate to the base class, adding this factory's session.

        :param kwargs: Keyword arguments forwarded to the base class
            ``construct_record_manager`` alongside ``session``.
        :return: Whatever the base class ``construct_record_manager``
            returns.
        """
        s = super(SQLAlchemyInfrastructureFactory, self)
        return s.construct_record_manager(session=self.session, **kwargs)

    def construct_datastore(self):
        """
        Build an ``SQLAlchemyDatastore`` from this factory's settings.

        After construction the factory's ``session`` attribute is replaced
        by the datastore's ``session`` attribute, so later calls on this
        factory use the datastore's session.

        :return: The newly constructed ``SQLAlchemyDatastore``.
        """
        datastore = SQLAlchemyDatastore(
            settings=SQLAlchemySettings(
                uri=self.uri,
                pool_size=self.pool_size
            ),
            session=self.session,
        )
        # Adopt the datastore's session for subsequent constructions.
        self.session = datastore.session
        return datastore

    def construct_tracking_record_manager(self):
        """
        Delegate to the base class, passing this factory's session
        positionally.

        :return: Whatever the base class
            ``construct_tracking_record_manager`` returns.
        """
        s = super(SQLAlchemyInfrastructureFactory, self)
        return s.construct_tracking_record_manager(self.session)
def construct_sqlalchemy_eventstore(session,
                                    sequenced_item_class=None,
                                    sequence_id_attr_name=None,
                                    position_attr_name=None,
                                    json_encoder_class=None,
                                    json_decoder_class=None,
                                    cipher=None,
                                    record_class=None,
                                    contiguous_record_ids=False,
                                    application_id=None,
                                    pipeline_id=-1,
                                    ):
    """
    Construct an ``EventStore`` whose record manager is built by a
    ``SQLAlchemyInfrastructureFactory``.

    :param session: Session passed to the infrastructure factory.
    :param sequenced_item_class: Sequenced item class used by both the
        mapper and the factory; ``StoredEvent`` is used when this is
        ``None`` (or otherwise falsy).
    :param sequence_id_attr_name: Forwarded to ``SequencedItemMapper``.
    :param position_attr_name: Forwarded to ``SequencedItemMapper``.
    :param json_encoder_class: Forwarded to ``SequencedItemMapper``.
    :param json_decoder_class: Forwarded to ``SequencedItemMapper``.
    :param cipher: Forwarded to ``SequencedItemMapper``.
    :param record_class: Passed to the factory as
        ``integer_sequenced_record_class``; ``StoredEventRecord`` is used
        when this is ``None`` (or otherwise falsy).
    :param contiguous_record_ids: Forwarded to the factory
        (default ``False``).
    :param application_id: Forwarded to the factory (default ``None``).
    :param pipeline_id: Forwarded to the factory (default ``-1``).
    :return: The constructed ``EventStore``.
    """
    # Resolve the default once so that the mapper and the factory are
    # guaranteed to agree on the sequenced item class.
    sequenced_item_class = sequenced_item_class or StoredEvent

    sequenced_item_mapper = SequencedItemMapper(
        sequenced_item_class=sequenced_item_class,
        sequence_id_attr_name=sequence_id_attr_name,
        position_attr_name=position_attr_name,
        json_encoder_class=json_encoder_class,
        json_decoder_class=json_decoder_class,
        cipher=cipher,
    )

    factory = SQLAlchemyInfrastructureFactory(
        session=session,
        integer_sequenced_record_class=record_class or StoredEventRecord,
        sequenced_item_class=sequenced_item_class,
        contiguous_record_ids=contiguous_record_ids,
        application_id=application_id,
        pipeline_id=pipeline_id,
    )
    record_manager = factory.construct_integer_sequenced_record_manager()

    event_store = EventStore(
        record_manager=record_manager,
        sequenced_item_mapper=sequenced_item_mapper,
    )
    return event_store