Application 5 - Searchable content¶
This example demonstrates how to extend the library’s application recorder classes to support full text search queries in an event-sourced application with both PostgreSQL and SQLite.
Application¶
The application class `SearchableContentApplication` extends the `ContentManagementApplication` class presented in Application 3 - Content management. Its `save()` method sets the variable keyword parameters `insert_pages` and `update_pages`. It also introduces a `search()` method that expects a `query` argument and returns a list of pages. The application’s recorders are expected to be receptive to these variable keyword parameters and to support the `search_pages()` method.
from __future__ import annotations
from typing import Any, List, Optional, Tuple, Union, cast
from uuid import UUID
from eventsourcing.domain import DomainEventProtocol, MutableOrImmutableAggregate
from eventsourcing.examples.contentmanagement.application import (
ContentManagementApplication,
PageDetailsType,
)
from eventsourcing.examples.contentmanagement.domainmodel import Page
from eventsourcing.examples.searchablecontent.persistence import (
SearchableContentRecorder,
)
from eventsourcing.persistence import Recording
class SearchableContentApplication(ContentManagementApplication):
    """
    Content management application whose pages can be found by searching.

    Page details are handed to the recorder whenever pages are saved, and
    search queries are delegated to the recorder's ``search_pages()`` method.
    """

    def save(
        self,
        *objs: Optional[Union[MutableOrImmutableAggregate, DomainEventProtocol]],
        **kwargs: Any,
    ) -> List[Recording]:
        """
        Saves the given objects, passing page details down to the recorder.

        The keyword arguments ``insert_pages`` and ``update_pages`` are set to
        lists of ``(id, slug, title, body)`` tuples collected from the given
        ``Page`` objects, and then forwarded to the superclass ``save()``.
        """
        new_rows: List[Tuple[UUID, str, str, str]] = []
        changed_rows: List[Tuple[UUID, str, str, str]] = []
        for candidate in objs:
            if not isinstance(candidate, Page):
                continue
            # A page whose version equals its number of pending events is
            # treated as an insert; any other page is treated as an update.
            if candidate.version == len(candidate.pending_events):
                bucket = new_rows
            else:
                bucket = changed_rows
            bucket.append(
                (candidate.id, candidate.slug, candidate.title, candidate.body)
            )
        kwargs.update(insert_pages=new_rows, update_pages=changed_rows)
        return super().save(*objs, **kwargs)

    def search(self, query: str) -> List[PageDetailsType]:
        """
        Returns details of the pages that the recorder matches to the query.
        """
        searchable = cast(SearchableContentRecorder, self.recorder)
        return [
            self.get_page_by_id(matched_id)
            for matched_id in searchable.search_pages(query)
        ]
Persistence¶
The recorder class `SearchableContentRecorder` extends the `AggregateRecorder` by defining abstract methods to search and select pages. These methods will be implemented for both PostgreSQL and SQLite, which will also create custom tables for page content with full text search indexes.
from __future__ import annotations
from abc import abstractmethod
from typing import List, Tuple
from uuid import UUID
from eventsourcing.persistence import AggregateRecorder
class SearchableContentRecorder(AggregateRecorder):
    """
    Aggregate recorder interface extended with page search and page selection.
    """

    @abstractmethod
    def search_pages(self, query: str) -> List[UUID]:
        """Return the IDs of the pages that match the given query."""

    @abstractmethod
    def select_page(self, page_id: UUID) -> Tuple[str, str, str]:
        """Return the slug, title and body of the page with the given ID."""
The `_insert_events()` methods of the PostgreSQL and SQLite recorders are extended, so that rows are inserted and updated according to the information passed down from the application in the variable keyword arguments `insert_pages` and `update_pages`.
PostgreSQL¶
The PostgreSQL recorder uses a GIN index and the `websearch_to_tsquery()` function. The PostgreSQL `Factory` class is extended to involve this custom recorder in a custom PostgreSQL persistence module so that it can be used by the `SearchableContentApplication`.
from __future__ import annotations
from typing import Any, List, Optional, Sequence, Tuple
from uuid import UUID
from eventsourcing.examples.contentmanagement.application import PageNotFound
from eventsourcing.examples.searchablecontent.persistence import (
SearchableContentRecorder,
)
from eventsourcing.persistence import StoredEvent
from eventsourcing.postgres import (
Factory,
PostgresAggregateRecorder,
PostgresApplicationRecorder,
PostgresConnection,
PostgresCursor,
)
class PostgresSearchableContentRecorder(
    SearchableContentRecorder,
    PostgresAggregateRecorder,
):
    """
    PostgreSQL recorder that maintains a searchable table of page content.

    Rows of the pages table are inserted and updated by the same call that
    inserts stored events, and are searched with ``websearch_to_tsquery()``
    against a GIN index on the page body.
    """

    pages_table_name = "pages_projection_example"

    # Each SQL statement is paired with a statement name, in which any dots
    # of the table name are replaced with underscores.
    select_page_statement = (
        "SELECT page_slug, page_title, page_body "
        f"FROM {pages_table_name} WHERE page_id = $1"
    )
    select_page_statement_name = "select_" + pages_table_name.replace(".", "_")

    insert_page_statement = (
        f"INSERT INTO {pages_table_name} " "VALUES ($1, $2, $3, $4)"
    )
    insert_page_statement_name = "insert_" + pages_table_name.replace(".", "_")

    update_page_statement = (
        f"UPDATE {pages_table_name} SET page_slug = $1, page_title = $2, "
        "page_body = $3 WHERE page_id = $4"
    )
    update_page_statement_name = "update_" + pages_table_name.replace(".", "_")

    search_pages_statement = (
        f"SELECT page_id FROM {pages_table_name} "
        "WHERE to_tsvector('english', page_body) "
        "@@ websearch_to_tsquery('english', $1)"
    )
    search_pages_statement_name = "search_" + pages_table_name.replace(".", "_")

    def construct_create_table_statements(self) -> List[str]:
        """
        Returns the superclass statements followed by statements that create
        the pages table and its full text search index.
        """
        table = self.pages_table_name
        create_table = (
            f"CREATE TABLE IF NOT EXISTS {table} ("
            "page_id uuid, page_slug text, page_title text, page_body text, "
            "PRIMARY KEY (page_id))"
        )
        create_index = (
            f"CREATE INDEX IF NOT EXISTS {table}_idx ON {table} "
            "USING GIN (to_tsvector('english', page_body))"
        )
        statements = super().construct_create_table_statements()
        statements.extend([create_table, create_index])
        return statements

    def _prepare_insert_events(self, conn: PostgresConnection) -> None:
        """
        Prepares the insert and update page statements, after the superclass
        has prepared its own statements on the given connection.
        """
        super()._prepare_insert_events(conn)
        page_statements = (
            (self.insert_page_statement_name, self.insert_page_statement),
            (self.update_page_statement_name, self.update_page_statement),
        )
        for name, statement in page_statements:
            self._prepare(conn, name, statement)

    def _insert_events(
        self,
        c: PostgresCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        """
        Inserts stored events, then inserts and updates page rows using the
        same cursor, and returns whatever the superclass returned.
        """
        result = super()._insert_events(c, stored_events, **kwargs)
        # The page helpers pick out their own keyword argument and ignore the rest.
        self._insert_pages(c, **kwargs)
        self._update_pages(c, **kwargs)
        return result

    def _insert_pages(
        self,
        c: PostgresCursor,
        insert_pages: Sequence[Tuple[UUID, str, str, str]] = (),
        **_: Any,
    ) -> None:
        """
        Executes the prepared insert statement for each given page tuple.
        """
        for new_id, new_slug, new_title, new_body in insert_pages:
            alias = self.statement_name_aliases[self.insert_page_statement_name]
            params = (new_id, new_slug, new_title, new_body)
            c.execute(f"EXECUTE {alias}(%s, %s, %s, %s)", params)

    def _update_pages(
        self,
        c: PostgresCursor,
        update_pages: Sequence[Tuple[UUID, str, str, str]] = (),
        **_: Any,
    ) -> None:
        """
        Executes the prepared update statement for each given page tuple.
        """
        for known_id, new_slug, new_title, new_body in update_pages:
            alias = self.statement_name_aliases[self.update_page_statement_name]
            # The page ID goes last, matching $4 in the update statement.
            params = (new_slug, new_title, new_body, known_id)
            c.execute(f"EXECUTE {alias}(%s, %s, %s, %s)", params)

    def search_pages(self, query: str) -> List[UUID]:
        """
        Returns IDs for pages that match query.
        """
        name = self.search_pages_statement_name
        with self.datastore.get_connection() as conn:
            self._prepare(conn, name, self.search_pages_statement)
            with conn.transaction(commit=False) as curs:
                alias = self.statement_name_aliases[name]
                curs.execute(f"EXECUTE {alias}(%s)", [query])
                matched_ids = [row["page_id"] for row in curs.fetchall()]
        return matched_ids

    def select_page(self, page_id: UUID) -> Tuple[str, str, str]:
        """
        Returns slug, title and body for given ID.

        Raises PageNotFound if no row is selected.
        """
        name = self.select_page_statement_name
        with self.datastore.get_connection() as conn:
            self._prepare(conn, name, self.select_page_statement)
            with conn.transaction(commit=False) as curs:
                alias = self.statement_name_aliases[name]
                curs.execute(f"EXECUTE {alias}(%s)", [str(page_id)])
                rows = curs.fetchall()
                if rows:
                    first = rows[0]
                    return (
                        first["page_slug"],
                        first["page_title"],
                        first["page_body"],
                    )
        raise PageNotFound(f"Page ID {page_id} not found")
class SearchableContentApplicationRecorder(
    PostgresSearchableContentRecorder,
    PostgresApplicationRecorder,
):
    """
    PostgreSQL application recorder combined with the searchable pages recorder.
    """
class SearchableContentInfrastructureFactory(Factory):
    """
    PostgreSQL infrastructure factory that uses the searchable content
    application recorder class.
    """

    application_recorder_class = SearchableContentApplicationRecorder


# Remove the imported base class name from this module's namespace.
# NOTE(review): presumably so that only one factory class is found when this
# module is used as a persistence module - confirm against the library.
del Factory
SQLite¶
The SQLite recorder uses a virtual table and the `MATCH` operator. The SQLite `Factory` class is extended to involve this custom recorder in a custom SQLite persistence module so that it can be used by the `SearchableContentApplication`.
from __future__ import annotations
from typing import Any, List, Optional, Sequence, Tuple
from uuid import UUID
from eventsourcing.examples.contentmanagement.application import PageNotFound
from eventsourcing.examples.searchablecontent.persistence import (
SearchableContentRecorder,
)
from eventsourcing.persistence import StoredEvent
from eventsourcing.sqlite import (
Factory,
SQLiteAggregateRecorder,
SQLiteApplicationRecorder,
SQLiteCursor,
)
class SQLiteSearchableContentRecorder(
    SearchableContentRecorder, SQLiteAggregateRecorder
):
    """
    SQLite recorder that maintains a searchable table of page content.

    Page rows are written to an ordinary table by the same call that inserts
    stored events. Triggers copy the page ID and body into an FTS5 virtual
    table, which is searched with the MATCH operator.
    """

    pages_table_name = "pages_projection_example"
    pages_virtual_table_name = pages_table_name + "_fts"
    select_page_statement = (
        f"SELECT page_slug, page_title, page_body FROM "
        f"{pages_table_name} WHERE page_id = ?"
    )
    insert_page_statement = f"INSERT INTO {pages_table_name} VALUES (?, ?, ?, ?)"
    update_page_statement = (
        f"UPDATE {pages_table_name} "
        f"SET page_slug = ?, page_title = ?, page_body = ? WHERE page_id = ?"
    )
    search_pages_statement = (
        f"SELECT page_id FROM {pages_virtual_table_name} WHERE page_body MATCH ?"
    )

    def construct_create_table_statements(self) -> List[str]:
        """
        Returns the superclass statements followed by statements that create
        the pages table, the FTS5 virtual table, and the triggers that keep
        the virtual table in step with the pages table.

        Every statement uses IF NOT EXISTS, so that running them again on a
        database that already has these objects does not fail.
        """
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.pages_table_name} ("
            "page_id TEXT, "
            "page_slug TEXT, "
            "page_title TEXT, "
            "page_body TEXT, "
            "PRIMARY KEY "
            "(page_id)) "
        )
        # The virtual table indexes the content of the pages table.
        statements.append(
            f"CREATE VIRTUAL TABLE IF NOT EXISTS {self.pages_virtual_table_name} "
            f"USING fts5("
            f"page_id, page_body, content='{self.pages_table_name}')"
        )
        # After a page row is inserted, add it to the search index.
        statements.append(
            f"CREATE TRIGGER IF NOT EXISTS projection_ai AFTER INSERT ON "
            f"{self.pages_table_name} BEGIN "
            f"INSERT INTO {self.pages_virtual_table_name} "
            f"(rowid, page_id, page_body) "
            f"VALUES (new.rowid, new.page_id, new.page_body); "
            f"END"
        )
        # After a page row is updated, remove the old values from the search
        # index (with the 'delete' command) and then add the new values.
        statements.append(
            f"CREATE TRIGGER IF NOT EXISTS projection_au AFTER UPDATE ON "
            f"{self.pages_table_name} "
            f"BEGIN "
            f"INSERT INTO {self.pages_virtual_table_name} "
            f"({self.pages_virtual_table_name}, rowid, page_id, page_body) "
            f"VALUES ('delete', old.rowid, old.page_id, old.page_body);"
            f"INSERT INTO {self.pages_virtual_table_name} "
            f"(rowid, page_id, page_body) "
            f"VALUES (new.rowid, new.page_id, new.page_body); "
            f"END"
        )
        return statements

    def _insert_events(
        self,
        c: SQLiteCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        """
        Inserts stored events, then inserts and updates page rows using the
        same cursor, and returns whatever the superclass returned.
        """
        notification_ids = super()._insert_events(c, stored_events, **kwargs)
        # The page helpers pick out their own keyword argument and ignore the rest.
        self._insert_pages(c, **kwargs)
        self._update_pages(c, **kwargs)
        return notification_ids

    def _insert_pages(
        self,
        c: SQLiteCursor,
        insert_pages: Sequence[Tuple[UUID, str, str, str]] = (),
        **_: Any,
    ) -> None:
        """
        Executes the insert statement for each given page tuple.
        """
        for page_id, page_slug, page_title, page_body in insert_pages:
            # The page ID is stored as text.
            c.execute(
                self.insert_page_statement,
                (str(page_id), page_slug, page_title, page_body),
            )

    def _update_pages(
        self,
        c: SQLiteCursor,
        update_pages: Sequence[Tuple[UUID, str, str, str]] = (),
        **_: Any,
    ) -> None:
        """
        Executes the update statement for each given page tuple.
        """
        for page_id, page_slug, page_title, page_body in update_pages:
            # The page ID goes last, matching the WHERE clause placeholder.
            c.execute(
                self.update_page_statement,
                (page_slug, page_title, page_body, str(page_id)),
            )

    def search_pages(self, query: str) -> List[UUID]:
        """
        Returns IDs for pages that match query.
        """
        with self.datastore.transaction(commit=False) as c:
            c.execute(self.search_pages_statement, [query])
            # Page IDs are stored as text, so convert them back to UUIDs.
            page_ids = [UUID(row["page_id"]) for row in c.fetchall()]
        return page_ids

    def select_page(self, page_id: UUID) -> Tuple[str, str, str]:
        """
        Returns slug, title and body for given ID.

        Raises PageNotFound if no row is selected.
        """
        with self.datastore.transaction(commit=False) as c:
            c.execute(self.select_page_statement, [str(page_id)])
            for row in c.fetchall():
                return row["page_slug"], row["page_title"], row["page_body"]
        raise PageNotFound(f"Page ID {page_id} not found")
class SearchableContentApplicationRecorder(
    SQLiteSearchableContentRecorder,
    SQLiteApplicationRecorder,
):
    """
    SQLite application recorder combined with the searchable pages recorder.
    """
class SearchableContentInfrastructureFactory(Factory):
    """
    SQLite infrastructure factory that uses the searchable content
    application recorder class.
    """

    application_recorder_class = SearchableContentApplicationRecorder


# Remove the imported base class name from this module's namespace.
# NOTE(review): presumably so that only one factory class is found when this
# module is used as a persistence module - confirm against the library.
del Factory
Test case¶
The test case `SearchableContentApplicationTestCase` uses the `SearchableContentApplication` to create three pages, for ‘animals’, ‘plants’ and ‘minerals’. Content is added to the pages. The content is searched with various queries and the search results are checked. The test case is executed twice, once with the PostgreSQL persistence module, and once with the SQLite persistence module.
from __future__ import annotations
import os
from typing import Dict
from unittest import TestCase
from uuid import uuid4
from eventsourcing.examples.contentmanagement.domainmodel import user_id_cvar
from eventsourcing.examples.searchablecontent.application import (
SearchableContentApplication,
)
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.tests.postgres_utils import drop_postgres_table
class SearchableContentApplicationTestCase(TestCase):
    # Base test case: subclasses set `env` to select a persistence module.
    env: Dict[str, str] = {}

    def test_app(self) -> None:
        app = SearchableContentApplication(env=self.env)

        # Set user_id context variable.
        user_id_cvar.set(uuid4())

        # Create empty pages (each slug is the lower-cased title).
        for title in ("Animals", "Plants", "Minerals"):
            app.create_page(title=title, slug=title.lower())

        # Nothing has a body yet, so searching finds nothing.
        for word in ("dog", "rose", "zinc"):
            self.assertEqual(0, len(app.search(word)))

        # Update the pages.
        bodies = {
            "animals": "cat dog zebra",
            "plants": "bluebell rose jasmine",
            "minerals": "iron zinc calcium",
        }
        for slug, body in bodies.items():
            app.update_body(slug=slug, body=body)

        # Search for single words, expect exactly one page for each word.
        single_word_cases = (
            ("dog", "animals"),
            ("rose", "plants"),
            ("zinc", "minerals"),
        )
        for word, expected_slug in single_word_cases:
            found = app.search(word)
            self.assertEqual(1, len(found))
            self.assertEqual(found[0]["slug"], expected_slug)
            self.assertEqual(found[0]["body"], bodies[expected_slug])

        # Search for multiple words in same page.
        found = app.search("dog cat")
        self.assertEqual(1, len(found))
        self.assertEqual(found[0]["slug"], "animals")
        self.assertEqual(found[0]["body"], "cat dog zebra")

        # Search for multiple words in different pages, expect no results.
        found = app.search("rose zebra")
        self.assertEqual(0, len(found))

        # Search for alternative words, expect two results.
        found = app.search("rose OR zebra")
        self.assertEqual(2, len(found))
        self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in found))
class TestWithSQLite(SearchableContentApplicationTestCase):
    # Runs the inherited test with the SQLite persistence module, using an
    # in-memory database.
    env = dict(
        PERSISTENCE_MODULE="eventsourcing.examples.searchablecontent.sqlite",
        SQLITE_DBNAME=":memory:",
    )
class TestWithPostgres(SearchableContentApplicationTestCase):
    # Runs the inherited test with the PostgreSQL persistence module. The
    # tables are dropped before and after each test.
    env = {"PERSISTENCE_MODULE": "eventsourcing.examples.searchablecontent.postgres"}

    def setUp(self) -> None:
        super().setUp()
        # Connection settings are set as environment variables, which are
        # also read back by drop_tables().
        os.environ["POSTGRES_DBNAME"] = "eventsourcing"
        os.environ["POSTGRES_HOST"] = "127.0.0.1"
        os.environ["POSTGRES_PORT"] = "5432"
        os.environ["POSTGRES_USER"] = "eventsourcing"
        os.environ["POSTGRES_PASSWORD"] = "eventsourcing"
        self.drop_tables()

    def tearDown(self) -> None:
        self.drop_tables()
        super().tearDown()

    def drop_tables(self) -> None:
        # Drops the events table and the pages table used by the test.
        db = PostgresDatastore(
            os.environ["POSTGRES_DBNAME"],
            os.environ["POSTGRES_HOST"],
            os.environ["POSTGRES_PORT"],
            os.environ["POSTGRES_USER"],
            os.environ["POSTGRES_PASSWORD"],
        )
        try:
            drop_postgres_table(db, "public.searchablecontentapplication_events")
            drop_postgres_table(db, "public.pages_projection_example")
        finally:
            # Close the datastore even if dropping a table raised an error.
            db.close()
# Remove the base test case name from the module namespace.
# NOTE(review): presumably so that test discovery does not also run the base
# class, whose `env` is empty - confirm.
del SearchableContentApplicationTestCase