===============================
Tutorial - Part 4 - Projections
===============================

This part of the tutorial shows how the state of an event-sourced application can be projected into a `materialised view `_ that supports arbitrary queries.

As we saw in :doc:`Part 3 `, we can use the library's :class:`~eventsourcing.application.Application` class to define an event-sourced application. The state of an event-sourced application is recorded as a set of events, each of which is positioned in one of many "aggregate sequences" and also in the overall "application sequence". We can reconstruct an aggregate by selecting events from an aggregate sequence, and we can propagate the state of an application by following its application sequence. But we can only query the current state of an event-sourced application in other ways by first projecting the application state into a "read model" that is designed to support such queries.

So that such queries can be performed quickly, it is useful to project the state of an event-sourced application into a read model that is stored in a database. Such "materialised views" are usually prepared and updated by processing the events of the event-sourced application in a separate event-processing component, which subscribes to the application sequence so that it can catch up and then continue as further events are recorded.

In the sections below we will explore how the events of event-sourced applications can be processed into materialised views that support queries that are not directly supported by event-sourced applications.

View classes
============

Materialised views support queries of the state of an event-sourced application. Therefore, every materialised view will need to have at least one query method. Materialised views will also need to be updated when events are processed, and so a materialised view will also need to have at least one command method.
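As a rough illustration of the ideas above, the following sketch uses plain Python only and none of the library's classes. The names ``RecordedEvent``, ``EventCounts``, ``count_event()`` and ``get_count()`` are hypothetical and chosen only for this illustration. It shows events that each have a position in an aggregate sequence and in the application sequence, and a toy read model with one command method and one query method.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class RecordedEvent:
    """A hypothetical stored event (not a library class)."""

    position: int      # position in the application sequence
    aggregate_id: str  # identifies the aggregate sequence
    version: int       # position in the aggregate sequence
    kind: str          # "created" or "subsequent"


class EventCounts:
    """A toy read model with one command method and one query method."""

    def __init__(self) -> None:
        self._counts: Dict[str, int] = {}

    def count_event(self, kind: str) -> None:
        # Command method: updates the view when an event is processed.
        self._counts[kind] = self._counts.get(kind, 0) + 1

    def get_count(self, kind: str) -> int:
        # Query method: reads the current state of the view.
        return self._counts.get(kind, 0)


application_sequence: List[RecordedEvent] = [
    RecordedEvent(1, "dog-1", 1, "created"),
    RecordedEvent(2, "dog-2", 1, "created"),
    RecordedEvent(3, "dog-1", 2, "subsequent"),
    RecordedEvent(4, "dog-1", 3, "subsequent"),
]

# Reconstructing one aggregate selects events from its aggregate sequence.
dog1_events = [e for e in application_sequence if e.aggregate_id == "dog-1"]

# Projecting the application state follows the whole application sequence.
view = EventCounts()
for event in application_sequence:
    view.count_event(event.kind)
```

This sketch does not yet record which events have been processed, so it cannot resume after a restart and cannot protect against counting an event twice.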
A materialised view is typically expected to be a reliable deterministic projection of the state of an event-sourced application. To achieve this, we need to record the position of each processed event uniquely, and record it atomically along with the updating of the materialised view. This ensures the results of processing an event are never successfully recorded more than once. And so that event processing can be resumed correctly, we also need to get the position of the last successfully processed event.

Because of the arbitrary nature of materialised views, the recording and reporting of the positions of processed events are the only common aspects shared by all materialised views. In this sense, all materialised views are special kinds of :ref:`tracking recorders `.

And so we can define an interface for a materialised view using the library's abstract :class:`~eventsourcing.persistence.TrackingRecorder` class. The interface will define how the materialised view will be queried and how events will be processed, independently of any concrete implementation.

We can then develop concrete implementations of this interface by also extending the library's concrete tracking recorder classes: :class:`~eventsourcing.popo.POPOTrackingRecorder`, :class:`~eventsourcing.sqlite.SQLiteTrackingRecorder` and :class:`~eventsourcing.postgres.PostgresTrackingRecorder`. The concrete tracking recorders help with recording the positions of processed events uniquely and atomically in a particular database.

To show how this can work, let's create a simple materialised view, for counting events, that can record and reveal the number of :ref:`"created" events ` and the number of :ref:`subsequent events ` that have been generated by an event-sourced application.

View test
=========

Let's begin by writing a test.

The ``EventCountersViewTestCase`` shown below expresses requirements for recording and revealing the number of events generated by an event-sourced application.
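Before turning to the test, the requirement described under "View classes", to record the position of each processed event uniquely and atomically with the update of the view, can be sketched with Python's standard ``sqlite3`` module. This is only an illustration under assumed table and function names (``tracking``, ``counters``, ``incr_counter()``, ``get_counter()``, ``max_position()``). It is not the schema or the code of the library's tracking recorders, and it raises ``sqlite3.IntegrityError`` rather than the library's own error class.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE counters (name TEXT PRIMARY KEY, value INTEGER NOT NULL)")
conn.execute(
    "CREATE TABLE tracking ("
    "application_name TEXT NOT NULL, "
    "notification_id INTEGER NOT NULL, "
    "PRIMARY KEY (application_name, notification_id))"
)
conn.execute("INSERT INTO counters (name, value) VALUES ('created', 0)")
conn.commit()


def incr_counter(name: str, application_name: str, notification_id: int) -> None:
    """Increment a counter and record the event position in one transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("UPDATE counters SET value = value + 1 WHERE name = ?", (name,))
        # The primary key rejects a position that has already been recorded.
        conn.execute(
            "INSERT INTO tracking VALUES (?, ?)", (application_name, notification_id)
        )


def get_counter(name: str) -> int:
    row = conn.execute("SELECT value FROM counters WHERE name = ?", (name,)).fetchone()
    return row[0]


def max_position(application_name: str) -> Optional[int]:
    """Return the last recorded position, or None if nothing was processed."""
    row = conn.execute(
        "SELECT MAX(notification_id) FROM tracking WHERE application_name = ?",
        (application_name,),
    ).fetchone()
    return row[0]


incr_counter("created", "app", 1)
try:
    incr_counter("created", "app", 1)  # same position processed again
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True
```

Because the counter update and the tracking insert share one transaction, the rejected duplicate also rolls back its increment, so the counter still reflects each event exactly once. The value returned by ``max_position()`` is what an event-processing component would use to decide where to resume.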
The test expects this view object:

* to be an instance of :class:`~eventsourcing.persistence.TrackingRecorder`, and thereby to have a :func:`~eventsourcing.persistence.TrackingRecorder.max_tracking_id` method. This supports resuming event processing from the correct position.

* to have command and query methods for incrementing and obtaining the recorded number of events. The values returned by the query methods are expected to be equal to the number of times the corresponding command methods have been called.

* to have command methods with a :ref:`tracking object ` parameter. The value returned by :func:`~eventsourcing.persistence.TrackingRecorder.max_tracking_id` is expected to reflect which tracking objects have been recorded by the command methods. The command methods are expected to raise an :class:`~eventsourcing.persistence.IntegrityError` when called more than once with the same tracking object. The counted numbers are expected not to change when an :class:`~eventsourcing.persistence.IntegrityError` is raised by a command method.

* to have a :func:`~eventsourcing.persistence.TrackingRecorder.wait` method that returns normally without raising an exception when called with a position that has been recorded. A ``TimeoutError`` is expected to be raised when :func:`~eventsourcing.persistence.TrackingRecorder.wait` is called with a position greater than those which have been recorded.

The test begins by constructing an instance of ``EventCountersView``, which is defined below.

.. literalinclude:: ../../../eventsourcing/tests/projection.py
    :pyobject: EventCountersViewTestCase

View example
============

Towards satisfying this test, let's define ``EventCountersView``.
We can extend the library's abstract base class :class:`~eventsourcing.persistence.TrackingRecorder` and define abstract command and query methods with names and signatures expected by the test above:

* the command methods ``incr_created_event_counter()`` and ``incr_subsequent_event_counter()`` have a ``tracking`` parameter, typed with the library's :class:`~eventsourcing.persistence.Tracking` class.

* the query methods ``get_created_event_counter()`` and ``get_subsequent_event_counter()`` return integers.

.. literalinclude:: ../../../eventsourcing/tests/projection.py
    :pyobject: EventCountersView

We will implement this interface twice: once to create a materialised view stored in memory, and then again to create a materialised view stored in Postgres.

View in memory
==============

The ``POPOEventCounters`` class implements ``EventCountersView`` using Python objects to hold the counted numbers of events.

It defines "private" attributes ``_created_event_counter`` and ``_subsequent_event_counter`` as Python :class:`int` objects initialised to zero. The values of these attributes are incremented by the command methods and returned by the query methods.

It inherits the :class:`~eventsourcing.popo.POPOTrackingRecorder` class, using its database lock to serialise commands, and the inherited methods :func:`_assert_tracking_uniqueness() ` and :func:`_insert_tracking() ` to avoid events being counted more than once.

.. literalinclude:: ../../../tests/projection_tests/test_projection_with_popo.py
    :pyobject: POPOEventCounters

The test case ``TestPOPOEventCounters`` will show whether the ``POPOEventCounters`` class satisfies our requirements.

.. literalinclude:: ../../../tests/projection_tests/test_projection_with_popo.py
    :pyobject: TestPOPOEventCounters

View in Postgres
================

The ``PostgresEventCounters`` class also implements ``EventCountersView``.

It defines a database table, and SQL statements for incrementing and selecting event counters.
The ``_select_counter()`` method is used to select the current value of an event counter. The ``_incr_counter()`` method is used to increment a named event counter atomically in the same database transaction as a tracking object is recorded.

It extends the :class:`~eventsourcing.postgres.PostgresTrackingRecorder` class, using the :func:`~eventsourcing.postgres.PostgresDatastore.transaction` method to implement atomic transactions, and the inherited method :func:`~eventsourcing.postgres.PostgresTrackingRecorder._insert_tracking` to avoid events being counted more than once.

.. literalinclude:: ../../../tests/projection_tests/test_projection_with_postgres.py
    :pyobject: PostgresEventCounters

The test case ``TestPostgresEventCounters`` will show whether the ``PostgresEventCounters`` class satisfies our requirements. This test uses the library's :class:`~eventsourcing.persistence.InfrastructureFactory` to construct the materialised view object.

.. literalinclude:: ../../../tests/projection_tests/test_projection_with_postgres.py
    :pyobject: TestPostgresEventCounters

Projections
===========

Let’s now consider how the events of an event-sourced application can be processed.

The library's generic abstract base class :class:`~eventsourcing.projection.Projection` can be used to define how the domain events of an event-sourced application will be processed. It is intended to be subclassed by users.

The source code of the :class:`~eventsourcing.projection.Projection` class is included here for clarity.

.. literalinclude:: ../../../eventsourcing/projection.py
    :pyobject: Projection

The :class:`~eventsourcing.projection.Projection` class is a `generic` class because it has a type parameter, which is expected to be the interface type of a materialised view, with an upper bound for the type argument being :class:`~eventsourcing.persistence.TrackingRecorder`.

The constructor argument :func:`view ` is used to initialise the :data:`~eventsourcing.projection.Projection.view` property.
The annotated type of the constructor argument and the property is bound to the type parameter. The given value of the constructor argument, and therefore the value of the property, should be a concrete instance of a materialised view.

The :class:`~eventsourcing.projection.Projection` class is an `abstract` class because it defines an abstract method :func:`~eventsourcing.projection.Projection.process_event` that must be implemented by subclasses. Events will typically be processed by calling methods on :data:`~eventsourcing.projection.Projection.view`.

To inform IDE code insight tooling, command completion, and static type checking, the type argument should be specified by users when defining a subclass of :class:`~eventsourcing.projection.Projection`.

The :class:`~eventsourcing.projection.Projection` class has a :py:attr:`~eventsourcing.projection.Projection.name` attribute, which can be set on subclasses. It is expected to be a Python :class:`str`. It is used by projection runners, when constructing a materialised view object, to select prefixed environment variables, and in some cases to specify database table names used by the materialised view. The value used by the projection runner defaults to the class name of the subclass.

The :class:`~eventsourcing.projection.Projection` class has a :py:attr:`~eventsourcing.projection.Projection.topics` attribute, which can be set on subclasses. It is expected to be a Python :class:`tuple` of :class:`str` objects that contains domain event :ref:`topics `. Unless empty or undefined, projection runners will use these topics when subscribing to an application, so that events can be filtered in the application database. The subscription will then only yield events that have topics mentioned by this attribute. In many cases this will improve performance when running a projection, by avoiding the cost of transporting and reconstructing events that will be ignored by the projection.
In some cases, filtering events by topic in this way will be necessary to avoid errors caused by attempting to reconstruct domain event objects that have been recorded in the database but which neither your code nor the library code is capable of reconstructing.

Projection example
==================

The ``AggregateEventCountersProjection`` class processes events of an event-sourced application.

It inherits the :class:`~eventsourcing.projection.Projection` class, with ``EventCountersView`` as the type argument.

It implements :func:`~eventsourcing.projection.Projection.process_event` by calling ``incr_created_event_counter()`` for :class:`Aggregate.Created ` events and ``incr_subsequent_event_counter()`` for :class:`Aggregate.Event ` events. If a ``SpannerThrown`` event is being processed, then an exception is raised, to demonstrate error handling in process runners.

.. literalinclude:: ../../../eventsourcing/tests/projection.py
    :pyobject: AggregateEventCountersProjection

By default, other kinds of events will be processed by calling :func:`~eventsourcing.persistence.TrackingRecorder.insert_tracking`, so that progress along an application sequence can be recorded. This can help bridge large gaps without risking repetitive pulling and processing of upstream events.

Projection runner
=================

Let's consider how to run the projection, so events of an event-sourced application can be counted.

As shown in the ``AggregateEventCountersProjectionTestCase`` class below, a :ref:`projection runner ` can be constructed with an application class, a projection class, a materialised view class, and an optional environment :class:`Mapping` object that specifies run-time configuration of the event-sourced application and the materialised view.
It will :ref:`subscribe to the application `, from the position indicated by the view's :func:`~eventsourcing.persistence.TrackingRecorder.max_tracking_id` method, and then call the :func:`~eventsourcing.projection.Projection.process_event` method of the projection for each domain event yielded by the application subscription.

The projection runner has a :func:`~eventsourcing.projection.BaseProjectionRunner.run_forever` method, which blocks until an optional timeout, or until an exception is raised by the projection or by the subscription, or until the projection runner is stopped by calling its :func:`~eventsourcing.projection.BaseProjectionRunner.stop` method. This allows an event-processing component to be started and run independently as a separate operating system process for a controllable period of time, and then to terminate in a controlled way when there is an error, or when it is interrupted.

Exceptions raised whilst running the projection will be re-raised by the :func:`~eventsourcing.projection.BaseProjectionRunner.run_forever` method. Operators of the system can examine any errors and resume processing by reconstructing the runner. Some types of errors may be transient operational errors, such as a loss of database connectivity, in which case the processing could be resumed automatically. Some errors may be programming errors, and will require manual intervention before the event processing can continue.

The :func:`~eventsourcing.persistence.TrackingRecorder.wait` method of view objects can be used by user interface requests, to wait until an event has been processed by the projection before calling a query method on the materialised view. Separate instances of the view object can also be used to support such requests, if the view is backed by a durable database.

.. literalinclude:: ../../../eventsourcing/tests/projection.py
    :pyobject: AggregateEventCountersProjectionTestCase

The test method ``test_event_counters_projection()`` constructs a runner, and from the runner gets references to an event-sourced application "write model" and a materialised view "read model". Events are generated in the event-sourced application "write model". These events are then processed, according to the logic of the ``AggregateEventCountersProjection``, by updating the materialised event counters view "read model". The counted numbers of each kind of event can be obtained from the "read model". The materialised view is "eventually consistent" because the event processing is asynchronous, and so the :func:`~eventsourcing.persistence.TrackingRecorder.wait` method is used to wait for the events to be processed before querying the materialised view. This makes the test deterministic.

The test method ``test_run_forever_raises_projection_error()`` shows that after a ``SpannerThrown`` event is generated by the event-sourced application, an error occurs with the event processing, and an exception representing this error is raised by the runner's :func:`~eventsourcing.projection.ProjectionRunner.run_forever` method. Because the event will not be processed successfully, the materialised view's :func:`~eventsourcing.persistence.TrackingRecorder.wait` method will time out.

Running projection in memory
============================

The test case below runs the projection class with the ``POPOEventCounters`` view.

.. literalinclude:: ../../../tests/projection_tests/test_projection_with_popo.py
    :pyobject: TestAggregateEventCountersProjectionWithPOPO

When generating and counting events in memory, we need to use the projection runner's application and view objects for generating events and getting counted numbers.
But, as we shall see, when using a durable database such as Postgres, the events can be generated using other instances of the application, and the materialised view can be queried using other instances of the view class.

Running projection in Postgres
==============================

The test case below runs the projection class with the ``PostgresEventCounters`` view.

Because this test case uses a durable database for both the event-sourced application and the materialised view, any instance of the application can be used to write events, and any instance of the materialised view can be used to obtain the counted numbers of events.

.. literalinclude:: ../../../tests/projection_tests/test_projection_with_postgres.py
    :pyobject: TestAggregateEventCountersProjectionWithPostgres

If a durable database is used in production, the event-sourced application object and the materialised view object could be horizontally scaled, with the event-processing component running as a separate operating system process. This is demonstrated by extending the test methods: to resume processing by reconstructing the projection runner, to construct separate instances of the event-sourced application and of the materialised view, to continue generating events using the separate instance of the event-sourced application, to query the counted numbers of events using the separate instance of the materialised view, to expect that the obtained counted numbers of events correctly report the total numbers of events that have been generated, and to show that the event-processing error is still raised and that the event processing has not continued after the error caused by the ``SpannerThrown`` event.

The event-sourced application and the materialised view could also use separate databases. But in this example they are configured more simply to use different tables in the same database.

Exercises
=========

1. Replicate the code in this tutorial in your development environment.

2. Develop and run a projection that counts dogs and tricks from a ``DogSchool`` application.

3. Develop and run an :ref:`event-sourced projection` that counts dogs and tricks from a ``DogSchool`` application.

Next steps
==========

* Read the :doc:`projection module documentation `.

* See also the :ref:`Example projections`.

* To continue this tutorial, please read :doc:`Part 5 `.