import os
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import StaticPool
from eventsourcing.infrastructure.datastore import Datastore, DatastoreSettings
from eventsourcing.infrastructure.sqlalchemy.records import Base
DEFAULT_SQLALCHEMY_DB_URI = 'sqlite:///:memory:'
# DEFAULT_SQLALCHEMY_DB_URI = 'sqlite:///FILE_SYSTEM_PATH'
# DEFAULT_SQLALCHEMY_DB_URI = 'mysql://username:password@localhost/eventsourcing'
# DEFAULT_SQLALCHEMY_DB_URI = 'postgresql://username:password@localhost:5432/eventsourcing'
[docs]class SQLAlchemySettings(DatastoreSettings):
def __init__(self, uri=None, pool_size=5):
self.uri = uri or os.getenv('DB_URI', DEFAULT_SQLALCHEMY_DB_URI)
self.pool_size = pool_size
# self.pool_size = pool_size if not self.uri.startswith('sqlite') else 1
[docs]class SQLAlchemyDatastore(Datastore):
def __init__(self, base=Base, tables=None, connection_strategy='plain',
session=None, **kwargs):
super(SQLAlchemyDatastore, self).__init__(**kwargs)
self._session = session
self._engine = None if session is None else session.bind
self._base = base
self._tables = tables
self._connection_strategy = connection_strategy
@property
def session(self):
if self._session is None:
if self._engine is None:
self.setup_connection()
session_factory = sessionmaker(bind=self._engine)
self._session = scoped_session(session_factory)
return self._session
[docs] def setup_connection(self):
assert isinstance(self.settings, SQLAlchemySettings), self.settings
if self._engine is None:
if self.is_sqlite():
kwargs = {
'connect_args': {'check_same_thread': False},
}
elif self.settings.pool_size == 1:
kwargs = {
'poolclass': StaticPool
}
else:
kwargs = {
'pool_size': self.settings.pool_size,
}
self._engine = create_engine(
self.settings.uri,
strategy=self._connection_strategy,
**kwargs
)
assert self._engine
[docs] def is_sqlite(self):
return self.settings.uri.startswith('sqlite')
[docs] def setup_tables(self, tables=None):
if self._tables is not None:
for table in self._tables:
self.setup_table(table)
[docs] def setup_table(self, table):
if self._engine is None:
raise Exception("Engine not set when required: {}".format(self))
table.__table__.create(self._engine, checkfirst=True)
[docs] def drop_tables(self):
if self._tables is not None:
for table in self._tables:
self.drop_table(table)
[docs] def drop_table(self, table):
table.__table__.drop(self._engine, checkfirst=True)
[docs] def truncate_tables(self):
self.drop_tables()
[docs] def close_connection(self):
if self._session:
self._session.close()
self._session = None
if self._engine:
# Call dispose(), unless sqlite (to avoid error 'stored_events'
# table does not exist in projections.rst doc).
if not self.is_sqlite():
self._engine.dispose()
self._engine = None