import logging
from typing import Dict
from thespian.actors import Actor, ActorExitRequest, ActorSystem
from eventsourcing.application.notificationlog import RecordManagerNotificationLog
from eventsourcing.application.process import (
ProcessApplication,
)
from eventsourcing.application.simple import is_prompt_to_pull, PromptToPull
from eventsourcing.system.definition import System, AbstractSystemRunner
from eventsourcing.domain.model.events import subscribe, unsubscribe
from eventsourcing.exceptions import RecordConflictError
from eventsourcing.infrastructure.base import DEFAULT_PIPELINE_ID
logger = logging.getLogger()
# Todo: Send timer message to run slave every so often (in master or slave?).
DEFAULT_THESPIAN_LOGCFG = {
"version": 1,
"formatters": {"normal": {"format": "%(levelname)-8s %(message)s"}},
"handlers": {
# 'h': {
# 'class': 'logging.FileHandler',
# 'filename': 'hello.log',
# 'formatter': 'normal',
# 'level': logging.INFO
# }
},
"loggers": {
# '': {'handlers': ['h'], 'level': logging.DEBUG}
},
}
def start_thespian_system(system_base=None, logcfg=DEFAULT_THESPIAN_LOGCFG):
ActorSystem(systemBase=system_base, logDefs=logcfg)
def shutdown_thespian_system():
ActorSystem().shutdown()
def start_multiproc_tcp_base_system():
start_thespian_system(system_base="multiprocTCPBase")
# def start_multiproc_udp_base_system():
# start_thespian_system(system_base='multiprocUDPBase')
#
#
# def start_multiproc_queue_base_system():
# start_thespian_system(system_base='multiprocQueueBase')
[docs]class ThespianRunner(AbstractSystemRunner):
"""
Uses actor model framework to run a system of process applications.
"""
[docs] def __init__(
self,
system: System,
pipeline_ids=(DEFAULT_PIPELINE_ID,),
system_actor_name="system",
shutdown_on_close=False,
**kwargs
):
super(ThespianRunner, self).__init__(system=system, **kwargs)
self.pipeline_ids = list(pipeline_ids)
self.pipeline_actors: Dict[int, PipelineActor] = {}
self.system_actor_name = system_actor_name
# Create the system actor (singleton).
self.system_actor = self.actor_system.createActor(
actorClass=SystemActor, globalName=self.system_actor_name
)
self.shutdown_on_close = shutdown_on_close
@property
def actor_system(self):
return ActorSystem()
[docs] def start(self):
"""
Starts all the actors to run a system of process applications.
"""
# Subscribe to broadcast prompts published by a process
# application in the parent operating system process.
subscribe(handler=self.forward_prompt, predicate=is_prompt_to_pull)
# Initialise the system actor.
msg = SystemInitRequest(
self.system.process_classes,
self.infrastructure_class,
self.system.upstream_names,
self.pipeline_ids,
)
response = self.actor_system.ask(self.system_actor, msg)
# Keep the pipeline actor addresses, to send prompts directly.
assert isinstance(response, SystemInitResponse), type(response)
assert list(response.pipeline_actors.keys()) == self.pipeline_ids, (
"Configured pipeline IDs mismatch initialised system {} {}"
).format(list(self.pipeline_actors.keys()), self.pipeline_ids)
self.pipeline_actors = response.pipeline_actors
# Todo: Somehow know when to get a new address from the system actor.
# Todo: Command and response messages to system actor to get new pipeline
# address.
def forward_prompt(self, prompt: PromptToPull):
if prompt.pipeline_id in self.pipeline_actors:
pipeline_actor = self.pipeline_actors[prompt.pipeline_id]
self.actor_system.tell(pipeline_actor, prompt)
# else:
# msg = "Pipeline {} is not running.".format(prompt.pipeline_id)
# raise ValueError(msg)
[docs] def close(self):
"""Stops all the actors running a system of process applications."""
super(ThespianRunner, self).close()
unsubscribe(handler=self.forward_prompt, predicate=is_prompt_to_pull)
if self.shutdown_on_close:
self.shutdown()
def shutdown(self):
msg = ActorExitRequest(recursive=True)
self.actor_system.tell(self.system_actor, msg)
[docs]class SystemActor(Actor):
[docs] def __init__(self):
super(SystemActor, self).__init__()
self.pipeline_actors = {}
self.is_initialised = False
[docs] def receiveMessage(self, msg, sender):
if isinstance(msg, SystemInitRequest):
if not self.is_initialised:
self.init_pipelines(msg)
self.is_initialised = True
msg = SystemInitResponse(self.pipeline_actors.copy())
self.send(sender, msg)
def init_pipelines(self, msg):
self.process_classes = msg.process_classes
self.infrastructure_class = msg.infrastructure_class
self.system_followings = msg.system_followings
for pipeline_id in msg.pipeline_ids:
pipeline_actor = self.createActor(PipelineActor)
self.pipeline_actors[pipeline_id] = pipeline_actor
msg = PipelineInitRequest(
self.process_classes,
self.infrastructure_class,
self.system_followings,
pipeline_id,
)
self.send(pipeline_actor, msg)
[docs]class PipelineActor(Actor):
[docs] def __init__(self):
super(PipelineActor, self).__init__()
self.system = None
self.process_actors = {}
self.pipeline_id = None
[docs] def receiveMessage(self, msg, sender):
if isinstance(msg, PipelineInitRequest):
# logger.info("pipeline received init: {}".format(msg))
self.init_pipeline(msg)
elif isinstance(msg, PromptToPull):
# logger.info("pipeline received prompt: {}".format(msg))
self.forward_prompt(msg)
def init_pipeline(self, msg):
self.pipeline_id = msg.pipeline_id
self.process_classes = msg.process_classes
self.infrastructure_class = msg.infrastructure_class
self.system_followings = msg.system_followings
self.followers = {}
for process_class_name, upstream_class_names in self.system_followings.items():
for upstream_class_name in upstream_class_names:
process_name = upstream_class_name.lower()
if process_name not in self.followers:
self.followers[process_name] = []
downstream_class_names = self.followers[process_name]
if process_class_name not in downstream_class_names:
downstream_class_names.append(process_class_name)
process_class_names = self.system_followings.keys()
for process_class_name in process_class_names:
process_actor = self.createActor(ProcessMaster)
process_name = process_class_name.lower()
self.process_actors[process_name] = process_actor
for process_class_name in process_class_names:
process_name = process_class_name.lower()
upstream_application_names = [
c.lower() for c in self.system_followings[process_class_name]
]
downstream_actors = {}
for downstream_class_name in self.followers[process_name]:
downstream_name = downstream_class_name.lower()
# logger.warning("sending prompt to process application {}".format(
# downstream_name))
process_actor = self.process_actors[downstream_name]
downstream_actors[downstream_name] = process_actor
process_class = self.process_classes[process_class_name]
msg = ProcessInitRequest(
process_class,
self.infrastructure_class,
self.pipeline_id,
upstream_application_names,
downstream_actors,
self.myAddress,
)
self.send(self.process_actors[process_name], msg)
def forward_prompt(self, msg):
for downstream_class_name in self.followers[msg.process_name]:
downstream_name = downstream_class_name.lower()
process_actor = self.process_actors[downstream_name]
self.send(process_actor, msg)
[docs]class ProcessMaster(Actor):
[docs] def __init__(self):
super(ProcessMaster, self).__init__()
self.is_slave_running = False
self.last_prompts = {}
self.slave_actor = None
[docs] def receiveMessage(self, msg, sender):
if isinstance(msg, ProcessInitRequest):
self.init_process(msg)
elif isinstance(msg, PromptToPull):
# logger.warning("{} master received prompt: {}".format(
# self.process_application_class.__name__, msg))
self.consume_prompt(prompt=msg)
elif isinstance(msg, SlaveRunResponse):
# logger.info("process application master received slave finished run: {
# }".format(msg))
self.handle_slave_run_response()
def init_process(self, msg):
self.process_application_class = msg.process_application_class
self.infrastructure_class = msg.infrastructure_class
self.slave_actor = self.createActor(ProcessSlave)
self.send(self.slave_actor, msg)
self.run_slave()
def consume_prompt(self, prompt: PromptToPull):
self.last_prompts[prompt.process_name] = prompt
self.run_slave()
def handle_slave_run_response(self):
self.is_slave_running = False
if self.last_prompts:
self.run_slave()
def run_slave(self):
# Don't send to slave if we think it's running, or we'll
# probably get blocked while sending the message and have
# to wait until the slave runs its loop (thespian design).
if self.slave_actor and not self.is_slave_running:
self.send(
self.slave_actor, SlaveRunRequest(self.last_prompts, self.myAddress)
)
self.is_slave_running = True
self.last_prompts = {}
[docs]class ProcessSlave(Actor):
[docs] def __init__(self):
super(ProcessSlave, self).__init__()
self.process = None
[docs] def receiveMessage(self, msg, sender):
if isinstance(msg, ProcessInitRequest):
# logger.info("process application slave received init: {}".format(msg))
self.init_process(msg)
elif isinstance(msg, SlaveRunRequest):
# logger.info("{} process application slave received last prompts: {
# }".format(self.process.name, msg))
self.run_process(msg)
elif isinstance(msg, ActorExitRequest):
# logger.info("{} process application slave received exit request: {
# }".format(self.process.name, msg))
self.close()
def init_process(self, msg):
self.pipeline_actor = msg.pipeline_actor
self.downstream_actors = msg.downstream_actors
self.pipeline_id = msg.pipeline_id
self.upstream_application_names = msg.upstream_application_names
# Construct the process application class.
process_class = msg.process_application_class
if msg.infrastructure_class:
process_class = process_class.mixin(msg.infrastructure_class)
# Reset the database connection (for Django).
process_class.reset_connection_after_forking()
# Construct the process application.
self.process = process_class(pipeline_id=self.pipeline_id)
assert isinstance(self.process, ProcessApplication)
# Subscribe the slave actor's send_prompt() method.
# - the process application will call publish_prompt()
# and the actor will receive the prompt and send it
# as a message.
subscribe(predicate=self.is_my_prompt, handler=self.send_prompt)
# Close the process application persistence policy.
# - slave actor process application doesn't publish
# events, so we don't need this
self.process.persistence_policy.close()
# Unsubscribe process application's publish_prompt().
# - slave actor process application doesn't publish
# events, so we don't need this
unsubscribe(
predicate=self.process.persistence_policy.is_event,
handler=self.process.publish_prompt_for_events,
)
# Construct and follow upstream notification logs.
for upstream_application_name in self.upstream_application_names:
record_manager = self.process.event_store.record_manager
# assert isinstance(record_manager, ACIDRecordManager), type(record_manager)
notification_log = RecordManagerNotificationLog(
record_manager=record_manager.clone(
application_name=upstream_application_name,
pipeline_id=self.pipeline_id,
),
section_size=self.process.notification_log_section_size,
)
self.process.follow(upstream_application_name, notification_log)
def run_process(self, msg):
notification_count = 0
# Just process one notification so prompts are dispatched promptly, sent
# messages only dispatched from actor after receive_message() returns.
advance_by = 1
try:
if msg.last_prompts:
for prompt in msg.last_prompts.values():
notification_count += self.process.run(
prompt, advance_by=advance_by
)
else:
notification_count += self.process.run(advance_by=advance_by)
except RecordConflictError:
# Run again.
self.send(
self.myAddress, SlaveRunRequest(last_prompts={}, master=msg.master)
)
else:
if notification_count:
# Run again, until nothing was done.
self.send(
self.myAddress, SlaveRunRequest(last_prompts={}, master=msg.master)
)
else:
# Report back to master.
self.send(msg.master, SlaveRunResponse())
def close(self):
unsubscribe(predicate=self.is_my_prompt, handler=self.send_prompt)
self.process.close()
def is_my_prompt(self, prompt):
return (
isinstance(prompt, PromptToPull)
and prompt.process_name == self.process.name
and prompt.pipeline_id == self.pipeline_id
)
def send_prompt(self, prompt):
for downstream_name, downstream_actor in self.downstream_actors.items():
self.send(downstream_actor, prompt)
class SystemInitRequest(object):
def __init__(
self, process_classes, infrastructure_class, system_followings, pipeline_ids
):
self.process_classes = process_classes
self.infrastructure_class = infrastructure_class
self.system_followings = system_followings
self.pipeline_ids = pipeline_ids
class SystemInitResponse(object):
def __init__(self, pipeline_actors):
self.pipeline_actors = pipeline_actors
class PipelineInitRequest(object):
def __init__(
self, process_classes, infrastructure_class, system_followings, pipeline_id
):
self.process_classes = process_classes
self.infrastructure_class = infrastructure_class
self.system_followings = system_followings
self.pipeline_id = pipeline_id
class ProcessInitRequest(object):
def __init__(
self,
process_application_class,
infrastructure_class,
pipeline_id,
upstream_application_names,
downstream_actors,
pipeline_actor,
):
self.process_application_class = process_application_class
self.infrastructure_class = infrastructure_class
self.pipeline_id = pipeline_id
self.upstream_application_names = upstream_application_names
self.downstream_actors = downstream_actors
self.pipeline_actor = pipeline_actor
class SlaveRunRequest(object):
def __init__(self, last_prompts, master):
self.last_prompts = last_prompts
self.master = master
class SlaveRunResponse(object):
pass