Source code for eventsourcing.infrastructure.datastore

from abc import ABC, abstractmethod


class DatastoreSettings:
    """
    Base class for settings for database connection
    used by a stored event repository.

    Declares no attributes or behaviour of its own.
    """
class Datastore(ABC):
    """
    Abstract base class for a datastore.

    Holds the settings object it was constructed with and declares the
    connection and table lifecycle methods that concrete subclasses
    must implement.
    """

    def __init__(self, settings):
        """
        Store the given settings on the instance.

        :param settings: settings object for the datastore; it is kept
            as-is on ``self.settings`` and its type is not checked here.
        """
        self.settings = settings

    @abstractmethod
    def setup_connection(self):
        """Sets up a connection to a datastore."""

    @abstractmethod
    def close_connection(self):
        """Drops connection to a datastore."""

    @abstractmethod
    def setup_tables(self):
        """Sets up tables used to store events."""

    @abstractmethod
    def drop_tables(self):
        """Drops tables used to store events."""

    @abstractmethod
    def truncate_tables(self):
        """Truncates tables used to store events."""
class DatastoreError(Exception):
    """Base class for the datastore exceptions defined in this module."""
class DatastoreConnectionError(DatastoreError):
    """
    Datastore error subclass for connection problems.

    NOTE(review): meaning taken from the class name; no raise sites are
    visible in this module.
    """
class DatastoreTableError(DatastoreError):
    """
    Datastore error subclass for table problems.

    NOTE(review): meaning taken from the class name; no raise sites are
    visible in this module.
    """