Source code for eventsourcing.postgres

from __future__ import annotations

from contextlib import contextmanager
from itertools import chain
from threading import Lock
from types import TracebackType
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
)
from uuid import NAMESPACE_URL, UUID, uuid5

import psycopg2
import psycopg2.errors
import psycopg2.extras
from psycopg2.errorcodes import DUPLICATE_PREPARED_STATEMENT
from psycopg2.extensions import connection, cursor

from eventsourcing.persistence import (
    AggregateRecorder,
    ApplicationRecorder,
    Connection,
    ConnectionPool,
    Cursor,
    DatabaseError,
    DataError,
    InfrastructureFactory,
    IntegrityError,
    InterfaceError,
    InternalError,
    Notification,
    NotSupportedError,
    OperationalError,
    PersistenceError,
    ProcessRecorder,
    ProgrammingError,
    StoredEvent,
    Tracking,
)
from eventsourcing.utils import Environment, retry, strtobool

psycopg2.extras.register_uuid()


class PostgresCursor(Cursor):
    def __init__(self, pg_cursor: cursor):
        self.pg_cursor = pg_cursor

    def __enter__(self, *args: Any, **kwargs: Any) -> "PostgresCursor":
        self.pg_cursor.__enter__(*args, **kwargs)
        return self

    def __exit__(self, *args: Any, **kwargs: Any) -> None:
        return self.pg_cursor.__exit__(*args, **kwargs)

    def mogrify(self, statement: str, params: Any = None) -> bytes:
        return self.pg_cursor.mogrify(statement, vars=params)

    def execute(self, statement: Union[str, bytes], params: Any = None) -> None:
        self.pg_cursor.execute(query=statement, vars=params)

    def fetchall(self) -> Any:
        return self.pg_cursor.fetchall()

    def fetchone(self) -> Any:
        return self.pg_cursor.fetchone()

    @property
    def closed(self) -> bool:
        return self.pg_cursor.closed
class PostgresConnection(Connection[PostgresCursor]):
    def __init__(self, pg_conn: connection, max_age: Optional[float]):
        super().__init__(max_age=max_age)
        self._pg_conn = pg_conn
        self.is_prepared: Set[str] = set()

    @contextmanager
    def transaction(self, commit: bool) -> Iterator[PostgresCursor]:
        # Context managed transaction.
        with PostgresTransaction(self, commit) as curs:
            # Context managed cursor.
            with curs:
                yield curs

    def cursor(self) -> PostgresCursor:
        return PostgresCursor(
            self._pg_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        )

    def rollback(self) -> None:
        self._pg_conn.rollback()

    def commit(self) -> None:
        self._pg_conn.commit()

    def _close(self) -> None:
        self._pg_conn.close()
        super()._close()

    @property
    def closed(self) -> bool:
        return bool(self._pg_conn.closed)
class PostgresConnectionPool(ConnectionPool[PostgresConnection]):
    def __init__(
        self,
        dbname: str,
        host: str,
        port: str,
        user: str,
        password: str,
        connect_timeout: int = 5,
        idle_in_transaction_session_timeout: int = 0,
        pool_size: int = 1,
        max_overflow: int = 0,
        pool_timeout: float = 5.0,
        max_age: Optional[float] = None,
        pre_ping: bool = False,
    ):
        self.dbname = dbname
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.connect_timeout = connect_timeout
        self.idle_in_transaction_session_timeout = idle_in_transaction_session_timeout
        super().__init__(
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_timeout=pool_timeout,
            max_age=max_age,
            pre_ping=pre_ping,
            mutually_exclusive_read_write=False,
        )

    def _create_connection(self) -> PostgresConnection:
        # Make a connection to a database.
        try:
            pg_conn = psycopg2.connect(
                dbname=self.dbname,
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                connect_timeout=self.connect_timeout,
            )
        except psycopg2.OperationalError as e:
            raise OperationalError(e) from e
        pg_conn.cursor().execute(
            f"SET idle_in_transaction_session_timeout = "
            f"'{self.idle_in_transaction_session_timeout}s'"
        )
        return PostgresConnection(pg_conn, max_age=self.max_age)
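
# Illustrative sketch (not part of the library source): how the pool above might
# be used directly. The connection parameters here are assumptions.
def _example_connection_pool_usage() -> None:
    pool = PostgresConnectionPool(
        dbname="eventsourcing",
        host="127.0.0.1",
        port="5432",
        user="eventsourcing",
        password="eventsourcing",
    )
    conn = pool.get_connection()
    try:
        # commit=False rolls the transaction back on exit.
        with conn.transaction(commit=False) as curs:
            curs.execute("SELECT 1")
            assert curs.fetchone()[0] == 1
    finally:
        pool.put_connection(conn)
    pool.close()
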
class PostgresTransaction:
    def __init__(self, conn: PostgresConnection, commit: bool):
        self.conn = conn
        self.commit = commit
        self.has_entered = False

    def __enter__(self) -> PostgresCursor:
        self.has_entered = True
        return self.conn.cursor()

    def __exit__(
        self,
        exc_type: Type[BaseException],
        exc_val: BaseException,
        exc_tb: TracebackType,
    ) -> None:
        try:
            if exc_val:
                self.conn.rollback()
                raise exc_val
            elif not self.commit:
                self.conn.rollback()
            else:
                self.conn.commit()
        except psycopg2.InterfaceError as e:
            self.conn.close()
            raise InterfaceError(str(e)) from e
        except psycopg2.DataError as e:
            raise DataError(str(e)) from e
        except psycopg2.OperationalError as e:
            self.conn.close()
            raise OperationalError(str(e)) from e
        except psycopg2.IntegrityError as e:
            raise IntegrityError(str(e)) from e
        except psycopg2.InternalError as e:
            raise InternalError(str(e)) from e
        except psycopg2.ProgrammingError as e:
            raise ProgrammingError(str(e)) from e
        except psycopg2.NotSupportedError as e:
            raise NotSupportedError(str(e)) from e
        except psycopg2.DatabaseError as e:
            raise DatabaseError(str(e)) from e
        except psycopg2.Error as e:
            raise PersistenceError(str(e)) from e


class PostgresDatastore:
    def __init__(
        self,
        dbname: str,
        host: str,
        port: str,
        user: str,
        password: str,
        connect_timeout: int = 5,
        idle_in_transaction_session_timeout: int = 0,
        pool_size: int = 2,
        max_overflow: int = 2,
        pool_timeout: float = 5.0,
        conn_max_age: Optional[float] = None,
        pre_ping: bool = False,
        lock_timeout: int = 0,
        schema: str = "",
    ):
        self.pool = PostgresConnectionPool(
            dbname=dbname,
            host=host,
            port=port,
            user=user,
            password=password,
            connect_timeout=connect_timeout,
            idle_in_transaction_session_timeout=idle_in_transaction_session_timeout,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_timeout=pool_timeout,
            max_age=conn_max_age,
            pre_ping=pre_ping,
        )
        self.lock_timeout = lock_timeout
        self.schema = schema.strip()

    @contextmanager
    def transaction(self, commit: bool) -> Iterator[PostgresCursor]:
        with self.get_connection() as conn:
            with conn.transaction(commit) as curs:
                yield curs

    @contextmanager
    def get_connection(self) -> Iterator[PostgresConnection]:
        conn = self.pool.get_connection()
        try:
            yield conn
        finally:
            self.pool.put_connection(conn)

    def report_on_prepared_statements(
        self,
    ) -> Tuple[List[List[Union[bool, str]]], List[str]]:
        with self.get_connection() as conn:
            with conn.cursor() as curs:
                curs.execute("SELECT * FROM pg_prepared_statements")
                return sorted(curs.fetchall()), sorted(conn.is_prepared)

    def close(self) -> None:
        self.pool.close()

    def __del__(self) -> None:
        self.close()


PG_IDENTIFIER_MAX_LEN = 63
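
# Illustrative sketch (not part of the library source): PostgresDatastore above
# composes the pool and yields managed transactions. Credentials are assumptions.
def _example_datastore_usage() -> None:
    datastore = PostgresDatastore(
        dbname="eventsourcing",
        host="127.0.0.1",
        port="5432",
        user="eventsourcing",
        password="eventsourcing",
    )
    # The connection is borrowed from the pool and returned afterwards;
    # commit=True would commit on successful exit instead of rolling back.
    with datastore.transaction(commit=False) as curs:
        curs.execute("SELECT version()")
        print(curs.fetchone()[0])
    datastore.close()
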
class PostgresAggregateRecorder(AggregateRecorder):
    def __init__(
        self,
        datastore: PostgresDatastore,
        events_table_name: str,
    ):
        self.statement_name_aliases: Dict[str, str] = {}
        self.statement_name_aliases_lock = Lock()
        self.check_table_name_length(events_table_name, datastore.schema)
        self.datastore = datastore
        self.events_table_name = events_table_name
        # Index names can't be qualified names, but
        # are created in the same schema as the table.
        if "." in self.events_table_name:
            unqualified_table_name = self.events_table_name.split(".")[-1]
        else:
            unqualified_table_name = self.events_table_name
        self.notification_id_index_name = (
            f"{unqualified_table_name}_notification_id_idx"
        )
        self.create_table_statements = self.construct_create_table_statements()
        self.insert_events_statement = (
            f"INSERT INTO {self.events_table_name} VALUES ($1, $2, $3, $4)"
        )
        self.insert_events_statement_name = f"insert_{events_table_name}".replace(
            ".", "_"
        )
        self.select_events_statement = (
            f"SELECT * FROM {self.events_table_name} WHERE originator_id = $1"
        )
        self.lock_statements: List[str] = []

    @staticmethod
    def check_table_name_length(table_name: str, schema_name: str) -> None:
        schema_prefix = schema_name + "."
        if table_name.startswith(schema_prefix):
            unqualified_table_name = table_name[len(schema_prefix) :]
        else:
            unqualified_table_name = table_name
        if len(unqualified_table_name) > PG_IDENTIFIER_MAX_LEN:
            raise ProgrammingError(f"Table name too long: {unqualified_table_name}")

    def get_statement_alias(self, statement_name: str) -> str:
        try:
            alias = self.statement_name_aliases[statement_name]
        except KeyError:
            with self.statement_name_aliases_lock:
                try:
                    alias = self.statement_name_aliases[statement_name]
                except KeyError:
                    existing_aliases = self.statement_name_aliases.values()
                    if (
                        len(statement_name) <= PG_IDENTIFIER_MAX_LEN
                        and statement_name not in existing_aliases
                    ):
                        alias = statement_name
                        self.statement_name_aliases[statement_name] = alias
                    else:
                        uid = uuid5(
                            NAMESPACE_URL, f"/statement_names/{statement_name}"
                        ).hex
                        alias = uid
                        for i in range(len(uid)):  # pragma: no cover
                            preserve_end = 21
                            preserve_start = PG_IDENTIFIER_MAX_LEN - preserve_end - i - 2
                            uuid5_tail = i
                            candidate = (
                                statement_name[:preserve_start]
                                + "_"
                                + (uid[-uuid5_tail:] if i else "")
                                + "_"
                                + statement_name[-preserve_end:]
                            )
                            assert len(candidate) <= PG_IDENTIFIER_MAX_LEN
                            if candidate not in existing_aliases:
                                alias = candidate
                                break
                        self.statement_name_aliases[statement_name] = alias
        return alias

    def construct_create_table_statements(self) -> List[str]:
        statement = (
            "CREATE TABLE IF NOT EXISTS "
            f"{self.events_table_name} ("
            "originator_id uuid NOT NULL, "
            "originator_version bigint NOT NULL, "
            "topic text, "
            "state bytea, "
            "PRIMARY KEY "
            "(originator_id, originator_version)) "
            "WITH (autovacuum_enabled=false)"
        )
        return [statement]

    def create_table(self) -> None:
        with self.datastore.transaction(commit=True) as curs:
            for statement in self.create_table_statements:
                curs.execute(statement)
            pass  # for Coverage 5.5 bug with CPython 3.10.0rc1
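
    # Note on statement aliases (illustrative, not part of the library source):
    # a statement name such as "insert_" plus a long table name can exceed
    # Postgres's 63-character identifier limit, so get_statement_alias() above
    # derives a deterministic alias from a uuid5 hash of the name, preserving
    # the head and tail of the original name where space allows.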
    @retry((InterfaceError, OperationalError), max_attempts=10, wait=0.2)
    def insert_events(
        self, stored_events: List[StoredEvent], **kwargs: Any
    ) -> Optional[Sequence[int]]:
        with self.datastore.get_connection() as conn:
            self._prepare_insert_events(conn)
            with conn.transaction(commit=True) as curs:
                return self._insert_events(curs, stored_events, **kwargs)
    def _prepare_insert_events(self, conn: PostgresConnection) -> None:
        self._prepare(
            conn,
            self.insert_events_statement_name,
            self.insert_events_statement,
        )

    def _prepare(
        self, conn: PostgresConnection, statement_name: str, statement: str
    ) -> str:
        statement_name_alias = self.get_statement_alias(statement_name)
        if statement_name not in conn.is_prepared:
            curs: PostgresCursor
            with conn.transaction(commit=True) as curs:
                try:
                    lock_timeout = self.datastore.lock_timeout
                    curs.execute(f"SET LOCAL lock_timeout = '{lock_timeout}s'")
                    curs.execute(f"PREPARE {statement_name_alias} AS " + statement)
                except psycopg2.errors.lookup(DUPLICATE_PREPARED_STATEMENT):  # noqa
                    pass
            conn.is_prepared.add(statement_name)
        return statement_name_alias

    def _insert_events(
        self,
        c: PostgresCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        # Acquire "EXCLUSIVE" table lock, to serialize inserts so that
        # insertion of notification IDs is monotonic for notification log
        # readers. We want concurrent transactions to commit inserted
        # notification_id values in order, and by locking the table for writes,
        # it can be guaranteed. The EXCLUSIVE lock mode does not block
        # the ACCESS SHARE lock which is acquired during SELECT statements,
        # so the table can be read concurrently. However, INSERT normally
        # just acquires ROW EXCLUSIVE locks, which risks interleaving of
        # many inserts in one transaction with many inserts in another
        # transaction. Since one transaction will commit before another,
        # the possibility arises for readers that are tailing a notification
        # log to miss items inserted later but with lower notification IDs.
        # https://www.postgresql.org/docs/current/explicit-locking.html#LOCKING-TABLES
        # https://www.postgresql.org/docs/9.1/sql-lock.html
        # https://stackoverflow.com/questions/45866187/guarantee-monotonicity-of-postgresql-serial-column-values-by-commit-order
        len_stored_events = len(stored_events)

        # Only do something if there is something to do.
        if len_stored_events > 0:
            # Mogrify the table lock statements.
            lock_sqls = (c.mogrify(s) for s in self.lock_statements)

            # Prepare the commands before getting the table lock.
            alias = self.statement_name_aliases[self.insert_events_statement_name]
            page_size = 500
            pages = [
                (
                    c.mogrify(
                        f"EXECUTE {alias}(%s, %s, %s, %s)",
                        (
                            stored_event.originator_id,
                            stored_event.originator_version,
                            stored_event.topic,
                            stored_event.state,
                        ),
                    )
                    for stored_event in page
                )
                for page in (
                    stored_events[ndx : min(ndx + page_size, len_stored_events)]
                    for ndx in range(0, len_stored_events, page_size)
                )
            ]
            commands = [
                b"; ".join(page)
                for page in chain([chain(lock_sqls, pages[0])], pages[1:])
            ]

            # Execute the commands.
            for command in commands:
                c.execute(command)
        return None
    @retry((InterfaceError, OperationalError), max_attempts=10, wait=0.2)
    def select_events(
        self,
        originator_id: UUID,
        gt: Optional[int] = None,
        lte: Optional[int] = None,
        desc: bool = False,
        limit: Optional[int] = None,
    ) -> List[StoredEvent]:
        parts = [self.select_events_statement]
        params: List[Any] = [originator_id]
        statement_name = f"select_{self.events_table_name}".replace(".", "_")
        if gt is not None:
            params.append(gt)
            parts.append(f"AND originator_version > ${len(params)}")
            statement_name += "_gt"
        if lte is not None:
            params.append(lte)
            parts.append(f"AND originator_version <= ${len(params)}")
            statement_name += "_lte"
        parts.append("ORDER BY originator_version")
        if desc is False:
            parts.append("ASC")
        else:
            parts.append("DESC")
            statement_name += "_desc"
        if limit is not None:
            params.append(limit)
            parts.append(f"LIMIT ${len(params)}")
            statement_name += "_limit"
        statement = " ".join(parts)

        stored_events = []
        with self.datastore.get_connection() as conn:
            alias = self._prepare(conn, statement_name, statement)
            with conn.transaction(commit=False) as curs:
                curs.execute(
                    f"EXECUTE {alias}({', '.join(['%s' for _ in params])})",
                    params,
                )
                for row in curs.fetchall():
                    stored_events.append(
                        StoredEvent(
                            originator_id=row["originator_id"],
                            originator_version=row["originator_version"],
                            topic=row["topic"],
                            state=bytes(row["state"]),
                        )
                    )
                pass  # for Coverage 5.5 bug with CPython 3.10.0rc1
        return stored_events
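
# Illustrative sketch (not part of the library source): recording and replaying
# events for one aggregate. The table name, topic, and state values are assumptions.
def _example_aggregate_recorder_usage(datastore: PostgresDatastore) -> None:
    from uuid import uuid4

    recorder = PostgresAggregateRecorder(datastore, events_table_name="stored_events")
    recorder.create_table()
    originator_id = uuid4()
    recorder.insert_events(
        [
            StoredEvent(
                originator_id=originator_id,
                originator_version=1,
                topic="mydomain:Dog.Registered",
                state=b'{"name": "Fido"}',
            )
        ]
    )
    # Replay the recorded events in version order.
    for stored_event in recorder.select_events(originator_id):
        print(stored_event.originator_version, stored_event.topic)
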
class PostgresApplicationRecorder(PostgresAggregateRecorder, ApplicationRecorder):
    def __init__(
        self,
        datastore: PostgresDatastore,
        events_table_name: str = "stored_events",
    ):
        super().__init__(datastore, events_table_name)
        self.insert_events_statement = (
            f"INSERT INTO {self.events_table_name} VALUES ($1, $2, $3, $4) "
            f"RETURNING notification_id"
        )
        self.max_notification_id_statement = (
            f"SELECT MAX(notification_id) FROM {self.events_table_name}"
        )
        self.max_notification_id_statement_name = (
            f"max_notification_id_{events_table_name}".replace(".", "_")
        )
        self.lock_statements = [
            f"SET LOCAL lock_timeout = '{self.datastore.lock_timeout}s'",
            f"LOCK TABLE {self.events_table_name} IN EXCLUSIVE MODE",
        ]

    def construct_create_table_statements(self) -> List[str]:
        statements = [
            "CREATE TABLE IF NOT EXISTS "
            f"{self.events_table_name} ("
            "originator_id uuid NOT NULL, "
            "originator_version bigint NOT NULL, "
            "topic text, "
            "state bytea, "
            "notification_id bigserial, "
            "PRIMARY KEY "
            "(originator_id, originator_version)) "
            "WITH (autovacuum_enabled=false)",
            f"CREATE UNIQUE INDEX IF NOT EXISTS "
            f"{self.notification_id_index_name} "
            f"ON {self.events_table_name} (notification_id ASC);",
        ]
        return statements
    @retry((InterfaceError, OperationalError), max_attempts=10, wait=0.2)
    def select_notifications(
        self,
        start: int,
        limit: int,
        stop: Optional[int] = None,
        topics: Sequence[str] = (),
    ) -> List[Notification]:
        """
        Returns a list of event notifications
        from 'start', limited by 'limit'.
        """
        params: List[Union[int, str, Sequence[str]]] = [start]
        statement = (
            "SELECT * "
            f"FROM {self.events_table_name} "
            "WHERE notification_id>=$1 "
        )
        statement_name = f"select_notifications_{self.events_table_name}".replace(
            ".", "_"
        )
        if stop is not None:
            params.append(stop)
            statement += f"AND notification_id <= ${len(params)} "
            statement_name += "_stop"
        if topics:
            params.append(topics)
            statement += f"AND topic = ANY(${len(params)}) "
            statement_name += "_topics"
        params.append(limit)
        statement += f"ORDER BY notification_id LIMIT ${len(params)}"

        notifications = []
        with self.datastore.get_connection() as conn:
            alias = self._prepare(
                conn,
                statement_name,
                statement,
            )
            with conn.transaction(commit=False) as curs:
                curs.execute(
                    f"EXECUTE {alias}({', '.join(['%s' for _ in params])})",
                    params,
                )
                for row in curs.fetchall():
                    notifications.append(
                        Notification(
                            id=row["notification_id"],
                            originator_id=row["originator_id"],
                            originator_version=row["originator_version"],
                            topic=row["topic"],
                            state=bytes(row["state"]),
                        )
                    )
                pass  # for Coverage 5.5 bug with CPython 3.10.0rc1
        return notifications
    @retry((InterfaceError, OperationalError), max_attempts=10, wait=0.2)
    def max_notification_id(self) -> int:
        """
        Returns the maximum notification ID.
        """
        statement_name = self.max_notification_id_statement_name
        with self.datastore.get_connection() as conn:
            statement_alias = self._prepare(
                conn, statement_name, self.max_notification_id_statement
            )
            with conn.transaction(commit=False) as curs:
                curs.execute(f"EXECUTE {statement_alias}")
                max_id = curs.fetchone()[0] or 0
        return max_id
    def _insert_events(
        self,
        c: PostgresCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        super()._insert_events(c, stored_events, **kwargs)
        if stored_events:
            last_notification_id = c.fetchone()[0]
            notification_ids = list(
                range(
                    last_notification_id - len(stored_events) + 1,
                    last_notification_id + 1,
                )
            )
        else:
            notification_ids = []
        return notification_ids
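
# Illustrative sketch (not part of the library source): tailing the notification
# log of an application recorder, reading in pages of ten. The default table
# name "stored_events" is taken from the class definition above.
def _example_application_recorder_usage(datastore: PostgresDatastore) -> None:
    recorder = PostgresApplicationRecorder(datastore)
    recorder.create_table()
    start = 1
    while start <= recorder.max_notification_id():
        notifications = recorder.select_notifications(start=start, limit=10)
        for notification in notifications:
            print(notification.id, notification.topic)
        # Continue from just after the last notification seen.
        start = notifications[-1].id + 1
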
class PostgresProcessRecorder(PostgresApplicationRecorder, ProcessRecorder):
    def __init__(
        self,
        datastore: PostgresDatastore,
        events_table_name: str,
        tracking_table_name: str,
    ):
        self.check_table_name_length(tracking_table_name, datastore.schema)
        self.tracking_table_name = tracking_table_name
        super().__init__(datastore, events_table_name)
        self.insert_tracking_statement = (
            f"INSERT INTO {self.tracking_table_name} VALUES ($1, $2)"
        )
        self.insert_tracking_statement_name = f"insert_{tracking_table_name}".replace(
            ".", "_"
        )
        self.max_tracking_id_statement = (
            "SELECT MAX(notification_id) "
            f"FROM {self.tracking_table_name} "
            "WHERE application_name=$1"
        )
        self.count_tracking_id_statement = (
            "SELECT COUNT(*) "
            f"FROM {self.tracking_table_name} "
            "WHERE application_name=$1 AND notification_id=$2"
        )
        self.max_tracking_id_statement_name = (
            f"max_tracking_id_{tracking_table_name}".replace(".", "_")
        )
        self.count_tracking_id_statement_name = (
            f"count_tracking_id_{tracking_table_name}".replace(".", "_")
        )

    def construct_create_table_statements(self) -> List[str]:
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.tracking_table_name} ("
            "application_name text, "
            "notification_id bigint, "
            "PRIMARY KEY "
            "(application_name, notification_id))"
        )
        return statements
    @retry((InterfaceError, OperationalError), max_attempts=10, wait=0.2)
    def max_tracking_id(self, application_name: str) -> int:
        statement_name = self.max_tracking_id_statement_name
        with self.datastore.get_connection() as conn:
            statement_alias = self._prepare(
                conn, statement_name, self.max_tracking_id_statement
            )
            with conn.transaction(commit=False) as curs:
                curs.execute(
                    f"EXECUTE {statement_alias}(%s)",
                    (application_name,),
                )
                max_id = curs.fetchone()[0] or 0
        return max_id

    @retry((InterfaceError, OperationalError), max_attempts=10, wait=0.2)
    def has_tracking_id(self, application_name: str, notification_id: int) -> bool:
        statement_name = self.count_tracking_id_statement_name
        with self.datastore.get_connection() as conn:
            statement_alias = self._prepare(
                conn, statement_name, self.count_tracking_id_statement
            )
            with conn.transaction(commit=False) as curs:
                curs.execute(
                    f"EXECUTE {statement_alias}(%s, %s)",
                    (application_name, notification_id),
                )
                return bool(curs.fetchone()[0])
    def _prepare_insert_events(self, conn: PostgresConnection) -> None:
        super()._prepare_insert_events(conn)
        self._prepare(
            conn, self.insert_tracking_statement_name, self.insert_tracking_statement
        )

    def _insert_events(
        self,
        c: PostgresCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        notification_ids = super()._insert_events(c, stored_events, **kwargs)
        tracking: Optional[Tracking] = kwargs.get("tracking", None)
        if tracking is not None:
            statement_alias = self.statement_name_aliases[
                self.insert_tracking_statement_name
            ]
            c.execute(
                f"EXECUTE {statement_alias}(%s, %s)",
                (
                    tracking.application_name,
                    tracking.notification_id,
                ),
            )
        return notification_ids
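
# Illustrative sketch (not part of the library source): recording new events
# atomically with the tracking position of the upstream notification that
# caused them, so reprocessing can be detected. Table and application names
# are assumptions.
def _example_process_recorder_usage(datastore: PostgresDatastore) -> None:
    from uuid import uuid4

    recorder = PostgresProcessRecorder(
        datastore,
        events_table_name="downstream_events",
        tracking_table_name="downstream_tracking",
    )
    recorder.create_table()
    if not recorder.has_tracking_id("upstream", 1):
        recorder.insert_events(
            [
                StoredEvent(
                    originator_id=uuid4(),
                    originator_version=1,
                    topic="mydomain:Policy.Triggered",
                    state=b"{}",
                )
            ],
            # Tracking record and events commit in the same transaction.
            tracking=Tracking(application_name="upstream", notification_id=1),
        )
    print(recorder.max_tracking_id("upstream"))
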
class Factory(InfrastructureFactory):
    POSTGRES_DBNAME = "POSTGRES_DBNAME"
    POSTGRES_HOST = "POSTGRES_HOST"
    POSTGRES_PORT = "POSTGRES_PORT"
    POSTGRES_USER = "POSTGRES_USER"
    POSTGRES_PASSWORD = "POSTGRES_PASSWORD"
    POSTGRES_CONNECT_TIMEOUT = "POSTGRES_CONNECT_TIMEOUT"
    POSTGRES_CONN_MAX_AGE = "POSTGRES_CONN_MAX_AGE"
    POSTGRES_PRE_PING = "POSTGRES_PRE_PING"
    POSTGRES_POOL_TIMEOUT = "POSTGRES_POOL_TIMEOUT"
    POSTGRES_LOCK_TIMEOUT = "POSTGRES_LOCK_TIMEOUT"
    POSTGRES_POOL_SIZE = "POSTGRES_POOL_SIZE"
    POSTGRES_POOL_MAX_OVERFLOW = "POSTGRES_POOL_MAX_OVERFLOW"
    POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT = (
        "POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT"
    )
    POSTGRES_SCHEMA = "POSTGRES_SCHEMA"
    CREATE_TABLE = "CREATE_TABLE"

    aggregate_recorder_class = PostgresAggregateRecorder
    application_recorder_class = PostgresApplicationRecorder
    process_recorder_class = PostgresProcessRecorder
    def __init__(self, env: Environment):
        super().__init__(env)
        dbname = self.env.get(self.POSTGRES_DBNAME)
        if dbname is None:
            raise EnvironmentError(
                "Postgres database name not found "
                "in environment with key "
                f"'{self.POSTGRES_DBNAME}'"
            )

        host = self.env.get(self.POSTGRES_HOST)
        if host is None:
            raise EnvironmentError(
                "Postgres host not found "
                "in environment with key "
                f"'{self.POSTGRES_HOST}'"
            )

        port = self.env.get(self.POSTGRES_PORT) or "5432"

        user = self.env.get(self.POSTGRES_USER)
        if user is None:
            raise EnvironmentError(
                "Postgres user not found "
                "in environment with key "
                f"'{self.POSTGRES_USER}'"
            )

        password = self.env.get(self.POSTGRES_PASSWORD)
        if password is None:
            raise EnvironmentError(
                "Postgres password not found "
                "in environment with key "
                f"'{self.POSTGRES_PASSWORD}'"
            )

        connect_timeout: Optional[int]
        connect_timeout_str = self.env.get(self.POSTGRES_CONNECT_TIMEOUT)
        if connect_timeout_str is None:
            connect_timeout = 5
        elif connect_timeout_str == "":
            connect_timeout = 5
        else:
            try:
                connect_timeout = int(connect_timeout_str)
            except ValueError:
                raise EnvironmentError(
                    f"Postgres environment value for key "
                    f"'{self.POSTGRES_CONNECT_TIMEOUT}' is invalid. "
                    f"If set, an integer or empty string is expected: "
                    f"'{connect_timeout_str}'"
                )

        idle_in_transaction_session_timeout_str = (
            self.env.get(self.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT) or "5"
        )
        try:
            idle_in_transaction_session_timeout = int(
                idle_in_transaction_session_timeout_str
            )
        except ValueError:
            raise EnvironmentError(
                f"Postgres environment value for key "
                f"'{self.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT}' is invalid. "
                f"If set, an integer or empty string is expected: "
                f"'{idle_in_transaction_session_timeout_str}'"
            )

        pool_size: Optional[int]
        pool_size_str = self.env.get(self.POSTGRES_POOL_SIZE)
        if pool_size_str is None:
            pool_size = 5
        elif pool_size_str == "":
            pool_size = 5
        else:
            try:
                pool_size = int(pool_size_str)
            except ValueError:
                raise EnvironmentError(
                    f"Postgres environment value for key "
                    f"'{self.POSTGRES_POOL_SIZE}' is invalid. "
                    f"If set, an integer or empty string is expected: "
                    f"'{pool_size_str}'"
                )

        pool_max_overflow: Optional[int]
        pool_max_overflow_str = self.env.get(self.POSTGRES_POOL_MAX_OVERFLOW)
        if pool_max_overflow_str is None:
            pool_max_overflow = 10
        elif pool_max_overflow_str == "":
            pool_max_overflow = 10
        else:
            try:
                pool_max_overflow = int(pool_max_overflow_str)
            except ValueError:
                raise EnvironmentError(
                    f"Postgres environment value for key "
                    f"'{self.POSTGRES_POOL_MAX_OVERFLOW}' is invalid. "
                    f"If set, an integer or empty string is expected: "
                    f"'{pool_max_overflow_str}'"
                )

        pool_timeout: Optional[float]
        pool_timeout_str = self.env.get(self.POSTGRES_POOL_TIMEOUT)
        if pool_timeout_str is None:
            pool_timeout = 30
        elif pool_timeout_str == "":
            pool_timeout = 30
        else:
            try:
                pool_timeout = float(pool_timeout_str)
            except ValueError:
                raise EnvironmentError(
                    f"Postgres environment value for key "
                    f"'{self.POSTGRES_POOL_TIMEOUT}' is invalid. "
                    f"If set, a float or empty string is expected: "
                    f"'{pool_timeout_str}'"
                )

        conn_max_age: Optional[float]
        conn_max_age_str = self.env.get(self.POSTGRES_CONN_MAX_AGE)
        if conn_max_age_str is None:
            conn_max_age = None
        elif conn_max_age_str == "":
            conn_max_age = None
        else:
            try:
                conn_max_age = float(conn_max_age_str)
            except ValueError:
                raise EnvironmentError(
                    f"Postgres environment value for key "
                    f"'{self.POSTGRES_CONN_MAX_AGE}' is invalid. "
                    f"If set, a float or empty string is expected: "
                    f"'{conn_max_age_str}'"
                )

        pre_ping = strtobool(self.env.get(self.POSTGRES_PRE_PING) or "no")

        lock_timeout_str = self.env.get(self.POSTGRES_LOCK_TIMEOUT) or "0"
        try:
            lock_timeout = int(lock_timeout_str)
        except ValueError:
            raise EnvironmentError(
                f"Postgres environment value for key "
                f"'{self.POSTGRES_LOCK_TIMEOUT}' is invalid. "
                f"If set, an integer or empty string is expected: "
                f"'{lock_timeout_str}'"
            )

        schema = self.env.get(self.POSTGRES_SCHEMA) or ""

        self.datastore = PostgresDatastore(
            dbname=dbname,
            host=host,
            port=port,
            user=user,
            password=password,
            connect_timeout=connect_timeout,
            idle_in_transaction_session_timeout=idle_in_transaction_session_timeout,
            pool_size=pool_size,
            max_overflow=pool_max_overflow,
            pool_timeout=pool_timeout,
            conn_max_age=conn_max_age,
            pre_ping=pre_ping,
            lock_timeout=lock_timeout,
            schema=schema,
        )
    def aggregate_recorder(self, purpose: str = "events") -> AggregateRecorder:
        prefix = self.env.name.lower() or "stored"
        events_table_name = prefix + "_" + purpose
        if self.datastore.schema:
            events_table_name = f"{self.datastore.schema}.{events_table_name}"
        recorder = type(self).aggregate_recorder_class(
            datastore=self.datastore,
            events_table_name=events_table_name,
        )
        if self.env_create_table():
            recorder.create_table()
        return recorder

    def application_recorder(self) -> ApplicationRecorder:
        prefix = self.env.name.lower() or "stored"
        events_table_name = prefix + "_events"
        if self.datastore.schema:
            events_table_name = f"{self.datastore.schema}.{events_table_name}"
        recorder = type(self).application_recorder_class(
            datastore=self.datastore,
            events_table_name=events_table_name,
        )
        if self.env_create_table():
            recorder.create_table()
        return recorder

    def process_recorder(self) -> ProcessRecorder:
        prefix = self.env.name.lower() or "stored"
        events_table_name = prefix + "_events"
        prefix = self.env.name.lower() or "notification"
        tracking_table_name = prefix + "_tracking"
        if self.datastore.schema:
            events_table_name = f"{self.datastore.schema}.{events_table_name}"
            tracking_table_name = f"{self.datastore.schema}.{tracking_table_name}"
        recorder = type(self).process_recorder_class(
            datastore=self.datastore,
            events_table_name=events_table_name,
            tracking_table_name=tracking_table_name,
        )
        if self.env_create_table():
            recorder.create_table()
        return recorder
    def env_create_table(self) -> bool:
        return strtobool(self.env.get(self.CREATE_TABLE) or "yes")

    def close(self) -> None:
        self.datastore.close()
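
# Illustrative sketch (not part of the library source): constructing the factory
# from the process environment. The Environment constructor call is an assumed
# usage of eventsourcing.utils.Environment, not confirmed by this module.
def _example_factory_usage() -> None:
    import os

    env = Environment("MyApp", dict(os.environ))  # assumed constructor signature
    factory = Factory(env)
    recorder = factory.application_recorder()
    print(recorder.max_notification_id())
    factory.close()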