Source code for eventsourcing.infrastructure.datastore

from abc import ABC, abstractmethod
from typing import Any, Generic, Optional, TypeVar


[docs]class DatastoreSettings(object): """ Settings for Datastore. """
TDatastoreSettings = TypeVar("TDatastoreSettings", bound=DatastoreSettings)
[docs]class AbstractDatastore(ABC, Generic[TDatastoreSettings]): can_drop_tables = True """ Datastores hold stored event records, used by a record manager. """
[docs] def __init__(self, settings: TDatastoreSettings): self.settings: TDatastoreSettings = settings
@property def session(self) -> Optional[Any]: return None
[docs] @abstractmethod def setup_connection(self) -> None: """Sets up a connection to a datastore."""
[docs] @abstractmethod def close_connection(self) -> None: """Drops connection to a datastore."""
[docs] @abstractmethod def setup_tables(self) -> None: """Sets up tables used to store events."""
[docs] @abstractmethod def setup_table(self, table: Any) -> None: """Sets up given table."""
[docs] @abstractmethod def drop_tables(self) -> None: """Drops tables used to store events."""
[docs] @abstractmethod def drop_table(self, table: Any) -> None: """Drops given table."""
[docs] @abstractmethod def truncate_tables(self) -> None: """Truncates tables used to store events."""
[docs]class DatastoreError(Exception): pass
[docs]class DatastoreConnectionError(DatastoreError): pass
[docs]class DatastoreTableError(DatastoreError): pass