from __future__ import annotations
from typing import TYPE_CHECKING, Any
from uuid import UUID
from eventsourcing.sqlite import (
SQLiteApplicationRecorder,
SQLiteCursor,
SQLiteDatastore,
SQLiteRecorder,
)
from examples.contentmanagement.application import PageNotFoundError
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo
if TYPE_CHECKING:
from collections.abc import Sequence
from eventsourcing.persistence import StoredEvent
[docs]
class SQLiteFtsRecorder(FtsRecorder, SQLiteRecorder):
def __init__(
self,
datastore: SQLiteDatastore,
**kwargs: Any,
):
super().__init__(datastore, **kwargs)
self.fts_table_name = "ftsprojection"
self.pages_virtual_table_name = self.fts_table_name + "_fts"
self.create_table_statements.append(
"CREATE TABLE IF NOT EXISTS "
f"{self.fts_table_name} ("
"page_id TEXT, "
"page_slug TEXT, "
"page_title TEXT, "
"page_body TEXT, "
"PRIMARY KEY "
"(page_id)) "
)
self.create_table_statements.append(
f"CREATE VIRTUAL TABLE {self.pages_virtual_table_name} USING fts5("
f"page_id, page_body, content='{self.fts_table_name}')"
)
self.create_table_statements.append(
"CREATE TRIGGER projection_ai AFTER INSERT ON "
f"{self.fts_table_name} BEGIN "
f"INSERT INTO {self.pages_virtual_table_name} "
"(rowid, page_id, page_body) "
"VALUES (new.rowid, new.page_id, new.page_body); "
"END"
)
self.create_table_statements.append(
"CREATE TRIGGER projection_au AFTER UPDATE ON "
f"{self.fts_table_name} "
"BEGIN "
f"INSERT INTO {self.pages_virtual_table_name} "
f"({self.pages_virtual_table_name}, rowid, page_id, page_body) "
"VALUES ('delete', old.rowid, old.page_id, old.page_body);"
f"INSERT INTO {self.pages_virtual_table_name} "
"(rowid, page_id, page_body) "
"VALUES (new.rowid, new.page_id, new.page_body); "
"END"
)
self.select_page_statement = (
"SELECT page_slug, page_title, page_body FROM "
f"{self.fts_table_name} WHERE page_id = ?"
)
self.select_page_from_virtual_table_statement = (
"SELECT page_body FROM "
f"{self.pages_virtual_table_name} WHERE page_id = ?"
)
self.insert_page_statement = (
f"INSERT INTO {self.fts_table_name} VALUES (?, ?, ?, ?)"
)
self.update_page_statement = (
f"UPDATE {self.fts_table_name} "
"SET page_slug = ?, page_title = ?, page_body = ? WHERE page_id = ?"
)
self.search_pages_statement = (
f"SELECT page_id FROM {self.pages_virtual_table_name} "
f"WHERE page_body MATCH ?"
)
[docs]
def insert_pages(self, pages: Sequence[PageInfo]) -> None:
with self.datastore.transaction(commit=True) as c:
self._insert_pages(c, pages=pages)
[docs]
def _insert_pages(self, c: SQLiteCursor, pages: Sequence[PageInfo]) -> None:
for page in pages:
c.execute(
self.insert_page_statement,
(str(page.id), page.slug, page.title, page.body),
)
[docs]
def update_pages(self, pages: Sequence[PageInfo]) -> None:
with self.datastore.transaction(commit=True) as c:
self._update_pages(c, pages=pages)
[docs]
def _update_pages(self, c: SQLiteCursor, pages: Sequence[PageInfo]) -> None:
for page in pages:
c.execute(
self.update_page_statement,
(page.slug, page.title, page.body, str(page.id)),
)
[docs]
def search_pages(self, query: str) -> list[UUID]:
with self.datastore.transaction(commit=False) as c:
c.execute(self.search_pages_statement, [query])
return [UUID(row["page_id"]) for row in c.fetchall()]
[docs]
def select_page(self, page_id: UUID) -> PageInfo:
with self.datastore.transaction(commit=False) as c:
c.execute(self.select_page_statement, [str(page_id)])
for row in c.fetchall():
return PageInfo(
id=page_id,
slug=row["page_slug"],
title=row["page_title"],
body=row["page_body"],
)
msg = f"Page ID {page_id} not found"
raise PageNotFoundError(msg)
[docs]
class SQLiteFtsApplicationRecorder(SQLiteFtsRecorder, SQLiteApplicationRecorder):
def _insert_events(
self,
c: SQLiteCursor,
stored_events: Sequence[StoredEvent],
*,
insert_pages: Sequence[PageInfo] = (),
update_pages: Sequence[PageInfo] = (),
**kwargs: Any,
) -> Sequence[int] | None:
notification_ids = super()._insert_events(c, stored_events, **kwargs)
self._insert_pages(c, pages=insert_pages)
self._update_pages(c, pages=update_pages)
return notification_ids