from abc import ABC, abstractmethod
from typing import Any, Generic, Optional, TypeVar
class DatastoreSettings:
    """
    Settings for Datastore.

    This base class declares no attributes or methods of its own. It is
    the upper bound of the ``TDatastoreSettings`` type variable, so the
    settings object given to a datastore is expected to be an instance of
    this class or of a subclass.
    """
# Type variable for the settings type a datastore is parameterised with;
# bound to DatastoreSettings, so only that class or its subclasses fit.
TDatastoreSettings = TypeVar(
    "TDatastoreSettings",
    bound=DatastoreSettings,
)
class AbstractDatastore(ABC, Generic[TDatastoreSettings]):
    """
    Datastores hold stored event records, used by a record manager.

    This is an abstract base class, generic over the type of its settings
    object. Concrete subclasses must implement every method declared with
    ``@abstractmethod`` below before they can be instantiated.
    """

    # Class-level flag, True by default; subclasses may override it.
    # NOTE(review): presumably consulted by callers before dropping
    # tables -- nothing in this class reads it, confirm against callers.
    can_drop_tables = True

    def __init__(self, settings: TDatastoreSettings):
        """
        Stores the given settings object on the instance.

        :param settings: settings object, kept as ``self.settings``.
        """
        self.settings: TDatastoreSettings = settings

    @property
    def session(self) -> Optional[Any]:
        """
        Returns ``None`` in this base class.

        Declared as ``Optional[Any]`` so that subclasses may override it
        to return a session object.
        """
        return None

    @abstractmethod
    def setup_connection(self) -> None:
        """Sets up a connection to a datastore."""

    @abstractmethod
    def close_connection(self) -> None:
        """Drops connection to a datastore."""

    @abstractmethod
    def setup_tables(self) -> None:
        """Sets up tables used to store events."""

    @abstractmethod
    def setup_table(self, table: Any) -> None:
        """Sets up given table."""

    @abstractmethod
    def drop_tables(self) -> None:
        """Drops tables used to store events."""

    @abstractmethod
    def drop_table(self, table: Any) -> None:
        """Drops given table."""

    @abstractmethod
    def truncate_tables(self) -> None:
        """Truncates tables used to store events."""
class DatastoreError(Exception):
    """Base class for datastore exceptions; adds nothing to ``Exception``."""
class DatastoreConnectionError(DatastoreError):
    """
    DatastoreError subclass for datastore connection problems.

    Adds no behaviour of its own.
    """
    # NOTE(review): the sites that raise this are outside this view --
    # confirm the exact conditions against the concrete datastores.
class DatastoreTableError(DatastoreError):
    """
    DatastoreError subclass for datastore table problems.

    Adds no behaviour of its own.
    """
    # NOTE(review): the sites that raise this are outside this view --
    # confirm the exact conditions against the concrete datastores.