Tutorial - Part 4 - Projections
This part of the tutorial shows how the state of an event-sourced application can be projected into a materialised view that supports arbitrary queries.
As we saw in Part 3, we can use the library’s
Application class to define an event-sourced
application. The state of an event-sourced application is recorded as a set of events,
each of which is positioned in one of many “aggregate sequences” and also in the overall
“application sequence”.
We can reconstruct an aggregate by selecting events from an aggregate sequence, and we can propagate the state of an application by following its application sequence. But to query the current state of an event-sourced application in other ways, we must first project the application state into a “read model” that is designed to support such queries.
So that such queries can be performed quickly, it is useful to project the state of an event-sourced application into a read model that is stored in a database. Such “materialised views” are usually prepared and updated by processing the events of the event-sourced application in a separate event-processing component, which subscribes to the application sequence so that it can catch up and then continue as further events are recorded.
In the sections below we will explore how the events of event-sourced applications can be processed into materialised views that support queries that are not directly supported by event-sourced applications.
View classes
Materialised views support queries of the state of an event-sourced application. Therefore, every materialised view will need to have at least one query method. Materialised views will also need to be updated when events are processed, and so a materialised view will also need to have at least one command method.
A materialised view is typically expected to be a reliable deterministic projection of the state of an event-sourced application. To achieve this, we need to record the position of each processed event uniquely, and record it atomically along with the updating of the materialised view. This ensures the results of processing an event are never successfully recorded more than once. And so that event processing can be resumed correctly, we also need to get the position of the last successfully processed event.
Because of the arbitrary nature of materialised views, the recording and reporting of the positions of processed events are the only common aspects shared by all materialised views. In this sense, all materialised views are special kinds of tracking recorders.
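To make the idea of recording positions uniquely and atomically more concrete, here is a minimal sketch that uses Python's built-in sqlite3 module rather than the library's tracking recorders. The table names, the count_event() function and the single "events" counter are hypothetical, invented only for illustration. The uniqueness constraint on the tracking table causes the whole transaction, including the counter update, to be rolled back when the same position is recorded twice.

```python
import sqlite3
from typing import Optional

# Hypothetical schema: one table for tracking records, one for a counter.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tracking ("
    "application_name TEXT, notification_id INTEGER, "
    "PRIMARY KEY (application_name, notification_id))"
)
conn.execute("CREATE TABLE counters (name TEXT PRIMARY KEY, value INTEGER)")
conn.execute("INSERT INTO counters VALUES ('events', 0)")
conn.commit()


def count_event(application_name: str, notification_id: int) -> None:
    """Update the view and record the position in one transaction."""
    with conn:  # Commits on success, rolls back if an exception is raised.
        conn.execute("UPDATE counters SET value = value + 1 WHERE name = 'events'")
        conn.execute(
            "INSERT INTO tracking VALUES (?, ?)",
            (application_name, notification_id),
        )


def get_counter() -> int:
    row = conn.execute("SELECT value FROM counters WHERE name = 'events'").fetchone()
    return row[0]


def max_tracking_id(application_name: str) -> Optional[int]:
    """Return the last recorded position, or None if nothing was recorded."""
    row = conn.execute(
        "SELECT MAX(notification_id) FROM tracking WHERE application_name = ?",
        (application_name,),
    ).fetchone()
    return row[0]


count_event("upstream", 1)
try:
    count_event("upstream", 1)  # Same position again.
except sqlite3.IntegrityError:
    pass  # The increment was rolled back together with the failed insert.
```

After running this, the counter has been incremented only once, and max_tracking_id("upstream") reports the position from which processing could be resumed.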
And so we can define an interface for a materialised view using the library’s abstract
TrackingRecorder class. The interface will define how the materialised view
will be queried and how events will be processed, independently of any concrete implementation.
We can then develop concrete implementations of this interface by also extending the library’s concrete tracking
recorder classes: POPOTrackingRecorder, SQLiteTrackingRecorder
and PostgresTrackingRecorder. The concrete tracking recorders help with recording the
positions of processed events uniquely and atomically in a particular database.
To show how this can work, let’s create a simple materialised view, for counting events, that can record and reveal the number of “created” events and the number of subsequent events that have been generated by an event-sourced application.
View test
Let’s begin by writing a test. The EventCountersViewTestCase shown below expresses requirements for a view object
that can record and reveal the number of events generated by an event-sourced application.
The test expects this view object:
- to be an instance of TrackingRecorder, and thereby to have a max_tracking_id() method. This will support resuming the processing of events from the correct position.
- to have command and query methods for incrementing and obtaining the recorded number of events. The values returned by the query methods are expected to be equal to the number of times the corresponding command methods have been called.
- to have command methods with a tracking object parameter. The value returned by max_tracking_id() is expected to reflect which tracking objects have been recorded by the command methods. The command methods are expected to raise an IntegrityError when called more than once with the same tracking object. The counted numbers are expected not to change when an IntegrityError is raised by a command method.
- to have a wait() method that returns normally without raising an exception when called with a position that has been recorded. A TimeoutError is expected to be raised when wait() is called with a position greater than those which have been recorded.
The test begins by constructing an instance of EventCountersView, which is defined in the next section.
class EventCountersViewTestCase(TestCase):
    def construct_event_counters_view(self) -> EventCountersView:
        raise NotImplementedError

    def test(self) -> None:
        # Construct materialised view object.
        view = self.construct_event_counters_view()

        # Check the view object is a tracking recorder.
        self.assertIsInstance(view, TrackingRecorder)

        # Check the view has processed no events.
        self.assertIsNone(view.max_tracking_id("upstream"))

        # Check the event counters are zero.
        self.assertEqual(view.get_created_event_counter(), 0)
        self.assertEqual(view.get_subsequent_event_counter(), 0)

        # Increment the "created" event counter.
        view.incr_created_event_counter(Tracking("upstream", 1))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)
        self.assertEqual(view.get_subsequent_event_counter(), 0)

        # Increment the subsequent event counter.
        view.incr_subsequent_event_counter(Tracking("upstream", 2))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)
        self.assertEqual(view.get_subsequent_event_counter(), 1)

        # Increment the subsequent event counter again.
        view.incr_subsequent_event_counter(Tracking("upstream", 3))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)
        self.assertEqual(view.get_subsequent_event_counter(), 2)

        # Check the tracking objects have been recorded.
        self.assertEqual(view.max_tracking_id("upstream"), 3)

        # Check the tracking objects are recorded uniquely and atomically.
        with self.assertRaises(IntegrityError):
            view.incr_created_event_counter(Tracking("upstream", 3))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)

        with self.assertRaises(IntegrityError):
            view.incr_subsequent_event_counter(Tracking("upstream", 3))

        # Check the counted number of events.
        self.assertEqual(view.get_subsequent_event_counter(), 2)

        # Check the wait() method returns normally when
        # we wait for a position that has been recorded.
        view.wait("upstream", 3)

        # Check the wait() method raises a TimeoutError when
        # we wait for a position that has not been recorded.
        with self.assertRaises(TimeoutError):
            view.wait("upstream", 4, timeout=0.5)
View example
Towards satisfying this test, let’s define EventCountersView. We can extend the library’s abstract base
class TrackingRecorder and define abstract command and query methods
with names and signatures expected by the test above:
- the command methods incr_created_event_counter() and incr_subsequent_event_counter() have a tracking parameter, typed with the library’s Tracking class.
- the query methods get_created_event_counter() and get_subsequent_event_counter() return integers.
class EventCountersView(TrackingRecorder):
    @abstractmethod
    def get_created_event_counter(self) -> int:
        pass

    @abstractmethod
    def get_subsequent_event_counter(self) -> int:
        pass

    @abstractmethod
    def incr_created_event_counter(self, tracking: Tracking) -> None:
        pass

    @abstractmethod
    def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
        pass
We will implement this interface twice: once to create a materialised view stored in memory, and then again to create a materialised view stored in Postgres.
View in memory
The POPOEventCounters class implements EventCountersView using Python objects to hold the counted numbers
of events. It defines “private” attributes _created_event_counter and _subsequent_event_counter as Python
int objects initialised to zero. The values of these attributes are incremented
by the command methods and returned by the query methods.
It inherits the POPOTrackingRecorder class, using its database lock to
serialise commands, and the inherited methods _assert_tracking_uniqueness()
and _insert_tracking() to avoid events being counted more than once.
class POPOEventCounters(POPOTrackingRecorder, EventCountersView):
    def __init__(self) -> None:
        super().__init__()
        self._created_event_counter = 0
        self._subsequent_event_counter = 0

    def get_created_event_counter(self) -> int:
        return self._created_event_counter

    def get_subsequent_event_counter(self) -> int:
        return self._subsequent_event_counter

    def incr_created_event_counter(self, tracking: Tracking) -> None:
        with self._database_lock:
            self._assert_tracking_uniqueness(tracking)
            self._insert_tracking(tracking)
            self._created_event_counter += 1

    def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
        with self._database_lock:
            self._assert_tracking_uniqueness(tracking)
            self._insert_tracking(tracking)
            self._subsequent_event_counter += 1
The test case TestPOPOEventCounters extends EventCountersViewTestCase and will show whether the POPOEventCounters class satisfies our
requirements.
class TestPOPOEventCounters(EventCountersViewTestCase):
    def construct_event_counters_view(self) -> EventCountersView:
        return POPOEventCounters()
View in Postgres
The PostgresEventCounters class also implements EventCountersView. It defines a database table, and
SQL statements for incrementing and selecting event counters. The _select_counter() method is used to select
the current value of an event counter. The _incr_counter() method is used to increment a named event counter
atomically in the same database transaction as a tracking object is recorded.
It extends the PostgresTrackingRecorder class, using the
transaction() method to implement atomic transactions,
and the inherited method _insert_tracking()
to avoid events being counted more than once.
class PostgresEventCounters(PostgresTrackingRecorder, EventCountersView):
    _created_event_counter_name = "CREATED_EVENTS"
    _subsequent_event_counter_name = "SUBSEQUENT_EVENTS"

    def __init__(
        self,
        datastore: PostgresDatastore,
        **kwargs: Any,
    ):
        super().__init__(datastore, **kwargs)
        assert self.tracking_table_name.endswith("_tracking")  # Because we replace it.
        self.counters_table_name = self.tracking_table_name.replace("_tracking", "")
        self.check_identifier_length(self.counters_table_name)
        self.sql_create_statements.append(
            SQL(
                "CREATE TABLE IF NOT EXISTS {0}.{1} ("
                "counter_name text, "
                "counter bigint, "
                "PRIMARY KEY "
                "(counter_name))"
            ).format(
                Identifier(self.datastore.schema),
                Identifier(self.counters_table_name),
            )
        )

        self.select_counter_statement = SQL(
            "SELECT counter FROM {0}.{1} WHERE counter_name=%s"
        ).format(
            Identifier(self.datastore.schema),
            Identifier(self.counters_table_name),
        )

        self.incr_counter_statement = SQL(
            "INSERT INTO {0}.{1} VALUES (%s, 1) "
            "ON CONFLICT (counter_name) DO UPDATE "
            "SET counter = {0}.{1}.counter + 1"
        ).format(
            Identifier(self.datastore.schema),
            Identifier(self.counters_table_name),
        )

    def get_created_event_counter(self) -> int:
        return self._select_counter(self._created_event_counter_name)

    def get_subsequent_event_counter(self) -> int:
        return self._select_counter(self._subsequent_event_counter_name)

    def incr_created_event_counter(self, tracking: Tracking) -> None:
        self._incr_counter(self._created_event_counter_name, tracking)

    def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
        self._incr_counter(self._subsequent_event_counter_name, tracking)

    def _select_counter(self, name: str) -> int:
        with self.datastore.transaction(commit=False) as curs:
            curs.execute(
                query=self.select_counter_statement,
                params=(name,),
                prepare=True,
            )
            fetchone = curs.fetchone()
            return fetchone["counter"] if fetchone else 0

    def _incr_counter(self, name: str, tracking: Tracking) -> None:
        with self.datastore.transaction(commit=True) as curs:
            self._insert_tracking(curs, tracking)
            curs.execute(
                query=self.incr_counter_statement,
                params=(name,),
                prepare=True,
            )
The test case TestPostgresEventCounters will show whether the PostgresEventCounters class satisfies our
requirements. This test uses the library’s InfrastructureFactory to construct
the materialised view object.
class TestPostgresEventCounters(EventCountersViewTestCase):
    expected_factory_topic = "eventsourcing.postgres:PostgresFactory"
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.postgres",
        "POSTGRES_DBNAME": "eventsourcing",
        "POSTGRES_HOST": "127.0.0.1",
        "POSTGRES_PORT": "5432",
        "POSTGRES_USER": "eventsourcing",
        "POSTGRES_PASSWORD": "eventsourcing",
        "POSTGRES_SCHEMA": "public",
    }

    def setUp(self) -> None:
        self.factory = InfrastructureFactory[EventCountersView].construct(self.env)

    def tearDown(self) -> None:
        self.factory.close()
        drop_tables()

    def construct_event_counters_view(self) -> EventCountersView:
        return self.factory.tracking_recorder(PostgresEventCounters)
Projections
Let’s now consider how the events of an event-sourced application can be processed.
The library’s generic abstract base class Projection can be used to define how
the domain events of an event-sourced application will be processed. It is intended to be subclassed by users.
The source code of the Projection class is included here for clarity.
class Projection(ABC, Generic[TTrackingRecorder]):
    name: str = ""
    """
    Name of projection, used to pick prefixed environment
    variables and define database table names.
    """
    topics: tuple[str, ...] = ()
    """
    Event topics, used to filter events in database when subscribing to an application.
    """

    def __init_subclass__(cls, **kwargs: Any) -> None:
        if "name" not in cls.__dict__:
            cls.name = cls.__name__

    def __init__(
        self,
        view: TTrackingRecorder,
    ):
        """Initialises the view property with the given view argument."""
        self._view = view

    @property
    def view(self) -> TTrackingRecorder:
        """Materialised view of an event-sourced application."""
        return self._view

    @singledispatchmethod
    @abstractmethod
    def process_event(self, domain_event: Any, tracking: Tracking) -> None:
        """Process a domain event and track it."""
The Projection class is a generic class because it has a type parameter,
which is expected to be the interface type of a materialised view, with an upper bound for the type argument being
TrackingRecorder.
The constructor argument view is used to initialise the
view property. The annotated type of the constructor argument
and the property is bound to the type parameter. The given value of the constructor argument, and therefore
the value of the property, should be a concrete instance of a materialised view.
The Projection class is an abstract class because it defines an abstract method
process_event() that must be implemented by subclasses. Events will typically
be processed by calling methods on view.
To inform IDE code insight tooling, command completion, and static type checking, the type argument should be specified
by users when defining a subclass of Projection.
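The effect of specifying the type argument can be illustrated with a small standalone sketch. It does not use the library: Recorder, CounterView, BaseProjection and CountingProjection are hypothetical stand-ins that only mirror the shape of the classes described above.

```python
from typing import Generic, TypeVar


class Recorder:
    """Hypothetical stand-in for a tracking recorder base class."""


class CounterView(Recorder):
    """Hypothetical view interface with one command and one query attribute."""

    def __init__(self) -> None:
        self.count = 0

    def incr(self) -> None:
        self.count += 1


# The type variable is bounded, so only Recorder subtypes are accepted.
TRecorder = TypeVar("TRecorder", bound=Recorder)


class BaseProjection(Generic[TRecorder]):
    def __init__(self, view: TRecorder) -> None:
        self._view = view

    @property
    def view(self) -> TRecorder:
        return self._view


class CountingProjection(BaseProjection[CounterView]):
    def process(self) -> None:
        # Because the type argument is CounterView, a type checker knows
        # that self.view has an incr() method.
        self.view.incr()


projection = CountingProjection(CounterView())
projection.process()
```

Without the type argument, a type checker would only know that the view is some kind of Recorder, and the call to incr() could not be checked statically.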
The Projection class has a name
attribute, which can be set on subclasses. It is expected to be a Python str. It is used by projection
runners, when constructing a materialised view object, to select prefixed environment variables, and in some cases
to specify database table names used by the materialised view. The value used by the projection runner defaults to
the class name of the subclass.
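The defaulting of the name attribute follows from the __init_subclass__() method in the source code above. The sketch below reproduces just that mechanism on a hypothetical Named base class, so that its behaviour can be tried in isolation. Note that, with this mechanism, a name set on one subclass is not inherited by further subclasses.

```python
from typing import Any


class Named:
    """Hypothetical base class that mirrors the name-defaulting logic."""

    name: str = ""

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Only default the name when the subclass does not define its own.
        if "name" not in cls.__dict__:
            cls.name = cls.__name__


class EventCounters(Named):
    pass  # No name given, so the class name is used.


class Renamed(Named):
    name = "eventcounters"  # An explicit name is kept.


class Child(Renamed):
    pass  # Does not define a name itself, so its class name is used.
```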
The Projection class has a topics
attribute, which can be set on subclasses. It is expected to be a Python tuple of str objects,
that contains domain event topics. Unless empty or undefined, projection runners will use these topics
when subscribing to an application, so that events can be filtered in the application database. The subscription will
then only yield events that have topics mentioned by this attribute. In many cases this will improve performance when
running a projection, by avoiding the cost of transporting and reconstructing events that will be ignored by the
projection. In some cases, filtering events by topic in this way will be necessary to avoid errors caused by attempting
to reconstruct domain event objects that have been recorded in the database which either your code or the library code
is not capable of reconstructing.
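The following sketch illustrates the idea of filtering by topic before events are reconstructed. It is only an illustration written in plain Python: in the library the filtering is done in the application database, and the StoredEvent type, the select_events() function and the topic strings here are all hypothetical.

```python
from typing import Iterable, Iterator, NamedTuple, Tuple


class StoredEvent(NamedTuple):
    """Hypothetical stored event: a position, a topic, and serialised state."""

    position: int
    topic: str
    state: bytes


def select_events(
    recorded: Iterable[StoredEvent], topics: Tuple[str, ...] = ()
) -> Iterator[StoredEvent]:
    """Yield stored events, skipping unlisted topics unless topics is empty."""
    for stored in recorded:
        if not topics or stored.topic in topics:
            yield stored


# Hypothetical topic strings, for illustration only.
recorded = [
    StoredEvent(1, "shop:Order.Created", b"{}"),
    StoredEvent(2, "shop:Order.Paid", b"{}"),
    StoredEvent(3, "legacy:Unknown", b"\x00"),
]
selected = list(
    select_events(recorded, topics=("shop:Order.Created", "shop:Order.Paid"))
)
```

The third stored event is never yielded, so no attempt would be made to deserialise its state.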
Projection example
The AggregateEventCountersProjection class processes events of an event-sourced application.
It inherits from the Projection class, with EventCountersView as the type argument.
It implements process_event() by calling incr_created_event_counter()
for Aggregate.Created events and incr_subsequent_event_counter()
for Aggregate.Event events.
If a SpannerThrown event is being processed, then an exception is raised, to demonstrate error handling in projection runners.
class AggregateEventCountersProjection(Projection[EventCountersView]):
    name = "eventcounters"
    topics: tuple[str, ...] = (
        get_topic(Aggregate.Created),
        get_topic(Aggregate.Event),
        get_topic(SpannerThrown),
    )

    @singledispatchmethod
    def process_event(self, _: Aggregate.Event, tracking: Tracking) -> None:
        self.view.insert_tracking(tracking)

    @process_event.register
    def aggregate_created(self, _: Aggregate.Created, tracking: Tracking) -> None:
        self.view.incr_created_event_counter(tracking)

    @process_event.register
    def aggregate_event(self, _: Aggregate.Event, tracking: Tracking) -> None:
        self.view.incr_subsequent_event_counter(tracking)

    @process_event.register
    def spanner_thrown(self, _: SpannerThrown, __: Tracking) -> None:
        msg = "This is a deliberate bug"
        raise SpannerThrownError(msg)
By default, other kinds of events will be processed by calling insert_tracking(),
so that progress along an application sequence can be recorded. This can help bridge large gaps without risking repetitive pulling
and processing of upstream events.
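The dispatching used by the projection above relies on functools.singledispatchmethod from the Python standard library. The standalone sketch below shows the same pattern with hypothetical Event, Created and Other classes: the most specific registered type wins, and the undecorated base function acts as the fallback for everything else.

```python
from functools import singledispatchmethod


class Event:
    """Hypothetical base event class."""


class Created(Event):
    """Hypothetical 'created' event, a subclass of Event."""


class Other:
    """Hypothetical unrelated event class."""


class Counter:
    def __init__(self) -> None:
        self.created = 0
        self.subsequent = 0
        self.skipped = 0

    @singledispatchmethod
    def process(self, event: object) -> None:
        # Fallback: used when no registered type matches.
        self.skipped += 1

    @process.register
    def process_created(self, event: Created) -> None:
        self.created += 1

    @process.register
    def process_event(self, event: Event) -> None:
        self.subsequent += 1


counter = Counter()
for event in (Created(), Event(), Event(), Other()):
    counter.process(event)
```

Although Created is also an Event, it is counted only by the handler registered for Created, which is why "created" events and subsequent events can be counted separately.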
Projection runner
Let’s consider how to run the projection, so events of an event-sourced application can be counted.
As shown in the AggregateEventCountersProjectionTestCase class below, a projection runner
can be constructed with an application class, a projection class, a materialised view class, and an optional environment
Mapping object that specifies run-time configuration of the event-sourced application and the materialised view.
It will subscribe to the application,
from the position indicated by the view’s max_tracking_id() method,
and then call the process_event() method of the projection for each
domain event yielded by the application subscription.
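The resume-then-process behaviour described here can be pictured with a small in-memory sketch. It is not how the library's runner is implemented: InMemoryView, subscribe() and run_once() are hypothetical names, and the "application sequence" is just a Python list with 1-based positions.

```python
from typing import Iterator, List, Optional, Tuple


class InMemoryView:
    """Hypothetical view that remembers the last processed position."""

    def __init__(self) -> None:
        self.last_position: Optional[int] = None
        self.seen: List[str] = []

    def max_tracking_id(self) -> Optional[int]:
        return self.last_position

    def record(self, position: int, event: str) -> None:
        self.seen.append(event)
        self.last_position = position


def subscribe(log: List[str], gt: Optional[int]) -> Iterator[Tuple[int, str]]:
    """Yield (position, event) pairs after the given position."""
    start = gt or 0
    for index in range(start, len(log)):
        yield index + 1, log[index]


def run_once(log: List[str], view: InMemoryView) -> None:
    """Process every event recorded after the view's last position."""
    for position, event in subscribe(log, gt=view.max_tracking_id()):
        view.record(position, event)


log = ["created", "updated"]
view = InMemoryView()
run_once(log, view)  # Catches up from the start.
log.append("updated again")
run_once(log, view)  # Resumes after position 2, so nothing is repeated.
```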
The projection runner has a run_forever()
method, which blocks until an optional timeout, or until an exception is raised by the projection or
by the subscription, or until the projection runner is stopped by calling its stop() method.
This allows an event processing component to be started and run independently as a
separate operating system process for a controllable period of time, and then to terminate in a controlled
way when there is an error, or when it is interrupted.
Exceptions raised whilst running the projection will be re-raised by the
run_forever() method. Operators of the system can examine
any errors and resume processing by reconstructing the runner. Some types of errors may be transient operational
errors, such as database connectivity, in which case the processing could be resumed automatically. Some errors
may be programming errors, and will require manual intervention before the event processing can continue.
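One possible way to act on this distinction is a small supervising loop, sketched below under stated assumptions. The TransientError class and the supervise() function are hypothetical and not part of the library; the run argument stands for any callable that constructs a runner and calls its run_forever() method.

```python
import time
from typing import Callable


class TransientError(Exception):
    """Hypothetical marker for errors that are safe to retry."""


def supervise(run: Callable[[], None], max_restarts: int = 3, delay: float = 0.0) -> int:
    """Call run(), restarting after transient errors. Returns the restart count."""
    restarts = 0
    while True:
        try:
            run()
            return restarts
        except TransientError:
            if restarts >= max_restarts:
                raise  # Give up, so that an operator can investigate.
            restarts += 1
            time.sleep(delay)
        # Any other exception propagates immediately, because a programming
        # error needs manual intervention rather than a restart.


attempts = []


def flaky_run() -> None:
    """Fails twice with a transient error, then completes."""
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientError("connection lost")


restart_count = supervise(flaky_run)
```

Which exceptions count as transient depends on the database driver and deployment, so the classification would need to be decided for each system.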
The wait() method of view objects can be used by
user interface requests, to wait until an event has been processed by the projection before calling a query method
on the materialised view. Separate instances of the view object can also be used to support such requests, if
the view is backed by a durable database.
class AggregateEventCountersProjectionTestCase(TestCase, ABC):
    view_class: type[EventCountersView]
    env: ClassVar[dict[str, str]] = {}

    def test_event_counters_projection(self) -> None:
        # Construct runner with application, projection, and recorder.
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=AggregateEventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ) as runner:
            # Get "read" and "write" model instances from the runner.
            write_model = runner.app
            read_model = runner.projection.view

            # Write some events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for the events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 1)
            self.assertEqual(read_model.get_subsequent_event_counter(), 2)

            # Write some more events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for the events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 2)
            self.assertEqual(read_model.get_subsequent_event_counter(), 4)

    def test_run_forever_raises_projection_error(self) -> None:
        # Construct runner with application, projection, and recorder.
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=AggregateEventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ) as runner:
            write_model = runner.app
            read_model = runner.projection.view

            # Write some events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=SpannerThrown)
            recordings = write_model.save(aggregate)

            # Projection runner terminates with projection error.
            with self.assertRaises(SpannerThrownError):
                runner.run_forever(timeout=5)

            # Wait times out (event has not been processed).
            with self.assertRaises(TimeoutError):
                read_model.wait(
                    application_name=write_model.name,
                    notification_id=recordings[-1].notification.id,
                )
The test method test_event_counters_projection() constructs a runner, and from the runner gets references
to an event-sourced application “write model” and a materialised view “read model”.
Events are generated in the event-sourced application “write model”. These events are then
processed, according to the logic of the AggregateEventCountersProjection, by updating the
materialised event counters view “read model”.
The counted numbers of each kind of event can be obtained from the “read model”. The materialised
view is “eventually consistent” because the event processing is asynchronous, and so the
wait() method is used to wait for the events to be
processed before querying the materialised view. This makes the test deterministic.
The test method test_run_forever_raises_projection_error() shows that after a SpannerThrown event
is generated by the event-sourced application, an error occurs with the event-processing, and an exception
representing this error is raised by the runner’s run_forever()
method. Because the event will not successfully be processed, the materialised view’s wait()
method will time out.
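The waiting behaviour relied on by these tests can be sketched with a condition variable from the standard library. This is only an illustration of the idea, not the library's implementation: the PositionTracker class is hypothetical and tracks a single sequence of positions in memory.

```python
import threading
from typing import Optional


class PositionTracker:
    """Hypothetical tracker that lets callers wait for a position."""

    def __init__(self) -> None:
        self._condition = threading.Condition()
        self._max_position: Optional[int] = None

    def record(self, position: int) -> None:
        """Record a processed position and wake up any waiting callers."""
        with self._condition:
            if self._max_position is None or position > self._max_position:
                self._max_position = position
            self._condition.notify_all()

    def wait(self, position: int, timeout: float = 1.0) -> None:
        """Return once the position is recorded, or raise TimeoutError."""

        def reached() -> bool:
            return self._max_position is not None and self._max_position >= position

        with self._condition:
            if not self._condition.wait_for(reached, timeout=timeout):
                raise TimeoutError(f"Timed out waiting for position {position}")


tracker = PositionTracker()
# Simulate asynchronous event processing in another thread.
threading.Timer(0.05, tracker.record, args=(3,)).start()
tracker.wait(3, timeout=2.0)  # Returns after position 3 has been recorded.
```

A request handler could call wait() with the position returned when the event was saved, and then query the view, which gives the caller a "read your own writes" experience despite the asynchronous processing.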
Running projection in memory
The test case below runs the projection class with the POPOEventCounters view.
class TestAggregateEventCountersProjectionWithPOPO(
    AggregateEventCountersProjectionTestCase
):
    view_class: type[EventCountersView] = POPOEventCounters
When generating and counting events in memory, we need to use the projection runner’s application and view objects for generating events and getting counted numbers.
But, as we shall see, when using a durable database such as Postgres, the events can be generated using other instances of the application, and the materialised view can be queried using other instances of the view class.
Running projection in Postgres
The test case below runs the projection class with the PostgresEventCounters view.
Because this test case uses a durable database for both the event-sourced application and the materialised view, any instance of the application can be used to write events, and any instance of the materialised view can be used to obtain the counted numbers of events.
class TestAggregateEventCountersProjectionWithPostgres(
    AggregateEventCountersProjectionTestCase
):
    view_class = PostgresEventCounters
    env: ClassVar[dict[str, str]] = {
        "APPLICATION_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "APPLICATION_POSTGRES_DBNAME": "eventsourcing",
        "APPLICATION_POSTGRES_HOST": "127.0.0.1",
        "APPLICATION_POSTGRES_PORT": "5432",
        "APPLICATION_POSTGRES_USER": "eventsourcing",
        "APPLICATION_POSTGRES_PASSWORD": "eventsourcing",
        "EVENTCOUNTERS_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "EVENTCOUNTERS_POSTGRES_DBNAME": "eventsourcing",
        "EVENTCOUNTERS_POSTGRES_HOST": "127.0.0.1",
        "EVENTCOUNTERS_POSTGRES_PORT": "5432",
        "EVENTCOUNTERS_POSTGRES_USER": "eventsourcing",
        "EVENTCOUNTERS_POSTGRES_PASSWORD": "eventsourcing",
    }

    def setUp(self) -> None:
        drop_tables()
        super().setUp()

    def tearDown(self) -> None:
        super().tearDown()
        drop_tables()

    def test_event_counters_projection(self) -> None:
        super().test_event_counters_projection()

        # Resume....
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=AggregateEventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ):
            # Construct separate instance of "write model".
            write_model = Application[UUID](self.env)

            # Construct separate instance of "read model".
            read_model = (
                InfrastructureFactory[EventCountersView]
                .construct(
                    env=Environment(
                        name=AggregateEventCountersProjection.name, env=self.env
                    )
                )
                .tracking_recorder(self.view_class)
            )

            # Write some events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 3)
            self.assertEqual(read_model.get_subsequent_event_counter(), 6)

            # Write some more events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 4)
            self.assertEqual(read_model.get_subsequent_event_counter(), 8)

    def test_run_forever_raises_projection_error(self) -> None:
        super().test_run_forever_raises_projection_error()

        # Resume...
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=AggregateEventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ) as runner:
            # Construct separate instance of "write model".
            write_model = Application[UUID](self.env)

            # Construct separate instance of "read model".
            read_model = InfrastructureFactory.construct(
                env=Environment(
                    name=AggregateEventCountersProjection.name, env=self.env
                )
            ).tracking_recorder(self.view_class)

            # Still terminates with projection error.
            with self.assertRaises(SpannerThrownError):
                runner.run_forever()

            # Wait times out (event has not been processed).
            with self.assertRaises(TimeoutError):
                read_model.wait(
                    application_name=write_model.name,
                    notification_id=write_model.recorder.max_notification_id(),
                )
If a durable database is used in production, the event-sourced application object and the materialised view object could be horizontally scaled, with the event-processing component running as a separate operating system process.
This is demonstrated by extending the test methods: to resume processing by reconstructing the projection runner, to
construct separate instances of the event-sourced application and of the materialised view, to continue generating
events using the separate instance of the event-sourced application, to query the counted numbers of events using
the separate instance of the materialised view, to check that the counted numbers of events obtained correctly report
the total numbers of events that have been generated, and to show that the event-processing error is still raised
and that the event-processing has not continued after the error caused by the SpannerThrown event.
The event-sourced application and the materialised view could also use separate databases. But in this example they are configured more simply to use different tables in the same database.
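The prefixed variables in the test case above suggest how the application and the view could be pointed at different databases. The sketch below illustrates one plausible lookup rule: it assumes that a variable prefixed with the upper-cased component name takes precedence over an unprefixed one. The lookup() function and the database names "events" and "views" are hypothetical, and the library's actual resolution rules may differ.

```python
from typing import Mapping, Optional


def lookup(env: Mapping[str, str], name: str, key: str) -> Optional[str]:
    """Return the prefixed value if present, otherwise the unprefixed value."""
    prefixed = f"{name.upper()}_{key}"
    if prefixed in env:
        return env[prefixed]
    return env.get(key)


# Hypothetical configuration with separate databases on a shared host.
env = {
    "POSTGRES_HOST": "127.0.0.1",
    "APPLICATION_POSTGRES_DBNAME": "events",
    "EVENTCOUNTERS_POSTGRES_DBNAME": "views",
}

application_dbname = lookup(env, "application", "POSTGRES_DBNAME")
view_dbname = lookup(env, "eventcounters", "POSTGRES_DBNAME")
view_host = lookup(env, "eventcounters", "POSTGRES_HOST")
```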
Exercises
- Replicate the code in this tutorial in your development environment.
- Develop and run a projection that counts dogs and tricks from a DogSchool application.
- Develop and run an event-sourced projection that counts dogs and tricks from a DogSchool application.
Next steps
Read the projection module documentation.
See also the Example projections.
To continue this tutorial, please read Part 5.