Application 5 - Searchable content

This example demonstrates how to extend the library’s application recorder classes to support full text search queries in an event-sourced application with both PostgreSQL and SQLite.

Application

The application class FtsContentManagement extends the ContentManagement class presented in Application 3 - Content management. Its save() method collects the details of new and changed pages and passes them down to the recorder in the variable keyword parameters insert_pages and update_pages. It also introduces a search() method that expects a query argument and returns a list of pages. The application’s recorder is expected to be receptive to these variable keyword parameters and to support the search_pages() method.

from __future__ import annotations

from typing import TYPE_CHECKING, Any, cast

from examples.contentmanagement.application import ContentManagement, PageDetailsType
from examples.contentmanagement.domainmodel import Page
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo

if TYPE_CHECKING:
    from uuid import UUID

    from eventsourcing.domain import DomainEventProtocol, MutableOrImmutableAggregate
    from eventsourcing.persistence import Recording


class FtsContentManagement(ContentManagement):
    """Content management application with full text search over pages.

    Page details are handed to the recorder whenever pages are saved, and
    :meth:`search` turns the recorder's matching page IDs into page details.
    """

    def save(
        self,
        *objs: MutableOrImmutableAggregate[UUID] | DomainEventProtocol[UUID] | None,
        **kwargs: Any,
    ) -> list[Recording[UUID]]:
        """Save the given objects, passing page details down to the recorder.

        Every ``Page`` among ``objs`` is summarised as a ``PageInfo`` and placed
        in either the ``insert_pages`` or the ``update_pages`` keyword argument
        before delegating to the parent ``save()``. Both keyword arguments are
        always set, possibly to an empty list.

        Returns whatever the parent ``save()`` returns.
        """
        new_pages: list[PageInfo] = []
        changed_pages: list[PageInfo] = []
        for page in (obj for obj in objs if isinstance(obj, Page)):
            # A page whose version equals its number of pending events is
            # treated as new (presumably none of its events were recorded
            # before); any other page is treated as an update.
            is_new = page.version == len(page.pending_events)
            info = PageInfo(page.id, page.slug, page.title, page.body)
            (new_pages if is_new else changed_pages).append(info)
        kwargs["insert_pages"] = new_pages
        kwargs["update_pages"] = changed_pages
        return super().save(*objs, **kwargs)

    def search(self, query: str) -> list[PageDetailsType]:
        """Return the details of each page whose ID the recorder reports for ``query``."""
        fts_recorder = cast("FtsRecorder", self.recorder)
        return [
            self.get_page_by_id(page_id)
            for page_id in fts_recorder.search_pages(query)
        ]

Persistence

The abstract recorder class FtsRecorder extends the library’s Recorder class by defining abstract methods to insert, update, search and select pages. These methods are implemented for both PostgreSQL and SQLite, each of which also creates a custom table for page content with a full text search index.

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING

from eventsourcing.persistence import Recorder

if TYPE_CHECKING:
    from collections.abc import Sequence
    from uuid import UUID


@dataclass(frozen=True)
class PageInfo:
    """Immutable record of one page's ID, slug, title and body.

    Instances are what the FtsRecorder methods accept (insert/update) and
    return (select).
    """

    id: UUID  # ID of the page
    slug: str  # slug of the page
    title: str  # title of the page
    body: str  # body text of the page


class FtsRecorder(Recorder, ABC):
    """Abstract recorder interface for storing and searching page content.

    Concrete subclasses provide the four abstract methods; :meth:`search`
    is built on top of ``search_pages()`` and ``select_page()``.
    """

    @abstractmethod
    def insert_pages(self, pages: Sequence[PageInfo]) -> None:
        """Insert a sequence of pages (id, slug, title, body)."""

    @abstractmethod
    def update_pages(self, pages: Sequence[PageInfo]) -> None:
        """Update a sequence of pages (id, slug, title, body)."""

    @abstractmethod
    def search_pages(self, query: str) -> list[UUID]:
        """Returns IDs for pages that match query."""

    @abstractmethod
    def select_page(self, page_id: UUID) -> PageInfo:
        """Returns slug, title and body for given ID."""

    def search(self, query: str) -> Sequence[PageInfo]:
        """Return the ``PageInfo`` of every page that matches ``query``.

        Looks up the matching IDs with ``search_pages()`` and then selects
        each page individually with ``select_page()``.
        """
        return [self.select_page(found_id) for found_id in self.search_pages(query)]

The _insert_events() methods of the PostgreSQL and SQLite recorders are extended, so that rows are inserted and updated, according to the information passed down from the application in the variable keyword arguments insert_pages and update_pages.

PostgreSQL

The PostgreSQL recorder uses a GIN index and the websearch_to_tsquery() function. The class PostgresFtsApplicationRecorder combines this custom recorder with the library’s PostgresApplicationRecorder, and is selected with the APPLICATION_RECORDER_TOPIC environment variable (see the test case below) so that it can be used by the FtsContentManagement application.

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from psycopg.sql import SQL, Identifier

from eventsourcing.postgres import (
    PostgresApplicationRecorder,
    PostgresDatastore,
    PostgresRecorder,
)
from examples.contentmanagement.application import PageNotFoundError
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo

if TYPE_CHECKING:
    from collections.abc import Sequence
    from uuid import UUID

    from psycopg import Cursor
    from psycopg.rows import DictRow

    from eventsourcing.persistence import StoredEvent


class PostgresFtsRecorder(
    PostgresRecorder,
    FtsRecorder,
):
    """PostgreSQL recorder keeping page content in a table with a GIN index.

    The index is built on ``to_tsvector('english', page_body)`` and searches
    use ``websearch_to_tsquery('english', ...)`` against the same expression.
    """

    def __init__(
        self,
        datastore: PostgresDatastore,
        fts_table_name: str = "ftsprojection",
        **kwargs: Any,
    ):
        """Register the table/index creation statements and build the SQL.

        :param datastore: datastore passed on to the parent recorder.
        :param fts_table_name: name of the page table; the index is named
            ``<fts_table_name>_idx``.
        :param kwargs: passed on to the parent recorder unchanged.
        """
        super().__init__(datastore, **kwargs)
        self.check_table_name_length(fts_table_name)
        self.fts_table_name = fts_table_name

        # Quoted identifiers shared by all the statements below.
        schema = Identifier(self.datastore.schema)
        table = Identifier(self.fts_table_name)
        index = Identifier(self.fts_table_name + "_idx")

        create_table_statement = SQL(
            "CREATE TABLE IF NOT EXISTS {0}.{1} ("
            "page_id uuid, page_slug text, page_title text, page_body text, "
            "PRIMARY KEY (page_id))"
        ).format(schema, table)
        self.create_table_statements.append(create_table_statement)

        create_index_statement = SQL(
            "CREATE INDEX IF NOT EXISTS {0} ON {1}.{2} "
            "USING GIN (to_tsvector('english', page_body))"
        ).format(index, schema, table)
        self.create_table_statements.append(create_index_statement)

        self.select_page_statement = SQL(
            "SELECT page_slug, page_title, page_body "
            "FROM {0}.{1} WHERE page_id = %s"
        ).format(schema, table)

        self.insert_page_statement = SQL(
            "INSERT INTO {0}.{1} VALUES (%s, %s, %s, %s)"
        ).format(schema, table)

        self.update_page_statement = SQL(
            "UPDATE {0}.{1} SET page_slug = %s, page_title = %s, page_body = %s "
            "WHERE page_id = %s"
        ).format(schema, table)

        self.search_pages_statement = SQL(
            "SELECT page_id FROM {0}.{1} "
            "WHERE to_tsvector('english', page_body) @@ "
            "websearch_to_tsquery('english', %s)"
        ).format(schema, table)

    def insert_pages(self, pages: Sequence[PageInfo]) -> None:
        """Insert the given pages in a committed transaction of their own."""
        with self.datastore.transaction(commit=True) as cursor:
            self._insert_pages(cursor, pages)

    def update_pages(self, pages: Sequence[PageInfo]) -> None:
        """Update the given pages in a committed transaction of their own."""
        with self.datastore.transaction(commit=True) as cursor:
            self._update_pages(cursor, pages)

    def _insert_pages(self, curs: Cursor[DictRow], pages: Sequence[PageInfo]) -> None:
        """Execute one insert per page on the given cursor."""
        for info in pages:
            # Parameter order follows the table's column order.
            curs.execute(
                self.insert_page_statement,
                (info.id, info.slug, info.title, info.body),
                prepare=True,
            )

    def _update_pages(self, curs: Cursor[DictRow], pages: Sequence[PageInfo]) -> None:
        """Execute one update per page on the given cursor."""
        for info in pages:
            # The page ID comes last because it feeds the WHERE clause.
            curs.execute(
                self.update_page_statement,
                (info.slug, info.title, info.body, info.id),
                prepare=True,
            )

    def search_pages(self, query: str) -> list[UUID]:
        """Return the ``page_id`` of each row matching the search query."""
        with self.datastore.transaction(commit=False) as cursor:
            cursor.execute(self.search_pages_statement, [query], prepare=True)
            matches = cursor.fetchall()
        return [match["page_id"] for match in matches]

    def select_page(self, page_id: UUID) -> PageInfo:
        """Return the stored page with the given ID.

        :raises PageNotFoundError: if no row has this page ID.
        """
        with self.datastore.transaction(commit=False) as cursor:
            cursor.execute(self.select_page_statement, [str(page_id)], prepare=True)
            rows = cursor.fetchall()
        if rows:
            # page_id is the primary key, so the first row is the only row.
            found = rows[0]
            return PageInfo(
                id=page_id,
                slug=found["page_slug"],
                title=found["page_title"],
                body=found["page_body"],
            )
        msg = f"Page ID {page_id} not found"
        raise PageNotFoundError(msg)


class PostgresFtsApplicationRecorder(PostgresFtsRecorder, PostgresApplicationRecorder):
    """PostgreSQL application recorder that also writes page rows with events."""

    def _insert_events(
        self,
        curs: Cursor[DictRow],
        stored_events: Sequence[StoredEvent],
        *,
        insert_pages: Sequence[PageInfo] = (),
        update_pages: Sequence[PageInfo] = (),
        **kwargs: Any,
    ) -> Sequence[int] | None:
        """Insert stored events, then insert and update pages on the same cursor.

        :param curs: cursor used for the events and for the page rows.
        :param stored_events: events passed on to the parent implementation.
        :param insert_pages: pages to insert; defaults to none.
        :param update_pages: pages to update; defaults to none.
        :param kwargs: passed on to the parent implementation unchanged.
        :returns: whatever the parent ``_insert_events()`` returns.
        """
        # The parent's result is passed through untouched. The return
        # annotation was previously ``None`` although a value is returned; it
        # now matches the SQLite recorder's ``_insert_events()``.
        # NOTE(review): confirm against the return type declared by
        # PostgresApplicationRecorder._insert_events.
        notification_ids = super()._insert_events(curs, stored_events, **kwargs)
        self._insert_pages(curs, pages=insert_pages)
        self._update_pages(curs, pages=update_pages)
        return notification_ids

SQLite

The SQLite recorder uses a virtual table and the MATCH operator. The class SQLiteFtsApplicationRecorder combines this custom recorder with the library’s SQLiteApplicationRecorder, and is selected with the APPLICATION_RECORDER_TOPIC environment variable (see the test case below) so that it can be used by the FtsContentManagement application.

from __future__ import annotations

from typing import TYPE_CHECKING, Any
from uuid import UUID

from eventsourcing.sqlite import (
    SQLiteApplicationRecorder,
    SQLiteCursor,
    SQLiteDatastore,
    SQLiteRecorder,
)
from examples.contentmanagement.application import PageNotFoundError
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo

if TYPE_CHECKING:
    from collections.abc import Sequence

    from eventsourcing.persistence import StoredEvent


class SQLiteFtsRecorder(FtsRecorder, SQLiteRecorder):
    """SQLite recorder keeping page content in a table indexed with FTS5.

    Pages are stored in an ordinary table. An FTS5 virtual table declared
    with ``content=<page table>`` indexes ``page_id`` and ``page_body``, and
    two triggers keep that index in step with inserts and updates.
    """

    def __init__(
        self,
        datastore: SQLiteDatastore,
        **kwargs: Any,
    ):
        """Register the table/trigger creation statements and build the SQL.

        :param datastore: datastore passed on to the parent recorder.
        :param kwargs: passed on to the parent recorder unchanged.
        """
        super().__init__(datastore, **kwargs)
        self.fts_table_name = "ftsprojection"
        self.pages_virtual_table_name = self.fts_table_name + "_fts"

        self.create_table_statements.append(
            "CREATE TABLE IF NOT EXISTS "
            f"{self.fts_table_name} ("
            "page_id TEXT, "
            "page_slug TEXT, "
            "page_title TEXT, "
            "page_body TEXT, "
            "PRIMARY KEY "
            "(page_id)) "
        )
        # The virtual table and the triggers use IF NOT EXISTS, like the page
        # table above, so that running these statements against a database
        # that already has them does not fail with "already exists".
        self.create_table_statements.append(
            "CREATE VIRTUAL TABLE IF NOT EXISTS "
            f"{self.pages_virtual_table_name} USING fts5("
            f"page_id, page_body, content='{self.fts_table_name}')"
        )
        # After an insert, index the new row under the same rowid.
        self.create_table_statements.append(
            "CREATE TRIGGER IF NOT EXISTS projection_ai AFTER INSERT ON "
            f"{self.fts_table_name} BEGIN "
            f"INSERT INTO {self.pages_virtual_table_name} "
            "(rowid, page_id, page_body) "
            "VALUES (new.rowid, new.page_id, new.page_body); "
            "END"
        )
        # After an update, remove the old index entry with the FTS5 'delete'
        # command (which needs the old values) and then index the new values.
        self.create_table_statements.append(
            "CREATE TRIGGER IF NOT EXISTS projection_au AFTER UPDATE ON "
            f"{self.fts_table_name} "
            "BEGIN "
            f"INSERT INTO {self.pages_virtual_table_name} "
            f"({self.pages_virtual_table_name}, rowid, page_id, page_body) "
            "VALUES ('delete', old.rowid, old.page_id, old.page_body);"
            f"INSERT INTO {self.pages_virtual_table_name} "
            "(rowid, page_id, page_body) "
            "VALUES (new.rowid, new.page_id, new.page_body); "
            "END"
        )

        self.select_page_statement = (
            "SELECT page_slug, page_title, page_body FROM "
            f"{self.fts_table_name} WHERE page_id = ?"
        )
        self.select_page_from_virtual_table_statement = (
            "SELECT page_body FROM "
            f"{self.pages_virtual_table_name} WHERE page_id = ?"
        )
        self.insert_page_statement = (
            f"INSERT INTO {self.fts_table_name} VALUES (?, ?, ?, ?)"
        )
        self.update_page_statement = (
            f"UPDATE {self.fts_table_name} "
            "SET page_slug = ?, page_title = ?, page_body = ? WHERE page_id = ?"
        )
        self.search_pages_statement = (
            f"SELECT page_id FROM {self.pages_virtual_table_name} "
            "WHERE page_body MATCH ?"
        )

    def insert_pages(self, pages: Sequence[PageInfo]) -> None:
        """Insert the given pages in a committed transaction of their own."""
        with self.datastore.transaction(commit=True) as c:
            self._insert_pages(c, pages=pages)

    def _insert_pages(self, c: SQLiteCursor, pages: Sequence[PageInfo]) -> None:
        """Execute one insert per page on the given cursor."""
        for page in pages:
            # Page IDs are stored as text, so the UUID is converted to str.
            c.execute(
                self.insert_page_statement,
                (str(page.id), page.slug, page.title, page.body),
            )

    def update_pages(self, pages: Sequence[PageInfo]) -> None:
        """Update the given pages in a committed transaction of their own."""
        with self.datastore.transaction(commit=True) as c:
            self._update_pages(c, pages=pages)

    def _update_pages(self, c: SQLiteCursor, pages: Sequence[PageInfo]) -> None:
        """Execute one update per page on the given cursor."""
        for page in pages:
            # The page ID comes last because it feeds the WHERE clause.
            c.execute(
                self.update_page_statement,
                (page.slug, page.title, page.body, str(page.id)),
            )

    def search_pages(self, query: str) -> list[UUID]:
        """Return the IDs of pages whose body matches the FTS5 query."""
        with self.datastore.transaction(commit=False) as c:
            c.execute(self.search_pages_statement, [query])
            # IDs are stored as text; convert them back to UUID objects.
            return [UUID(row["page_id"]) for row in c.fetchall()]

    def select_page(self, page_id: UUID) -> PageInfo:
        """Return the stored page with the given ID.

        :raises PageNotFoundError: if no row has this page ID.
        """
        with self.datastore.transaction(commit=False) as c:
            c.execute(self.select_page_statement, [str(page_id)])
            # page_id is the primary key, so at most one row is returned.
            for row in c.fetchall():
                return PageInfo(
                    id=page_id,
                    slug=row["page_slug"],
                    title=row["page_title"],
                    body=row["page_body"],
                )
        msg = f"Page ID {page_id} not found"
        raise PageNotFoundError(msg)


class SQLiteFtsApplicationRecorder(SQLiteFtsRecorder, SQLiteApplicationRecorder):
    """SQLite application recorder that also writes page rows with events."""

    def _insert_events(
        self,
        c: SQLiteCursor,
        stored_events: Sequence[StoredEvent],
        *,
        insert_pages: Sequence[PageInfo] = (),
        update_pages: Sequence[PageInfo] = (),
        **kwargs: Any,
    ) -> Sequence[int] | None:
        """Insert stored events, then insert and update pages on the same cursor.

        Returns whatever the parent ``_insert_events()`` returns.
        """
        # Events go first; the page rows follow on the same cursor.
        parent_result = super()._insert_events(c, stored_events, **kwargs)
        self._insert_pages(c, pages=insert_pages)
        self._update_pages(c, pages=update_pages)
        return parent_result

Test case

The test case FtsContentManagementTestCase uses the FtsContentManagement application to create three pages, for ‘animals’, ‘plants’ and ‘minerals’. Content is added to the pages. The content is searched with various queries and the search results are checked. The test case is executed twice, once with the PostgreSQL recorder, and once with the SQLite recorder.

from __future__ import annotations

from typing import ClassVar
from unittest import TestCase
from uuid import uuid4

from eventsourcing.tests.postgres_utils import drop_tables
from eventsourcing.utils import get_topic
from examples.contentmanagement.domainmodel import user_id_cvar
from examples.ftscontentmanagement.application import FtsContentManagement
from examples.ftscontentmanagement.postgres import PostgresFtsApplicationRecorder
from examples.ftscontentmanagement.sqlite import SQLiteFtsApplicationRecorder


class FtsContentManagementTestCase(TestCase):
    """Shared search scenario; subclasses supply ``env`` to pick the recorder."""

    env: ClassVar[dict[str, str]] = {}

    def test_app(self) -> None:
        """Create three pages, give them bodies, and check search results."""
        app = FtsContentManagement(env=self.env)

        # Set the user_id context variable to a fresh UUID.
        user_id_cvar.set(uuid4())

        # Create the pages without any body text.
        for title in ("Animals", "Plants", "Minerals"):
            app.create_page(title=title, slug=title.lower())

        # Nothing has a body yet, so no word can match.
        for word in ("dog", "rose", "zinc"):
            self.assertEqual(0, len(app.search(word)))

        # Give each page its body text.
        bodies = {
            "animals": "cat dog zebra",
            "plants": "bluebell rose jasmine",
            "minerals": "iron zinc calcium",
        }
        for slug, body in bodies.items():
            app.update_body(slug=slug, body=body)

        # Each single word is found in exactly one page.
        expected_hits = (("dog", "animals"), ("rose", "plants"), ("zinc", "minerals"))
        for word, slug in expected_hits:
            pages = app.search(word)
            self.assertEqual(1, len(pages))
            self.assertEqual(pages[0]["slug"], slug)
            self.assertEqual(pages[0]["body"], bodies[slug])

        # Two words that both occur in the same page.
        pages = app.search("dog cat")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0]["slug"], "animals")
        self.assertEqual(pages[0]["body"], "cat dog zebra")

        # Two words that occur only in different pages: no page has both.
        pages = app.search("rose zebra")
        self.assertEqual(0, len(pages))

        # Alternative words match both pages.
        pages = app.search("rose OR zebra")
        self.assertEqual(2, len(pages))
        found_slugs = sorted(p["slug"] for p in pages)
        self.assertEqual(["animals", "plants"], found_slugs)


class TestWithSQLite(FtsContentManagementTestCase):
    """Runs the shared test with the SQLite FTS application recorder."""

    # The stock SQLite persistence module is used; only the application
    # recorder class is replaced, and the database is in memory.
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.sqlite",
        "APPLICATION_RECORDER_TOPIC": get_topic(SQLiteFtsApplicationRecorder),
        "SQLITE_DBNAME": ":memory:",
    }


class TestWithPostgres(FtsContentManagementTestCase):
    """Runs the shared test with the PostgreSQL FTS application recorder."""

    # The stock PostgreSQL persistence module is used; only the application
    # recorder class is replaced. The connection settings point at a local
    # server on 127.0.0.1:5432.
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.postgres",
        "POSTGRES_DBNAME": "eventsourcing",
        "POSTGRES_HOST": "127.0.0.1",
        "POSTGRES_PORT": "5432",
        "POSTGRES_USER": "eventsourcing",
        "POSTGRES_PASSWORD": "eventsourcing",
        "APPLICATION_RECORDER_TOPIC": get_topic(PostgresFtsApplicationRecorder),
    }

    def setUp(self) -> None:
        """Call drop_tables() before the test, then the parent set-up."""
        # Presumably clears tables left over from earlier runs; confirm
        # against drop_tables() in eventsourcing.tests.postgres_utils.
        drop_tables()
        super().setUp()

    def tearDown(self) -> None:
        """Run the parent tear-down, then call drop_tables() again."""
        super().tearDown()
        drop_tables()


# Remove the base class name from this module's namespace; presumably so that
# test discovery only collects the configured subclasses above (confirm).
del FtsContentManagementTestCase

Code reference

class examples.ftscontentmanagement.application.FtsContentManagement(env: EnvType | None = None)[source]

Bases: ContentManagement

save(*objs: MutableOrImmutableAggregate[UUID] | DomainEventProtocol[UUID] | None, **kwargs: Any) list[Recording[UUID]][source]

Collects pending events from given aggregates and puts them in the application’s event store.

search(query: str) list[dict[str, str | Any]][source]
name = 'FtsContentManagement'
class examples.ftscontentmanagement.persistence.PageInfo(id: 'UUID', slug: 'str', title: 'str', body: 'str')[source]

Bases: object

id: UUID
slug: str
title: str
body: str
class examples.ftscontentmanagement.persistence.FtsRecorder[source]

Bases: Recorder, ABC

abstractmethod insert_pages(pages: Sequence[PageInfo]) None[source]

Insert a sequence of pages (id, slug, title, body).

abstractmethod update_pages(pages: Sequence[PageInfo]) None[source]

Update a sequence of pages (id, slug, title, body).

abstractmethod search_pages(query: str) list[UUID][source]

Returns IDs for pages that match query.

abstractmethod select_page(page_id: UUID) PageInfo[source]

Returns slug, title and body for given ID.

search(query: str) Sequence[PageInfo][source]
class examples.ftscontentmanagement.postgres.PostgresFtsRecorder(datastore: PostgresDatastore, fts_table_name: str = 'ftsprojection', **kwargs: Any)[source]

Bases: PostgresRecorder, FtsRecorder

insert_pages(pages: Sequence[PageInfo]) None[source]

Insert a sequence of pages (id, slug, title, body).

update_pages(pages: Sequence[PageInfo]) None[source]

Update a sequence of pages (id, slug, title, body).

_insert_pages(curs: Cursor[DictRow], pages: Sequence[PageInfo]) None[source]
_update_pages(curs: Cursor[DictRow], pages: Sequence[PageInfo]) None[source]
search_pages(query: str) list[UUID][source]

Returns IDs for pages that match query.

select_page(page_id: UUID) PageInfo[source]

Returns slug, title and body for given ID.

class examples.ftscontentmanagement.postgres.PostgresFtsApplicationRecorder(datastore: PostgresDatastore, fts_table_name: str = 'ftsprojection', **kwargs: Any)[source]

Bases: PostgresFtsRecorder, PostgresApplicationRecorder

class examples.ftscontentmanagement.sqlite.SQLiteFtsRecorder(datastore: SQLiteDatastore, **kwargs: Any)[source]

Bases: FtsRecorder, SQLiteRecorder

insert_pages(pages: Sequence[PageInfo]) None[source]

Insert a sequence of pages (id, slug, title, body).

_insert_pages(c: SQLiteCursor, pages: Sequence[PageInfo]) None[source]
update_pages(pages: Sequence[PageInfo]) None[source]

Update a sequence of pages (id, slug, title, body).

_update_pages(c: SQLiteCursor, pages: Sequence[PageInfo]) None[source]
search_pages(query: str) list[UUID][source]

Returns IDs for pages that match query.

select_page(page_id: UUID) PageInfo[source]

Returns slug, title and body for given ID.

class examples.ftscontentmanagement.sqlite.SQLiteFtsApplicationRecorder(datastore: SQLiteDatastore, **kwargs: Any)[source]

Bases: SQLiteFtsRecorder, SQLiteApplicationRecorder

class examples.ftscontentmanagement.test_application.TestWithSQLite(methodName='runTest')[source]

Bases: FtsContentManagementTestCase

env: ClassVar[dict[str, str]] = {'APPLICATION_RECORDER_TOPIC': 'examples.ftscontentmanagement.sqlite:SQLiteFtsApplicationRecorder', 'PERSISTENCE_MODULE': 'eventsourcing.sqlite', 'SQLITE_DBNAME': ':memory:'}
class examples.ftscontentmanagement.test_application.TestWithPostgres(methodName='runTest')[source]

Bases: FtsContentManagementTestCase

env: ClassVar[dict[str, str]] = {'APPLICATION_RECORDER_TOPIC': 'examples.ftscontentmanagement.postgres:PostgresFtsApplicationRecorder', 'PERSISTENCE_MODULE': 'eventsourcing.postgres', 'POSTGRES_DBNAME': 'eventsourcing', 'POSTGRES_HOST': '127.0.0.1', 'POSTGRES_PASSWORD': 'eventsourcing', 'POSTGRES_PORT': '5432', 'POSTGRES_USER': 'eventsourcing'}
setUp() None[source]

Hook method for setting up the test fixture before exercising it.

tearDown() None[source]

Hook method for deconstructing the test fixture after testing it.