Source code for eventsourcing.infrastructure.datastore

from abc import ABCMeta, abstractmethod

import six


[docs]class DatastoreSettings(object):
"""Base class for settings for database connection used by a stored event repository."""
[docs]class Datastore(six.with_metaclass(ABCMeta)): def __init__(self, settings): # assert isinstance(settings, DatastoreSettings), settings self.settings = settings
[docs] @abstractmethod def setup_connection(self):
"""Sets up a connection to a datastore."""
[docs] @abstractmethod def close_connection(self):
"""Drops connection to a datastore."""
[docs] @abstractmethod def setup_tables(self):
"""Sets up tables used to store events."""
[docs] @abstractmethod def drop_tables(self):
"""Drops tables used to store events."""
[docs] @abstractmethod def truncate_tables(self):
"""Truncates tables used to store events."""
[docs]class DatastoreError(Exception):
pass
[docs]class DatastoreConnectionError(DatastoreError):
pass
[docs]class DatastoreTableError(DatastoreError):
pass