projection - Projections
This module supports projections of event-sourced applications into materialised views.
The central idea of this module follows the notion from CQRS of having separate command and query interfaces. This idea is often implemented in event-sourced systems by developing distinct and separate “write” and “read” models. The “write model” is an event-sourced application, and the “read model” is one or many “materialised views” of the event-sourced application. The event-sourced application is projected into a materialised view by processing the application’s events, usually with an asynchronous event-processing component so that the materialised view is eventually-consistent.
The materialised view will be a “reliable” deterministic function of the state of the application if four conditions hold: each domain event in an application sequence is processed in order; updates to the materialised view are recorded atomically with tracking objects that indicate the position in the application sequence of the event that was processed; the tracking records are constrained to be unique; and processing is resumed from the position indicated by the last tracking record.
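The conditions above can be illustrated without the library. The following is a minimal sketch using only the standard library's sqlite3 module; the table names, the process() function and the max_tracking_id() helper are hypothetical and are not part of the eventsourcing API. It shows a tracking record and a view update being committed in one transaction, with a uniqueness constraint rejecting an event that has already been processed.

```python
import sqlite3
from typing import Optional

# Hypothetical schema: one table for tracking records, one for the view.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tracking ("
    "application_name TEXT, notification_id INTEGER, "
    "PRIMARY KEY (application_name, notification_id))"
)
conn.execute("CREATE TABLE event_counts (topic TEXT PRIMARY KEY, count INTEGER)")


def process(application_name: str, notification_id: int, topic: str) -> bool:
    """Record the tracking position and the view update in one transaction."""
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            # The primary key makes tracking records unique.
            conn.execute(
                "INSERT INTO tracking VALUES (?, ?)",
                (application_name, notification_id),
            )
            # Update the view in the same transaction.
            cursor = conn.execute(
                "UPDATE event_counts SET count = count + 1 WHERE topic = ?",
                (topic,),
            )
            if cursor.rowcount == 0:
                conn.execute("INSERT INTO event_counts VALUES (?, 1)", (topic,))
    except sqlite3.IntegrityError:
        return False  # already processed, so the view is left unchanged
    return True


def max_tracking_id(application_name: str) -> Optional[int]:
    """Return the position from which processing should resume."""
    row = conn.execute(
        "SELECT MAX(notification_id) FROM tracking WHERE application_name = ?",
        (application_name,),
    ).fetchone()
    return row[0]
```

Calling process() twice with the same position leaves the count unchanged the second time, which is what makes reprocessing after a restart safe in this sketch.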
Application subscriptions
This module provides an ApplicationSubscription class, which can be used to “subscribe” to the domain events of an application.
Application subscription objects are iterators that return domain events from an application sequence. Each domain event is accompanied by a tracking object that identifies the position of the domain event in the application sequence. Iterating over an application subscription will block when all recorded domain events have been returned, and then continue when new events are recorded. Application subscriptions are conveniently used by event-processing components that project the state of an event-sourced application into a materialised view, because they continue returning newly recorded events, because they subscribe to a database rather than an application object, and because they convert the stored events returned by an application recorder into domain event objects using the application’s mapper. Encapsulating all of these concerns provides a convenient way to follow the domain events of an event-sourced application.
The ApplicationSubscription class has three constructor arguments, app, gt, and topics.
The constructor argument app is required, and is expected to be an event-sourced application object.
The constructor argument gt is optional, and if given is expected to be either a Python int that indicates a position in the application’s sequence (a notification ID) or None. This matches the return type of the max_tracking_id() method of tracking recorders. The intention here is that the max_tracking_id() method of a downstream event-processing component’s tracking recorder (or equivalent) can be called and the value used to start a subscription to an upstream event-sourced application from the correct position.
The constructor argument topics is optional, and if given is expected to be a Python tuple of str objects that are the topics of domain events to be returned by the application subscription. The purpose of this argument is to filter events within the event-sourced application’s database, avoiding the cost of transporting and reconstructing events that will just be ignored by an event-processing component. If a non-empty sequence of topics is provided, only events that have topics mentioned in this collection will be returned by the subscription. An empty sequence of topics, which is the default value, will mean events will not be filtered by topic.
An application subscription will return all domain events in the application sequence, except those which have notification IDs less than or equal to the position given by gt, and except those which do not have topics in the sequence given by topics if any are given. The selection of events by notification ID and the filtering of events by topic will usually be done in the application’s database server.
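The selection rule described above can be written as a small predicate. The sketch below is plain Python with a hypothetical Notification tuple and select() function; it only illustrates the semantics of gt and topics, and does not show how the library or a database server performs the selection.

```python
from typing import List, NamedTuple, Optional, Sequence


class Notification(NamedTuple):
    """Hypothetical stand-in for a recorded event notification."""

    id: int
    topic: str


def select(
    notifications: Sequence[Notification],
    gt: Optional[int] = None,
    topics: Sequence[str] = (),
) -> List[Notification]:
    """Keep notifications after position gt whose topic is allowed."""
    return [
        n
        for n in notifications
        # gt=None means "from the start"; otherwise IDs must be greater than gt.
        if (gt is None or n.id > gt)
        # An empty topics sequence means "do not filter by topic".
        and (not topics or n.topic in topics)
    ]


recorded = [
    Notification(1, "Created"),
    Notification(2, "Renamed"),
    Notification(3, "Created"),
]
```

For example, select(recorded, gt=1, topics=("Created",)) keeps only the notification with ID 3, while select(recorded) keeps all three.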
Application subscription objects usually open a database session, and either listen to the database for notifications and then select new event records, or otherwise directly stream records from a database. For this reason, application subscription objects support the Python context manager protocol, so that database connection resources can be freed in a controlled and convenient way when the subscription is stopped or exits.
Alternatively, application subscription objects have a stop() method which can be used to stop the subscription to the application recorder in a controlled way.
from uuid import UUID

from eventsourcing.application import Application
from eventsourcing.domain import Aggregate
from eventsourcing.projection import ApplicationSubscription

# Construct an application object.
app = Application[UUID]()

# Record an event.
aggregate = Aggregate()
app.save(aggregate)

# Position in application sequence from which to subscribe.
max_tracking_id = 0

with ApplicationSubscription(app, gt=max_tracking_id, topics=()) as subscription:
    for domain_event, tracking in subscription:
        # Process the event and record new state with tracking information.
        subscription.stop()  # ...so we can continue with the examples
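The blocking, stopping and context manager behaviour described above can also be imitated in memory. The ToySubscription class below is a hypothetical sketch built on the standard library's queue module. It is not how the library implements subscriptions, and it only shows how an iterator can block for new items and be unblocked by stop().

```python
import queue
import threading
from typing import Iterator, List, Tuple

_STOP = object()  # sentinel used to unblock a waiting iterator


class ToySubscription(Iterator[Tuple[int, str]]):
    """Hypothetical in-memory subscription, for illustration only."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[object]" = queue.Queue()
        self._stopped = threading.Event()

    def publish(self, notification_id: int, payload: str) -> None:
        self._queue.put((notification_id, payload))

    def stop(self) -> None:
        self._stopped.set()
        self._queue.put(_STOP)  # wake up any blocked call to __next__()

    def __enter__(self) -> "ToySubscription":
        return self

    def __exit__(self, *args: object) -> None:
        self.stop()

    def __next__(self) -> Tuple[int, str]:
        if self._stopped.is_set():
            raise StopIteration
        item = self._queue.get()  # blocks until an item or the sentinel arrives
        if item is _STOP:
            raise StopIteration
        return item  # type: ignore[return-value]


received: List[int] = []
with ToySubscription() as subscription:
    subscription.publish(1, "first")
    subscription.publish(2, "second")
    for notification_id, payload in subscription:
        received.append(notification_id)
        if notification_id == 2:
            subscription.stop()  # ends the loop on the next iteration
```

In this sketch, leaving the with block also calls stop(), so a consumer that exits early still releases the subscription.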
Please note, the POPOApplicationRecorder and PostgresApplicationRecorder classes implement the required subscribe() method, but the SQLiteApplicationRecorder class does not.
Projection
The library’s Projection class is a generic abstract base class. It can be used to define how the domain events of an application will be processed. It is a generic class because it accepts one type argument, which is expected to be a type of tracking recorder that defines the interface of a “materialised view”. It is an abstract class because it defines an abstract method, process_event(), that must be implemented by subclasses.
The Projection class has one required constructor argument, view, which is expected to be a concrete materialised view object of the type specified by the type argument. The constructor argument is used to initialise the property view. The annotated type of view is bound to the type argument of the class.
The intention of this class is that it will be subclassed, and that a subclass’s implementation of process_event() will be called for each domain event in an application sequence. Implementations of process_event() will usually handle domain events of different types by calling a command method on the projection’s view object.
Subclasses of the Projection class can optionally specify a name attribute. This attribute will be used by a projection runner to distinguish environment variables to be used only for constructing and configuring a projection’s materialised view from those to be used only for constructing and configuring a projected event-sourced application. In some cases, this name will also be used by the materialised view to name its database tables.
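One way to picture how a name can separate the two sets of environment variables is a prefixed lookup. The lookup() function below is a hypothetical sketch, not the library's implementation. It assumes that a variable prefixed with the upper-cased name, such as COUNTERS_PERSISTENCE_MODULE, takes precedence over the unprefixed variable.

```python
from typing import Mapping, Optional


def lookup(env: Mapping[str, str], name: str, key: str) -> Optional[str]:
    """Prefer NAME_KEY over KEY (an assumed convention for this sketch)."""
    prefixed = f"{name.upper()}_{key}"
    if prefixed in env:
        return env[prefixed]
    return env.get(key)


env = {
    "PERSISTENCE_MODULE": "eventsourcing.popo",
    "MYPROJECTION_PERSISTENCE_MODULE": "eventsourcing.postgres",
}
```

With these values, lookup(env, "myprojection", "PERSISTENCE_MODULE") returns the prefixed setting, while a component with a different name falls back to the unprefixed one.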
Subclasses of the Projection class can optionally specify a topics attribute, so that an application subscription can be more selective when it is used by a projection runner to obtain events for the projection.
The example below shows how a projection can be defined.
from abc import ABC, abstractmethod

from eventsourcing.domain import DomainEventProtocol
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.persistence import Tracking
from eventsourcing.projection import Projection
from eventsourcing.utils import get_topic


class MyProjection(Projection["MyMaterialisedViewInterface"]):
    name = "myprojection"
    topics = (get_topic(Aggregate.Event), )

    @singledispatchmethod
    def process_event(self, domain_event: DomainEventProtocol[UUID], tracking: Tracking) -> None:
        pass

    @process_event.register
    def _(self, domain_event: Aggregate.Event, tracking: Tracking) -> None:
        self.view.my_command(tracking)
The example below indicates how the projection’s materialised view can be defined.
from abc import ABC, abstractmethod

from eventsourcing.persistence import TrackingRecorder
from eventsourcing.popo import POPOTrackingRecorder
from eventsourcing.postgres import PostgresTrackingRecorder
from eventsourcing.sqlite import SQLiteTrackingRecorder


class MyMaterialisedViewInterface(TrackingRecorder, ABC):
    @abstractmethod
    def my_command(self, tracking: Tracking) -> None:
        """Updates materialised view"""


class MyPOPOMaterialisedView(MyMaterialisedViewInterface, POPOTrackingRecorder):
    def my_command(self, tracking: Tracking) -> None:
        with self._database_lock:
            # Insert tracking record...
            self._assert_tracking_uniqueness(tracking)
            self._insert_tracking(tracking)

            # ...and then update materialised view.


class MySQLiteMaterialisedView(MyMaterialisedViewInterface, SQLiteTrackingRecorder):
    def my_command(self, tracking: Tracking) -> None:
        ...


class MyPostgresMaterialisedView(MyMaterialisedViewInterface, PostgresTrackingRecorder):
    def my_command(self, tracking: Tracking) -> None:
        ...
Projection runner
This module provides a ProjectionRunner class, which can be used to run projections.
Projection runner objects can be constructed by calling the ProjectionRunner class with an event-sourced application class, a projection class, a materialised view class, and an optional mapping object that contains environment variables to be used to configure the application and the materialised view. An application object will be constructed using the application class and the environment variables. An infrastructure factory will be constructed for the tracking recorder, also using the environment variables. A projection object will then be constructed using the materialised view. The projection runner will start a subscription to the application, from the position indicated by the tracking recorder’s max_tracking_id() method. In a separate thread, the projection runner will iterate over the application subscription, calling the projection’s process_event() method for each domain event and tracking object returned by the application subscription.
Projection runner objects support the Python context manager protocol, so that database resources used by the application subscription and the materialised view can be freed in a controlled way when the projection runner is stopped or exits.
The projection runner method run_forever() will block until either process_event() raises an error, or until the application subscription raises an error, or until the optional timeout is reached, or until the stop() method is called.
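The blocking behaviour of run_forever() can be sketched with a threading.Event. The ToyRunner class below is hypothetical and much simpler than the library's runner. It assumes a processing thread that records any error and sets an "interrupted" flag, and a run_forever() method that waits on that flag and re-raises the recorded error.

```python
import threading
from typing import Callable, Iterable, Optional


class ToyRunner:
    """Hypothetical runner, illustrating the blocking behaviour only."""

    def __init__(
        self, events: Iterable[object], process: Callable[[object], None]
    ) -> None:
        self._is_interrupted = threading.Event()
        self._error: Optional[BaseException] = None
        self._thread = threading.Thread(
            target=self._run, args=(events, process), daemon=True
        )
        self._thread.start()

    def _run(
        self, events: Iterable[object], process: Callable[[object], None]
    ) -> None:
        try:
            for event in events:
                if self._is_interrupted.is_set():
                    return
                process(event)
        except BaseException as e:
            # Record the error first, then wake up run_forever().
            self._error = e
            self._is_interrupted.set()

    def stop(self) -> None:
        self._is_interrupted.set()

    def run_forever(self, timeout: Optional[float] = None) -> None:
        # Blocks until stopped, until an error occurs, or until the timeout.
        self._is_interrupted.wait(timeout=timeout)
        if self._error is not None:
            raise self._error
```

In this sketch an error raised while processing an event surfaces in the thread that called run_forever(), which is usually where it can be logged or handled.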
The example below shows how to run a projection. In this example, the event-sourced application class is Application. It is constructed with the default eventsourcing.popo persistence module. The projection class and the materialised view class are taken from the examples above. The projection runner is used as a context manager. The projection runner’s run_forever() method is called which keeps the projection running. The stop() method is called by a signal handler when the operating system process receives an interrupt signal. The example below starts a thread which sends the interrupt signal after 1s.
import os, signal, threading, time

from eventsourcing.projection import ProjectionRunner


# For demonstration purposes, interrupt process with SIGINT after 1s.
def sleep_then_kill() -> None:
    time.sleep(1)
    os.kill(os.getpid(), signal.SIGINT)

threading.Thread(target=sleep_then_kill).start()

# Run projection as a context manager.
with ProjectionRunner(
    application_class=Application,
    view_class=MyPOPOMaterialisedView,
    projection_class=MyProjection,
    env={},
) as projection_runner:

    # Register signal handler.
    signal.signal(signal.SIGINT, lambda *args: projection_runner.stop())

    # Run until interrupted.
    projection_runner.run_forever()
The intention of a projection runner is to operate as a separate event-processing component, with potentially many instances of an upstream event-processing application, and many instances of a downstream materialised view, operating in different operating system processes. A user interface may transition from sending a command that results in new events being written to the “write model” over to presenting the results of querying the “read model”. Because the “read model” is eventually-consistent, it may not yet have been updated by processing the new events, so the view presented to a user may appear stale by not reflecting their recent work. To avoid this, the notification IDs returned from calls to the event-sourced application’s save() method can be used by the user interface to wait() until the “read model” has been updated.
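The idea of waiting for the read model to catch up can be sketched as a polling loop. The ToyView class and wait_for() function below are hypothetical; polling is an assumption made for this sketch and the library's own wait() may work differently.

```python
import time
from typing import Optional


class ToyView:
    """Hypothetical read model that remembers the last processed position."""

    def __init__(self) -> None:
        self._max: Optional[int] = None

    def record(self, notification_id: int) -> None:
        self._max = notification_id

    def max_tracking_id(self) -> Optional[int]:
        return self._max


def wait_for(
    view: ToyView,
    notification_id: int,
    timeout: float = 1.0,
    interval: float = 0.01,
) -> None:
    """Block until the view has processed the given position, or time out."""
    deadline = time.monotonic() + timeout
    while True:
        position = view.max_tracking_id()
        if position is not None and position >= notification_id:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"View has not reached notification ID {notification_id}"
            )
        time.sleep(interval)
```

A user interface following this pattern would call wait_for() with the notification ID returned by its command before querying the view, and treat a timeout as a signal to show a "still updating" state.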
See Tutorial - Part 4 for more guidance and examples.
Event-sourced projection
The library’s EventSourcedProjection class is an abstract base class that extends the library’s Application class by using a process recorder, and by introducing an abstract policy method.
It can be used to define how the events of an event-sourced application will be processed into another event-sourced application.
The abstract policy method should be implemented on subclasses. Implementations of this method should manipulate event-sourced aggregates and collect events onto the given ProcessingEvent object, which will record the new domain events atomically with a tracking object that indicates the position of the event that has been processed in its application sequence.
In the example below, the Counters application defines its policy() method to increment a Counter aggregate.
# Imports not shown in the earlier examples.
from uuid import NAMESPACE_URL, uuid5

from eventsourcing.application import AggregateNotFoundError, ProcessingEvent
from eventsourcing.domain import event
from eventsourcing.projection import EventSourcedProjection


class Counters(EventSourcedProjection[UUID]):
    @singledispatchmethod
    def policy(
        self,
        domain_event: DomainEventProtocol[UUID],
        processing_event: ProcessingEvent[UUID],
    ) -> None:
        # Count each processed event under the topic of its class.
        topic = get_topic(type(domain_event))
        try:
            counter_id = Counter.create_id(topic)
            counter: Counter = self.repository.get(counter_id)
        except AggregateNotFoundError:
            counter = Counter(topic)
        counter.increment()
        processing_event.collect_events(counter)

    def get_count(self, domain_event_class: type[DomainEventProtocol[UUID]]) -> int:
        topic = get_topic(domain_event_class)
        counter_id = Counter.create_id(topic)
        try:
            counter: Counter = self.repository.get(counter_id)
        except AggregateNotFoundError:
            return 0
        return counter.count


class Counter(Aggregate):
    def __init__(self, name: str) -> None:
        self.name = name
        self.count = 0

    @classmethod
    def create_id(cls, name: str) -> UUID:
        return uuid5(NAMESPACE_URL, f"/counters/{name}")

    @event("Incremented")
    def increment(self) -> None:
        self.count += 1
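The Counter aggregate above relies on version-5 UUIDs being deterministic, so that the same topic always maps to the same aggregate ID. The standalone sketch below repeats only that part, using the standard library's uuid module and a module-level function in place of the class method.

```python
from uuid import NAMESPACE_URL, UUID, uuid5


def create_id(name: str) -> UUID:
    """Derive a stable ID from a name, as in Counter.create_id() above."""
    return uuid5(NAMESPACE_URL, f"/counters/{name}")


first = create_id("example:Topic")
second = create_id("example:Topic")
other = create_id("example:OtherTopic")
```

Here first and second are equal while other differs, which is why the policy can look up an existing counter by topic without storing a separate index.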
Event-sourced projection runner
The library’s EventSourcedProjectionRunner class can be used to run an event-sourced projection of an event-sourced application. It works and can be used in a similar way to the projection runner described above. The only difference is that the constructor has a projection_class argument which is expected to be a subclass of EventSourcedProjection and it does not accept a separate view class. Environment variables given with the env argument will be used to configure the event-sourced applications when they are constructed from the given classes.
The example below runs the Counters projection with an instance of the library’s Application class. Four Aggregate objects are generated and a subsequent Aggregate.Event is triggered on the fourth aggregate. The counted numbers of Aggregate.Created and Aggregate.Event are checked after waiting for the events to be processed.
The env argument is used unnecessarily here, since the values given are the defaults, but it is included to show how the applications could easily be configured to use durable databases. Please note, the SQLite persistence module does not currently support application subscriptions.
from eventsourcing.projection import EventSourcedProjectionRunner

with EventSourcedProjectionRunner(
    application_class=Application,
    projection_class=Counters,
    env={
        "APPLICATION_PERSISTENCE_MODULE": "eventsourcing.popo",
        "COUNTERS_PERSISTENCE_MODULE": "eventsourcing.popo",
    },
) as runner:

    recordings = runner.app.save(Aggregate())
    runner.wait(recordings[-1].notification.id)
    assert runner.projection.get_count(Aggregate.Created) == 1
    assert runner.projection.get_count(Aggregate.Event) == 0

    recordings = runner.app.save(Aggregate())
    runner.wait(recordings[-1].notification.id)
    assert runner.projection.get_count(Aggregate.Created) == 2
    assert runner.projection.get_count(Aggregate.Event) == 0

    recordings = runner.app.save(Aggregate())
    runner.wait(recordings[-1].notification.id)
    assert runner.projection.get_count(Aggregate.Created) == 3
    assert runner.projection.get_count(Aggregate.Event) == 0

    aggregate = Aggregate()
    aggregate.trigger_event(Aggregate.Event)
    recordings = runner.app.save(aggregate)
    runner.wait(recordings[-1].notification.id)
    assert runner.projection.get_count(Aggregate.Created) == 4
    assert runner.projection.get_count(Aggregate.Event) == 1
Code reference
- class eventsourcing.projection.ApplicationSubscription(app: Application[TAggregateID], gt: int | None = None, topics: Sequence[str] = ())
  Bases: Iterator[tuple[DomainEventProtocol[TAggregateID], Tracking]]
  An iterator that yields all domain events recorded in an application sequence that have notification IDs greater than a given value. The iterator will block when all recorded domain events have been yielded, and then continue when new events are recorded. Domain events are returned along with tracking objects that identify the position in the application sequence.
  - __init__(app: Application[TAggregateID], gt: int | None = None, topics: Sequence[str] = ())
    Starts a subscription to application’s recorder.
  - __exit__(*args: object, **kwargs: Any) -> None
    Calls __exit__ on the stored event subscription.
  - __next__() -> tuple[DomainEventProtocol[TAggregateID], Tracking]
    Returns the next stored event from subscription to the application’s recorder. Constructs a tracking object that identifies the position of the event in the application sequence. Constructs a domain event object from the stored event object using the application’s mapper. Returns a tuple of the domain event object and the tracking object.
- class eventsourcing.projection.Projection(view: TTrackingRecorder)
  Bases: ABC, Generic[TTrackingRecorder]
  - name: str = ''
    Name of projection, used to pick prefixed environment variables and define database table names.
  - topics: tuple[str, ...] = ()
    Event topics, used to filter events in database when subscribing to an application.
  - __init__(view: TTrackingRecorder)
    Initialises the view property with the given view argument.
  - property view: TTrackingRecorder
    Materialised view of an event-sourced application.
  - abstractmethod process_event(domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking) -> None
    Process a domain event and track it.
- class eventsourcing.projection.EventSourcedProjection(env: Mapping[str, str] | None = None)
  Bases: Application[TAggregateID], ABC
  Extends the Application class by using a process recorder as its application recorder, and by processing domain events through its policy() method.
  - topics: Sequence[str] = ()
  - __init__(env: Mapping[str, str] | None = None) -> None
    Initialises an application with an InfrastructureFactory, a Mapper, an ApplicationRecorder, an EventStore, a Repository, and a LocalNotificationLog.
  - construct_recorder() -> ProcessRecorder
    Constructs and returns a ProcessRecorder for the application to use as its application recorder.
  - process_event(domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking) -> None
    Calls policy() method with the given domain event and a new ProcessingEvent constructed with the given tracking object.
    The policy method should collect any new aggregate events on the process event object.
    After the policy method returns, the processing event object will be recorded by calling _record(), which then returns list of Recording.
    After calling _take_snapshots(), the recordings are passed in a call to _notify().
  - policy(domain_event: DomainEventProtocol[TAggregateID], processing_event: ProcessingEvent[TAggregateID]) -> None
    Abstract domain event processing policy method. Must be implemented by event processing applications. When processing the given domain event, event processing applications must use the collect_events() method of the given ProcessingEvent object (not the application’s save() method) so that the new domain events will be recorded atomically and uniquely with tracking information about the position of the processed event in its application sequence.
  - name = 'EventSourcedProjection'
- class eventsourcing.projection.BaseProjectionRunner(*, projection: EventSourcedProjection[Any] | Projection[Any], application_class: type[TApplication], tracking_recorder: TrackingRecorder, topics: Sequence[str], env: Mapping[str, str] | None = None)
  Bases: Generic[TApplication]
  - __init__(*, projection: EventSourcedProjection[Any] | Projection[Any], application_class: type[TApplication], tracking_recorder: TrackingRecorder, topics: Sequence[str], env: Mapping[str, str] | None = None) -> None
  - property is_interrupted: Event
  - run_forever(timeout: float | None = None) -> None
    Blocks until timeout, or until the runner is stopped or errors. Re-raises any error, otherwise exits normally.
- class eventsourcing.projection.ProjectionRunner(*, application_class: type[TApplication], projection_class: type[Projection[TTrackingRecorder]], view_class: type[TTrackingRecorder], env: Mapping[str, str] | None = None)
  Bases: BaseProjectionRunner[TApplication], Generic[TApplication, TTrackingRecorder]
  - __init__(*, application_class: type[TApplication], projection_class: type[Projection[TTrackingRecorder]], view_class: type[TTrackingRecorder], env: Mapping[str, str] | None = None)
    Constructs application from given application class with given environment. Also constructs a materialised view from given class using an infrastructure factory constructed with an environment named after the projection. Also constructs a projection with the constructed materialised view object. Starts a subscription to application and, in a separate event-processing thread, calls projection’s process_event() method for each event and tracking object pair received from the subscription.
- class eventsourcing.projection.EventSourcedProjectionRunner(*, application_class: type[TApplication], projection_class: type[TEventSourcedProjection], env: Mapping[str, str] | None = None)
  Bases: BaseProjectionRunner[TApplication], Generic[TApplication, TEventSourcedProjection]