import datetime
import os
import traceback
from inspect import ismethod
from queue import Empty, Queue
from threading import Event, Lock, Thread
from time import sleep
from typing import Dict, Optional, Tuple, Type
import ray
from eventsourcing.application.process import ProcessApplication
from eventsourcing.application.simple import (
ApplicationWithConcreteInfrastructure,
Prompt,
PromptToPull,
is_prompt_to_pull,
)
from eventsourcing.domain.model.decorators import retry
from eventsourcing.domain.model.events import subscribe, unsubscribe
from eventsourcing.exceptions import (
EventSourcingError,
ExceptionWrapper,
OperationalError,
ProgrammingError,
RecordConflictError,
)
from eventsourcing.infrastructure.base import (
DEFAULT_PIPELINE_ID,
RecordManagerWithNotifications,
)
from eventsourcing.system.definition import (
AbstractSystemRunner,
System,
TProcessApplication,
)
from eventsourcing.system.rayhelpers import RayDbJob, RayPrompt
from eventsourcing.system.raysettings import ray_init_kwargs
from eventsourcing.system.runner import DEFAULT_POLL_INTERVAL
# NOTE(review): module-level side effect — Ray is initialised as soon as this
# module is imported, using settings from eventsourcing.system.raysettings.
ray.init(**ray_init_kwargs)
# Maximum size of the db-jobs and upstream-event queues (bounded so that
# producers block instead of building an unbounded backlog).
MAX_QUEUE_SIZE = 1
# Maximum number of notifications requested from an upstream process per pull.
PAGE_SIZE = 20
# Duration of the cooperative sleep after queue puts; 0.0 merely yields the GIL.
MICROSLEEP = 0.000
# If True, prompts carry Ray object IDs of individual notifications.
PROMPT_WITH_NOTIFICATION_IDS = False
# If True, prompts carry the notification objects themselves.
PROMPT_WITH_NOTIFICATION_OBJS = False
# If True, pull a full page past the prompted head (there may be more
# notifications than the prompt mentioned).
GREEDY_PULL_NOTIFICATIONS = True
class RayRunner(AbstractSystemRunner):
    """
    Uses actor model framework to run a system of process applications.

    One RayProcess actor is started per (process application, pipeline)
    pair; each actor is then initialised with the handles of its upstream
    and downstream neighbours so prompts can be pushed between them.
    """

    def __init__(
        self,
        system: System,
        pipeline_ids=(DEFAULT_PIPELINE_ID,),
        poll_interval: Optional[int] = None,
        setup_tables: bool = False,
        sleep_for_setup_tables: int = 0,
        db_uri: Optional[str] = None,
        **kwargs
    ):
        """
        :param system: System of process applications to run.
        :param pipeline_ids: IDs of pipelines to run (one set of actors each).
        :param poll_interval: Optional polling interval passed to each actor.
        :param setup_tables: Whether actors should create database tables.
        :param sleep_for_setup_tables: Pause (seconds) to allow table setup.
        :param db_uri: Database URI (falls back to the DB_URI env var).
        """
        super(RayRunner, self).__init__(system=system, **kwargs)
        self.pipeline_ids = list(pipeline_ids)
        self.poll_interval = poll_interval
        self.setup_tables = setup_tables or system.setup_tables
        self.sleep_for_setup_tables = sleep_for_setup_tables
        self.db_uri = db_uri
        # Actor handles keyed by (process name, pipeline ID).
        self.ray_processes: Dict[Tuple[str, int], RayProcess] = {}

    def start(self):
        """
        Starts all the actors to run a system of process applications.
        """
        # Check we have the infrastructure classes we need.
        for process_class in self.system.process_classes.values():
            if not isinstance(process_class, ApplicationWithConcreteInfrastructure):
                if not self.infrastructure_class:
                    raise ProgrammingError("infrastructure_class is not set")
                elif not issubclass(
                    self.infrastructure_class, ApplicationWithConcreteInfrastructure
                ):
                    raise ProgrammingError(
                        "infrastructure_class is not a subclass of {}".format(
                            ApplicationWithConcreteInfrastructure
                        )
                    )

        # Get the DB_URI.
        # Todo: Support different URI for different application classes.
        env_vars = {}
        db_uri = self.db_uri or os.environ.get("DB_URI")
        if db_uri is not None:
            env_vars["DB_URI"] = db_uri

        # Start one actor per (process, pipeline) pair.
        for pipeline_id in self.pipeline_ids:
            for process_name, process_class in self.system.process_classes.items():
                ray_process_id = RayProcess.remote(
                    application_process_class=process_class,
                    infrastructure_class=self.infrastructure_class,
                    env_vars=env_vars,
                    poll_interval=self.poll_interval,
                    pipeline_id=pipeline_id,
                    setup_tables=self.setup_tables,
                )
                self.ray_processes[(process_name, pipeline_id)] = ray_process_id

        # Initialise each actor with its upstream and downstream neighbours.
        init_ids = []
        for key, ray_process in self.ray_processes.items():
            process_name, pipeline_id = key
            upstream_names = self.system.upstream_names[process_name]
            downstream_names = self.system.downstream_names[process_name]
            downstream_processes = {
                name: self.ray_processes[(name, pipeline_id)]
                for name in downstream_names
            }
            upstream_processes = {}
            for upstream_name in upstream_names:
                upstream_process = self.ray_processes[(upstream_name, pipeline_id)]
                upstream_processes[upstream_name] = upstream_process
            init_ids.append(
                ray_process.init.remote(upstream_processes, downstream_processes)
            )
        # Block until every actor has finished initialising.
        ray.get(init_ids)

    def get_ray_process(self, process_name, pipeline_id=DEFAULT_PIPELINE_ID):
        """Returns the actor handle for the named process and pipeline."""
        assert isinstance(process_name, str)
        return self.ray_processes[(process_name, pipeline_id)]

    def close(self):
        """Stops all the actors and closes the runner."""
        super(RayRunner, self).close()
        for process in self.ray_processes.values():
            process.stop.remote()

    def get(
        self, process_class: Type[TProcessApplication], pipeline_id=DEFAULT_PIPELINE_ID
    ) -> TProcessApplication:
        """
        Returns a local proxy for the remote process application, so that
        callers can invoke application methods as if the object were local.
        """
        assert issubclass(process_class, ProcessApplication)
        process_name = process_class.create_name()
        ray_process = self.get_ray_process(process_name, pipeline_id)
        return ProxyApplication(ray_process)
@ray.remote
class RayProcess:
    """
    Ray actor that hosts one process application for one pipeline.

    Work is divided between four daemon threads started in the constructor:

    - "db jobs" thread: executes all database work sequentially, taken from
      ``db_jobs_queue`` (submitted via ``do_db_job``).
    - "process prompts" thread: after a prompt, pulls notifications from
      upstream processes, converts them to domain events, and queues them
      on ``upstream_event_queue``.
    - "process events" thread: processes queued domain events with the
      process application and queues resulting prompts on
      ``downstream_prompt_queue``.
    - "push prompts" thread: pushes queued prompts to downstream actors.
    """

    def __init__(
        self,
        application_process_class: Type[ProcessApplication],
        infrastructure_class: Type[ApplicationWithConcreteInfrastructure],
        env_vars: dict = None,
        pipeline_id: int = DEFAULT_PIPELINE_ID,
        poll_interval: int = None,
        setup_tables: bool = False,
    ):
        """
        :param application_process_class: Process application class to host.
        :param infrastructure_class: Infrastructure mixin for the application.
        :param env_vars: Extra environment variables (e.g. DB_URI) to set.
        :param pipeline_id: ID of the pipeline this actor belongs to.
        :param poll_interval: Polling interval (defaults to DEFAULT_POLL_INTERVAL).
        :param setup_tables: Whether to create database tables on construction.
        """
        # Process application args.
        self.application_process_class = application_process_class
        self.infrastructure_class = infrastructure_class
        # NOTE(review): this attribute appears unused within this class.
        self.daemon = True
        self.pipeline_id = pipeline_id
        self.poll_interval = poll_interval or DEFAULT_POLL_INTERVAL
        self.setup_tables = setup_tables
        if env_vars is not None:
            os.environ.update(env_vars)
        # Setup threads, queues, and threading events.
        self.readers_lock = Lock()
        # Set whenever an upstream process (or self) prompts this process.
        self._has_been_prompted = Event()
        self.heads_lock = Lock()
        # Latest known head notification ID per upstream process name.
        self.heads = {}
        self.positions_lock = Lock()
        # Last processed notification position per upstream process name.
        self.positions = {}
        self.positions_initialised = Event()
        self.db_jobs_queue = Queue(maxsize=MAX_QUEUE_SIZE)
        self.upstream_event_queue = Queue(maxsize=MAX_QUEUE_SIZE)
        self.downstream_prompt_queue = Queue()  # no maxsize, call() can put prompt
        self.has_been_stopped = Event()
        # Start the worker threads (all daemonic, so they never block exit).
        self.db_jobs_thread = Thread(target=self.db_jobs)
        self.db_jobs_thread.setDaemon(True)
        self.db_jobs_thread.start()
        self.process_prompts_thread = Thread(target=self._process_prompts)
        self.process_prompts_thread.setDaemon(True)
        self.process_prompts_thread.start()
        self.process_events_thread = Thread(target=self._process_events)
        self.process_events_thread.setDaemon(True)
        self.process_events_thread.start()
        self.push_prompts_thread = Thread(target=self._push_prompts)
        self.push_prompts_thread.setDaemon(True)
        self.push_prompts_thread.start()
        # Ray object IDs of notifications, keyed by (upstream name, notification ID).
        self._notification_rayids = {}
        # Notification objects received in prompts, keyed the same way.
        self._prompted_notifications = {}

    def db_jobs(self):
        """
        Thread target: executes queued RayDbJob objects one at a time,
        serialising all database access onto this single thread.
        """
        # print("Running do_jobs")
        while not self.has_been_stopped.is_set():
            try:
                item = self.db_jobs_queue.get(timeout=1)
                self.db_jobs_queue.task_done()
            except Empty:
                if self.has_been_stopped.is_set():
                    break
            else:
                # None is the sentinel used by stop() to unblock this thread.
                if item is None or self.has_been_stopped.is_set():
                    break
                db_job: RayDbJob = item
                # self.print_timecheck("Doing db job", item)
                try:
                    db_job.execute()
                except Exception as e:
                    # The error is also recorded on the job itself; only log
                    # here if the job did not capture it.
                    if db_job.error is None:
                        print(traceback.format_exc())
                        self._print_timecheck(
                            "Continuing after error running DB job:", e
                        )
                        sleep(1)
                # else:
                #     self.print_timecheck("Done db job", item)

    @retry((OperationalError, RecordConflictError), max_attempts=100, wait=0.01)
    def do_db_job(self, method, args, kwargs):
        """
        Runs a callable on the db-jobs thread and waits for its result.

        Raises whatever error the job captured; retried on operational and
        record-conflict errors by the decorator.
        """
        db_job = RayDbJob(method, args=args, kwargs=kwargs)
        self.db_jobs_queue.put(db_job)
        db_job.wait()
        if db_job.error:
            raise db_job.error
        # self.print_timecheck("db job delay:", db_job.delay)
        # self.print_timecheck("db job duration:", db_job.duration)
        # self.print_timecheck('db job result:', db_job.result)
        return db_job.result

    def init(self, upstream_processes: dict, downstream_processes: dict) -> None:
        """
        Initialise with actor handles for upstream and downstream processes.
        Need to initialise after construction so that all handles exist.
        """
        self.upstream_processes = upstream_processes
        self.downstream_processes = downstream_processes
        # Subscribe to broadcast prompts published by the process application.
        subscribe(handler=self._enqueue_prompt_to_pull, predicate=is_prompt_to_pull)
        # Construct process application object.
        process_class = self.application_process_class
        if not isinstance(process_class, ApplicationWithConcreteInfrastructure):
            if self.infrastructure_class:
                process_class = process_class.mixin(self.infrastructure_class)
            else:
                raise ProgrammingError("infrastructure_class is not set")

        class MethodWrapper(object):
            # Wraps a method so EventSourcingErrors are returned (not raised),
            # allowing them to cross the Ray boundary and be re-raised locally.
            def __init__(self, method):
                self.method = method

            def __call__(self, *args, **kwargs):
                try:
                    return self.method(*args, **kwargs)
                except EventSourcingError as e:
                    return ExceptionWrapper(e)

        class ProcessApplicationWrapper(object):
            # Wraps the application so every method access goes through
            # MethodWrapper; non-method attributes pass through unchanged.
            def __init__(self, process_application):
                self.process_application = process_application

            def __getattr__(self, item):
                attribute = getattr(self.process_application, item)
                if ismethod(attribute):
                    return MethodWrapper(attribute)
                else:
                    return attribute

        def construct_process():
            # Runs on the db-jobs thread, since construction touches the DB.
            return process_class(
                pipeline_id=self.pipeline_id, setup_table=self.setup_tables
            )

        process_application = self.do_db_job(construct_process, (), {})
        assert isinstance(process_application, ProcessApplication), process_application
        self.process_wrapper = ProcessApplicationWrapper(process_application)
        self.process_application = process_application
        for upstream_name, ray_notification_log in self.upstream_processes.items():
            # Make the process follow the upstream notification log.
            self.process_application.follow(upstream_name, ray_notification_log)
        self._reset_positions()
        # Unblocks the "process prompts" thread and call().
        self.positions_initialised.set()

    def _reset_positions(self):
        """Loads recorded upstream positions via the db-jobs thread."""
        self.do_db_job(self.__reset_positions, (), {})

    def __reset_positions(self):
        # Reads each upstream's recorded position from the database.
        with self.positions_lock:
            for upstream_name in self.upstream_processes:
                recorded_position = self.process_application.get_recorded_position(
                    upstream_name
                )
                self.positions[upstream_name] = recorded_position

    def add_downstream_process(self, downstream_name, ray_process_id):
        """Registers another downstream actor to receive prompts."""
        self.downstream_processes[downstream_name] = ray_process_id

    def call(self, method_name, *args, **kwargs):
        """
        Method for calling methods on process application object.
        """
        assert self.positions_initialised.is_set(), "Please call .init() first"
        # print("Calling", method_name, args, kwargs)
        if self.process_wrapper:
            method = getattr(self.process_wrapper, method_name)
            return self.do_db_job(method, args, kwargs)
        else:
            raise Exception(
                "Can't call method '%s' before process exists" % method_name
            )

    def prompt(self, prompt: RayPrompt) -> None:
        """
        Receives a prompt from an upstream actor: records any attached
        notification object IDs / objects, updates the known head for
        that upstream, and wakes the "process prompts" thread.
        """
        assert isinstance(prompt, RayPrompt), "Not a RayPrompt: %s" % prompt
        for notification_id, rayid in prompt.notification_ids:
            # self._print_timecheck("Received ray notification ID:", notification_id, rayid)
            self._notification_rayids[(prompt.process_name, notification_id)] = rayid
        latest_head = prompt.head_notification_id
        upstream_name = prompt.process_name
        if PROMPT_WITH_NOTIFICATION_OBJS:
            for notification in prompt.notifications:
                self._prompted_notifications[
                    (upstream_name, notification["id"])
                ] = notification
        if latest_head is not None:
            with self.heads_lock:
                # Update head from prompt.
                if upstream_name in self.heads:
                    if latest_head > self.heads[upstream_name]:
                        self.heads[upstream_name] = latest_head
                        self._has_been_prompted.set()
                else:
                    self.heads[upstream_name] = latest_head
                    self._has_been_prompted.set()
        else:
            self._has_been_prompted.set()

    def _process_prompts(self) -> None:
        """
        Thread target: repeatedly pulls and enqueues upstream events,
        logging and continuing after any error.
        """
        # Loop until stop event is set.
        self.positions_initialised.wait()
        while not self.has_been_stopped.is_set():
            try:
                self.__process_prompts()
            except Exception as e:
                if not self.has_been_stopped.is_set():
                    print(traceback.format_exc())
                    print("Continuing after error in 'process prompts' thread:", e)
                    print()
                    sleep(1)

    def __process_prompts(self):
        # One iteration: wait for a prompt, then for each upstream pull any
        # notifications between our position and the prompted head.
        # Wait until prompted.
        self._has_been_prompted.wait()
        if self.has_been_stopped.is_set():
            return
        # self.print_timecheck('has been prompted')
        current_heads = {}
        with self.heads_lock:
            # Clear before reading heads, so prompts arriving afterwards
            # will set the event again and trigger another iteration.
            self._has_been_prompted.clear()
            for upstream_name in self.upstream_processes.keys():
                current_head = self.heads.get(upstream_name)
                current_heads[upstream_name] = current_head
        for upstream_name in self.upstream_processes.keys():
            with self.positions_lock:
                current_position = self.positions.get(upstream_name)
            first_id = current_position + 1  # request the next one
            current_head = current_heads[upstream_name]
            if current_head is None:
                # No known head: pull open-ended.
                last_id = None
            elif current_position < current_head:
                if GREEDY_PULL_NOTIFICATIONS:
                    last_id = first_id + PAGE_SIZE - 1
                else:
                    last_id = min(current_head, first_id + PAGE_SIZE - 1)
            else:
                # self.print_timecheck(
                #     "Up to date with", upstream_name, current_position,
                #     current_head
                # )
                continue
            # last_id = first_id + PAGE_SIZE - 1
            # self.print_timecheck(
            #     "Getting notifications in range:",
            #     upstream_name,
            #     "%s -> %s" % (first_id, last_id),
            # )
            upstream_process = self.upstream_processes[upstream_name]
            # Works best without prompted head as last requested,
            # because there might be more notifications since.
            # Todo: However, limit the number to avoid getting too many, and
            # if we got full quota, then get again.
            notifications = []
            if PROMPT_WITH_NOTIFICATION_IDS or PROMPT_WITH_NOTIFICATION_OBJS:
                # First try to satisfy the range from notifications that
                # arrived with prompts, advancing first_id for each hit;
                # stop at the first gap and pull the remainder below.
                if last_id is not None:
                    for notification_id in range(first_id, last_id + 1):
                        if PROMPT_WITH_NOTIFICATION_IDS:
                            try:
                                rayid = self._notification_rayids.pop(
                                    (upstream_name, notification_id)
                                )
                            except KeyError:
                                break
                            else:
                                notification = ray.get(rayid)
                                # self._print_timecheck(
                                #     "Got notification from ray id",
                                #     notification_id,
                                #     rayid,
                                #     notification,
                                # )
                                notifications.append(notification)
                        elif PROMPT_WITH_NOTIFICATION_OBJS:
                            try:
                                notification = self._prompted_notifications.pop(
                                    (upstream_name, notification_id)
                                )
                                # self._print_timecheck(
                                #     "Got notification from prompted notifications dict",
                                #     notification_id,
                                #     notification,
                                # )
                            except KeyError:
                                break
                            else:
                                notifications.append(notification)
                        first_id += 1
            # Pull the ones we don't have.
            if last_id is None or first_id <= last_id:
                # self._print_timecheck("Pulling notifications", first_id, last_id,
                #     'from', upstream_name)
                rayid = upstream_process.get_notifications.remote(first_id, last_id)
                _notifications = ray.get(rayid)
                # self._print_timecheck("Pulled notifications", _notifications)
                notifications += _notifications
            # self.print_timecheck(
            #     "Obtained notifications:", len(notifications), 'from',
            #     upstream_name
            # )
            if len(notifications):
                if len(notifications) == PAGE_SIZE:
                    # A full page suggests there may be more: re-prompt self.
                    # self._print_timecheck("Range limit reached, reprompting...")
                    self._has_been_prompted.set()
                position = notifications[-1]["id"]
                with self.positions_lock:
                    current_position = self.positions[upstream_name]
                    if current_position is None or position > current_position:
                        self.positions[upstream_name] = position
                queue_item = []
                for notification in notifications:
                    # Check causal dependencies.
                    self.process_application.check_causal_dependencies(
                        upstream_name, notification.get("causal_dependencies")
                    )
                    # Get domain event from notification.
                    event = self.process_application.event_from_notification(
                        notification
                    )
                    # self.print_timecheck("obtained event", event)
                    # Put domain event on the queue, for event processing.
                    queue_item.append((event, notification["id"], upstream_name))
                self.upstream_event_queue.put(queue_item)
                sleep(MICROSLEEP)

    def get_notifications(self, first_notification_id, last_notification_id):
        """
        Returns a list of notifications, with IDs from first_notification_id
        to last_notification_id, inclusive. IDs are 1-based sequence.
        This is called by the "process prompts" thread of a downstream process.
        """
        return self.do_db_job(
            self._get_notifications, (first_notification_id, last_notification_id), {}
        )

    def _get_notifications(self, first_notification_id, last_notification_id):
        # Runs on the db-jobs thread; converts the 1-based inclusive range
        # to the record manager's 0-based start / exclusive stop convention.
        record_manager = self.process_application.event_store.record_manager
        assert isinstance(record_manager, RecordManagerWithNotifications)
        start = first_notification_id - 1
        stop = last_notification_id
        return list(record_manager.get_notifications(start, stop))

    def _process_events(self):
        """
        Thread target: repeatedly processes queued upstream events,
        logging and continuing after any error.
        """
        while not self.has_been_stopped.is_set():
            try:
                self.__process_events()
            except Exception as e:
                print(traceback.format_exc())
                print("Continuing after error in 'process events' thread:", e)
                sleep(1)

    def __process_events(self):
        # One iteration: take a batch of (event, notification ID, upstream
        # name) tuples, process each, and prompt downstream processes.
        try:
            queue_item = self.upstream_event_queue.get()  # timeout=5)
            self.upstream_event_queue.task_done()
        except Empty:
            # NOTE(review): unreachable while get() has no timeout.
            if self.has_been_stopped.is_set():
                return
        else:
            # None is the sentinel used by stop() to unblock this thread.
            if queue_item is None or self.has_been_stopped.is_set():
                return
            for (domain_event, notification_id, upstream_name) in queue_item:
                # print("Processing upstream event:", (domain_event,
                # notification_id, upstream_name))
                new_events, new_records = (), ()
                while not self.has_been_stopped.is_set():
                    try:
                        new_events, new_records = self.do_db_job(
                            method=self.process_application.process_upstream_event,
                            args=(domain_event, notification_id, upstream_name),
                            kwargs={},
                        )
                        break
                    except Exception as e:
                        print(traceback.format_exc())
                        self._print_timecheck(
                            "Retrying to reprocess event after error:", e
                        )
                        sleep(1)
                        # Todo: Forever? What if this is the wrong event?
                if self.has_been_stopped.is_set():
                    return
                # if new_events:
                #     self._print_timecheck("new events", len(new_events), new_events)
                notifications = ()
                notification_ids = ()
                # Only notifiable events cause downstream prompts.
                notifiable_events = [e for e in new_events if e.__notifiable__]
                if len(notifiable_events):
                    if PROMPT_WITH_NOTIFICATION_IDS or PROMPT_WITH_NOTIFICATION_OBJS:
                        manager = self.process_application.event_store.record_manager
                        assert isinstance(manager, RecordManagerWithNotifications)
                        notification_id_name = manager.notification_id_name
                        notifications = []
                        for record in new_records:
                            # Only records with an integer notification ID
                            # are notifiable.
                            if isinstance(
                                getattr(record, notification_id_name, None), int
                            ):
                                notifications.append(
                                    manager.create_notification_from_record(record)
                                )
                        if len(notifications):
                            head_notification_id = notifications[-1]["id"]
                            if PROMPT_WITH_NOTIFICATION_IDS:
                                notification_ids = self._put_notifications_in_ray_object_store(
                                    notifications
                                )
                                # Clear the notifications, avoid sending with IDs.
                                notifications = ()
                        else:
                            head_notification_id = self._get_max_notification_id()
                    else:
                        head_notification_id = self._get_max_notification_id()
                    prompt = RayPrompt(
                        self.process_application.name,
                        self.process_application.pipeline_id,
                        head_notification_id,
                        notification_ids,
                        notifications,
                    )
                    # self.print_timecheck(
                    #     "putting prompt on downstream " "prompt queue",
                    #     self.downstream_prompt_queue.qsize(),
                    # )
                    self.downstream_prompt_queue.put(prompt)
                    sleep(MICROSLEEP)
                    # self.print_timecheck(
                    #     "put prompt on downstream prompt " "queue"
                    # )
            # sleep(0.1)

    def _put_notifications_in_ray_object_store(self, notifications):
        """Stores notifications in Ray's object store; returns (id, rayid) pairs."""
        notification_ids = [(n["id"], ray.put(n)) for n in notifications]
        return notification_ids

    def _enqueue_prompt_to_pull(self, prompt):
        """Handles prompts published locally by the process application."""
        # print("Enqueing locally published prompt:", prompt)
        self.downstream_prompt_queue.put(prompt)
        sleep(MICROSLEEP)

    def _push_prompts(self) -> None:
        """
        Thread target: repeatedly pushes queued prompts downstream,
        logging and continuing after any error.
        """
        while not self.has_been_stopped.is_set():
            try:
                self.__push_prompts()
            except Exception as e:
                print(traceback.format_exc())
                print("Continuing after error in 'push prompts' thread:", e)
                sleep(1)

    def __push_prompts(self):
        # One iteration: take one item from the queue, normalise it to a
        # RayPrompt, and push it to every downstream actor.
        try:
            item = self.downstream_prompt_queue.get()  # timeout=1)
            self.downstream_prompt_queue.task_done()
            # Todo: Instead, drain the queue and consolidate prompts.
        except Empty:
            # NOTE(review): unreachable while get() has no timeout.
            self._print_timecheck(
                "timed out getting item from downstream prompt " "queue"
            )
            if self.has_been_stopped.is_set():
                return
        else:
            # self.print_timecheck("task done on downstream prompt queue")
            # None is the sentinel used by stop() to unblock this thread.
            if item is None or self.has_been_stopped.is_set():
                return
            elif isinstance(item, PromptToPull):
                # Convert a locally published prompt into a RayPrompt.
                if item.head_notification_id:
                    head_notification_id = item.head_notification_id
                else:
                    head_notification_id = self._get_max_notification_id()
                prompt = RayPrompt(
                    self.process_application.name,
                    self.process_application.pipeline_id,
                    head_notification_id,
                )
            else:
                prompt = item
            # self._print_timecheck('pushing prompt with', prompt.notification_ids)
            prompt_response_ids = []
            # self.print_timecheck("pushing prompts", prompt)
            for downstream_name, ray_process in self.downstream_processes.items():
                prompt_response_ids.append(ray_process.prompt.remote(prompt))
                if self.has_been_stopped.is_set():
                    return
                # self._print_timecheck("pushed prompt to", downstream_name)
            # Wait until all downstream actors have received the prompt.
            ray.get(prompt_response_ids)
            # self._print_timecheck("pushed prompts")

    def _get_max_notification_id(self):
        """
        Returns the highest notification ID of this process application.
        :return:
        """
        record_manager = self.process_application.event_store.record_manager
        assert isinstance(record_manager, RecordManagerWithNotifications)
        max_notification_id = self.do_db_job(
            record_manager.get_max_notification_id, (), {}
        )
        # self.print_timecheck("MAX NOTIFICATION ID in DB:", max_notification_id)
        return max_notification_id

    def stop(self):
        """
        Stops the process.

        Sets the stop event, unblocks each worker thread with sentinels and
        event sets, joins the threads, closes the application, and exits
        the actor.
        """
        # print("%s actor stopping %s" % (os.getpid(), datetime.datetime.now()))
        self.has_been_stopped.set()
        # print("%s actor joining db_jobs_thread %s" % (os.getpid(),
        # datetime.datetime.now()))
        # Sentinels unblock the queue-consuming threads.
        self.db_jobs_queue.put(None)
        self.upstream_event_queue.put(None)
        self.downstream_prompt_queue.put(None)
        # Event sets unblock the waiting threads.
        self._has_been_prompted.set()
        self.positions_initialised.set()
        self.db_jobs_thread.join(timeout=1)
        assert not self.db_jobs_thread.is_alive(), (
            "DB jobs thread still alive"
        )
        # print("%s actor joining process_events_thread %s" % (os.getpid(),
        # datetime.datetime.now()))
        self.process_events_thread.join(timeout=1)
        assert not self.process_events_thread.is_alive(), (
            "Process events thread still alive"
        )
        # print("%s actor joining process_prompts_thread %s" % (os.getpid(),
        # datetime.datetime.now()))
        self.process_prompts_thread.join(timeout=1)
        assert not self.process_prompts_thread.is_alive(), (
            "Process prompts thread still alive"
        )
        # print("%s actor joining push_prompts_thread %s" % (os.getpid(),
        # datetime.datetime.now()))
        self.push_prompts_thread.join(timeout=1)
        assert not self.push_prompts_thread.is_alive(), (
            "Push prompts thread still alive"
        )
        self.process_application.close()
        unsubscribe(handler=self._enqueue_prompt_to_pull, predicate=is_prompt_to_pull)
        # print("%s actor stopped %s" % (os.getpid(), datetime.datetime.now()))
        ray.actor.exit_actor()

    def _print_timecheck(self, activity, *args):
        """Prints a timestamped diagnostic line identifying this actor."""
        # pass
        process_name = self.application_process_class.__name__.lower()
        print(
            "Timecheck",
            datetime.datetime.now(),
            self.pipeline_id,
            process_name,
            activity,
            *args
        )
class ProxyApplication:
    """
    Local stand-in for a process application hosted in a RayProcess actor.

    Attribute access returns a ProxyMethod that forwards the call to the
    remote actor, so callers can use the application as if it were local.
    """

    def __init__(self, ray_process: RayProcess):
        # Handle of the Ray actor that hosts the real application.
        self.ray_process: RayProcess = ray_process

    def __getattr__(self, item):
        # Every attribute access becomes a callable proxy for that method.
        proxy = ProxyMethod(self.ray_process, item)
        return proxy
class ProxyMethod:
    """
    Callable proxy that forwards one method call to a RayProcess actor.
    """

    def __init__(self, ray_process: RayProcess, attribute_name: str):
        # Actor handle and the name of the application method to invoke.
        self.ray_process: RayProcess = ray_process
        self.attribute_name = attribute_name

    def __call__(self, *args, **kwargs):
        # Dispatch through the actor's generic call() entry point and block
        # for the result.
        result = ray.get(
            self.ray_process.call.remote(self.attribute_name, *args, **kwargs)
        )
        # Remote EventSourcingErrors travel back wrapped; re-raise locally.
        if isinstance(result, ExceptionWrapper):
            raise result.e
        return result