Application 5 - Searchable content
This example demonstrates how to extend the library’s application recorder classes to support full-text search queries in an event-sourced application with both PostgreSQL and SQLite.
Application
The application class SearchableContentApplication extends the ContentManagementApplication class presented in the content management example. It extends the save() method, using the method's variable keyword parameters (**kwargs) to pass extra information down to the recorder. The recorder uses this information to update a searchable index of the event-sourced content. The class also introduces a search() method that expects a query argument and returns the details of the matching pages.
from typing import Any, Dict, List, Optional, Union, cast
from eventsourcing.domain import DomainEventProtocol, MutableOrImmutableAggregate
from eventsourcing.examples.contentmanagement.application import (
ContentManagementApplication,
PageDetailsType,
)
from eventsourcing.examples.contentmanagement.domainmodel import Page
from eventsourcing.examples.searchablecontent.persistence import (
SearchableContentRecorder,
)
from eventsourcing.persistence import Recording
class SearchableContentApplication(ContentManagementApplication):
    """Content management application whose pages can be full-text searched.

    The bodies of saved pages are passed down to the recorder so that it can
    maintain a searchable copy of each page's current body.
    """

    def save(
        self,
        *objs: Optional[Union[MutableOrImmutableAggregate, DomainEventProtocol]],
        **kwargs: Any,
    ) -> List[Recording]:
        """Save ``objs``, also passing page bodies down to the recorder.

        Each Page among ``objs`` is sorted into one of two mappings, both
        keyed by page slug:

        - ``insert_page_bodies`` when the page's version equals its number of
          pending events (presumably a page that has never been saved before);
        - ``update_page_bodies`` otherwise.

        Both mappings are added to ``kwargs``, which is forwarded to the
        superclass ``save()``.
        """
        new_bodies: Dict[str, str] = {}
        changed_bodies: Dict[str, str] = {}
        for obj in objs:
            if not isinstance(obj, Page):
                continue
            is_new_page = obj.version == len(obj.pending_events)
            target = new_bodies if is_new_page else changed_bodies
            target[obj.slug] = obj.body
        kwargs.update(
            insert_page_bodies=new_bodies,
            update_page_bodies=changed_bodies,
        )
        return super().save(*objs, **kwargs)

    def search(self, query: str) -> List[PageDetailsType]:
        """Return the details of every page whose body matches ``query``."""
        searchable_recorder = cast(SearchableContentRecorder, self.recorder)
        matching_slugs = searchable_recorder.search_page_bodies(query)
        return [self.get_page_details(slug) for slug in matching_slugs]
Persistence
The recorder classes SearchableContentApplicationRecorder extend the PostgreSQL and SQLite ApplicationRecorder classes by creating a table that contains the current page body text. They define SQL statements that insert, update, and search the rows of this table, using a search query syntax similar to the one used by web search engines. They also define a search_page_bodies() method, which returns the slugs of the pages whose bodies match the given search query.
from abc import abstractmethod
from typing import List
from eventsourcing.persistence import ApplicationRecorder
class SearchableContentRecorder(ApplicationRecorder):
    """Application recorder that can also full-text search page bodies."""

    @abstractmethod
    def search_page_bodies(self, query: str) -> List[str]:
        """
        Return the slugs of the pages whose bodies match ``query``.
        """
The application recorder classes extend the _insert_events() method by inserting and updating rows according to the information passed down from the application through the save() method’s variable keyword parameters.
The infrastructure factory classes SearchableContentInfrastructureFactory extend the PostgreSQL and SQLite Factory classes by overriding the application_recorder() method, so that a SearchableContentApplicationRecorder is constructed as the application recorder.
PostgreSQL
The PostgreSQL recorder uses a GIN index and the websearch_to_tsquery() function.
from typing import Any, Dict, List, Optional, Sequence, cast
from eventsourcing.examples.searchablecontent.persistence import (
SearchableContentRecorder,
)
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
from eventsourcing.postgres import (
Factory,
PostgresApplicationRecorder,
PostgresConnection,
PostgresCursor,
PostgresDatastore,
)
class SearchableContentApplicationRecorder(
    SearchableContentRecorder, PostgresApplicationRecorder
):
    """PostgreSQL application recorder with a full-text searchable table of
    page bodies.

    Alongside the stored events, this recorder keeps a table with one row per
    page slug holding that page's current body. Rows are inserted and updated
    in ``_insert_events()`` from the ``insert_page_bodies`` and
    ``update_page_bodies`` keyword arguments. They are searched with
    ``websearch_to_tsquery('english', ...)``, backed by a GIN expression
    index on ``to_tsvector('english', page_body)``.
    """

    def __init__(
        self,
        datastore: PostgresDatastore,
        events_table_name: str = "stored_events",
        page_bodies_table_name: str = "page_bodies",
    ):
        self.check_table_name_length(page_bodies_table_name, datastore.schema)
        # Set before super().__init__(), presumably because the base class
        # builds its create-table statements (which read these names) there.
        self.page_bodies_table_name = page_bodies_table_name
        # PostgreSQL does not accept a schema-qualified index name (an index
        # is always created in its table's schema). The factory may pass
        # "schema.table", so strip any "schema." prefix here.
        self.page_bodies_index_name = (
            page_bodies_table_name.rsplit(".", 1)[-1] + "_idx"
        )
        super().__init__(datastore, events_table_name)
        self.insert_page_body_statement = (
            f"INSERT INTO {self.page_bodies_table_name} VALUES ($1, $2)"
        )
        # Prepared statement names cannot contain ".", so replace any schema
        # separator.
        self.insert_page_body_statement_name = (
            f"insert_{page_bodies_table_name}".replace(".", "_")
        )
        self.update_page_body_statement = (
            f"UPDATE {self.page_bodies_table_name} "
            f"SET page_body = $1 WHERE page_slug = $2"
        )
        self.update_page_body_statement_name = (
            f"update_{page_bodies_table_name}".replace(".", "_")
        )
        self.search_page_body_statement = (
            f"SELECT page_slug FROM {self.page_bodies_table_name} WHERE "
            f"to_tsvector('english', page_body) @@ websearch_to_tsquery('english', $1)"
        )
        self.search_page_body_statement_name = (
            f"search_{page_bodies_table_name}".replace(".", "_")
        )

    def construct_create_table_statements(self) -> List[str]:
        """Return the base statements plus those creating the page bodies
        table and its GIN full-text index (both only if they do not exist)."""
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.page_bodies_table_name} ("
            "page_slug text, "
            "page_body text, "
            "PRIMARY KEY "
            "(page_slug))"
        )
        # The indexed expression must match the one used by the search
        # statement for the index to be usable.
        statements.append(
            f"CREATE INDEX IF NOT EXISTS {self.page_bodies_index_name} "
            f"ON {self.page_bodies_table_name} "
            f"USING GIN (to_tsvector('english', page_body))"
        )
        return statements

    def _prepare_insert_events(self, conn: PostgresConnection) -> None:
        """Prepare the base insert statements plus the page body insert and
        update statements on ``conn``."""
        super()._prepare_insert_events(conn)
        self._prepare(
            conn, self.insert_page_body_statement_name, self.insert_page_body_statement
        )
        self._prepare(
            conn, self.update_page_body_statement_name, self.update_page_body_statement
        )

    def _insert_events(
        self,
        c: PostgresCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        """Insert ``stored_events``, then insert and update page body rows
        using the same cursor.

        ``kwargs`` may contain ``insert_page_bodies`` and
        ``update_page_bodies``, each mapping page slug to page body. A missing
        or empty mapping is skipped.

        Returns the notification IDs returned by the base implementation.
        """
        notification_ids = super()._insert_events(c, stored_events, **kwargs)

        # Insert rows for new pages.
        insert_page_bodies = cast(Dict[str, str], kwargs.get("insert_page_bodies"))
        if insert_page_bodies:
            insert_alias = self.statement_name_aliases[
                self.insert_page_body_statement_name
            ]
            for page_slug, page_body in insert_page_bodies.items():
                c.execute(
                    f"EXECUTE {insert_alias}(%s, %s)",
                    (page_slug, page_body),
                )

        # Update rows for existing pages (the statement takes body, then slug).
        update_page_bodies = cast(Dict[str, str], kwargs.get("update_page_bodies"))
        if update_page_bodies:
            update_alias = self.statement_name_aliases[
                self.update_page_body_statement_name
            ]
            for page_slug, page_body in update_page_bodies.items():
                c.execute(
                    f"EXECUTE {update_alias}(%s, %s)",
                    (page_body, page_slug),
                )

        return notification_ids

    def search_page_bodies(self, query: str) -> List[str]:
        """Return the slugs of pages whose bodies match ``query``, which is
        parsed by PostgreSQL's ``websearch_to_tsquery()``."""
        with self.datastore.get_connection() as conn:
            self._prepare(
                conn,
                self.search_page_body_statement_name,
                self.search_page_body_statement,
            )
            with conn.transaction(commit=False) as curs:
                statement_alias = self.statement_name_aliases[
                    self.search_page_body_statement_name
                ]
                curs.execute(f"EXECUTE {statement_alias}(%s)", [query])
                page_slugs = [row["page_slug"] for row in curs.fetchall()]
        return page_slugs
class SearchableContentInfrastructureFactory(Factory):
    def application_recorder(self) -> ApplicationRecorder:
        """Construct a searchable PostgreSQL recorder and create its tables.

        Table names are derived from the lower-cased application name
        (falling back to "stored"), qualified with the datastore schema
        when one is configured.
        """
        schema = self.datastore.schema
        base_name = self.env.name.lower() or "stored"
        prefix = f"{schema}.{base_name}" if schema else base_name
        recorder = SearchableContentApplicationRecorder(
            datastore=self.datastore,
            events_table_name=f"{prefix}_events",
            page_bodies_table_name=f"{prefix}_page_bodies",
        )
        recorder.create_table()
        return recorder


del Factory
SQLite
The SQLite recorder uses an FTS5 virtual table and the MATCH operator.
from typing import Any, Dict, List, Optional, Sequence, cast
from eventsourcing.examples.searchablecontent.persistence import (
SearchableContentRecorder,
)
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
from eventsourcing.sqlite import (
Factory,
SQLiteApplicationRecorder,
SQLiteCursor,
SQLiteDatastore,
)
class SearchableContentApplicationRecorder(
    SearchableContentRecorder, SQLiteApplicationRecorder
):
    """SQLite application recorder with a full-text searchable table of page
    bodies.

    Page bodies are kept in an ordinary table (one row per page slug). That
    table is indexed by an FTS5 external-content virtual table named
    ``<page_bodies_table_name>_fts``. Triggers on the ordinary table keep the
    FTS5 index in step with inserts and updates.
    """

    def __init__(
        self,
        datastore: SQLiteDatastore,
        events_table_name: str = "stored_events",
        page_bodies_table_name: str = "page_bodies",
    ):
        # Set before super().__init__(), presumably because the base class
        # builds its create-table statements (which read these names) there.
        self.page_bodies_table_name = page_bodies_table_name
        self.page_bodies_virtual_table_name = page_bodies_table_name + "_fts"
        super().__init__(datastore, events_table_name)
        self.insert_page_body_statement = (
            f"INSERT INTO {self.page_bodies_table_name} VALUES (?, ?)"
        )
        self.update_page_body_statement = (
            f"UPDATE {self.page_bodies_table_name} "
            f"SET page_body = ? WHERE page_slug = ?"
        )
        # Use a positional "?" placeholder, like the statements above. In
        # SQLite "$1" is a *named* parameter, and binding a named parameter
        # from a sequence is deprecated by Python's sqlite3 module from 3.12.
        self.search_page_body_statement = (
            f"SELECT page_slug FROM {self.page_bodies_virtual_table_name} WHERE "
            f"page_body MATCH ?"
        )

    def construct_create_table_statements(self) -> List[str]:
        """Return the base statements plus those creating the page bodies
        table, its FTS5 index, and the triggers that keep the index in sync.

        Every statement uses IF NOT EXISTS, so creating the tables again on a
        database that already has them does not fail.
        """
        statements = super().construct_create_table_statements()
        # Derive trigger names from the table name so that recorders using
        # different page bodies tables do not collide. With the default table
        # name these are "page_bodies_ai" and "page_bodies_au".
        insert_trigger_name = f"{self.page_bodies_table_name}_ai"
        update_trigger_name = f"{self.page_bodies_table_name}_au"
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.page_bodies_table_name} ("
            "page_slug text, "
            "page_body text, "
            "PRIMARY KEY "
            "(page_slug)) "
        )
        statements.append(
            "CREATE VIRTUAL TABLE IF NOT EXISTS "
            f"{self.page_bodies_virtual_table_name} USING fts5("
            f"page_slug, page_body, content='{self.page_bodies_table_name}')"
        )
        # After an insert, add the new row to the FTS5 index.
        statements.append(
            f"CREATE TRIGGER IF NOT EXISTS {insert_trigger_name} AFTER INSERT ON "
            f"{self.page_bodies_table_name} BEGIN "
            f"INSERT INTO {self.page_bodies_virtual_table_name} "
            f"(rowid, page_slug, page_body) "
            f"VALUES (new.rowid, new.page_slug, new.page_body); "
            f"END"
        )
        # After an update, remove the old row from the FTS5 index with the
        # FTS5 'delete' command, then add the new row.
        statements.append(
            f"CREATE TRIGGER IF NOT EXISTS {update_trigger_name} AFTER UPDATE ON "
            f"{self.page_bodies_table_name} "
            f"BEGIN "
            f"INSERT INTO {self.page_bodies_virtual_table_name} "
            f"({self.page_bodies_virtual_table_name}, rowid, page_slug, page_body) "
            f"VALUES ('delete', old.rowid, old.page_slug, old.page_body); "
            f"INSERT INTO {self.page_bodies_virtual_table_name} "
            f"(rowid, page_slug, page_body) "
            f"VALUES (new.rowid, new.page_slug, new.page_body); "
            f"END"
        )
        return statements

    def _insert_events(
        self,
        c: SQLiteCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        """Insert ``stored_events``, then insert and update page body rows
        using the same cursor.

        ``kwargs`` may contain ``insert_page_bodies`` and
        ``update_page_bodies``, each mapping page slug to page body. A missing
        or empty mapping is skipped.

        Returns the notification IDs returned by the base implementation.
        """
        notification_ids = super()._insert_events(c, stored_events, **kwargs)
        # Insert rows for new pages.
        insert_page_bodies = cast(Dict[str, str], kwargs.get("insert_page_bodies"))
        if insert_page_bodies:
            for page_slug, page_body in insert_page_bodies.items():
                c.execute(self.insert_page_body_statement, (page_slug, page_body))
        # Update rows for existing pages (the statement takes body, then slug).
        update_page_bodies = cast(Dict[str, str], kwargs.get("update_page_bodies"))
        if update_page_bodies:
            for page_slug, page_body in update_page_bodies.items():
                c.execute(self.update_page_body_statement, (page_body, page_slug))
        return notification_ids

    def search_page_bodies(self, query: str) -> List[str]:
        """Return the slugs of pages whose bodies match the FTS5 ``query``."""
        with self.datastore.transaction(commit=False) as c:
            c.execute(self.search_page_body_statement, [query])
            page_slugs = [row["page_slug"] for row in c.fetchall()]
        return page_slugs
class SearchableContentInfrastructureFactory(Factory):
    def application_recorder(self) -> ApplicationRecorder:
        """Construct a searchable SQLite recorder with its default table
        names, and create its tables."""
        searchable_recorder = SearchableContentApplicationRecorder(
            datastore=self.datastore
        )
        searchable_recorder.create_table()
        return searchable_recorder


del Factory
Test case
The test case SearchableContentTestCase uses the application to create three pages, for ‘animals’, ‘plants’ and ‘minerals’. Content is added to the pages. The content is searched with various queries, and the search results are checked. The test runs twice: once with the application configured for PostgreSQL and once with it configured for SQLite.
import os
from typing import Dict
from unittest import TestCase
from uuid import uuid4
from eventsourcing.examples.contentmanagement.domainmodel import user_id_cvar
from eventsourcing.examples.searchablecontent.application import (
SearchableContentApplication,
)
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.tests.postgres_utils import drop_postgres_table
class SearchableContentTestCase(TestCase):
    """Exercise SearchableContentApplication with the persistence module
    selected by ``env``, which concrete subclasses override."""

    env: Dict[str, str] = {}

    def test_app(self) -> None:
        app = SearchableContentApplication(env=self.env)

        # Set user_id context variable.
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Create empty pages.
        app.create_page(title="Animals", slug="animals")
        app.create_page(title="Plants", slug="plants")
        app.create_page(title="Minerals", slug="minerals")

        # Search, expect no results.
        self.assertEqual(0, len(app.search("dog")))
        self.assertEqual(0, len(app.search("rose")))
        self.assertEqual(0, len(app.search("zinc")))

        # Update the pages.
        app.update_body(slug="animals", body="cat dog zebra")
        app.update_body(slug="plants", body="bluebell rose jasmine")
        app.update_body(slug="minerals", body="iron zinc calcium")

        # Search for single words, expect results.
        pages = app.search("dog")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "animals")
        self.assertEqual(pages[0]["body"], "cat dog zebra")

        pages = app.search("rose")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "plants")
        self.assertEqual(pages[0]["body"], "bluebell rose jasmine")

        pages = app.search("zinc")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "minerals")
        self.assertEqual(pages[0]["body"], "iron zinc calcium")

        # Search for multiple words in same page.
        pages = app.search("dog cat")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "animals")
        self.assertEqual(pages[0]["body"], "cat dog zebra")

        # Search for multiple words in same page, expect no results.
        pages = app.search("rose zebra")
        self.assertEqual(0, len(pages))

        # Search for alternative words, expect two results.
        pages = app.search("rose OR zebra")
        self.assertEqual(2, len(pages))
        self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in pages))

        # Update a page body again. Words removed from the body must no
        # longer match, which checks that the search index drops stale
        # content on update (not only that new content is added).
        app.update_body(slug="animals", body="cat dog")
        self.assertEqual(0, len(app.search("zebra")))
        pages = app.search("dog")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "animals")
        self.assertEqual(pages[0]["body"], "cat dog")
class TestWithSQLite(SearchableContentTestCase):
    # Run the shared test case against an in-memory SQLite database.
    env = dict(
        PERSISTENCE_MODULE="eventsourcing.examples.searchablecontent.sqlite",
        SQLITE_DBNAME=":memory:",
    )
class TestWithPostgres(SearchableContentTestCase):
    """Run the shared test case against PostgreSQL, dropping the
    application's tables before and after each test."""

    env = {"PERSISTENCE_MODULE": "eventsourcing.examples.searchablecontent.postgres"}

    def setUp(self) -> None:
        super().setUp()
        os.environ["POSTGRES_DBNAME"] = "eventsourcing"
        os.environ["POSTGRES_HOST"] = "127.0.0.1"
        os.environ["POSTGRES_PORT"] = "5432"
        os.environ["POSTGRES_USER"] = "eventsourcing"
        os.environ["POSTGRES_PASSWORD"] = "eventsourcing"
        self.drop_tables()

    def tearDown(self) -> None:
        self.drop_tables()
        super().tearDown()

    def drop_tables(self) -> None:
        """Drop the application's events and page bodies tables.

        The names follow the factory's "<application name>_events" and
        "<application name>_page_bodies" convention, qualified with the
        "public" schema.
        """
        db = PostgresDatastore(
            os.environ["POSTGRES_DBNAME"],
            os.environ["POSTGRES_HOST"],
            os.environ["POSTGRES_PORT"],
            os.environ["POSTGRES_USER"],
            os.environ["POSTGRES_PASSWORD"],
        )
        try:
            drop_postgres_table(db, "public.searchablecontentapplication_events")
            drop_postgres_table(db, "public.searchablecontentapplication_page_bodies")
        finally:
            # Close the datastore even if dropping a table fails.
            db.close()


del SearchableContentTestCase