Tutorial - Part 4 - Systems¶
As we saw in Part 3, we can use the library’s
Application
class to define event-sourced
applications. Now let’s explore event-driven systems.
We will create two applications: the DogSchool
application that we discussed in Part 3,
and a second application which processes the domain events of the DogSchool
application. We will compose these two applications into a “system” and run the system
with a “runner”, with different combinations of runners and persistence modules.
First, let’s define the DogSchool
application and the Dog
aggregate.
from uuid import uuid5, NAMESPACE_URL
from eventsourcing.application import Application
from eventsourcing.domain import Aggregate, event
class DogSchool(Application):
def register_dog(self, name):
dog = Dog(name)
self.save(dog)
def add_trick(self, name, trick):
dog = self.repository.get(Dog.create_id(name))
dog.add_trick(trick=trick)
self.save(dog)
def get_dog(self, name):
dog = self.repository.get(Dog.create_id(name))
return {'name': dog.name, 'tricks': tuple(dog.tricks)}
class Dog(Aggregate):
@event('Registered')
def __init__(self, name):
self.name = name
self.tricks = []
@classmethod
def create_id(cls, name):
return uuid5(NAMESPACE_URL, f'/dogs/{name}')
@event('TrickAdded')
def add_trick(self, trick):
self.tricks.append(trick)
Process applications¶
Second, let’s define an application which counts tricks added in the Dog
aggregates, by
processing the domain events of the DogSchool
application. The Counters
class below is
an event processing application. It extends the library’s ProcessApplication
class.
The most important thing that needs to be defined when processing domain events is a policy function.
The policy function defines how individual domain events will be processed. A policy function has different responses for different types of domain events. The policy function may create changes to the state of the event processing application. These could be changes to an event-sourced domain model, or they could be updates to a non-event sourced materialized view. In this example, we will make changes to an event-sourced domain model.
In the example below, the Counters
application counts the tricks added
in the Dog
aggregates. It has a policy()
function that processes the
Dog.TrickAdded
events of the DogSchool
application. It makes changes to an
event-sourced domain model comprised of Counter
aggregates.
The Counter
aggregate class has a name
which will correspond to the name of a trick.
It also has a count
attribute, which is an integer value with an initial value of 0
. It
also has an increment()
method, decorated with the @event
decorator, which increments the value of its count
attribute.
When a Dog.TrickAdded
event is processed by the policy()
function of the Counters
application,
the name of the trick is used to get or create a Counter
aggregate object. Then, the counter’s
increment()
method is called once. The new domain events are then collected on a “processing event”
object before the policy function returns.
The policy()
function receives two arguments: domain_event
and process_event
. The domain_event
argument is a domain event object that is to be processed. The process_event
is an instance of the
ProcessingEvent
class. New domain events created in the
policy function are collected by calling the process event object’s
collect_events()
method.
The purpose of the process event object is to hold all the new domain events created by the policy function, along
with a Tracking
object that indicates a position in an application sequence
of the domain event that is being processed. These factors will be recorded together atomically by the process
application after the policy function returns. The tracking records are used to avoid dual writing in the
consumption and processing of domain events, so that each domain event is processed exactly once.
from eventsourcing.application import AggregateNotFoundError
from eventsourcing.system import ProcessApplication
from eventsourcing.dispatch import singledispatchmethod
class Counters(ProcessApplication):
@singledispatchmethod
def policy(self, domain_event, process_event):
"""Default policy"""
@policy.register(Dog.TrickAdded)
def _(self, domain_event, process_event):
trick = domain_event.trick
try:
counter_id = Counter.create_id(trick)
counter = self.repository.get(counter_id)
except AggregateNotFoundError:
counter = Counter(trick)
counter.increment()
process_event.collect_events(counter)
def get_count(self, trick):
counter_id = Counter.create_id(trick)
try:
counter = self.repository.get(counter_id)
except AggregateNotFoundError:
return 0
return counter.count
class Counter(Aggregate):
def __init__(self, name):
self.name = name
self.count = 0
@classmethod
def create_id(cls, name):
return uuid5(NAMESPACE_URL, f'/counters/{name}')
@event('Incremented')
def increment(self):
self.count += 1
Defining an event-driven system¶
Just like an application can be defined independently of concrete persistence infrastructure, we can
define which applications “follow” which other applications independently of the manner in which domain
events are pulled and processed. For this purpose, we can use the library’s
System
class to specify a list of “pipes”.
In the example below, we define a system with one “pipe” that has the DogSchool
application
followed by the Counters
application.
from eventsourcing.system import System
system = System(pipes=[[DogSchool, Counters]])
The system object builds a graph of the application classes, identifying “nodes” and “edges”.
assert list(system.nodes) == ['DogSchool', 'Counters']
assert system.edges == [('DogSchool', 'Counters')]
When the system is run, the nodes will be instantiated as application objects, and the edges will be used to set up the applications to “lead” and “follow” each other. Exactly how depends upon the concrete implementation of a system runner.
Running an event-driven system¶
Just like it’s possible to store events in different ways, it’s possible to run an event-driven system in different ways. There are many possibilities for the orchestration of the applications in a system and for interprocess communication between the applications. One possibility is to use a single thread, and to pull and process events synchronously and sequentially. Another possibility is to use multiple threads in the same operating system process, with events processed concurrently and asynchronously. If the application objects are all constructed in the same operating system process, their notification logs can be used directly.
Another possibility is to use multiple operating system processes on the same machine, or alternatively on different machines in a network. When running a system with multiple operating system processes, the notification logs must be accessed remotely across the operating system process boundary. There are many possible alternatives for inter-process communication, by which events are transported from one application to another.
The important thing, in all these cases, is to pull and process a sequence of events, and for new
state in the downstream application to be recorded atomically along with a unique tracking record
that indicates the position in the upstream sequence. And, when resuming the processing of events,
to use the last recorded position in the downstream application to pull subsequent events from the
upstream application. To demonstrate how this works, this library provides a
SingleThreadedRunner
and a MultiThreadedRunner
.
The SingleThreadedRunner
and MultiThreadedRunner
classes implement the abstract Runner
class. These system runners are constructed
with an instance of the System
class, and optionally an env
dictionary.
The runners have a start()
method which constructs and connects the
applications. The runners also have a get()
method, which returns an
application. When application command methods are called, new events will be propagated and processed,
according to the system definition and the application policies. Application query methods can be used
to obtain the resulting state of the system.
The test()
function below shows how the abstract runner interface can be used to operate the dog school
trick counting system.
We will run the test()
function firstly with the SingleThreadedRunner
and
then the MultiThreadedRunner
. The applications will use the POPO persistence
module by default. We will then run the test again, with the library’s SQLite persistence module, and then
with the PostgreSQL persistence module.
from time import sleep
def test(system, runner_class, wait=0, env=None):
# Start running the system.
runner = runner_class(system, env=env)
runner.start()
# Get the application objects.
school = runner.get(DogSchool)
counters = runner.get(Counters)
# Generate some events.
school.register_dog('Billy')
school.register_dog('Milly')
school.register_dog('Scrappy')
school.add_trick('Billy', 'roll over')
school.add_trick('Milly', 'roll over')
school.add_trick('Scrappy', 'roll over')
# Wait in case events are processed asynchronously.
sleep(wait)
# Check the results of processing the events.
assert counters.get_count('roll over') == 3
assert counters.get_count('fetch ball') == 0
assert counters.get_count('play dead') == 0
# Generate more events.
school.add_trick('Billy', 'fetch ball')
school.add_trick('Milly', 'fetch ball')
# Check the results.
sleep(wait)
assert counters.get_count('roll over') == 3
assert counters.get_count('fetch ball') == 2
assert counters.get_count('play dead') == 0
# Generate more events.
school.add_trick('Billy', 'play dead')
# Check the results.
sleep(wait)
assert counters.get_count('roll over') == 3
assert counters.get_count('fetch ball') == 2
assert counters.get_count('play dead') == 1
# Stop the runner.
runner.stop()
Single-threaded runner¶
We can run the system with the SingleThreadedRunner
.
from eventsourcing.system import SingleThreadedRunner
test(system, SingleThreadedRunner)
When the events are processed synchronously, we do not need to wait
for the results,
because the events will have been processed before the application command returns.
The applications will use the default POPO persistence module, because the environment variable
PERSISTENCE_MODULE
has not been set.
Multi-threaded runner¶
We can also run the system with the MultiThreadedRunner
. Because
the events are processed asynchronously, we need to wait
for the results.
from eventsourcing.system import MultiThreadedRunner
test(system, MultiThreadedRunner, wait=0.1)
Again, the applications will use the default POPO persistence module, because the environment variable
PERSISTENCE_MODULE
has not been set.
SQLite environment¶
We can also run the system of applications with the library’s SQLite persistence module. In the example below, the applications use in-memory SQLite databases.
import os
# Use SQLite for persistence.
os.environ['PERSISTENCE_MODULE'] = 'eventsourcing.sqlite'
# Use a separate in-memory database for each application.
os.environ['SQLITE_DBNAME'] = ':memory:'
# Run the system tests.
test(system, SingleThreadedRunner)
When the events are processed synchronously, we do not need to wait
for the results,
because the events will have been processed before the application command returns.
When running the system with the multi-threaded runner and SQLite databases, we need to be
careful to use separate databases for each application. We could use a file-based
database, but here we will use in-memory SQLite databases. Because we need SQLite’s in-memory
databases to support multi-threading, we need to enable SQLite’s shared cache. Because we
need to enable the shared cache, and we need more than one database in the same operating
system process, we also need to use named in-memory databases. The SQLite URI pattern
'file:{NAME}?mode=memory&cache=shared'
specifies a named in-memory database that has a shared cache.
In order to distinguish environment variables for different applications in a system, the environment
variable names should be prefixed with the application name.
# Use separate named in-memory databases in shared cache.
os.environ['DOGSCHOOL_SQLITE_DBNAME'] = 'file:dogschool?mode=memory&cache=shared'
os.environ['COUNTERS_SQLITE_DBNAME'] = 'file:counters?mode=memory&cache=shared'
# Run the system tests.
test(system, MultiThreadedRunner, wait=0.2)
When the events are processed asynchronously, we need to wait
for the results.
PostgreSQL environment¶
We can also run the system with the library’s PostgreSQL persistence module. Just for fun, we will also configure the system to compress and encrypt the domain events.
import os
from eventsourcing.cipher import AESCipher
# Generate a cipher key (keep this safe).
cipher_key = AESCipher.create_key(num_bytes=32)
# Cipher key.
os.environ['CIPHER_KEY'] = cipher_key
# Cipher topic.
os.environ['CIPHER_TOPIC'] = 'eventsourcing.cipher:AESCipher'
# Compressor topic.
os.environ['COMPRESSOR_TOPIC'] = 'eventsourcing.compressor:ZlibCompressor'
# Use Postgres database.
os.environ['PERSISTENCE_MODULE'] = 'eventsourcing.postgres'
# Configure database connections.
os.environ['POSTGRES_DBNAME'] = 'eventsourcing'
os.environ['POSTGRES_HOST'] = '127.0.0.1'
os.environ['POSTGRES_PORT'] = '5432'
os.environ['POSTGRES_USER'] = 'eventsourcing'
os.environ['POSTGRES_PASSWORD'] = 'eventsourcing'
test(system, SingleThreadedRunner)
Although we must use different SQLite databases for different applications, we can use the same PostgreSQL database, because the PostreSQL persistence module creates separate tables for each application.
However, before running the test again with PostgreSQL, we need to reset the trick counts, because they are being stored in a durable database and so would simply accumulate. We can do this by deleting the database tables for the system.
from eventsourcing.postgres import PostgresDatastore
from eventsourcing.tests.postgres_utils import drop_postgres_table
db = PostgresDatastore(
'eventsourcing',
'127.0.0.1',
'5432',
'eventsourcing',
'eventsourcing',
)
drop_postgres_table(db, 'dogschool_events')
drop_postgres_table(db, 'counters_events')
drop_postgres_table(db, 'counters_tracking')
After resetting the recorded state of the system, we can run the system again with the multi-threaded runner.
test(system, MultiThreadedRunner, wait=0.2)
When the state of the system is recorded in a durable database, we can access the state of the system’s applications by directly constructing the application objects.
assert DogSchool().get_dog('Scrappy')['tricks'] == ('roll over',)
assert Counters().get_count('roll over') == 3
Exercises¶
Replicate the code in this tutorial in your development environment.
Copy the code snippets above.
Run the code with the default “plain old Python object” persistence module.
Configure and run the system with an SQLite database.
Create a PostgreSQL database, and configure and run the system with a PostgreSQL database.
Configure the system to run the
DogSchool
application with a PostgreSQL database and theCounters
application with a file-based SQLite database.Connect to the databases with the command line clients for SQLite and PostgreSQL, and examine the database tables to observe the stored event records and the tracking records.
2. Write a system that has a Game
application with Player
aggregates that
have a score
which can be updated, that is followed by a HallOfFame
application that
processes the score update events into an event-sourced HighScoreTable
aggregate that keeps
a list of the top three scores.
3. Write a system that processes events with “exactly once semantics” into a non-event sourced materialized view.
Next steps¶
For more information about event-driven systems, please read the system module documentation.
See also the Example systems.