Application 5 - Searchable content¶
This example demonstrates how to extend the library’s application recorder classes to support full text search queries in an event-sourced application with both PostgreSQL and SQLite.
Application¶
The application class FtsContentManagement
extends the ContentManagement
class presented in Application 3 - Content management.
Its save()
method sets the variable keyword
parameters insert_pages
and update_pages
. It also introduces a search()
method that
expects a query
argument and returns a list of pages. The application’s recorders are expected
to be receptive to these variable keyword parameters and to support the search_pages()
function.
from __future__ import annotations
from typing import TYPE_CHECKING, Any, cast
from examples.contentmanagement.application import ContentManagement, PageDetailsType
from examples.contentmanagement.domainmodel import Page
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo
if TYPE_CHECKING:
from uuid import UUID
from eventsourcing.domain import DomainEventProtocol, MutableOrImmutableAggregate
from eventsourcing.persistence import Recording
class FtsContentManagement(ContentManagement):
    """Content management application with full text search over page bodies.

    Page details are handed to the recorder through the ``insert_pages`` and
    ``update_pages`` keyword arguments of ``save()``, and searching is
    delegated to the recorder's ``search_pages()`` method.
    """

    def save(
        self,
        *objs: MutableOrImmutableAggregate[UUID] | DomainEventProtocol[UUID] | None,
        **kwargs: Any,
    ) -> list[Recording[UUID]]:
        """Save ``objs`` and pass details of any pages down to the recorder.

        Every ``Page`` among ``objs`` is summarised as a ``PageInfo``. Pages
        whose version equals their number of pending events are listed in
        ``kwargs["insert_pages"]``; all other pages are listed in
        ``kwargs["update_pages"]``. Both keys are always set before the call
        is delegated to the superclass.
        """
        new_pages: list[PageInfo] = []
        changed_pages: list[PageInfo] = []
        for obj in objs:
            if not isinstance(obj, Page):
                continue
            # When every event of the page is still pending, the page is
            # treated as one that needs a new row rather than an update.
            is_new = obj.version == len(obj.pending_events)
            info = PageInfo(obj.id, obj.slug, obj.title, obj.body)
            (new_pages if is_new else changed_pages).append(info)
        kwargs["insert_pages"] = new_pages
        kwargs["update_pages"] = changed_pages
        return super().save(*objs, **kwargs)

    def search(self, query: str) -> list[PageDetailsType]:
        """Return the details of each page whose ID the recorder matches to ``query``."""
        fts_recorder = cast("FtsRecorder", self.recorder)
        return [
            self.get_page_by_id(page_id)
            for page_id in fts_recorder.search_pages(query)
        ]
Persistence¶
The recorder class FtsRecorder
extends the library’s Recorder class
by
defining abstract methods to search and select pages. These methods will be implemented
for both PostgreSQL and SQLite, which will also create custom tables for page content with
full text search indexes.
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING
from eventsourcing.persistence import Recorder
if TYPE_CHECKING:
from collections.abc import Sequence
from uuid import UUID
@dataclass(frozen=True)
class PageInfo:
    """Immutable record of the page details stored in the search projection."""

    # Identifier of the page.
    id: UUID
    # Slug of the page.
    slug: str
    # Title of the page.
    title: str
    # Body text of the page.
    body: str
class FtsRecorder(Recorder, ABC):
    """Abstract recorder interface for storing and searching page content."""

    @abstractmethod
    def insert_pages(self, pages: Sequence[PageInfo]) -> None:
        """Insert the given pages (id, slug, title, body)."""

    @abstractmethod
    def update_pages(self, pages: Sequence[PageInfo]) -> None:
        """Update the given pages (id, slug, title, body)."""

    @abstractmethod
    def search_pages(self, query: str) -> list[UUID]:
        """Return the IDs of the pages that match ``query``."""

    @abstractmethod
    def select_page(self, page_id: UUID) -> PageInfo:
        """Return the slug, title and body stored for ``page_id``."""

    def search(self, query: str) -> Sequence[PageInfo]:
        """Return the stored details of every page that matches ``query``.

        Combines ``search_pages()`` with one ``select_page()`` call per match.
        """
        return [self.select_page(page_id) for page_id in self.search_pages(query)]
The _insert_events()
methods of the PostgreSQL and SQLite recorders are extended, so that
rows are inserted and updated, according to the information passed down from the application
in the variable keyword arguments insert_pages
and update_pages
.
PostgreSQL¶
The PostgreSQL recorder uses a GIN index and the websearch_to_tsquery()
function.
The PostgreSQL Factory
class is extended to involve this custom recorder
in a custom PostgreSQL persistence module so that it can be used by the ContentManagement
application.
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from psycopg.sql import SQL, Identifier
from eventsourcing.postgres import (
PostgresApplicationRecorder,
PostgresDatastore,
PostgresRecorder,
)
from examples.contentmanagement.application import PageNotFoundError
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo
if TYPE_CHECKING:
from collections.abc import Sequence
from uuid import UUID
from psycopg import Cursor
from psycopg.rows import DictRow
from eventsourcing.persistence import StoredEvent
class PostgresFtsRecorder(
    PostgresRecorder,
    FtsRecorder,
):
    """PostgreSQL recorder that keeps page content in a full text searchable table.

    The table has a GIN index over ``to_tsvector('english', page_body)`` and
    is searched with ``websearch_to_tsquery()``.
    """

    def __init__(
        self,
        datastore: PostgresDatastore,
        fts_table_name: str = "ftsprojection",
        **kwargs: Any,
    ):
        """Register the DDL for the page table and prepare its SQL statements.

        Args:
            datastore: PostgreSQL datastore used for all statements.
            fts_table_name: Name of the table holding page content.
            **kwargs: Passed through to the superclass constructor.
        """
        super().__init__(datastore, **kwargs)
        self.check_table_name_length(fts_table_name)
        self.fts_table_name = fts_table_name

        # Quoted identifiers shared by every statement below.
        schema = Identifier(self.datastore.schema)
        table = Identifier(self.fts_table_name)
        index = Identifier(self.fts_table_name + "_idx")

        create_table = SQL(
            "CREATE TABLE IF NOT EXISTS {0}.{1} ("
            "page_id uuid, page_slug text, page_title text, page_body text, "
            "PRIMARY KEY (page_id))"
        ).format(schema, table)
        create_index = SQL(
            "CREATE INDEX IF NOT EXISTS {0} ON {1}.{2} "
            "USING GIN (to_tsvector('english', page_body))"
        ).format(index, schema, table)
        self.create_table_statements.extend([create_table, create_index])

        self.select_page_statement = SQL(
            "SELECT page_slug, page_title, page_body FROM {0}.{1} WHERE page_id = %s"
        ).format(schema, table)

        self.insert_page_statement = SQL(
            "INSERT INTO {0}.{1} VALUES (%s, %s, %s, %s)"
        ).format(schema, table)

        self.update_page_statement = SQL(
            "UPDATE {0}.{1} SET page_slug = %s, page_title = %s, page_body = %s "
            "WHERE page_id = %s"
        ).format(schema, table)

        self.search_pages_statement = SQL(
            "SELECT page_id FROM {0}.{1} WHERE "
            "to_tsvector('english', page_body) @@ websearch_to_tsquery('english', %s)"
        ).format(schema, table)

    def insert_pages(self, pages: Sequence[PageInfo]) -> None:
        """Insert ``pages`` in a committed transaction of their own."""
        with self.datastore.transaction(commit=True) as curs:
            self._insert_pages(curs, pages)

    def update_pages(self, pages: Sequence[PageInfo]) -> None:
        """Update ``pages`` in a committed transaction of their own."""
        with self.datastore.transaction(commit=True) as curs:
            self._update_pages(curs, pages)

    def _insert_pages(self, curs: Cursor[DictRow], pages: Sequence[PageInfo]) -> None:
        """Insert one row per page using the given cursor."""
        for page in pages:
            curs.execute(
                self.insert_page_statement,
                (page.id, page.slug, page.title, page.body),
                prepare=True,
            )

    def _update_pages(self, curs: Cursor[DictRow], pages: Sequence[PageInfo]) -> None:
        """Update the row of each page using the given cursor."""
        for page in pages:
            # The page ID goes last because it binds to the WHERE clause.
            curs.execute(
                self.update_page_statement,
                (page.slug, page.title, page.body, page.id),
                prepare=True,
            )

    def search_pages(self, query: str) -> list[UUID]:
        """Return the IDs of the pages whose body matches ``query``."""
        with self.datastore.transaction(commit=False) as curs:
            curs.execute(self.search_pages_statement, [query], prepare=True)
            matches = curs.fetchall()
            return [match["page_id"] for match in matches]

    def select_page(self, page_id: UUID) -> PageInfo:
        """Return the stored details of the page with the given ID.

        Raises:
            PageNotFoundError: If no row exists for ``page_id``.
        """
        with self.datastore.transaction(commit=False) as curs:
            curs.execute(self.select_page_statement, [str(page_id)], prepare=True)
            rows = curs.fetchall()
            if rows:
                first = rows[0]
                return PageInfo(
                    id=page_id,
                    slug=first["page_slug"],
                    title=first["page_title"],
                    body=first["page_body"],
                )
        msg = f"Page ID {page_id} not found"
        raise PageNotFoundError(msg)
class PostgresFtsApplicationRecorder(PostgresFtsRecorder, PostgresApplicationRecorder):
    """PostgreSQL application recorder that writes page rows alongside events."""

    def _insert_events(
        self,
        curs: Cursor[DictRow],
        stored_events: Sequence[StoredEvent],
        *,
        insert_pages: Sequence[PageInfo] = (),
        update_pages: Sequence[PageInfo] = (),
        **kwargs: Any,
    ) -> None:
        """Insert stored events, then insert and update pages with the same cursor.

        Args:
            curs: Cursor used for the events and for the page rows.
            stored_events: Events passed on to the superclass.
            insert_pages: Pages to be inserted as new rows.
            update_pages: Pages whose existing rows are to be updated.
            **kwargs: Remaining keyword arguments, passed to the superclass.
        """
        # NOTE(review): the annotation says None, yet whatever the superclass
        # returns is passed back to the caller — confirm against the base class.
        result = super()._insert_events(curs, stored_events, **kwargs)
        self._insert_pages(curs, pages=insert_pages)
        self._update_pages(curs, pages=update_pages)
        return result
SQLite¶
The SQLite recorder uses a virtual table and the MATCH
operator.
The SQLite Factory
class is extended to involve this custom recorder
in a custom SQLite persistence module so that it can be used by the ContentManagement
application.
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from uuid import UUID
from eventsourcing.sqlite import (
SQLiteApplicationRecorder,
SQLiteCursor,
SQLiteDatastore,
SQLiteRecorder,
)
from examples.contentmanagement.application import PageNotFoundError
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo
if TYPE_CHECKING:
from collections.abc import Sequence
from eventsourcing.persistence import StoredEvent
class SQLiteFtsRecorder(FtsRecorder, SQLiteRecorder):
    """SQLite recorder that keeps page content searchable through an FTS5 table.

    Page rows live in a regular table. An FTS5 virtual table declared with
    ``content=<regular table>`` indexes the page bodies and is kept up to date
    by triggers on the regular table.
    """

    def __init__(
        self,
        datastore: SQLiteDatastore,
        **kwargs: Any,
    ):
        """Register the DDL for the page tables and prepare the SQL statements.

        Args:
            datastore: SQLite datastore used for all statements.
            **kwargs: Passed through to the superclass constructor.
        """
        super().__init__(datastore, **kwargs)
        self.fts_table_name = "ftsprojection"
        self.pages_virtual_table_name = self.fts_table_name + "_fts"
        self.create_table_statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.fts_table_name} ("
            "page_id TEXT, "
            "page_slug TEXT, "
            "page_title TEXT, "
            "page_body TEXT, "
            "PRIMARY KEY "
            "(page_id)) "
        )
        # The virtual table and the triggers use IF NOT EXISTS, like the
        # regular table above, so that running the create statements against
        # a database that already has them does not fail.
        self.create_table_statements.append(
            "CREATE VIRTUAL TABLE IF NOT EXISTS "
            f"{self.pages_virtual_table_name} USING fts5("
            f"page_id, page_body, content='{self.fts_table_name}')"
        )
        # Mirror each inserted page row into the search index.
        self.create_table_statements.append(
            "CREATE TRIGGER IF NOT EXISTS projection_ai AFTER INSERT ON "
            f"{self.fts_table_name} BEGIN "
            f"INSERT INTO {self.pages_virtual_table_name} "
            "(rowid, page_id, page_body) "
            "VALUES (new.rowid, new.page_id, new.page_body); "
            "END"
        )
        # On update, issue the 'delete' command for the old values and then
        # index the new values.
        self.create_table_statements.append(
            "CREATE TRIGGER IF NOT EXISTS projection_au AFTER UPDATE ON "
            f"{self.fts_table_name} "
            "BEGIN "
            f"INSERT INTO {self.pages_virtual_table_name} "
            f"({self.pages_virtual_table_name}, rowid, page_id, page_body) "
            "VALUES ('delete', old.rowid, old.page_id, old.page_body);"
            f"INSERT INTO {self.pages_virtual_table_name} "
            "(rowid, page_id, page_body) "
            "VALUES (new.rowid, new.page_id, new.page_body); "
            "END"
        )
        self.select_page_statement = (
            "SELECT page_slug, page_title, page_body FROM "
            f"{self.fts_table_name} WHERE page_id = ?"
        )
        self.select_page_from_virtual_table_statement = (
            "SELECT page_body FROM "
            f"{self.pages_virtual_table_name} WHERE page_id = ?"
        )
        self.insert_page_statement = (
            f"INSERT INTO {self.fts_table_name} VALUES (?, ?, ?, ?)"
        )
        self.update_page_statement = (
            f"UPDATE {self.fts_table_name} "
            "SET page_slug = ?, page_title = ?, page_body = ? WHERE page_id = ?"
        )
        self.search_pages_statement = (
            f"SELECT page_id FROM {self.pages_virtual_table_name} "
            "WHERE page_body MATCH ?"
        )

    def insert_pages(self, pages: Sequence[PageInfo]) -> None:
        """Insert ``pages`` in a committed transaction of their own."""
        with self.datastore.transaction(commit=True) as c:
            self._insert_pages(c, pages=pages)

    def _insert_pages(self, c: SQLiteCursor, pages: Sequence[PageInfo]) -> None:
        """Insert one row per page using the given cursor."""
        for page in pages:
            # Page IDs are stored as text in the page_id column.
            c.execute(
                self.insert_page_statement,
                (str(page.id), page.slug, page.title, page.body),
            )

    def update_pages(self, pages: Sequence[PageInfo]) -> None:
        """Update ``pages`` in a committed transaction of their own."""
        with self.datastore.transaction(commit=True) as c:
            self._update_pages(c, pages=pages)

    def _update_pages(self, c: SQLiteCursor, pages: Sequence[PageInfo]) -> None:
        """Update the row of each page using the given cursor."""
        for page in pages:
            # The page ID goes last because it binds to the WHERE clause.
            c.execute(
                self.update_page_statement,
                (page.slug, page.title, page.body, str(page.id)),
            )

    def search_pages(self, query: str) -> list[UUID]:
        """Return the IDs of the pages whose body matches ``query``."""
        with self.datastore.transaction(commit=False) as c:
            c.execute(self.search_pages_statement, [query])
            return [UUID(row["page_id"]) for row in c.fetchall()]

    def select_page(self, page_id: UUID) -> PageInfo:
        """Return the stored details of the page with the given ID.

        Raises:
            PageNotFoundError: If no row exists for ``page_id``.
        """
        with self.datastore.transaction(commit=False) as c:
            c.execute(self.select_page_statement, [str(page_id)])
            for row in c.fetchall():
                return PageInfo(
                    id=page_id,
                    slug=row["page_slug"],
                    title=row["page_title"],
                    body=row["page_body"],
                )
        msg = f"Page ID {page_id} not found"
        raise PageNotFoundError(msg)
class SQLiteFtsApplicationRecorder(SQLiteFtsRecorder, SQLiteApplicationRecorder):
    """SQLite application recorder that writes page rows alongside events."""

    def _insert_events(
        self,
        c: SQLiteCursor,
        stored_events: Sequence[StoredEvent],
        *,
        insert_pages: Sequence[PageInfo] = (),
        update_pages: Sequence[PageInfo] = (),
        **kwargs: Any,
    ) -> Sequence[int] | None:
        """Insert stored events, then insert and update pages with the same cursor.

        Args:
            c: Cursor used for the events and for the page rows.
            stored_events: Events passed on to the superclass.
            insert_pages: Pages to be inserted as new rows.
            update_pages: Pages whose existing rows are to be updated.
            **kwargs: Remaining keyword arguments, passed to the superclass.

        Returns:
            Whatever the superclass ``_insert_events()`` returns.
        """
        result = super()._insert_events(c, stored_events, **kwargs)
        self._insert_pages(c, pages=insert_pages)
        self._update_pages(c, pages=update_pages)
        return result
Test case¶
The test case FtsContentManagementTestCase
uses the FtsContentManagement application
to
create three pages, for ‘animals’, ‘plants’ and ‘minerals’. Content is added to the pages. The
content is searched with various queries and the search results are checked. The
test case is executed twice, once with the PostgreSQL persistence module, and once with the
SQLite persistence module.
from __future__ import annotations
from typing import ClassVar
from unittest import TestCase
from uuid import uuid4
from eventsourcing.tests.postgres_utils import drop_tables
from eventsourcing.utils import get_topic
from examples.contentmanagement.domainmodel import user_id_cvar
from examples.ftscontentmanagement.application import FtsContentManagement
from examples.ftscontentmanagement.postgres import PostgresFtsApplicationRecorder
from examples.ftscontentmanagement.sqlite import SQLiteFtsApplicationRecorder
class FtsContentManagementTestCase(TestCase):
    """Shared search scenario, run by subclasses that set ``env``."""

    env: ClassVar[dict[str, str]] = {}

    def test_app(self) -> None:
        """Create three pages, add content, and check the search results."""
        app = FtsContentManagement(env=self.env)

        # Set user_id context variable.
        user_id_cvar.set(uuid4())

        # Create empty pages.
        for title, slug in (
            ("Animals", "animals"),
            ("Plants", "plants"),
            ("Minerals", "minerals"),
        ):
            app.create_page(title=title, slug=slug)

        # Nothing has a body yet, so none of the words can be found.
        for word in ("dog", "rose", "zinc"):
            self.assertEqual(0, len(app.search(word)))

        # Give each page a body.
        bodies = {
            "animals": "cat dog zebra",
            "plants": "bluebell rose jasmine",
            "minerals": "iron zinc calcium",
        }
        for slug, body in bodies.items():
            app.update_body(slug=slug, body=body)

        # A single word finds exactly the page that contains it.
        for word, slug in (
            ("dog", "animals"),
            ("rose", "plants"),
            ("zinc", "minerals"),
        ):
            pages = app.search(word)
            self.assertEqual(1, len(pages))
            self.assertEqual(pages[0]["slug"], slug)
            self.assertEqual(pages[0]["body"], bodies[slug])

        # Two words from the same page find that page.
        pages = app.search("dog cat")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "animals")
        self.assertEqual(pages[0]["body"], "cat dog zebra")

        # Two words from different pages find nothing.
        pages = app.search("rose zebra")
        self.assertEqual(0, len(pages))

        # Alternative words find both pages.
        pages = app.search("rose OR zebra")
        self.assertEqual(2, len(pages))
        self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in pages))
class TestWithSQLite(FtsContentManagementTestCase):
    """Run the shared scenario with the SQLite FTS application recorder."""

    # Selects the SQLite persistence module, the custom recorder class,
    # and an in-memory database.
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.sqlite",
        "APPLICATION_RECORDER_TOPIC": get_topic(SQLiteFtsApplicationRecorder),
        "SQLITE_DBNAME": ":memory:",
    }
class TestWithPostgres(FtsContentManagementTestCase):
    """Run the shared scenario with the PostgreSQL FTS application recorder."""

    # Selects the PostgreSQL persistence module, the connection settings,
    # and the custom recorder class.
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.postgres",
        "POSTGRES_DBNAME": "eventsourcing",
        "POSTGRES_HOST": "127.0.0.1",
        "POSTGRES_PORT": "5432",
        "POSTGRES_USER": "eventsourcing",
        "POSTGRES_PASSWORD": "eventsourcing",
        "APPLICATION_RECORDER_TOPIC": get_topic(PostgresFtsApplicationRecorder),
    }

    def setUp(self) -> None:
        """Drop tables before the superclass set-up runs."""
        drop_tables()
        super().setUp()

    def tearDown(self) -> None:
        """Drop tables again after the superclass tear-down has run."""
        super().tearDown()
        drop_tables()
# NOTE(review): presumably removes the shared base case from the module namespace
# so that test discovery only picks up the configured subclasses — confirm.
del FtsContentManagementTestCase
Code reference¶
- class examples.ftscontentmanagement.application.FtsContentManagement(env: EnvType | None = None)[source]¶
Bases:
ContentManagement
- save(*objs: MutableOrImmutableAggregate[UUID] | DomainEventProtocol[UUID] | None, **kwargs: Any) list[Recording[UUID]] [source]¶
Collects pending events from given aggregates and puts them in the application’s event store.
- name = 'FtsContentManagement'¶
- class examples.ftscontentmanagement.persistence.PageInfo(id: 'UUID', slug: 'str', title: 'str', body: 'str')[source]¶
Bases:
object
- id: UUID¶
- slug: str¶
- title: str¶
- body: str¶
- class examples.ftscontentmanagement.persistence.FtsRecorder[source]¶
Bases:
Recorder
,ABC
- abstractmethod insert_pages(pages: Sequence[PageInfo]) None [source]¶
Insert a sequence of pages (id, slug, title, body).
- abstractmethod update_pages(pages: Sequence[PageInfo]) None [source]¶
Update a sequence of pages (id, slug, title, body).
- abstractmethod search_pages(query: str) list[UUID] [source]¶
Returns IDs for pages that match query.
- class examples.ftscontentmanagement.postgres.PostgresFtsRecorder(datastore: PostgresDatastore, fts_table_name: str = 'ftsprojection', **kwargs: Any)[source]¶
Bases:
PostgresRecorder
,FtsRecorder
- insert_pages(pages: Sequence[PageInfo]) None [source]¶
Insert a sequence of pages (id, slug, title, body).
- class examples.ftscontentmanagement.postgres.PostgresFtsApplicationRecorder(datastore: PostgresDatastore, fts_table_name: str = 'ftsprojection', **kwargs: Any)[source]¶
- class examples.ftscontentmanagement.sqlite.SQLiteFtsRecorder(datastore: SQLiteDatastore, **kwargs: Any)[source]¶
Bases:
FtsRecorder
,SQLiteRecorder
- insert_pages(pages: Sequence[PageInfo]) None [source]¶
Insert a sequence of pages (id, slug, title, body).
- _insert_pages(c: SQLiteCursor, pages: Sequence[PageInfo]) None [source]¶
- update_pages(pages: Sequence[PageInfo]) None [source]¶
Update a sequence of pages (id, slug, title, body).
- _update_pages(c: SQLiteCursor, pages: Sequence[PageInfo]) None [source]¶
- class examples.ftscontentmanagement.sqlite.SQLiteFtsApplicationRecorder(datastore: SQLiteDatastore, **kwargs: Any)[source]¶
- class examples.ftscontentmanagement.test_application.TestWithSQLite(methodName='runTest')[source]¶
Bases:
FtsContentManagementTestCase
- env: ClassVar[dict[str, str]] = {'APPLICATION_RECORDER_TOPIC': 'examples.ftscontentmanagement.sqlite:SQLiteFtsApplicationRecorder', 'PERSISTENCE_MODULE': 'eventsourcing.sqlite', 'SQLITE_DBNAME': ':memory:'}¶
- class examples.ftscontentmanagement.test_application.TestWithPostgres(methodName='runTest')[source]¶
Bases:
FtsContentManagementTestCase
- env: ClassVar[dict[str, str]] = {'APPLICATION_RECORDER_TOPIC': 'examples.ftscontentmanagement.postgres:PostgresFtsApplicationRecorder', 'PERSISTENCE_MODULE': 'eventsourcing.postgres', 'POSTGRES_DBNAME': 'eventsourcing', 'POSTGRES_HOST': '127.0.0.1', 'POSTGRES_PASSWORD': 'eventsourcing', 'POSTGRES_PORT': '5432', 'POSTGRES_USER': 'eventsourcing'}¶