System 1 - Content management system

In this example, event notifications from the ContentManagementApplication from Application 3 - Content management are processed and projected into an eventually-consistent full text search index, a searchable “materialized view” of the pages’ body text just like Application 5 - Searchable content.

This is an example of CQRS. By separating the search engine “read model” from the content management “write model”, the commands that update pages will perform faster. But, more importantly, the search engine can be redesigned and rebuilt by reprocessing those events. The projected searchable content can be deleted and rebuilt, perhaps also to include page titles, or timestamps, or other information contained in the domain events such as the authors, because it is updated by processing events. This is the main advantage of “CQRS” over the “inline” technique used in Application 5 - Searchable content where the search index is simply updated whenever new events are recorded. Please note, it is possible to migrate from the “inline” technique to CQRS, by adding the downstream processing and then removing the inline updating, since the domain model is already event sourced. Similarly, other projections can be added to work alongside and concurrently with the updating of the search engine.

Application

The SearchIndexApplication defined below is a ProcessApplication. Its policy() function is coded to process the Page.Created and Page.BodyUpdated domain events of the ContentManagementApplication. It also has a search() method that returns a list of page IDs.

The SearchIndexApplication class in this example works in a similar way to the SearchableContentApplication class in Application 5 - Searchable content, by setting variable keyword arguments insert_pages and update_pages on a the ProcessingEvent object. However, rather than populating the variable keyword arguments in the save() method, it populates insert_pages and update_pages within its policy() function. The insert_pages and update_pages arguments are set on the ProcessingEvent object passed into the policy() function, which carries an event notification ID that indicates the position in the application sequence of the domain event that is being processed.

The application will be configured to run with a custom ProcessRecorder so that search index records will be updated atomically with the inserting of a tracking record which indicates which upstream event notification has been processed.

Because the Page.BodyUpdated event carries only the diff of the page body, the policy() function must first select the current page body from its own records and then apply the diff as a patch. The “exactly once” semantics provided by the library’s system module guarantees that the diffs will always be applied in the correct order. Without this guarantee, the projection could become inconsistent, with the consequence that the diffs will fail to be applied.

from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar, Dict, List, cast

from eventsourcing.examples.contentmanagement.domainmodel import Page
from eventsourcing.examples.contentmanagement.utils import apply_patch
from eventsourcing.examples.searchablecontent.persistence import (
    SearchableContentRecorder,
)
from eventsourcing.system import ProcessApplication

if TYPE_CHECKING:  # pragma: nocover
    from uuid import UUID

    from eventsourcing.application import ProcessingEvent
    from eventsourcing.domain import DomainEventProtocol


class SearchIndexApplication(ProcessApplication):
    env: ClassVar[Dict[str, str]] = {
        "COMPRESSOR_TOPIC": "gzip",
    }

    def policy(
        self,
        domain_event: DomainEventProtocol,
        processing_event: ProcessingEvent,
    ) -> None:
        if isinstance(domain_event, Page.Created):
            processing_event.saved_kwargs["insert_pages"] = [
                (
                    domain_event.originator_id,
                    domain_event.slug,
                    domain_event.title,
                    domain_event.body,
                )
            ]
        elif isinstance(domain_event, Page.BodyUpdated):
            recorder = cast(SearchableContentRecorder, self.recorder)
            page_id = domain_event.originator_id
            page_slug, page_title, page_body = recorder.select_page(page_id)
            page_body = apply_patch(page_body, domain_event.diff)
            processing_event.saved_kwargs["update_pages"] = [
                (
                    page_id,
                    page_slug,
                    page_title,
                    page_body,
                )
            ]

    def search(self, query: str) -> List[UUID]:
        recorder = cast(SearchableContentRecorder, self.recorder)
        return recorder.search_pages(query)

System

A System of applications is defined, in which the SearchIndexApplication follows the ContentManagementApplication. This system can then be used in any Runner.

from __future__ import annotations

from eventsourcing.examples.contentmanagement.application import (
    ContentManagementApplication,
)
from eventsourcing.examples.contentmanagementsystem.application import (
    SearchIndexApplication,
)
from eventsourcing.system import System


class ContentManagementSystem(System):
    def __init__(self) -> None:
        super().__init__(pipes=[[ContentManagementApplication, SearchIndexApplication]])

PostgreSQL

The PostgresSearchableContentRecorder from Application 5 - Searchable content is used to define a custom ProcessRecorder for PostgreSQL. The PostgreSQL Factory class is extended to involve this custom recorder in a custom persistence module so that it can be used by the SearchIndexApplication.

from eventsourcing.examples.searchablecontent.postgres import (
    PostgresSearchableContentRecorder,
)
from eventsourcing.postgres import Factory, PostgresProcessRecorder


class SearchableContentProcessRecorder(
    PostgresSearchableContentRecorder, PostgresProcessRecorder
):
    pass


class SearchableContentInfrastructureFactory(Factory):
    process_recorder_class = SearchableContentProcessRecorder


del Factory

SQLite

The SqliteSearchableContentRecorder from Application 5 - Searchable content is used to define a custom ProcessRecorder for SQLite. The SQLite Factory class is extended to involve this custom recorder in a custom persistence module so that it can be used by the SearchIndexApplication.

from eventsourcing.examples.searchablecontent.sqlite import (
    SQLiteSearchableContentRecorder,
)
from eventsourcing.sqlite import Factory, SQLiteProcessRecorder


class SearchableContentProcessRecorder(
    SQLiteSearchableContentRecorder, SQLiteProcessRecorder
):
    pass


class SearchableContentInfrastructureFactory(Factory):
    process_recorder_class = SearchableContentProcessRecorder


del Factory

Test case

The test case ContentManagementSystemTestCase creates three pages, for ‘animals’, ‘plants’ and ‘minerals’. Content is added to the pages. The content is searched with various queries and the search results are checked. The test is executed twice, once with the application configured for both PostgreSQL, and once for SQLite.

from __future__ import annotations

from typing import ClassVar, Dict
from unittest import TestCase
from uuid import uuid4

from eventsourcing.examples.contentmanagement.application import (
    ContentManagementApplication,
)
from eventsourcing.examples.contentmanagement.domainmodel import user_id_cvar
from eventsourcing.examples.contentmanagementsystem.application import (
    SearchIndexApplication,
)
from eventsourcing.examples.contentmanagementsystem.system import (
    ContentManagementSystem,
)
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.system import SingleThreadedRunner
from eventsourcing.tests.postgres_utils import drop_postgres_table


class ContentManagementSystemTestCase(TestCase):
    env: ClassVar[Dict[str, str]] = {}

    def test_system(self) -> None:
        runner = SingleThreadedRunner(system=ContentManagementSystem(), env=self.env)
        runner.start()

        content_management_app = runner.get(ContentManagementApplication)
        search_index_app = runner.get(SearchIndexApplication)

        # Set user_id context variable.
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Create empty pages.
        content_management_app.create_page(title="Animals", slug="animals")
        content_management_app.create_page(title="Plants", slug="plants")
        content_management_app.create_page(title="Minerals", slug="minerals")

        # Search, expect no results.
        self.assertEqual(0, len(search_index_app.search("cat")))
        self.assertEqual(0, len(search_index_app.search("rose")))
        self.assertEqual(0, len(search_index_app.search("calcium")))

        # Update the pages.
        content_management_app.update_body(slug="animals", body="cat")
        content_management_app.update_body(slug="plants", body="rose")
        content_management_app.update_body(slug="minerals", body="calcium")

        # Search for single words.
        page_ids = search_index_app.search("cat")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "animals")
        self.assertEqual(page["body"], "cat")

        page_ids = search_index_app.search("rose")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "plants")
        self.assertEqual(page["body"], "rose")

        page_ids = search_index_app.search("calcium")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "minerals")
        self.assertEqual(page["body"], "calcium")

        self.assertEqual(len(search_index_app.search("dog")), 0)
        self.assertEqual(len(search_index_app.search("bluebell")), 0)
        self.assertEqual(len(search_index_app.search("zinc")), 0)

        # Update the pages again.
        content_management_app.update_body(slug="animals", body="cat dog zebra")
        content_management_app.update_body(slug="plants", body="bluebell rose jasmine")
        content_management_app.update_body(slug="minerals", body="iron zinc calcium")

        # Search for single words.
        page_ids = search_index_app.search("cat")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "animals")
        self.assertEqual(page["body"], "cat dog zebra")

        page_ids = search_index_app.search("rose")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "plants")
        self.assertEqual(page["body"], "bluebell rose jasmine")

        page_ids = search_index_app.search("calcium")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "minerals")
        self.assertEqual(page["body"], "iron zinc calcium")

        page_ids = search_index_app.search("dog")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "animals")
        self.assertEqual(page["body"], "cat dog zebra")

        page_ids = search_index_app.search("bluebell")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "plants")
        self.assertEqual(page["body"], "bluebell rose jasmine")

        page_ids = search_index_app.search("zinc")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "minerals")
        self.assertEqual(page["body"], "iron zinc calcium")

        # Search for multiple words in same page.
        page_ids = search_index_app.search("dog cat")
        self.assertEqual(1, len(page_ids))
        page = content_management_app.get_page_by_id(page_ids[0])
        self.assertEqual(page["slug"], "animals")
        self.assertEqual(page["body"], "cat dog zebra")

        # Search for multiple words in same page, expect no results.
        page_ids = search_index_app.search("rose zebra")
        self.assertEqual(0, len(page_ids))

        # Search for alternative words, expect two results.
        page_ids = search_index_app.search("rose OR zebra")
        pages = [content_management_app.get_page_by_id(page_id) for page_id in page_ids]
        self.assertEqual(2, len(pages))
        self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in pages))


class TestWithSQLite(ContentManagementSystemTestCase):
    env: ClassVar[Dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.examples.contentmanagementsystem.sqlite",
        "SQLITE_DBNAME": ":memory:",
    }


class TestWithPostgres(ContentManagementSystemTestCase):
    env: ClassVar[Dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.examples.contentmanagementsystem.postgres",
        "POSTGRES_DBNAME": "eventsourcing",
        "POSTGRES_HOST": "127.0.0.1",
        "POSTGRES_PORT": "5432",
        "POSTGRES_USER": "eventsourcing",
        "POSTGRES_PASSWORD": "eventsourcing",
    }

    def setUp(self) -> None:
        super().setUp()
        self.drop_tables()

    def tearDown(self) -> None:
        self.drop_tables()
        super().tearDown()

    def drop_tables(self) -> None:
        db = PostgresDatastore(
            self.env["POSTGRES_DBNAME"],
            self.env["POSTGRES_HOST"],
            self.env["POSTGRES_PORT"],
            self.env["POSTGRES_USER"],
            self.env["POSTGRES_PASSWORD"],
        )
        drop_postgres_table(db, "public.contentmanagementapplication_events")
        drop_postgres_table(db, "public.pages_projection_example")
        drop_postgres_table(db, "public.searchindexapplication_events")
        drop_postgres_table(db, "public.searchindexapplication_tracking")
        db.close()


del ContentManagementSystemTestCase