Application 5 - Searchable content

This example demonstrates how to extend the library’s application recorder classes to support full text search queries in an event-sourced application with both PostgreSQL and SQLite.

Application

The application class SearchableContentApplication extends the ContentManagementApplication class presented in Application 3 - Content management. Its save() method sets the variable keyword parameters insert_pages and update_pages. It also introduces a search() method that expects a query argument and returns a list of pages. The application’s recorders are expected to be receptive to these variable keyword parameters and to support the search_pages() function.

from __future__ import annotations

from typing import Any, List, Optional, Tuple, Union, cast
from uuid import UUID

from eventsourcing.domain import DomainEventProtocol, MutableOrImmutableAggregate
from eventsourcing.examples.contentmanagement.application import (
    ContentManagementApplication,
    PageDetailsType,
)
from eventsourcing.examples.contentmanagement.domainmodel import Page
from eventsourcing.examples.searchablecontent.persistence import (
    SearchableContentRecorder,
)
from eventsourcing.persistence import Recording


class SearchableContentApplication(ContentManagementApplication):
    """
    Content management application whose pages can be found with
    full text search queries.

    Page content is passed down to the recorder when aggregates are saved,
    and search results are resolved back into page details.
    """

    def save(
        self,
        *objs: Optional[Union[MutableOrImmutableAggregate, DomainEventProtocol]],
        **kwargs: Any,
    ) -> List[Recording]:
        """
        Saves the given objects, adding the keyword arguments
        ``insert_pages`` and ``update_pages`` for the recorder.

        Each entry is an ``(id, slug, title, body)`` tuple taken from a
        ``Page`` in ``objs``. Objects that are not pages contribute nothing.
        """
        new_rows: List[Tuple[UUID, str, str, str]] = []
        changed_rows: List[Tuple[UUID, str, str, str]] = []
        for candidate in objs:
            if not isinstance(candidate, Page):
                continue
            # A page whose version equals its number of pending events is
            # treated as new (presumably none of its events were saved yet).
            is_new = candidate.version == len(candidate.pending_events)
            row = (candidate.id, candidate.slug, candidate.title, candidate.body)
            target = new_rows if is_new else changed_rows
            target.append(row)
        kwargs["insert_pages"] = new_rows
        kwargs["update_pages"] = changed_rows
        return super().save(*objs, **kwargs)

    def search(self, query: str) -> List[PageDetailsType]:
        """
        Returns details of the pages that the recorder reports as
        matching the given query.
        """
        searchable = cast(SearchableContentRecorder, self.recorder)
        return [
            self.get_page_by_id(matched_id)
            for matched_id in searchable.search_pages(query)
        ]

Persistence

The recorder class SearchableContentRecorder extends the AggregateRecorder by defining abstract methods to search and select pages. These methods will be implemented for both PostgreSQL and SQLite, which will also create custom tables for page content with full text search indexes.

from __future__ import annotations

from abc import abstractmethod
from typing import List, Tuple
from uuid import UUID

from eventsourcing.persistence import AggregateRecorder


class SearchableContentRecorder(AggregateRecorder):
    """
    Abstract aggregate recorder that can also search and select
    page content.

    Concrete subclasses decide how pages are stored and how the
    search query is interpreted.
    """

    @abstractmethod
    def search_pages(self, query: str) -> List[UUID]:
        """
        Returns IDs for pages that match query.

        :param query: full text search expression; its syntax is
            defined by the concrete recorder.
        :return: list of IDs of the matching pages.
        """

    @abstractmethod
    def select_page(self, page_id: UUID) -> Tuple[str, str, str]:
        """
        Returns slug, title and body for given ID.

        :param page_id: ID of the page to look up.
        :return: tuple of (slug, title, body).
        """

The _insert_events() methods of the PostgreSQL and SQLite recorders are extended, so that rows are inserted and updated, according to the information passed down from the application in the variable keyword arguments insert_pages and update_pages.

PostgreSQL

The PostgreSQL recorder uses a GIN index and the websearch_to_tsquery() function. The PostgreSQL Factory class is extended to involve this custom recorder in a custom PostgreSQL persistence module so that it can be used by the SearchableContentApplication.

from __future__ import annotations

from typing import Any, List, Optional, Sequence, Tuple
from uuid import UUID

from eventsourcing.examples.contentmanagement.application import PageNotFound
from eventsourcing.examples.searchablecontent.persistence import (
    SearchableContentRecorder,
)
from eventsourcing.persistence import StoredEvent
from eventsourcing.postgres import (
    Factory,
    PostgresAggregateRecorder,
    PostgresApplicationRecorder,
    PostgresConnection,
    PostgresCursor,
)


class PostgresSearchableContentRecorder(
    SearchableContentRecorder,
    PostgresAggregateRecorder,
):
    """
    PostgreSQL recorder that maintains a table of page content next to
    the stored events, and searches page bodies with
    ``websearch_to_tsquery()`` over a GIN index.
    """

    pages_table_name = "pages_projection_example"

    # SQL text and name for each prepared statement. Names are derived
    # from the table name with any "." replaced by "_".
    select_page_statement = (
        f"SELECT page_slug, page_title, page_body FROM {pages_table_name} "
        "WHERE page_id = $1"
    )
    select_page_statement_name = ("select_" + pages_table_name).replace(".", "_")

    insert_page_statement = (
        "INSERT INTO " + pages_table_name + " VALUES ($1, $2, $3, $4)"
    )
    insert_page_statement_name = ("insert_" + pages_table_name).replace(".", "_")

    update_page_statement = (
        f"UPDATE {pages_table_name} SET page_slug = $1, page_title = $2, "
        "page_body = $3 WHERE page_id = $4"
    )
    update_page_statement_name = ("update_" + pages_table_name).replace(".", "_")

    search_pages_statement = (
        f"SELECT page_id FROM {pages_table_name} "
        "WHERE to_tsvector('english', page_body) "
        "@@ websearch_to_tsquery('english', $1)"
    )
    search_pages_statement_name = ("search_" + pages_table_name).replace(".", "_")

    def construct_create_table_statements(self) -> List[str]:
        """
        Extends the inherited statements with one that creates the pages
        table and one that creates a GIN index over the page bodies.
        """
        statements = super().construct_create_table_statements()
        table = self.pages_table_name
        create_pages_table = (
            f"CREATE TABLE IF NOT EXISTS {table} ("
            "page_id uuid, page_slug text, page_title text, page_body text, "
            "PRIMARY KEY (page_id))"
        )
        create_search_index = (
            f"CREATE INDEX IF NOT EXISTS {table}_idx ON {table} "
            "USING GIN (to_tsvector('english', page_body))"
        )
        statements.extend([create_pages_table, create_search_index])
        return statements

    def _prepare_insert_events(self, conn: PostgresConnection) -> None:
        """
        Prepares the insert-page and update-page statements on the
        connection, after the inherited preparation.
        """
        super()._prepare_insert_events(conn)
        page_statements = (
            (self.insert_page_statement_name, self.insert_page_statement),
            (self.update_page_statement_name, self.update_page_statement),
        )
        for name, sql in page_statements:
            self._prepare(conn, name, sql)

    def _insert_events(
        self,
        c: PostgresCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        """
        Inserts the stored events, then inserts and updates page rows
        with the same cursor, according to the ``insert_pages`` and
        ``update_pages`` keyword arguments.

        Returns whatever the inherited method returned.
        """
        result = super()._insert_events(c, stored_events, **kwargs)
        self._insert_pages(c, **kwargs)
        self._update_pages(c, **kwargs)
        return result

    def _insert_pages(
        self,
        c: PostgresCursor,
        insert_pages: Sequence[Tuple[UUID, str, str, str]] = (),
        **_: Any,
    ) -> None:
        """
        Executes the prepared insert statement once for each
        ``(id, slug, title, body)`` tuple. Other keyword arguments are
        ignored.
        """
        for page_id, slug, title, body in insert_pages:
            alias = self.statement_name_aliases[self.insert_page_statement_name]
            params = (page_id, slug, title, body)
            c.execute(f"EXECUTE {alias}(%s, %s, %s, %s)", params)

    def _update_pages(
        self,
        c: PostgresCursor,
        update_pages: Sequence[Tuple[UUID, str, str, str]] = (),
        **_: Any,
    ) -> None:
        """
        Executes the prepared update statement once for each
        ``(id, slug, title, body)`` tuple. Other keyword arguments are
        ignored.
        """
        for page_id, slug, title, body in update_pages:
            alias = self.statement_name_aliases[self.update_page_statement_name]
            # The page ID goes last because it is the $4 of the WHERE clause.
            params = (slug, title, body, page_id)
            c.execute(f"EXECUTE {alias}(%s, %s, %s, %s)", params)

    def search_pages(self, query: str) -> List[UUID]:
        """
        Returns IDs for pages whose body matches the query.
        """
        found: List[UUID] = []
        name = self.search_pages_statement_name

        with self.datastore.get_connection() as conn:
            self._prepare(conn, name, self.search_pages_statement)
            with conn.transaction(commit=False) as curs:
                alias = self.statement_name_aliases[name]
                curs.execute(f"EXECUTE {alias}(%s)", [query])
                found.extend(row["page_id"] for row in curs.fetchall())

        return found

    def select_page(self, page_id: UUID) -> Tuple[str, str, str]:
        """
        Returns slug, title and body for given ID.

        Raises PageNotFound if no row has the given ID.
        """
        name = self.select_page_statement_name

        with self.datastore.get_connection() as conn:
            self._prepare(conn, name, self.select_page_statement)
            with conn.transaction(commit=False) as curs:
                alias = self.statement_name_aliases[name]
                curs.execute(f"EXECUTE {alias}(%s)", [str(page_id)])
                first = next(iter(curs.fetchall()), None)
                if first is not None:
                    return (
                        first["page_slug"],
                        first["page_title"],
                        first["page_body"],
                    )
        raise PageNotFound(f"Page ID {page_id} not found")


class SearchableContentApplicationRecorder(
    PostgresSearchableContentRecorder, PostgresApplicationRecorder
):
    """
    Application recorder that combines the searchable page content
    recorder with the PostgreSQL application recorder.
    """

    pass


class SearchableContentInfrastructureFactory(Factory):
    """
    PostgreSQL factory that uses the searchable content application
    recorder as its application recorder class.
    """

    application_recorder_class = SearchableContentApplicationRecorder


# Remove the imported base class from this module's namespace, leaving
# SearchableContentInfrastructureFactory as the only factory class here.
# NOTE(review): presumably needed so that the persistence module resolves
# to a single factory class; confirm against the library's factory lookup.
del Factory

SQLite

The SQLite recorder uses a virtual table and the MATCH operator. The SQLite Factory class is extended to involve this custom recorder in a custom SQLite persistence module so that it can be used by the SearchableContentApplication.

from __future__ import annotations

from typing import Any, List, Optional, Sequence, Tuple
from uuid import UUID

from eventsourcing.examples.contentmanagement.application import PageNotFound
from eventsourcing.examples.searchablecontent.persistence import (
    SearchableContentRecorder,
)
from eventsourcing.persistence import StoredEvent
from eventsourcing.sqlite import (
    Factory,
    SQLiteAggregateRecorder,
    SQLiteApplicationRecorder,
    SQLiteCursor,
)


class SQLiteSearchableContentRecorder(
    SearchableContentRecorder, SQLiteAggregateRecorder
):
    """
    SQLite recorder that maintains a table of page content next to the
    stored events, and searches page bodies with the MATCH operator on
    an FTS5 virtual table.

    The virtual table is declared with ``content=<pages table>`` and is
    kept in step with the pages table by two triggers.
    """

    pages_table_name = "pages_projection_example"
    pages_virtual_table_name = pages_table_name + "_fts"
    select_page_statement = (
        f"SELECT page_slug, page_title, page_body FROM "
        f"{pages_table_name} WHERE page_id = ?"
    )
    insert_page_statement = f"INSERT INTO {pages_table_name} VALUES (?, ?, ?, ?)"
    update_page_statement = (
        f"UPDATE {pages_table_name} "
        f"SET page_slug = ?, page_title = ?, page_body = ? WHERE page_id = ?"
    )
    search_pages_statement = (
        f"SELECT page_id FROM {pages_virtual_table_name} WHERE " f"page_body MATCH ?"
    )

    def construct_create_table_statements(self) -> List[str]:
        """
        Extends the inherited statements with those that create the pages
        table, the FTS5 virtual table, and the triggers that index page
        rows when they are inserted or updated.

        Every statement uses IF NOT EXISTS, so the statements can be
        executed again against a database in which the objects already
        exist (for example a database file that is reopened).
        """
        statements = super().construct_create_table_statements()
        statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.pages_table_name} ("
            "page_id TEXT, "
            "page_slug TEXT, "
            "page_title TEXT, "
            "page_body TEXT, "
            "PRIMARY KEY "
            "(page_id)) "
        )
        statements.append(
            f"CREATE VIRTUAL TABLE IF NOT EXISTS "
            f"{self.pages_virtual_table_name} USING fts5("
            f"page_id, page_body, content='{self.pages_table_name}')"
        )
        # After an insert, add the new row to the full text index.
        statements.append(
            f"CREATE TRIGGER IF NOT EXISTS projection_ai AFTER INSERT ON "
            f"{self.pages_table_name} BEGIN "
            f"INSERT INTO {self.pages_virtual_table_name} "
            f"(rowid, page_id, page_body) "
            f"VALUES (new.rowid, new.page_id, new.page_body); "
            f"END"
        )
        # After an update, issue the FTS5 'delete' command with the old
        # values, then add the new values to the full text index.
        statements.append(
            f"CREATE TRIGGER IF NOT EXISTS projection_au AFTER UPDATE ON "
            f"{self.pages_table_name} "
            f"BEGIN "
            f"INSERT INTO {self.pages_virtual_table_name} "
            f"({self.pages_virtual_table_name}, rowid, page_id, page_body) "
            f"VALUES ('delete', old.rowid, old.page_id, old.page_body);"
            f"INSERT INTO {self.pages_virtual_table_name} "
            f"(rowid, page_id, page_body) "
            f"VALUES (new.rowid, new.page_id, new.page_body); "
            f"END"
        )
        return statements

    def _insert_events(
        self,
        c: SQLiteCursor,
        stored_events: List[StoredEvent],
        **kwargs: Any,
    ) -> Optional[Sequence[int]]:
        """
        Inserts the stored events, then inserts and updates page rows
        with the same cursor, according to the ``insert_pages`` and
        ``update_pages`` keyword arguments.

        Returns whatever the inherited method returned.
        """
        notification_ids = super()._insert_events(c, stored_events, **kwargs)
        self._insert_pages(c, **kwargs)
        self._update_pages(c, **kwargs)
        return notification_ids

    def _insert_pages(
        self,
        c: SQLiteCursor,
        insert_pages: Sequence[Tuple[UUID, str, str, str]] = (),
        **_: Any,
    ) -> None:
        """
        Inserts one row for each ``(id, slug, title, body)`` tuple.
        Other keyword arguments are ignored.
        """
        for page_id, page_slug, page_title, page_body in insert_pages:
            # The page_id column is TEXT, so the UUID is stored as a string.
            c.execute(
                self.insert_page_statement,
                (str(page_id), page_slug, page_title, page_body),
            )

    def _update_pages(
        self,
        c: SQLiteCursor,
        update_pages: Sequence[Tuple[UUID, str, str, str]] = (),
        **_: Any,
    ) -> None:
        """
        Updates one row for each ``(id, slug, title, body)`` tuple.
        Other keyword arguments are ignored.
        """
        for page_id, page_slug, page_title, page_body in update_pages:
            # The page ID goes last because it fills the WHERE clause.
            c.execute(
                self.update_page_statement,
                (page_slug, page_title, page_body, str(page_id)),
            )

    def search_pages(self, query: str) -> List[UUID]:
        """
        Returns IDs for pages whose body matches the query.
        """
        page_ids: List[UUID] = []

        with self.datastore.transaction(commit=False) as c:
            c.execute(self.search_pages_statement, [query])
            # IDs are stored as text, so convert them back to UUIDs.
            page_ids.extend(UUID(row["page_id"]) for row in c.fetchall())

        return page_ids

    def select_page(self, page_id: UUID) -> Tuple[str, str, str]:
        """
        Returns slug, title and body for given ID.

        Raises PageNotFound if no row has the given ID.
        """
        with self.datastore.transaction(commit=False) as c:
            c.execute(self.select_page_statement, [str(page_id)])
            for row in c.fetchall():
                return row["page_slug"], row["page_title"], row["page_body"]
        raise PageNotFound(f"Page ID {page_id} not found")


class SearchableContentApplicationRecorder(
    SQLiteSearchableContentRecorder, SQLiteApplicationRecorder
):
    """
    Application recorder that combines the searchable page content
    recorder with the SQLite application recorder.
    """

    pass


class SearchableContentInfrastructureFactory(Factory):
    """
    SQLite factory that uses the searchable content application
    recorder as its application recorder class.
    """

    application_recorder_class = SearchableContentApplicationRecorder


# Remove the imported base class from this module's namespace, leaving
# SearchableContentInfrastructureFactory as the only factory class here.
# NOTE(review): presumably needed so that the persistence module resolves
# to a single factory class; confirm against the library's factory lookup.
del Factory

Test case

The test case SearchableContentApplicationTestCase uses the SearchableContentApplication to create three pages, for ‘animals’, ‘plants’ and ‘minerals’. Content is added to the pages. The content is searched with various queries and the search results are checked. The test case is executed twice, once with the PostgreSQL persistence module, and once with the SQLite persistence module.

from __future__ import annotations

import os
from typing import Dict
from unittest import TestCase
from uuid import uuid4

from eventsourcing.examples.contentmanagement.domainmodel import user_id_cvar
from eventsourcing.examples.searchablecontent.application import (
    SearchableContentApplication,
)
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.tests.postgres_utils import drop_postgres_table


class SearchableContentApplicationTestCase(TestCase):
    """
    Shared scenario that creates three pages, gives them content, and
    checks full text search results. Subclasses supply ``env``.
    """

    env: Dict[str, str] = {}

    def test_app(self) -> None:
        app = SearchableContentApplication(env=self.env)

        # Page operations read the user ID from this context variable.
        user_id_cvar.set(uuid4())

        # Start with three pages that have no body.
        for title in ("Animals", "Plants", "Minerals"):
            app.create_page(title=title, slug=title.lower())

        # Nothing can be found while the bodies are empty.
        for word in ("dog", "rose", "zinc"):
            self.assertEqual(0, len(app.search(word)))

        # Give each page a body.
        bodies = {
            "animals": "cat dog zebra",
            "plants": "bluebell rose jasmine",
            "minerals": "iron zinc calcium",
        }
        for slug, body in bodies.items():
            app.update_body(slug=slug, body=body)

        # A single word finds exactly the page that contains it.
        single_word_cases = (
            ("dog", "animals"),
            ("rose", "plants"),
            ("zinc", "minerals"),
        )
        for word, slug in single_word_cases:
            found = app.search(word)
            self.assertEqual(1, len(found))
            self.assertEqual(found[0]["slug"], slug)
            self.assertEqual(found[0]["body"], bodies[slug])

        # Several words that all appear in one page find that page.
        found = app.search("dog cat")
        self.assertEqual(1, len(found))
        self.assertEqual(found[0]["slug"], "animals")
        self.assertEqual(found[0]["body"], "cat dog zebra")

        # Several words that appear only in different pages find nothing.
        found = app.search("rose zebra")
        self.assertEqual(0, len(found))

        # Alternative words find each page that contains either word.
        found = app.search("rose OR zebra")
        self.assertEqual(2, len(found))
        self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in found))


class TestWithSQLite(SearchableContentApplicationTestCase):
    """
    Runs the shared test case with the SQLite persistence module of
    this example, using an in-memory database.
    """

    env = {
        "PERSISTENCE_MODULE": "eventsourcing.examples.searchablecontent.sqlite",
        "SQLITE_DBNAME": ":memory:",
    }


class TestWithPostgres(SearchableContentApplicationTestCase):
    """
    Runs the shared test case with the PostgreSQL persistence module of
    this example.

    The tables used by the test are dropped before and after each test,
    so that every run starts from an empty database.
    """

    env = {"PERSISTENCE_MODULE": "eventsourcing.examples.searchablecontent.postgres"}

    def setUp(self) -> None:
        """
        Sets the PostgreSQL connection settings in the process
        environment, then drops any tables left by an earlier run.
        """
        super().setUp()
        os.environ["POSTGRES_DBNAME"] = "eventsourcing"
        os.environ["POSTGRES_HOST"] = "127.0.0.1"
        os.environ["POSTGRES_PORT"] = "5432"
        os.environ["POSTGRES_USER"] = "eventsourcing"
        os.environ["POSTGRES_PASSWORD"] = "eventsourcing"
        self.drop_tables()

    def tearDown(self) -> None:
        """
        Drops the tables created by the test.
        """
        try:
            self.drop_tables()
        finally:
            # Run the inherited teardown even if dropping tables failed.
            super().tearDown()

    def drop_tables(self) -> None:
        """
        Drops the events table and the pages table, using the connection
        settings found in the process environment.
        """
        db = PostgresDatastore(
            os.environ["POSTGRES_DBNAME"],
            os.environ["POSTGRES_HOST"],
            os.environ["POSTGRES_PORT"],
            os.environ["POSTGRES_USER"],
            os.environ["POSTGRES_PASSWORD"],
        )
        try:
            drop_postgres_table(db, "public.searchablecontentapplication_events")
            drop_postgres_table(db, "public.pages_projection_example")
        finally:
            # Close the datastore even if a drop fails, so that the
            # connection is not leaked.
            db.close()


# Remove the base test case from this module's namespace, so that only the
# SQLite and PostgreSQL subclasses remain to be found by test discovery
# (the base class has an empty env and is not meant to run on its own).
del SearchableContentApplicationTestCase