Tutorial - Part 4 - Projections¶
This part of the tutorial shows how the state of an event-sourced application can be projected into a materialised view that supports arbitrary queries.
As we saw in Part 3, we can use the library’s
Application
class to define an event-sourced
application. The state of an event-sourced application is recorded as a set of events,
each of which is positioned in one of many “aggregate sequences” and also in the overall
“application sequence”.
We can reconstruct an aggregate by selecting events from an aggregate sequence, and we can propagate the state of an application by following its application sequence. But we can only query the current state of an event-sourced application in other ways by firstly projecting the application state into a “read model” that is designed to support such queries.
So that such queries can be performed quickly, it is useful to project the state of an event-sourced application into a read model that is stored in a database. Such “materialised views” are usually prepared and updated by processing the events of the event-sourced application in a separate event-processing component, which subscribes to the application sequence so that it can catch-up and then continue as further events are recorded.
In the sections below we will explore how the events of event-sourced applications can be processed into materialised views that support queries that are not directly supported by event-sourced applications.
Views¶
Materialised views are primarily designed to support queries of the state of an event-sourced application that are not directly supported by the event-sourced application itself. Therefore, every materialised view will need to have at least one query method. Materialised views will also need to be updated when events are processed, therefore each materialised view will also need at least one command method.
In order to ensure a materialised view is a reliable deterministic projection of the state of an event-sourced application, it will need to track which events have been processed. We will need it to record the positions of events that have been processed uniquely and atomically with the updating of the materialised view, so that effectively each event is never successfully processed more than once. We will also need the materialised view to report the position of the last event that has been processed, so that event processing can be stopped and resumed correctly. Because of the arbitrary nature of queries supported by materialised views, this is the only common aspect shared by all materialised views. In this sense, all materialised views are special kinds of tracking recorders.
We can define an interface for a materialised view using the library’s abstract TrackingRecorder
class and develop concrete implementations using the library’s concrete tracking recorder classes,
POPOTrackingRecorder
, SQLiteTrackingRecorder
, and
PostgresTrackingRecorder
. The interface will allow us to define how the materialised
view will be queried and how events will be processed independently of any concrete implementation. The concrete
tracking recorders make it easier to correctly implement command methods that record the positions of events uniquely
and atomically in a particular database.
To show how this can work, let’s create a materialised view for counting events that can record and reveal the number of “created” events and the number of subsequent events that have been generated by an event-sourced application.
View test case¶
Let’s begin by writing a test. The EventCountersViewTestCase
shown below expresses requirements for recording and
revealing the number of events generated by an event-sourced application.
It begins by constructing an “event counters view” object. The view object is expected to be a
TrackingRecorder
, and therefore to have a
max_tracking_id()
method. This method will be called when
starting or resuming the processing of events, so that event-processing can run from the correct position
in an application sequence.
The test continues by calling query methods to get the counted number of “created” and subsequent events. The query methods are expected to return integers. The values returned by the query methods are expected to be equal to the number of times the corresponding command methods have been called.
The command methods are called to increment the counted number of events. The command methods are called
with a unique tracking object. The value returned by max_tracking_id()
is expected to reflect which tracking objects
have been recorded by the command methods. The command methods are expected to raise an IntegrityError
when called more than once with the same tracking object. The counted numbers are expected not to change when an
IntegrityError
is raised. The view’s wait()
method is called with a position that has been recorded, expecting the method to return normally without raising
an exception. The wait()
method is called with a position that
has not been recorded, expecting that a TimeoutError
will be raised.
class EventCountersViewTestCase(TestCase):
def construct_event_counters_view(self) -> EventCountersInterface:
raise NotImplementedError
def test(self) -> None:
# Construct materialised view object.
view = self.construct_event_counters_view()
# Check the view object is a tracking recorder.
self.assertIsInstance(view, TrackingRecorder)
# Check the view has processed no events.
self.assertIsNone(view.max_tracking_id("upstream"))
# Check the event counters are zero.
self.assertEqual(view.get_created_event_counter(), 0)
self.assertEqual(view.get_subsequent_event_counter(), 0)
# Increment the "created" event counter.
view.incr_created_event_counter(Tracking("upstream", 1))
# Check the counted number of events.
self.assertEqual(view.get_created_event_counter(), 1)
self.assertEqual(view.get_subsequent_event_counter(), 0)
# Increment the subsequent event counter.
view.incr_subsequent_event_counter(Tracking("upstream", 2))
# Check the counted number of events.
self.assertEqual(view.get_created_event_counter(), 1)
self.assertEqual(view.get_subsequent_event_counter(), 1)
# Increment the subsequent event counter again.
view.incr_subsequent_event_counter(Tracking("upstream", 3))
# Check the counted number of events.
self.assertEqual(view.get_created_event_counter(), 1)
self.assertEqual(view.get_subsequent_event_counter(), 2)
# Check the tracking objects have been recorded.
self.assertEqual(view.max_tracking_id("upstream"), 3)
# Check the tracking objects are recorded uniquely and atomically.
with self.assertRaises(IntegrityError):
view.incr_created_event_counter(Tracking("upstream", 3))
# Check the counted number of events.
self.assertEqual(view.get_created_event_counter(), 1)
with self.assertRaises(IntegrityError):
view.incr_subsequent_event_counter(Tracking("upstream", 3))
# Check the counted number of events.
self.assertEqual(view.get_subsequent_event_counter(), 2)
# Check the wait() method returns normally when
# we wait for a position that has been recorded.
view.wait("upstream", 3)
# Check the wait() method raises a TimeoutError when
# we wait for a postion that has not been recorded.
with self.assertRaises(TimeoutError):
view.wait("upstream", 4, timeout=0.5)
View interface¶
Towards satisfying this test, the EventCountersInterface
class shown below extends the library’s abstract base
class TrackingRecorder
by defining abstract command and query methods with the
expected method names and method signatures.
class EventCountersInterface(TrackingRecorder):
@abstractmethod
def get_created_event_counter(self) -> int:
pass
@abstractmethod
def get_subsequent_event_counter(self) -> int:
pass
@abstractmethod
def incr_created_event_counter(self, tracking: Tracking) -> None:
pass
@abstractmethod
def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
pass
The query methods get_created_event_counter()
and get_subsequent_event_counter()
return integers and have no arguments.
The command methods incr_created_event_counter()
and incr_subsequent_event_counter()
have a tracking
argument expected to be an instance of the library’s Tracking
class and return nothing.
In-memory view¶
We can implement the interface so that the counted numbers of events are held in memory. The POPOEventCounters
class,
shown below, implements the abstract interface using Python objects to hold the counted numbers of events in memory.
It defines “private” attributes _created_event_counter
and _subsequent_event_counter
. The values of these
attributes are Python int
objects, initialised to be zero. The values of these attributes are returned by
the query methods, and incremented by the command methods.
class POPOEventCounters(POPOTrackingRecorder, EventCountersInterface):
def __init__(self) -> None:
super().__init__()
self._created_event_counter = 0
self._subsequent_event_counter = 0
def get_created_event_counter(self) -> int:
return self._created_event_counter
def get_subsequent_event_counter(self) -> int:
return self._subsequent_event_counter
def incr_created_event_counter(self, tracking: Tracking) -> None:
with self._database_lock:
self._assert_tracking_uniqueness(tracking)
self._insert_tracking(tracking)
self._created_event_counter += 1
def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
with self._database_lock:
self._assert_tracking_uniqueness(tracking)
self._insert_tracking(tracking)
self._subsequent_event_counter += 1
It inherits and extends the POPOTrackingRecorder
class, using its database lock to
serialise commands, so that the data integrity of the counters is preserved. It uses the “private” methods
_assert_tracking_uniqueness()
and _insert_tracking()
to avoid any event being counted more than once, whilst keeping track of which events have been processed so that
event-processing can be resumed correctly.
The test case TestPOPOEventCounters
will show whether the POPOEventCounters
class satisfies our
requirements. Running this test shows that it does.
class TestPOPOEventCounters(EventCountersViewTestCase):
def construct_event_counters_view(self) -> EventCountersInterface:
return POPOEventCounters()
PostgreSQL view¶
Similarly, we can implement the interface to store the counted numbers of events in PostgreSQL. The PostgresEventCounters
class, shown below, implements EventCountersInterface
by defining a database table for event counters with SQL
statements that select and increment event counters.
class PostgresEventCounters(PostgresTrackingRecorder, EventCountersInterface):
_created_event_counter_name = "CREATED_EVENTS"
_subsequent_event_counter_name = "SUBSEQUENT_EVENTS"
def __init__(
self,
datastore: PostgresDatastore,
**kwargs: Any,
):
super().__init__(datastore, **kwargs)
assert self.tracking_table_name.endswith("_tracking") # Because we replace it.
self.counters_table_name = self.tracking_table_name.replace("_tracking", "")
self.check_table_name_length(self.counters_table_name)
self.create_table_statements.append(
SQL(
"CREATE TABLE IF NOT EXISTS {0}.{1} ("
"counter_name text, "
"counter bigint, "
"PRIMARY KEY "
"(counter_name))"
).format(
Identifier(self.datastore.schema),
Identifier(self.counters_table_name),
)
)
self.select_counter_statement = SQL(
"SELECT counter FROM {0}.{1} WHERE counter_name=%s"
).format(
Identifier(self.datastore.schema),
Identifier(self.counters_table_name),
)
self.incr_counter_statement = SQL(
"INSERT INTO {0}.{1} VALUES (%s, 1) "
"ON CONFLICT (counter_name) DO UPDATE "
"SET counter = {0}.{1}.counter + 1"
).format(
Identifier(self.datastore.schema),
Identifier(self.counters_table_name),
)
def get_created_event_counter(self) -> int:
return self._select_counter(self._created_event_counter_name)
def get_subsequent_event_counter(self) -> int:
return self._select_counter(self._subsequent_event_counter_name)
def incr_created_event_counter(self, tracking: Tracking) -> None:
self._incr_counter(self._created_event_counter_name, tracking)
def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
self._incr_counter(self._subsequent_event_counter_name, tracking)
def _select_counter(self, name: str) -> int:
with self.datastore.transaction(commit=False) as curs:
curs.execute(
query=self.select_counter_statement,
params=(name,),
prepare=True,
)
fetchone = curs.fetchone()
return fetchone["counter"] if fetchone else 0
def _incr_counter(self, name: str, tracking: Tracking) -> None:
with self.datastore.transaction(commit=True) as curs:
self._insert_tracking(curs, tracking)
curs.execute(
query=self.incr_counter_statement,
params=(name,),
prepare=True,
)
The _select_counter()
method is used to select the current value of an event counter.
The _incr_counter()
method is used to increment a named event counter atomically in the
same database transaction as a tracking object is recorded.
It inherits and extends the PostgresTrackingRecorder
class, using the
transaction()
method of its datastore so that the data
integrity of the counters is preserved.
It uses the “private” _insert_tracking()
method of
PostgresTrackingRecorder
to avoid any event being counted more than once,
whilst keeping track of which events have been processed so that
event-processing can be resumed correctly.
The test case TestPostgresEventCounters
will show whether the PostgresEventCounters
class satisfies our
requirements. Running this test shows that it does.
class TestPostgresEventCounters(EventCountersViewTestCase):
def setUp(self) -> None:
self.factory = cast(
PostgresFactory,
InfrastructureFactory.construct(
env=Environment(
name="eventcounters",
env={
"PERSISTENCE_MODULE": "eventsourcing.postgres",
"POSTGRES_DBNAME": "eventsourcing",
"POSTGRES_HOST": "127.0.0.1",
"POSTGRES_PORT": "5432",
"POSTGRES_USER": "eventsourcing",
"POSTGRES_PASSWORD": "eventsourcing",
},
)
),
)
def construct_event_counters_view(self) -> EventCountersInterface:
return self.factory.tracking_recorder(PostgresEventCounters)
def tearDown(self) -> None:
drop_tables()
This test uses the library’s InfrastructureFactory
to construct the materialised
view object, which is configured using environment variables. This powerful technique for constructing materialised
view objects is used internally by the library’s projection runner class, which is described at the end of this
part of the tutorial.
Projections¶
Let’s now consider how the events of an event-sourced application can be processed.
The library’s generic abstract base class Projection
can be used to define how
the domain events of an event-sourced application will be processed. It is intended to be subclassed by users.
class Projection(ABC, Generic[TTrackingRecorder]):
name: str = ""
"""
Name of projection, used to pick prefixed environment
variables and define database table names.
"""
topics: tuple[str, ...] = ()
"""
Event topics, used to filter events in database when subscribing to an application.
"""
def __init_subclass__(cls, **kwargs: Any) -> None:
if "name" not in cls.__dict__:
cls.name = cls.__name__
def __init__(
self,
view: TTrackingRecorder,
):
"""Initialises the view property with the given view argument."""
self._view = view
@property
def view(self) -> TTrackingRecorder:
"""Materialised view of an event-sourced application."""
return self._view
@singledispatchmethod
@abstractmethod
def process_event(
self, domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking
) -> None:
"""Process a domain event and track it."""
The Projection
class is a generic class because it has one type argument, expected
to be the interface of a materialised view, a subclass of TrackingRecorder
. The type
argument should be specified by users when defining a subclass of Projection
, to
inform IDE code insight, command completion, and static type checking.
The Projection
class has one required constructor argument,
view
, and one property also called
view
. The constructor argument is used to initialise the property.
The annotated type of both is bound to the type argument of Projection
.
The given value of the constructor argument, and therefore the value of the property, should be a concrete instance
of a materialised view.
The Projection
class is an abstract class because it defines an abstract method
process_event()
that must be implemented by subclasses. Events will typically
be processed by calling methods on view
.
The Projection
class has a name
attribute, which can be set on subclasses. It is expected to be a Python str
. It is used by projection
runners, when constructing a materialised view object, to select prefixed environment variables, and in some cases
to specify database table names used by the materialised view. The value used by the projection runner defaults to
the class name of the subclass.
The Projection
class has a topics
attribute, which can be set on subclasses. It is expected to be a Python tuple
of str
objects,
that contains domain events topics. Unless empty or undefined, projection runners will use these topics
when subscribing to an application, so that events can be filtered in the application database. The subscription will
then only yield events that have topics mentioned by this attribute. In many cases this will improve performance when
running a projection, by avoiding the cost of transporting and reconstructing events that will be ignored by the
projection. In some cases, filtering events by topic in this way will be necessary to avoid errors caused by attempting
to reconstruct domain event objects that have been recorded in the database which either your code or the library code
is not capable of reconstructing.
Example projection¶
For example, the EventCountersProjection
class, shown below, processes events of an event-sourced application by
calling methods of a concrete instance of EventCountersInterface
. It inherits the Projection
class, setting the type argument as EventCountersInterface
class. It sets the name
attribute as 'eventcounters'
. It sets the topics
attribute
to mention the topics of the domain events processed by this projection.
The EventCountersProjection
class implements the process_event()
by calling the incr_created_event_counter()
and incr_subsequent_event_counter()
methods of EventCountersInterface
available on view
.
If the given event is an Aggregate.Created
event,
then incr_created_event_counter()
is called. Alternatively, incr_subsequent_event_counter()
is called
if the given event is an Aggregate.Event
. An exception is raised
if the given event is a SpannerThrown
event, to demonstrate error handling in process runners. Other kinds
of events can be handled by calling insert_tracking()
so that
progress along an application sequence can be recorded, for example if large gaps are spanned by an application
subscription returning checkpoints.
class EventCountersProjection(Projection[EventCountersInterface]):
name = "eventcounters"
topics: tuple[str, ...] = (
get_topic(Aggregate.Created),
get_topic(Aggregate.Event),
get_topic(SpannerThrown),
)
@singledispatchmethod
def process_event(
self, _: DomainEventProtocol[TAggregateID], tracking: Tracking
) -> None:
self.view.insert_tracking(tracking)
@process_event.register
def aggregate_created(self, event: Aggregate.Created, tracking: Tracking) -> None:
self.view.incr_created_event_counter(tracking)
@process_event.register
def aggregate_event(self, _: Aggregate.Event, tracking: Tracking) -> None:
self.view.incr_subsequent_event_counter(tracking)
@process_event.register
def spanner_thrown(self, _: SpannerThrown, __: Tracking) -> None:
msg = "This is a deliberate bug"
raise SpannerThrownError(msg)
Runners¶
Let’s consider how to run the projection, so events of an event-sourced application can be counted.
The library’s ProjectionRunner
class is provided for the purpose
of running projections. A projection runner can be constructed with an application class, a projection
class, a materialised view class, and an optional environment Mapping
object that specifies run-time
configuration of the event-sourced application and the materialised view.
The projection runner will construct an instance of the given application class, and an instance of the given projection class with an instance of the given materialised view class. Any items in the given environment will be used to override any operating system environment variables, and the resulting environment variables will be used when the application object is constructed and when the materialised view class is constructed. The names of the application and of the projection are used to select prefixed environment variables before defaulting to unprefixed names. In this way, the event-sourced application can be configured separately from the materialised view.
It will subscribe to the application, from the position indicated by the
max_tracking_id()
method of the projection’s
materialised view, and then call the process_event()
method of the projection for each domain event yielded by the application subscription.
Because the projection runner starts a subscription to the application, it will first catch up by processing already recorded events that have not yet been processed, and then it will continue to process events that are subsequently recorded in the application’s database.
The ProjectionRunner
class has a run_forever()
method, which blocks until an optional timeout, or until an exception is raised by the projection or
by the subscription, or until the projection runner is stopped by calling its stop()
method.
This allows an event processing component to be started and run independently as a
separate operating system process for a controllable period of time, and then to terminate in a controlled
way when there is an error, or when it is interrupted. Exceptions raised whilst running the projection will be re-raised by the
run_forever()
method. Operators of the system can examine
any errors and resume processing by reconstructing the runner. Some types of errors may be transient operational
errors, such as database connectivity, in which case the processing could be resumed automatically. Some errors
may be programming errors, and will require manual intervention before the event processing can continue.
The wait()
method of materialised view objects can be used
to wait until an event has been processed by the projection before calling a query method on the materialised view.
Counting events in memory¶
For example, the TestEventCountersProjection
class, shown below, tests the EventCountersProjection
projection class with the POPOEventCounters
class.
class TestEventCountersProjection(TestCase, ABC):
view_class: type[EventCountersInterface] = POPOEventCounters
env: ClassVar[dict[str, str]] = {}
def test_event_counters_projection(self) -> None:
# Construct runner with application, projection, and recorder.
with ProjectionRunner(
application_class=Application[UUID],
projection_class=EventCountersProjection,
view_class=self.view_class,
env=self.env,
) as runner:
# Get "read" and "write" model instances from the runner.
write_model = runner.app
read_model = runner.projection.view
# Write some events.
aggregate = Aggregate()
aggregate.trigger_event(event_class=Aggregate.Event)
aggregate.trigger_event(event_class=Aggregate.Event)
recordings = write_model.save(aggregate)
# Wait for the events to be processed.
read_model.wait(
application_name=write_model.name,
notification_id=recordings[-1].notification.id,
)
# Query the read model.
self.assertEqual(read_model.get_created_event_counter(), 1)
self.assertEqual(read_model.get_subsequent_event_counter(), 2)
# Write some more events.
aggregate = Aggregate()
aggregate.trigger_event(event_class=Aggregate.Event)
aggregate.trigger_event(event_class=Aggregate.Event)
recordings = write_model.save(aggregate)
# Wait for the events to be processed.
read_model.wait(
application_name=write_model.name,
notification_id=recordings[-1].notification.id,
)
# Query the read model.
self.assertEqual(read_model.get_created_event_counter(), 2)
self.assertEqual(read_model.get_subsequent_event_counter(), 4)
def test_run_forever_raises_projection_error(self) -> None:
# Construct runner with application, projection, and recorder.
with ProjectionRunner(
application_class=Application[UUID],
projection_class=EventCountersProjection,
view_class=self.view_class,
env=self.env,
) as runner:
write_model = runner.app
read_model = runner.projection.view
# Write some events.
aggregate = Aggregate()
aggregate.trigger_event(event_class=SpannerThrown)
recordings = write_model.save(aggregate)
# Projection runner terminates with projection error.
with self.assertRaises(SpannerThrownError):
runner.run_forever()
# Wait times out (event has not been processed).
with self.assertRaises(TimeoutError):
read_model.wait(
application_name=write_model.name,
notification_id=recordings[-1].notification.id,
)
The method test_event_counters_projection()
constructs a runner, and from the runner gets references
to an event-sourced application “write model” and a materialised view “read model”.
In this case, because we are generating and counting events in memory, we need to use the same instance of the application object and of the materialised view object as the projection runner for generating events and for getting the counted numbers of events.
Events are generated in the event-sourced application “write model”. The “created” and subsequent events are
processed, by updating the materialised view “read model”, according to the logic of the EventCountersProjection
projection. The counted numbers of each kind of event are obtained from the “read model”. The materialised view is
“eventually consistent” because the event processing is asynchronous, and so the
wait()
method is used to wait for the events to be
processed before querying the materialised view.
The method test_run_forever_raises_projection_error()
shows that after a SpannerThrown
event
is generated by the event-sourced application, an error occurs with the event-processing, and an exception
representing this error is raised by the runner’s run_forever()
method. Because the event will not successfully be processed, the materialised view’s wait()
method will time out.
Counting events in PostgreSQL¶
The TestEventCountersProjectionWithPostgres
class extends TestEventCountersProjection
and runs
EventCountersProjection
with the PostgresEventCounters
class.
Because this test case uses a durable database for both the event-sourced application and the materialised view,
any instance of the application can be used to write events, and any instance of the materialised view can be used to
obtain the counted numbers of events. If a durable database is used in production, the event-sourced
application object and the materialised view object could be horizontally scaled, with the event-processing component
running as a separate operating system process. This is demonstrated by extending the test methods:
to resume processing by reconstructing the projection runner, to construct separate instances of the event-sourced
application and of the materialised view, to continue generating events using the separate instance of the event-sourced
application, to query the counted numbers of events using the separate instance of the materialised view, to expect
the obtained counted numbers of events correctly report the total numbers of events that have been generated, and to
show that the event-processing error is still raised and that the event-processing has not continued after the error
caused by the SpannerThrown
event. The event-sourced application and the materialised view could also use separate
databases. But in this example they are configured more simply to use different tables in the same database.
class TestEventCountersProjectionWithPostgres(TestEventCountersProjection):
view_class = PostgresEventCounters
env: ClassVar[dict[str, str]] = {
"APPLICATION_PERSISTENCE_MODULE": "eventsourcing.postgres",
"APPLICATION_POSTGRES_DBNAME": "eventsourcing",
"APPLICATION_POSTGRES_HOST": "127.0.0.1",
"APPLICATION_POSTGRES_PORT": "5432",
"APPLICATION_POSTGRES_USER": "eventsourcing",
"APPLICATION_POSTGRES_PASSWORD": "eventsourcing",
"EVENTCOUNTERS_PERSISTENCE_MODULE": "eventsourcing.postgres",
"EVENTCOUNTERS_POSTGRES_DBNAME": "eventsourcing",
"EVENTCOUNTERS_POSTGRES_HOST": "127.0.0.1",
"EVENTCOUNTERS_POSTGRES_PORT": "5432",
"EVENTCOUNTERS_POSTGRES_USER": "eventsourcing",
"EVENTCOUNTERS_POSTGRES_PASSWORD": "eventsourcing",
}
def test_event_counters_projection(self) -> None:
super().test_event_counters_projection()
# Resume....
with ProjectionRunner(
application_class=Application[UUID],
projection_class=EventCountersProjection,
view_class=self.view_class,
env=self.env,
):
# Construct separate instance of "write model".
write_model = Application[UUID](self.env)
# Construct separate instance of "read model".
read_model = (
InfrastructureFactory[EventCountersInterface]
.construct(
env=Environment(name=EventCountersProjection.name, env=self.env)
)
.tracking_recorder(self.view_class)
)
# Write some events.
aggregate = Aggregate()
aggregate.trigger_event(event_class=Aggregate.Event)
aggregate.trigger_event(event_class=Aggregate.Event)
recordings = write_model.save(aggregate)
# Wait for events to be processed.
read_model.wait(
application_name=write_model.name,
notification_id=recordings[-1].notification.id,
)
# Query the read model.
self.assertEqual(read_model.get_created_event_counter(), 3)
self.assertEqual(read_model.get_subsequent_event_counter(), 6)
# Write some more events.
aggregate = Aggregate()
aggregate.trigger_event(event_class=Aggregate.Event)
aggregate.trigger_event(event_class=Aggregate.Event)
recordings = write_model.save(aggregate)
# Wait for events to be processed.
read_model.wait(
application_name=write_model.name,
notification_id=recordings[-1].notification.id,
)
# Query the read model.
self.assertEqual(read_model.get_created_event_counter(), 4)
self.assertEqual(read_model.get_subsequent_event_counter(), 8)
def test_run_forever_raises_projection_error(self) -> None:
super().test_run_forever_raises_projection_error()
# Resume...
with ProjectionRunner(
application_class=Application[UUID],
projection_class=EventCountersProjection,
view_class=self.view_class,
env=self.env,
) as runner:
# Construct separate instance of "write model".
write_model = Application[UUID](self.env)
# Construct separate instance of "read model".
read_model = InfrastructureFactory.construct(
env=Environment(name=EventCountersProjection.name, env=self.env)
).tracking_recorder(self.view_class)
# Still terminates with projection error.
with self.assertRaises(SpannerThrownError):
runner.run_forever()
# Wait times out (event has not been processed).
with self.assertRaises(TimeoutError):
read_model.wait(
application_name=write_model.name,
notification_id=write_model.recorder.max_notification_id(),
)
def setUp(self) -> None:
drop_tables()
super().setUp()
def tearDown(self) -> None:
super().tearDown()
drop_tables()
Exercises¶
Replicate the code in this tutorial in your development environment.
Develop and run a projection that counts dogs and tricks from a
DogSchool
application.Develop and run an event-sourced projection that counts dogs and tricks from a
DogSchool
application.
Next steps¶
Read the projection module documentation.
See also the Example projections.
To continue this tutorial, please read Part 5.