from abc import ABC, abstractmethod
from collections import defaultdict
from threading import Event, Lock, Thread
from typing import Dict, Iterable, Iterator, List, Set, Tuple, Type, TypeVar
from eventsourcing.application import Application, NotificationLog, Section
from eventsourcing.domain import Aggregate, AggregateEvent
from eventsourcing.persistence import (
Mapper,
Notification,
ProcessRecorder,
Tracking,
)
from eventsourcing.utils import get_topic, resolve_topic
class ProcessEvent:
    """
    Bundles a :class:`~eventsourcing.persistence.Tracking` object with
    the new domain events that result from processing the notification
    it refers to.

    The tracking object represents the position of a domain event
    notification in the notification log of a particular application.
    """

    def __init__(self, tracking: Tracking):
        """
        Initialises the process event with the given tracking object
        and an empty list of collected domain events.
        """
        # Attribute order is kept as-is: Follower.record() expands this
        # object's attribute dict into keyword arguments.
        self.tracking = tracking
        self.events: List[AggregateEvent] = []

    def save(self, *aggregates: Aggregate) -> None:
        """
        Collects the pending domain events of each given aggregate and
        appends them to this process event's list of events.
        """
        for changed_aggregate in aggregates:
            self.events.extend(changed_aggregate.collect_events())
class Follower(Application):
    """
    Extends the :class:`~eventsourcing.application.Application` class
    by using a process recorder as its application recorder, by keeping
    a notification log reader and a mapper for each application it
    follows, and by pulling new domain event notifications and handing
    them to its :func:`policy` method.
    """

    def __init__(self) -> None:
        """
        Initialises the application and starts with an empty collection
        of readers, keyed by the name of the followed application.
        """
        super().__init__()
        self.readers: Dict[
            str,
            Tuple[NotificationLogReader, Mapper[AggregateEvent]],
        ] = {}
        self.recorder: ProcessRecorder

    def construct_recorder(self) -> ProcessRecorder:
        """
        Constructs and returns a :class:`~eventsourcing.persistence.ProcessRecorder`
        (obtained from the application's factory) for the application to
        use as its application recorder.
        """
        return self.factory.process_recorder()

    def follow(self, name: str, log: NotificationLog) -> None:
        """
        Registers the named application as one to be followed, by
        constructing a notification log reader for the given log and
        a mapper for the given name, and storing the pair under that name.
        """
        assert isinstance(self.recorder, ProcessRecorder)
        self.readers[name] = (
            NotificationLogReader(log),
            self.construct_mapper(name),
        )

    def pull_and_process(self, name: str) -> None:
        """
        Pulls and processes unseen domain event notifications from the
        notification log reader of the named application.

        Reading starts just after the highest notification ID that the
        recorder has tracked for the named application. Each notification
        is converted to a domain event object, and :func:`policy` is called
        with that domain event and a new :class:`ProcessEvent` object. The
        process event carries a :class:`~eventsourcing.persistence.Tracking`
        object holding the application name and the notification ID. The
        policy is expected to save changed aggregates to the process event
        using :func:`~ProcessEvent.save`. Finally the process event is
        recorded by calling :func:`record`.
        """
        reader, mapper = self.readers[name]
        first_unseen_id = self.recorder.max_tracking_id(name) + 1
        for notification in reader.read(start=first_unseen_id):
            domain_event = mapper.to_domain_event(notification)
            tracking = Tracking(
                application_name=name,
                notification_id=notification.id,
            )
            process_event = ProcessEvent(tracking)
            self.policy(domain_event, process_event)
            self.record(process_event)

    @abstractmethod
    def policy(
        self,
        domain_event: AggregateEvent,
        process_event: ProcessEvent,
    ) -> None:
        """
        Abstract domain event processing policy method. Must be
        implemented by event processing applications.

        When processing the given domain event, implementations must
        collect pending events from changed aggregates with the
        :func:`~ProcessEvent.save` method of the given process event
        (instead of the application's
        :func:`~eventsourcing.application.Application.save` method), so
        that the new domain events are recorded atomically with the
        tracking information about the position of the given domain
        event's notification.
        """

    def record(self, process_event: ProcessEvent) -> None:
        """
        Records the given process event, and then notifies the
        application of the process event's new domain events.
        """
        # The attributes of the process event (its tracking object and
        # its events) are passed to the event store as keyword arguments.
        self.events.put(**vars(process_event))
        self.notify(process_event.events)
class Promptable(ABC):
    """
    Abstract base class for "promptable" objects, which are objects
    that can be told that a leader has something new to be pulled.
    """

    @abstractmethod
    def receive_prompt(self, leader_name: str) -> None:
        """
        Receives the name of a leader that has new domain
        event notifications.
        """
class Leader(Application):
    """
    Extends the :class:`~eventsourcing.application.Application` class
    by also keeping a list of followers, and prompting those followers
    whenever there are new domain event notifications to be pulled and
    processed.
    """

    def __init__(self) -> None:
        """
        Initialises the application with an empty list of followers.
        """
        super().__init__()
        self.followers: List[Promptable] = []

    def lead(self, follower: Promptable) -> None:
        """
        Adds the given follower to the list of followers.
        """
        self.followers.append(follower)

    def notify(self, new_events: List[AggregateEvent]) -> None:
        """
        Extends the application :func:`~eventsourcing.application.Application.notify`
        method by calling :func:`prompt_followers` whenever the given
        list of new events is not empty.
        """
        super().notify(new_events)
        if len(new_events) == 0:
            # Nothing was saved, so there is nothing to pull.
            return
        self.prompt_followers()

    def prompt_followers(self) -> None:
        """
        Prompts each follower by calling its :func:`~Promptable.receive_prompt`
        method with the class name of this application.
        """
        leader_name = type(self).__name__
        for promptable in self.followers:
            promptable.receive_prompt(leader_name)
class ProcessApplication(Leader, Follower, ABC):
    """
    Base class for event processing applications
    that are both "leaders" and "followers".
    """
class System:
    """
    Defines a system of applications.

    A system is described by "pipes": sequences of application classes
    in which each class is followed by the next one. From the pipes the
    system derives its nodes (application class name -> topic) and its
    edges (pairs of leader name and follower name).
    """

    def __init__(
        self,
        pipes: Iterable[Iterable[Type[Application]]],
    ):
        """
        Builds the nodes and edges of the system from the given pipes.

        Raises :class:`TypeError` if a class that follows another is not
        a :class:`Follower`, or if a class that both leads and follows is
        not a :class:`ProcessApplication`.
        """
        nodes: Dict[str, Type[Application]] = {}
        # An insertion-ordered dict is used as an ordered set, so that
        # edges are de-duplicated but keep the order in which the pipes
        # define them. Iterating a set of string tuples would give an
        # order that can change from one interpreter run to the next
        # (string hashes are randomised), which would make the order of
        # 'edges', 'leads' and 'follows' unpredictable.
        edges: Dict[Tuple[str, str], None] = {}

        # Build nodes and edges.
        for pipe in pipes:
            previous_cls = None
            for cls in pipe:
                nodes[cls.__name__] = cls
                if previous_cls is not None:
                    edges[(previous_cls.__name__, cls.__name__)] = None
                previous_cls = cls

        self.edges = list(edges)
        self.nodes: Dict[str, str] = {
            name: get_topic(app_cls) for name, app_cls in nodes.items()
        }

        # Identify leaders and followers.
        self.follows: Dict[str, List[str]] = defaultdict(list)
        self.leads: Dict[str, List[str]] = defaultdict(list)
        for leader_name, follower_name in self.edges:
            self.leads[leader_name].append(follower_name)
            self.follows[follower_name].append(leader_name)

        # Check followers are followers.
        for name in self.follows:
            if not issubclass(nodes[name], Follower):
                raise TypeError("Not a follower class: %s" % nodes[name])

        # Check each process is a process application class.
        for name in self.processors:
            if not issubclass(nodes[name], ProcessApplication):
                raise TypeError("Not a process application class: %s" % nodes[name])

    @property
    def leaders(self) -> Iterable[str]:
        """
        Names of the applications that lead at least one other application.
        """
        return self.leads.keys()

    @property
    def leaders_only(self) -> Iterable[str]:
        """
        Names of the applications that lead but do not follow.
        """
        for name in self.leads.keys():
            if name not in self.follows:
                yield name

    @property
    def followers(self) -> Iterable[str]:
        """
        Names of the applications that follow at least one other application.
        """
        return self.follows.keys()

    @property
    def processors(self) -> Iterable[str]:
        """
        Names of the applications that both lead and follow.
        """
        return set(self.leaders).intersection(self.followers)

    def get_app_cls(self, name: str) -> Type[Application]:
        """
        Resolves and returns the application class registered under
        the given name.
        """
        cls = resolve_topic(self.nodes[name])
        assert issubclass(cls, Application)
        return cls

    def leader_cls(self, name: str) -> Type[Leader]:
        """
        Returns a :class:`Leader` class for the given name.

        If the registered class is not already a subclass of
        :class:`Leader`, a new class with the same name is created
        that inherits from both :class:`Leader` and the registered class.
        """
        cls = self.get_app_cls(name)
        if not issubclass(cls, Leader):
            cls = type(cls.__name__, (Leader, cls), {})
            assert issubclass(cls, Leader)
        return cls

    def follower_cls(self, name: str) -> Type[Follower]:
        """
        Returns the class registered under the given name, which is
        asserted to be a :class:`Follower` class.
        """
        cls = self.get_app_cls(name)
        assert issubclass(cls, Follower)
        return cls
# Type variable used by the runners' get() methods, so that the returned
# application instance is typed as the class that was passed in.
A = TypeVar("A")
class Runner(ABC):
    """
    Abstract base class for system runners.
    """

    def __init__(self, system: System):
        """
        Initialises the runner with the given system, in a
        not-yet-started state.
        """
        self.system = system
        self.is_started = False

    @abstractmethod
    def start(self) -> None:
        """
        Starts the runner.

        Marks the runner as started, and raises
        :class:`RunnerAlreadyStarted` if it was already marked as
        started. Concrete runners are expected to call this from
        their own implementation.
        """
        already_started = self.is_started
        if already_started:
            raise RunnerAlreadyStarted()
        self.is_started = True

    @abstractmethod
    def stop(self) -> None:
        """
        Stops the runner.
        """

    @abstractmethod
    def get(self, cls: Type[A]) -> A:
        """
        Returns an application instance for the given application class.
        """
class RunnerAlreadyStarted(Exception):
    """
    Raised when a runner is started after it has already been started.
    """
class SingleThreadedRunner(Runner, Promptable):
    """
    Runs a :class:`System` in a single thread.

    A single threaded runner is a runner, and so implements the
    :func:`start`, :func:`stop`, and :func:`get` methods.

    A single threaded runner is also a :class:`Promptable` object, and
    implements the :func:`receive_prompt` method by collecting prompted
    names and forwarding them to the followers of the prompting leader.
    """

    def __init__(self, system: System):
        """
        Initialises runner with the given :class:`System`.
        """
        super().__init__(system)
        self.apps: Dict[str, Application] = {}
        self.prompts_received: List[str] = []
        self.is_prompting = False

    def start(self) -> None:
        """
        Starts the runner.

        The applications are constructed, and set up to lead and follow
        each other, according to the system definition.

        The followers are set up to follow the applications they follow
        (have a notification log reader with the notification log of the
        leader), and their leaders are set up to lead the runner itself
        (send prompts).

        Raises :class:`RunnerAlreadyStarted` if already started.
        """
        super().start()

        # Construct followers.
        for name in self.system.followers:
            self.apps[name] = self.system.follower_cls(name)()

        # Construct leaders.
        for name in self.system.leaders_only:
            self.apps[name] = self.system.leader_cls(name)()

        # Lead and follow.
        for leader_name, follower_name in self.system.edges:
            leader = self.apps[leader_name]
            follower = self.apps[follower_name]
            assert isinstance(leader, Leader)
            assert isinstance(follower, Follower)
            leader.lead(self)
            follower.follow(leader.__class__.__name__, leader.log)

    def receive_prompt(self, leader_name: str) -> None:
        """
        Receives prompt by appending name of leader to list of
        prompted names (unless it is already in the list).

        Unless this method has previously been called but not yet
        returned, it will then proceed to forward the prompts received
        by calling the :func:`~Follower.pull_and_process` method of each
        follower of each prompted leader.

        If processing raises an exception, the exception propagates to
        the caller, but the runner is left able to process subsequent
        prompts.
        """
        if leader_name not in self.prompts_received:
            self.prompts_received.append(leader_name)

        if self.is_prompting:
            # A call further up the stack is already draining the list
            # of prompts, and will pick up the name just appended.
            return

        self.is_prompting = True
        try:
            while self.prompts_received:
                prompt = self.prompts_received.pop(0)
                for name in self.system.leads[prompt]:
                    follower = self.apps[name]
                    assert isinstance(follower, Follower)
                    follower.pull_and_process(prompt)
        finally:
            # Always reset the flag: if it stayed set after an error,
            # every later prompt would be queued but never processed.
            self.is_prompting = False

    def stop(self) -> None:
        """
        Stops the runner by discarding its application instances.
        """
        self.apps.clear()

    def get(self, cls: Type[A]) -> A:
        """
        Returns the application instance that was constructed for the
        given application class (looked up by class name).
        """
        app = self.apps[cls.__name__]
        assert isinstance(app, cls)
        return app
class MultiThreadedRunner(Runner):
    """
    Runs a :class:`System` with a :class:`MultiThreadedRunnerThread` for each
    follower in the system definition.

    It is a runner, and so implements the :func:`start`, :func:`stop`,
    and :func:`get` methods.
    """

    def __init__(self, system: System):
        """
        Initialises runner with the given :class:`System`.
        """
        super().__init__(system)
        self.apps: Dict[str, Application] = {}
        self.threads: Dict[str, MultiThreadedRunnerThread] = {}
        # Shared with every thread, so that stop() can signal all of them.
        self.is_stopping = Event()

    def start(self) -> None:
        """
        Starts the runner.

        A multi-threaded runner thread is started for each 'follower'
        application in the system, and an instance is constructed of
        each non-follower leader application in the system. The
        followers are then set up to follow the applications they
        follow (have a notification log reader with the notification
        log of the leader), and their leaders are set up to lead the
        follower's thread (send prompts).

        Raises an exception if a thread does not report that it is
        running within five seconds, or if it has already stopped.
        """
        super().start()

        # Construct followers, each in its own thread.
        for name in self.system.followers:
            app_class = self.system.follower_cls(name)
            thread = MultiThreadedRunnerThread(
                app_class=app_class,
                is_stopping=self.is_stopping,
            )
            self.threads[name] = thread
            thread.start()
            started_in_time = thread.is_running.wait(timeout=5)
            if not started_in_time or thread.has_stopped.is_set():
                self.stop()
                raise Exception(f"Thread for '{app_class.__name__}' failed to start")
            self.apps[name] = thread.app

        # Construct non-follower leaders.
        for name in self.system.leaders_only:
            self.apps[name] = self.system.leader_cls(name)()

        # Lead and follow.
        for leader_name, follower_name in self.system.edges:
            leader = self.apps[leader_name]
            follower = self.apps[follower_name]
            assert isinstance(leader, Leader)
            assert isinstance(follower, Follower)
            follower.follow(leader.__class__.__name__, leader.log)
            leader.lead(self.threads[follower_name])

    def stop(self) -> None:
        """
        Stops the runner, by setting the shared 'is_stopping' event,
        then waking up and joining each thread in turn.
        """
        self.is_stopping.set()
        for thread in self.threads.values():
            # Wake the thread so that it notices 'is_stopping'.
            thread.is_prompted.set()
            thread.join()

    @property
    def has_stopped(self) -> bool:
        """
        True when every thread has set its 'has_stopped' event.
        """
        return all(t.has_stopped.is_set() for t in self.threads.values())

    def get(self, cls: Type[A]) -> A:
        """
        Returns the application instance that was constructed for the
        given application class (looked up by class name).
        """
        app = self.apps[cls.__name__]
        assert isinstance(app, cls)
        return app
class MultiThreadedRunnerThread(Promptable, Thread):
    """
    Runs one process application for a
    :class:`~eventsourcing.system.MultiThreadedRunner`.

    A multi-threaded runner thread is a :class:`~eventsourcing.system.Promptable`
    object, and implements the :func:`receive_prompt` method by collecting
    prompted names and setting its threading event 'is_prompted'.

    A multi-threaded runner thread is a Python :class:`threading.Thread` object,
    and implements the thread's :func:`run` method by waiting until the
    'is_prompted' event has been set and then calling its process application's
    :func:`~eventsourcing.system.Follower.pull_and_process`
    method once for each prompted name. It is expected that
    the process application will have been set up by the runner
    with a notification log reader from which event notifications
    will be pulled.
    """

    def __init__(
        self,
        app_class: Type[Follower],
        is_stopping: Event,
    ):
        """
        Initialises the thread with the application class to construct
        when the thread runs, and the 'is_stopping' event object passed
        in by the caller.
        """
        super().__init__()
        self.app_class = app_class
        self.is_stopping = is_stopping
        self.has_stopped = Event()
        self.has_errored = Event()
        self.is_prompted = Event()
        self.prompted_names: List[str] = []
        self.prompted_names_lock = Lock()
        # The 'daemon' attribute replaces setDaemon(), which is deprecated.
        self.daemon = True
        self.is_running = Event()

    def run(self) -> None:
        """
        Begins by constructing an application instance from
        given application class and then loops forever until
        stopped. The loop blocks on waiting for the 'is_prompted'
        event to be set, then forwards the prompts already received
        to its application by calling the application's
        :func:`~Follower.pull_and_process` method for each prompted name.

        If constructing the application or processing a prompt raises
        an exception, the 'has_errored' and 'has_stopped' events are set
        and the exception is re-raised. When processing fails, the
        'is_stopping' event is also set.
        """
        try:
            self.app: Follower = self.app_class()
        except Exception:
            self.has_errored.set()
            self.has_stopped.set()
            raise
        finally:
            # Set in all cases, so that whoever waits on 'is_running'
            # is released even when construction failed.
            self.is_running.set()  # pragma: no cover
            # -----------------------^ weird branch coverage thing with Python 3.9

        try:
            while True:
                self.is_prompted.wait()

                if self.is_stopping.is_set():
                    self.has_stopped.set()
                    break

                # Swap out the list of names under the lock, so that
                # prompts received while processing are not lost.
                with self.prompted_names_lock:
                    prompted_names = self.prompted_names
                    self.prompted_names = []
                    self.is_prompted.clear()

                for name in prompted_names:
                    self.app.pull_and_process(name)
        except Exception:
            self.has_errored.set()
            self.has_stopped.set()
            # Signal stopping to whatever else holds this event. This
            # was previously a call to is_set(), whose result was
            # discarded, so the statement had no effect at all.
            self.is_stopping.set()
            raise

    def receive_prompt(self, leader_name: str) -> None:
        """
        Receives prompt by appending name of leader to list of
        prompted names (unless it is already in the list), and
        then sets the 'is_prompted' event to wake up the thread.
        """
        with self.prompted_names_lock:
            if leader_name not in self.prompted_names:
                self.prompted_names.append(leader_name)
        self.is_prompted.set()
class NotificationLogReader:
    """
    Reads domain event notifications from a notification log.
    """

    DEFAULT_SECTION_SIZE = 10

    def __init__(
        self,
        notification_log: NotificationLog,
        section_size: int = DEFAULT_SECTION_SIZE,
    ):
        """
        Initialises a reader with the given notification log,
        and optionally a section size integer which determines
        the number of domain event notifications requested in
        the first section retrieved from the notification log.
        """
        self.notification_log = notification_log
        self.section_size = section_size

    def read(self, *, start: int) -> Iterator[Notification]:
        """
        Returns a generator that yields event notifications
        from the reader's notification log, starting from the
        given start position (a notification ID).

        The first section is requested with an ID of the form
        "first,last", covering 'section_size' notification IDs from
        the start position. The reader then follows each section's
        'next_id' to the next section, yielding the items of each
        section in turn, until a section has no next section.
        """
        last = start + self.section_size - 1
        section_id = f"{start},{last}"
        while section_id is not None:
            section: Section = self.notification_log[section_id]
            # Todo: Skip items with an ID lower than 'start', if
            # sections with regular alignment are ever supported?
            yield from section.items
            section_id = section.next_id