# Source code for eventsourcing.infrastructure.sqlalchemy.factory

from eventsourcing.infrastructure.base import DEFAULT_PIPELINE_ID
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.factory import InfrastructureFactory
from eventsourcing.infrastructure.sequenceditem import StoredEvent
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemyDatastore, SQLAlchemySettings
from eventsourcing.infrastructure.sqlalchemy.manager import SQLAlchemyRecordManager
from eventsourcing.infrastructure.sqlalchemy.records import IntegerSequencedWithIDRecord, SnapshotRecord, \
    StoredEventRecord, TimestampSequencedNoIDRecord


class SQLAlchemyInfrastructureFactory(InfrastructureFactory):
    """
    Infrastructure factory that carries a SQLAlchemy session.

    The session is handed to every record manager built through
    :meth:`construct_record_manager`, and to the datastore built by
    :meth:`construct_datastore`.
    """

    # Class-level choices of record manager and record classes.
    # NOTE(review): presumably read by the base InfrastructureFactory as
    # defaults -- confirm against the base class.
    record_manager_class = SQLAlchemyRecordManager
    integer_sequenced_record_class = IntegerSequencedWithIDRecord
    timestamp_sequenced_record_class = TimestampSequencedNoIDRecord
    snapshot_record_class = SnapshotRecord

    def __init__(self, session, uri=None, pool_size=None, *args, **kwargs):
        """
        Initialise the base factory, then remember the SQLAlchemy options.

        :param session: session object passed on to record managers and
            to the datastore.
        :param uri: value used as ``uri`` of the datastore settings.
        :param pool_size: value used as ``pool_size`` of the datastore
            settings.
        :param args: extra positional arguments for the base factory.
        :param kwargs: extra keyword arguments for the base factory.
        """
        super(SQLAlchemyInfrastructureFactory, self).__init__(*args, **kwargs)
        self.session = session
        self.uri = uri
        self.pool_size = pool_size

    def construct_record_manager(self, **kwargs):
        """
        Build a record manager via the base factory, supplying the session.

        :param kwargs: keyword arguments forwarded unchanged to the base
            implementation (alongside ``session``).
        :return: whatever the base ``construct_record_manager`` returns.
        """
        parent = super(SQLAlchemyInfrastructureFactory, self)
        return parent.construct_record_manager(session=self.session, **kwargs)

    def construct_datastore(self):
        """
        Build a datastore from this factory's ``uri`` and ``pool_size``.

        :return: the newly constructed :class:`SQLAlchemyDatastore`.
        """
        settings = SQLAlchemySettings(
            uri=self.uri,
            pool_size=self.pool_size,
        )
        store = SQLAlchemyDatastore(settings=settings, session=self.session)
        # From here on, use the session exposed by the datastore, so that
        # record managers constructed later share it.
        self.session = store.session
        return store
def construct_sqlalchemy_eventstore(
    session,
    sequenced_item_class=None,
    sequence_id_attr_name=None,
    position_attr_name=None,
    json_encoder_class=None,
    json_decoder_class=None,
    cipher=None,
    record_class=None,
    contiguous_record_ids=False,
    application_name=None,
    pipeline_id=DEFAULT_PIPELINE_ID,
):
    """
    Assemble an :class:`EventStore` that uses SQLAlchemy infrastructure.

    :param session: session given to the infrastructure factory.
    :param sequenced_item_class: sequenced item class used by both the
        mapper and the factory; ``StoredEvent`` is used when falsy.
    :param sequence_id_attr_name: forwarded to the sequenced item mapper.
    :param position_attr_name: forwarded to the sequenced item mapper.
    :param json_encoder_class: forwarded to the sequenced item mapper.
    :param json_decoder_class: forwarded to the sequenced item mapper.
    :param cipher: forwarded to the sequenced item mapper.
    :param record_class: integer sequenced record class for the factory;
        ``StoredEventRecord`` is used when falsy.
    :param contiguous_record_ids: forwarded to the factory.
    :param application_name: forwarded to the factory.
    :param pipeline_id: forwarded to the factory.
    :return: an event store combining the factory's integer sequenced
        record manager with the sequenced item mapper.
    """
    item_class = sequenced_item_class if sequenced_item_class else StoredEvent

    # The mapper is built first, then the factory and its record manager.
    mapper = SequencedItemMapper(
        sequenced_item_class=item_class,
        sequence_id_attr_name=sequence_id_attr_name,
        position_attr_name=position_attr_name,
        json_encoder_class=json_encoder_class,
        json_decoder_class=json_decoder_class,
        cipher=cipher,
    )

    integer_record_class = record_class if record_class else StoredEventRecord
    infrastructure_factory = SQLAlchemyInfrastructureFactory(
        session=session,
        integer_sequenced_record_class=integer_record_class,
        sequenced_item_class=item_class,
        contiguous_record_ids=contiguous_record_ids,
        application_name=application_name,
        pipeline_id=pipeline_id,
    )
    manager = infrastructure_factory.construct_integer_sequenced_record_manager()

    return EventStore(
        record_manager=manager,
        sequenced_item_mapper=mapper,
    )