system — Event-driven systems

This module shows how event-sourced applications can be combined to make an event-driven system.

This page is under development — please check back soon.

System of applications

The library’s system class…

from eventsourcing.system import System
from dataclasses import dataclass
from uuid import uuid4

from eventsourcing.domain import Aggregate, AggregateCreated, AggregateEvent


class World(Aggregate):
    """Event-sourced aggregate that keeps a history of what has happened."""

    class Created(AggregateCreated):
        """Event triggered when a world is created."""

    class SomethingHappened(AggregateEvent):
        """Event triggered when something happens in a world."""

        what: str

        def apply(self, world):
            """Append this event's ``what`` value to the world's history."""
            world.history.append(self.what)

    def __init__(self, **kwargs):
        """Initialise the aggregate and start with an empty history."""
        super().__init__(**kwargs)
        self.history = []

    @classmethod
    def create(cls):
        """Return a new world identified by a freshly generated version 4 UUID."""
        world_id = uuid4()
        return cls._create(event_class=cls.Created, id=world_id)

    def make_it_so(self, what):
        """Trigger a ``SomethingHappened`` event carrying the given ``what``."""
        self.trigger_event(self.SomethingHappened, what=what)

Now let’s define an application…

from eventsourcing.application import Application


class WorldsApplication(Application):
    """Application for creating worlds and recording what happens in them."""

    def create_world(self):
        """Create and save a new world, and return its ID."""
        new_world = World.create()
        self.save(new_world)
        return new_world.id

    def make_it_so(self, world_id, what):
        """Make ``what`` happen in the world with the given ID, then save it."""
        target = self.repository.get(world_id)
        target.make_it_so(what)
        self.save(target)

    def get_world_history(self, world_id):
        """Return a new list holding the history of the world with the given ID."""
        return [*self.repository.get(world_id).history]

Now let’s define an analytics application…

from uuid import uuid5, NAMESPACE_URL

class Counter(Aggregate):
    """Event-sourced aggregate that counts how many times it was incremented."""

    class Created(AggregateCreated):
        """Event triggered when a counter is created."""

    class Incremented(AggregateEvent):
        """Event triggered when a counter is incremented."""

        def apply(self, counter):
            """Add one to the counter's count."""
            counter.count += 1

    def __init__(self, **kwargs):
        """Initialise the aggregate and start counting from zero."""
        super().__init__(**kwargs)
        self.count = 0

    @classmethod
    def create_id(cls, name):
        """Return the version 5 UUID derived from the counter's name.

        The same name always gives the same ID, so a counter can be looked
        up by name.
        """
        path = f'/counters/{name}'
        return uuid5(NAMESPACE_URL, path)

    @classmethod
    def create(cls, name):
        """Return a new counter whose ID is derived from the given name."""
        counter_id = cls.create_id(name)
        return cls._create(event_class=cls.Created, id=counter_id)

    def increment(self):
        """Trigger an ``Incremented`` event."""
        self.trigger_event(self.Incremented)
from eventsourcing.application import AggregateNotFound
from eventsourcing.system import ProcessApplication
from eventsourcing.dispatch import singledispatchmethod


class Counters(ProcessApplication):
    """Process application that counts how often each thing has happened.

    Keeps one ``Counter`` aggregate for each distinct ``what`` value carried
    by the ``World.SomethingHappened`` events it processes.
    """

    @singledispatchmethod
    def policy(self, domain_event, process_event):
        """Default policy: do nothing for events without a registered handler."""

    @policy.register(World.SomethingHappened)
    def _(self, domain_event, process_event):
        """Increment the counter named after the event's ``what`` value."""
        what = domain_event.what
        counter_id = Counter.create_id(what)
        try:
            counter = self.repository.get(counter_id)
        except AggregateNotFound:
            # First time this 'what' has been seen: start a new counter.
            counter = Counter.create(what)
        counter.increment()
        # Save through the process event (not self.save()) so that the new
        # events are recorded together with the tracking information.
        process_event.save(counter)

    def get_count(self, what):
        """Return how many times ``what`` has happened, or 0 if it never has."""
        counter_id = Counter.create_id(what)
        try:
            counter = self.repository.get(counter_id)
        except AggregateNotFound:
            return 0
        return counter.count
# A system with a single pipe, in which the events of WorldsApplication
# are processed by Counters.
system = System(
    pipes=[
        [WorldsApplication, Counters],
    ],
)

Single-threaded runner

from eventsourcing.system import SingleThreadedRunner

runner = SingleThreadedRunner(system)
runner.start()
worlds = runner.get(WorldsApplication)
counters = runner.get(Counters)

world_id1 = worlds.create_world()
world_id2 = worlds.create_world()
world_id3 = worlds.create_world()

# Nothing has happened yet, so every count is zero.
assert counters.get_count('dinosaurs') == 0
assert counters.get_count('trucks') == 0
assert counters.get_count('internet') == 0

worlds.make_it_so(world_id1, 'dinosaurs')
worlds.make_it_so(world_id2, 'dinosaurs')
worlds.make_it_so(world_id3, 'dinosaurs')

# The counts can be checked immediately after each call.
assert counters.get_count('dinosaurs') == 3
assert counters.get_count('trucks') == 0
assert counters.get_count('internet') == 0

worlds.make_it_so(world_id1, 'trucks')
worlds.make_it_so(world_id2, 'trucks')

assert counters.get_count('dinosaurs') == 3
assert counters.get_count('trucks') == 2
assert counters.get_count('internet') == 0

worlds.make_it_so(world_id1, 'internet')

assert counters.get_count('dinosaurs') == 3
assert counters.get_count('trucks') == 2
assert counters.get_count('internet') == 1

# Stop the runner now that the example has finished with it.
runner.stop()

Multi-threaded runner

from eventsourcing.system import MultiThreadedRunner

from time import monotonic, sleep

runner = MultiThreadedRunner(system)
runner.start()
try:
    worlds = runner.get(WorldsApplication)
    counters = runner.get(Counters)

    world_id1 = worlds.create_world()
    world_id2 = worlds.create_world()
    world_id3 = worlds.create_world()

    worlds.make_it_so(world_id1, 'dinosaurs')
    worlds.make_it_so(world_id2, 'dinosaurs')
    worlds.make_it_so(world_id3, 'dinosaurs')

    worlds.make_it_so(world_id1, 'trucks')
    worlds.make_it_so(world_id2, 'trucks')

    worlds.make_it_so(world_id1, 'internet')

    # The follower processes events in its own thread, so the counts are
    # updated asynchronously. Poll for up to five seconds instead of relying
    # on a single fixed sleep, which can be too short on a slow machine.
    expected_counts = {'dinosaurs': 3, 'trucks': 2, 'internet': 1}
    deadline = monotonic() + 5.0
    while monotonic() < deadline:
        if all(
            counters.get_count(what) == count
            for what, count in expected_counts.items()
        ):
            break
        sleep(0.01)

    assert counters.get_count('dinosaurs') == 3
    assert counters.get_count('trucks') == 2
    assert counters.get_count('internet') == 1
finally:
    # Always stop the runner, even if an assertion fails.
    runner.stop()

Classes

class eventsourcing.system.ProcessEvent(tracking: eventsourcing.persistence.Tracking)[source]

Bases: object

Keeps together a Tracking object, which represents the position of a domain event notification in the notification log of a particular application, and the new domain events that result from processing that notification.

__init__(tracking: eventsourcing.persistence.Tracking)[source]

Initialises the process event with the given tracking object.

save(*aggregates) → None[source]

Collects pending domain events from the given aggregates.

class eventsourcing.system.Follower[source]

Bases: eventsourcing.application.Application

Extends the Application class by using a process recorder as its application recorder, by keeping track of the applications it is following, and pulling and processing new domain event notifications through its policy() method.

__init__() → None[source]

Initialises an application with an InfrastructureFactory, a Mapper, an ApplicationRecorder, an EventStore, a Repository, and a LocalNotificationLog.

construct_recorder() → eventsourcing.persistence.ProcessRecorder[source]

Constructs and returns a ProcessRecorder for the application to use as its application recorder.

follow(name: str, log: eventsourcing.application.NotificationLog) → None[source]

Constructs a notification log reader and a mapper for the named application, and adds them to its collection of readers.

pull_and_process(name: str) → None[source]

Pulls and processes unseen domain event notifications from the notification log reader of the named application.

Converts received event notifications to domain event objects, and then calls the policy() with a new ProcessEvent object which contains a Tracking object that keeps track of the name of the application and the position in its notification log from which the domain event notification was pulled. The policy will save aggregates to the process event object, using its save() method, which collects pending domain events using the aggregates’ collect_events() method, and the process event object will then be recorded by calling the record() method.

policy(domain_event: eventsourcing.domain.AggregateEvent, process_event: eventsourcing.system.ProcessEvent) → None[source]

Abstract domain event processing policy method. Must be implemented by event processing applications. When processing the given domain event, event processing applications must use the save() method of the given process event object (instead of the application’s save() method) to collect pending events from changed aggregates, so that the new domain events will be recorded atomically with tracking information about the position of the given domain event’s notification.

record(process_event: eventsourcing.system.ProcessEvent) → None[source]

Records given process event in the application’s process recorder.

class eventsourcing.system.Promptable[source]

Bases: abc.ABC

Abstract base class for “promptable” objects.

receive_prompt(leader_name: str) → None[source]

Receives the name of the leader that has new domain event notifications.

class eventsourcing.system.Leader[source]

Bases: eventsourcing.application.Application

Extends the Application class by also being responsible for keeping track of followers, and prompting followers when there are new domain event notifications to be pulled and processed.

__init__() → None[source]

Initialises an application with an InfrastructureFactory, a Mapper, an ApplicationRecorder, an EventStore, a Repository, and a LocalNotificationLog.

lead(follower: eventsourcing.system.Promptable) → None[source]

Adds given follower to a list of followers.

notify(new_events: List[eventsourcing.domain.AggregateEvent]) → None[source]

Extends the application notify() method by calling prompt_followers() whenever new events have just been saved.

prompt_followers() → None[source]

Prompts followers by calling their receive_prompt() methods with the name of the application.

class eventsourcing.system.ProcessApplication[source]

Bases: eventsourcing.system.Leader, eventsourcing.system.Follower, abc.ABC

Base class for event processing applications that are both “leaders” and “followers”.

class eventsourcing.system.System(pipes: Iterable[Iterable[Type[eventsourcing.application.Application]]])[source]

Bases: object

Defines a system of applications.

__init__(pipes: Iterable[Iterable[Type[eventsourcing.application.Application]]])[source]

Initialises the system with the given pipes of application classes.

class eventsourcing.system.Runner(system: eventsourcing.system.System)[source]

Bases: abc.ABC

Abstract base class for system runners.

__init__(system: eventsourcing.system.System)[source]

Initialises runner with the given System.

start() → None[source]

Starts the runner.

stop() → None[source]

Stops the runner.

get(cls: Type[A]) → A[source]

Returns an application instance for given application class.

exception eventsourcing.system.RunnerAlreadyStarted[source]

Bases: Exception

Raised when runner is already started.

class eventsourcing.system.SingleThreadedRunner(system: eventsourcing.system.System)[source]

Bases: eventsourcing.system.Runner, eventsourcing.system.Promptable

Runs a System in a single thread. A single threaded runner is a runner, and so implements the start(), stop(), and get() methods. A single threaded runner is also a Promptable object, and implements the receive_prompt() method by collecting prompted names.

__init__(system: eventsourcing.system.System)[source]

Initialises runner with the given System.

start() → None[source]

Starts the runner. The applications are constructed, and set up to lead and follow each other, according to the system definition. The followers are set up to follow the applications they follow (have a notification log reader with the notification log of the leader), and their leaders are set up to lead the runner itself (send prompts).

receive_prompt(leader_name: str) → None[source]

Receives prompt by appending name of leader to list of prompted names. Unless this method has previously been called but not yet returned, it will then proceed to forward the prompts received to its application by calling the application’s pull_and_process() method for each prompted name.

stop() → None[source]

Stops the runner.

get(cls: Type[A]) → A[source]

Returns an application instance for given application class.

class eventsourcing.system.MultiThreadedRunner(system: eventsourcing.system.System)[source]

Bases: eventsourcing.system.Runner

Runs a System with a MultiThreadedRunnerThread for each follower in the system definition. It is a runner, and so implements the start(), stop(), and get() methods.

__init__(system: eventsourcing.system.System)[source]

Initialises runner with the given System.

start() → None[source]

Starts the runner.

A multi-threaded runner thread is started for each ‘follower’ application in the system, and an instance of each non-follower leader application in the system is constructed. The followers are then set up to follow the applications they follow (have a notification log reader with the notification log of the leader), and their leaders are set up to lead the follower’s thread (send prompts).

stop() → None[source]

Stops the runner.

get(cls: Type[A]) → A[source]

Returns an application instance for given application class.

class eventsourcing.system.MultiThreadedRunnerThread(app_class: Type[eventsourcing.system.Follower], is_stopping: threading.Event)[source]

Bases: eventsourcing.system.Promptable, threading.Thread

Runs one process application for a MultiThreadedRunner.

A multi-threaded runner thread is a Promptable object, and implements the receive_prompt() method by collecting prompted names and setting its threading event ‘is_prompted’.

A multi-threaded runner thread is a Python threading.Thread object, and implements the thread’s run() method by waiting until the ‘is_prompted’ event has been set and then calling its process application’s pull_and_process() method once for each prompted name. It is expected that the process application will have been set up by the runner with a notification log reader from which event notifications will be pulled.

__init__(app_class: Type[eventsourcing.system.Follower], is_stopping: threading.Event)[source]

Initialises the thread with the given application class and ‘is_stopping’ event.

run() → None[source]

Begins by constructing an application instance from given application class and then loops forever until stopped. The loop blocks on waiting for the ‘is_prompted’ event to be set, then forwards the prompts already received to its application by calling the application’s pull_and_process() method for each prompted name.

receive_prompt(leader_name: str) → None[source]

Receives prompt by appending name of leader to list of prompted names.

class eventsourcing.system.NotificationLogReader(notification_log: eventsourcing.application.NotificationLog, section_size: int = 10)[source]

Bases: object

Reads domain event notifications from a notification log.

__init__(notification_log: eventsourcing.application.NotificationLog, section_size: int = 10)[source]

Initialises a reader with the given notification log, and optionally a section size integer which determines the requested number of domain event notifications in each section retrieved from the notification log.

read(*, start: int) → Iterator[eventsourcing.persistence.Notification][source]

Returns a generator that yields event notifications from the reader’s notification log, starting from given start position (a notification ID).

This method traverses the linked list of sections presented by a notification log, and yields the individual event notifications that are contained in each section. When all the event notifications from a section have been yielded, the reader will retrieve the next section, and continue yielding event notifications until all subsequent event notifications in the notification log from the start position have been yielded.