System 1 - Content management system
In this example, event notifications from the ContentManagement application described in Application 3 - Content management are processed and projected into an eventually-consistent full text search index. The index is a searchable “materialised view” of the pages’ body text, just like the one in Application 5 - Searchable content.
This is an example of CQRS. Separating the search engine “read model” from the content management “write model” means the commands that update pages do less work and so can complete faster. More importantly, because the search index is updated by processing events, it can be redesigned and rebuilt by reprocessing those events. The projected searchable content can be deleted and rebuilt, perhaps to also include page titles, timestamps, or other information carried by the domain events, such as the authors.
This is the main advantage of CQRS over the “inline” technique used in Application 5 - Searchable content, where the search index is simply updated whenever new events are recorded. Because the domain model is already event sourced, it is possible to migrate from the “inline” technique to CQRS by adding the downstream processing and then removing the inline updating. Similarly, other projections can be added to run alongside, and concurrently with, the updating of the search index.
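The idea of rebuilding a read model by reprocessing events can be illustrated with a minimal, library-free sketch. Everything in it is hypothetical: PageEvent, the two projection functions and the in-memory dictionary index are simplified stand-ins, not the classes used in this example.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PageEvent:
    """Hypothetical, simplified stand-in for a recorded domain event."""

    page_id: str
    title: str
    body: str


def project_body_only(events: list[PageEvent]) -> dict[str, str]:
    """First design of the read model: index only the body text."""
    index: dict[str, str] = {}
    for event in events:
        index[event.page_id] = event.body
    return index


def project_title_and_body(events: list[PageEvent]) -> dict[str, str]:
    """Redesigned read model: index the title as well as the body."""
    index: dict[str, str] = {}
    for event in events:
        index[event.page_id] = f"{event.title} {event.body}"
    return index


def search(index: dict[str, str], word: str) -> list[str]:
    """Return the IDs of pages whose indexed text contains the word."""
    return sorted(
        page_id
        for page_id, text in index.items()
        if word.lower() in text.lower().split()
    )


# The recorded events are the source of truth; they are never changed.
events = [
    PageEvent(page_id="p1", title="Animals", body="cat dog"),
    PageEvent(page_id="p2", title="Plants", body="rose"),
]

# Build the first index, then discard it and rebuild with the new design.
index_v1 = project_body_only(events)
index_v2 = project_title_and_body(events)
```

In this sketch, a search for "animals" finds nothing in index_v1 but finds page p1 in index_v2, even though no new events were recorded. Only the projection changed.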
Application
The FtsProcess class defined below is a ProcessApplication. Its policy() function is coded to process the Page.Created and Page.BodyUpdated domain events of the ContentManagement application. It also has a search() method that returns a list of page IDs.
The FtsProcess class in this example works in a similar way to the SearchableContentApplication class in Application 5 - Searchable content, by setting variable keyword arguments insert_pages and update_pages on a ProcessingEvent object.
However, rather than populating the variable keyword arguments in the save() method, it populates insert_pages and update_pages within its policy() function. The insert_pages and update_pages arguments are set on the ProcessingEvent object passed into the policy() function. This object carries an event notification ID that indicates the position in the application sequence of the domain event that is being processed.
The application will be configured to run with a custom ProcessRecorder, so that search index records are updated atomically with the insertion of a tracking record that indicates which upstream event notification has been processed.
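The atomic update described above can be illustrated with a minimal sketch using Python's standard sqlite3 module. The table names, column names and the process_once() helper are hypothetical, and this is not the library's recorder implementation. It only shows the general technique: write the projection and the tracking record in one transaction, and let a uniqueness constraint on the tracking record reject a repeated notification.

```python
from __future__ import annotations

import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a hypothetical projection table and tracking table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE pages (page_id TEXT PRIMARY KEY, body TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE tracking ("
        "upstream_name TEXT NOT NULL, "
        "notification_id INTEGER NOT NULL, "
        "PRIMARY KEY (upstream_name, notification_id))"
    )
    conn.commit()
    return conn


def process_once(
    conn: sqlite3.Connection,
    upstream_name: str,
    notification_id: int,
    page_id: str,
    body: str,
) -> bool:
    """Update the projection and record the tracking position atomically.

    Returns False, leaving the projection untouched, if the notification
    has already been processed.
    """
    try:
        # "with conn" commits on success and rolls back on an exception.
        with conn:
            conn.execute(
                "INSERT OR REPLACE INTO pages (page_id, body) VALUES (?, ?)",
                (page_id, body),
            )
            # Violates the primary key if this position was seen before.
            conn.execute(
                "INSERT INTO tracking (upstream_name, notification_id) VALUES (?, ?)",
                (upstream_name, notification_id),
            )
    except sqlite3.IntegrityError:
        return False
    return True


def get_body(conn: sqlite3.Connection, page_id: str) -> str:
    """Read the projected body of a page."""
    row = conn.execute(
        "SELECT body FROM pages WHERE page_id = ?", (page_id,)
    ).fetchone()
    return row[0]


conn = make_db()
first = process_once(conn, "ContentManagement", 1, "p1", "cat")
# The same notification is delivered again: the whole transaction rolls back.
second = process_once(conn, "ContentManagement", 1, "p1", "dog")
```

Because the page update and the tracking insert share one transaction, the duplicate delivery in this sketch cannot leave a half-applied change behind.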
Because the Page.BodyUpdated event carries only the diff of the page body, the policy() function must first select the current page body from its own records and then apply the diff as a patch. The “exactly once” semantics provided by the library’s system module guarantee that the diffs will always be applied in the correct order. Without this guarantee, the projection could become inconsistent, with the consequence that the diffs will fail to be applied.
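Why the order of diffs matters can be shown with a small sketch based on the standard difflib module. The toy_create_diff() and toy_apply_diff() functions are hypothetical and use a simplified diff format; they are not the apply_diff() function imported by the example, whose implementation is not shown here.

```python
from __future__ import annotations

from difflib import SequenceMatcher


def toy_create_diff(old: str, new: str) -> list[tuple[int, str, str]]:
    """Describe how to turn old into new.

    Each operation is (start index in old, removed text, inserted text).
    """
    matcher = SequenceMatcher(None, old, new, autojunk=False)
    return [
        (i1, old[i1:i2], new[j1:j2])
        for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        if tag != "equal"
    ]


def toy_apply_diff(old: str, diff: list[tuple[int, str, str]]) -> str:
    """Apply the operations to old, which must be the text they were made for."""
    parts: list[str] = []
    position = 0
    for start, removed, inserted in diff:
        parts.append(old[position:start])  # unchanged text before the edit
        parts.append(inserted)
        position = start + len(removed)  # skip over the removed text
    parts.append(old[position:])  # unchanged text after the last edit
    return "".join(parts)


body_v0 = ""
body_v1 = "cat"
body_v2 = "cat dog zebra"

diff_1 = toy_create_diff(body_v0, body_v1)
diff_2 = toy_create_diff(body_v1, body_v2)

# Applying every diff, in order, reproduces the latest body.
in_order = toy_apply_diff(toy_apply_diff(body_v0, diff_1), diff_2)

# Skipping the first diff applies the second one to the wrong base text.
skipped_first = toy_apply_diff(body_v0, diff_2)
```

This toy format carries no context, so a skipped diff silently yields a wrong body rather than an error. Either way, the projected body no longer matches the page in the write model.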
from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar, cast
from uuid import UUID

from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.system import ProcessApplication
from examples.contentmanagement.domainmodel import Page
from examples.contentmanagement.utils import apply_diff
from examples.ftscontentmanagement.persistence import FtsRecorder, PageInfo

if TYPE_CHECKING:
    from eventsourcing.application import ProcessingEvent
    from eventsourcing.domain import DomainEventProtocol


class FtsProcess(ProcessApplication[UUID]):
    env: ClassVar[dict[str, str]] = {
        "COMPRESSOR_TOPIC": "gzip",
    }

    @singledispatchmethod
    def policy(
        self,
        domain_event: DomainEventProtocol[UUID],
        processing_event: ProcessingEvent[UUID],
    ) -> None:
        if isinstance(domain_event, Page.Created):
            processing_event.collect_events(
                insert_pages=[
                    PageInfo(
                        id=domain_event.originator_id,
                        slug=domain_event.slug,
                        title=domain_event.title,
                        body=domain_event.body,
                    )
                ]
            )
        elif isinstance(domain_event, Page.BodyUpdated):
            recorder = cast("FtsRecorder", self.recorder)
            page_id = domain_event.originator_id
            page = recorder.select_page(page_id)
            page_body = apply_diff(page.body, domain_event.diff)
            processing_event.collect_events(
                update_pages=[
                    PageInfo(
                        id=page_id,
                        slug=page.slug,
                        title=page.title,
                        body=page_body,
                    )
                ]
            )

    def search(self, query: str) -> list[UUID]:
        recorder = cast("FtsRecorder", self.recorder)
        return recorder.search_pages(query)
System
A System of applications is defined, in which the FtsProcess application follows the ContentManagement application. This system can then be used in any Runner.
from __future__ import annotations

from eventsourcing.system import System
from examples.contentmanagement.application import ContentManagement
from examples.ftsprocess.application import FtsProcess


class ContentManagementSystem(System):
    def __init__(self) -> None:
        super().__init__(pipes=[[ContentManagement, FtsProcess]])
PostgreSQL
The PostgresFtsApplicationRecorder from Application 5 - Searchable content is used to define a custom ProcessRecorder for PostgreSQL. The custom recorder is selected with the PROCESS_RECORDER_TOPIC environment variable, as in the test case, so that it is used by the FtsProcess application.
from __future__ import annotations

from eventsourcing.postgres import PostgresProcessRecorder
from examples.ftscontentmanagement.postgres import PostgresFtsApplicationRecorder


class PostgresFtsProcessRecorder(
    PostgresFtsApplicationRecorder, PostgresProcessRecorder
):
    pass
SQLite
The SQLiteFtsApplicationRecorder from Application 5 - Searchable content is used to define a custom ProcessRecorder for SQLite. The custom recorder is selected with the PROCESS_RECORDER_TOPIC environment variable, as in the test case, so that it is used by the FtsProcess application.
from eventsourcing.sqlite import SQLiteProcessRecorder
from examples.ftscontentmanagement.sqlite import SQLiteFtsApplicationRecorder


class SQLiteFtsProcessRecorder(SQLiteFtsApplicationRecorder, SQLiteProcessRecorder):
    pass
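Both custom recorders are defined purely by multiple inheritance, with no body of their own. The following sketch shows, under simplifying assumptions, how Python's method resolution order lets such a class combine two behaviours. All class names and the table_names() method are hypothetical and do not reflect the internals of the library's recorder classes.

```python
from __future__ import annotations


class BaseRecorderSketch:
    """Hypothetical base class: knows only about the events table."""

    def table_names(self) -> list[str]:
        return ["events"]


class FtsRecorderSketch(BaseRecorderSketch):
    """Hypothetical recorder that adds a search index table."""

    def table_names(self) -> list[str]:
        return [*super().table_names(), "pages_fts"]


class ProcessRecorderSketch(BaseRecorderSketch):
    """Hypothetical recorder that adds a tracking table."""

    def table_names(self) -> list[str]:
        return [*super().table_names(), "tracking"]


class FtsProcessRecorderSketch(FtsRecorderSketch, ProcessRecorderSketch):
    """Combines both behaviours without adding any code of its own."""


# Each super() call continues along the method resolution order, so the
# combined class sees the tables contributed by both parent classes.
combined_tables = FtsProcessRecorderSketch().table_names()
mro_names = [cls.__name__ for cls in FtsProcessRecorderSketch.__mro__]
```

The design choice this illustrates is reuse: the search index behaviour written for the application recorder does not need to be rewritten for the process recorder.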
Test case
The test case ContentManagementSystemTestCase creates three pages, for ‘animals’, ‘plants’ and ‘minerals’. Content is added to the pages. The content is searched with various queries and the search results are checked. The test is executed twice, once with the system configured for SQLite, and once for PostgreSQL.
from __future__ import annotations

from typing import ClassVar
from unittest import TestCase
from uuid import UUID, uuid4

from eventsourcing.system import SingleThreadedRunner
from eventsourcing.tests.postgres_utils import drop_tables
from eventsourcing.utils import get_topic
from examples.contentmanagement.application import ContentManagement
from examples.contentmanagement.domainmodel import user_id_cvar
from examples.ftsprocess.application import FtsProcess
from examples.ftsprocess.postgres import PostgresFtsProcessRecorder
from examples.ftsprocess.sqlite import SQLiteFtsProcessRecorder
from examples.ftsprocess.system import ContentManagementSystem


class ContentManagementSystemTestCase(TestCase):
    env: ClassVar[dict[str, str]] = {}

    def test_system(self) -> None:
        with SingleThreadedRunner[UUID](
            system=ContentManagementSystem(), env=self.env
        ) as runner:
            content_management_app = runner.get(ContentManagement)
            search_index_app = runner.get(FtsProcess)

            # Set user_id context variable.
            user_id = uuid4()
            user_id_cvar.set(user_id)

            # Create empty pages.
            content_management_app.create_page(title="Animals", slug="animals")
            content_management_app.create_page(title="Plants", slug="plants")
            content_management_app.create_page(title="Minerals", slug="minerals")

            # Search, expect no results.
            self.assertEqual(0, len(search_index_app.search("cat")))
            self.assertEqual(0, len(search_index_app.search("rose")))
            self.assertEqual(0, len(search_index_app.search("calcium")))

            # Update the pages.
            content_management_app.update_body(slug="animals", body="cat")
            content_management_app.update_body(slug="plants", body="rose")
            content_management_app.update_body(slug="minerals", body="calcium")

            # Search for single words.
            page_ids = search_index_app.search("cat")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "animals")
            self.assertEqual(page["body"], "cat")

            page_ids = search_index_app.search("rose")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "plants")
            self.assertEqual(page["body"], "rose")

            page_ids = search_index_app.search("calcium")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "minerals")
            self.assertEqual(page["body"], "calcium")

            self.assertEqual(len(search_index_app.search("dog")), 0)
            self.assertEqual(len(search_index_app.search("bluebell")), 0)
            self.assertEqual(len(search_index_app.search("zinc")), 0)

            # Update the pages again.
            content_management_app.update_body(slug="animals", body="cat dog zebra")
            content_management_app.update_body(
                slug="plants", body="bluebell rose jasmine"
            )
            content_management_app.update_body(
                slug="minerals", body="iron zinc calcium"
            )

            # Search for single words.
            page_ids = search_index_app.search("cat")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "animals")
            self.assertEqual(page["body"], "cat dog zebra")

            page_ids = search_index_app.search("rose")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "plants")
            self.assertEqual(page["body"], "bluebell rose jasmine")

            page_ids = search_index_app.search("calcium")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "minerals")
            self.assertEqual(page["body"], "iron zinc calcium")

            page_ids = search_index_app.search("dog")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "animals")
            self.assertEqual(page["body"], "cat dog zebra")

            page_ids = search_index_app.search("bluebell")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "plants")
            self.assertEqual(page["body"], "bluebell rose jasmine")

            page_ids = search_index_app.search("zinc")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "minerals")
            self.assertEqual(page["body"], "iron zinc calcium")

            # Search for multiple words in same page.
            page_ids = search_index_app.search("dog cat")
            self.assertEqual(1, len(page_ids))
            page = content_management_app.get_page_by_id(page_ids[0])
            self.assertEqual(page["slug"], "animals")
            self.assertEqual(page["body"], "cat dog zebra")

            # Search for multiple words in same page, expect no results.
            page_ids = search_index_app.search("rose zebra")
            self.assertEqual(0, len(page_ids))

            # Search for alternative words, expect two results.
            page_ids = search_index_app.search("rose OR zebra")
            pages = [
                content_management_app.get_page_by_id(page_id) for page_id in page_ids
            ]
            self.assertEqual(2, len(pages))
            self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in pages))


class TestWithSQLite(ContentManagementSystemTestCase):
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.sqlite",
        "PROCESS_RECORDER_TOPIC": get_topic(SQLiteFtsProcessRecorder),
        "SQLITE_DBNAME": ":memory:",
    }


class TestWithPostgres(ContentManagementSystemTestCase):
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.postgres",
        "PROCESS_RECORDER_TOPIC": get_topic(PostgresFtsProcessRecorder),
        "POSTGRES_DBNAME": "eventsourcing",
        "POSTGRES_HOST": "127.0.0.1",
        "POSTGRES_PORT": "5432",
        "POSTGRES_USER": "eventsourcing",
        "POSTGRES_PASSWORD": "eventsourcing",
    }

    def setUp(self) -> None:
        drop_tables()
        super().setUp()

    def tearDown(self) -> None:
        super().tearDown()
        drop_tables()

    def test_system(self) -> None:
        super().test_system()


del ContentManagementSystemTestCase
Code reference
- class examples.ftsprocess.application.FtsProcess(env: Mapping[str, str] | None = None)
  Bases: ProcessApplication[UUID]
  - env: ClassVar[dict[str, str]] = {'COMPRESSOR_TOPIC': 'gzip'}
  - policy(domain_event: DomainEventProtocol[UUID], processing_event: ProcessingEvent[UUID]) -> None
    Abstract domain event processing policy method. Must be implemented by event processing applications. When processing the given domain event, event processing applications must use the collect_events() method of the given ProcessingEvent object (not the application’s save() method) so that the new domain events will be recorded atomically and uniquely with tracking information about the position of the processed event in its application sequence.
  - name = 'FtsProcess'
  - topics: Sequence[str] = ()
- class examples.ftsprocess.test_system.TestWithSQLite(methodName='runTest')
  Bases: ContentManagementSystemTestCase
  - env: ClassVar[dict[str, str]] = {'PERSISTENCE_MODULE': 'eventsourcing.sqlite', 'PROCESS_RECORDER_TOPIC': 'examples.ftsprocess.sqlite:SQLiteFtsProcessRecorder', 'SQLITE_DBNAME': ':memory:'}
- class examples.ftsprocess.test_system.TestWithPostgres(methodName='runTest')
  Bases: ContentManagementSystemTestCase
  - env: ClassVar[dict[str, str]] = {'PERSISTENCE_MODULE': 'eventsourcing.postgres', 'POSTGRES_DBNAME': 'eventsourcing', 'POSTGRES_HOST': '127.0.0.1', 'POSTGRES_PASSWORD': 'eventsourcing', 'POSTGRES_PORT': '5432', 'POSTGRES_USER': 'eventsourcing', 'PROCESS_RECORDER_TOPIC': 'examples.ftsprocess.postgres:PostgresFtsProcessRecorder'}