import multiprocessing
from multiprocessing import Manager
from queue import Empty
from time import sleep
from eventsourcing.application.notificationlog import RecordManagerNotificationLog
from eventsourcing.application.process import Prompt
from eventsourcing.application.simple import ApplicationWithConcreteInfrastructure
from eventsourcing.application.system import (
DEFAULT_POLL_INTERVAL,
PromptOutbox,
System,
SystemRunner,
)
from eventsourcing.domain.model.decorators import retry
from eventsourcing.domain.model.events import subscribe, unsubscribe
from eventsourcing.exceptions import (
CausalDependencyFailed,
OperationalError,
RecordConflictError,
ProgrammingError,
)
from eventsourcing.infrastructure.base import DEFAULT_PIPELINE_ID
class MultiprocessRunner(SystemRunner):
    """Runs a system of process applications, with one operating system
    child process per process application per pipeline.

    Prompts are relayed between child processes via managed queues: each
    child has an inbox queue, and each child's outbox fans out to the
    inboxes of its downstream followers.
    """

    def __init__(
        self,
        system: System,
        pipeline_ids=(DEFAULT_PIPELINE_ID,),
        poll_interval=None,
        setup_tables=False,
        sleep_for_setup_tables=0,
        *args,
        **kwargs
    ):
        """
        :param system: The System of process applications to run.
        :param pipeline_ids: Pipeline IDs to run (a full set of OS
            processes is started for each pipeline ID).
        :param poll_interval: Seconds a child waits on its inbox for a
            prompt before polling upstream logs anyway (defaults to
            DEFAULT_POLL_INTERVAL).
        :param setup_tables: If true, children create their database
            tables on startup (also enabled by system.setup_tables).
        :param sleep_for_setup_tables: Seconds to sleep after starting
            the children, to avoid table-creation conflicts.
        """
        # Validate before using the system object below.
        assert isinstance(system, System)
        super(MultiprocessRunner, self).__init__(system=system, *args, **kwargs)
        self.pipeline_ids = pipeline_ids
        self.poll_interval = poll_interval or DEFAULT_POLL_INTERVAL
        self.os_processes = None
        self.setup_tables = setup_tables or system.setup_tables
        self.sleep_for_setup_tables = sleep_for_setup_tables

    def start(self):
        """Starts one OperatingSystemProcess per (pipeline, process
        application), wired together with inbox/outbox queues.
        """
        assert self.os_processes is None, "Already started"
        self.os_processes = []
        self.manager = Manager()
        self.inboxes = {}
        self.outboxes = {}

        # Set up the queues: one inbox per (pipeline, process), and one
        # outbox per (pipeline, upstream process) whose downstream
        # inboxes are the inboxes of its followers.
        for pipeline_id in self.pipeline_ids:
            for process_name, upstream_names in self.system.followings.items():
                inbox_id = (pipeline_id, process_name.lower())
                if inbox_id not in self.inboxes:
                    self.inboxes[inbox_id] = self.manager.Queue()
                for upstream_class_name in upstream_names:
                    outbox_id = (pipeline_id, upstream_class_name.lower())
                    if outbox_id not in self.outboxes:
                        self.outboxes[outbox_id] = PromptOutbox()
                    if inbox_id not in self.outboxes[outbox_id].downstream_inboxes:
                        self.outboxes[outbox_id].downstream_inboxes[
                            inbox_id
                        ] = self.inboxes[inbox_id]

        # Subscribe to broadcast prompts published by a process
        # application in the parent operating system process.
        subscribe(handler=self.broadcast_prompt, predicate=self.is_prompt)

        # Start the operating system processes.
        for pipeline_id in self.pipeline_ids:
            for process_name, upstream_names in self.system.followings.items():
                process_class = self.system.process_classes[process_name]
                inbox = self.inboxes[(pipeline_id, process_name.lower())]
                outbox = self.outboxes.get((pipeline_id, process_name.lower()))
                os_process = OperatingSystemProcess(
                    application_process_class=process_class,
                    infrastructure_class=self.infrastructure_class,
                    upstream_names=upstream_names,
                    poll_interval=self.poll_interval,
                    pipeline_id=pipeline_id,
                    setup_tables=self.setup_tables,
                    inbox=inbox,
                    outbox=outbox,
                )
                os_process.daemon = True
                os_process.start()
                self.os_processes.append(os_process)

        if self.setup_tables:
            # Avoid conflicts when creating tables.
            sleep(self.sleep_for_setup_tables)

    def broadcast_prompt(self, prompt):
        # Relay a prompt published in the parent process to the inboxes
        # of the prompting application's downstream followers.
        # NOTE(review): assumes prompt.process_name is already
        # lower-case, matching the outbox keys built in start() —
        # confirm against the Prompt publisher.
        outbox_id = (prompt.pipeline_id, prompt.process_name)
        outbox = self.outboxes.get(outbox_id)
        if outbox:
            outbox.put(prompt)

    @staticmethod
    def is_prompt(event):
        """Returns True if the published event is a Prompt."""
        return isinstance(event, Prompt)

    def close(self):
        """Stops all the operating system processes: asks each to quit,
        waits for it to join, then terminates any that are still alive.

        Safe to call even if start() was never called (previously this
        raised TypeError iterating over os_processes == None).
        """
        super(MultiprocessRunner, self).close()
        unsubscribe(handler=self.broadcast_prompt, predicate=self.is_prompt)
        os_processes = self.os_processes or []
        for os_process in os_processes:
            os_process.inbox.put("QUIT")
        for os_process in os_processes:
            os_process.join(timeout=10)
        for os_process in os_processes:
            if os_process.is_alive():
                os_process.terminate()
        self.os_processes = None
        self.manager = None
class OperatingSystemProcess(multiprocessing.Process):
    """Child OS process that constructs and runs one process
    application, following the notification logs of its upstream
    applications and exchanging prompts through queues.
    """

    def __init__(
        self,
        application_process_class,
        infrastructure_class,
        upstream_names,
        pipeline_id=DEFAULT_PIPELINE_ID,
        poll_interval=DEFAULT_POLL_INTERVAL,
        setup_tables=False,
        inbox=None,
        outbox=None,
        *args,
        **kwargs
    ):
        """
        :param application_process_class: Process application class to
            construct and run in this OS process.
        :param infrastructure_class: Infrastructure class mixed into the
            application class if it lacks concrete infrastructure.
        :param upstream_names: Names of upstream applications whose
            notification logs this process follows.
        :param pipeline_id: Pipeline this process operates in.
        :param poll_interval: Seconds to wait on the inbox for a prompt
            before polling upstream logs anyway.
        :param setup_tables: If true, create database tables on start.
        :param inbox: Queue of prompts from upstream processes.
        :param outbox: PromptOutbox relaying prompts downstream.
        """
        super(OperatingSystemProcess, self).__init__(*args, **kwargs)
        self.application_process_class = application_process_class
        self.infrastructure_class = infrastructure_class
        self.upstream_names = upstream_names
        self.daemon = True
        self.pipeline_id = pipeline_id
        self.poll_interval = poll_interval
        self.inbox = inbox
        self.outbox = outbox
        self.setup_tables = setup_tables

    def run(self):
        """Child-process entry point: builds the process application,
        follows upstream notification logs, and loops on prompts.
        """
        # Construct the process application class. Fix: the original
        # used isinstance() on a class object, which is always False
        # (a class is not an instance of the application base class),
        # so the mixin branch always ran and an already-concrete class
        # wrongly raised ProgrammingError when infrastructure_class
        # was unset. issubclass() is the correct check.
        process_class = self.application_process_class
        if not issubclass(process_class, ApplicationWithConcreteInfrastructure):
            if self.infrastructure_class:
                process_class = process_class.mixin(self.infrastructure_class)
            else:
                raise ProgrammingError("infrastructure_class is not set")

        # Construct the process application object.
        self.process = process_class(
            pipeline_id=self.pipeline_id, setup_table=self.setup_tables
        )

        # Follow upstream notification logs.
        for upstream_name in self.upstream_names:
            # Obtain a notification log object (local or remote)
            # for the upstream process.
            if upstream_name == self.process.name:
                # Upstream is this process's own application,
                # so use its own notification log.
                notification_log = self.process.notification_log
            else:
                # For a different application, construct a notification
                # log with a record manager cloned for the upstream
                # application ID. This currently assumes all the
                # applications share one database and record manager
                # class; otherwise a remote notification log would be
                # needed, pulled from an API that the upstream
                # application provides.
                record_manager = self.process.event_store.record_manager
                notification_log = RecordManagerNotificationLog(
                    record_manager=record_manager.clone(
                        application_name=upstream_name,
                        # Todo: Check if setting pipeline_id is
                        # necessary (it's the same?).
                        pipeline_id=self.pipeline_id,
                    ),
                    section_size=self.process.notification_log_section_size,
                )
                # Todo: Support upstream partition IDs different from
                # self.pipeline_id; support combining partitions (many
                # readers writing to one partition) and dividing
                # partitions (one reader writing round-robin to many),
                # so a stream can flow at the same rate through slower
                # process applications.
                # Todo: Support merging results from "replicated state
                # machines": a command-logging process presenting client
                # commands in a notification log, with a result-logging
                # process reading results from replicas as available.

            # Make the process follow the upstream notification log.
            self.process.follow(upstream_name, notification_log)

        # Subscribe to broadcast prompts published by the process application.
        subscribe(handler=self.broadcast_prompt, predicate=self.is_prompt)
        try:
            self.loop_on_prompts()
        finally:
            unsubscribe(handler=self.broadcast_prompt, predicate=self.is_prompt)

    @retry(CausalDependencyFailed, max_attempts=100, wait=0.1)
    def loop_on_prompts(self):
        """Processes prompts from the inbox until "QUIT" arrives,
        polling upstream logs whenever the inbox get() times out.
        """
        # Run once, in case prompts were missed.
        self.run_process()
        # Loop on getting prompts.
        while True:
            try:
                # Todo: Make the poll interval gradually increase
                # if there are only timeouts?
                item = self.inbox.get(timeout=self.poll_interval)
                self.inbox.task_done()
                if item == "QUIT":
                    self.process.close()
                    break
                else:
                    self.run_process(item)
            except Empty:
                # Timed out waiting for a prompt: poll anyway.
                self.run_process()

    @retry((OperationalError, RecordConflictError), max_attempts=100, wait=0.1)
    def run_process(self, prompt=None):
        # Run the process application, retrying on transient errors.
        self.process.run(prompt)

    def broadcast_prompt(self, prompt):
        # Relay a prompt from this application to downstream inboxes.
        if self.outbox is not None:
            self.outbox.put(prompt)

    @staticmethod
    def is_prompt(event):
        """Returns True if the published event is a Prompt."""
        return isinstance(event, Prompt)