# Source code for eventsourcing.application.sqlalchemy

from eventsourcing.application.simple import ApplicationWithConcreteInfrastructure
from eventsourcing.infrastructure.sqlalchemy.factory import (
    SQLAlchemyInfrastructureFactory,
)
from eventsourcing.infrastructure.sqlalchemy.records import (
    EntitySnapshotRecord,
    StoredEventRecord,
)


class SQLAlchemyApplication(ApplicationWithConcreteInfrastructure):
    """
    Application configured with the SQLAlchemy infrastructure classes.

    The class attributes below select the SQLAlchemy infrastructure factory
    and the stored event / snapshot record classes. An instance remembers
    the database ``uri``, an optional ``session`` and the tracking record
    class, and hands all three to the parent's ``construct_infrastructure()``.
    """

    # Factory and record classes selected for this application.
    infrastructure_factory_class = SQLAlchemyInfrastructureFactory
    stored_event_record_class = StoredEventRecord
    snapshot_record_class = EntitySnapshotRecord

    # NOTE(review): presumably read by the base class or the factory to
    # decide whether a session is passed at construction -- confirm there.
    is_constructed_with_session = True

    # Class-level default, used by __init__ when the caller does not
    # supply a (truthy) tracking record class.
    tracking_record_class = None

    def __init__(self, uri=None, session=None, tracking_record_class=None, **kwargs):
        """
        Store the connection settings, then initialise the parent class.

        :param uri: value kept on ``self.uri`` and later forwarded to
            ``construct_infrastructure()``.
        :param session: value kept on ``self.session``; when left as
            ``None`` it may be filled in from the datastore later.
        :param tracking_record_class: tracking record class to use; a
            falsy value falls back to the class attribute of the same name.
        :param kwargs: passed through unchanged to the parent ``__init__``.
        """
        self.uri = uri
        self.session = session

        # Fall back to the class-level default for any falsy argument.
        if not tracking_record_class:
            tracking_record_class = type(self).tracking_record_class
        self.tracking_record_class = tracking_record_class

        # The attributes above are assigned before delegating to the parent,
        # exactly as in the original statement order.
        super(SQLAlchemyApplication, self).__init__(**kwargs)

    def construct_infrastructure(self, *args, **kwargs):
        """
        Delegate to the parent, adding the session, uri and tracking class.

        Once the parent call has returned, a session that is still ``None``
        is replaced by the datastore's session, provided ``self.datastore``
        is truthy.

        :param args: extra positional arguments for the parent method.
        :param kwargs: extra keyword arguments for the parent method.
        """
        parent = super(SQLAlchemyApplication, self)
        parent.construct_infrastructure(
            *args,
            session=self.session,
            uri=self.uri,
            tracking_record_class=self.tracking_record_class,
            **kwargs
        )

        # Adopt the datastore's session only when none was supplied.
        if self.datastore:
            if self.session is None:
                self.session = self.datastore.session