Projections 1 - Full text search
In this example, a projection is defined, using the projection module, that processes events from example Application 3 - Content management. The events are processed into an eventually-consistent full text search index, a searchable “materialised view” of the content of the application.
This is an example of CQRS. In this example, only the body values of the Page aggregates are indexed in the search engine.
By separating the search engine “read model” from the content management “write model”, the search engine can
be redesigned and rebuilt by reprocessing those events. The projected searchable content can be deleted and
rebuilt, perhaps also to include page titles, or timestamps, or other information contained in the domain events
such as the authors.
This is the main advantage of “CQRS” over the “inline” technique used in Application 5 - Searchable content, where the search index is simply updated whenever new events are recorded and so cannot be redesigned independently of the application.
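The idea of rebuilding a read model by reprocessing events can be sketched without any library code. The following is a minimal illustration only: the `BodyRecorded` event, the two projection functions, and the `search` helper are hypothetical names invented for this sketch, and a plain dictionary stands in for the search index.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BodyRecorded:
    """Hypothetical event carrying more information than the first index uses."""

    slug: str
    title: str
    body: str
    author: str


def project_bodies(events):
    """First design of the read model: index page bodies only."""
    index = {}
    for event in events:
        index[event.slug] = event.body
    return index


def project_titles_and_bodies(events):
    """Redesigned read model: index page titles together with bodies."""
    index = {}
    for event in events:
        index[event.slug] = f"{event.title} {event.body}"
    return index


def search(index, word):
    """Return the slugs whose indexed text contains the given word."""
    word = word.lower()
    return sorted(
        slug for slug, text in index.items() if word in text.lower().split()
    )


# The recorded events are the source of truth; they are never modified.
events = [
    BodyRecorded("animals", "Animals", "cat dog zebra", "alice"),
    BodyRecorded("plants", "Plants", "bluebell rose jasmine", "bob"),
]

# Build the first index, then discard it and rebuild from the same events.
index_v1 = project_bodies(events)
index_v2 = project_titles_and_bodies(events)
```

Searching `index_v1` for a title word such as "plants" finds nothing, whereas the rebuilt `index_v2` finds the page, without any change to the recorded events.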
Persistence
Firstly, let’s consider the “read model”.
The FtsViewInterface defines an abstract interface for searching, inserting, and updating pages in a full text search index with tracking information. Abstract methods are defined so that a projection can be defined independently of a particular database.
It defines abstract method signatures insert_pages_with_tracking() and update_pages_with_tracking() so that pages may be inserted and updated atomically in a full text search index along with tracking information.
class FtsViewInterface(FtsRecorder, TrackingRecorder, ABC):
    @abstractmethod
    def insert_pages_with_tracking(
        self, pages: Sequence[PageInfo], tracking: Tracking
    ) -> None:
        pass

    @abstractmethod
    def update_pages_with_tracking(
        self, pages: Sequence[PageInfo], tracking: Tracking
    ) -> None:
        pass
It extends both the abstract FtsRecorder class from example Application 5 - Searchable content and the library’s abstract TrackingRecorder class, so that subclasses will implement both the interface for full text search and for recording tracking information.
Abstract search method signatures are inherited from the FtsRecorder class.
PostgreSQL
Now let’s consider how we might implement this “read model” interface to work with a PostgreSQL database.
The PostgresFtsView implements the abstract FtsViewInterface by inheriting the PostgresFtsRecorder class from example Application 5 - Searchable content and the library’s PostgresTrackingRecorder class.
It implements the method insert_pages_with_tracking() required by FtsViewInterface by calling within a database transaction both _insert_pages() of PostgresFtsRecorder and _insert_tracking() of PostgresTrackingRecorder, so that new pages will be inserted in the full text search index atomically with tracking information.
It implements the method update_pages_with_tracking() required by FtsViewInterface by calling within a database transaction both _update_pages() of PostgresFtsRecorder and _insert_tracking() of PostgresTrackingRecorder, so that existing pages will be updated in the full text search index atomically with tracking information.
class PostgresFtsView(PostgresFtsRecorder, PostgresTrackingRecorder, FtsViewInterface):
    def insert_pages_with_tracking(
        self, pages: Sequence[PageInfo], tracking: Tracking
    ) -> None:
        with self.datastore.transaction(commit=True) as curs:
            self._insert_pages(curs, pages)
            self._insert_tracking(curs, tracking)

    def update_pages_with_tracking(
        self, pages: Sequence[PageInfo], tracking: Tracking
    ) -> None:
        with self.datastore.transaction(commit=True) as curs:
            self._update_pages(curs, pages)
            self._insert_tracking(curs, tracking)
Search method implementations are inherited from the PostgresFtsRecorder class.
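To see why writing pages and tracking information in one transaction matters, here is a small self-contained sketch. It uses Python's built-in `sqlite3` module as a stand-in for PostgreSQL, and the table layout and function names are assumptions made for this illustration, not the schema used by the library. The tracking table has a uniqueness constraint on the position, so an attempt to process the same position twice fails and the page write is rolled back with it.

```python
import sqlite3


def create_tables(conn):
    """Create a hypothetical pages table and a tracking table."""
    conn.execute("CREATE TABLE pages (id TEXT PRIMARY KEY, body TEXT)")
    conn.execute(
        "CREATE TABLE tracking ("
        "application_name TEXT, notification_id INTEGER, "
        "PRIMARY KEY (application_name, notification_id))"
    )


def insert_page_with_tracking(conn, page_id, body, application_name, notification_id):
    """Insert a page and its tracking record atomically."""
    # Using the connection as a context manager commits on success
    # and rolls back both statements if either one raises.
    with conn:
        conn.execute("INSERT INTO pages VALUES (?, ?)", (page_id, body))
        conn.execute(
            "INSERT INTO tracking VALUES (?, ?)",
            (application_name, notification_id),
        )


conn = sqlite3.connect(":memory:")
create_tables(conn)

# The first event at position 1 is processed normally.
insert_page_with_tracking(conn, "p1", "cat dog zebra", "ContentManagement", 1)

# Processing position 1 again violates the tracking constraint,
# so the second page insert is rolled back as well.
try:
    insert_page_with_tracking(conn, "p2", "bluebell rose", "ContentManagement", 1)
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

page_count = conn.execute("SELECT COUNT(*) FROM pages").fetchone()[0]
```

After running this, `duplicate_rejected` is true and only the first page is present, which is the property that lets a projection resume after a crash without indexing an event twice.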
Projection
Having defined a read model, let’s consider how the domain events of the content management application can be processed. This section uses the projection class from the projection module.
The FtsProjection class implements the library’s Projection class, the abstract base class for projections of event-sourced applications. It can be run as an event-processing component using the library’s projection runner.
Its process_event() function is coded to process the Page.Created and Page.BodyUpdated events of the domain model in Application 3 - Content management.
When a Page.Created event is received, the method insert_pages_with_tracking() of an FtsViewInterface object is called.
When a Page.BodyUpdated event is received, the method update_pages_with_tracking() of an FtsViewInterface object is called.
class FtsProjection(Projection[FtsViewInterface]):
    @singledispatchmethod
    def process_event(
        self, domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking
    ) -> None:
        self.view.insert_tracking(tracking)

    @process_event.register
    def page_created(self, domain_event: Page.Created, tracking: Tracking) -> None:
        new_page = PageInfo(
            id=domain_event.originator_id,
            slug=domain_event.slug,
            title=domain_event.title,
            body=domain_event.body,
        )
        self.view.insert_pages_with_tracking([new_page], tracking)

    @process_event.register
    def body_updated(self, domain_event: Page.BodyUpdated, tracking: Tracking) -> None:
        page_id = domain_event.originator_id
        old_page = self.view.select_page(page_id)
        new_page = PageInfo(
            id=page_id,
            slug=old_page.slug,
            title=old_page.title,
            body=apply_diff(old_page.body, domain_event.diff),
        )
        self.view.update_pages_with_tracking([new_page], tracking)
The Projection class is a generic class that requires one type variable, which is expected to be a subclass of TrackingRecorder. In this case, the type variable is specified to be FtsViewInterface, which means the projection should be constructed with a subclass of FtsViewInterface, for example PostgresFtsView.
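The dispatch pattern used by process_event() can be tried in isolation. The sketch below uses only the standard library's `functools.singledispatchmethod`; the event classes, `DictView`, and `SketchProjection` are hypothetical stand-ins written for this illustration, with an integer position in place of a Tracking object. It shows that event types without a registered handler fall through to the base method, which records the position and nothing else.

```python
from dataclasses import dataclass
from functools import singledispatchmethod


@dataclass(frozen=True)
class Created:
    slug: str
    body: str


@dataclass(frozen=True)
class BodyUpdated:
    slug: str
    body: str


@dataclass(frozen=True)
class TitleUpdated:
    slug: str
    title: str


class DictView:
    """Hypothetical in-memory view: page bodies plus processed positions."""

    def __init__(self) -> None:
        self.bodies = {}
        self.positions = []


class SketchProjection:
    def __init__(self, view: DictView) -> None:
        self.view = view

    @singledispatchmethod
    def process_event(self, event: object, position: int) -> None:
        # Fallback for event types the view does not index: track only.
        self.view.positions.append(position)

    @process_event.register
    def page_created(self, event: Created, position: int) -> None:
        # Dispatch is chosen from the annotation of the first argument.
        self.view.bodies[event.slug] = event.body
        self.view.positions.append(position)

    @process_event.register
    def body_updated(self, event: BodyUpdated, position: int) -> None:
        self.view.bodies[event.slug] = event.body
        self.view.positions.append(position)


view = DictView()
projection = SketchProjection(view)
projection.process_event(Created("animals", ""), 1)
projection.process_event(BodyUpdated("animals", "cat dog zebra"), 2)
projection.process_event(TitleUpdated("animals", "Animals!"), 3)
```

All three positions are tracked, but only the first two events change the indexed body. Tracking every event, including those that are otherwise ignored, is what allows a caller to wait for a given position to be reached.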
Test case
The test case TestFtsProjection shows how the library’s projection runner class can be used to run the full text search projection of the content application.
The test demonstrates that the projection firstly catches up with existing content, and then continues automatically to process new content.
The test creates two pages, for ‘animals’ and for ‘plants’. Content is added to the pages. Before the projection is started, waiting for the content to be processed times out and searches return no results. The projection is then started. The tracking recorder method wait() is called so that the search index will have been updated with the results of processing new events before the projection is queried. After waiting for the projection to process the application’s events, the search index is queried, and the search results are checked. A third page for ‘minerals’ is then created, and the test waits again before searching for the new content.
A ProjectionRunner object is constructed with the application class ContentManagement, the projection class FtsProjection, and the tracking recorder class PostgresFtsView.
An environment is specified that defines persistence infrastructure for the application and the tracking recorder.
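The environment used in the test below prefixes each setting with an upper-case component name (CONTENTMANAGEMENT_ and FTSPROJECTION_). As a rough illustration of that naming convention only, the hypothetical helper `settings_for` below selects the settings for one component from a shared dictionary. It is an assumption about the convention, not the library's own lookup code, which may also apply defaults or fallbacks.

```python
def settings_for(name: str, env: dict) -> dict:
    """Return the settings whose keys carry the upper-cased name as a prefix."""
    prefix = name.upper() + "_"
    return {
        key[len(prefix):]: value
        for key, value in env.items()
        if key.startswith(prefix)
    }


env = {
    "CONTENTMANAGEMENT_POSTGRES_DBNAME": "eventsourcing",
    "FTSPROJECTION_POSTGRES_DBNAME": "eventsourcing",
    "FTSPROJECTION_POSTGRES_PORT": "5432",
}

application_settings = settings_for("ContentManagement", env)
projection_settings = settings_for("FtsProjection", env)
```

Keeping the two groups of settings separate is what would allow the application and the projection to be pointed at different databases.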
Because the projection uses a subscription, the projection will follow events from every instance of the ContentManagement application “write model” that is configured to use the same database. And because the projection is recorded in the database, it can be queried from any instance of the PostgresFtsView recorder “read model” that is configured to use the same database. To demonstrate this, separate instances of the application and the recorder are used as the “write model” and “read model” interfaces. The projection runs independently.
The application and the recorder could use different databases, but in this example they use different tables in the same PostgreSQL database.
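The role of wait() in the test can be illustrated with a simplified in-memory stand-in. The class below is a sketch written for this page, not the library's tracking recorder: it ignores the application name, keeps only the highest processed position, and uses a `threading.Condition` where a database-backed recorder would need some other mechanism. It shows the two behaviours the test relies on: a timeout when nothing is processing events, and a prompt return once the position has been reached.

```python
import threading


class InMemoryTracking:
    """Hypothetical tracking store that supports waiting for a position."""

    def __init__(self) -> None:
        self._max_id = 0
        self._condition = threading.Condition()

    def insert_tracking(self, notification_id: int) -> None:
        # Record the position and wake up any waiting readers.
        with self._condition:
            self._max_id = max(self._max_id, notification_id)
            self._condition.notify_all()

    def max_tracking_id(self) -> int:
        with self._condition:
            return self._max_id

    def wait(self, notification_id: int, timeout: float = 1.0) -> None:
        # Block until the position is reached, or raise after the timeout.
        with self._condition:
            reached = self._condition.wait_for(
                lambda: self._max_id >= notification_id, timeout=timeout
            )
        if not reached:
            raise TimeoutError(f"position {notification_id} not reached")


tracking = InMemoryTracking()

# No projection is running yet, so waiting times out.
try:
    tracking.wait(4, timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True


def run_projection() -> None:
    # Stand-in for a projection processing four events in order.
    for position in range(1, 5):
        tracking.insert_tracking(position)


worker = threading.Thread(target=run_projection)
worker.start()
tracking.wait(4, timeout=2.0)  # Returns once position 4 has been tracked.
worker.join()
```

Waiting on a position in this way lets a client read its own writes from an eventually-consistent view without polling the search index itself.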
class TestFtsProjection(unittest.TestCase):
    env: ClassVar[dict[str, str]] = {
        "CONTENTMANAGEMENT_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "CONTENTMANAGEMENT_POSTGRES_DBNAME": "eventsourcing",
        "CONTENTMANAGEMENT_POSTGRES_HOST": "127.0.0.1",
        "CONTENTMANAGEMENT_POSTGRES_PORT": "5432",
        "CONTENTMANAGEMENT_POSTGRES_USER": "eventsourcing",
        "CONTENTMANAGEMENT_POSTGRES_PASSWORD": "eventsourcing",
        "FTSPROJECTION_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "FTSPROJECTION_POSTGRES_DBNAME": "eventsourcing",
        "FTSPROJECTION_POSTGRES_HOST": "127.0.0.1",
        "FTSPROJECTION_POSTGRES_PORT": "5432",
        "FTSPROJECTION_POSTGRES_USER": "eventsourcing",
        "FTSPROJECTION_POSTGRES_PASSWORD": "eventsourcing",
    }

    def test(self) -> None:
        # Construct an instance of the application ("write model").
        write_model = ContentManagement(env=self.env)

        # Construct tracking recorder ("read model").
        read_model = PostgresFtsView(
            datastore=PostgresDatastore(
                dbname="eventsourcing",
                host="127.0.0.1",
                port=5432,
                user="eventsourcing",
                password="eventsourcing",  # noqa: S106
            ),
            tracking_table_name="ftsprojection_tracking",
        )
        read_model.create_table()

        # Create some content in the write model.
        user_id = uuid4()
        user_id_cvar.set(user_id)
        write_model.create_page(title="Animals", slug="animals")
        write_model.update_body(slug="animals", body="cat dog zebra")
        write_model.create_page(title="Plants", slug="plants")
        notification_id = write_model.update_body(
            slug="plants", body="bluebell rose jasmine"
        )

        # Wait for the content to be processed (should time out).
        with self.assertRaises(TimeoutError):
            read_model.wait(write_model.name, notification_id)

        # Search in the read model, expect no results.
        self.assertEqual(0, len(read_model.search("dog")))
        self.assertEqual(0, len(read_model.search("rose")))
        self.assertEqual(0, len(read_model.search("zinc")))

        # Run the projection (independently of read and write model objects).
        _ = ProjectionRunner(
            application_class=ContentManagement,
            projection_class=FtsProjection,
            view_class=PostgresFtsView,
            env=self.env,
        )

        # Wait for content to be processed (projection catches up).
        read_model.wait(write_model.name, notification_id)

        # Search in the read model, expect results.
        pages = read_model.search("dog")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0].slug, "animals")
        self.assertEqual(pages[0].body, "cat dog zebra")

        pages = read_model.search("rose")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0].slug, "plants")
        self.assertEqual(pages[0].body, "bluebell rose jasmine")

        pages = read_model.search("zinc")
        self.assertEqual(0, len(pages))

        # Search for multiple words in same page.
        pages = read_model.search("dog cat")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0].slug, "animals")
        self.assertEqual(pages[0].body, "cat dog zebra")

        # Search for multiple words in same page, expect no results.
        pages = read_model.search("rose zebra")
        self.assertEqual(0, len(pages))

        # Search for alternative words, expect two results.
        pages = read_model.search("rose OR zebra")
        self.assertEqual(2, len(pages))
        self.assertEqual({"animals", "plants"}, {p.slug for p in pages})

        # Create some more content.
        write_model.create_page(title="Minerals", slug="minerals")
        notification_id = write_model.update_body(
            slug="minerals", body="iron zinc calcium"
        )

        # Wait for content to be processed (projection continues processing).
        read_model.wait(write_model.name, notification_id)

        # Search for the new content in the read model.
        pages = read_model.search("zinc")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0].slug, "minerals")
        self.assertEqual(pages[0].body, "iron zinc calcium")

    def setUp(self) -> None:
        drop_tables()
        super().setUp()

    def tearDown(self) -> None:
        super().tearDown()
        drop_tables()
Code reference
- class examples.ftsprojection.projection.FtsViewInterface
  Bases: FtsRecorder, TrackingRecorder, ABC
- class examples.ftsprojection.projection.FtsProjection(view: TTrackingRecorder)
  Bases: Projection[FtsViewInterface]
  - process_event(domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking) -> None
  - process_event(domain_event: Created, tracking: Tracking) -> None
  - process_event(domain_event: BodyUpdated, tracking: Tracking) -> None
    Process a domain event and track it.
  - body_updated(domain_event: BodyUpdated, tracking: Tracking) -> None
  - name: str = 'FtsProjection'
    Name of projection, used to pick prefixed environment variables and define database table names.
- class examples.ftsprojection.projection.PostgresFtsView(datastore: PostgresDatastore, fts_table_name: str = 'ftsprojection', **kwargs: Any)
  Bases: PostgresFtsRecorder, PostgresTrackingRecorder, FtsViewInterface
- class examples.ftsprojection.test_projection.TestFtsProjection(methodName='runTest')
  Bases: TestCase
  - env: ClassVar[dict[str, str]] = {'CONTENTMANAGEMENT_PERSISTENCE_MODULE': 'eventsourcing.postgres', 'CONTENTMANAGEMENT_POSTGRES_DBNAME': 'eventsourcing', 'CONTENTMANAGEMENT_POSTGRES_HOST': '127.0.0.1', 'CONTENTMANAGEMENT_POSTGRES_PASSWORD': 'eventsourcing', 'CONTENTMANAGEMENT_POSTGRES_PORT': '5432', 'CONTENTMANAGEMENT_POSTGRES_USER': 'eventsourcing', 'FTSPROJECTION_PERSISTENCE_MODULE': 'eventsourcing.postgres', 'FTSPROJECTION_POSTGRES_DBNAME': 'eventsourcing', 'FTSPROJECTION_POSTGRES_HOST': '127.0.0.1', 'FTSPROJECTION_POSTGRES_PASSWORD': 'eventsourcing', 'FTSPROJECTION_POSTGRES_PORT': '5432', 'FTSPROJECTION_POSTGRES_USER': 'eventsourcing'}