Tutorial - Part 4 - Projections

This part of the tutorial shows how the state of an event-sourced application can be projected into a materialised view that supports arbitrary queries.

As we saw in Part 3, we can use the library’s Application class to define an event-sourced application. The state of an event-sourced application is recorded as a set of events, each of which is positioned in one of many “aggregate sequences” and also in the overall “application sequence”.

We can reconstruct an aggregate by selecting events from an aggregate sequence, and we can propagate the state of an application by following its application sequence. But to query the current state of an event-sourced application in any other way, we must first project the application state into a “read model” that is designed to support such queries.
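The difference between the two kinds of sequence can be pictured with a small stand-alone sketch. It does not use the library: the StoredEvent tuple, its field names, and the example topics are hypothetical, chosen only to show why a query across all aggregates needs something other than a single aggregate sequence.

```python
from typing import NamedTuple


class StoredEvent(NamedTuple):
    # Hypothetical record shape, for illustration only.
    notification_id: int  # position in the application sequence
    aggregate_id: str  # identifies one aggregate sequence
    version: int  # position in that aggregate sequence
    topic: str


recorded = [
    StoredEvent(1, "dog-1", 1, "Registered"),
    StoredEvent(2, "dog-2", 1, "Registered"),
    StoredEvent(3, "dog-1", 2, "TrickAdded"),
    StoredEvent(4, "dog-2", 2, "TrickAdded"),
    StoredEvent(5, "dog-1", 3, "TrickAdded"),
]

# Reconstructing an aggregate: select one aggregate sequence, in version order.
dog1_events = sorted(
    (e for e in recorded if e.aggregate_id == "dog-1"),
    key=lambda e: e.version,
)

# Answering "how many tricks across all dogs?" means reading every event
# in the application sequence, which is the job of a projection.
tricks_total = sum(1 for e in recorded if e.topic == "TrickAdded")
```

Scanning all events on every query would be slow for a large application, which is the motivation for keeping the answer up to date in a separate read model.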

So that such queries can be performed quickly, it is useful to project the state of an event-sourced application into a read model that is stored in a database. Such “materialised views” are usually prepared and updated by processing the events of the event-sourced application in a separate event-processing component, which subscribes to the application sequence so that it can catch up and then continue as further events are recorded.

In the sections below we will explore how the events of event-sourced applications can be processed into materialised views that support queries that are not directly supported by event-sourced applications.

View classes

Materialised views support queries of the state of an event-sourced application. Therefore, every materialised view will need to have at least one query method. Materialised views will also need to be updated when events are processed, and so a materialised view will also need to have at least one command method.

A materialised view is typically expected to be a reliable deterministic projection of the state of an event-sourced application. To achieve this, we need to record the position of each processed event uniquely, and record it atomically along with the updating of the materialised view. This ensures the results of processing an event are never successfully recorded more than once. And so that event processing can be resumed correctly, we also need to get the position of the last successfully processed event.
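The idea of recording a position uniquely and atomically can be sketched without the library, using Python's built-in sqlite3 module. This is only an illustration under assumed table and function names (tracking, counters, count_event), not the library's implementation. The counter is deliberately updated before the tracking row is inserted, to show that a rejected duplicate rolls back the counter change as well.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tracking ("
    "application_name TEXT, notification_id INTEGER, "
    "PRIMARY KEY (application_name, notification_id))"
)
conn.execute("CREATE TABLE counters (name TEXT PRIMARY KEY, value INTEGER)")
conn.execute("INSERT INTO counters VALUES ('events', 0)")
conn.commit()


def count_event(application_name: str, notification_id: int) -> None:
    # One transaction: both changes commit together, or neither does.
    with conn:
        conn.execute("UPDATE counters SET value = value + 1 WHERE name = 'events'")
        # The primary key rejects a position that was already recorded.
        conn.execute(
            "INSERT INTO tracking VALUES (?, ?)",
            (application_name, notification_id),
        )


def get_counter() -> int:
    row = conn.execute("SELECT value FROM counters WHERE name = 'events'").fetchone()
    return row[0]


def max_tracking_id(application_name: str):
    # Returns None when nothing has been recorded for this application.
    row = conn.execute(
        "SELECT MAX(notification_id) FROM tracking WHERE application_name = ?",
        (application_name,),
    ).fetchone()
    return row[0]


count_event("upstream", 1)
count_event("upstream", 2)
try:
    count_event("upstream", 2)  # same position again
except sqlite3.IntegrityError:
    duplicate_rejected = True
else:
    duplicate_rejected = False
```

After the rejected duplicate, the counter still reads 2 and the last recorded position is 2, so processing can resume from position 3.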

Because of the arbitrary nature of materialised views, the recording and reporting of the positions of processed events are the only common aspects shared by all materialised views. In this sense, all materialised views are special kinds of tracking recorders.

And so we can define an interface for a materialised view using the library’s abstract TrackingRecorder class. The interface will define how the materialised view will be queried and how events will be processed, independently of any concrete implementation.

We can then develop concrete implementations of this interface by also extending the library’s concrete tracking recorder classes: POPOTrackingRecorder, SQLiteTrackingRecorder and PostgresTrackingRecorder. The concrete tracking recorders help with recording the positions of processed events uniquely and atomically in a particular database.

To show how this can work, let’s create a simple materialised view, for counting events, that can record and reveal the number of “created” events and the number of subsequent events that have been generated by an event-sourced application.

View test

Let’s begin by writing a test. The EventCountersViewTestCase shown below expresses requirements for a view object that can record and reveal the number of events generated by an event-sourced application.

The test expects this view object:

  • to be an instance of TrackingRecorder, and thereby to have a max_tracking_id() method. This will support resuming to process events from the correct position.

  • to have command and query methods for incrementing and obtaining the recorded number of events. The values returned by the query methods are expected to be equal to the number of times the corresponding command methods have been called.

  • to have command methods with a tracking object parameter. The value returned by max_tracking_id() is expected to reflect which tracking objects have been recorded by the command methods. The command methods are expected to raise an IntegrityError when called more than once with the same tracking object. The counted numbers are expected not to change when an IntegrityError is raised by a command method.

  • to have a wait() method that returns normally without raising an exception when called with a position that has been recorded. A TimeoutError is expected to be raised when wait() is called with a position greater than those which have been recorded.

The test begins by constructing an instance of EventCountersView — to be defined below.

class EventCountersViewTestCase(TestCase):
    def construct_event_counters_view(self) -> EventCountersView:
        raise NotImplementedError

    def test(self) -> None:
        # Construct materialised view object.
        view = self.construct_event_counters_view()

        # Check the view object is a tracking recorder.
        self.assertIsInstance(view, TrackingRecorder)

        # Check the view has processed no events.
        self.assertIsNone(view.max_tracking_id("upstream"))

        # Check the event counters are zero.
        self.assertEqual(view.get_created_event_counter(), 0)
        self.assertEqual(view.get_subsequent_event_counter(), 0)

        # Increment the "created" event counter.
        view.incr_created_event_counter(Tracking("upstream", 1))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)
        self.assertEqual(view.get_subsequent_event_counter(), 0)

        # Increment the subsequent event counter.
        view.incr_subsequent_event_counter(Tracking("upstream", 2))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)
        self.assertEqual(view.get_subsequent_event_counter(), 1)

        # Increment the subsequent event counter again.
        view.incr_subsequent_event_counter(Tracking("upstream", 3))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)
        self.assertEqual(view.get_subsequent_event_counter(), 2)

        # Check the tracking objects have been recorded.
        self.assertEqual(view.max_tracking_id("upstream"), 3)

        # Check the tracking objects are recorded uniquely and atomically.
        with self.assertRaises(IntegrityError):
            view.incr_created_event_counter(Tracking("upstream", 3))

        # Check the counted number of events.
        self.assertEqual(view.get_created_event_counter(), 1)

        with self.assertRaises(IntegrityError):
            view.incr_subsequent_event_counter(Tracking("upstream", 3))

        # Check the counted number of events.
        self.assertEqual(view.get_subsequent_event_counter(), 2)

        # Check the wait() method returns normally when
        # we wait for a position that has been recorded.
        view.wait("upstream", 3)

        # Check the wait() method raises a TimeoutError when
        # we wait for a position that has not been recorded.
        with self.assertRaises(TimeoutError):
            view.wait("upstream", 4, timeout=0.5)

View example

Towards satisfying this test, let’s define EventCountersView. We can extend the library’s abstract base class TrackingRecorder and define abstract command and query methods with names and signatures expected by the test above:

  • the command methods incr_created_event_counter() and incr_subsequent_event_counter() have a tracking parameter, typed with the library’s Tracking class.

  • the query methods get_created_event_counter() and get_subsequent_event_counter() return integers.

class EventCountersView(TrackingRecorder):
    @abstractmethod
    def get_created_event_counter(self) -> int:
        pass

    @abstractmethod
    def get_subsequent_event_counter(self) -> int:
        pass

    @abstractmethod
    def incr_created_event_counter(self, tracking: Tracking) -> None:
        pass

    @abstractmethod
    def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
        pass

We will implement this interface twice: once to create a materialised view stored in memory, and then again to create a materialised view stored in Postgres.

View in memory

The POPOEventCounters class implements EventCountersView using Python objects to hold the counted numbers of events. It defines “private” attributes _created_event_counter and _subsequent_event_counter as Python int objects initialised to zero. The values of these attributes are incremented by the command methods and returned by the query methods.

It inherits from the POPOTrackingRecorder class, using its database lock to serialise commands, and the inherited methods _assert_tracking_uniqueness() and _insert_tracking() to avoid events being counted more than once.

class POPOEventCounters(POPOTrackingRecorder, EventCountersView):
    def __init__(self) -> None:
        super().__init__()
        self._created_event_counter = 0
        self._subsequent_event_counter = 0

    def get_created_event_counter(self) -> int:
        return self._created_event_counter

    def get_subsequent_event_counter(self) -> int:
        return self._subsequent_event_counter

    def incr_created_event_counter(self, tracking: Tracking) -> None:
        with self._database_lock:
            self._assert_tracking_uniqueness(tracking)
            self._insert_tracking(tracking)
            self._created_event_counter += 1

    def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
        with self._database_lock:
            self._assert_tracking_uniqueness(tracking)
            self._insert_tracking(tracking)
            self._subsequent_event_counter += 1

The test case TestPOPOEventCounters will show whether the POPOEventCounters class satisfies our requirements.

class TestPOPOEventCounters(EventCountersViewTestCase):
    def construct_event_counters_view(self) -> EventCountersView:
        return POPOEventCounters()

View in Postgres

The PostgresEventCounters class also implements EventCountersView. It defines a database table, and SQL statements for incrementing and selecting event counters. The _select_counter() method is used to select the current value of an event counter. The _incr_counter() method is used to increment a named event counter atomically in the same database transaction as a tracking object is recorded.

It extends the PostgresTrackingRecorder class, using the transaction() method to implement atomic transactions, and the inherited method _insert_tracking() to avoid events being counted more than once.

class PostgresEventCounters(PostgresTrackingRecorder, EventCountersView):
    _created_event_counter_name = "CREATED_EVENTS"
    _subsequent_event_counter_name = "SUBSEQUENT_EVENTS"

    def __init__(
        self,
        datastore: PostgresDatastore,
        **kwargs: Any,
    ):
        super().__init__(datastore, **kwargs)
        assert self.tracking_table_name.endswith("_tracking")  # Because we replace it.
        self.counters_table_name = self.tracking_table_name.replace("_tracking", "")
        self.check_identifier_length(self.counters_table_name)
        self.sql_create_statements.append(
            SQL(
                "CREATE TABLE IF NOT EXISTS {0}.{1} ("
                "counter_name text, "
                "counter bigint, "
                "PRIMARY KEY "
                "(counter_name))"
            ).format(
                Identifier(self.datastore.schema),
                Identifier(self.counters_table_name),
            )
        )

        self.select_counter_statement = SQL(
            "SELECT counter FROM {0}.{1} WHERE counter_name=%s"
        ).format(
            Identifier(self.datastore.schema),
            Identifier(self.counters_table_name),
        )

        self.incr_counter_statement = SQL(
            "INSERT INTO {0}.{1} VALUES (%s, 1) "
            "ON CONFLICT (counter_name) DO UPDATE "
            "SET counter = {0}.{1}.counter + 1"
        ).format(
            Identifier(self.datastore.schema),
            Identifier(self.counters_table_name),
        )

    def get_created_event_counter(self) -> int:
        return self._select_counter(self._created_event_counter_name)

    def get_subsequent_event_counter(self) -> int:
        return self._select_counter(self._subsequent_event_counter_name)

    def incr_created_event_counter(self, tracking: Tracking) -> None:
        self._incr_counter(self._created_event_counter_name, tracking)

    def incr_subsequent_event_counter(self, tracking: Tracking) -> None:
        self._incr_counter(self._subsequent_event_counter_name, tracking)

    def _select_counter(self, name: str) -> int:
        with self.datastore.transaction(commit=False) as curs:
            curs.execute(
                query=self.select_counter_statement,
                params=(name,),
                prepare=True,
            )
            fetchone = curs.fetchone()
            return fetchone["counter"] if fetchone else 0

    def _incr_counter(self, name: str, tracking: Tracking) -> None:
        with self.datastore.transaction(commit=True) as curs:
            self._insert_tracking(curs, tracking)
            curs.execute(
                query=self.incr_counter_statement,
                params=(name,),
                prepare=True,
            )

The test case TestPostgresEventCounters will show whether the PostgresEventCounters class satisfies our requirements. This test uses the library’s InfrastructureFactory to construct the materialised view object.

class TestPostgresEventCounters(EventCountersViewTestCase):
    expected_factory_topic = "eventsourcing.postgres:PostgresFactory"
    env: ClassVar[dict[str, str]] = {
        "PERSISTENCE_MODULE": "eventsourcing.postgres",
        "POSTGRES_DBNAME": "eventsourcing",
        "POSTGRES_HOST": "127.0.0.1",
        "POSTGRES_PORT": "5432",
        "POSTGRES_USER": "eventsourcing",
        "POSTGRES_PASSWORD": "eventsourcing",
        "POSTGRES_SCHEMA": "public",
    }

    def setUp(self) -> None:
        self.factory = InfrastructureFactory[EventCountersView].construct(self.env)

    def tearDown(self) -> None:
        self.factory.close()
        drop_tables()

    def construct_event_counters_view(self) -> EventCountersView:
        return self.factory.tracking_recorder(PostgresEventCounters)

Projections

Let’s now consider how the events of an event-sourced application can be processed.

The library’s generic abstract base class Projection can be used to define how the domain events of an event-sourced application will be processed. It is intended to be subclassed by users.

The source code of the Projection class is included here for clarity.

class Projection(ABC, Generic[TTrackingRecorder]):
    name: str = ""
    """
    Name of projection, used to pick prefixed environment
    variables and define database table names.
    """
    topics: tuple[str, ...] = ()
    """
    Event topics, used to filter events in database when subscribing to an application.
    """

    def __init_subclass__(cls, **kwargs: Any) -> None:
        if "name" not in cls.__dict__:
            cls.name = cls.__name__

    def __init__(
        self,
        view: TTrackingRecorder,
    ):
        """Initialises the view property with the given view argument."""
        self._view = view

    @property
    def view(self) -> TTrackingRecorder:
        """Materialised view of an event-sourced application."""
        return self._view

    @singledispatchmethod
    @abstractmethod
    def process_event(self, domain_event: Any, tracking: Tracking) -> None:
        """Process a domain event and track it."""

The Projection class is a generic class because it has a type parameter. The type argument is expected to be the interface type of a materialised view, and its upper bound is TrackingRecorder.

The constructor argument view is used to initialise the view property. The annotated type of the constructor argument and the property is bound to the type parameter. The given value of the constructor argument, and therefore the value of the property, should be a concrete instance of a materialised view.

The Projection class is an abstract class because it defines an abstract method process_event() that must be implemented by subclasses. Events will typically be processed by calling methods on view.

To inform IDE code insight tooling, command completion, and static type checking, the type argument should be specified by users when defining a subclass of Projection.
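The effect of specifying the type argument can be seen in a stand-alone sketch that uses only the typing module. All class names here (Recorder, CountersView, Processor, and so on) are hypothetical stand-ins, not library classes.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar


class Recorder(ABC):
    """Stand-in for an abstract tracking recorder."""


class CountersView(Recorder):
    """Stand-in for a view interface with one query method."""

    @abstractmethod
    def get_count(self) -> int:
        pass


# The type parameter has Recorder as its upper bound.
TRecorder = TypeVar("TRecorder", bound=Recorder)


class Processor(Generic[TRecorder]):
    def __init__(self, view: TRecorder) -> None:
        self._view = view

    @property
    def view(self) -> TRecorder:
        return self._view


class InMemoryCounters(CountersView):
    def get_count(self) -> int:
        return 0


class CountingProcessor(Processor[CountersView]):
    def report(self) -> int:
        # A type checker knows self.view is a CountersView here,
        # so get_count() is recognised and can be completed by an IDE.
        return self.view.get_count()


processor = CountingProcessor(InMemoryCounters())
```

Without the type argument, a type checker would only know that the view is some kind of Recorder, and the call to get_count() would not be checked.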

The Projection class has a name attribute, which can be set on subclasses. It is expected to be a Python str. It is used by projection runners, when constructing a materialised view object, to select prefixed environment variables, and in some cases to specify database table names used by the materialised view. The value used by the projection runner defaults to the class name of the subclass.
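One way to picture the selection of prefixed environment variables is the following sketch. It assumes a lookup rule in which a variable prefixed with the upper-cased name takes precedence over the unprefixed variable. Both the rule and the lookup() helper are assumptions made for illustration, not a description of the library's code.

```python
from typing import Mapping, Optional


def lookup(env: Mapping[str, str], name: str, key: str) -> Optional[str]:
    """Hypothetical helper: prefer NAME_KEY, then fall back to KEY."""
    prefixed = f"{name.upper()}_{key}"
    if prefixed in env:
        return env[prefixed]
    return env.get(key)


env = {
    "POSTGRES_HOST": "127.0.0.1",
    "EVENTCOUNTERS_POSTGRES_DBNAME": "views",
    "APPLICATION_POSTGRES_DBNAME": "events",
}

view_dbname = lookup(env, "eventcounters", "POSTGRES_DBNAME")
app_dbname = lookup(env, "application", "POSTGRES_DBNAME")
shared_host = lookup(env, "eventcounters", "POSTGRES_HOST")
```

Under this assumed rule, one environment can configure the application and the view differently where needed, while sharing any settings that are not prefixed.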

The Projection class has a topics attribute, which can be set on subclasses. It is expected to be a Python tuple of str objects that contains domain event topics. Unless empty or undefined, projection runners will use these topics when subscribing to an application, so that events can be filtered in the application database. The subscription will then only yield events that have topics mentioned by this attribute. In many cases this will improve performance when running a projection, by avoiding the cost of transporting and reconstructing events that will be ignored by the projection. In some cases, filtering events by topic in this way will be necessary to avoid errors caused by attempting to reconstruct recorded domain events that neither your code nor the library code is capable of reconstructing.
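The effect of topic filtering can be illustrated with a stand-alone sketch. The Notification tuple, the example topic strings, and the select() function are hypothetical. In practice the filtering would happen in the application database rather than in Python.

```python
from typing import Iterator, NamedTuple, Sequence


class Notification(NamedTuple):
    # Hypothetical record shape, for illustration only.
    id: int
    topic: str


RECORDED = [
    Notification(1, "dogschool:Dog.Registered"),
    Notification(2, "dogschool:Dog.TrickAdded"),
    Notification(3, "legacy:Removed.Event"),
    Notification(4, "dogschool:Dog.TrickAdded"),
]


def select(start: int, topics: Sequence[str] = ()) -> Iterator[Notification]:
    """Yield notifications from `start`, filtered by topic unless `topics` is empty."""
    for notification in RECORDED:
        if notification.id < start:
            continue
        if topics and notification.topic not in topics:
            continue
        yield notification


everything = [n.id for n in select(start=1)]
tricks_only = [n.id for n in select(start=1, topics=("dogschool:Dog.TrickAdded",))]
```

With the filter applied, the notification with the "legacy" topic is never yielded, so nothing downstream needs to be able to reconstruct it.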

Projection example

The AggregateEventCountersProjection class processes events of an event-sourced application.

It inherits from the Projection class, with EventCountersView as the type argument. It implements process_event() by calling incr_created_event_counter() for Aggregate.Created events and incr_subsequent_event_counter() for Aggregate.Event events. If a SpannerThrown event is being processed, then an exception is raised, to demonstrate error handling in projection runners.

class AggregateEventCountersProjection(Projection[EventCountersView]):
    name = "eventcounters"
    topics: tuple[str, ...] = (
        get_topic(Aggregate.Created),
        get_topic(Aggregate.Event),
        get_topic(SpannerThrown),
    )

    @singledispatchmethod
    def process_event(self, _: Aggregate.Event, tracking: Tracking) -> None:
        self.view.insert_tracking(tracking)

    @process_event.register
    def aggregate_created(self, _: Aggregate.Created, tracking: Tracking) -> None:
        self.view.incr_created_event_counter(tracking)

    @process_event.register
    def aggregate_event(self, _: Aggregate.Event, tracking: Tracking) -> None:
        self.view.incr_subsequent_event_counter(tracking)

    @process_event.register
    def spanner_thrown(self, _: SpannerThrown, __: Tracking) -> None:
        msg = "This is a deliberate bug"
        raise SpannerThrownError(msg)

By default, other kinds of events will be processed by calling insert_tracking(), so that progress along an application sequence can be recorded. This can help bridge large gaps without risking repetitive pulling and processing of upstream events.
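The dispatching used by process_event() comes from singledispatchmethod in Python's functools module. The stand-alone sketch below uses hypothetical classes (Event, Created, Unrelated, Handler) to show that the most specific registered type is chosen, and that the undecorated base implementation acts as the fallback. The types are passed explicitly to register() here, which is equivalent to letting them be inferred from the annotation of the first argument.

```python
from functools import singledispatchmethod


class Event:
    pass


class Created(Event):
    pass


class Unrelated:
    pass


class Handler:
    def __init__(self) -> None:
        self.calls: list = []

    @singledispatchmethod
    def process(self, event: object, position: int) -> None:
        # Fallback for any type without a more specific handler.
        self.calls.append(("default", position))

    @process.register(Event)
    def _process_event(self, event: Event, position: int) -> None:
        self.calls.append(("event", position))

    @process.register(Created)
    def _process_created(self, event: Created, position: int) -> None:
        self.calls.append(("created", position))


handler = Handler()
handler.process(Created(), 1)  # subclass of Event, but has its own handler
handler.process(Event(), 2)
handler.process(Unrelated(), 3)  # no handler registered, so the fallback runs
```

Note that the event must be passed as a positional argument, because dispatch is based on the type of the first argument after self.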

Projection runner

Let’s consider how to run the projection, so events of an event-sourced application can be counted.

As shown in the AggregateEventCountersProjectionTestCase class below, a projection runner can be constructed with an application class, a projection class, a materialised view class, and an optional environment Mapping object that specifies run-time configuration of the event-sourced application and the materialised view.

It will subscribe to the application, from the position indicated by the view’s max_tracking_id() method, and then call the process_event() method of the projection for each domain event yielded by the application subscription.
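The resuming behaviour can be pictured with a simplified sketch. The CountingView class and the catch_up() function are hypothetical, and a list of integers stands in for the application sequence. A real runner would use a subscription rather than rereading a list.

```python
from typing import Iterable, Optional


class CountingView:
    """Hypothetical in-memory view that tracks the last processed position."""

    def __init__(self) -> None:
        self.count = 0
        self._last_id: Optional[int] = None

    def max_tracking_id(self) -> Optional[int]:
        return self._last_id

    def incr(self, notification_id: int) -> None:
        # Refuse to process the same position twice.
        if self._last_id is not None and notification_id <= self._last_id:
            raise ValueError("position already processed")
        self._last_id = notification_id
        self.count += 1


def catch_up(view: CountingView, notification_ids: Iterable[int]) -> None:
    """Process every notification after the view's last recorded position."""
    last = view.max_tracking_id()
    for notification_id in notification_ids:
        if last is None or notification_id > last:
            view.incr(notification_id)


view = CountingView()
catch_up(view, [1, 2, 3])
# After a restart, only positions 4 and 5 are processed.
catch_up(view, [1, 2, 3, 4, 5])
```

Because the starting point is read from the view itself, restarting the processing neither skips events nor counts them twice.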

The projection runner has a run_forever() method, which blocks until an optional timeout, or until an exception is raised by the projection or by the subscription, or until the projection runner is stopped by calling its stop() method. This allows an event processing component to be started and run independently as a separate operating system process for a controllable period of time, and then to terminate in a controlled way when there is an error, or when it is interrupted.

Exceptions raised whilst running the projection will be re-raised by the run_forever() method. Operators of the system can examine any errors and resume processing by reconstructing the runner. Some types of errors may be transient operational errors, such as database connectivity, in which case the processing could be resumed automatically. Some errors may be programming errors, and will require manual intervention before the event processing can continue.
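The distinction between transient errors and programming errors could be handled by a small supervising loop. The sketch below is one possible approach, not something provided by the library: the supervise() function and its parameters are hypothetical, and the run callable stands in for constructing a runner and calling its run_forever() method.

```python
import time
from typing import Callable, Tuple, Type


def supervise(
    run: Callable[[], None],
    transient: Tuple[Type[BaseException], ...],
    max_restarts: int = 3,
    delay: float = 0.0,
) -> int:
    """Call run(), restarting after transient errors. Returns the restart count."""
    restarts = 0
    while True:
        try:
            run()
            return restarts
        except transient:
            if restarts >= max_restarts:
                raise  # give up and let an operator investigate
            restarts += 1
            time.sleep(delay)


attempts = {"n": 0}


def flaky_run() -> None:
    # Fails twice, as if the database were briefly unreachable.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("database unavailable")


restarts = supervise(flaky_run, transient=(ConnectionError,))


def buggy_run() -> None:
    raise ValueError("programming error")


try:
    supervise(buggy_run, transient=(ConnectionError,))
except ValueError:
    bug_propagated = True
else:
    bug_propagated = False
```

Errors that are not listed as transient propagate immediately, so that a programming error stops the processing until it has been fixed.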

The wait() method of view objects can be used by user interface requests, to wait until an event has been processed by the projection before calling a query method on the materialised view. Separate instances of the view object can also be used to support such requests, if the view is backed by a durable database.
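The waiting behaviour can be approximated in memory with a condition variable from Python's threading module. The Positions class below is a hypothetical sketch, not the library's implementation, and a timer thread stands in for the projection that records positions in the background.

```python
import threading


class Positions:
    """Hypothetical in-memory tracker that supports waiting for a position."""

    def __init__(self) -> None:
        self._max_id = 0
        self._changed = threading.Condition()

    def record(self, notification_id: int) -> None:
        with self._changed:
            self._max_id = max(self._max_id, notification_id)
            self._changed.notify_all()  # wake up any waiting callers

    def wait(self, notification_id: int, timeout: float = 1.0) -> None:
        with self._changed:
            reached = self._changed.wait_for(
                lambda: self._max_id >= notification_id, timeout=timeout
            )
        if not reached:
            raise TimeoutError(f"position {notification_id} not reached")


positions = Positions()

# Simulate asynchronous processing of position 1 after a short delay.
threading.Timer(0.05, positions.record, args=(1,)).start()
positions.wait(1, timeout=2.0)  # returns once position 1 has been recorded

try:
    positions.wait(2, timeout=0.1)  # nothing will ever record position 2
except TimeoutError:
    timed_out = True
else:
    timed_out = False
```

A request handler that has just written an event can wait for that event's position in this way before querying the view, so that it reads its own writes.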

class AggregateEventCountersProjectionTestCase(TestCase, ABC):
    view_class: type[EventCountersView]
    env: ClassVar[dict[str, str]] = {}

    def test_event_counters_projection(self) -> None:
        # Construct runner with application, projection, and recorder.
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=AggregateEventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ) as runner:

            # Get "read" and "write" model instances from the runner.
            write_model = runner.app
            read_model = runner.projection.view

            # Write some events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for the events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 1)
            self.assertEqual(read_model.get_subsequent_event_counter(), 2)

            # Write some more events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for the events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 2)
            self.assertEqual(read_model.get_subsequent_event_counter(), 4)

    def test_run_forever_raises_projection_error(self) -> None:
        # Construct runner with application, projection, and recorder.
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=AggregateEventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ) as runner:
            write_model = runner.app
            read_model = runner.projection.view

            # Write some events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=SpannerThrown)
            recordings = write_model.save(aggregate)

            # Projection runner terminates with projection error.
            with self.assertRaises(SpannerThrownError):
                runner.run_forever(timeout=5)

            # Wait times out (event has not been processed).
            with self.assertRaises(TimeoutError):
                read_model.wait(
                    application_name=write_model.name,
                    notification_id=recordings[-1].notification.id,
                )

The test method test_event_counters_projection() constructs a runner, and from the runner gets references to an event-sourced application “write model” and a materialised view “read model”.

Events are generated in the event-sourced application “write model”. These events are then processed, according to the logic of the AggregateEventCountersProjection, by updating the materialised event counters view “read model”.

The counted numbers of each kind of event can be obtained from the “read model”. The materialised view is “eventually consistent” because the event processing is asynchronous, and so the wait() method is used to wait for the events to be processed before querying the materialised view. This makes the test deterministic.

The test method test_run_forever_raises_projection_error() shows that after a SpannerThrown event is generated by the event-sourced application, an error occurs with the event-processing, and an exception representing this error is raised by the runner’s run_forever() method. Because the event will not successfully be processed, the materialised view’s wait() method will time out.

Running projection in memory

The test case below runs the projection class with the POPOEventCounters view.

class TestAggregateEventCountersProjectionWithPOPO(
    AggregateEventCountersProjectionTestCase
):
    view_class: type[EventCountersView] = POPOEventCounters

When generating and counting events in memory, we need to use the projection runner’s application and view objects for generating events and getting counted numbers.

But, as we shall see, when using a durable database such as Postgres, the events can be generated using other instances of the application, and the materialised view can be queried using other instances of the view class.

Running projection in Postgres

The test case below runs the projection class with the PostgresEventCounters view.

Because this test case uses a durable database for both the event-sourced application and the materialised view, any instance of the application can be used to write events, and any instance of the materialised view can be used to obtain the counted numbers of events.

class TestAggregateEventCountersProjectionWithPostgres(
    AggregateEventCountersProjectionTestCase
):
    view_class = PostgresEventCounters
    env: ClassVar[dict[str, str]] = {
        "APPLICATION_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "APPLICATION_POSTGRES_DBNAME": "eventsourcing",
        "APPLICATION_POSTGRES_HOST": "127.0.0.1",
        "APPLICATION_POSTGRES_PORT": "5432",
        "APPLICATION_POSTGRES_USER": "eventsourcing",
        "APPLICATION_POSTGRES_PASSWORD": "eventsourcing",
        "EVENTCOUNTERS_PERSISTENCE_MODULE": "eventsourcing.postgres",
        "EVENTCOUNTERS_POSTGRES_DBNAME": "eventsourcing",
        "EVENTCOUNTERS_POSTGRES_HOST": "127.0.0.1",
        "EVENTCOUNTERS_POSTGRES_PORT": "5432",
        "EVENTCOUNTERS_POSTGRES_USER": "eventsourcing",
        "EVENTCOUNTERS_POSTGRES_PASSWORD": "eventsourcing",
    }

    def setUp(self) -> None:
        drop_tables()
        super().setUp()

    def tearDown(self) -> None:
        super().tearDown()
        drop_tables()

    def test_event_counters_projection(self) -> None:
        super().test_event_counters_projection()

        # Resume....
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=AggregateEventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ):

            # Construct separate instance of "write model".
            write_model = Application[UUID](self.env)

            # Construct separate instance of "read model".
            read_model = (
                InfrastructureFactory[EventCountersView]
                .construct(
                    env=Environment(
                        name=AggregateEventCountersProjection.name, env=self.env
                    )
                )
                .tracking_recorder(self.view_class)
            )

            # Write some events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 3)
            self.assertEqual(read_model.get_subsequent_event_counter(), 6)

            # Write some more events.
            aggregate = Aggregate()
            aggregate.trigger_event(event_class=Aggregate.Event)
            aggregate.trigger_event(event_class=Aggregate.Event)
            recordings = write_model.save(aggregate)

            # Wait for events to be processed.
            read_model.wait(
                application_name=write_model.name,
                notification_id=recordings[-1].notification.id,
            )

            # Query the read model.
            self.assertEqual(read_model.get_created_event_counter(), 4)
            self.assertEqual(read_model.get_subsequent_event_counter(), 8)

    def test_run_forever_raises_projection_error(self) -> None:
        super().test_run_forever_raises_projection_error()

        # Resume...
        with ProjectionRunner(
            application_class=Application[UUID],
            projection_class=AggregateEventCountersProjection,
            view_class=self.view_class,
            env=self.env,
        ) as runner:

            # Construct separate instance of "write model".
            write_model = Application[UUID](self.env)

            # Construct separate instance of "read model".
            read_model = InfrastructureFactory.construct(
                env=Environment(
                    name=AggregateEventCountersProjection.name, env=self.env
                )
            ).tracking_recorder(self.view_class)

            # Still terminates with projection error.
            with self.assertRaises(SpannerThrownError):
                runner.run_forever()

            # Wait times out (event has not been processed).
            with self.assertRaises(TimeoutError):
                read_model.wait(
                    application_name=write_model.name,
                    notification_id=write_model.recorder.max_notification_id(),
                )

If a durable database is used in production, the event-sourced application object and the materialised view object could be horizontally scaled, with the event-processing component running as a separate operating system process.

This is demonstrated by extending the test methods so that they:

  • resume processing by reconstructing the projection runner;

  • construct separate instances of the event-sourced application and of the materialised view;

  • continue generating events using the separate instance of the event-sourced application;

  • query the counted numbers of events using the separate instance of the materialised view;

  • check that the obtained counts correctly report the total numbers of events that have been generated;

  • show that the event-processing error is still raised, and that event processing has not continued after the error caused by the SpannerThrown event.

The event-sourced application and the materialised view could also use separate databases. But in this example they are configured more simply to use different tables in the same database.

Exercises

  1. Replicate the code in this tutorial in your development environment.

  2. Develop and run a projection that counts dogs and tricks from a DogSchool application.

  3. Develop and run an event-sourced projection that counts dogs and tricks from a DogSchool application.

Next steps