Source code for eventsourcing.infrastructure.sqlalchemy.factory

from typing import Any, NamedTuple, Optional, Type

from eventsourcing.domain.model.events import DomainEvent
from eventsourcing.infrastructure.base import DEFAULT_PIPELINE_ID, AbstractRecordManager
from eventsourcing.infrastructure.datastore import AbstractDatastore
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.factory import InfrastructureFactory
from eventsourcing.infrastructure.sequenceditem import StoredEvent
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
from eventsourcing.infrastructure.sqlalchemy.datastore import (
    SQLAlchemyDatastore,
    SQLAlchemySettings,
)
from eventsourcing.infrastructure.sqlalchemy.manager import SQLAlchemyRecordManager
from eventsourcing.infrastructure.sqlalchemy.records import (
    IntegerSequencedWithIDRecord,
    NotificationTrackingRecord,
    SnapshotRecord,
    StoredEventRecord,
    TimestampSequencedNoIDRecord,
)
from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.utils.transcoding import ObjectJSONDecoder, ObjectJSONEncoder


class SQLAlchemyInfrastructureFactory(InfrastructureFactory):
    """
    Factory for SQLAlchemy-backed infrastructure objects.

    Builds record managers bound to this factory's session, and
    the SQLAlchemy datastore.
    """

    # Class-level defaults; presumably consumed by the InfrastructureFactory
    # base class when it constructs record managers -- confirm against base.
    record_manager_class = SQLAlchemyRecordManager
    integer_sequenced_record_class = IntegerSequencedWithIDRecord
    timestamp_sequenced_record_class = TimestampSequencedNoIDRecord
    snapshot_record_class = SnapshotRecord
    # Fallback used when no tracking record class is given to __init__.
    tracking_record_class = NotificationTrackingRecord

    def __init__(
        self,
        session: Any,
        uri: Optional[str] = None,
        pool_size: Optional[int] = None,
        tracking_record_class: Optional[type] = None,
        *args: Any,
        **kwargs: Any
    ):
        """
        Initialises the factory.

        :param session: Session handed to record managers and the datastore.
            May be None, in which case ``construct_datastore()`` adopts the
            session of the datastore it creates.
        :param uri: Value passed as ``uri`` to the datastore settings.
        :param pool_size: Value passed as ``pool_size`` to the datastore
            settings.
        :param tracking_record_class: Optional per-instance override of the
            class-level ``tracking_record_class``.
        :param args: Extra positional arguments forwarded to the base class.
        :param kwargs: Extra keyword arguments forwarded to the base class.
        """
        # Base class initialisation runs first, as in the original ordering.
        super().__init__(*args, **kwargs)
        self._tracking_record_class = tracking_record_class
        self.pool_size = pool_size
        self.uri = uri
        self.session = session

    def construct_integer_sequenced_record_manager(
        self, **kwargs: Any
    ) -> AbstractRecordManager:
        """
        Constructs an integer sequenced SQLAlchemy record manager.

        The tracking record class given at construction time is used if it
        is truthy, otherwise the class-level default is used.

        :param kwargs: Extra keyword arguments forwarded to the base class
            method.
        :return: An SQLAlchemy record manager.
        :rtype: SQLAlchemyRecordManager
        """
        chosen_tracking_class = self._tracking_record_class
        if not chosen_tracking_class:
            chosen_tracking_class = self.tracking_record_class
        return super().construct_integer_sequenced_record_manager(
            tracking_record_class=chosen_tracking_class, **kwargs
        )

    def construct_record_manager(
        self,
        record_class: Optional[type],
        sequenced_item_class: Optional[Type[NamedTuple]] = None,
        **kwargs: Any
    ) -> AbstractRecordManager:
        """
        Constructs an SQLAlchemy record manager bound to this factory's
        session.

        :param record_class: Record class passed through to the base class
            method.
        :param sequenced_item_class: Sequenced item class passed through to
            the base class method.
        :param kwargs: Extra keyword arguments forwarded to the base class
            method.
        :return: An SQLAlchemy record manager.
        :rtype: SQLAlchemyRecordManager
        """
        base_factory = super()
        return base_factory.construct_record_manager(
            record_class,
            sequenced_item_class=sequenced_item_class,
            session=self.session,
            **kwargs
        )

    def construct_datastore(self) -> Optional[AbstractDatastore]:
        """
        Constructs an SQLAlchemy datastore from this factory's settings.

        If the factory has no session yet, the session of the new datastore
        is adopted as the factory's session.

        :return: The newly constructed datastore.
        :rtype: SQLAlchemyDatastore
        """
        settings = SQLAlchemySettings(uri=self.uri, pool_size=self.pool_size)
        datastore = SQLAlchemyDatastore(settings=settings, session=self.session)
        if self.session is None:
            assert datastore.session, "Datastore object session is None"
            self.session = datastore.session
        return datastore
def construct_sqlalchemy_eventstore(
    session: Any,
    sequenced_item_class: Optional[Type[NamedTuple]] = None,
    sequence_id_attr_name: Optional[str] = None,
    position_attr_name: Optional[str] = None,
    json_encoder_class: Optional[Type[ObjectJSONEncoder]] = None,
    json_decoder_class: Optional[Type[ObjectJSONDecoder]] = None,
    cipher: Optional[AESCipher] = None,
    record_class: Optional[type] = None,
    contiguous_record_ids: bool = False,
    application_name: Optional[str] = None,
    pipeline_id: int = DEFAULT_PIPELINE_ID,
) -> EventStore:
    """
    Constructs an event store that uses an SQLAlchemy record manager.

    A sequenced item mapper and an SQLAlchemy infrastructure factory are
    built from the arguments; the factory's integer sequenced record
    manager and the mapper are then combined into the event store.

    :param session: Session passed to the infrastructure factory.
    :param sequenced_item_class: Sequenced item class for both the mapper
        and the factory; ``StoredEvent`` is used when this is falsy.
    :param sequence_id_attr_name: Passed to the sequenced item mapper.
    :param position_attr_name: Passed to the sequenced item mapper.
    :param json_encoder_class: Passed to the sequenced item mapper.
    :param json_decoder_class: Passed to the sequenced item mapper.
    :param cipher: Passed to the sequenced item mapper.
    :param record_class: Integer sequenced record class for the factory;
        ``StoredEventRecord`` is used when this is falsy.
    :param contiguous_record_ids: Passed to the infrastructure factory.
    :param application_name: Passed to the infrastructure factory.
    :param pipeline_id: Passed to the infrastructure factory.
    :return: The constructed event store.
    :rtype: EventStore
    """
    if not sequenced_item_class:
        sequenced_item_class = StoredEvent  # type: ignore

    mapper = SequencedItemMapper[DomainEvent](
        sequenced_item_class=sequenced_item_class,
        sequence_id_attr_name=sequence_id_attr_name,
        position_attr_name=position_attr_name,
        json_encoder_class=json_encoder_class,
        json_decoder_class=json_decoder_class,
        cipher=cipher,
    )

    integer_record_class = record_class or StoredEventRecord
    infrastructure_factory = SQLAlchemyInfrastructureFactory(
        session=session,
        integer_sequenced_record_class=integer_record_class,
        sequenced_item_class=sequenced_item_class,
        contiguous_record_ids=contiguous_record_ids,
        application_name=application_name,
        pipeline_id=pipeline_id,
    )
    manager = infrastructure_factory.construct_integer_sequenced_record_manager()

    return EventStore[DomainEvent, AbstractRecordManager](
        record_manager=manager, event_mapper=mapper
    )