system — Event-driven systems

This module shows how event-sourced applications can be combined to make an event driven system.

this page is under development — please check back soon

System of applications

The library’s system class…

from eventsourcing.system import System
from uuid import uuid4

from eventsourcing.domain import Aggregate, event

class Dog(Aggregate):
    @event('Registered')
    def __init__(self, name):
        self.name = name
        self.tricks = []

    @event('TrickAdded')
    def add_trick(self, trick):
        self.tricks.append(trick)

Now let’s define an application…

from eventsourcing.application import Application

class DogSchool(Application):
    def register_dog(self, name):
        dog = Dog(name)
        self.save(dog)
        return dog.id

    def add_trick(self, dog_id, trick):
        dog = self.repository.get(dog_id)
        dog.add_trick(trick)
        self.save(dog)

    def get_dog(self, dog_id):
        dog = self.repository.get(dog_id)
        return {'name': dog.name, 'tricks': tuple(dog.tricks)}

Now let’s define an analytics application…

from uuid import uuid5, NAMESPACE_URL

class Counter(Aggregate):
    def __init__(self, name):
        self.name = name
        self.count = 0

    @classmethod
    def create_id(cls, name):
        return uuid5(NAMESPACE_URL, f'/counters/{name}')

    @event('Incremented')
    def increment(self):
        self.count += 1
from eventsourcing.application import AggregateNotFound
from eventsourcing.system import ProcessApplication
from eventsourcing.dispatch import singledispatchmethod

class Counters(ProcessApplication):
    @singledispatchmethod
    def policy(self, domain_event, process_event):
        """Default policy"""

    @policy.register(Dog.TrickAdded)
    def _(self, domain_event, process_event):
        trick = domain_event.trick
        try:
            counter_id = Counter.create_id(trick)
            counter = self.repository.get(counter_id)
        except AggregateNotFound:
            counter = Counter(trick)
        counter.increment()
        process_event.collect_events(counter)

    def get_count(self, trick):
        counter_id = Counter.create_id(trick)
        try:
            counter = self.repository.get(counter_id)
        except AggregateNotFound:
            return 0
        return counter.count
system = System(pipes=[[DogSchool, Counters]])

Single-threaded runner

from eventsourcing.system import SingleThreadedRunner

runner = SingleThreadedRunner(system)
runner.start()

school = runner.get(DogSchool)
counters = runner.get(Counters)

dog_id1 = school.register_dog('Billy')
dog_id2 = school.register_dog('Milly')
dog_id3 = school.register_dog('Scrappy')

school.add_trick(dog_id1, 'roll over')
school.add_trick(dog_id2, 'roll over')
school.add_trick(dog_id3, 'roll over')

assert counters.get_count('roll over') == 3
assert counters.get_count('fetch ball') == 0
assert counters.get_count('play dead') == 0

school.add_trick(dog_id1, 'fetch ball')
school.add_trick(dog_id2, 'fetch ball')

assert counters.get_count('roll over') == 3
assert counters.get_count('fetch ball') == 2
assert counters.get_count('play dead') == 0

school.add_trick(dog_id1, 'play dead')

assert counters.get_count('roll over') == 3
assert counters.get_count('fetch ball') == 2
assert counters.get_count('play dead') == 1

runner.stop()

Multi-threaded runner

from eventsourcing.system import MultiThreadedRunner

runner = MultiThreadedRunner(system)
runner.start()

school = runner.get(DogSchool)
counters = runner.get(Counters)

dog_id1 = school.register_dog('Billy')
dog_id2 = school.register_dog('Milly')
dog_id3 = school.register_dog('Scrappy')

school.add_trick(dog_id1, 'roll over')
school.add_trick(dog_id2, 'roll over')
school.add_trick(dog_id3, 'roll over')

school.add_trick(dog_id1, 'fetch ball')
school.add_trick(dog_id2, 'fetch ball')

school.add_trick(dog_id1, 'play dead')

from time import sleep

sleep(0.01)

assert counters.get_count('roll over') == 3
assert counters.get_count('fetch ball') == 2
assert counters.get_count('play dead') == 1

runner.stop()

Classes

class eventsourcing.system.Follower(env: Optional[Mapping[str, str]] = None)[source]

Bases: eventsourcing.application.Application

Extends the Application class by using a process recorder as its application recorder, by keeping track of the applications it is following, and pulling and processing new domain event notifications through its policy() method.

__init__(env: Optional[Mapping[str, str]] = None) None[source]

Initialises an application with an InfrastructureFactory, a Mapper, an ApplicationRecorder, an EventStore, a Repository, and a LocalNotificationLog.

construct_recorder() eventsourcing.persistence.ProcessRecorder[source]

Constructs and returns a ProcessRecorder for the application to use as its application recorder.

follow(name: str, log: eventsourcing.application.NotificationLog) None[source]

Constructs a notification log reader and a mapper for the named application, and adds them to its collections of readers and mappers.

pull_and_process(leader_name: str, start: Optional[int] = None, stop: Optional[int] = None) None[source]

Pull and process new domain event notifications.

pull_notifications(leader_name: str, start: int, stop: Optional[int] = None) Iterator[List[eventsourcing.persistence.Notification]][source]

Pulls batches of unseen Notification objects from the notification log reader of the named application.

convert_notifications(leader_name: str, notifications: Iterable[eventsourcing.persistence.Notification]) List[Tuple[eventsourcing.domain.DomainEventProtocol, eventsourcing.persistence.Tracking]][source]

Uses the given Mapper to convert each received Notification object to an AggregateEvent object paired with a Tracking object.

process_event(domain_event: eventsourcing.domain.DomainEventProtocol, tracking: eventsourcing.persistence.Tracking) None[source]

Calls policy() method with the given AggregateEvent and a new ProcessingEvent created from the given Tracking object.

The policy will collect any new aggregate events on the process event object.

After the policy method returns, the process event object will then be recorded by calling record(), which will return new notifications.

After calling take_snapshots(), the new notifications are passed to the notify() method.

abstract policy(domain_event: eventsourcing.domain.DomainEventProtocol, processing_event: eventsourcing.application.ProcessingEvent) None[source]

Abstract domain event processing policy method. Must be implemented by event processing applications. When processing the given domain event, event processing applications must use the collect_events() method of the given process event object (instead of the application’s save() method) to collect pending events from changed aggregates, so that the new domain events will be recorded atomically with tracking information about the position of the given domain event’s notification.

class eventsourcing.system.RecordingEventReceiver[source]

Bases: abc.ABC

Abstract base class for objects that may receive recording events.

abstract receive_recording_event(recording_event: eventsourcing.application.RecordingEvent) None[source]

Receives a recording event.

class eventsourcing.system.Leader(env: Optional[Mapping[str, str]] = None)[source]

Bases: eventsourcing.application.Application

Extends the Application class by also being responsible for keeping track of followers, and prompting followers when there are new domain event notifications to be pulled and processed.

__init__(env: Optional[Mapping[str, str]] = None) None[source]

Initialises an application with an InfrastructureFactory, a Mapper, an ApplicationRecorder, an EventStore, a Repository, and a LocalNotificationLog.

lead(follower: eventsourcing.system.RecordingEventReceiver) None[source]

Adds given follower to a list of followers.

class eventsourcing.system.ProcessApplication(env: Optional[Mapping[str, str]] = None)[source]

Bases: eventsourcing.system.Leader, eventsourcing.system.Follower

Base class for event processing applications that are both “leaders” and followers”.

class eventsourcing.system.System(pipes: Iterable[Iterable[Type[eventsourcing.application.Application]]])[source]

Bases: object

Defines a system of applications.

__init__(pipes: Iterable[Iterable[Type[eventsourcing.application.Application]]])[source]
property topic: Optional[str]

Returns a topic to the system object, if constructed as a module attribute.

class eventsourcing.system.Runner(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]

Bases: abc.ABC

Abstract base class for system runners.

__init__(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]
abstract start() None[source]

Starts the runner.

abstract stop() None[source]

Stops the runner.

abstract get(cls: Type[eventsourcing.application.TApplication]) eventsourcing.application.TApplication[source]

Returns an application instance for given application class.

exception eventsourcing.system.RunnerAlreadyStarted[source]

Bases: Exception

Raised when runner is already started.

exception eventsourcing.system.NotificationPullingError[source]

Bases: Exception

Raised when pulling notifications fails.

exception eventsourcing.system.NotificationConvertingError[source]

Bases: Exception

Raised when converting notifications fails.

exception eventsourcing.system.EventProcessingError[source]

Bases: Exception

Raised when event processing fails.

class eventsourcing.system.SingleThreadedRunner(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]

Bases: eventsourcing.system.Runner, eventsourcing.system.RecordingEventReceiver

Runs a System in a single thread.

__init__(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]

Initialises runner with the given System.

start() None[source]

Starts the runner. The applications mentioned in the system definition are constructed. The followers are set up to follow the applications they are defined as following in the system definition. And the leaders are set up to lead the runner itself.

receive_recording_event(recording_event: eventsourcing.application.RecordingEvent) None[source]

Receives recording event by appending the name of the leader to a list of prompted names.

Then, unless this method has previously been called and not yet returned, each of the prompted names is resolved to a leader application, and its followers pull and process events from that application. This may lead to further names being added to the list of prompted names. This process continues until there are no more prompted names. In this way, a system of applications will process all events in a single thread.

stop() None[source]

Stops the runner.

get(cls: Type[eventsourcing.application.TApplication]) eventsourcing.application.TApplication[source]

Returns an application instance for given application class.

class eventsourcing.system.NewSingleThreadedRunner(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]

Bases: eventsourcing.system.Runner, eventsourcing.system.RecordingEventReceiver

Runs a System in a single thread.

__init__(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]

Initialises runner with the given System.

start() None[source]

Starts the runner. The applications are constructed, and setup to lead and follow each other, according to the system definition. The followers are setup to follow the applications they follow (have a notification log reader with the notification log of the leader), and their leaders are setup to lead the runner itself (send prompts).

receive_recording_event(recording_event: eventsourcing.application.RecordingEvent) None[source]

Receives recording event by appending it to list of received recording events.

Unless this method has previously been called and not yet returned, it will then attempt to make the followers process all received recording events, until there are none remaining.

stop() None[source]

Stops the runner.

get(cls: Type[eventsourcing.application.TApplication]) eventsourcing.application.TApplication[source]

Returns an application instance for given application class.

class eventsourcing.system.MultiThreadedRunner(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]

Bases: eventsourcing.system.Runner

Runs a System with one MultiThreadedRunnerThread for each Follower in the system definition.

__init__(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]

Initialises runner with the given System.

start() None[source]

Starts the runner. A multi-threaded runner thread is started for each ‘follower’ application in the system, and constructs an instance of each non-follower leader application in the system. The followers are then setup to follow the applications they follow (have a notification log reader with the notification log of the leader), and their leaders are setup to lead the follower’s thead (send prompts).

stop() None[source]

Stops the runner.

get(cls: Type[eventsourcing.application.TApplication]) eventsourcing.application.TApplication[source]

Returns an application instance for given application class.

class eventsourcing.system.MultiThreadedRunnerThread(follower: eventsourcing.system.Follower, has_errored: threading.Event)[source]

Bases: eventsourcing.system.RecordingEventReceiver, threading.Thread

Runs one Follower application in a MultiThreadedRunner.

__init__(follower: eventsourcing.system.Follower, has_errored: threading.Event)[source]
run() None[source]

Loops forever until stopped. The loop blocks on waiting for the ‘is_prompted’ event to be set, then calls pull_and_process() method for each prompted name.

receive_recording_event(recording_event: eventsourcing.application.RecordingEvent) None[source]

Receives prompt by appending name of leader to list of prompted names.

class eventsourcing.system.NewMultiThreadedRunner(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]

Bases: eventsourcing.system.Runner, eventsourcing.system.RecordingEventReceiver

Runs a System with multiple threads in a new way.

__init__(system: eventsourcing.system.System, env: Optional[Mapping[str, str]] = None)[source]

Initialises runner with the given System.

start() None[source]

Starts the runner.

A multi-threaded runner thread is started for each ‘follower’ application in the system, and constructs an instance of each non-follower leader application in the system. The followers are then setup to follow the applications they follow (have a notification log reader with the notification log of the leader), and their leaders are setup to lead the follower’s thead (send prompts).

stop() None[source]

Stops the runner.

get(cls: Type[eventsourcing.application.TApplication]) eventsourcing.application.TApplication[source]

Returns an application instance for given application class.

receive_recording_event(recording_event: eventsourcing.application.RecordingEvent) None[source]

Receives a recording event.

class eventsourcing.system.PullingThread(converting_queue: queue.Queue[Optional[Union[eventsourcing.application.RecordingEvent, List[eventsourcing.persistence.Notification]]]], follower: eventsourcing.system.Follower, leader_name: str, has_errored: threading.Event)[source]

Bases: threading.Thread

Receives or pulls notifications from the given leader, and puts them on a queue for conversion into processing jobs.

__init__(converting_queue: queue.Queue[Optional[Union[eventsourcing.application.RecordingEvent, List[eventsourcing.persistence.Notification]]]], follower: eventsourcing.system.Follower, leader_name: str, has_errored: threading.Event)[source]

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run() None[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class eventsourcing.system.ConvertingThread(converting_queue: queue.Queue[Optional[Union[eventsourcing.application.RecordingEvent, List[eventsourcing.persistence.Notification]]]], processing_queue: queue.Queue[Optional[List[Tuple[eventsourcing.domain.DomainEventProtocol, eventsourcing.persistence.Tracking]]]], follower: eventsourcing.system.Follower, leader_name: str, has_errored: threading.Event)[source]

Bases: threading.Thread

Converts notifications into processing jobs.

__init__(converting_queue: queue.Queue[Optional[Union[eventsourcing.application.RecordingEvent, List[eventsourcing.persistence.Notification]]]], processing_queue: queue.Queue[Optional[List[Tuple[eventsourcing.domain.DomainEventProtocol, eventsourcing.persistence.Tracking]]]], follower: eventsourcing.system.Follower, leader_name: str, has_errored: threading.Event)[source]

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run() None[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class eventsourcing.system.ProcessingThread(processing_queue: queue.Queue[Optional[List[Tuple[eventsourcing.domain.DomainEventProtocol, eventsourcing.persistence.Tracking]]]], follower: eventsourcing.system.Follower, has_errored: threading.Event)[source]

Bases: threading.Thread

A processing thread gets events from a processing queue, and calls the application’s process_event() method.

__init__(processing_queue: queue.Queue[Optional[List[Tuple[eventsourcing.domain.DomainEventProtocol, eventsourcing.persistence.Tracking]]]], follower: eventsourcing.system.Follower, has_errored: threading.Event)[source]

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run() None[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class eventsourcing.system.NotificationLogReader(notification_log: eventsourcing.application.NotificationLog, section_size: int = 10)[source]

Bases: object

Reads domain event notifications from a notification log.

__init__(notification_log: eventsourcing.application.NotificationLog, section_size: int = 10)[source]

Initialises a reader with the given notification log, and optionally a section size integer which determines the requested number of domain event notifications in each section retrieved from the notification log.

read(*, start: int) Iterator[eventsourcing.persistence.Notification][source]

Returns a generator that yields event notifications from the reader’s notification log, starting from given start position (a notification ID).

This method traverses the linked list of sections presented by a notification log, and yields the individual event notifications that are contained in each section. When all the event notifications from a section have been yielded, the reader will retrieve the next section, and continues yielding event notification until all subsequent event notifications in the notification log from the start position have been yielded.

select(*, start: int, stop: Optional[int] = None, topics: Sequence[str] = ()) Iterator[List[eventsourcing.persistence.Notification]][source]

Returns a generator that yields lists of event notifications from the reader’s notification log, starting from given start position (a notification ID).

This method selects a limited list of notifications from a notification log and yields event notifications in batches. When one list of event notifications has been yielded, the reader will retrieve another list, and continue until all subsequent event notifications in the notification log from the start position have been yielded.