projection — Projections

This module supports projections of event-sourced applications into materialised views.

The central idea of this module follows the notion from CQRS of having separate command and query interfaces. This idea is often implemented in event-sourced systems by developing distinct and separate “write” and “read” models. The “write model” is an event-sourced application, and the “read model” is one or many “materialised views” of the event-sourced application. The event-sourced application is projected into a materialised view by processing the application’s events, usually with an asynchronous event-processing component so that the materialised view is eventually-consistent.

By processing each domain event in an application sequence in order, and by recording updates to the materialised view atomically with tracking objects that indicate the position in the application sequence of the event that was processed, and by constraining the tracking records to be unique, and by resuming to process the application from the position indicated by the last tracking record, the materialised view will be a “reliable” deterministic function of the state of the application.

Application subscriptions

This module provides an ApplicationSubscription class, which can be used to “subscribe” to the domain events of an application.

Application subscription objects are iterators that return domain events from an application sequence. Each domain event is accompanied by a tracking object that identifies the position of the domain event in the application sequence. Iterating over an application subscription will block when all recorded domain events have been returned, and then continue when new events are recorded. Application subscriptions are conveniently used by event-processing components that project the state of an event-sourced application into a materialised view, because they continue returning newly recorded events, because they subscribe to a database rather than an application object, and because they convert the stored events returned by an application recorder into domain event objects using the application’s mapper. Encapsulating all of these concerns provides a convenient way to follow the domain events of an event-sourced application.

The ApplicationSubscription class has three constructor arguments, app, gt, and topics.

The constructor argument app is required, and is expected to be an event-sourced application object.

The constructor argument gt is optional, and if given is expected to be either a Python int that indicates a position in the application’s sequence (a notification ID) or None. This matches the return type of the max_tracking_id() method of tracking recorders. The intention here is that the max_tracking_id() method of a downstream event-processing component’s tracking recorder (or equivalent) can be called and the value used to start a subscription to an upstream event-sourced application from the correct position.

The constructor argument topics is optional, and if given is expected to be a Python tuple of str objects that are the topics of domain events to be returned by the application subscription. The purpose of this argument is to filter events within the event-sourced application’s database, avoiding the cost of transporting and reconstructing events that will just be ignored by an event-processing component. If a non-empty sequence of topics is provided, only events that have topics mentioned in this collection will be returned by the subscription. An empty sequence of topics, which is the default value, will mean events will not be filtered by topic.

An application subscription will return all domain events in the application sequence, except those which have notification IDs less than or equal to the position given by gt, and except those which do not have topics in the sequence given by topics if any are given. The selection of events by notification ID and the filtering of events by topic will usually be done in the application’s database server.

Application subscription objects usually open a database session, and either listen to the database for notifications and then select new event records, or otherwise directly stream records from a database. For this reason, application subscription objects support the Python context manager protocol, so that database connection resources can be freed in a controlled and convenient way when the subscription is stopped or exits.

Alternatively, application subscription objects have a stop() method which can be used to stop the subscription to the application recorder in a controlled way.

from uuid import UUID

from eventsourcing.application import Application
from eventsourcing.domain import Aggregate
from eventsourcing.projection import ApplicationSubscription

# Construct an application object.
app = Application[UUID]()

# Record an event.
aggregate = Aggregate()
app.save(aggregate)

# Position in application sequence from which to subscribe.
max_tracking_id = 0

with ApplicationSubscription(app, gt=max_tracking_id, topics=()) as subscription:
    for domain_event, tracking in subscription:
        # Process the event and record new state with tracking information.
        subscription.stop()  # ...so we can continue with the examples

Please note, the POPOApplicationRecorder and PostgresApplicationRecorder classes implement the required subscribe() method, but the SQLiteApplicationRecorder class does not.

Projection

The library’s Projection class is a generic abstract base class. It can be used to define how the domain events of an application will be processed. It is a generic class because it accepts one type argument, which is expected to be a type of tracking recorder that defines the interface of a “materialised view”. It is an abstract class because it defines an abstract method, process_event(), that must be implemented by subclasses.

The Projection class has one required constructor argument, view, which is expected to be a concrete materialised view object of the type specified by the type argument. The constructor argument is used to initialise the property view. The annotated type of view is bound to the type argument of the class.

The intention of this class is that it will be subclassed, and that a subclass’s implementation of process_event() will be called for each domain event in an application sequence. Implementations of process_event() will usually handle domain events of different types by calling a command method on the projection’s view object.

Subclasses of the Projection class can optionally specify a name attribute. This attribute will be used by a projection runner to distinguish environment variables to be used only for constructing and configuring a projection’s materialised view from those to be used only for constructing and configuring a projected event-sourced application. In some cases, this name will also be used by the materialised view to name its database tables.

Subclasses of the Projection class can optionally specify a topics attribute, so that an application subscription can be more selective when it is used by a projection runner to obtain events for the projection.

The example below shows how a projection can be defined.

from abc import ABC, abstractmethod
from eventsourcing.domain import DomainEventProtocol
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.persistence import Tracking
from eventsourcing.projection import Projection
from eventsourcing.utils import get_topic

class MyProjection(Projection["MyMaterialisedViewInterface"]):
    name = "myprojection"
    topics = (get_topic(Aggregate.Event), )

    @singledispatchmethod
    def process_event(self, domain_event: DomainEventProtocol[UUID], tracking: Tracking) -> None:
        pass

    @process_event.register
    def _(self, domain_event: Aggregate.Event, tracking: Tracking) -> None:
        self.view.my_command(tracking)

The example below indicates how the projection’s materialised view can be defined.

from abc import ABC, abstractmethod
from eventsourcing.persistence import TrackingRecorder
from eventsourcing.popo import POPOTrackingRecorder
from eventsourcing.postgres import PostgresTrackingRecorder
from eventsourcing.sqlite import SQLiteTrackingRecorder

class MyMaterialisedViewInterface(TrackingRecorder, ABC):
    @abstractmethod
    def my_command(self, tracking: Tracking) -> None:
        """Updates materialised view"""

class MyPOPOMaterialisedView(MyMaterialisedViewInterface, POPOTrackingRecorder):
    def my_command(self, tracking: Tracking) -> None:
        with self._database_lock:
            # Insert tracking record...
            self._assert_tracking_uniqueness(tracking)
            self._insert_tracking(tracking)
            # ...and then update materialised view.

class MySQLiteMaterialisedView(MyMaterialisedViewInterface, SQLiteTrackingRecorder):
    def my_command(self, tracking: Tracking) -> None:
        ...

class MyPostgresMaterialisedView(MyMaterialisedViewInterface, PostgresTrackingRecorder):
    def my_command(self, tracking: Tracking) -> None:
        ...

Projection runner

This module provides a ProjectionRunner class, which can be used to run projections.

Projection runner objects can be constructed by calling the ProjectionRunner class with an event-sourced application class, a projection class, a materialised view class, and an optional mapping object that contains environment variables to be used to configure the application and the materialised view. An application object will be constructed using the application class and the environment variables. An infrastructure factory will be constructed for the tracking recorder, also using the environment variables.

A projection object will then be constructed using the materialised view. The projection runner will start a subscription to the application, from the position indicated by the tracking recorder’s max_tracking_id() method. In a separate thread, the projection runner will iterate over the application subscription, calling the projection’s process_event() method for each domain event and tracking object returned by the application subscription.

Projection runner objects support the Python context manager protocol, so that database resources used by the application subscription and the materialised view can be freed in a controlled way when the projection runner is stopped or exits.

The projection runner method run_forever() will block until either process_event() raises an error, or until the application subscription raises an error, or until the optional timeout is reached, or until the stop() method is called.

The example below shows how to run a projection. In this example, the event-sourced application class is Application. It is constructed with the default eventsourcing.popo persistence module. The projection class and the materialised view class are taken from the examples above. The projection runner is used as a context manager. The projection runner’s run_forever() method is called which keeps the projection running. The stop() method is called by a signal handler when the operating system process receives an interrupt signal. The example below starts a thread which sends the interrupt signal after 1s.

import os, signal, threading, time

from eventsourcing.projection import ProjectionRunner

# For demonstration purposes, interrupt process with SIGINT after 1s.
def sleep_then_kill() -> None:
    time.sleep(1)
    os.kill(os.getpid(), signal.SIGINT)

threading.Thread(target=sleep_then_kill).start()

# Run projection as a context manager.
with ProjectionRunner(
    application_class=Application,
    view_class=MyPOPOMaterialisedView,
    projection_class=MyProjection,
    env={},
) as projection_runner:

    # Register signal handler.
    signal.signal(signal.SIGINT, lambda *args: projection_runner.stop())

    # Run until interrupted.
    projection_runner.run_forever()

The intention of a projection runner is to operate as a separate event-processing component, with potentially many instances of an upstream event-processing application, and many instances of a downstream materialised view, operating in different operating system processes. A user interface may transition from sending a command that results in new events being written to the “write model” over to presenting the results of querying the “read model”. Since the “read model” is eventually-consistent, and so may not immediately have been updated by processing the new events, running the risk that the view presented to a user will appear to be stale by not reflecting their recent work, the notification IDs returned from calls to the event-sourced application’s save() method can be used by the user interface to wait() until the “read model” has been updated.

See Tutorial - Part 4 for more guidance and examples.

Event-sourced projection

The library’s EventSourcedProjection class is an abstract base class that extends the library’s Application class by using a process recorder, and by introducing an abstract policy method.

It can be used to define how the events of an event-sourced application will be processed into another event-sourced application.

The abstract policy method should be implemented on subclasses. Implementations of this method should manipulate event-sourced aggregates and collect events onto the given ProcessingEvent object, which will record the new domain events atomically with a tracking object that indicates the position of the event that has been processed in its application sequence.

In the example below, the Counters application defines its policy() method to increment a Counter aggregate.

class Counters(EventSourcedProjection[UUID]):
    @singledispatchmethod
    def policy(
        self,
        domain_event: DomainEventProtocol[UUID],
        processing_event: ProcessingEvent[UUID],
    ) -> None:
        topic = get_topic(type(domain_event))
        try:
            counter_id = Counter.create_id(topic)
            counter: Counter = self.repository.get(counter_id)
        except AggregateNotFoundError:
            counter = Counter(topic)
        counter.increment()
        processing_event.collect_events(counter)

    def get_count(self, domain_event_class: type[DomainEventProtocol[UUID]]) -> int:
        topic = get_topic(domain_event_class)
        counter_id = Counter.create_id(topic)
        try:
            counter: Counter = self.repository.get(counter_id)
        except AggregateNotFoundError:
            return 0
        return counter.count
class Counter(Aggregate):
    def __init__(self, name: str) -> None:
        self.name = name
        self.count = 0

    @classmethod
    def create_id(cls, name: str) -> UUID:
        return uuid5(NAMESPACE_URL, f"/counters/{name}")

    @event("Incremented")
    def increment(self) -> None:
        self.count += 1

Event-sourced projection runner

The library’s EventSourcedProjectionRunner class can be used to run an event-sourced projection of an event-soured application. It works and can be used in a similar way to the projection runner described above. The only difference is that the constructor has a projection_class argument which is expected to be a subclass of EventSourcedProjection and it does not accept a separate view class. Environment variables given with the env argument will be used to configure the event-sourced applications when they are constructed from the given classes.

The example below run the Counters projection with an instance of the library’s Application class. Four Aggregate objects are generated and a subsequent Aggregate.Event is triggered on the fourth aggregate. The counted numbers of Aggregate.Created and Aggregate.Event are checked after waiting for the events to be processed. The env argument is used unnecessarily here, since the values given are the defaults, but it is included to show how the applications could easily be configured to use durable databases. Please note, the SQLite persistence module does not currently support application subscriptions.

from eventsourcing.projection import EventSourcedProjectionRunner

with EventSourcedProjectionRunner(
    application_class=Application,
    projection_class=Counters,
    env={
        "APPLICATION_PERSISTENCE_MODULE": "eventsourcing.popo",
        "COUNTERS_PERSISTENCE_MODULE": "eventsourcing.popo",
    },
) as runner:
    recordings = runner.app.save(Aggregate())
    runner.wait(recordings[-1].notification.id)
    assert runner.projection.get_count(Aggregate.Created) == 1
    assert runner.projection.get_count(Aggregate.Event) == 0

    recordings = runner.app.save(Aggregate())
    runner.wait(recordings[-1].notification.id)
    assert runner.projection.get_count(Aggregate.Created) == 2
    assert runner.projection.get_count(Aggregate.Event) == 0

    recordings = runner.app.save(Aggregate())
    runner.wait(recordings[-1].notification.id)
    assert runner.projection.get_count(Aggregate.Created) == 3
    assert runner.projection.get_count(Aggregate.Event) == 0

    aggregate = Aggregate()
    aggregate.trigger_event(Aggregate.Event)
    recordings = runner.app.save(aggregate)
    runner.wait(recordings[-1].notification.id)
    assert runner.projection.get_count(Aggregate.Created) == 4
    assert runner.projection.get_count(Aggregate.Event) == 1

Code reference

class eventsourcing.projection.ApplicationSubscription(app: Application[TAggregateID], gt: int | None = None, topics: Sequence[str] = ())[source]

Bases: Iterator[tuple[DomainEventProtocol[TAggregateID], Tracking]]

An iterator that yields all domain events recorded in an application sequence that have notification IDs greater than a given value. The iterator will block when all recorded domain events have been yielded, and then continue when new events are recorded. Domain events are returned along with tracking objects that identify the position in the application sequence.

__init__(app: Application[TAggregateID], gt: int | None = None, topics: Sequence[str] = ())[source]

Starts a subscription to application’s recorder.

stop() None[source]

Stops the subscription to the application’s recorder.

__enter__() Self[source]

Calls __enter__ on the stored event subscription.

__exit__(*args: object, **kwargs: Any) None[source]

Calls __exit__ on the stored event subscription.

__iter__() Self[source]
__next__() tuple[DomainEventProtocol[TAggregateID], Tracking][source]

Returns the next stored event from subscription to the application’s recorder. Constructs a tracking object that identifies the position of the event in the application sequence. Constructs a domain event object from the stored event object using the application’s mapper. Returns a tuple of the domain event object and the tracking object.

class eventsourcing.projection.Projection(view: TTrackingRecorder)[source]

Bases: ABC, Generic[TTrackingRecorder]

name: str = ''

Name of projection, used to pick prefixed environment variables and define database table names.

topics: tuple[str, ...] = ()

Event topics, used to filter events in database when subscribing to an application.

__init__(view: TTrackingRecorder)[source]

Initialises the view property with the given view argument.

property view: TTrackingRecorder

Materialised view of an event-sourced application.

abstractmethod process_event(domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking) None[source]

Process a domain event and track it.

class eventsourcing.projection.EventSourcedProjection(env: Mapping[str, str] | None = None)[source]

Bases: Application[TAggregateID], ABC

Extends the Application class by using a process recorder as its application recorder, and by processing domain events through its policy() method.

topics: Sequence[str] = ()
__init__(env: Mapping[str, str] | None = None) None[source]

Initialises an application with an InfrastructureFactory, a Mapper, an ApplicationRecorder, an EventStore, a Repository, and a LocalNotificationLog.

construct_recorder() ProcessRecorder[source]

Constructs and returns a ProcessRecorder for the application to use as its application recorder.

process_event(domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking) None[source]

Calls policy() method with the given domain event and a new ProcessingEvent constructed with the given tracking object.

The policy method should collect any new aggregate events on the process event object.

After the policy method returns, the processing event object will be recorded by calling _record(), which then returns list of Recording.

After calling _take_snapshots(), the recordings are passed in a call to _notify().

policy(domain_event: DomainEventProtocol[TAggregateID], processing_event: ProcessingEvent[TAggregateID]) None[source]

Abstract domain event processing policy method. Must be implemented by event processing applications. When processing the given domain event, event processing applications must use the collect_events() method of the given ProcessingEvent object (not the application’s save() method) so that the new domain events will be recorded atomically and uniquely with tracking information about the position of the processed event in its application sequence.

name = 'EventSourcedProjection'
class eventsourcing.projection.BaseProjectionRunner(*, projection: EventSourcedProjection[Any] | Projection[Any], application_class: type[TApplication], tracking_recorder: TrackingRecorder, topics: Sequence[str], env: Mapping[str, str] | None = None)[source]

Bases: Generic[TApplication]

__init__(*, projection: EventSourcedProjection[Any] | Projection[Any], application_class: type[TApplication], tracking_recorder: TrackingRecorder, topics: Sequence[str], env: Mapping[str, str] | None = None) None[source]
property is_interrupted: Event
stop() None[source]

Sets the “interrupted” event.

run_forever(timeout: float | None = None) None[source]

Blocks until timeout, or until the runner is stopped or errors. Re-raises any error otherwise exits normally

wait(notification_id: int | None, timeout: float = 1.0) None[source]

Blocks until timeout, or until the materialised view has recorded a tracking object that is greater than or equal to the given notification ID.

__enter__() Self[source]
__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) None[source]

Calls stop() and waits for the event-processing thread to exit.

class eventsourcing.projection.ProjectionRunner(*, application_class: type[TApplication], projection_class: type[Projection[TTrackingRecorder]], view_class: type[TTrackingRecorder], env: Mapping[str, str] | None = None)[source]

Bases: BaseProjectionRunner[TApplication], Generic[TApplication, TTrackingRecorder]

__init__(*, application_class: type[TApplication], projection_class: type[Projection[TTrackingRecorder]], view_class: type[TTrackingRecorder], env: Mapping[str, str] | None = None)[source]

Constructs application from given application class with given environment. Also constructs a materialised view from given class using an infrastructure factory constructed with an environment named after the projection. Also constructs a projection with the constructed materialised view object. Starts a subscription to application and, in a separate event-processing thread, calls projection’s process_event() method for each event and tracking object pair received from the subscription.

class eventsourcing.projection.EventSourcedProjectionRunner(*, application_class: type[TApplication], projection_class: type[TEventSourcedProjection], env: Mapping[str, str] | None = None)[source]

Bases: BaseProjectionRunner[TApplication], Generic[TApplication, TEventSourcedProjection]

__init__(*, application_class: type[TApplication], projection_class: type[TEventSourcedProjection], env: Mapping[str, str] | None = None)[source]