from __future__ import annotations
from typing import TYPE_CHECKING, Any
from psycopg.sql import SQL, Identifier
from eventsourcing.postgres import (
PostgresApplicationRecorder,
PostgresDatastore,
PostgresRecorder,
)
from examples.contentmanagement.application import PageNotFoundError
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo
if TYPE_CHECKING:
from collections.abc import Sequence
from uuid import UUID
from psycopg import Cursor
from psycopg.rows import DictRow
from eventsourcing.persistence import StoredEvent
[docs]
class PostgresFtsRecorder(
    PostgresRecorder,
    FtsRecorder,
):
    """Recorder that keeps a searchable projection of pages in PostgreSQL.

    Pages are stored in a dedicated table (one row per page, keyed by
    ``page_id``) and the ``page_body`` column is covered by a GIN index over
    ``to_tsvector('english', page_body)`` so that it can be searched with
    PostgreSQL full text search.
    """

    def __init__(
        self,
        datastore: PostgresDatastore,
        fts_table_name: str = "ftsprojection",
        **kwargs: Any,
    ):
        """Register the table/index DDL and build the SQL statements.

        :param datastore: passed through to the parent recorder, and used by
            the methods of this class to open transactions.
        :param fts_table_name: name of the projection table; the index is
            named ``fts_table_name + "_idx"``.
        :param kwargs: forwarded unchanged to the parent constructor.
        """
        super().__init__(datastore, **kwargs)
        self.check_table_name_length(fts_table_name)
        self.fts_table_name = fts_table_name

        # Identifiers are composed with psycopg.sql rather than string
        # formatting so the table and index names are always quoted.
        table = Identifier(self.fts_table_name)
        index = Identifier(self.fts_table_name + "_idx")

        self.create_table_statements.append(
            SQL(
                "CREATE TABLE IF NOT EXISTS "
                "{0} ("
                "page_id uuid, "
                "page_slug text, "
                "page_title text, "
                "page_body text, "
                "PRIMARY KEY "
                "(page_id))"
            ).format(table)
        )
        self.create_table_statements.append(
            SQL(
                "CREATE INDEX IF NOT EXISTS {0} "
                "ON {1} "
                "USING GIN (to_tsvector('english', page_body))"
            ).format(index, table)
        )

        self.select_page_statement = SQL(
            "SELECT page_slug, page_title, page_body FROM {0} WHERE page_id = %s"
        ).format(table)
        # Positional VALUES: parameter order must follow the column order of
        # the CREATE TABLE statement above (id, slug, title, body).
        self.insert_page_statement = SQL(
            "INSERT INTO {0} VALUES (%s, %s, %s, %s)"
        ).format(table)
        self.update_page_statement = SQL(
            "UPDATE {0} SET "
            "page_slug = %s, "
            "page_title = %s, "
            "page_body = %s "
            "WHERE page_id = %s"
        ).format(table)
        # The expression matches the one used by the GIN index.
        self.search_pages_statement = SQL(
            "SELECT page_id FROM {0} WHERE "
            "to_tsvector('english', page_body) @@ "
            "websearch_to_tsquery('english', %s)"
        ).format(table)

    def insert_pages(self, pages: Sequence[PageInfo]) -> None:
        """Insert the given pages in a single committed transaction."""
        with self.datastore.transaction(commit=True) as curs:
            self._insert_pages(curs, pages)

    def update_pages(self, pages: Sequence[PageInfo]) -> None:
        """Update the given pages in a single committed transaction."""
        with self.datastore.transaction(commit=True) as curs:
            self._update_pages(curs, pages)

    def _insert_pages(self, curs: Cursor[DictRow], pages: Sequence[PageInfo]) -> None:
        """Insert one row per page using the caller's cursor.

        Transaction handling (commit/rollback) is left to the caller.
        """
        for page in pages:
            params = (page.id, page.slug, page.title, page.body)
            curs.execute(self.insert_page_statement, params, prepare=True)

    def _update_pages(self, curs: Cursor[DictRow], pages: Sequence[PageInfo]) -> None:
        """Update slug, title and body of each page using the caller's cursor.

        Transaction handling (commit/rollback) is left to the caller.
        """
        for page in pages:
            # page.id goes last because it binds to the WHERE clause.
            params = (page.slug, page.title, page.body, page.id)
            curs.execute(self.update_page_statement, params, prepare=True)

    def search_pages(self, query: str) -> list[UUID]:
        """Return the IDs of pages whose body matches ``query``.

        :param query: search text, passed as a bound parameter to
            ``websearch_to_tsquery('english', ...)``.
        :return: the ``page_id`` value of every matching row.
        """
        with self.datastore.transaction(commit=False) as curs:
            curs.execute(self.search_pages_statement, [query], prepare=True)
            return [row["page_id"] for row in curs.fetchall()]

    def select_page(self, page_id: UUID) -> PageInfo:
        """Return the stored page with the given ID.

        :param page_id: primary key of the page to load.
        :raises PageNotFoundError: if no row has this ``page_id``.
        """
        with self.datastore.transaction(commit=False) as curs:
            curs.execute(self.select_page_statement, [str(page_id)], prepare=True)
            # page_id is the primary key, so at most one row can match.
            row = curs.fetchone()
            if row is None:
                msg = f"Page ID {page_id} not found"
                raise PageNotFoundError(msg)
            return PageInfo(
                id=page_id,
                slug=row["page_slug"],
                title=row["page_title"],
                body=row["page_body"],
            )
[docs]
class PostgresFtsApplicationRecorder(PostgresFtsRecorder, PostgresApplicationRecorder):
    """Application recorder that also writes the page search projection.

    Page rows are written with the same cursor that is used to insert the
    stored events.
    """

    def _insert_events(
        self,
        curs: Cursor[DictRow],
        stored_events: list[StoredEvent],
        *,
        insert_pages: Sequence[PageInfo] = (),
        update_pages: Sequence[PageInfo] = (),
        **kwargs: Any,
    ) -> None:
        """Insert stored events, then apply the page inserts and updates.

        :param curs: cursor shared by the event and page statements.
        :param stored_events: events handed to the parent implementation.
        :param insert_pages: pages to add to the search table.
        :param update_pages: pages whose search rows should be rewritten.
        :param kwargs: remaining keyword arguments, forwarded to the parent.
        :return: whatever the parent ``_insert_events`` returned.
        """
        # Events first, projection rows second.
        parent_result = super()._insert_events(curs, stored_events, **kwargs)
        self._insert_pages(curs, pages=insert_pages)
        self._update_pages(curs, pages=update_pages)
        # NOTE(review): the signature is annotated ``-> None`` yet the parent's
        # result is passed through; confirm the parent's return type and align
        # the annotation if it can be non-None.
        return parent_result