Application 4 - Searchable content

This example demonstrates how to extend the library’s PostgreSQL application recorder to support full-text search queries in an event-sourced application.

Application

The application class SearchableWikiApplication extends the WikiApplication class presented in the previous example. It overrides the application’s construct_factory() method by constructing an extended version of the library’s PostgreSQL infrastructure factory (see below). It extends the save() method, using the variable keyword parameters (**kwargs) of the application's save() method to pass extra information down to the recorder, which the recorder uses to update a searchable index of the event-sourced content. And it introduces a search() method that expects a query argument and returns a list of details of the matching pages.

from typing import Any, Dict, List, Optional, Union, cast

from eventsourcing.domain import DomainEventProtocol, MutableOrImmutableAggregate
from eventsourcing.examples.searchablecontent.persistence import (
    SearchableWikiApplicationRecorder,
    SearchableWikiInfrastructureFactory,
)
from eventsourcing.examples.wiki.application import PageDetailsType, WikiApplication
from eventsourcing.examples.wiki.domainmodel import Page
from eventsourcing.persistence import InfrastructureFactory, Recording
from eventsourcing.utils import Environment


class SearchableWikiApplication(WikiApplication):
    """Wiki application whose page bodies can be searched with text queries.

    Whenever pages are saved, their current bodies are handed to the
    application recorder (via ``save()`` keyword arguments) so that it can
    maintain a searchable copy of each page body.
    """

    def construct_factory(self, env: Environment) -> InfrastructureFactory:
        """Return the factory that constructs the searchable recorder."""
        return SearchableWikiInfrastructureFactory(env)

    def save(
        self,
        *objs: Optional[Union[MutableOrImmutableAggregate, DomainEventProtocol]],
        **kwargs: Any,
    ) -> List[Recording]:
        """Save ``objs``, also passing the bodies of any pages to the recorder.

        Each ``Page`` among ``objs`` is classified by comparing its version
        with its number of pending events: when they are equal (all of its
        events are still pending) its body goes into ``insert_page_bodies``,
        otherwise into ``update_page_bodies``. Both are ``slug -> body``
        mappings and are set in ``kwargs`` (replacing any values the caller
        supplied under those keys) before delegating to ``super().save()``.
        """
        new_bodies: Dict[str, str] = {}
        changed_bodies: Dict[str, str] = {}
        pages = (obj for obj in objs if isinstance(obj, Page))
        for page in pages:
            all_events_pending = page.version == len(page.pending_events)
            destination = new_bodies if all_events_pending else changed_bodies
            destination[page.slug] = page.body
        kwargs.update(
            insert_page_bodies=new_bodies,
            update_page_bodies=changed_bodies,
        )
        return super().save(*objs, **kwargs)

    def search(self, query: str) -> List[PageDetailsType]:
        """Return the details of every page whose body matches ``query``.

        The recorder matches the query using PostgreSQL's
        ``websearch_to_tsquery``. The details of each matching slug are then
        looked up with ``get_page_details()``, in the order the recorder
        returned the slugs.
        """
        searchable = cast(SearchableWikiApplicationRecorder, self.recorder)
        matching_slugs = searchable.search_page_bodies(query)
        return [self.get_page_details(slug) for slug in matching_slugs]

Persistence

The infrastructure class SearchableWikiInfrastructureFactory extends the library's PostgreSQL Factory class by overriding its application_recorder() method, so that a SearchableWikiApplicationRecorder is constructed for the application.

The recorder class SearchableWikiApplicationRecorder creates a table that contains the current page body text, and a GIN index that allows the text to be searched. It defines SQL statements that insert, update, and search the rows of the table using search query syntax similar to the one used by web search engines.

It extends the _insert_events() method by inserting and updating rows of the page bodies table, according to the information passed down from the application through the save() method’s variable keyword parameters. It introduces a search_page_bodies() method, which returns the slugs of the pages whose bodies match the given search query.

from typing import Any, Dict, List, Optional, Sequence, cast

from eventsourcing.persistence import ApplicationRecorder, StoredEvent
from eventsourcing.postgres import (
    Factory,
    PostgresApplicationRecorder,
    PostgresConnection,
    PostgresCursor,
    PostgresDatastore,
)


class SearchableWikiInfrastructureFactory(Factory):
    """PostgreSQL factory that constructs a SearchableWikiApplicationRecorder."""

    def application_recorder(self) -> ApplicationRecorder:
        """Construct the searchable application recorder and create its tables.

        Both table names start with the lower-cased environment name (or
        ``"stored"`` when that is empty). They are qualified with the
        datastore's schema when one is set, and end in ``_events`` and
        ``_page_bodies`` respectively.
        """
        schema = self.datastore.schema
        base_name = self.env.name.lower() or "stored"
        table_prefix = f"{schema}.{base_name}" if schema else base_name
        searchable_recorder = SearchableWikiApplicationRecorder(
            datastore=self.datastore,
            events_table_name=f"{table_prefix}_events",
            page_bodies_table_name=f"{table_prefix}_page_bodies",
        )
        searchable_recorder.create_table()
        return searchable_recorder


class SearchableWikiApplicationRecorder(PostgresApplicationRecorder):
    """PostgreSQL application recorder that also keeps searchable page bodies.

    Besides the events table, this recorder maintains a table of
    ``(page_slug, page_body)`` rows with a GIN index over
    ``to_tsvector('english', page_body)``, so that bodies can be matched with
    ``websearch_to_tsquery``. Rows are written by ``_insert_events()`` using
    the same cursor that is used to insert the events.
    """

    def __init__(
        self,
        datastore: PostgresDatastore,
        events_table_name: str = "stored_events",
        page_bodies_table_name: str = "page_bodies",
    ) -> None:
        self.check_table_name_length(page_bodies_table_name, datastore.schema)
        self.page_bodies_table_name = page_bodies_table_name
        # PostgreSQL does not accept a schema-qualified index name (an index
        # is always created in its table's schema), so the index name is
        # derived from the unqualified table name.
        self.page_bodies_index_name = page_bodies_table_name.split(".")[-1] + "_idx"
        super().__init__(datastore, events_table_name)
        self.insert_page_body_statement = (
            f"INSERT INTO {self.page_bodies_table_name} VALUES ($1, $2)"
        )
        self.insert_page_body_statement_name = (
            f"insert_{page_bodies_table_name}".replace(".", "_")
        )
        self.update_page_body_statement = (
            f"UPDATE {self.page_bodies_table_name} "
            f"SET page_body = $1 WHERE page_slug = $2"
        )
        self.update_page_body_statement_name = (
            f"update_{page_bodies_table_name}".replace(".", "_")
        )
        self.search_page_body_statement = (
            f"SELECT page_slug FROM {self.page_bodies_table_name} WHERE "
            f"to_tsvector('english', page_body) @@ websearch_to_tsquery('english', $1)"
        )
        self.search_page_body_statement_name = (
            f"search_{page_bodies_table_name}".replace(".", "_")
        )

    def construct_create_table_statements(self) -> List[str]:
        """Add the page bodies table and its full-text GIN index to the DDL."""
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.page_bodies_table_name} ("
            "page_slug text, "
            "page_body text, "
            "PRIMARY KEY "
            "(page_slug))"
        )
        statements.append(
            f"CREATE INDEX IF NOT EXISTS {self.page_bodies_index_name} "
            f"ON {self.page_bodies_table_name} "
            f"USING GIN (to_tsvector('english', page_body))"
        )
        return statements

    def _prepare_insert_events(self, conn: PostgresConnection) -> None:
        """Also prepare the page body insert and update statements on ``conn``."""
        super()._prepare_insert_events(conn)
        self._prepare(
            conn, self.insert_page_body_statement_name, self.insert_page_body_statement
        )
        self._prepare(
            conn, self.update_page_body_statement_name, self.update_page_body_statement
        )

    def _insert_events(
        self,
        c: PostgresCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        """Insert the events, then insert/update page bodies with cursor ``c``.

        Optional keyword arguments (``slug -> body`` mappings):
            insert_page_bodies: rows to INSERT into the page bodies table.
            update_page_bodies: rows whose ``page_body`` is UPDATEd by slug.

        Returns whatever the superclass returns for the inserted events.
        """
        notification_ids = super()._insert_events(c, stored_events, **kwargs)

        # Insert page bodies.
        insert_page_bodies = cast(Dict[str, str], kwargs.get("insert_page_bodies"))
        if insert_page_bodies:
            insert_alias = self.statement_name_aliases[
                self.insert_page_body_statement_name
            ]
            for page_slug, page_body in insert_page_bodies.items():
                c.execute(f"EXECUTE {insert_alias}(%s, %s)", (page_slug, page_body))

        # Update page bodies (the prepared statement takes the body first).
        update_page_bodies = cast(Dict[str, str], kwargs.get("update_page_bodies"))
        if update_page_bodies:
            update_alias = self.statement_name_aliases[
                self.update_page_body_statement_name
            ]
            for page_slug, page_body in update_page_bodies.items():
                c.execute(f"EXECUTE {update_alias}(%s, %s)", (page_body, page_slug))

        return notification_ids

    def search_page_bodies(self, query: str) -> List[str]:
        """Return the slugs of pages whose bodies match ``query``.

        ``query`` is interpreted by PostgreSQL's
        ``websearch_to_tsquery('english', ...)``.
        """
        with self.datastore.get_connection() as conn:
            # Prepare the search statement here rather than relying on an
            # earlier insert having prepared it. A connection, or a newly
            # constructed recorder, may be used for searching before any
            # events have been inserted through it.
            self._prepare(
                conn,
                self.search_page_body_statement_name,
                self.search_page_body_statement,
            )
            with conn.transaction(commit=False) as curs:
                statement_alias = self.statement_name_aliases[
                    self.search_page_body_statement_name
                ]
                curs.execute(f"EXECUTE {statement_alias}(%s)", [query])
                page_slugs = [row["page_slug"] for row in curs.fetchall()]

        return page_slugs

Test case

The test case below creates three pages for animals, plants, and minerals. Content is added to the pages. The pages are searched with various queries and the search results are checked.

import os
from unittest import TestCase, mock
from uuid import uuid4

from eventsourcing.examples.searchablecontent.application import (
    SearchableWikiApplication,
)
from eventsourcing.examples.wiki.domainmodel import user_id_cvar
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.tests.postgres_utils import drop_postgres_table


class TestSearchableWiki(TestCase):
    """Check that pages of SearchableWikiApplication can be found by search.

    Requires a PostgreSQL server reachable with the settings in POSTGRES_ENV.
    """

    # Database settings. They are set in os.environ for the duration of each
    # test and restored afterwards, so that they do not leak into other tests.
    POSTGRES_ENV = {
        "POSTGRES_DBNAME": "eventsourcing",
        "POSTGRES_HOST": "127.0.0.1",
        "POSTGRES_PORT": "5432",
        "POSTGRES_USER": "eventsourcing",
        "POSTGRES_PASSWORD": "eventsourcing",
    }

    def test(self) -> None:

        # Set user_id context variable, and reset it once the test is done so
        # that the value does not leak into other tests.
        user_id = uuid4()
        token = user_id_cvar.set(user_id)
        self.addCleanup(user_id_cvar.reset, token)

        # Construct application.
        app = SearchableWikiApplication()

        # Create empty pages.
        app.create_page(title="Animals", slug="animals")
        app.create_page(title="Plants", slug="plants")
        app.create_page(title="Minerals", slug="minerals")

        # Search, expect no results.
        self.assertEqual(0, len(app.search("dog")))
        self.assertEqual(0, len(app.search("rose")))
        self.assertEqual(0, len(app.search("zinc")))

        # Update the pages.
        app.update_body(slug="animals", body="cat dog zebra")
        app.update_body(slug="plants", body="bluebell rose jasmine")
        app.update_body(slug="minerals", body="iron zinc calcium")

        # Search for single words, expect results.
        pages = app.search("dog")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "animals")
        self.assertEqual(pages[0]["body"], "cat dog zebra")

        pages = app.search("rose")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "plants")
        self.assertEqual(pages[0]["body"], "bluebell rose jasmine")

        pages = app.search("zinc")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "minerals")
        self.assertEqual(pages[0]["body"], "iron zinc calcium")

        # Search for multiple words in same page.
        pages = app.search("dog cat")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "animals")
        self.assertEqual(pages[0]["body"], "cat dog zebra")

        # Search for words that are on different pages, expect no results
        # (all unquoted words of a query must match the same page).
        pages = app.search("rose zebra")
        self.assertEqual(0, len(pages))

        # Search for alternative words, expect two results.
        pages = app.search("rose OR zebra")
        self.assertEqual(2, len(pages))
        self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in pages))

    def setUp(self) -> None:
        super().setUp()
        env_patcher = mock.patch.dict(os.environ, self.POSTGRES_ENV)
        env_patcher.start()
        # Cleanups run after tearDown(), so drop_tables() in tearDown() still
        # sees the patched environment. The cleanup also runs if setUp() fails.
        self.addCleanup(env_patcher.stop)
        self.drop_tables()

    def tearDown(self) -> None:
        self.drop_tables()
        super().tearDown()

    def drop_tables(self) -> None:
        """Drop the tables used by the application in the public schema."""
        db = PostgresDatastore(
            os.environ["POSTGRES_DBNAME"],
            os.environ["POSTGRES_HOST"],
            os.environ["POSTGRES_PORT"],
            os.environ["POSTGRES_USER"],
            os.environ["POSTGRES_PASSWORD"],
        )
        try:
            drop_postgres_table(db, "public.searchablewikiapplication_events")
            drop_postgres_table(db, "public.searchablewikiapplication_page_bodies")
        finally:
            db.close()