Tutorial - Part 4 - Projections

This part of the tutorial shows how the state of an event-sourced application can be projected into a materialised view that supports arbitrary queries.

As we saw in Part 3, we can use the library’s Application class to define an event-sourced application. The state of an event-sourced application is recorded as a set of events, each of which is positioned in one of many “aggregate sequences” and also in the overall “application sequence”.

We can reconstruct an aggregate by selecting events from an aggregate sequence, and we can propagate the state of an application by following its application sequence. To query the current state of an event-sourced application in any other way, we must first project the application state into a “read model” that is designed to support such queries.

So that such queries can be performed quickly, it is useful to project the state of an event-sourced application into a read model that is stored in a database. Such “materialised views” are usually prepared and updated by processing the events of the event-sourced application in a separate event-processing component, which subscribes to the application sequence so that it can catch up and then continue as further events are recorded.

In the sections below we will explore how the events of event-sourced applications can be processed into materialised views that support queries that are not directly supported by event-sourced applications.

Views

Materialised views are primarily designed to support queries of the state of an event-sourced application that are not directly supported by the event-sourced application itself. Therefore, every materialised view will need to have at least one query method. Materialised views will also need to be updated when events are processed, therefore each materialised view will also need at least one command method.

In order to ensure a materialised view is a reliable deterministic projection of the state of an event-sourced application, it will need to track which events have been processed. We will need it to record the positions of events that have been processed uniquely and atomically with the updating of the materialised view, so that effectively each event is never successfully processed more than once. We will also need the materialised view to report the position of the last event that has been processed, so that event processing can be stopped and resumed correctly. Because of the arbitrary nature of queries supported by materialised views, this is the only common aspect shared by all materialised views. In this sense, all materialised views are special kinds of tracking recorders.
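The tracking requirement described above can be illustrated without the library. The following is a minimal sketch that uses only the Python standard library. The names SketchTracking, SketchCountingView and DuplicatePositionError are hypothetical and are not part of the library's API. The sketch checks the position, records it, and updates the view's state while holding a single lock, so that a repeated position leaves the state unchanged.

```python
import threading
from dataclasses import dataclass
from typing import Dict, Optional, Set


@dataclass(frozen=True)
class SketchTracking:
    """Hypothetical stand-in: an upstream application name and an event position."""

    application_name: str
    notification_id: int


class DuplicatePositionError(Exception):
    """Hypothetical error raised when a position has already been recorded."""


class SketchCountingView:
    """Counts events, recording each event's position together with the count."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._positions: Dict[str, Set[int]] = {}
        self._counter = 0

    def max_tracking_id(self, application_name: str) -> Optional[int]:
        # Report the last recorded position, so that processing can be resumed.
        with self._lock:
            positions = self._positions.get(application_name)
            return max(positions) if positions else None

    def get_counter(self) -> int:
        with self._lock:
            return self._counter

    def incr_counter(self, tracking: SketchTracking) -> None:
        # Check uniqueness, record the position, and update the state
        # while holding the lock, so that the three steps happen together.
        with self._lock:
            positions = self._positions.setdefault(tracking.application_name, set())
            if tracking.notification_id in positions:
                raise DuplicatePositionError(tracking)
            positions.add(tracking.notification_id)
            self._counter += 1
```

Because the uniqueness check happens before any state is changed, calling incr_counter() twice with the same position raises an error on the second call and the counter keeps its previous value.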

We can define an interface for a materialised view using the library’s abstract TrackingRecorder class and develop concrete implementations using the library’s concrete tracking recorder classes, POPOTrackingRecorder, SQLiteTrackingRecorder, and PostgresTrackingRecorder. The interface will allow us to define how the materialised view will be queried and how events will be processed independently of any concrete implementation. The concrete tracking recorders make it easier to correctly implement command methods that record the positions of events uniquely and atomically in a particular database.

To show how this can work, let’s create a materialised view for counting events that can record and reveal the number of “created” events and the number of subsequent events that have been generated by an event-sourced application.

View test case

Let’s begin by writing a test. The EventCountersViewTestCase shown below expresses requirements for recording and revealing the number of events generated by an event-sourced application.

It begins by constructing an “event counters view” object. The view object is expected to be a TrackingRecorder, and therefore to have a max_tracking_id() method. This method will be called when starting or resuming the processing of events, so that event-processing can run from the correct position in an application sequence.

The test continues by calling query methods to get the counted number of “created” and subsequent events. The query methods are expected to return integers. The values returned by the query methods are expected to be equal to the number of times the corresponding command methods have been called.

The command methods are called to increment the counted number of events. The command methods are called with a unique tracking object. The value returned by max_tracking_id() is expected to reflect which tracking objects have been recorded by the command methods. The command methods are expected to raise an IntegrityError when called more than once with the same tracking object. The counted numbers are expected not to change when an IntegrityError is raised. The view’s wait() method is called with a position that has been recorded, expecting the method to return normally without raising an exception. The wait() method is called with a position that has not been recorded, expecting that a TimeoutError will be raised.

class EventCountersViewTestCase(TestCase):
    def construct_event_counters_view(self) -> EventCountersInterface:
        raise NotImplementedError

    def test(self) -> None:
        # Construct materialised view object.
        view = self.construct_event_counters_view()

        # Check the view object is a tracking recorder.
        self.assertIsInstance(view, TrackingRecorder)

        # Check the view has processed no events.
        self.assertIsNone(view.max_tracking_id("upstream"))

        # Check the event counters are zero.
        self.assertEqual(view.get_created_event_counter(), 0)
        self.assertEqual(view.get_subsequent_event_counter(), 0)

        # Increment the "created" event counter.
        view.incr_created_event_counter(Tracking("upstream", 1))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)
        self.assertEqual(view.get_subsequent_event_counter(), 0)

        # Increment the subsequent event counter.
        view.incr_subsequent_event_counter(Tracking("upstream", 2))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)
        self.assertEqual(view.get_subsequent_event_counter(), 1)

        # Increment the subsequent event counter again.
        view.incr_subsequent_event_counter(Tracking("upstream", 3))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)
        self.assertEqual(view.get_subsequent_event_counter(), 2)

        # Check the tracking objects have been recorded.
        self.assertEqual(view.max_tracking_id("upstream"), 3)

        # Check the tracking objects are recorded uniquely and atomically.
        with self.assertRaises(IntegrityError):
            view.incr_created_event_counter(Tracking("upstream", 3))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)

        with self.assertRaises(IntegrityError):
            view.incr_subsequent_event_counter(Tracking("upstream", 3))

        # Check the counted number of events.
        self.assertEqual(view.get_subsequent_event_counter(), 2)

        # Check the wait() method returns normally when
        # we wait for a position that has been recorded.
        view.wait("upstream", 3)

        # Check the wait() method raises a TimeoutError when
        # we wait for a position that has not been recorded.
        with self.assertRaises(TimeoutError):
            view.wait("upstream", 4, timeout=0.5)
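The blocking behaviour that the test expects of wait() can be pictured with a condition variable. This is a sketch under stated assumptions, not the library's implementation: the class WaitablePositions and its record() method are hypothetical names, and only a single upstream sequence is tracked.

```python
import threading
from typing import Optional


class WaitablePositions:
    """Hypothetical helper: records positions and lets callers wait for one."""

    def __init__(self) -> None:
        self._condition = threading.Condition()
        self._max_id: Optional[int] = None

    def record(self, notification_id: int) -> None:
        # Remember the highest recorded position and wake up any waiters.
        with self._condition:
            if self._max_id is None or notification_id > self._max_id:
                self._max_id = notification_id
            self._condition.notify_all()

    def wait(self, notification_id: int, timeout: float = 1.0) -> None:
        # Return normally once the position has been recorded,
        # otherwise raise TimeoutError after the given timeout.
        def reached() -> bool:
            return self._max_id is not None and self._max_id >= notification_id

        with self._condition:
            if not self._condition.wait_for(reached, timeout=timeout):
                msg = f"Position {notification_id} not recorded within {timeout}s"
                raise TimeoutError(msg)
```

In this sketch, a call to wait() for a position that has already been recorded returns immediately, and a call for a later position blocks until another thread calls record() or the timeout expires.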

View interface

Towards satisfying this test, the EventCountersInterface class shown below extends the library’s abstract base class TrackingRecorder by defining abstract command and query methods with the expected method names and method signatures.

class EventCountersInterface(TrackingRecorder):
    @abstractmethod
    def get_created_event_counter(self) -> int:
        pass

    @abstractmethod
    def get_subsequent_event_counter(self) -> int:
        pass

    @abstractmethod
    def incr_created_event_counter(self, tracking: Tracking) -> None:
        pass

    @abstractmethod
    def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
        pass

The query methods get_created_event_counter() and get_subsequent_event_counter() return integers and have no arguments.

The command methods incr_created_event_counter() and incr_subsequent_event_counter() have a tracking argument expected to be an instance of the library’s Tracking class and return nothing.
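One consequence of declaring the methods with @abstractmethod is that a concrete view class cannot be instantiated until it implements all of them. The sketch below shows this standard Python behaviour with a hypothetical, much smaller interface. CounterInterface, IncompleteCounter, InMemoryCounter and can_construct() are illustrative names only.

```python
from abc import ABC, abstractmethod


class CounterInterface(ABC):
    """Hypothetical interface with one query method and one command method."""

    @abstractmethod
    def get_counter(self) -> int:
        pass

    @abstractmethod
    def incr_counter(self) -> None:
        pass


class IncompleteCounter(CounterInterface):
    # Implements the query method only, so the class remains abstract.
    def get_counter(self) -> int:
        return 0


class InMemoryCounter(CounterInterface):
    # Implements both methods, so the class can be instantiated.
    def __init__(self) -> None:
        self._counter = 0

    def get_counter(self) -> int:
        return self._counter

    def incr_counter(self) -> None:
        self._counter += 1


def can_construct(cls: type) -> bool:
    """Return True if the class can be instantiated without arguments."""
    try:
        cls()
    except TypeError:
        return False
    return True
```

Here can_construct(IncompleteCounter) is false because incr_counter() is still abstract, whereas can_construct(InMemoryCounter) is true.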

In-memory view

We can implement the interface so that the counted numbers of events are held in memory. The POPOEventCounters class, shown below, implements the abstract interface using Python objects to hold the counted numbers of events in memory. It defines “private” attributes _created_event_counter and _subsequent_event_counter. The values of these attributes are Python int objects, initialised to be zero. The values of these attributes are returned by the query methods, and incremented by the command methods.

class POPOEventCounters(POPOTrackingRecorder, EventCountersInterface):
    def __init__(self) -> None:
        super().__init__()
        self._created_event_counter = 0
        self._subsequent_event_counter = 0

    def get_created_event_counter(self) -> int:
        return self._created_event_counter

    def get_subsequent_event_counter(self) -> int:
        return self._subsequent_event_counter

    def incr_created_event_counter(self, tracking: Tracking) -> None:
        with self._database_lock:
            self._assert_tracking_uniqueness(tracking)
            self._insert_tracking(tracking)
            self._created_event_counter += 1

    def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
        with self._database_lock:
            self._assert_tracking_uniqueness(tracking)
            self._insert_tracking(tracking)
            self._subsequent_event_counter += 1

It inherits and extends the POPOTrackingRecorder class, using its database lock to serialise commands, so that the data integrity of the counters is preserved. It uses the “private” methods _assert_tracking_uniqueness() and _insert_tracking() to avoid any event being counted more than once, whilst keeping track of which events have been processed so that event-processing can be resumed correctly.

The test case TestPOPOEventCounters will show whether the POPOEventCounters class satisfies our requirements. Running this test shows that it does.

class TestPOPOEventCounters(EventCountersViewTestCase):
    def construct_event_counters_view(self) -> EventCountersInterface:
        return POPOEventCounters()

PostgreSQL view

Similarly, we can implement the interface to store the counted numbers of events in PostgreSQL. The PostgresEventCounters class, shown below, implements EventCountersInterface by defining a database table for event counters with SQL statements that select and increment event counters.

class PostgresEventCounters(PostgresTrackingRecorder, EventCountersInterface):
    _created_event_counter_name = "CREATED_EVENTS"
    _subsequent_event_counter_name = "SUBSEQUENT_EVENTS"

    def __init__(
        self,
        datastore: PostgresDatastore,
        **kwargs: Any,
    ):
        super().__init__(datastore, **kwargs)
        assert self.tracking_table_name.endswith("_tracking")  # Because we replace it.
        self.counters_table_name = self.tracking_table_name.replace("_tracking", "")
        self.check_table_name_length(self.counters_table_name)
        self.create_table_statements.append(
            SQL(
                "CREATE TABLE IF NOT EXISTS {0}.{1} ("
                "counter_name text, "
                "counter bigint, "
                "PRIMARY KEY "
                "(counter_name))"
            ).format(
                Identifier(self.datastore.schema),
                Identifier(self.counters_table_name),
            )
        )

        self.select_counter_statement = SQL(
            "SELECT counter FROM {0}.{1} WHERE counter_name=%s"
        ).format(
            Identifier(self.datastore.schema),
            Identifier(self.counters_table_name),
        )

        self.incr_counter_statement = SQL(
            "INSERT INTO {0}.{1} VALUES (%s, 1) "
            "ON CONFLICT (counter_name) DO UPDATE "
            "SET counter = {0}.{1}.counter + 1"
        ).format(
            Identifier(self.datastore.schema),
            Identifier(self.counters_table_name),
        )

    def get_created_event_counter(self) -> int:
        return self._select_counter(self._created_event_counter_name)

    def get_subsequent_event_counter(self) -> int:
        return self._select_counter(self._subsequent_event_counter_name)

    def incr_created_event_counter(self, tracking: Tracking) -> None:
        self._incr_counter(self._created_event_counter_name, tracking)

    def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
        self._incr_counter(self._subsequent_event_counter_name, tracking)

    def _select_counter(self, name: str) -> int:
        with self.datastore.transaction(commit=False) as curs:
            curs.execute(
                query=self.select_counter_statement,
                params=(name,),
                prepare=True,
            )
            fetchone = curs.fetchone()
            return fetchone["counter"] if fetchone else 0

    def _incr_counter(self, name: str, tracking: Tracking) -> None:
        with self.datastore.transaction(commit=True) as curs:
            self._insert_tracking(curs, tracking)
            curs.execute(
                query=self.incr_counter_statement,
                params=(name,),
                prepare=True,
            )

The _select_counter() method is used to select the current value of an event counter. The _incr_counter() method is used to increment a named event counter atomically in the same database transaction as a tracking object is recorded.

It inherits and extends the PostgresTrackingRecorder class, using the transaction() method of its datastore so that the data integrity of the counters is preserved.

It uses the “private” _insert_tracking() method of PostgresTrackingRecorder to avoid any event being counted more than once, whilst keeping track of which events have been processed so that event-processing can be resumed correctly.
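The same pattern of recording a tracking position and incrementing a counter in one transaction can be tried out locally without a PostgreSQL server. The sketch below uses Python's built-in sqlite3 module rather than the library's PostgresTrackingRecorder. The table names and the helper functions are assumptions made for illustration. The counter is deliberately incremented before the tracking row is inserted, so that the rollback caused by a duplicate position is visible.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tracking ("
    "application_name TEXT, "
    "notification_id INTEGER, "
    "PRIMARY KEY (application_name, notification_id))"
)
conn.execute(
    "CREATE TABLE counters (counter_name TEXT PRIMARY KEY, counter INTEGER)"
)
conn.commit()


def select_counter(name: str) -> int:
    row = conn.execute(
        "SELECT counter FROM counters WHERE counter_name = ?", (name,)
    ).fetchone()
    return row[0] if row else 0


def incr_counter(name: str, application_name: str, notification_id: int) -> None:
    # The connection context manager commits if the block succeeds
    # and rolls back if an exception is raised inside it.
    with conn:
        # Insert the counter row, or add one to it if it already exists.
        conn.execute(
            "INSERT INTO counters VALUES (?, 1) "
            "ON CONFLICT (counter_name) DO UPDATE SET counter = counter + 1",
            (name,),
        )
        # A duplicate position violates the primary key and raises
        # sqlite3.IntegrityError, which undoes the increment above.
        conn.execute(
            "INSERT INTO tracking VALUES (?, ?)",
            (application_name, notification_id),
        )


incr_counter("CREATED_EVENTS", "upstream", 1)
incr_counter("SUBSEQUENT_EVENTS", "upstream", 2)

try:
    incr_counter("SUBSEQUENT_EVENTS", "upstream", 2)
except sqlite3.IntegrityError:
    duplicate_rejected = True
else:
    duplicate_rejected = False
```

After the rejected third call, both counters still hold the value 1, because the increment and the tracking insert were committed or rolled back together.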

The test case TestPostgresEventCounters will show whether the PostgresEventCounters class satisfies our requirements. Running this test shows that it does.

class TestPostgresEventCounters(EventCountersViewTestCase):
    def setUp(self) -> None:
        self.factory = cast(
            PostgresFactory,
            InfrastructureFactory.construct(
                env=Environment(
                    name="eventcounters",
                    env={
                        "PERSISTENCE_MODULE": "eventsourcing.postgres",
                        "POSTGRES_DBNAME": "eventsourcing",
                        "POSTGRES_HOST": "127.0.0.1",
                        "POSTGRES_PORT": "5432",
                        "POSTGRES_USER": "eventsourcing",
                        "POSTGRES_PASSWORD": "eventsourcing",
                    },
                )
            ),
        )

    def construct_event_counters_view(self) -> EventCountersInterface:
        return self.factory.tracking_recorder(PostgresEventCounters)

    def tearDown(self) -> None:
        drop_tables()

This test uses the library’s InfrastructureFactory to construct the materialised view object, which is configured using environment variables. This powerful technique for constructing materialised view objects is used internally by the library’s projection runner class, which is described at the end of this part of the tutorial.

Projections

Let’s now consider how the events of an event-sourced application can be processed.

The library’s generic abstract base class Projection can be used to define how the domain events of an event-sourced application will be processed. It is intended to be subclassed by users.

class Projection(ABC, Generic[TTrackingRecorder]):
    name: str = ""
    """
    Name of projection, used to pick prefixed environment
    variables and define database table names.
    """
    topics: tuple[str, ...] = ()
    """
    Event topics, used to filter events in database when subscribing to an application.
    """

    def __init_subclass__(cls, **kwargs: Any) -> None:
        if "name" not in cls.__dict__:
            cls.name = cls.__name__

    def __init__(
        self,
        view: TTrackingRecorder,
    ):
        """Initialises the view property with the given view argument."""
        self._view = view

    @property
    def view(self) -> TTrackingRecorder:
        """Materialised view of an event-sourced application."""
        return self._view

    @singledispatchmethod
    @abstractmethod
    def process_event(
        self, domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking
    ) -> None:
        """Process a domain event and track it."""

The Projection class is a generic class because it has one type argument, expected to be the interface of a materialised view, a subclass of TrackingRecorder. The type argument should be specified by users when defining a subclass of Projection, to inform IDE code insight, command completion, and static type checking.

The Projection class has one required constructor argument, view, and one property also called view. The constructor argument is used to initialise the property. The annotated type of both is bound to the type argument of Projection. The given value of the constructor argument, and therefore the value of the property, should be a concrete instance of a materialised view.

The Projection class is an abstract class because it defines an abstract method process_event() that must be implemented by subclasses. Events will typically be processed by calling methods on view.

The Projection class has a name attribute, which can be set on subclasses. It is expected to be a Python str. It is used by projection runners, when constructing a materialised view object, to select prefixed environment variables, and in some cases to specify database table names used by the materialised view. The value used by the projection runner defaults to the class name of the subclass.
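The way the name defaults to the class name follows from the __init_subclass__() method shown in the listing above. The sketch below reproduces only that mechanism in a hypothetical SketchBase class, to show one detail that is easy to miss: the check looks at the subclass's own namespace, so a class that inherits from a named class without setting name itself receives its own class name.

```python
from typing import Any


class SketchBase:
    """Hypothetical base class reproducing only the name-defaulting logic."""

    name: str = ""

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Only a name defined directly on the subclass counts as "set".
        if "name" not in cls.__dict__:
            cls.name = cls.__name__


class UnnamedProjection(SketchBase):
    pass


class NamedProjection(SketchBase):
    name = "eventcounters"


class ChildOfNamedProjection(NamedProjection):
    pass


names = (
    UnnamedProjection.name,
    NamedProjection.name,
    ChildOfNamedProjection.name,
)
# names == ("UnnamedProjection", "eventcounters", "ChildOfNamedProjection")
```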

The Projection class has a topics attribute, which can be set on subclasses. It is expected to be a Python tuple of str objects containing domain event topics. Unless the attribute is empty or undefined, projection runners will use these topics when subscribing to an application, so that events can be filtered in the application database. The subscription will then yield only events whose topics are listed in this attribute. In many cases this will improve performance when running a projection, by avoiding the cost of transporting and reconstructing events that will be ignored by the projection. In some cases, filtering events by topic in this way will be necessary to avoid errors caused by attempting to reconstruct recorded domain events that your code or the library code is not capable of reconstructing.
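The effect of filtering by topic can be sketched in plain Python. This is an illustration only: the library performs the filtering in the application database, whereas the hypothetical select_events() function below filters an in-memory list. The topic_of() helper and its "module:qualname" format are assumptions standing in for the library's get_topic() function, and the event classes are invented for the example.

```python
from typing import List, Sequence, Tuple


def topic_of(cls: type) -> str:
    """Hypothetical stand-in for get_topic(): a string that identifies a class."""
    return f"{cls.__module__}:{cls.__qualname__}"


class Created:
    pass


class Renamed:
    pass


class Archived:
    pass


# A stored event is represented here as a (topic, serialised state) pair.
StoredEvent = Tuple[str, bytes]

stored_events: List[StoredEvent] = [
    (topic_of(Created), b"{}"),
    (topic_of(Renamed), b"{}"),
    (topic_of(Archived), b"{}"),
    (topic_of(Renamed), b"{}"),
]


def select_events(
    stored: Sequence[StoredEvent], topics: Sequence[str]
) -> List[StoredEvent]:
    # An empty topics tuple means "no filtering": all events are selected.
    if not topics:
        return list(stored)
    wanted = set(topics)
    # Only events with a listed topic are selected, so the others are
    # never transported or reconstructed by the caller.
    return [event for event in stored if event[0] in wanted]


selected = select_events(stored_events, (topic_of(Created), topic_of(Renamed)))
```

In this sketch, matching is by exact topic string, so the topic of every event class that should be processed must be listed explicitly.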

Example projection

For example, the EventCountersProjection class, shown below, processes events of an event-sourced application by calling methods of a concrete instance of EventCountersInterface. It inherits from the Projection class, setting the type argument to EventCountersInterface. It sets the name attribute to 'eventcounters'. It sets the topics attribute to list the topics of the domain events processed by this projection.

The EventCountersProjection class implements process_event() by calling the incr_created_event_counter() and incr_subsequent_event_counter() methods of EventCountersInterface, which are available on view.

If the given event is an Aggregate.Created event, then incr_created_event_counter() is called. Alternatively, incr_subsequent_event_counter() is called if the given event is an Aggregate.Event. An exception is raised if the given event is a SpannerThrown event, to demonstrate error handling in projection runners. Other kinds of events can be handled by calling insert_tracking() so that progress along an application sequence can be recorded, for example if large gaps are spanned by an application subscription returning checkpoints.

class EventCountersProjection(Projection[EventCountersInterface]):
    name = "eventcounters"
    topics: tuple[str, ...] = (
        get_topic(Aggregate.Created),
        get_topic(Aggregate.Event),
        get_topic(SpannerThrown),
    )

    @singledispatchmethod
    def process_event(
        self, _: DomainEventProtocol[TAggregateID], tracking: Tracking
    ) -> None:
        self.view.insert_tracking(tracking)

    @process_event.register
    def aggregate_created(self, event: Aggregate.Created, tracking: Tracking) -> None:
        self.view.incr_created_event_counter(tracking)

    @process_event.register
    def aggregate_event(self, _: Aggregate.Event, tracking: Tracking) -> None:
        self.view.incr_subsequent_event_counter(tracking)

    @process_event.register
    def spanner_thrown(self, _: SpannerThrown, __: Tracking) -> None:
        msg = "This is a deliberate bug"
        raise SpannerThrownError(msg)
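The dispatching in the listing above relies on functools.singledispatchmethod from the Python standard library, which selects a handler according to the type of the first argument after self. The sketch below shows the mechanism in isolation. The event classes and the SketchProjection class are hypothetical, and the handlers only record which branch was taken.

```python
from functools import singledispatchmethod
from typing import List, Tuple


class Event:
    """Hypothetical base class for events."""


class Created(Event):
    """Hypothetical "created" event, a subclass of Event."""


class Unrelated:
    """Hypothetical object with no registered handler."""


class SketchProjection:
    def __init__(self) -> None:
        self.calls: List[Tuple[str, int]] = []

    @singledispatchmethod
    def process_event(self, event: object, position: int) -> None:
        # Fallback for types with no registered handler.
        self.calls.append(("tracked only", position))

    @process_event.register(Created)
    def created(self, event: Created, position: int) -> None:
        self.calls.append(("created", position))

    @process_event.register(Event)
    def subsequent(self, event: Event, position: int) -> None:
        self.calls.append(("subsequent", position))


projection = SketchProjection()
projection.process_event(Created(), 1)    # most specific handler wins
projection.process_event(Event(), 2)      # handler registered for the base class
projection.process_event(Unrelated(), 3)  # falls back to the default handler
```

Although Created is a subclass of Event in this sketch, the handler registered for Created is chosen for Created objects, because the dispatch picks the most specific registered type.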

Runners

Let’s consider how to run the projection, so events of an event-sourced application can be counted.

The library’s ProjectionRunner class is provided for the purpose of running projections. A projection runner can be constructed with an application class, a projection class, a materialised view class, and an optional environment Mapping object that specifies run-time configuration of the event-sourced application and the materialised view.

The projection runner will construct an instance of the given application class, and an instance of the given projection class with an instance of the given materialised view class. Any items in the given environment will be used to override any operating system environment variables, and the resulting environment variables will be used when the application object is constructed and when the materialised view object is constructed. The names of the application and of the projection are used to select prefixed environment variables before defaulting to unprefixed names. In this way, the event-sourced application can be configured separately from the materialised view.
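The lookup order described above, prefixed names first and unprefixed names as a fallback, can be sketched as follows. This is not the library's Environment class. The functions effective_environment() and lookup() are hypothetical, and the upper-cased prefix is an assumption based on the variable names used in this tutorial, such as EVENTCOUNTERS_PERSISTENCE_MODULE.

```python
import os
from typing import Dict, Mapping, Optional


def effective_environment(
    overrides: Optional[Mapping[str, str]] = None,
    base: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Merge the given overrides on top of the base environment."""
    env = dict(os.environ if base is None else base)
    env.update(overrides or {})
    return env


def lookup(env: Mapping[str, str], name: str, key: str) -> Optional[str]:
    """Prefer NAME_KEY, then fall back to the unprefixed KEY."""
    prefixed_key = f"{name.upper()}_{key}"
    if prefixed_key in env:
        return env[prefixed_key]
    return env.get(key)


# An explicit base is used here so that the example does not depend
# on the operating system environment of the machine running it.
env = effective_environment(
    overrides={"EVENTCOUNTERS_PERSISTENCE_MODULE": "eventsourcing.postgres"},
    base={"PERSISTENCE_MODULE": "eventsourcing.popo"},
)

view_module = lookup(env, "eventcounters", "PERSISTENCE_MODULE")
app_module = lookup(env, "application", "PERSISTENCE_MODULE")
```

In this example the materialised view is configured by the prefixed variable, while the application falls back to the unprefixed one.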

It will subscribe to the application, from the position indicated by the max_tracking_id() method of the projection’s materialised view, and then call the process_event() method of the projection for each domain event yielded by the application subscription.

Because the projection runner starts a subscription to the application, it will first catch up by processing already recorded events that have not yet been processed, and then it will continue to process events that are subsequently recorded in the application’s database.
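Resuming from the last recorded position can be sketched with an in-memory list standing in for the application sequence. Everything here is hypothetical, including select_after() and SketchView. The point is only that the position reported by the view decides where processing starts, so that a restart continues where the previous run stopped.

```python
from typing import List, Optional, Tuple

# Hypothetical application sequence, with positions starting at 1.
application_sequence: List[str] = ["created", "event", "event", "created", "event"]


def select_after(last_position: Optional[int]) -> List[Tuple[int, str]]:
    """Return (position, event) pairs recorded after the given position."""
    start = last_position or 0
    return [
        (position, event)
        for position, event in enumerate(application_sequence, start=1)
        if position > start
    ]


class SketchView:
    def __init__(self) -> None:
        self.last_position: Optional[int] = None
        self.processed: List[str] = []

    def max_tracking_id(self) -> Optional[int]:
        return self.last_position

    def process(self, position: int, event: str) -> None:
        # Update the view and record the position together.
        self.processed.append(event)
        self.last_position = position


view = SketchView()

# First run: process two events, then stop (as if the process was terminated).
for position, event in select_after(view.max_tracking_id())[:2]:
    view.process(position, event)

# Later run: resume after the recorded position, without reprocessing anything.
for position, event in select_after(view.max_tracking_id()):
    view.process(position, event)
```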

The ProjectionRunner class has a run_forever() method, which blocks until an optional timeout, or until an exception is raised by the projection or by the subscription, or until the projection runner is stopped by calling its stop() method. This allows an event-processing component to be started and run independently as a separate operating system process for a controllable period of time, and then to terminate in a controlled way when there is an error, or when it is interrupted. Exceptions raised whilst running the projection will be re-raised by the run_forever() method. Operators of the system can examine any errors and resume processing by reconstructing the runner. Some errors may be transient operational errors, such as a loss of database connectivity, in which case the processing could be resumed automatically. Other errors may be programming errors, and will require manual intervention before the event processing can continue.
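The blocking and re-raising behaviour described above can be pictured with a worker thread and a threading.Event. This is a sketch under stated assumptions rather than the library's ProjectionRunner: the SketchRunner class is hypothetical, it takes a finite list of events instead of a subscription, and it only mirrors the run_forever() and stop() method names.

```python
import threading
from typing import Callable, Iterable, Optional


class SketchRunner:
    """Hypothetical runner: processes events in a thread and reports errors."""

    def __init__(
        self, events: Iterable[str], handler: Callable[[str], None]
    ) -> None:
        self._stopped = threading.Event()
        self._error: Optional[Exception] = None
        self._thread = threading.Thread(
            target=self._run, args=(events, handler), daemon=True
        )
        self._thread.start()

    def _run(self, events: Iterable[str], handler: Callable[[str], None]) -> None:
        try:
            for event in events:
                if self._stopped.is_set():
                    return
                handler(event)
        except Exception as error:
            # Keep the error for run_forever(), then unblock it.
            self._error = error
            self._stopped.set()

    def run_forever(self, timeout: Optional[float] = None) -> None:
        # Block until stopped, until an error occurs, or until the timeout.
        self._stopped.wait(timeout=timeout)
        if self._error is not None:
            raise self._error

    def stop(self) -> None:
        self._stopped.set()


def handle(event: str) -> None:
    if event == "spanner":
        raise RuntimeError("deliberate error")


outcome = "no error"
runner = SketchRunner(["created", "spanner"], handle)
try:
    runner.run_forever(timeout=5.0)
except RuntimeError as error:
    outcome = f"stopped with: {error}"
```

A supervising process could catch the re-raised exception, decide whether it looks transient, and construct a new runner to resume processing.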

The wait() method of materialised view objects can be used to wait until an event has been processed by the projection before calling a query method on the materialised view.

Counting events in memory

For example, the TestEventCountersProjection class, shown below, tests the EventCountersProjection projection class with the POPOEventCounters class.

class TestEventCountersProjection(TestCase, ABC):
    view_class: type[EventCountersInterface] = POPOEventCounters
    env: ClassVar[dict[str, str]] = {}

    def test_event_counters_projection(self) -> None:
        # Construct runner with application, projection, and recorder.
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=EventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ) as runner:

            # Get "read" and "write" model instances from the runner.
            write_model = runner.app
            read_model = runner.projection.view

            # Write some events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for the events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 1)
            self.assertEqual(read_model.get_subsequent_event_counter(), 2)

            # Write some more events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for the events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 2)
            self.assertEqual(read_model.get_subsequent_event_counter(), 4)

    def test_run_forever_raises_projection_error(self) -> None:
        # Construct runner with application, projection, and recorder.
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=EventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ) as runner:
            write_model = runner.app
            read_model = runner.projection.view

            # Write some events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=SpannerThrown)
            recordings = write_model.save(aggregate)

            # Projection runner terminates with projection error.
            with self.assertRaises(SpannerThrownError):
                runner.run_forever()

            # Wait times out (event has not been processed).
            with self.assertRaises(TimeoutError):
                read_model.wait(
                    application_name=write_model.name,
                    notification_id=recordings[-1].notification.id,
                )

The method test_event_counters_projection() constructs a runner, and from the runner gets references to an event-sourced application “write model” and a materialised view “read model”.

In this case, because we are generating and counting events in memory, we need to use the same instance of the application object and of the materialised view object as the projection runner for generating events and for getting the counted numbers of events.

Events are generated in the event-sourced application “write model”. The “created” and subsequent events are processed, by updating the materialised view “read model”, according to the logic of the EventCountersProjection projection. The counted numbers of each kind of event are obtained from the “read model”. The materialised view is “eventually consistent” because the event processing is asynchronous, and so the wait() method is used to wait for the events to be processed before querying the materialised view.

The method test_run_forever_raises_projection_error() shows that after a SpannerThrown event is generated by the event-sourced application, an error occurs with the event-processing, and an exception representing this error is raised by the runner’s run_forever() method. Because the event will not successfully be processed, the materialised view’s wait() method will time out.

Counting events in PostgreSQL

The TestEventCountersProjectionWithPostgres class extends TestEventCountersProjection and runs EventCountersProjection with the PostgresEventCounters class. Because this test case uses a durable database for both the event-sourced application and the materialised view, any instance of the application can be used to write events, and any instance of the materialised view can be used to obtain the counted numbers of events. If a durable database is used in production, the event-sourced application object and the materialised view object could be horizontally scaled, with the event-processing component running as a separate operating system process.

This is demonstrated by extending the test methods so that they:

  1. resume processing by reconstructing the projection runner;

  2. construct separate instances of the event-sourced application and of the materialised view;

  3. continue generating events using the separate instance of the event-sourced application;

  4. query the counted numbers of events using the separate instance of the materialised view;

  5. check that the counted numbers of events correctly report the total numbers of events that have been generated;

  6. show that the event-processing error is still raised, and that the event processing has not continued after the error caused by the SpannerThrown event.

The event-sourced application and the materialised view could also use separate databases, but in this example they are configured more simply to use different tables in the same database.

class TestEventCountersProjectionWithPostgres(TestEventCountersProjection):
    view_class = PostgresEventCounters
    env: ClassVar[dict[str, str]] = {
        "APPLICATION_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "APPLICATION_POSTGRES_DBNAME": "eventsourcing",
        "APPLICATION_POSTGRES_HOST": "127.0.0.1",
        "APPLICATION_POSTGRES_PORT": "5432",
        "APPLICATION_POSTGRES_USER": "eventsourcing",
        "APPLICATION_POSTGRES_PASSWORD": "eventsourcing",
        "EVENTCOUNTERS_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "EVENTCOUNTERS_POSTGRES_DBNAME": "eventsourcing",
        "EVENTCOUNTERS_POSTGRES_HOST": "127.0.0.1",
        "EVENTCOUNTERS_POSTGRES_PORT": "5432",
        "EVENTCOUNTERS_POSTGRES_USER": "eventsourcing",
        "EVENTCOUNTERS_POSTGRES_PASSWORD": "eventsourcing",
    }

    def test_event_counters_projection(self) -> None:
        super().test_event_counters_projection()

        # Resume....
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=EventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ):

            # Construct separate instance of "write model".
            write_model = Application[UUID](self.env)

            # Construct separate instance of "read model".
            read_model = (
                InfrastructureFactory[EventCountersInterface]
                .construct(
                    env=Environment(name=EventCountersProjection.name, env=self.env)
                )
                .tracking_recorder(self.view_class)
            )

            # Write some events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 3)
            self.assertEqual(read_model.get_subsequent_event_counter(), 6)

            # Write some more events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 4)
            self.assertEqual(read_model.get_subsequent_event_counter(), 8)

    def test_run_forever_raises_projection_error(self) -> None:
        super().test_run_forever_raises_projection_error()

        # Resume...
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=EventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ) as runner:

            # Construct separate instance of "write model".
            write_model = Application[UUID](self.env)

            # Construct separate instance of "read model".
            read_model = InfrastructureFactory.construct(
                env=Environment(name=EventCountersProjection.name, env=self.env)
            ).tracking_recorder(self.view_class)

            # Still terminates with projection error.
            with self.assertRaises(SpannerThrownError):
                runner.run_forever()

            # Wait times out (event has not been processed).
            with self.assertRaises(TimeoutError):
                read_model.wait(
                    application_name=write_model.name,
                    notification_id=write_model.recorder.max_notification_id(),
                )

    def setUp(self) -> None:
        drop_tables()
        super().setUp()

    def tearDown(self) -> None:
        super().tearDown()
        drop_tables()

Exercises

  1. Replicate the code in this tutorial in your development environment.

  2. Develop and run a projection that counts dogs and tricks from a DogSchool application.

  3. Develop and run an event-sourced projection that counts dogs and tricks from a DogSchool application.

Next steps