system
— Event-driven systems¶
This module shows how event-sourced applications can be combined to make an event-driven system.
This page is under development — please check back soon.
System of applications¶
The library’s system class…
from eventsourcing.system import System
from dataclasses import dataclass
from uuid import uuid4
from eventsourcing.domain import Aggregate, AggregateCreated, AggregateEvent
class World(Aggregate):
    """Aggregate that keeps a history of the things that have happened."""

    def __init__(self, **kwargs):
        """Initialise the aggregate and start with an empty history."""
        super().__init__(**kwargs)
        self.history = []

    @classmethod
    def create(cls):
        """Return a new world identified by a freshly generated UUID."""
        new_id = uuid4()
        return cls._create(event_class=cls.Created, id=new_id)

    class Created(AggregateCreated):
        """Event triggered when a world is created."""

    def make_it_so(self, what):
        """Trigger a ``SomethingHappened`` event carrying ``what``."""
        self.trigger_event(self.SomethingHappened, what=what)

    class SomethingHappened(AggregateEvent):
        """Event triggered when something happens in a world."""

        what: str

        def apply(self, world):
            """Append this event's ``what`` to the history of ``world``."""
            world.history.append(self.what)
Now let’s define an application…
from eventsourcing.application import Application
class WorldsApplication(Application):
    """Application for creating worlds and making things happen in them."""

    def create_world(self):
        """Create and save a new world, then return its ID."""
        new_world = World.create()
        self.save(new_world)
        return new_world.id

    def make_it_so(self, world_id, what):
        """Make ``what`` happen in the identified world and save the change."""
        target = self.repository.get(world_id)
        target.make_it_so(what)
        self.save(target)

    def get_world_history(self, world_id):
        """Return a new list holding the identified world's history."""
        return list(self.repository.get(world_id).history)
Now let’s define an analytics application…
from uuid import uuid5, NAMESPACE_URL
class Counter(Aggregate):
    """Aggregate that holds a running count for a given name."""

    def __init__(self, **kwargs):
        """Initialise the aggregate with a count of zero."""
        super().__init__(**kwargs)
        self.count = 0

    @classmethod
    def create_id(cls, name):
        """Return the version-5 UUID derived from the counter's name."""
        path = f'/counters/{name}'
        return uuid5(NAMESPACE_URL, path)

    @classmethod
    def create(cls, name):
        """Return a new counter whose ID is derived from ``name``."""
        counter_id = cls.create_id(name)
        return cls._create(event_class=cls.Created, id=counter_id)

    class Created(AggregateCreated):
        """Event triggered when a counter is created."""

    def increment(self):
        """Trigger an ``Incremented`` event."""
        self.trigger_event(self.Incremented)

    class Incremented(AggregateEvent):
        """Event triggered when a counter is incremented."""

        def apply(self, counter):
            """Add one to the count of ``counter``."""
            counter.count += 1
from eventsourcing.application import AggregateNotFound
from eventsourcing.system import ProcessApplication
from eventsourcing.dispatch import singledispatchmethod
class Counters(ProcessApplication):
    """Process application that counts how often each thing has happened.

    One ``Counter`` aggregate is kept per distinct ``what`` value carried by
    ``World.SomethingHappened`` events.
    """

    # NOTE: a plain ``policy`` definition that was immediately shadowed by the
    # single-dispatch definition below has been removed; it was dead code.
    @singledispatchmethod
    def policy(self, domain_event, process_event):
        """Default policy"""

    @policy.register(World.SomethingHappened)
    def _(self, domain_event, process_event):
        """Increment the counter for the thing that happened.

        The counter is created first if it does not exist yet. It is saved
        via ``process_event.save()`` rather than the application's own
        ``save()`` method.
        """
        what = domain_event.what
        counter_id = Counter.create_id(what)
        try:
            counter = self.repository.get(counter_id)
        except AggregateNotFound:
            counter = Counter.create(what)
        counter.increment()
        process_event.save(counter)

    def get_count(self, what):
        """Return the count recorded for ``what``, or 0 if there is no counter."""
        counter_id = Counter.create_id(what)
        try:
            counter = self.repository.get(counter_id)
        except AggregateNotFound:
            return 0
        return counter.count
# Define a system with a single pipe that lists WorldsApplication followed by Counters.
system = System(pipes=[[WorldsApplication, Counters]])
Single-threaded runner¶
from eventsourcing.system import SingleThreadedRunner
runner = SingleThreadedRunner(system)
runner.start()
try:
    worlds = runner.get(WorldsApplication)
    counters = runner.get(Counters)

    world_id1 = worlds.create_world()
    world_id2 = worlds.create_world()
    world_id3 = worlds.create_world()

    # Nothing has happened yet, so every count is zero.
    assert counters.get_count('dinosaurs') == 0
    assert counters.get_count('trucks') == 0
    assert counters.get_count('internet') == 0

    worlds.make_it_so(world_id1, 'dinosaurs')
    worlds.make_it_so(world_id2, 'dinosaurs')
    worlds.make_it_so(world_id3, 'dinosaurs')

    assert counters.get_count('dinosaurs') == 3
    assert counters.get_count('trucks') == 0
    assert counters.get_count('internet') == 0

    worlds.make_it_so(world_id1, 'trucks')
    worlds.make_it_so(world_id2, 'trucks')

    assert counters.get_count('dinosaurs') == 3
    assert counters.get_count('trucks') == 2
    assert counters.get_count('internet') == 0

    worlds.make_it_so(world_id1, 'internet')

    assert counters.get_count('dinosaurs') == 3
    assert counters.get_count('trucks') == 2
    assert counters.get_count('internet') == 1
finally:
    # Always stop the runner, even if an assertion above fails.
    runner.stop()
Multi-threaded runner¶
from eventsourcing.system import MultiThreadedRunner
from time import monotonic, sleep

runner = MultiThreadedRunner(system)
runner.start()
try:
    worlds = runner.get(WorldsApplication)
    counters = runner.get(Counters)

    world_id1 = worlds.create_world()
    world_id2 = worlds.create_world()
    world_id3 = worlds.create_world()

    worlds.make_it_so(world_id1, 'dinosaurs')
    worlds.make_it_so(world_id2, 'dinosaurs')
    worlds.make_it_so(world_id3, 'dinosaurs')

    worlds.make_it_so(world_id1, 'trucks')
    worlds.make_it_so(world_id2, 'trucks')

    worlds.make_it_so(world_id1, 'internet')

    # The follower processes events in its own thread, so a single fixed
    # sleep is a race. Poll until the expected counts appear, up to a deadline.
    expected_counts = {'dinosaurs': 3, 'trucks': 2, 'internet': 1}
    deadline = monotonic() + 5.0
    while monotonic() < deadline:
        if all(
            counters.get_count(what) == count
            for what, count in expected_counts.items()
        ):
            break
        sleep(0.01)

    assert counters.get_count('dinosaurs') == 3
    assert counters.get_count('trucks') == 2
    assert counters.get_count('internet') == 1
finally:
    # Always stop the runner, even if an assertion above fails.
    runner.stop()
…
Classes¶
-
class
eventsourcing.system.
ProcessEvent
(tracking: eventsourcing.persistence.Tracking)[source]¶ Bases:
object
Keeps together a
Tracking
object, which represents the position of a domain event notification in the notification log of a particular application, and the new domain events that result from processing that notification.
-
class
eventsourcing.system.
Follower
[source]¶ Bases:
eventsourcing.application.Application
Extends the
Application
class by using a process recorder as its application recorder, by keeping track of the applications it is following, and pulling and processing new domain event notifications through its policy()
method.-
__init__
() → None[source]¶ Initialises an application with an
InfrastructureFactory
, a Mapper
, an ApplicationRecorder
, an EventStore
, a Repository
, and a LocalNotificationLog
.
-
construct_recorder
() → eventsourcing.persistence.ProcessRecorder[source]¶ Constructs and returns a
ProcessRecorder
for the application to use as its application recorder.
-
follow
(name: str, log: eventsourcing.application.NotificationLog) → None[source]¶ Constructs a notification log reader and a mapper for the named application, and adds them to its collection of readers.
-
pull_and_process
(name: str) → None[source]¶ Pulls and processes unseen domain event notifications from the notification log reader of the named application.
Converts received event notifications to domain event objects, and then calls the
policy()
with a new ProcessEvent
object which contains a Tracking
object that keeps track of the name of the application and the position in its notification log from which the domain event notification was pulled. The policy will save aggregates to the process event object, using its save()
method, which collects pending domain events using the aggregates’ collect_events()
method, and the process event object will then be recorded by calling the record()
method.
-
policy
(domain_event: eventsourcing.domain.AggregateEvent, process_event: eventsourcing.system.ProcessEvent) → None[source]¶ Abstract domain event processing policy method. Must be implemented by event processing applications. When processing the given domain event, event processing applications must use the
save()
method of the given process event object (instead of the application’s save()
method) to collect pending events from changed aggregates, so that the new domain events will be recorded atomically with tracking information about the position of the given domain event’s notification.
-
-
class
eventsourcing.system.
Promptable
[source]¶ Bases:
abc.ABC
Abstract base class for “promptable” objects.
-
class
eventsourcing.system.
Leader
[source]¶ Bases:
eventsourcing.application.Application
Extends the
Application
class by also being responsible for keeping track of followers, and prompting followers when there are new domain event notifications to be pulled and processed.-
__init__
() → None[source]¶ Initialises an application with an
InfrastructureFactory
, a Mapper
, an ApplicationRecorder
, an EventStore
, a Repository
, and a LocalNotificationLog
.
-
lead
(follower: eventsourcing.system.Promptable) → None[source]¶ Adds given follower to a list of followers.
-
notify
(new_events: List[eventsourcing.domain.AggregateEvent]) → None[source]¶ Extends the application
notify()
method by calling prompt_followers()
whenever new events have just been saved.
-
prompt_followers
() → None[source]¶ Prompts followers by calling their
receive_prompt()
methods with the name of the application.
-
-
class
eventsourcing.system.
ProcessApplication
[source]¶ Bases:
eventsourcing.system.Leader
,eventsourcing.system.Follower
,abc.ABC
Base class for event processing applications that are both “leaders” and “followers”.
-
class
eventsourcing.system.
System
(pipes: Iterable[Iterable[Type[eventsourcing.application.Application]]])[source]¶ Bases:
object
Defines a system of applications.
-
class
eventsourcing.system.
Runner
(system: eventsourcing.system.System)[source]¶ Bases:
abc.ABC
Abstract base class for system runners.
-
exception
eventsourcing.system.
RunnerAlreadyStarted
[source]¶ Bases:
Exception
Raised when runner is already started.
-
class
eventsourcing.system.
SingleThreadedRunner
(system: eventsourcing.system.System)[source]¶ Bases:
eventsourcing.system.Runner
,eventsourcing.system.Promptable
Runs a
System
in a single thread. A single threaded runner is a runner, and so implements the start()
, stop()
, and get()
methods. A single threaded runner is also a Promptable
object, and implements the receive_prompt()
method by collecting prompted names.-
start
() → None[source]¶ Starts the runner. The applications are constructed, and set up to lead and follow each other, according to the system definition. The followers are set up to follow the applications they follow (have a notification log reader with the notification log of the leader), and their leaders are set up to lead the runner itself (send prompts).
-
receive_prompt
(leader_name: str) → None[source]¶ Receives prompt by appending name of leader to list of prompted names. Unless this method has previously been called but not yet returned, it will then proceed to forward the prompts received to its application by calling the application’s
pull_and_process()
method for each prompted name.
-
-
class
eventsourcing.system.
MultiThreadedRunner
(system: eventsourcing.system.System)[source]¶ Bases:
eventsourcing.system.Runner
Runs a
System
with a MultiThreadedRunnerThread
for each follower in the system definition. It is a runner, and so implements the start()
, stop()
, and get()
methods.-
start
() → None[source]¶ Starts the runner.
A multi-threaded runner thread is started for each ‘follower’ application in the system, and constructs an instance of each non-follower leader application in the system. The followers are then set up to follow the applications they follow (have a notification log reader with the notification log of the leader), and their leaders are set up to lead the follower’s thread (send prompts).
-
-
class
eventsourcing.system.
MultiThreadedRunnerThread
(app_class: Type[eventsourcing.system.Follower], is_stopping: threading.Event)[source]¶ Bases:
eventsourcing.system.Promptable
,threading.Thread
Runs one process application for a
MultiThreadedRunner
. A multi-threaded runner thread is a
Promptable
object, and implements the receive_prompt()
method by collecting prompted names and setting its threading event ‘is_prompted’. A multi-threaded runner thread is a Python
threading.Thread
object, and implements the thread’s run()
method by waiting until the ‘is_prompted’ event has been set and then calling its process application’s pull_and_process()
method once for each prompted name. It is expected that the process application will have been set up by the runner with a notification log reader from which event notifications will be pulled.-
__init__
(app_class: Type[eventsourcing.system.Follower], is_stopping: threading.Event)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
run
() → None[source]¶ Begins by constructing an application instance from given application class and then loops forever until stopped. The loop blocks on waiting for the ‘is_prompted’ event to be set, then forwards the prompts already received to its application by calling the application’s
pull_and_process()
method for each prompted name.
-
-
class
eventsourcing.system.
NotificationLogReader
(notification_log: eventsourcing.application.NotificationLog, section_size: int = 10)[source]¶ Bases:
object
Reads domain event notifications from a notification log.
-
__init__
(notification_log: eventsourcing.application.NotificationLog, section_size: int = 10)[source]¶ Initialises a reader with the given notification log, and optionally a section size integer which determines the requested number of domain event notifications in each section retrieved from the notification log.
-
read
(*, start: int) → Iterator[eventsourcing.persistence.Notification][source]¶ Returns a generator that yields event notifications from the reader’s notification log, starting from given start position (a notification ID).
This method traverses the linked list of sections presented by a notification log, and yields the individual event notifications that are contained in each section. When all the event notifications from a section have been yielded, the reader will retrieve the next section, and continue yielding event notifications until all subsequent event notifications in the notification log from the start position have been yielded.
-