Application 5 - Searchable content

This example demonstrates how to extend the library’s application recorder classes to support full text search queries in an event-sourced application with both PostgreSQL and SQLite.

Application

The application class SearchableContentApplication extends the ContentManagementApplication class presented in the content management example. It overrides the save() method, using the variable keyword parameters (**kwargs) of save() to pass down to the recorder the extra information needed to update a searchable index of the event-sourced content. It also introduces a search() method that expects a query argument and returns a list of page details.

from typing import Any, Dict, List, Optional, Union, cast

from eventsourcing.domain import DomainEventProtocol, MutableOrImmutableAggregate
from eventsourcing.examples.contentmanagement.application import (
    ContentManagementApplication,
    PageDetailsType,
)
from eventsourcing.examples.contentmanagement.domainmodel import Page
from eventsourcing.examples.searchablecontent.persistence import (
    SearchableContentRecorder,
)
from eventsourcing.persistence import Recording


class SearchableContentApplication(ContentManagementApplication):
    """
    Content management application whose page bodies can be searched.

    Saving passes the current page bodies down to the recorder through
    keyword arguments, and searching asks the recorder for matching slugs.
    """

    def save(
        self,
        *objs: Optional[Union[MutableOrImmutableAggregate, DomainEventProtocol]],
        **kwargs: Any,
    ) -> List[Recording]:
        """
        Saves the given objects, passing page bodies down to the recorder.

        The bodies of any Page objects are collected, keyed by slug, into
        the "insert_page_bodies" and "update_page_bodies" keyword arguments
        before delegating to the superclass save() method.
        """
        new_bodies: Dict[str, str] = {}
        changed_bodies: Dict[str, str] = {}
        for candidate in objs:
            if not isinstance(candidate, Page):
                continue
            # A page whose version equals its number of pending events is
            # treated as new (presumably none of its events is stored yet).
            is_new = candidate.version == len(candidate.pending_events)
            target = new_bodies if is_new else changed_bodies
            target[candidate.slug] = candidate.body
        kwargs.update(
            insert_page_bodies=new_bodies,
            update_page_bodies=changed_bodies,
        )
        return super().save(*objs, **kwargs)

    def search(self, query: str) -> List[PageDetailsType]:
        """
        Returns the details of each page whose body matches the query.
        """
        searchable = cast(SearchableContentRecorder, self.recorder)
        return [
            self.get_page_details(matched_slug)
            for matched_slug in searchable.search_page_bodies(query)
        ]

Persistence

The recorder classes SearchableContentApplicationRecorder extend the PostgreSQL and SQLite ApplicationRecorder classes by creating a table that contains the current page body text. They define SQL statements that insert, update, and search the rows of the table using search query syntax similar to the one used by web search engines. They define a search_page_bodies() method which returns the page slugs for page bodies that match the given search query.

from abc import abstractmethod
from typing import List

from eventsourcing.persistence import ApplicationRecorder


class SearchableContentRecorder(ApplicationRecorder):
    """
    Application recorder interface that adds full text search
    of page bodies.
    """

    @abstractmethod
    def search_page_bodies(self, query: str) -> List[str]:
        """
        Returns page slugs for page bodies that match query.

        :param query: the search query text
        :return: a list of page slugs, one for each matching page body
        """

The application recorder classes extend the _insert_events() method by inserting and updating rows, according to the information passed down from the application through the save() method’s variable keyword parameters.

The infrastructure factory classes SearchableContentInfrastructureFactory extend the PostgreSQL and SQLite Factory class by overriding the application_recorder() method so that a SearchableContentApplicationRecorder is constructed as the application recorder.

PostgreSQL

The PostgreSQL recorder uses a GIN index and the websearch_to_tsquery() function.

from typing import Any, Dict, List, Optional, Sequence, cast

from eventsourcing.examples.searchablecontent.persistence import (
    SearchableContentRecorder,
)
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
from eventsourcing.postgres import (
    Factory,
    PostgresApplicationRecorder,
    PostgresConnection,
    PostgresCursor,
    PostgresDatastore,
)


class SearchableContentApplicationRecorder(
    SearchableContentRecorder, PostgresApplicationRecorder
):
    """
    PostgreSQL application recorder that also maintains a table of
    current page bodies, indexed for full text search.
    """

    def __init__(
        self,
        datastore: PostgresDatastore,
        events_table_name: str = "stored_events",
        page_bodies_table_name: str = "page_bodies",
    ):
        """
        Initialises the recorder and defines its page body statements.

        :param datastore: the PostgreSQL datastore to use
        :param events_table_name: name of the stored events table
        :param page_bodies_table_name: name of the page bodies table
        """
        self.check_table_name_length(page_bodies_table_name, datastore.schema)
        # Set before calling super().__init__() in case the base class
        # constructs the create table statements during initialisation.
        self.page_bodies_table_name = page_bodies_table_name
        super().__init__(datastore, events_table_name)
        self.insert_page_body_statement = (
            f"INSERT INTO {self.page_bodies_table_name} VALUES ($1, $2)"
        )
        # Statement names replace "." so that a schema-qualified table
        # name still gives a plain identifier.
        self.insert_page_body_statement_name = (
            f"insert_{page_bodies_table_name}".replace(".", "_")
        )
        self.update_page_body_statement = (
            f"UPDATE {self.page_bodies_table_name} "
            f"SET page_body = $1 WHERE page_slug = $2"
        )
        self.update_page_body_statement_name = (
            f"update_{page_bodies_table_name}".replace(".", "_")
        )
        self.search_page_body_statement = (
            f"SELECT page_slug FROM {self.page_bodies_table_name} WHERE "
            f"to_tsvector('english', page_body) @@ websearch_to_tsquery('english', $1)"
        )

        self.search_page_body_statement_name = (
            f"search_{page_bodies_table_name}".replace(".", "_")
        )

    def construct_create_table_statements(self) -> List[str]:
        """
        Returns the base class statements followed by statements that
        create the page bodies table and its GIN index.
        """
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.page_bodies_table_name} ("
            "page_slug text, "
            "page_body text, "
            "PRIMARY KEY "
            "(page_slug))"
        )
        # PostgreSQL does not accept a schema-qualified index name (the index
        # is always created in the schema of its table), so the "." of a
        # schema-qualified table name is replaced, as for statement names.
        index_name = f"{self.page_bodies_table_name}_idx".replace(".", "_")
        statements.append(
            f"CREATE INDEX IF NOT EXISTS {index_name} "
            f"ON {self.page_bodies_table_name} "
            f"USING GIN (to_tsvector('english', page_body))"
        )
        return statements

    def _prepare_insert_events(self, conn: PostgresConnection) -> None:
        """
        Prepares the page body insert and update statements on the
        connection, after the base class has prepared its own statements.
        """
        super()._prepare_insert_events(conn)
        self._prepare(
            conn, self.insert_page_body_statement_name, self.insert_page_body_statement
        )
        self._prepare(
            conn, self.update_page_body_statement_name, self.update_page_body_statement
        )

    def _insert_events(
        self,
        c: PostgresCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        """
        Inserts stored events, then inserts and updates page body rows.

        The page bodies are read from the "insert_page_bodies" and
        "update_page_bodies" keyword arguments, each a mapping of page
        slug to page body. The same cursor is used for all statements.

        :return: whatever the base class _insert_events() returned
        """
        notification_ids = super()._insert_events(c, stored_events, **kwargs)

        # Insert page bodies.
        insert_page_bodies = cast(Dict[str, str], kwargs.get("insert_page_bodies"))
        if insert_page_bodies:
            # The alias does not change between rows, so look it up once.
            insert_alias = self.statement_name_aliases[
                self.insert_page_body_statement_name
            ]
            for page_slug, page_body in insert_page_bodies.items():
                c.execute(
                    f"EXECUTE {insert_alias}(%s, %s)",
                    (page_slug, page_body),
                )

        # Update page bodies.
        update_page_bodies = cast(Dict[str, str], kwargs.get("update_page_bodies"))
        if update_page_bodies:
            update_alias = self.statement_name_aliases[
                self.update_page_body_statement_name
            ]
            for page_slug, page_body in update_page_bodies.items():
                # Parameter order follows the UPDATE statement: body, then slug.
                c.execute(
                    f"EXECUTE {update_alias}(%s, %s)",
                    (page_body, page_slug),
                )
        return notification_ids

    def search_page_bodies(self, query: str) -> List[str]:
        """
        Returns page slugs for page bodies that match query.

        The query is passed to websearch_to_tsquery() and matched against
        the page body text.
        """
        page_slugs: List[str] = []

        with self.datastore.get_connection() as conn:
            self._prepare(
                conn,
                self.search_page_body_statement_name,
                self.search_page_body_statement,
            )
            # Read-only query, so the transaction is not committed.
            with conn.transaction(commit=False) as curs:
                statement_alias = self.statement_name_aliases[
                    self.search_page_body_statement_name
                ]
                curs.execute(f"EXECUTE {statement_alias}(%s)", [query])
                page_slugs = [row["page_slug"] for row in curs.fetchall()]

        return page_slugs


class SearchableContentInfrastructureFactory(Factory):
    """
    PostgreSQL infrastructure factory that constructs a searchable
    content application recorder.
    """

    def application_recorder(self) -> ApplicationRecorder:
        """
        Constructs the recorder, creates its tables, and returns it.

        Table names are prefixed with the datastore schema (when one is
        set) and the lower-cased environment name, or "stored" when the
        environment name is empty.
        """
        schema_prefix = (self.datastore.schema + ".") if self.datastore.schema else ""
        app_name = self.env.name.lower() or "stored"
        table_prefix = schema_prefix + app_name
        searchable_recorder = SearchableContentApplicationRecorder(
            datastore=self.datastore,
            events_table_name=table_prefix + "_events",
            page_bodies_table_name=table_prefix + "_page_bodies",
        )
        searchable_recorder.create_table()
        return searchable_recorder


# NOTE(review): presumably removes the imported base Factory from this
# module's namespace so that SearchableContentInfrastructureFactory is the
# only factory class found when this module is named in PERSISTENCE_MODULE
# -- confirm against the library's factory lookup.
del Factory

SQLite

The SQLite recorder uses a virtual table and the MATCH operator.

from typing import Any, Dict, List, Optional, Sequence, cast

from eventsourcing.examples.searchablecontent.persistence import (
    SearchableContentRecorder,
)
from eventsourcing.persistence import ApplicationRecorder, StoredEvent
from eventsourcing.sqlite import (
    Factory,
    SQLiteApplicationRecorder,
    SQLiteCursor,
    SQLiteDatastore,
)


class SearchableContentApplicationRecorder(
    SearchableContentRecorder, SQLiteApplicationRecorder
):
    """
    SQLite application recorder that also maintains a table of current
    page bodies, with an FTS5 virtual table for full text search.
    """

    def __init__(
        self,
        datastore: SQLiteDatastore,
        events_table_name: str = "stored_events",
        page_bodies_table_name: str = "page_bodies",
    ):
        """
        Initialises the recorder and defines its page body statements.

        :param datastore: the SQLite datastore to use
        :param events_table_name: name of the stored events table
        :param page_bodies_table_name: name of the page bodies table
        """
        # Set before calling super().__init__() in case the base class
        # constructs the create table statements during initialisation.
        self.page_bodies_table_name = page_bodies_table_name
        self.page_bodies_virtual_table_name = page_bodies_table_name + "_fts"
        super().__init__(datastore, events_table_name)
        self.insert_page_body_statement = (
            f"INSERT INTO {self.page_bodies_table_name} VALUES (?, ?)"
        )
        self.update_page_body_statement = (
            f"UPDATE {self.page_bodies_table_name} "
            f"SET page_body = ? WHERE page_slug = ?"
        )
        # Uses the "?" placeholder like the statements above: the query is
        # bound from a sequence, and Python's sqlite3 module treats "$1" as
        # a named placeholder, which must not be bound from a sequence.
        self.search_page_body_statement = (
            f"SELECT page_slug FROM {self.page_bodies_virtual_table_name} WHERE "
            f"page_body MATCH ?"
        )

    def construct_create_table_statements(self) -> List[str]:
        """
        Returns the base class statements followed by statements that
        create the page bodies table, the FTS5 virtual table over it, and
        the triggers that copy inserted and updated rows into the index.

        Every statement uses IF NOT EXISTS, so the statements can be
        executed again against a database that already has the objects.
        """
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.page_bodies_table_name} ("
            "page_slug text, "
            "page_body text, "
            "PRIMARY KEY "
            "(page_slug)) "
        )
        statements.append(
            f"CREATE VIRTUAL TABLE IF NOT EXISTS "
            f"{self.page_bodies_virtual_table_name} USING fts5("
            f"page_slug, page_body, content='{self.page_bodies_table_name}')"
        )
        # Trigger names are derived from the table name so that recorders
        # with different page body tables in the same database each get
        # their own triggers (the default name gives "page_bodies_ai").
        statements.append(
            f"CREATE TRIGGER IF NOT EXISTS {self.page_bodies_table_name}_ai "
            f"AFTER INSERT ON "
            f"{self.page_bodies_table_name} BEGIN "
            f"INSERT INTO {self.page_bodies_virtual_table_name} "
            f"(rowid, page_slug, page_body) "
            f"VALUES (new.rowid, new.page_slug, new.page_body); "
            f"END"
        )
        # On update, the old row is first removed from the index with the
        # FTS5 'delete' command, and then the new row is added.
        statements.append(
            f"CREATE TRIGGER IF NOT EXISTS {self.page_bodies_table_name}_au "
            f"AFTER UPDATE ON "
            f"{self.page_bodies_table_name} "
            f"BEGIN "
            f"INSERT INTO {self.page_bodies_virtual_table_name} "
            f"({self.page_bodies_virtual_table_name}, rowid, page_slug, page_body) "
            f"VALUES ('delete', old.rowid, old.page_slug, old.page_body); "
            f"INSERT INTO {self.page_bodies_virtual_table_name} "
            f"(rowid, page_slug, page_body) "
            f"VALUES (new.rowid, new.page_slug, new.page_body); "
            f"END"
        )
        return statements

    def _insert_events(
        self,
        c: SQLiteCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        """
        Inserts stored events, then inserts and updates page body rows.

        The page bodies are read from the "insert_page_bodies" and
        "update_page_bodies" keyword arguments, each a mapping of page
        slug to page body. The same cursor is used for all statements.

        :return: whatever the base class _insert_events() returned
        """
        notification_ids = super()._insert_events(c, stored_events, **kwargs)

        # Insert page bodies.
        insert_page_bodies = cast(Dict[str, str], kwargs.get("insert_page_bodies"))
        if insert_page_bodies:
            for page_slug, page_body in insert_page_bodies.items():
                c.execute(self.insert_page_body_statement, (page_slug, page_body))

        # Update page bodies.
        update_page_bodies = cast(Dict[str, str], kwargs.get("update_page_bodies"))
        if update_page_bodies:
            for page_slug, page_body in update_page_bodies.items():
                # Parameter order follows the UPDATE statement: body, then slug.
                c.execute(self.update_page_body_statement, (page_body, page_slug))
        return notification_ids

    def search_page_bodies(self, query: str) -> List[str]:
        """
        Returns page slugs for page bodies that match query.

        The query is used as the right-hand side of the MATCH operator on
        the FTS5 virtual table.
        """
        page_slugs: List[str] = []

        # Read-only query, so the transaction is not committed.
        with self.datastore.transaction(commit=False) as c:
            c.execute(self.search_page_body_statement, [query])
            page_slugs = [row["page_slug"] for row in c.fetchall()]

        return page_slugs


class SearchableContentInfrastructureFactory(Factory):
    """
    SQLite infrastructure factory that constructs a searchable content
    application recorder.
    """

    def application_recorder(self) -> ApplicationRecorder:
        """
        Constructs the recorder with the factory's datastore and default
        table names, creates its tables, and returns it.
        """
        searchable_recorder = SearchableContentApplicationRecorder(
            datastore=self.datastore,
        )
        searchable_recorder.create_table()
        return searchable_recorder


# NOTE(review): presumably removes the imported base Factory from this
# module's namespace so that SearchableContentInfrastructureFactory is the
# only factory class found when this module is named in PERSISTENCE_MODULE
# -- confirm against the library's factory lookup.
del Factory

Test case

The test case SearchableContentTestCase uses the application to create three pages, for ‘animals’, ‘plants’ and ‘minerals’. Content is added to the pages. The content is searched with various queries and the search results are checked. The test is executed twice: once with the application configured for SQLite, and once with it configured for PostgreSQL.

import os
from typing import Dict
from unittest import TestCase
from uuid import uuid4

from eventsourcing.examples.contentmanagement.domainmodel import user_id_cvar
from eventsourcing.examples.searchablecontent.application import (
    SearchableContentApplication,
)
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.tests.postgres_utils import drop_postgres_table


class SearchableContentTestCase(TestCase):
    """
    Creates three pages, gives them bodies, and checks search results.

    Subclasses set ``env`` to select the persistence module.
    """

    env: Dict[str, str] = {}

    def test_app(self) -> None:
        app = SearchableContentApplication(env=self.env)

        # Set user_id context variable.
        user_id_cvar.set(uuid4())

        # Page bodies keyed by slug, in the order the pages are created.
        bodies = {
            "animals": "cat dog zebra",
            "plants": "bluebell rose jasmine",
            "minerals": "iron zinc calcium",
        }

        # Create empty pages, titled with the capitalised slug.
        for slug in bodies:
            app.create_page(title=slug.capitalize(), slug=slug)

        # Search before any body is set, expect no results.
        for word in ("dog", "rose", "zinc"):
            self.assertEqual(0, len(app.search(word)))

        # Update the pages.
        for slug, body in bodies.items():
            app.update_body(slug=slug, body=body)

        # Queries that match exactly one page: three single words, then
        # multiple words that are all in the same page.
        single_page_queries = (
            ("dog", "animals"),
            ("rose", "plants"),
            ("zinc", "minerals"),
            ("dog cat", "animals"),
        )
        for query, expected_slug in single_page_queries:
            pages = app.search(query)
            self.assertEqual(1, len(pages))
            self.assertEqual(pages[0]["slug"], expected_slug)
            self.assertEqual(pages[0]["body"], bodies[expected_slug])

        # Search for words that are in different pages, expect no results.
        self.assertEqual(0, len(app.search("rose zebra")))

        # Search for alternative words, expect two results.
        pages = app.search("rose OR zebra")
        self.assertEqual(2, len(pages))
        self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in pages))


class TestWithSQLite(SearchableContentTestCase):
    """Runs the searchable content test with the SQLite persistence module."""

    # Selects the SQLite searchable content module and an in-memory database.
    env = {
        "PERSISTENCE_MODULE": "eventsourcing.examples.searchablecontent.sqlite",
        "SQLITE_DBNAME": ":memory:",
    }


class TestWithPostgres(SearchableContentTestCase):
    """Runs the searchable content test with the PostgreSQL persistence module."""

    env = {"PERSISTENCE_MODULE": "eventsourcing.examples.searchablecontent.postgres"}

    def setUp(self) -> None:
        """Sets the PostgreSQL connection settings and drops leftover tables."""
        super().setUp()
        os.environ["POSTGRES_DBNAME"] = "eventsourcing"
        os.environ["POSTGRES_HOST"] = "127.0.0.1"
        os.environ["POSTGRES_PORT"] = "5432"
        os.environ["POSTGRES_USER"] = "eventsourcing"
        os.environ["POSTGRES_PASSWORD"] = "eventsourcing"
        self.drop_tables()

    def tearDown(self) -> None:
        """Drops the tables created by the test."""
        self.drop_tables()
        super().tearDown()

    def drop_tables(self) -> None:
        """Drops the events and page bodies tables used by the application."""
        db = PostgresDatastore(
            os.environ["POSTGRES_DBNAME"],
            os.environ["POSTGRES_HOST"],
            os.environ["POSTGRES_PORT"],
            os.environ["POSTGRES_USER"],
            os.environ["POSTGRES_PASSWORD"],
        )
        try:
            drop_postgres_table(db, "public.searchablecontentapplication_events")
            drop_postgres_table(db, "public.searchablecontentapplication_page_bodies")
        finally:
            # Close the datastore even if dropping a table raises.
            db.close()


# NOTE(review): presumably removes the base test case from this module's
# namespace so that test discovery runs only the two configured subclasses,
# not the base class with its empty env -- confirm.
del SearchableContentTestCase