System 1 - Content management system

In this example, event notifications from the ContentManagement application described in Application 3 - Content management are processed and projected into an eventually-consistent full text search index, a searchable “materialised view” of the pages’ body text just like Application 5 - Searchable content.

This is an example of CQRS. By separating the search engine “read model” from the content management “write model”, the commands that update pages will perform faster. But, more importantly, the search engine can be redesigned and rebuilt by reprocessing those events. The projected searchable content can be deleted and rebuilt, perhaps also to include page titles, or timestamps, or other information contained in the domain events such as the authors, because it is updated by processing events. This is the main advantage of “CQRS” over the “inline” technique used in Application 5 - Searchable content where the search index is simply updated whenever new events are recorded. Please note, it is possible to migrate from the “inline” technique to CQRS, by adding the downstream processing and then removing the inline updating, since the domain model is already event sourced. Similarly, other projections can be added to work alongside and concurrently with the updating of the search engine.

Application

The PagesIndexApplication defined below is a ProcessApplication. Its policy() function is coded to process the Page.Created and Page.BodyUpdated domain events of the ContentManagement application. It also has a search() method that returns a list of page IDs.

The PagesIndexApplication class in this example works in a similar way to the SearchableContentApplication class in Application 5 - Searchable content, by setting variable keyword arguments insert_pages and update_pages on a ProcessingEvent object. However, rather than populating the variable keyword arguments in the save() method, it populates insert_pages and update_pages within its policy() function. The insert_pages and update_pages arguments are set on the ProcessingEvent object passed into the policy() function, which carries an event notification ID that indicates the position in the application sequence of the domain event that is being processed.

The application will be configured to run with a custom ProcessRecorder so that search index records will be updated atomically with the inserting of a tracking record which indicates which upstream event notification has been processed.

Because the Page.BodyUpdated event carries only the diff of the page body, the policy() function must first select the current page body from its own records and then apply the diff as a patch. The “exactly once” semantics provided by the library’s system module guarantees that the diffs will always be applied in the correct order. Without this guarantee, the projection could become inconsistent, with the consequence that the diffs will fail to be applied.

from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar, cast
from uuid import UUID

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.system import ProcessApplication
from examples.contentmanagement.domainmodel import Page
from examples.contentmanagement.utils import apply_diff
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo

if TYPE_CHECKING:
    from eventsourcing.application import ProcessingEvent
    from eventsourcing.domain import DomainEventProtocol


class FtsProcess(ProcessApplication[UUID]):
    env: ClassVar[dict[str, str]] = {
        "COMPRESSOR_TOPIC": "gzip",
    }

    @singledispatchmethod
    def policy(
        self,
        domain_event: DomainEventProtocol[UUID],
        processing_event: ProcessingEvent[UUID],
    ) -> None:
        if isinstance(domain_event, Page.Created):
            processing_event.collect_events(
                insert_pages=[
                    PageInfo(
                        id=domain_event.originator_id,
                        slug=domain_event.slug,
                        title=domain_event.title,
                        body=domain_event.body,
                    )
                ]
            )
        elif isinstance(domain_event, Page.BodyUpdated):
            recorder = cast("FtsRecorder", self.recorder)
            page_id = domain_event.originator_id
            page = recorder.select_page(page_id)
            page_body = apply_diff(page.body, domain_event.diff)
            processing_event.collect_events(
                update_pages=[
                    PageInfo(
                        id=page_id,
                        slug=page.slug,
                        title=page.title,
                        body=page_body,
                    )
                ]
            )

    def search(self, query: str) -> list[UUID]:
        recorder = cast("FtsRecorder", self.recorder)
        return recorder.search_pages(query)

System

A System of applications is defined, in which the PagesIndexApplication follows the ContentManagement application. This system can then be used in any Runner.

from __future__ import annotations

from eventsourcing.system import System
from examples.contentmanagement.application import ContentManagement
from examples.ftsprocess.application import FtsProcess


class ContentManagementSystem(System):
    def __init__(self) -> None:
        super().__init__(pipes=[[ContentManagement, FtsProcess]])

PostgreSQL

The PostgresSearchableContentRecorder from Application 5 - Searchable content is used to define a custom ProcessRecorder for PostgreSQL. The PostgreSQL Factory class is extended to involve this custom recorder in a custom persistence module so that it can be used by the PagesIndexApplication.

from __future__ import annotations

from eventsourcing.postgres import PostgresProcessRecorder
from examples.ftscontentmanagement.postgres import PostgresFtsApplicationRecorder


class PostgresFtsProcessRecorder(
    PostgresFtsApplicationRecorder, PostgresProcessRecorder
):
    pass

SQLite

The SqliteSearchableContentRecorder from Application 5 - Searchable content is used to define a custom ProcessRecorder for SQLite. The SQLite Factory class is extended to involve this custom recorder in a custom persistence module so that it can be used by the PagesIndexApplication.

from eventsourcing.sqlite import SQLiteProcessRecorder
from examples.ftscontentmanagement.sqlite import SQLiteFtsApplicationRecorder


class SQLiteFtsProcessRecorder(SQLiteFtsApplicationRecorder, SQLiteProcessRecorder):
    pass

Test case

The test case ContentManagementSystemTestCase creates three pages, for ‘animals’, ‘plants’ and ‘minerals’. Content is added to the pages. The content is searched with various queries and the search results are checked. The test is executed twice, once with the application configured for both PostgreSQL, and once for SQLite.

from __future__ import annotations

from typing import ClassVar
from unittest import TestCase
from uuid import UUID, uuid4

from eventsourcing.system import SingleThreadedRunner
from eventsourcing.tests.postgres_utils import drop_tables
from eventsourcing.utils import get_topic
from examples.contentmanagement.application import ContentManagement
from examples.contentmanagement.domainmodel import user_id_cvar
from examples.ftsprocess.application import FtsProcess
from examples.ftsprocess.postgres import PostgresFtsProcessRecorder
from examples.ftsprocess.sqlite import SQLiteFtsProcessRecorder
from examples.ftsprocess.system import ContentManagementSystem


class ContentManagementSystemTestCase(TestCase):
    env: ClassVar[dict[str, str]] = {}

    def test_system(self) -> None:
        with SingleThreadedRunner[UUID](
            system=ContentManagementSystem(), env=self.env
        ) as runner:

            content_management_app = runner.get(ContentManagement)
            search_index_app = runner.get(FtsProcess)

            # Set user_id context variable.
            user_id = uuid4()
            user_id_cvar.set(user_id)

            # Create empty pages.
            content_management_app.create_page(title="Animals", slug="animals")
            content_management_app.create_page(title="Plants", slug="plants")
            content_management_app.create_page(title="Minerals", slug="minerals")

            # Search, expect no results.
            self.assertEqual(0, len(search_index_app.search("cat")))
            self.assertEqual(0, len(search_index_app.search("rose")))
            self.assertEqual(0, len(search_index_app.search("calcium")))

            # Update the pages.
            content_management_app.update_body(slug="animals", body="cat")
            content_management_app.update_body(slug="plants", body="rose")
            content_management_app.update_body(slug="minerals", body="calcium")

            # Search for single words.
            page_ids = search_index_app.search("cat")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "animals")
            self.assertEqual(page["body"], "cat")

            page_ids = search_index_app.search("rose")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "plants")
            self.assertEqual(page["body"], "rose")

            page_ids = search_index_app.search("calcium")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "minerals")
            self.assertEqual(page["body"], "calcium")

            self.assertEqual(len(search_index_app.search("dog")), 0)
            self.assertEqual(len(search_index_app.search("bluebell")), 0)
            self.assertEqual(len(search_index_app.search("zinc")), 0)

            # Update the pages again.
            content_management_app.update_body(slug="animals", body="cat dog zebra")
            content_management_app.update_body(
                slug="plants", body="bluebell rose jasmine"
            )
            content_management_app.update_body(
                slug="minerals", body="iron zinc calcium"
            )

            # Search for single words.
            page_ids = search_index_app.search("cat")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "animals")
            self.assertEqual(page["body"], "cat dog zebra")

            page_ids = search_index_app.search("rose")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "plants")
            self.assertEqual(page["body"], "bluebell rose jasmine")

            page_ids = search_index_app.search("calcium")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "minerals")
            self.assertEqual(page["body"], "iron zinc calcium")

            page_ids = search_index_app.search("dog")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "animals")
            self.assertEqual(page["body"], "cat dog zebra")

            page_ids = search_index_app.search("bluebell")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "plants")
            self.assertEqual(page["body"], "bluebell rose jasmine")

            page_ids = search_index_app.search("zinc")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "minerals")
            self.assertEqual(page["body"], "iron zinc calcium")

            # Search for multiple words in same page.
            page_ids = search_index_app.search("dog cat")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "animals")
            self.assertEqual(page["body"], "cat dog zebra")

            # Search for multiple words in same page, expect no results.
            page_ids = search_index_app.search("rose zebra")
            self.assertEqual(0, len(page_ids))

            # Search for alternative words, expect two results.
            page_ids = search_index_app.search("rose OR zebra")
            pages = [
                content_management_app.get_page_by_id(page_id) for page_id in page_ids
            ]
            self.assertEqual(2, len(pages))
            self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in pages))


class TestWithSQLite(ContentManagementSystemTestCase):
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.sqlite",
        "PROCESS_RECORDER_TOPIC": get_topic(SQLiteFtsProcessRecorder),
        "SQLITE_DBNAME": ":memory:",
    }


class TestWithPostgres(ContentManagementSystemTestCase):
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.postgres",
        "PROCESS_RECORDER_TOPIC": get_topic(PostgresFtsProcessRecorder),
        "POSTGRES_DBNAME": "eventsourcing",
        "POSTGRES_HOST": "127.0.0.1",
        "POSTGRES_PORT": "5432",
        "POSTGRES_USER": "eventsourcing",
        "POSTGRES_PASSWORD": "eventsourcing",
    }

    def setUp(self) -> None:
        drop_tables()
        super().setUp()

    def tearDown(self) -> None:
        super().tearDown()
        drop_tables()

    def test_system(self) -> None:
        super().test_system()


del ContentManagementSystemTestCase

Code reference

class examples.ftsprocess.application.FtsProcess(env: Mapping[str, str] | None = None)[source]

Bases: ProcessApplication[UUID]

env: ClassVar[dict[str, str]] = {'COMPRESSOR_TOPIC': 'gzip'}
policy(domain_event: DomainEventProtocol[UUID], processing_event: ProcessingEvent[UUID]) None[source]

Abstract domain event processing policy method. Must be implemented by event processing applications. When processing the given domain event, event processing applications must use the collect_events() method of the given ProcessingEvent object (not the application’s save() method) so that the new domain events will be recorded atomically and uniquely with tracking information about the position of the processed event in its application sequence.

search(query: str) list[UUID][source]
name = 'FtsProcess'
topics: Sequence[str] = ()
class examples.ftsprocess.test_system.TestWithSQLite(methodName='runTest')[source]

Bases: ContentManagementSystemTestCase

env: ClassVar[dict[str, str]] = {'PERSISTENCE_MODULE': 'eventsourcing.sqlite', 'PROCESS_RECORDER_TOPIC': 'examples.ftsprocess.sqlite:SQLiteFtsProcessRecorder', 'SQLITE_DBNAME': ':memory:'}
class examples.ftsprocess.test_system.TestWithPostgres(methodName='runTest')[source]

Bases: ContentManagementSystemTestCase

env: ClassVar[dict[str, str]] = {'PERSISTENCE_MODULE': 'eventsourcing.postgres', 'POSTGRES_DBNAME': 'eventsourcing', 'POSTGRES_HOST': '127.0.0.1', 'POSTGRES_PASSWORD': 'eventsourcing', 'POSTGRES_PORT': '5432', 'POSTGRES_USER': 'eventsourcing', 'PROCESS_RECORDER_TOPIC': 'examples.ftsprocess.postgres:PostgresFtsProcessRecorder'}
setUp() None[source]

Hook method for setting up the test fixture before exercising it.

tearDown() None[source]

Hook method for deconstructing the test fixture after testing it.

test_system() None[source]