"""Source code for eventsourcing.dcb.postgres_tt."""

from __future__ import annotations

import threading
from typing import TYPE_CHECKING, Any, NamedTuple

from psycopg.generators import notifies
from psycopg.sql import SQL, Composed, Identifier

from eventsourcing.dcb.api import (
    DCBAppendCondition,
    DCBEvent,
    DCBQuery,
    DCBQueryItem,
    DCBReadResponse,
    DCBRecorder,
    DCBSequencedEvent,
)
from eventsourcing.dcb.persistence import (
    DCBInfrastructureFactory,
    DCBListenNotifySubscription,
)
from eventsourcing.dcb.popo import SimpleDCBReadResponse
from eventsourcing.persistence import IntegrityError, InternalError, ProgrammingError
from eventsourcing.postgres import (
    NO_TRACEBACK,
    BasePostgresFactory,
    PostgresDatastore,
    PostgresRecorder,
    PostgresTrackingRecorder,
)

if TYPE_CHECKING:
    from collections.abc import Sequence

    from psycopg import Connection, Cursor
    from psycopg.abc import Params
    from psycopg.rows import DictRow

# Name of the composite type that carries one new event (type, data, tags)
# into the append functions defined below.
DB_TYPE_NAME_DCB_EVENT_TT = "dcb_event_tt"

# DDL for the event composite type. PostgreSQL has no CREATE TYPE IF NOT
# EXISTS, so this statement errors if the type already exists.
# NOTE(review): how that duplicate-type error is tolerated on repeated table
# creation is not visible here — confirm in PostgresRecorder.
DB_TYPE_DCB_EVENT = SQL("""
CREATE TYPE {schema}.{event_type} AS (
    type text,
    data bytea,
    tags text[]
)
""")

# Name of the composite type that carries one query item (types, tags).
DB_TYPE_NAME_DCB_QUERY_ITEM_TT = "dcb_query_item_tt"

# DDL for the query-item composite type. In the select/append SQL below, an
# event matches an item when it has every tag in ``tags`` and, if ``types``
# is non-empty, its type is one of ``types``.
DB_TYPE_DCB_QUERY_ITEM = SQL("""
CREATE TYPE {schema}.{query_item_type} AS (
    types text[],
    tags text[]
)
""")

# Main events table: one row per event, with its tags also kept inline as a
# text array. ``id`` is the event's position. There is no PRIMARY KEY here;
# uniqueness of ``id`` comes from the unique index defined next. The
# autovacuum storage parameters favour frequent ANALYZE over VACUUM, as the
# inline SQL comments describe.
DB_TABLE_DCB_EVENTS = SQL("""
CREATE TABLE IF NOT EXISTS {schema}.{events_table} (
    id bigserial,
    type text NOT NULL ,
    data bytea,
    tags text[] NOT NULL
) WITH (
  autovacuum_enabled = true,
  autovacuum_vacuum_threshold = 100000000,  -- Effectively disables VACUUM
  autovacuum_vacuum_scale_factor = 0.5,     -- Same here, high scale factor
  autovacuum_analyze_threshold = 1000,      -- Triggers ANALYZE more often
  autovacuum_analyze_scale_factor = 0.01    -- Triggers after 1% new rows
)
""")

# Unique index on ``id`` that also INCLUDEs ``type``. Presumably this lets
# "id range + type" filters be answered from the index alone (index-only
# scan) — verify with EXPLAIN.
DB_INDEX_UNIQUE_ID_COVER_TYPE = SQL("""
CREATE UNIQUE INDEX IF NOT EXISTS {id_cover_type_index} ON
{schema}.{events_table} (id) INCLUDE (type)
""")

# Tag table: one row per (tag, event) pair. The append functions fill it by
# unnesting each new event's ``tags`` array. ``main_id`` references the
# event's ``id`` in the main table. The referenced table is schema-qualified,
# like every other object in this module, so the constraint targets the right
# table even when {schema} is not on the session's search_path.
DB_TABLE_DCB_TAGS = SQL("""
CREATE TABLE IF NOT EXISTS {schema}.{tags_table} (
    tag text,
    main_id bigint REFERENCES {schema}.{events_table} (id)
) WITH (
    autovacuum_enabled = true,
    autovacuum_vacuum_threshold = 100000000,  -- Effectively disables VACUUM
    autovacuum_vacuum_scale_factor = 0.5,     -- Same here, high scale factor
    autovacuum_analyze_threshold = 1000,      -- Triggers ANALYZE more often
    autovacuum_analyze_scale_factor = 0.01    -- Triggers after 1% new rows
)
""")

# Composite (tag, main_id) index. It serves the tag-table lookups in the
# select and conditional-append SQL, which filter on
# ``t.tag = ANY(...)`` and ``t.main_id > after``.
DB_INDEX_TAG_MAIN_ID = SQL("""
CREATE INDEX IF NOT EXISTS {tag_main_id_index} ON
{schema}.{tags_table} (tag, main_id)
""")

# Select every event after position ``after`` (NULL means from the start), in
# position order. A NULL ``limit`` becomes 9223372036854775807 (the maximum
# bigint value), i.e. effectively no limit.
SQL_SELECT_ALL = SQL("""
SELECT * FROM {schema}.{events_table}
WHERE id > COALESCE(%(after)s, 0)
ORDER BY id ASC
LIMIT COALESCE(%(limit)s, 9223372036854775807)
""")

# Same as SQL_SELECT_ALL, restricted to events whose type equals
# ``event_type``.
SQL_SELECT_EVENTS_BY_TYPE = SQL("""
SELECT * FROM {schema}.{events_table}
WHERE type = %(event_type)s
AND id > COALESCE(%(after)s, 0)
ORDER BY id ASC
LIMIT COALESCE(%(limit)s, 9223372036854775807)
""")

# Highest stored event position. MAX over an empty table yields NULL; the
# result column is read as "max" on the Python side.
SQL_SELECT_MAX_ID = SQL("""
SELECT MAX(id) FROM {schema}.{events_table}
""")

# Select events matching ANY of the given query items, after position
# ``after``, in position order, up to ``limit`` events.
#
# An event matches a query item when:
#   - it carries every tag in the item's ``tags`` (its count of distinct
#     matching tags equals the array length), and
#   - if the item's ``types`` array is non-empty, its type is one of them.
# Candidates are found through the tag table, then checked against the main
# table.
#
# ``filtered_ids`` uses SELECT DISTINCT because an event that matches more
# than one query item appears once per matched item in ``qualified_ids``.
# Without de-duplication those repeats would consume LIMIT slots, and the
# query could return fewer than ``limit`` events even when more match.
#
# NOTE(review): a query item whose ``tags`` array contains duplicates can
# never match, because array_length counts the duplicates but
# COUNT(DISTINCT tag) does not.
SQL_SELECT_BY_TAGS = SQL("""
WITH query_items AS (
    SELECT * FROM unnest(
        %(query_items)s::{schema}.{query_item_type}[]
    ) WITH ORDINALITY
),
initial_matches AS (
    SELECT
        t.main_id,
        qi.ordinality,
        t.tag,
        qi.tags AS required_tags,
        qi.types AS allowed_types
    FROM query_items qi
    JOIN {schema}.{tags_table} t
      ON t.tag = ANY(qi.tags)
   WHERE t.main_id > COALESCE(%(after)s, 0)
),
matched_groups AS (
    SELECT
        main_id,
        ordinality,
        COUNT(DISTINCT tag) AS matched_tag_count,
        array_length(required_tags, 1) AS required_tag_count,
        allowed_types
    FROM initial_matches
    GROUP BY main_id, ordinality, required_tag_count, allowed_types
),
qualified_ids AS (
    SELECT main_id, allowed_types
    FROM matched_groups
    WHERE matched_tag_count = required_tag_count
),
filtered_ids AS (
    SELECT DISTINCT m.id
    FROM {schema}.{events_table} m
    JOIN qualified_ids q ON q.main_id = m.id
    WHERE
        m.id > COALESCE(%(after)s, 0)
        AND (
            array_length(q.allowed_types, 1) IS NULL
            OR array_length(q.allowed_types, 1) = 0
            OR m.type = ANY(q.allowed_types)
        )
    ORDER BY m.id ASC
    LIMIT COALESCE(%(limit)s, 9223372036854775807)
)
SELECT *
FROM {schema}.{events_table}  m
WHERE m.id IN (SELECT id FROM filtered_ids)
ORDER BY m.id ASC;
""")


# Call the unconditional append function with an array of event composites.
SQL_UNCONDITIONAL_APPEND = SQL("""
SELECT * FROM {schema}.{unconditional_append}(%(events)s)
""")
# Name of the PL/pgSQL function defined below. The Python side also uses it as
# the column name when reading the function's result row.
DB_FUNCTION_NAME_DCB_UNCONDITIONAL_APPEND_TT = "dcb_unconditional_append_tt"
# Insert all new events into the main table, then one tag-table row per
# (event, tag), in a single statement built from data-modifying CTEs.
# ``tag_insert`` is not referenced by the final SELECT; PostgreSQL still
# executes data-modifying CTEs. Returns the highest new id, then NOTIFYs the
# recorder's channel so listening subscriptions are woken.
DB_FUNCTION_UNCONDITIONAL_APPEND = SQL("""
CREATE OR REPLACE FUNCTION {schema}.{unconditional_append}(
    new_events {schema}.{event_type}[]
) RETURNS SETOF bigint
LANGUAGE plpgsql AS $$
BEGIN
    RETURN QUERY
    WITH new_data AS (
        SELECT * FROM unnest(new_events)
    ),
    inserted AS (
        INSERT INTO {schema}.{events_table} (type, data, tags)
        SELECT type, data, tags
        FROM new_data
        RETURNING id, tags
    ),
    expanded_tags AS (
        SELECT ins.id AS main_id, tag
        FROM inserted ins,
             unnest(ins.tags) AS tag
    ),
    tag_insert AS (
        INSERT INTO {schema}.{tags_table} (tag, main_id)
        SELECT tag, main_id
        FROM expanded_tags
    )
    SELECT MAX(id) FROM inserted;
    NOTIFY {channel};

END
$$;
""")

# Call the conditional append function with the query items, the ``after``
# position and the new events.
SQL_CONDITIONAL_APPEND = SQL("""
SELECT * FROM {schema}.{conditional_append}(%(query_items)s, %(after)s, %(events)s)
""")
# Name of the PL/pgSQL function defined below. The Python side also uses it as
# the column name when reading the function's result row.
DB_FUNCTION_NAME_DCB_CONDITIONAL_APPEND_TT = "dcb_conditional_append_tt"
# Conditional append in a single function call:
#   0. Take an EXCLUSIVE lock on the main table. The wait is bounded by the
#      datastore's lock timeout, which is interpolated into the function body
#      when the SQL is formatted.
#   1. Look for any event after ``after_id`` that matches one of the query
#      items. The matching rules are the same as SQL_SELECT_BY_TAGS;
#      LIMIT 1 suffices for an existence check.
#   2. Only if none is found, insert the events and tags (as in the
#      unconditional append) and NOTIFY the channel.
# Returns the highest new id, or an empty result set when a conflicting event
# exists.
DB_FUNCTION_CONDITIONAL_APPEND = SQL("""
CREATE OR REPLACE FUNCTION {schema}.{conditional_append}(
    query_items {schema}.{query_item_type}[],
    after_id bigint,
    new_events {schema}.{event_type}[]
) RETURNS SETOF bigint
LANGUAGE plpgsql AS $$
DECLARE
    conflict_exists boolean;
BEGIN
    -- Step 0: Lock table in exclusive mode (reads can still read)
    SET LOCAL lock_timeout = '{lock_timeout}s';
    LOCK TABLE {schema}.{events_table} IN EXCLUSIVE MODE;

    -- Step 1: Check for conflicts
    WITH query_items_cte AS (
        SELECT * FROM unnest(query_items) WITH ORDINALITY
    ),
    initial_matches AS (
        SELECT
            t.main_id,
            qi.ordinality,
            t.tag,
            qi.tags AS required_tags,
            qi.types AS allowed_types
        FROM query_items_cte qi
        JOIN {schema}.{tags_table} t
          ON t.tag = ANY(qi.tags)
        WHERE t.main_id > COALESCE(after_id, 0)
    ),
    matched_groups AS (
        SELECT
            main_id,
            ordinality,
            COUNT(DISTINCT tag) AS matched_tag_count,
            array_length(required_tags, 1) AS required_tag_count,
            allowed_types
        FROM initial_matches
        GROUP BY main_id, ordinality, required_tag_count, allowed_types
    ),
    qualified_ids AS (
        SELECT main_id, allowed_types
        FROM matched_groups
        WHERE matched_tag_count = required_tag_count
    ),
    filtered_ids AS (
        SELECT m.id
        FROM {schema}.{events_table} m
        JOIN qualified_ids q ON q.main_id = m.id
        WHERE
            m.id > COALESCE(after_id, 0)
            AND (
                array_length(q.allowed_types, 1) IS NULL
                OR array_length(q.allowed_types, 1) = 0
                OR m.type = ANY(q.allowed_types)
            )
        LIMIT 1
    )
    SELECT EXISTS (SELECT 1 FROM filtered_ids)
    INTO conflict_exists;

    -- Step 2: Insert if no conflicts
    IF NOT conflict_exists THEN
        RETURN QUERY
        WITH new_data AS (
            SELECT * FROM unnest(new_events)
        ),
        inserted AS (
            INSERT INTO {schema}.{events_table} (type, data, tags)
            SELECT type, data, tags
            FROM new_data
            RETURNING id, tags
        ),
        expanded_tags AS (
            SELECT ins.id AS main_id, tag
            FROM inserted ins,
                 unnest(ins.tags) AS tag
        ),
        tag_insert AS (
            INSERT INTO {schema}.{tags_table} (tag, main_id)
            SELECT tag, main_id
            FROM expanded_tags
        )
        SELECT MAX(id) FROM inserted;
        NOTIFY {channel};

    END IF;

    -- If conflict exists, return empty result
    RETURN;
END
$$;
""")


# Used by the recorder's transactional "read then append" path. These bound
# the lock wait (datastore lock timeout, in seconds) for the current
# transaction and take the same EXCLUSIVE lock the conditional append
# function takes.
SQL_SET_LOCAL_LOCK_TIMEOUT = SQL("SET LOCAL lock_timeout = '{lock_timeout}s'")
SQL_LOCK_TABLE = SQL("LOCK TABLE {schema}.{events_table} IN EXCLUSIVE MODE")

# Statement prefixes for query-plan debugging. SQL_EXPLAIN is prepended by the
# recorder's execute(explain=True); SQL_EXPLAIN_ANALYZE is not referenced in
# the code shown here.
SQL_EXPLAIN = SQL("EXPLAIN")
SQL_EXPLAIN_ANALYZE = SQL("EXPLAIN (ANALYZE, BUFFERS, VERBOSE)")


class PostgresDCBRecorderTT(DCBRecorder, PostgresRecorder):
    """DCB recorder for PostgreSQL using a main events table plus a tag table.

    Each event is stored as one row in ``<name>_tt_main``. Each of its tags is
    stored as one row in ``<name>_tt_tag``, referencing the event's id.
    Appends go through the PL/pgSQL functions listed in
    ``sql_create_statements``; those functions also NOTIFY a channel named
    after the main table.
    """

    def __init__(
        self,
        datastore: PostgresDatastore,
        *,
        events_table_name: str = "dcb_events",
    ) -> None:
        """Derive table, index and channel names and pre-format all SQL.

        Args:
            datastore: Provides connections, the schema and the lock timeout.
            events_table_name: Base name. ``_tt_main`` and ``_tt_tag`` are
                appended to it to name the two tables.
        """
        super().__init__(datastore)

        # Table, index and NOTIFY channel names, all derived from the base name.
        main_table = events_table_name + "_tt_main"
        tag_table = events_table_name + "_tt_tag"
        self.events_table_name = main_table
        # The channel name replaces any "." from the table name with "_".
        self.channel_name = main_table.replace(".", "_")
        self.tags_table_name = tag_table
        self.index_name_id_cover_type = main_table + "_idx_id_type"
        self.index_name_tag_main_id = tag_table + "_idx_tag_main_id"

        # Validate every identifier this recorder will create. The check is
        # inherited; presumably it enforces PostgreSQL's identifier length limit.
        for identifier in (
            self.events_table_name,
            self.tags_table_name,
            self.index_name_id_cover_type,
            self.index_name_tag_main_id,
            DB_TYPE_NAME_DCB_EVENT_TT,
            DB_TYPE_NAME_DCB_QUERY_ITEM_TT,
        ):
            self.check_identifier_length(identifier)

        # Tell the datastore about the composite types, then (re)register
        # adapters so that ``psycopg_python_types`` has entries for them.
        for composite_name in (DB_TYPE_NAME_DCB_EVENT_TT, DB_TYPE_NAME_DCB_QUERY_ITEM_TT):
            self.datastore.db_type_names.add(composite_name)
        self.datastore.register_type_adapters()

        # Values substituted into every SQL template by ``format()``.
        self.sql_kwargs = {
            "schema": Identifier(self.datastore.schema),
            "events_table": Identifier(self.events_table_name),
            "channel": Identifier(self.channel_name),
            "tags_table": Identifier(self.tags_table_name),
            "event_type": Identifier(DB_TYPE_NAME_DCB_EVENT_TT),
            "query_item_type": Identifier(DB_TYPE_NAME_DCB_QUERY_ITEM_TT),
            "id_cover_type_index": Identifier(self.index_name_id_cover_type),
            "tag_main_id_index": Identifier(self.index_name_tag_main_id),
            "unconditional_append": Identifier(
                DB_FUNCTION_NAME_DCB_UNCONDITIONAL_APPEND_TT
            ),
            "conditional_append": Identifier(
                DB_FUNCTION_NAME_DCB_CONDITIONAL_APPEND_TT
            ),
            # Raw value: it is interpolated inside a quoted '...s' string.
            "lock_timeout": self.datastore.lock_timeout,
        }

        # DDL, in dependency order: types, main table and its unique index,
        # tag table (whose FK needs that index) and its index, then functions.
        self.sql_create_statements.extend(
            [
                self.format(template)
                for template in (
                    DB_TYPE_DCB_EVENT,
                    DB_TYPE_DCB_QUERY_ITEM,
                    DB_TABLE_DCB_EVENTS,
                    DB_INDEX_UNIQUE_ID_COVER_TYPE,
                    DB_TABLE_DCB_TAGS,
                    DB_INDEX_TAG_MAIN_ID,
                    DB_FUNCTION_UNCONDITIONAL_APPEND,
                    DB_FUNCTION_CONDITIONAL_APPEND,
                )
            ]
        )

        # Statements used at run time.
        self.sql_select_by_tags = self.format(SQL_SELECT_BY_TAGS)
        self.sql_select_all = self.format(SQL_SELECT_ALL)
        self.sql_select_by_type = self.format(SQL_SELECT_EVENTS_BY_TYPE)
        self.sql_select_max_id = self.format(SQL_SELECT_MAX_ID)
        self.sql_unconditional_append = self.format(SQL_UNCONDITIONAL_APPEND)
        self.sql_conditional_append = self.format(SQL_CONDITIONAL_APPEND)
        self.sql_set_local_lock_timeout = self.format(SQL_SET_LOCAL_LOCK_TIMEOUT)
        self.sql_lock_table = self.format(SQL_LOCK_TABLE)

    def format(self, sql: SQL) -> Composed:
        """Fill an SQL template with this recorder's identifiers and settings."""
        return sql.format(**self.sql_kwargs)

    def read(
        self,
        query: DCBQuery | None = None,
        *,
        after: int | None = None,
        limit: int | None = None,
    ) -> DCBReadResponse:
        """Return the events matching ``query`` positioned after ``after``.

        Without a ``limit``, ``head`` is the current maximum position (raised
        to the highest returned position if that is greater). With a
        ``limit``, ``head`` is the highest returned position, or ``None`` when
        nothing is returned.
        """
        with self.datastore.cursor() as curs:
            matched, head = self._read(
                curs=curs,
                query=query,
                after=after,
                limit=limit,
                return_head=True,
            )
        # TODO: Actually return an iterator from _read()!
        return SimpleDCBReadResponse(events=iter(matched), head=head)

    def _read(
        self,
        curs: Cursor[DictRow],
        query: DCBQuery | None = None,
        *,
        after: int | None = None,
        limit: int | None = None,
        return_head: bool = True,
    ) -> tuple[Sequence[DCBSequencedEvent], int | None]:
        """Run the SELECT appropriate for ``query`` on ``curs``.

        Supported query shapes:
          - no query, or no items: every event;
          - every item has at least one tag: tag-table query;
          - a single item with exactly one type and no tags: by-type query.

        Raises:
            ProgrammingError: for any other query shape.
        """
        head: int | None = None
        if return_head and limit is None:
            self.execute(curs, self.sql_select_max_id, explain=False)
            max_row = curs.fetchone()
            if max_row is not None:
                head = max_row["max"]

        # Choose the statement and its parameters for this kind of query.
        statement: Composed
        params: dict[str, Any]
        if not query or not query.items:
            statement = self.sql_select_all
            params = {"after": after, "limit": limit}
        elif self.all_query_items_have_tags(query):
            statement = self.sql_select_by_tags
            params = {
                "query_items": self.construct_psycopg_query_items(query.items),
                "after": after,
                "limit": limit,
            }
        elif self.has_one_query_item_one_type(query):
            statement = self.sql_select_by_type
            params = {
                "event_type": query.items[0].types[0],
                "after": after,
                "limit": limit,
            }
        else:
            msg = f"Unsupported query: {query}"
            raise ProgrammingError(msg)

        self.execute(curs, statement, params, explain=False)
        events = [
            DCBSequencedEvent(
                event=DCBEvent(type=row["type"], data=row["data"], tags=row["tags"]),
                position=row["id"],
            )
            for row in curs.fetchall()
        ]

        # Events appended after the MAX(id) query may lie beyond that head.
        if return_head and events:
            newest = max(event.position for event in events)
            head = max(head or 0, newest)
        return events, head

    def subscribe(
        self,
        query: DCBQuery | None = None,
        *,
        after: int | None = None,
    ) -> PostgresDCBSubscription:
        """Return a LISTEN/NOTIFY-driven subscription to this recorder."""
        return PostgresDCBSubscription(recorder=self, query=query, after=after)

    def append(
        self, events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None
    ) -> int:
        """Append ``events`` and return the value reported for the new events.

        That value is the MAX(id) of the inserted rows, per the append
        functions.

        Raises:
            IntegrityError: if ``condition`` is given and an event matching
                ``condition.fail_if_events_match`` exists after
                ``condition.after``.
        """
        assert len(events) > 0
        new_events = self.construct_psycopg_dcb_events(events)

        if condition is None:
            # Single-statement unconditional append.
            with self.datastore.cursor() as curs:
                return self._unconditional_append(curs, new_events)

        fail_if_match = condition.fail_if_events_match
        if self.all_query_items_have_tags(fail_if_match):
            # Single-statement conditional append. The function yields no row
            # when a conflicting event exists.
            query_items = self.construct_psycopg_query_items(fail_if_match.items)
            params = {
                "query_items": query_items,
                "after": condition.after,
                "events": new_events,
            }
            with self.datastore.cursor() as curs:
                self.execute(curs, self.sql_conditional_append, params, explain=False)
                result = curs.fetchone()
                if result is None:
                    raise IntegrityError
                return result[DB_FUNCTION_NAME_DCB_CONDITIONAL_APPEND_TT]

        # Otherwise, check then append inside one transaction. The EXCLUSIVE
        # table lock lets plain readers proceed but blocks other writers and
        # lockers until commit. That keeps competing conditional appends from
        # interleaving, provided every writer plays by the same rules.
        with self.datastore.transaction(commit=True) as curs:
            if self.datastore.lock_timeout:
                curs.execute(self.sql_set_local_lock_timeout)
            curs.execute(self.sql_lock_table)
            conflicting, _ = self._read(
                curs=curs,
                query=fail_if_match,
                after=condition.after,
                limit=1,
                return_head=False,
            )
            if conflicting:
                raise IntegrityError(conflicting)
            return self._unconditional_append(curs, new_events)

    def _unconditional_append(
        self, curs: Cursor[DictRow], psycopg_dcb_events: list[PsycopgDCBEvent]
    ) -> int:
        """Insert events via the unconditional append function; return its result."""
        self.execute(
            curs,
            self.sql_unconditional_append,
            {"events": psycopg_dcb_events},
            explain=False,
        )
        result = curs.fetchone()
        if result is None:  # pragma: no cover
            msg = "Shouldn't get here"
            raise InternalError(msg)
        # The result column is named after the function.
        return result[DB_FUNCTION_NAME_DCB_UNCONDITIONAL_APPEND_TT]

    def construct_psycopg_dcb_events(
        self, dcb_events: Sequence[DCBEvent]
    ) -> list[PsycopgDCBEvent]:
        """Convert DCB events to the registered ``dcb_event_tt`` Python type."""
        converted: list[PsycopgDCBEvent] = []
        for dcb_event in dcb_events:
            composite = self.datastore.psycopg_python_types[DB_TYPE_NAME_DCB_EVENT_TT]
            converted.append(
                composite(type=dcb_event.type, data=dcb_event.data, tags=dcb_event.tags)
            )
        return converted

    def construct_psycopg_query_items(
        self, query_items: Sequence[DCBQueryItem]
    ) -> list[PsycopgDCBQueryItem]:
        """Convert query items to the registered ``dcb_query_item_tt`` Python type."""
        converted: list[PsycopgDCBQueryItem] = []
        for item in query_items:
            composite = self.datastore.psycopg_python_types[
                DB_TYPE_NAME_DCB_QUERY_ITEM_TT
            ]
            converted.append(composite(types=item.types, tags=item.tags))
        return converted

    def has_one_query_item_one_type(self, query: DCBQuery) -> bool:
        """True for a single query item with exactly one type and no tags."""
        items = query.items
        if len(items) != 1:
            return False
        only_item = items[0]
        return len(only_item.types) == 1 and len(only_item.tags) == 0

    def all_query_items_have_tags(self, query: DCBQuery) -> bool:
        """True when the query has items and every item has at least one tag."""
        return len(query.items) > 0 and all(len(item.tags) > 0 for item in query.items)

    def execute(
        self,
        cursor: Cursor[DictRow],
        statement: Composed,
        params: Params | None = None,
        *,
        explain: bool = False,
        prepare: bool = True,
    ) -> None:
        """Execute ``statement`` with ``params`` on ``cursor``.

        When ``explain`` is true (a debugging aid), first print the statement,
        its params and the output of ``EXPLAIN`` for it.
        """
        if explain:  # pragma: no cover
            print()  # noqa: T201
            print("Statement:", statement.as_string().strip())  # noqa: T201
            print("Params:", params)  # noqa: T201
            print()  # noqa: T201
            cursor.execute(SQL_EXPLAIN + statement, params)
            plan_rows = cursor.fetchall()
            print("\n".join([r["QUERY PLAN"] for r in plan_rows]))  # noqa: T201
            print()  # noqa: T201
        cursor.execute(statement, params, prepare=prepare)
class PsycopgDCBEvent(NamedTuple):
    """Typing shape of one value of the ``dcb_event_tt`` composite type.

    The field names mirror that type's attributes (type, data, tags). It is
    used as the annotated element type of the lists built for the append
    functions.
    """

    type: str
    data: bytes
    tags: list[str]
class PsycopgDCBQueryItem(NamedTuple):
    """Typing shape of one value of the ``dcb_query_item_tt`` composite type.

    The field names mirror that type's attributes (types, tags). It is used as
    the annotated element type of the lists passed to the select and
    conditional-append SQL.
    """

    types: list[str]
    tags: list[str]
class PostgresDCBSubscription(DCBListenNotifySubscription[PostgresDCBRecorderTT]):
    """Subscription to a PostgresDCBRecorderTT that is woken by LISTEN/NOTIFY.

    A background thread holds its own connection, LISTENs on the recorder's
    channel, and sets ``_has_been_notified`` whenever a notification arrives.
    """

    def __init__(
        self,
        recorder: PostgresDCBRecorderTT,
        query: DCBQuery | None = None,
        after: int | None = None,
    ) -> None:
        super().__init__(recorder=recorder, query=query, after=after)
        # Set by the listening thread once it has obtained its connection.
        self._has_listen_connection = threading.Event()
        self._listen_connection: Connection[dict[str, Any]] | None = None
        # daemon=True: this thread will not keep the interpreter alive at exit.
        self._listen_thread = threading.Thread(target=self._listen, daemon=True)
        self._listen_thread.start()

    def __exit__(self, *args: object, **kwargs: Any) -> None:
        # NOTE(review): presumably super().__exit__ stops the subscription,
        # which lets _listen's loop end so this join returns — confirm in
        # DCBListenNotifySubscription.
        super().__exit__(*args, **kwargs)
        self._listen_thread.join()

    def _listen(self) -> None:
        """Thread target: LISTEN on the channel and flag each notification.

        Any exception is recorded in ``_thread_error`` (first one wins) and
        the subscription is stopped.
        """
        recorder = self._recorder
        assert isinstance(recorder, PostgresDCBRecorderTT)
        try:
            with recorder.datastore.get_connection() as conn:
                # Publish the connection before signalling that it exists.
                self._listen_connection = conn
                self._has_listen_connection.set()
                conn.execute(
                    SQL("LISTEN {0}").format(Identifier(recorder.channel_name))
                )
                # Loop until stopped or until any thread has recorded an error.
                while not self._has_been_stopped and not self._thread_error:
                    # This block simplifies psycopg's conn.notifies(), because
                    # we aren't interested in the actual notify messages, and
                    # also we want to stop consuming notify messages when the
                    # subscription has an error or is otherwise stopped.
                    # NOTE(review): interval=1 presumably bounds each wait so
                    # the loop condition is rechecked about once per second —
                    # verify against psycopg's Connection.wait().
                    with conn.lock:
                        try:
                            if conn.wait(notifies(conn.pgconn), interval=1):
                                self._has_been_notified.set()
                        except NO_TRACEBACK as ex:  # pragma: no cover
                            raise ex.with_traceback(None) from None
        except BaseException as e:  # pragma: no cover
            if self._thread_error is None:
                self._thread_error = e
            self.stop()
class PostgresTTDCBFactory(
    BasePostgresFactory[PostgresTrackingRecorder],
    DCBInfrastructureFactory[PostgresTrackingRecorder],
):
    """Infrastructure factory that builds PostgresDCBRecorderTT recorders."""

    def dcb_recorder(self) -> DCBRecorder:
        """Create a DCB recorder whose tables are named after the environment.

        The base table name is ``"<lowercased env name>_events"``. It falls
        back to ``"dcb_events"`` when the environment name is empty. Tables
        are created only if ``env_create_table()`` returns true.
        """
        table_prefix = self.env.name.lower() or "dcb"
        recorder = PostgresDCBRecorderTT(
            datastore=self.datastore,
            events_table_name=f"{table_prefix}_events",
        )
        if not self.env_create_table():
            return recorder
        recorder.create_table()
        return recorder