Distributed systems

This section discusses how to make a reliable distributed system that is scalable and maintainable.

Overview

A design for distributed systems is introduced that uses event-sourced applications as building blocks. The earlier design of the event-sourced application is extended in the design of the “process application”. Process application classes can be composed into a set of pipeline expressions that together define a system pipeline. This definition of a system can be entirely independent of infrastructure. Such a system can be run in different ways with identical results.

Rather than seeking reliability in mathematics (e.g. CSP) or in physics (e.g. Actor model), the approach taken here is to seek reliable foundations in engineering empiricism, specifically in the empirical reliability of counting and of ACID database transactions.

Just as event sourcing “atomised” application state as a set of domain events, similarly the processing of domain events can be atomised as a potentially distributed set of local “process events” in which new domain events may occur. The subjective aim of a process event is “catching up on what happened”.

The processing of domain events is designed to be atomic and successive so that processing can progress in a determinate manner according to infrastructure availability. A provisional description of a design pattern for process events is included at the end of this section.

Process application

A process application is an event-sourced projection into an event-sourced application. One process application may “follow” another. A process application projects the state of the applications it follows into its own state. Because a process application is itself an event-sourced application, its state can be obtained, as a sequence of domain events, using a notification log reader. The projection itself is defined by the application’s policy, which defines responses to domain events in terms of domain model operations that cause new domain events to be generated.

[Diagram: process application]

The library has a process application class, ProcessApplication, which functions as a projection into an event-sourced application. It extends SimpleApplication by having a notification log reader for each application it follows, and an application policy which defines how to respond to the domain events it reads from those notification logs. This process application class implements the process event pattern: notification tracking and new domain event data are stored together atomically.

Reliability

Reliability is the most important concern in this section. A process is considered to be reliable if its result is entirely unaffected (except in being delayed) by infrastructure failure such as network partitions or sudden termination of operating system processes. Infrastructure unreliability may cause processing delays, but disorderly environments shouldn’t (at least by default) cause disorderly processing results.

The only trick is remembering that production is determined, in general, by consumption combined with recording. In particular, if consumption and recording are reliable, then production is bound to be reliable. As shown below, the reliability of this library’s approach to event processing depends only on counting and on the atomicity of database transactions, both of which are normally considered reliable.

Notification tracking

A process application consumes domain events by reading event notifications from its notification log readers. The domain events are retrieved in a reliable order, without race conditions or duplicates or missing items. Each event notification in a notification log has a unique integer ID, and the notification log IDs form a contiguous sequence (counting).

To keep track of its position in the notification log, a process application will create a unique tracking record for each event notification it processes. The tracking records determine how far the process has progressed through the notification log. The tracking records are used to set the position of the notification log reader when the process application is commenced or resumed.

There can only be one tracking record for each event notification. Once a tracking record has been written, it can’t be written again, and in that case neither can any new domain events be recorded for that notification. Hence, if a domain event notification can be processed at all, then it will be processed exactly once.
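
For illustration, here is a minimal sketch (with hypothetical names, not the library’s actual API) of how tracking records set the reader position when a process application is commenced or resumed:

def resume_position(tracked_ids):
    """Returns the notification log position from which to resume.

    Notification IDs form a contiguous sequence 1, 2, 3, ... so the
    highest tracked ID is the last notification that was processed.
    """
    return max(tracked_ids, default=0)


# A new process starts from the beginning of the notification log.
assert resume_position(set()) == 0

# A resumed process continues after its last tracked position.
assert resume_position({1, 2, 3}) == 3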

Policies

A process application will respond to domain events according to its policy. Its policy might do nothing in response to one type of event, and it might call an aggregate command method in response to another type of event. If the aggregate method generates new domain events, they will be available in its notification log for others to read, just like a normal event-sourced application.

Whatever the policy response, the process application will write one tracking record for each event notification, along with new stored event and notification records, in an atomic database transaction.

Atomicity

Just as a ratchet is only as strong as its teeth and pawl, a process application is only as reliable as the atomicity of its database transactions. If anything goes wrong before all the records from processing a domain event have been written, the transaction will abort, and none of the records will be committed. On the other hand, if any of the records are committed, then all of them will be committed, and the process will complete an atomic progression.

The atomicity of consumption and recording makes production atomic: a continuous stream of events is processed in discrete, sequenced, indivisible units. Hence, interruptions can only cause delays.
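
The following sketch illustrates this with sqlite3 (a hypothetical schema, much simpler than the library’s actual record classes): the tracking record and the new event records commit in a single transaction or not at all, and the unique constraint on the tracking record prevents a notification from being processed twice.

import sqlite3

db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE tracking (notification_id INTEGER PRIMARY KEY)')
db.execute(
    'CREATE TABLE events (originator_id TEXT, originator_version INTEGER, '
    'state TEXT, PRIMARY KEY (originator_id, originator_version))'
)

def write_process_event(notification_id, new_event_records):
    # One atomic transaction: either all rows are committed, or none are.
    with db:
        db.execute('INSERT INTO tracking VALUES (?)', (notification_id,))
        db.executemany('INSERT INTO events VALUES (?, ?, ?)', new_event_records)

# Process notification 1, recording a new domain event atomically.
write_process_event(1, [('order1', 0, '{"topic": "Order.Created"}')])

# Reprocessing the same notification violates the unique constraint, so the
# transaction aborts and no duplicate domain events are committed.
try:
    write_process_event(1, [('order1', 1, '{}')])
except sqlite3.IntegrityError:
    pass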

Whilst the heart of this design is having the event processing proceed atomically, so that any completed “process events” are exactly what they should be, the “CID” parts of ACID database transactions are of course also crucial. In particular, it is assumed that any records that have been committed will remain available after any so-called “infrastructure failure”. The continuing existence of data that has been successfully committed to a database is beyond the scope of this discussion about reliability. However, the “single point of failure” this may represent is acknowledged.

System of processes

The library class System can be used to define a system of process applications, entirely independently of infrastructure. In a system, one process application can follow another. One process can follow two other processes in a slightly more complicated system. A system could be just one process application following itself.

The reliability of the domain event processing allows a reliable “saga” or a “process manager” to be written without restricting or cluttering the application logic with precaution and remediation for infrastructure failures.

Infrastructure-independence

A system of process applications can be defined independently of infrastructure so that the same system can be run with different infrastructure at different times. For example, a system of process applications could be developed for use with SQLAlchemy, and later reused in a Django project.

System runners

A system of process applications can run in a single thread, with synchronous propagation and processing of events through the system pipeline. A system can also be run with multiple threads or multiple operating system processes, with application state propagated asynchronously in various ways.

An asynchronous pipeline with multi-threading or multi-processing means one event can be processed by each process application at the same time. This is very much like instruction pipelining in a CPU core.

Maintainability

Whilst maintainability is greatly assisted by having an entire system of applications defined independently of infrastructure, it also greatly helps to run such a system synchronously with a single thread. So long as the behaviours are preserved, running the system without any concurrent threads or processes makes it much easier to develop and maintain the system.

Scalability

Especially when using multiple operating system processes, throughput can be increased by breaking longer steps into smaller steps, but only up to a limit provided by the number of steps actually required by the domain. Such “diachronic” parallelism therefore provides limited opportunities for scaling throughput.

A system of process applications can also be run with many parallel instances of the system pipeline. This is very much like the way a multi-core CPU has many cores (a core is a pipeline). This “synchronic” parallelism means that many domain events can be processed by the same process application at the same time. This kind of parallelism allows the system to be scaled, but only to a limit provided by the degree of parallelism inherent in the domain (greatest when there are no causal dependencies between domain events, least when there are maximal causal dependencies between domain events).

Causal dependencies

Causal dependencies are needed to synchronise the parallel processing of a sequence of events. This is used in the library when a system is run with multiple pipelines.

Causal dependencies between events can be automatically detected and used to synchronise the processing of parallel pipelines downstream. For example, if an aggregate is created and then updated, the second event is obviously causally dependent on the first (you can’t update something that doesn’t exist). Downstream processing in one pipeline can wait (stall) for a dependency to be processed in another pipeline. This is like a pipeline interlock in a multi-core CPU.

In the process applications, causal dependencies are automatically inferred by detecting the originator ID and version of aggregates as they are retrieved from the repository. The old notifications are referenced in the first new notification. A downstream process can then check, using its tracking records, that all causal dependencies have been processed.

If there are many dependencies in the same pipeline, only the newest dependency in each pipeline is included. By default in the library, only dependencies in different pipelines are included. If causal dependencies from all pipelines were included in each notification, each pipeline could be processed in parallel, to an extent limited by the dependencies between the notifications.
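
A sketch of the downstream check (hypothetical data structures, not the library’s actual record classes) might look like this:

def ready_to_process(notification, tracking_positions):
    """Returns True if all causal dependencies have been processed.

    tracking_positions maps each pipeline ID to the highest notification
    ID that has been tracked in that pipeline.
    """
    return all(
        dep['notification_id'] <= tracking_positions.get(dep['pipeline_id'], 0)
        for dep in notification['causal_dependencies']
    )

# Pipeline 1 has only been processed up to notification 3, so a
# notification depending on its notification 4 must wait (stall).
tracking_positions = {0: 5, 1: 3}
notification = {'causal_dependencies': [{'pipeline_id': 1, 'notification_id': 4}]}
assert not ready_to_process(notification, tracking_positions)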

Kahn process networks

Because a notification log and its reader function effectively as a FIFO, a system of determinate process applications can be recognised as a Kahn Process Network (KPN).

Kahn Process Networks are determinate systems. If a system of process applications happens to involve processes that are not determinate, or if the processes split and combine or feed back in a random way, so that nondeterminacy is introduced by design, then the system as a whole will not be determinate, and could be described in more general terms as “dataflow” or “stream processing”.

Whether or not a system of process applications is determinate, the processing will be reliable (results unaffected by infrastructure failures).

High performance or “real time” processing could be obtained by avoiding writing to a durable database and instead running applications with an in-memory database.

Process managers

A process application, specifically an aggregate combined with a policy in a process application, could function effectively as a “saga”, or “process manager”, or “workflow manager”. That is, it could effectively control a sequence of steps involving other aggregates in other bounded contexts, steps that might otherwise be controlled with a “long-lived transaction”. It could ‘maintain the state of the sequence and determine the next processing step based on intermediate results’, to quote a phrase from Enterprise Integration Patterns. Exceptional “unhappy path” behaviour can be implemented as part of the logic of the application.

Example

The example below is suggestive of an orders-reservations-payments system. The system automatically processes a new Order by making a Reservation and then a Payment; these facts are registered with the Order as they happen.

The system is run in several ways: firstly as a single-threaded system; then with multiprocessing using a single pipeline; then with multiprocessing using multiple pipelines; and finally with multiple pipelines using the actor model.

The behaviour of the system is entirely defined by the combination of the aggregates and the policies of its process applications. This allows highly maintainable code: code that is easily tested, easily understood, easily changed, and easily reconfigured for use with different infrastructure.

Please note, the code presented in the example below works only with the library’s SQLAlchemy and Django infrastructure code. Support for Cassandra is being considered but such applications will probably be simple replications of application state, due to the limited atomicity of Cassandra’s lightweight transactions. For example, Cassandra could be used to archive events written firstly into a relational database. Events could be removed from the relational database before storage limits are encountered. Events missing in the relational database could be sourced from Cassandra.

Aggregates

In the domain model below, event-sourced aggregates are defined for orders, reservations, and payments.

An Order can be created. An existing order can be set as reserved, which involves a reservation ID. Having been created and reserved, an order can be set as paid, which involves a payment ID.

from eventsourcing.domain.model.aggregate import AggregateRoot


class Order(AggregateRoot):

    class Event(AggregateRoot.Event):
        pass

    @classmethod
    def create(cls, command_id):
        return cls.__create__(command_id=command_id)

    class Created(Event, AggregateRoot.Created):
        pass

    def __init__(self, command_id=None, **kwargs):
        super(Order, self).__init__(**kwargs)
        self.command_id = command_id
        self.reservation_id = None
        self.payment_id = None

    @property
    def is_reserved(self):
        return self.reservation_id is not None

    def set_is_reserved(self, reservation_id):
        assert not self.is_reserved, "Order {} already reserved.".format(self.id)
        self.__trigger_event__(
            Order.Reserved, reservation_id=reservation_id
        )

    class Reserved(Event):
        def mutate(self, order: "Order"):
            order.reservation_id = self.reservation_id

    @property
    def is_paid(self):
        return self.payment_id is not None

    def set_is_paid(self, payment_id):
        assert not self.is_paid, "Order {} already paid.".format(self.id)
        self.__trigger_event__(
            self.Paid, payment_id=payment_id, command_id=self.command_id
        )

    class Paid(Event):
        def mutate(self, order: "Order"):
            order.payment_id = self.payment_id

A Reservation can be created. A reservation has an order_id.

class Reservation(AggregateRoot):

    class Event(AggregateRoot.Event):
        pass

    @classmethod
    def create(cls, order_id):
        return cls.__create__(order_id=order_id)

    class Created(Event, AggregateRoot.Created):
        pass

    def __init__(self, order_id, **kwargs):
        super(Reservation, self).__init__(**kwargs)
        self.order_id = order_id

Similarly, a Payment can be created. A payment also has an order_id.

class Payment(AggregateRoot):

    class Event(AggregateRoot.Event):
        pass

    @classmethod
    def create(cls, order_id):
        return cls.__create__(order_id=order_id)

    class Created(Event, AggregateRoot.Created):
        pass

    def __init__(self, order_id, **kwargs):
        super(Payment, self).__init__(**kwargs)
        self.order_id = order_id

All the domain event classes are defined explicitly on the aggregate root classes. This is important because the application policies will use the domain event classes to decide how to respond to the events. If the aggregate classes used the event classes of the base aggregate root class, then one aggregate’s Created event couldn’t be distinguished from another’s, and the application policies wouldn’t work as expected.

The behaviours of this domain model can be fully tested with simple test cases, without involving any other components.
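
For example, a test along these lines could exercise the Order aggregate directly, without any application infrastructure:

from uuid import uuid4


def test_order_aggregate():

    # Create a new order.
    order = Order.create(command_id=uuid4())
    assert not order.is_reserved
    assert not order.is_paid

    # Set the order as reserved.
    reservation_id = uuid4()
    order.set_is_reserved(reservation_id)
    assert order.is_reserved
    assert order.reservation_id == reservation_id

    # Set the order as paid.
    payment_id = uuid4()
    order.set_is_paid(payment_id)
    assert order.is_paid
    assert order.payment_id == payment_id


# Run the test.
test_order_aggregate()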

Commands

Commands have been discussed previously as methods on domain entities. Here, system commands are introduced, as event sourced aggregates created within a separate “commands application”.

One advantage of having distinct command aggregates is that old commands can be used to check that a new version of the system generates the same application state.

Another advantage of using a separate commands application is that commands can be introduced into an event processing system without interrupting the event processing of the core process applications. (Treating a process application as a normal application certainly works, but can potentially cause contention writing to the notification log.)

Responses can be collected by creating separate “command response” aggregates in a separate “responses” process application. An alternative approach involves updating the command aggregate, and having the commands application follow a core process application.

In the example below, the command class CreateOrder is defined using the library’s command class, Command, which extends the library’s AggregateRoot class with a method done() and a property is_done.

A CreateOrder command can be assigned an order ID; its order_id is initially None.

from eventsourcing.domain.model.command import Command
from eventsourcing.domain.model.decorators import attribute


class CreateOrder(Command):

    class Event(Command.Event):
        pass

    @classmethod
    def create(cls):
        return cls.__create__()

    class Created(Event, Command.Created):
        pass

    @attribute
    def order_id(self):
        pass

    class AttributeChanged(Event, Command.AttributeChanged):
        pass

The order_id will eventually be used to hold the ID of the Order aggregate created by the system in response to the CreateOrder command.

The behaviour of a system command aggregate can be fully tested with simple test cases, without involving any other components.

from uuid import uuid4


def test_create_order_command():

    # Create a "create order" command.
    cmd = CreateOrder.create()

    # Check the initial values.
    assert cmd.order_id is None
    assert cmd.is_done is False

    # Assign an order ID.
    order_id = uuid4()
    cmd.order_id = order_id
    assert cmd.order_id == order_id

    # Mark the command as "done".
    cmd.done()
    assert cmd.is_done is True

    # Check the events.
    events = cmd.__batch_pending_events__()
    assert len(events) == 3
    assert isinstance(events[0], CreateOrder.Created)
    assert isinstance(events[1], CreateOrder.AttributeChanged)
    assert isinstance(events[2], CreateOrder.Done)


# Run the test.
test_create_order_command()

Processes

A process application has a policy which defines how events are processed. In the code below, process applications are defined for orders, reservations, and payments.

The Orders process application policy responds to new commands by creating a new Order aggregate. It responds to new reservations by setting an Order as reserved, and it responds to a new Payment by setting an Order as paid.

from eventsourcing.application.process import ProcessApplication
from eventsourcing.application.decorators import applicationpolicy


class Orders(ProcessApplication):

    @applicationpolicy
    def policy(self, repository, event):
        pass

    @policy.register(CreateOrder.Created)
    def _(self, repository, event):
        return self.create_order(command_id=event.originator_id)

    @policy.register(Reservation.Created)
    def _(self, repository, event):
        self._set_order_is_reserved(repository, event)

    @policy.register(Payment.Created)
    def _(self, repository, event):
        self._set_order_is_paid(repository, event)

    @staticmethod
    def create_order(command_id):
        return Order.create(command_id=command_id)

    def _set_order_is_reserved(self, repository, event):
        order = repository[event.order_id]
        assert not order.is_reserved
        order.set_is_reserved(event.originator_id)

    def _set_order_is_paid(self, repository, event):
        order = repository[event.order_id]
        assert not order.is_paid
        order.set_is_paid(event.originator_id)

The decorator @applicationpolicy is similar to @singledispatch from the functools core Python package. It isn’t magic, it’s just a slightly better alternative to an “if-isinstance-elif-isinstance-…” block.
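
For example, the registrations above are roughly equivalent to this dispatch (a sketch of the policy body only):

def policy(self, repository, event):
    if isinstance(event, CreateOrder.Created):
        return self.create_order(command_id=event.originator_id)
    elif isinstance(event, Reservation.Created):
        self._set_order_is_reserved(repository, event)
    elif isinstance(event, Payment.Created):
        self._set_order_is_paid(repository, event)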

The Reservations process application responds to an Order.Created event by creating a new Reservation aggregate.

class Reservations(ProcessApplication):

    @applicationpolicy
    def policy(self, repository, event):
        pass

    @policy.register(Order.Created)
    def _(self, repository, event):
        return self.create_reservation(event.originator_id)

    @staticmethod
    def create_reservation(order_id):
        return Reservation.create(order_id=order_id)

The payments process application responds to an Order.Reserved event by creating a new Payment.

class Payments(ProcessApplication):

    @applicationpolicy
    def policy(self, repository, event):
        pass

    @policy.register(Order.Reserved)
    def _(self, repository, event):
        order_id = event.originator_id
        return self.create_payment(order_id)

    @staticmethod
    def create_payment(order_id):
        return Payment.create(order_id=order_id)

Additionally, the library class CommandProcess is extended by defining a policy that responds to Order.Created events by setting the order_id on the command, and to Order.Paid events by setting the command as done. It also has a factory method create_order() which can be used to create new Order aggregates.

from eventsourcing.application.command import CommandProcess
from eventsourcing.domain.model.decorators import retry
from eventsourcing.exceptions import OperationalError, RecordConflictError


class Commands(CommandProcess):

    persist_event_type = CreateOrder.Event

    @applicationpolicy
    def policy(self, repository, event):
        pass

    @policy.register(Order.Created)
    def _(self, repository, event):
        cmd = repository[event.command_id]
        cmd.order_id = event.originator_id

    @policy.register(Order.Paid)
    def _(self, repository, event):
        cmd = repository[event.command_id]
        cmd.done()

    @staticmethod
    @retry((OperationalError, RecordConflictError), max_attempts=10, wait=0.01)
    def create_order():
        cmd = CreateOrder.create()
        cmd.__save__()
        return cmd.id

The @retry decorator overcomes contention when creating new commands whilst also processing domain events from the Orders application.

Please note, the __save__() method of aggregates must not be called in a process policy, because pending events from both new and changed aggregates will be automatically collected by the process application after its policy() method has returned. To be reliable, a process application must commit all the event records atomically with a tracking record, whereas calling __save__() would commit events in a separate transaction. Policies must return new aggregates to the caller, but do not need to return existing aggregates that have been accessed or changed.
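
In other words (a sketch of the rule, using a hypothetical class so as not to redefine the Reservations application above):

class ReservationsSketch(ProcessApplication):

    @applicationpolicy
    def policy(self, repository, event):
        pass

    @policy.register(Order.Created)
    def _(self, repository, event):
        # Wrong: calling __save__() on the new aggregate here would commit
        # its events in a separate transaction, outside the process event.
        # Right: return the new aggregate, so that its pending events are
        # collected and committed atomically with the tracking record.
        return Reservation.create(order_id=event.originator_id)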

Process policies are just functions, and are easy to test.

In the orders policy test below, an existing order is marked as reserved because a reservation was created. The only complication comes from needing to prepare at least a fake repository and a domain event, given as required arguments when calling the policy in the test. If the policy response depends on already existing aggregates, they will need to be added to the fake repository. A Python dict can function effectively as a fake repository in such tests. It seems simplest to directly use the model domain event classes and aggregate classes in these tests, rather than coding test doubles.

def test_orders_policy():

    # Prepare repository with a real Order aggregate.
    order = Order.create(command_id=None)
    repository = {order.id: order}

    # Check order is not reserved.
    assert not order.is_reserved

    # Check order is reserved whenever a reservation is created.
    event = Reservation.Created(originator_id=uuid4(), originator_topic='', order_id=order.id)
    Orders().policy(repository, event)
    assert order.is_reserved


# Run the test.
test_orders_policy()

In the payments policy test below, a new payment is created because an order was reserved.

def test_payments_policy():

    # Prepare repository with a real Order aggregate.
    order = Order.create(command_id=None)
    repository = {order.id: order}

    # Check payment is created whenever order is reserved.
    event = Order.Reserved(originator_id=order.id, originator_version=1)
    payment = Payments().policy(repository, event)
    assert isinstance(payment, Payment), payment
    assert payment.order_id == order.id


# Run the test.
test_payments_policy()

It isn’t necessary to return changed aggregates from the policy. Since the test constructs the aggregate and passes it to the policy in the fake repository, it already has a reference to the aggregate, and so is already in a good position to check that existing aggregates are changed by the policy as expected.

System

A system of process applications can be defined using one or many pipeline expressions.

The expression A | A would have a process application class called A following itself. The expression A | B | C would have A followed by B and B followed by C. This can perhaps be recognised as the “pipes and filters” pattern, where the process applications function effectively as the filters.

In this example, firstly the Orders process will follow the Commands process so that orders can be created. The Commands process will follow the Orders process, so that commands can be marked as done when processing is complete.

commands_pipeline = Commands | Orders | Commands

Similarly, the Orders process and the Reservations process will follow each other. Also the Orders and the Payments process will follow each other.

reservations_pipeline = Orders | Reservations | Orders

payments_pipeline = Orders | Payments | Orders

The orders-reservations-payments system can be defined using these pipeline expressions.

from eventsourcing.application.system import System

system = System(commands_pipeline, reservations_pipeline, payments_pipeline)

This is equivalent to a system defined with the following single pipeline expression.

pipeline = Commands | Orders | Reservations | Orders | Payments | Orders | Commands

system = System(pipeline)

Although a process application class can appear many times in the pipeline expressions, there will only be one instance of each process when the pipeline system is instantiated. Each application can follow one or many applications, and can be followed by one or many applications.

In this system, application state is propagated between process applications through notification logs only. This can perhaps be recognised as the “bounded context” pattern. Each application can access only the aggregates it has created. For example, an Order aggregate created by the Orders process is available in neither the repository of Reservations nor the repository of Payments. If an application could directly use the aggregates of another application, then processing could produce different results at different times, and in consequence the processing wouldn’t be reliable. If necessary, a process application can replicate upstream aggregates within its own state.

Runners

The system above has been defined entirely independently of infrastructure. Concrete application infrastructure is introduced by the system runners. A concrete application infrastructure class can be specified when constructing a system runner with a suitable value of infrastructure_class. A system runner can be used as a context manager.

from eventsourcing.application.popo import PopoApplication
from eventsourcing.application.system import SingleThreadedRunner

with SingleThreadedRunner(system, infrastructure_class=PopoApplication):

    # Do stuff here...
    pass

Single threaded runner

If the system object is used with the library class SingleThreadedRunner, the process applications will run in a single thread in the current process.

Events will be processed with synchronous handling of prompts, so that policies effectively call each other recursively, according to which applications each is followed by.

In the example below, the system object is used directly as a context manager; using it in this manner implicitly constructs a SingleThreadedRunner. By default, this runner introduces the concrete application infrastructure class PopoApplication, which literally uses plain old Python objects to store domain events in memory, and is the fastest concrete application infrastructure class in the library (much faster than in-memory SQLite). It can be used when proper disk-based durability is not required, for example during system development.

with system:

    # Create "create order" command.
    cmd_id = system.commands.create_order()

    # Check the command has an order ID and is done.
    cmd = system.commands.repository[cmd_id]
    assert cmd.order_id
    assert cmd.is_done

    # Check the order is reserved and paid.
    order = system.orders.repository[cmd.order_id]
    assert order.is_reserved
    assert order.is_paid

    # Check the reservation exists.
    reservation = system.reservations.repository[order.reservation_id]

    # Check the payment exists.
    payment = system.payments.repository[order.payment_id]

Everything happens synchronously, in a single thread, so that by the time create_order() has returned, the command has been fully processed by the system.

Running the system with a single thread and an in-memory database is useful when developing and testing a system of process applications, because it runs very quickly and the behaviour is very easy to follow.

Multiprocess runner

The example below shows the same system of process applications running in different operating system processes, using the library’s MultiprocessRunner class (which uses Python’s multiprocessing library).

Running the system with multiple operating system processes means the different processes are running concurrently, so that as the payment is made for one order, another order might get reserved, whilst a third order is at the same time created.

The code below uses the library’s MultiprocessRunner class to run the system. It will start one operating system process for each process application in the system, which in this example will give a pipeline with four child operating system processes. This example uses SQLAlchemy to access a MySQL database. The concrete infrastructure class is SQLAlchemyApplication.

from eventsourcing.application.multiprocess import MultiprocessRunner
from eventsourcing.application.sqlalchemy import SQLAlchemyApplication

runner = MultiprocessRunner(
    system=system,
    infrastructure_class=SQLAlchemyApplication,
    setup_tables=True
)

The following MySQL database connection string is compatible with SQLAlchemy.

import os

os.environ['DB_URI'] = 'mysql+pymysql://{}:{}@{}/eventsourcing?charset=utf8mb4&binary_prefix=true'.format(
    os.getenv('MYSQL_USER', 'root'),
    os.getenv('MYSQL_PASSWORD', ''),
    os.getenv('MYSQL_HOST', '127.0.0.1'),
)

The MySQL database needs to be created before running the next bit of code.

$ mysql -e "CREATE DATABASE eventsourcing;"

Single pipeline

Since the multi-processing pipeline is asynchronous, let’s define a method to check things are eventually done.

@retry((AssertionError, KeyError), max_attempts=50, wait=0.1)
def assert_eventually_done(repository, cmd_id):
    """Checks the command is eventually done."""
    assert repository[cmd_id].is_done

The multiple operating system processes can be started by using the runner as a context manager.

with runner:

    # Create "create order" command.
    cmd_id = runner.commands.create_order()

    # Wait for the processing to complete....
    assert_eventually_done(runner.commands.repository, cmd_id)

    # Check the command has an order ID and is done.
    cmd = runner.commands.repository[cmd_id]
    assert cmd.order_id

    # Check the order is reserved and paid.
    order = runner.orders.repository[cmd.order_id]
    assert order.is_reserved
    assert order.is_paid

    # Check the reservation exists.
    reservation = runner.reservations.repository[order.reservation_id]

    # Check the payment exists.
    payment = runner.payments.repository[order.payment_id]

Multiple pipelines

The system can run with many instances of its pipeline. By having more than one instance of the system pipeline, more than one instance of each process application can be instantiated (one for each pipeline). Pipelines are distinguished by integer ID. The pipeline_ids are given to the MultiprocessRunner class when the runner is constructed.

In this example, there are three pipeline IDs, so there will be three instances of the system pipeline, giving twelve child operating system processes altogether.

runner = MultiprocessRunner(
    system=system,
    infrastructure_class=SQLAlchemyApplication,
    setup_tables=True,
    pipeline_ids=[0, 1, 2]
)

Fifteen orders will be processed by the system altogether, five in each pipeline.

num_orders = 15

with runner:

    # Create new orders.
    command_ids = []
    while len(command_ids) < num_orders:
        for pipeline_id in runner.pipeline_ids:

            # Change the pipeline for the command.
            runner.commands.change_pipeline(pipeline_id)

            # Create a "create new order" command.
            cmd_id = runner.commands.create_order()
            command_ids.append(cmd_id)

    # Check all commands are eventually done.
    assert len(command_ids)
    for command_id in command_ids:
        assert_eventually_done(runner.commands.repository, command_id)

It would be possible to run the system with e.g. pipelines 0-7 on one machine, pipelines 8-15 on another machine, and so on. That sort of thing can be expressed in configuration management, for example with Kubernetes.

If cluster scaling is automated, it would be useful for processes to be distributed automatically across the cluster. The Actor model seems like one possible foundation for such automation.

Actor model runner

An Actor model library, in particular the Thespian Actor Library, can also be used to run a multi-pipeline system of process applications.

The example below runs with Thespian’s “simple system base”. The actors will run by sending messages recursively.

from eventsourcing.application.actors import ActorModelRunner

with ActorModelRunner(system=system, setup_tables=True,
                      infrastructure_class=SQLAlchemyApplication,
                      pipeline_ids=[0, 1, 2]) as runner:

    # Create new orders.
    command_ids = []
    while len(command_ids) < num_orders:
        for pipeline_id in runner.pipeline_ids:

            # Change the pipeline for the command.
            runner.commands.change_pipeline(pipeline_id)

            # Create a "create new order" command.
            cmd_id = runner.commands.create_order()
            command_ids.append(cmd_id)

    # Check all commands are eventually done.
    assert len(command_ids)
    for command_id in command_ids:
        assert_eventually_done(runner.commands.repository, command_id)

With Thespian, a “system base” other than the default “simple system base” can be started by calling the functions start_multiproc_tcp_base_system() or start_multiproc_queue_base_system() before starting the system actors.

The base system can be shut down by calling shutdown_actor_system(), which will shut down any actors that are running in that base system.

With the “multiproc” base systems, the process application system actors will be started in separate operating system processes. After they have been started, they will continue to run until they are shut down. The system actors can be started by calling the runner’s start() method, and shut down by calling its shutdown() method.

If the runner is used as a context manager, as above, the start() method is called when the context manager enters, and the close() method is called when it exits. By default, close() does not call shutdown(). If ActorModelRunner is constructed with shutdown_on_close=True (the default is False), then the actors will be shut down by close(), and so also when the context manager exits. Even so, shutting down the system actors will not shut down a “multiproc” base system.
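
Putting these pieces together (a sketch; the import location of the base-system functions is assumed, as is running the same commands as above inside the with-block):

from eventsourcing.application.actors import (
    shutdown_actor_system,
    start_multiproc_tcp_base_system,
)

# Start the "multiproc tcp" base system before starting the system actors.
start_multiproc_tcp_base_system()
try:
    with ActorModelRunner(system=system,
                          infrastructure_class=SQLAlchemyApplication,
                          pipeline_ids=[0, 1, 2],
                          shutdown_on_close=True) as runner:
        pass  # Create commands and check they are eventually done, as above.
finally:
    # Shutting down the base system shuts down any actors running in it.
    shutdown_actor_system()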

Pool of workers

As an alternative to having a thread dedicated to each process application for each pipeline, prompts could be sent via a queue to a pool of workers, which would change pipeline and application according to the prompt. Causal dependencies would be needed for all notifications, which is not the library default. The library does not currently support processing events with a pool of workers.
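
A rough sketch of the idea (entirely hypothetical, since this is not a library feature):

import queue
import threading

prompts = queue.Queue()

def worker():
    # Each worker handles whichever prompt arrives next, switching to the
    # prompted application and pipeline (here just printed, for brevity).
    while True:
        prompt = prompts.get()
        if prompt is None:
            break  # Shutdown signal.
        print('process', prompt['application_name'], 'pipeline', prompt['pipeline_id'])

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

prompts.put({'application_name': 'reservations', 'pipeline_id': 0})
prompts.put({'application_name': 'payments', 'pipeline_id': 1})

for _ in threads:
    prompts.put(None)  # Stop the workers.
for t in threads:
    t.join()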

Integration with APIs

Systems that present a server API, or otherwise need to be sent messages (rather than reading notification logs), can be integrated by responding to domain events with a policy that uses a client to call the API or send a message. However, if there is a breakdown during the API call, or before the tracking record is written, then to avoid failing to make the call at all, the call may end up being made twice. If the call is not idempotent, and is not otherwise guarded against duplicate calls, there may be consequences to making the call twice, and so the situation cannot really be described as reliable.

If the server response is asynchronous, any callbacks that the server makes could be handled by calling commands on aggregates. If callbacks might be retried, perhaps because the handler crashes after successfully calling a command but before returning successfully to the caller, then unless the callbacks are also tracked (with exclusive tracking records written atomically with new event and notification records), the aggregate commands will need to be idempotent, or otherwise guarded against duplicate callbacks. Such an integration could be implemented as a separate “push-API adapter” process. It might be useful to have a generic implementation that can be reused, with documentation describing how to make such an integration reliable; however, the library doesn’t currently have any such adapter process classes or documentation.

Process event pattern

(Draft)

A set of EVENT SOURCED APPLICATIONS can be composed into a system of applications. Application state can be propagated to other applications. Application state is defined by domain event records that have been committed. Each application has a policy which defines how it responds to the domain events it processes.

Infrastructure may fail at any time. Although committed database transactions are expected to be durable, the operating system processes, the network, and the databases may go down at any time. Depending on the system design, application state may be adversely affected by infrastructure failures.

Therefore…

Use counting to sequence the domain events of an application. Use a unique constraint to make sure only one domain event is recorded for each position. Ensure there are no gaps by calculating the next position from the last recorded position. Also use counting to follow the domain events of an upstream application. Use a tracking record to store the current position in the upstream sequence. Use a unique constraint to make sure tracking can be recorded for each upstream domain event only once.

Use atomic database transactions to record each process event atomically. Include the tracking position, the new domain events created by the application policy, and their positions in the application’s sequence. Use an object class (or other data type) called “ProcessEvent” to keep these data together, so that they can be passed into functions as a single argument.
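
For example, a minimal sketch of such a data type (with hypothetical fields):

from dataclasses import dataclass, field
from typing import List


@dataclass
class ProcessEvent:
    # Name of the upstream application being followed.
    upstream_name: str
    # Tracking position in the upstream application's notification sequence.
    notification_id: int
    # New domain events created by the application policy, each of which
    # also takes a position in this application's own sequence.
    new_events: List[object] = field(default_factory=list)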

Then, the distributed system can be considered reliable, in the sense that the facts in the database will represent either that a process event occurred or that it didn’t occur, and so application state will be entirely unaffected by infrastructure failures.

Event-sourced applications may be implemented with EVENT SOURCED AGGREGATES. To scale the system, use CAUSAL DEPENDENCIES to synchronise parallel pipelines. Use SYSTEM RUNNERS to bind the system to the infrastructure it needs to run.