Projections 1 - Full text search

This example uses the library’s projection module to define a projection that processes events from the example Application 3 - Content management. The events are processed into an eventually-consistent full text search index, a searchable “materialised view” of the content of the application.

This is an example of CQRS. In this example, only the body values of the Page aggregates are indexed in the search engine. Because the search engine “read model” is separate from the content management “write model”, the search engine can be redesigned and rebuilt by reprocessing the application’s domain events. The projected searchable content can be deleted and rebuilt, perhaps to also include page titles, timestamps, or other information contained in the domain events, such as the authors.

This is the main advantage of “CQRS” over the “inline” technique used in Application 5 - Searchable content, where the search index is simply updated whenever new events are recorded.
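The idea of rebuilding a read model can be sketched without any database. The following is a minimal illustration, not the library’s implementation: the event classes and the build_index() and search() functions are hypothetical, and the update event carries the whole body rather than a diff, to keep the sketch short. It shows the same recorded events being replayed into two differently designed indexes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PageCreated:  # hypothetical stand-in for Page.Created
    slug: str
    title: str


@dataclass(frozen=True)
class BodyUpdated:  # hypothetical; carries the full body, not a diff
    slug: str
    body: str


def build_index(events, include_titles=False):
    """Replay events into a dict mapping slug to searchable text."""
    titles = {}
    index = {}
    for event in events:
        if isinstance(event, PageCreated):
            titles[event.slug] = event.title
            index[event.slug] = event.title if include_titles else ""
        elif isinstance(event, BodyUpdated):
            prefix = titles[event.slug] + " " if include_titles else ""
            index[event.slug] = prefix + event.body
    return index


def search(index, word):
    """Return the slugs of pages whose searchable text contains the word."""
    return sorted(
        slug for slug, text in index.items() if word.lower() in text.lower().split()
    )


events = [
    PageCreated("animals", "Animals"),
    BodyUpdated("animals", "cat dog zebra"),
    PageCreated("plants", "Plants"),
    BodyUpdated("plants", "bluebell rose jasmine"),
]

# First design: index page bodies only.
body_only = build_index(events)

# Redesign: discard the old index and replay the same events to include titles.
with_titles = build_index(events, include_titles=True)
```

With the body-only index, searching for "plants" finds nothing; after the rebuild, the same search finds the ‘plants’ page. The write side did not need to change.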

Persistence

Firstly, let’s consider the “read model”.

The FtsViewInterface defines an abstract interface for searching, inserting, and updating pages in a full text search index with tracking information. Abstract methods are defined so that a projection can be defined independently of a particular database.

It defines abstract method signatures insert_pages_with_tracking() and update_pages_with_tracking() so that pages may be inserted and updated atomically in a full text search index along with tracking information.

class FtsViewInterface(FtsRecorder, TrackingRecorder, ABC):
    @abstractmethod
    def insert_pages_with_tracking(
        self, pages: Sequence[PageInfo], tracking: Tracking
    ) -> None:
        pass

    @abstractmethod
    def update_pages_with_tracking(
        self, pages: Sequence[PageInfo], tracking: Tracking
    ) -> None:
        pass

It extends both the abstract FtsRecorder class from example Application 5 - Searchable content and the library’s abstract TrackingRecorder class, so that subclasses will implement both the interface for full text search and for recording tracking information.

Abstract search method signatures are inherited from the FtsRecorder class.
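To illustrate why an abstract interface keeps the projection independent of a particular database, here is a minimal in-memory sketch. All names in it (PageStub, TrackingStub, ViewInterface, InMemoryView) are hypothetical stand-ins and are not part of the library or the example code. It assumes that a tracking record is identified by an application name and a notification ID.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Sequence, Set, Tuple


@dataclass(frozen=True)
class PageStub:  # hypothetical stand-in for PageInfo
    slug: str
    body: str


@dataclass(frozen=True)
class TrackingStub:  # hypothetical stand-in for Tracking
    application_name: str
    notification_id: int


class ViewInterface(ABC):
    @abstractmethod
    def insert_pages_with_tracking(
        self, pages: Sequence[PageStub], tracking: TrackingStub
    ) -> None:
        """Insert pages and the tracking record as one unit."""


class InMemoryView(ViewInterface):
    def __init__(self) -> None:
        self.pages: Dict[str, PageStub] = {}
        self.seen: Set[Tuple[str, int]] = set()

    def insert_pages_with_tracking(
        self, pages: Sequence[PageStub], tracking: TrackingStub
    ) -> None:
        key = (tracking.application_name, tracking.notification_id)
        # Reject duplicates before mutating anything, so that a repeated
        # notification leaves the pages untouched.
        if key in self.seen:
            raise ValueError("notification already processed")
        for page in pages:
            self.pages[page.slug] = page
        self.seen.add(key)


view = InMemoryView()
view.insert_pages_with_tracking(
    [PageStub("animals", "cat dog zebra")], TrackingStub("ContentManagement", 1)
)
try:
    # Same notification ID again: must be rejected without storing the page.
    view.insert_pages_with_tracking(
        [PageStub("plants", "rose")], TrackingStub("ContentManagement", 1)
    )
except ValueError:
    duplicate_rejected = True
else:
    duplicate_rejected = False
```

A projection written against ViewInterface would work unchanged with this in-memory class or with a database-backed one.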

PostgreSQL

Now let’s consider how we might implement this “read model” interface to work with a PostgreSQL database.

The PostgresFtsView implements the abstract FtsViewInterface by inheriting the PostgresFtsRecorder class from example Application 5 - Searchable content and the library’s PostgresTrackingRecorder class.

It implements the method insert_pages_with_tracking() required by FtsViewInterface by calling within a database transaction both _insert_pages() of PostgresFtsRecorder and _insert_tracking() of PostgresTrackingRecorder, so that new pages will be inserted in the full text search index atomically with tracking information.

It implements the method update_pages_with_tracking() required by FtsViewInterface by calling within a database transaction both _update_pages() of PostgresFtsRecorder and _insert_tracking() of PostgresTrackingRecorder, so that existing pages will be updated in the full text search index atomically with tracking information.

class PostgresFtsView(PostgresFtsRecorder, PostgresTrackingRecorder, FtsViewInterface):
    def insert_pages_with_tracking(
        self, pages: Sequence[PageInfo], tracking: Tracking
    ) -> None:
        with self.datastore.transaction(commit=True) as curs:
            self._insert_pages(curs, pages)
            self._insert_tracking(curs, tracking)

    def update_pages_with_tracking(
        self, pages: Sequence[PageInfo], tracking: Tracking
    ) -> None:
        with self.datastore.transaction(commit=True) as curs:
            self._update_pages(curs, pages)
            self._insert_tracking(curs, tracking)

Search method implementations are inherited from the PostgresFtsRecorder class.
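The reason for writing pages and tracking information in one transaction can be shown with a small sketch. It uses Python’s built-in sqlite3 module rather than PostgreSQL so that it is self-contained; the table layout and the function insert_page_with_tracking() are assumptions made for illustration and do not reflect the schema used by the library. The uniqueness constraint on the tracking table means that a notification that has already been processed cannot be applied to the index a second time.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (slug TEXT PRIMARY KEY, body TEXT)")
conn.execute(
    "CREATE TABLE tracking ("
    "application_name TEXT, notification_id INTEGER, "
    "PRIMARY KEY (application_name, notification_id))"
)
conn.commit()


def insert_page_with_tracking(conn, slug, body, application_name, notification_id):
    # Using the connection as a context manager commits if the block
    # succeeds and rolls back if an exception is raised.
    with conn:
        conn.execute("INSERT INTO pages VALUES (?, ?)", (slug, body))
        conn.execute(
            "INSERT INTO tracking VALUES (?, ?)",
            (application_name, notification_id),
        )


insert_page_with_tracking(conn, "animals", "cat dog zebra", "ContentManagement", 1)

try:
    # Reusing notification ID 1 violates the tracking table's primary key,
    # so the page insert in the same transaction is rolled back too.
    insert_page_with_tracking(conn, "plants", "rose", "ContentManagement", 1)
except sqlite3.IntegrityError:
    duplicate_rejected = True
else:
    duplicate_rejected = False

page_count = conn.execute("SELECT COUNT(*) FROM pages").fetchone()[0]
```

After the failed second call, only the ‘animals’ page is in the pages table, because the insert of the ‘plants’ page was rolled back together with the rejected tracking record.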

Projection

Having defined a read model, let’s consider how the domain events of the content management application can be processed. This section uses the projection class from the projection module.

The FtsProjection class implements the library’s Projection class, the abstract base class for projections of event-sourced applications. It can be run as an event-processing component using the library’s projection runner.

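Its process_event() method is coded to process the Page.Created and Page.BodyUpdated events of the domain model in Application 3 - Content management.

When a Page.Created event is received, the method insert_pages_with_tracking() of an FtsViewInterface object is called.

When a Page.BodyUpdated event is received, the current page is selected from the view, the diff carried by the event is applied to its body, and the method update_pages_with_tracking() of an FtsViewInterface object is called.

For any other type of event, only the tracking information is recorded, by calling insert_tracking().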

class FtsProjection(Projection[FtsViewInterface]):
    @singledispatchmethod
    def process_event(
        self, domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking
    ) -> None:
        self.view.insert_tracking(tracking)

    @process_event.register
    def page_created(self, domain_event: Page.Created, tracking: Tracking) -> None:
        new_page = PageInfo(
            id=domain_event.originator_id,
            slug=domain_event.slug,
            title=domain_event.title,
            body=domain_event.body,
        )
        self.view.insert_pages_with_tracking([new_page], tracking)

    @process_event.register
    def body_updated(self, domain_event: Page.BodyUpdated, tracking: Tracking) -> None:
        page_id = domain_event.originator_id
        old_page = self.view.select_page(page_id)
        new_page = PageInfo(
            id=page_id,
            slug=old_page.slug,
            title=old_page.title,
            body=apply_diff(old_page.body, domain_event.diff),
        )
        self.view.update_pages_with_tracking([new_page], tracking)

The Projection class is a generic class that takes one type argument, which is expected to be a subclass of TrackingRecorder. In this case, the type argument is FtsViewInterface, which means the projection should be constructed with an instance of a subclass of FtsViewInterface, for example PostgresFtsView.
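The dispatch mechanism used by process_event() comes from Python’s functools.singledispatchmethod, which selects an implementation from the type of the first argument after self. The following standalone sketch shows this behaviour. The event classes and the SketchProjection class are hypothetical, and the tracking argument is simplified to an integer.

```python
from dataclasses import dataclass
from functools import singledispatchmethod


@dataclass
class Created:  # hypothetical event types, for illustration only
    slug: str


@dataclass
class BodyUpdated:
    slug: str


@dataclass
class TitleUpdated:
    slug: str


class SketchProjection:
    def __init__(self) -> None:
        self.calls = []

    @singledispatchmethod
    def process_event(self, domain_event: object, tracking: int) -> None:
        # Default case: no registered handler, so only record the tracking.
        self.calls.append(("tracking_only", tracking))

    @process_event.register
    def _created(self, domain_event: Created, tracking: int) -> None:
        self.calls.append(("insert", tracking))

    @process_event.register
    def _body_updated(self, domain_event: BodyUpdated, tracking: int) -> None:
        self.calls.append(("update", tracking))


projection = SketchProjection()
# The event must be passed positionally, since dispatch uses the first argument.
projection.process_event(Created("animals"), 1)
projection.process_event(BodyUpdated("animals"), 2)
projection.process_event(TitleUpdated("animals"), 3)
```

The third call falls through to the default implementation because no handler is registered for TitleUpdated. Recording the tracking information even for unhandled events lets a caller that waits for a particular notification ID see that the projection has moved past it.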

Test case

The test case TestFtsProjection shows how the library’s projection runner class can be used to run the full text search projection of the content application.

The test demonstrates that the projection first catches up with existing content, and then automatically continues to process new content.

The test creates two pages, for ‘animals’ and for ‘plants’. Content is added to the pages. Before the projection is started, a call to the tracking recorder method wait() times out, and searches return no results. The projection is then started. The method wait() is called again, so that the search index will have been updated with the results of processing the new events before the projection is queried. After waiting for the projection to process the application’s events, the search index is queried, and the search results are checked. A third page for ‘minerals’ is then created, and the test waits for it to be processed before checking that it can be found.
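The waiting behaviour described above can be sketched with a condition variable. This is only an in-memory illustration of the idea, not how the library’s tracking recorders are implemented: the class InMemoryTracking and its method signatures are hypothetical.

```python
import threading


class InMemoryTracking:
    """Hypothetical tracking store that lets readers wait for a position."""

    def __init__(self) -> None:
        self._max_ids = {}
        self._condition = threading.Condition()

    def insert_tracking(self, application_name: str, notification_id: int) -> None:
        with self._condition:
            current = self._max_ids.get(application_name, 0)
            self._max_ids[application_name] = max(current, notification_id)
            # Wake up any readers waiting for this position.
            self._condition.notify_all()

    def wait(
        self, application_name: str, notification_id: int, timeout: float = 1.0
    ) -> None:
        with self._condition:
            done = self._condition.wait_for(
                lambda: self._max_ids.get(application_name, 0) >= notification_id,
                timeout=timeout,
            )
        if not done:
            raise TimeoutError("notification was not processed in time")


tracking = InMemoryTracking()

# Nothing has been processed yet, so waiting times out.
try:
    tracking.wait("ContentManagement", 4, timeout=0.05)
except TimeoutError:
    timed_out = True
else:
    timed_out = False


def run_projection() -> None:
    # Stand-in for a projection processing four notifications.
    for notification_id in range(1, 5):
        tracking.insert_tracking("ContentManagement", notification_id)


worker = threading.Thread(target=run_projection)
worker.start()

# Returns once notification 4 has been tracked.
tracking.wait("ContentManagement", 4, timeout=5.0)
worker.join()
caught_up = True
```

Waiting on a notification ID gives a reader “read your own writes” behaviour on top of an eventually-consistent view: the caller blocks only until the specific change it cares about has been projected.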

A ProjectionRunner object is constructed with the application class ContentManagement, the projection class FtsProjection, and the tracking recorder class PostgresFtsView.

An environment is specified that defines persistence infrastructure for the application and the tracking recorder.

Because the projection uses a subscription, the projection will follow events from every instance of the ContentManagement application “write model” that is configured to use the same database. And because the projection is recorded in the database, it can be queried from any instance of the PostgresFtsView recorder “read model” that is configured to use the same database. To demonstrate this, separate instances of the application and the recorder are used as the “write model” and “read model” interfaces. The projection runs independently.

The application and the recorder could use different databases, but in this example they use different tables in the same PostgreSQL database.

class TestFtsProjection(unittest.TestCase):
    env: ClassVar[dict[str, str]] = {
        "CONTENTMANAGEMENT_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "CONTENTMANAGEMENT_POSTGRES_DBNAME": "eventsourcing",
        "CONTENTMANAGEMENT_POSTGRES_HOST": "127.0.0.1",
        "CONTENTMANAGEMENT_POSTGRES_PORT": "5432",
        "CONTENTMANAGEMENT_POSTGRES_USER": "eventsourcing",
        "CONTENTMANAGEMENT_POSTGRES_PASSWORD": "eventsourcing",
        "FTSPROJECTION_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "FTSPROJECTION_POSTGRES_DBNAME": "eventsourcing",
        "FTSPROJECTION_POSTGRES_HOST": "127.0.0.1",
        "FTSPROJECTION_POSTGRES_PORT": "5432",
        "FTSPROJECTION_POSTGRES_USER": "eventsourcing",
        "FTSPROJECTION_POSTGRES_PASSWORD": "eventsourcing",
    }

    def test(self) -> None:
        # Construct an instance of the application ("write model").
        write_model = ContentManagement(env=self.env)

        # Construct tracking recorder ("read model").
        read_model = PostgresFtsView(
            datastore=PostgresDatastore(
                dbname="eventsourcing",
                host="127.0.0.1",
                port=5432,
                user="eventsourcing",
                password="eventsourcing",  # noqa: S106
            ),
            tracking_table_name="ftsprojection_tracking",
        )
        read_model.create_table()

        # Create some content in the write model.
        user_id = uuid4()
        user_id_cvar.set(user_id)
        write_model.create_page(title="Animals", slug="animals")
        write_model.update_body(slug="animals", body="cat dog zebra")
        write_model.create_page(title="Plants", slug="plants")
        notification_id = write_model.update_body(
            slug="plants", body="bluebell rose jasmine"
        )

        # Wait for the content to be processed (should time out).
        with self.assertRaises(TimeoutError):
            read_model.wait(write_model.name, notification_id)

        # Search in the read model, expect no results.
        self.assertEqual(0, len(read_model.search("dog")))
        self.assertEqual(0, len(read_model.search("rose")))
        self.assertEqual(0, len(read_model.search("zinc")))

        # Run the projection (independently of read and write model objects).
        _ = ProjectionRunner(
            application_class=ContentManagement,
            projection_class=FtsProjection,
            view_class=PostgresFtsView,
            env=self.env,
        )

        # Wait for content to be processed (projection catches up).
        read_model.wait(write_model.name, notification_id)

        # Search in the read model, expect results.
        pages = read_model.search("dog")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0].slug, "animals")
        self.assertEqual(pages[0].body, "cat dog zebra")

        pages = read_model.search("rose")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0].slug, "plants")
        self.assertEqual(pages[0].body, "bluebell rose jasmine")

        pages = read_model.search("zinc")
        self.assertEqual(0, len(pages))

        # Search for multiple words in same page.
        pages = read_model.search("dog cat")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0].slug, "animals")
        self.assertEqual(pages[0].body, "cat dog zebra")

        # Search for multiple words in different pages, expect no results.
        pages = read_model.search("rose zebra")
        self.assertEqual(0, len(pages))

        # Search for alternative words, expect two results.
        pages = read_model.search("rose OR zebra")
        self.assertEqual(2, len(pages))
        self.assertEqual({"animals", "plants"}, {p.slug for p in pages})

        # Create some more content.
        write_model.create_page(title="Minerals", slug="minerals")
        notification_id = write_model.update_body(
            slug="minerals", body="iron zinc calcium"
        )

        # Wait for content to be processed (projection continues processing).
        read_model.wait(write_model.name, notification_id)

        # Search for the new content in the read model.
        pages = read_model.search("zinc")
        self.assertEqual(1, len(pages))
        self.assertEqual(pages[0].slug, "minerals")
        self.assertEqual(pages[0].body, "iron zinc calcium")

    def setUp(self) -> None:
        drop_tables()
        super().setUp()

    def tearDown(self) -> None:
        super().tearDown()
        drop_tables()

Code reference

class examples.ftsprojection.projection.FtsViewInterface

Bases: FtsRecorder, TrackingRecorder, ABC

abstractmethod insert_pages_with_tracking(pages: Sequence[PageInfo], tracking: Tracking) → None
abstractmethod update_pages_with_tracking(pages: Sequence[PageInfo], tracking: Tracking) → None

class examples.ftsprojection.projection.FtsProjection(view: TTrackingRecorder)

Bases: Projection[FtsViewInterface]

process_event(domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking) → None
process_event(domain_event: Created, tracking: Tracking) → None
process_event(domain_event: BodyUpdated, tracking: Tracking) → None

Process a domain event and track it.

page_created(domain_event: Created, tracking: Tracking) → None
body_updated(domain_event: BodyUpdated, tracking: Tracking) → None
name: str = 'FtsProjection'

Name of projection, used to pick prefixed environment variables and define database table names.

class examples.ftsprojection.projection.PostgresFtsView(datastore: PostgresDatastore, fts_table_name: str = 'ftsprojection', **kwargs: Any)

Bases: PostgresFtsRecorder, PostgresTrackingRecorder, FtsViewInterface

insert_pages_with_tracking(pages: Sequence[PageInfo], tracking: Tracking) → None
update_pages_with_tracking(pages: Sequence[PageInfo], tracking: Tracking) → None

class examples.ftsprojection.test_projection.TestFtsProjection(methodName='runTest')

Bases: TestCase

env: ClassVar[dict[str, str]] = {'CONTENTMANAGEMENT_PERSISTENCE_MODULE': 'eventsourcing.postgres', 'CONTENTMANAGEMENT_POSTGRES_DBNAME': 'eventsourcing', 'CONTENTMANAGEMENT_POSTGRES_HOST': '127.0.0.1', 'CONTENTMANAGEMENT_POSTGRES_PASSWORD': 'eventsourcing', 'CONTENTMANAGEMENT_POSTGRES_PORT': '5432', 'CONTENTMANAGEMENT_POSTGRES_USER': 'eventsourcing', 'FTSPROJECTION_PERSISTENCE_MODULE': 'eventsourcing.postgres', 'FTSPROJECTION_POSTGRES_DBNAME': 'eventsourcing', 'FTSPROJECTION_POSTGRES_HOST': '127.0.0.1', 'FTSPROJECTION_POSTGRES_PASSWORD': 'eventsourcing', 'FTSPROJECTION_POSTGRES_PORT': '5432', 'FTSPROJECTION_POSTGRES_USER': 'eventsourcing'}
test() → None
setUp() → None

Hook method for setting up the test fixture before exercising it.

tearDown() → None

Hook method for deconstructing the test fixture after testing it.

© Copyright 2025, John Bywater.