System 1 - Content management system¶
In this example, event notifications from the ContentManagementApplication
from
Application 3 - Content management are processed and projected into an
eventually-consistent full text search index, a searchable “materialized view” of
the pages’ body text just like Application 5 - Searchable content.
This is an example of CQRS. By separating the search engine “read model” from the content management “write model”, the commands that update pages will perform faster. But, more importantly, the search engine can be redesigned and rebuilt by reprocessing those events. The projected searchable content can be deleted and rebuilt, perhaps also to include page titles, or timestamps, or other information contained in the domain events such as the authors, because it is updated by processing events. This is the main advantage of “CQRS” over the “inline” technique used in Application 5 - Searchable content where the search index is simply updated whenever new events are recorded. Please note, it is possible to migrate from the “inline” technique to CQRS, by adding the downstream processing and then removing the inline updating, since the domain model is already event sourced. Similarly, other projections can be added to work alongside and concurrently with the updating of the search engine.
Application¶
The SearchIndexApplication
defined below is a ProcessApplication
.
Its policy()
function is coded to process the Page.Created
and Page.BodyUpdated
domain
events of the ContentManagementApplication
. It also has a search()
method that returns
a list of page IDs.
The SearchIndexApplication
class in this example works in a similar way to the SearchableContentApplication
class in Application 5 - Searchable content, by setting variable keyword arguments
insert_pages
and update_pages
on a the ProcessingEvent
object.
However, rather than populating the variable keyword arguments in the save()
method, it populates insert_pages
and update_pages
within its policy()
function. The insert_pages
and update_pages
arguments are set
on the ProcessingEvent
object passed into the policy()
function, which carries an event notification ID that indicates the position in the application sequence of
the domain event that is being processed.
The application will be configured to run with a custom ProcessRecorder
so that search index records will be updated atomically with the inserting of a tracking record which
indicates which upstream event notification has been processed.
Because the Page.BodyUpdated
event carries only the diff
of the page body, the
policy()
function must first select the current page body from its own records
and then apply the diff as a patch. The “exactly once” semantics provided by the library’s
system module guarantees that the diffs will always be applied in the correct order. Without
this guarantee, the projection could become inconsistent, with the consequence that the diffs
will fail to be applied.
from __future__ import annotations
from typing import List, cast
from uuid import UUID
from eventsourcing.application import ProcessingEvent
from eventsourcing.domain import DomainEventProtocol
from eventsourcing.examples.contentmanagement.domainmodel import Page
from eventsourcing.examples.contentmanagement.utils import apply_patch
from eventsourcing.examples.searchablecontent.persistence import (
SearchableContentRecorder,
)
from eventsourcing.system import ProcessApplication
class SearchIndexApplication(ProcessApplication):
env = {
"COMPRESSOR_TOPIC": "gzip",
}
def policy(
self,
domain_event: DomainEventProtocol,
processing_event: ProcessingEvent,
) -> None:
if isinstance(domain_event, Page.Created):
processing_event.saved_kwargs["insert_pages"] = [
(
domain_event.originator_id,
domain_event.slug,
domain_event.title,
domain_event.body,
)
]
elif isinstance(domain_event, Page.BodyUpdated):
recorder = cast(SearchableContentRecorder, self.recorder)
page_id = domain_event.originator_id
page_slug, page_title, page_body = recorder.select_page(page_id)
page_body = apply_patch(page_body, domain_event.diff)
processing_event.saved_kwargs["update_pages"] = [
(
page_id,
page_slug,
page_title,
page_body,
)
]
def search(self, query: str) -> List[UUID]:
recorder = cast(SearchableContentRecorder, self.recorder)
return recorder.search_pages(query)
System¶
A System
of applications is defined, in which the
SearchIndexApplication
follows the ContentManagementApplication
. This system
can then be used in any Runner
.
from __future__ import annotations
from eventsourcing.examples.contentmanagement.application import (
ContentManagementApplication,
)
from eventsourcing.examples.contentmanagementsystem.application import (
SearchIndexApplication,
)
from eventsourcing.system import System
class ContentManagementSystem(System):
def __init__(self) -> None:
super().__init__(pipes=[[ContentManagementApplication, SearchIndexApplication]])
PostgreSQL¶
The PostgresSearchableContentRecorder
from Application 5 - Searchable content
is used to define a custom ProcessRecorder
for PostgreSQL.
The PostgreSQL Factory
class is extended to involve this custom recorder
in a custom persistence module so that it can be used by the SearchIndexApplication
.
from eventsourcing.examples.searchablecontent.postgres import (
PostgresSearchableContentRecorder,
)
from eventsourcing.postgres import Factory, PostgresProcessRecorder
class SearchableContentProcessRecorder(
PostgresSearchableContentRecorder, PostgresProcessRecorder
):
pass
class SearchableContentInfrastructureFactory(Factory):
process_recorder_class = SearchableContentProcessRecorder
del Factory
SQLite¶
The SqliteSearchableContentRecorder
from Application 5 - Searchable content
is used to define a custom ProcessRecorder
for SQLite.
The SQLite Factory
class is extended to involve this custom recorder
in a custom persistence module so that it can be used by the SearchIndexApplication
.
from eventsourcing.examples.searchablecontent.sqlite import (
SQLiteSearchableContentRecorder,
)
from eventsourcing.sqlite import Factory, SQLiteProcessRecorder
class SearchableContentProcessRecorder(
SQLiteSearchableContentRecorder, SQLiteProcessRecorder
):
pass
class SearchableContentInfrastructureFactory(Factory):
process_recorder_class = SearchableContentProcessRecorder
del Factory
Test case¶
The test case ContentManagementSystemTestCase
creates three pages, for ‘animals’, ‘plants’
and ‘minerals’. Content is added to the pages. The content is searched with various queries and
the search results are checked. The test is executed twice, once with the application configured
for both PostgreSQL, and once for SQLite.
from __future__ import annotations
from typing import Dict
from unittest import TestCase
from uuid import uuid4
from eventsourcing.examples.contentmanagement.application import (
ContentManagementApplication,
)
from eventsourcing.examples.contentmanagement.domainmodel import user_id_cvar
from eventsourcing.examples.contentmanagementsystem.application import (
SearchIndexApplication,
)
from eventsourcing.examples.contentmanagementsystem.system import (
ContentManagementSystem,
)
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.system import SingleThreadedRunner
from eventsourcing.tests.postgres_utils import drop_postgres_table
class ContentManagementSystemTestCase(TestCase):
env: Dict[str, str] = {}
def test_system(self) -> None:
runner = SingleThreadedRunner(system=ContentManagementSystem(), env=self.env)
runner.start()
content_management_app = runner.get(ContentManagementApplication)
search_index_app = runner.get(SearchIndexApplication)
# Set user_id context variable.
user_id = uuid4()
user_id_cvar.set(user_id)
# Create empty pages.
content_management_app.create_page(title="Animals", slug="animals")
content_management_app.create_page(title="Plants", slug="plants")
content_management_app.create_page(title="Minerals", slug="minerals")
# Search, expect no results.
self.assertEqual(0, len(search_index_app.search("cat")))
self.assertEqual(0, len(search_index_app.search("rose")))
self.assertEqual(0, len(search_index_app.search("calcium")))
# Update the pages.
content_management_app.update_body(slug="animals", body="cat")
content_management_app.update_body(slug="plants", body="rose")
content_management_app.update_body(slug="minerals", body="calcium")
# Search for single words.
page_ids = search_index_app.search("cat")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "animals")
self.assertEqual(page["body"], "cat")
page_ids = search_index_app.search("rose")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "plants")
self.assertEqual(page["body"], "rose")
page_ids = search_index_app.search("calcium")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "minerals")
self.assertEqual(page["body"], "calcium")
self.assertEqual(len(search_index_app.search("dog")), 0)
self.assertEqual(len(search_index_app.search("bluebell")), 0)
self.assertEqual(len(search_index_app.search("zinc")), 0)
# Update the pages again.
content_management_app.update_body(slug="animals", body="cat dog zebra")
content_management_app.update_body(slug="plants", body="bluebell rose jasmine")
content_management_app.update_body(slug="minerals", body="iron zinc calcium")
# Search for single words.
page_ids = search_index_app.search("cat")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "animals")
self.assertEqual(page["body"], "cat dog zebra")
page_ids = search_index_app.search("rose")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "plants")
self.assertEqual(page["body"], "bluebell rose jasmine")
page_ids = search_index_app.search("calcium")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "minerals")
self.assertEqual(page["body"], "iron zinc calcium")
page_ids = search_index_app.search("dog")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "animals")
self.assertEqual(page["body"], "cat dog zebra")
page_ids = search_index_app.search("bluebell")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "plants")
self.assertEqual(page["body"], "bluebell rose jasmine")
page_ids = search_index_app.search("zinc")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "minerals")
self.assertEqual(page["body"], "iron zinc calcium")
# Search for multiple words in same page.
page_ids = search_index_app.search("dog cat")
self.assertEqual(1, len(page_ids))
page = content_management_app.get_page_by_id(page_ids[0])
self.assertEqual(page["slug"], "animals")
self.assertEqual(page["body"], "cat dog zebra")
# Search for multiple words in same page, expect no results.
page_ids = search_index_app.search("rose zebra")
self.assertEqual(0, len(page_ids))
# Search for alternative words, expect two results.
page_ids = search_index_app.search("rose OR zebra")
pages = [content_management_app.get_page_by_id(page_id) for page_id in page_ids]
self.assertEqual(2, len(pages))
self.assertEqual(["animals", "plants"], sorted(p["slug"] for p in pages))
class TestWithSQLite(ContentManagementSystemTestCase):
env = {
"PERSISTENCE_MODULE": "eventsourcing.examples.contentmanagementsystem.sqlite",
"SQLITE_DBNAME": ":memory:",
}
class TestWithPostgres(ContentManagementSystemTestCase):
env = {
"PERSISTENCE_MODULE": "eventsourcing.examples.contentmanagementsystem.postgres",
"POSTGRES_DBNAME": "eventsourcing",
"POSTGRES_HOST": "127.0.0.1",
"POSTGRES_PORT": "5432",
"POSTGRES_USER": "eventsourcing",
"POSTGRES_PASSWORD": "eventsourcing",
}
def setUp(self) -> None:
super().setUp()
self.drop_tables()
def tearDown(self) -> None:
self.drop_tables()
super().tearDown()
def drop_tables(self) -> None:
db = PostgresDatastore(
self.env["POSTGRES_DBNAME"],
self.env["POSTGRES_HOST"],
self.env["POSTGRES_PORT"],
self.env["POSTGRES_USER"],
self.env["POSTGRES_PASSWORD"],
)
drop_postgres_table(db, "public.contentmanagementapplication_events")
drop_postgres_table(db, "public.pages_projection_example")
drop_postgres_table(db, "public.searchindexapplication_events")
drop_postgres_table(db, "public.searchindexapplication_tracking")
db.close()
del ContentManagementSystemTestCase