import time
from abc import ABC, abstractmethod
from collections import OrderedDict, defaultdict, deque
from queue import Empty, Queue
from threading import Barrier, BrokenBarrierError, Event, Lock, Thread, Timer
from time import sleep
from eventsourcing.application.popo import PopoApplication
from eventsourcing.application.process import ProcessApplication, Prompt
from eventsourcing.application.simple import ApplicationWithConcreteInfrastructure
from eventsourcing.domain.model.decorators import retry
from eventsourcing.domain.model.events import subscribe, unsubscribe
from eventsourcing.exceptions import CausalDependencyFailed, EventSourcingError, OperationalError, ProgrammingError, \
RecordConflictError
from eventsourcing.interface.notificationlog import NotificationLogReader
DEFAULT_POLL_INTERVAL = 5
[docs]class System(object):
[docs] def __init__(self, *pipeline_exprs, **kwargs):
"""
Initialises a "process network" system object.
:param pipeline_exprs: Pipeline expressions involving process application classes.
Each pipeline expression of process classes shows directly which process
follows which other process in the system.
For example, the pipeline expression (A | B | C) shows that B follows A,
and C follows B.
The pipeline expression (A | A) shows that A follows A.
The pipeline expression (A | B | A) shows that B follows A, and A follows B.
The pipeline expressions ((A | B | A), (A | C | A)) are equivalent to (A | B | A | C | A).
"""
self.pipelines_exprs = pipeline_exprs
self.setup_tables = kwargs.get('setup_tables', False)
self.infrastructure_class = kwargs.get('infrastructure_class', None)
self.session = kwargs.get('session', None)
self.process_classes = OrderedDict()
for pipeline_expr in self.pipelines_exprs:
for process_class in pipeline_expr:
process_name = process_class.__name__.lower()
if process_name not in self.process_classes:
self.process_classes[process_name] = process_class
self.processes = {}
self.is_session_shared = True
# Determine which process follows which.
self.followers = OrderedDict()
# A following is a list of process classes followed by a process class.
# Todo: Factor this out, it's confusing. (Only used in ActorModelRunner now).
self.followings = OrderedDict()
for pipeline_expr in self.pipelines_exprs:
previous_name = None
for process_class in pipeline_expr:
process_name = process_class.__name__.lower()
try:
follows = self.followings[process_name]
except KeyError:
follows = []
self.followings[process_name] = follows
try:
self.followers[process_name]
except KeyError:
self.followers[process_name] = []
if previous_name and previous_name not in follows:
follows.append(previous_name)
followers = self.followers[previous_name]
followers.append(process_name)
previous_name = process_name
[docs] def construct_app(self, process_class, infrastructure_class=None, **kwargs):
kwargs = dict(kwargs)
if 'session' not in kwargs and process_class.is_constructed_with_session:
kwargs['session'] = self.session
infrastructure_class = infrastructure_class or self.infrastructure_class
process = process_class.bind(infrastructure_class, **kwargs)
if process_class.is_constructed_with_session and self.is_session_shared:
if self.session is None:
self.session = process.session
return process
[docs] def is_prompt(self, event):
return isinstance(event, Prompt)
def __enter__(self):
self.__runner = SingleThreadedRunner(self)
self.__runner.__enter__()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.__runner.__exit__(exc_type, exc_val, exc_tb)
del (self.__runner)
def __getattr__(self, process_name, infrastructure_class=None):
if self.processes and process_name in self.processes:
process = self.processes[process_name]
else:
try:
process_class = self.process_classes[process_name]
except KeyError:
raise AttributeError(process_name)
else:
process = self.construct_app(
process_class=process_class,
setup_table=self.setup_tables,
infrastructure_class=infrastructure_class,
)
self.processes[process_name] = process
return process
[docs]class SystemRunner(ABC):
def __init__(self, system: System, infrastructure_class=None, setup_tables=False):
self.system = system
self.infrastructure_class = infrastructure_class or self.system.infrastructure_class
# Check that a concrete infrastructure class is involved.
if not all([issubclass(c, ApplicationWithConcreteInfrastructure)
for c in self.system.process_classes.values()]):
if self.infrastructure_class is None or not issubclass(
self.infrastructure_class, ApplicationWithConcreteInfrastructure
):
raise ProgrammingError("System runner needs a concrete application infrastructure class")
self.setup_tables = setup_tables
self.processes = {}
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] @abstractmethod
def start(self):
pass
[docs] def close(self):
if self.system.processes:
for process_name, process in self.system.processes.items():
process.close()
self.system.processes.clear()
def __getattr__(self, process_name):
return self.system.__getattr__(process_name, infrastructure_class=self.infrastructure_class)
[docs]class InProcessRunner(SystemRunner):
"""
Runs a system in the current process,
either in the current thread, or with
one thread for each process in the system.
"""
[docs] def start(self):
assert len(self.system.processes) == 0, "Already running"
# Construct the processes.
for process_class in self.system.process_classes.values():
process = self.system.construct_app(
process_class=process_class,
infrastructure_class=self.infrastructure_class,
setup_table=self.setup_tables or self.system.setup_tables,
)
self.system.processes[process.name] = process
# Tell each process about the processes it follows.
for followed_name, followers in self.system.followers.items():
followed = self.system.processes[followed_name]
followed_log = followed.notification_log
for follower_name in followers:
follower = self.system.processes[follower_name]
follower.follow(followed_name, followed_log)
# Do something to propagate prompts.
subscribe(
predicate=self.system.is_prompt,
handler=self.handle_prompt,
)
[docs] @abstractmethod
def handle_prompt(self, prompt):
pass
[docs] def close(self):
super(InProcessRunner, self).close()
unsubscribe(
predicate=self.system.is_prompt,
handler=self.handle_prompt,
)
[docs]class SingleThreadedRunner(InProcessRunner):
"""
Runs a system in the current thread.
"""
def __init__(self, system: System, infrastructure_class=PopoApplication, *args, **kwargs):
super(SingleThreadedRunner, self).__init__(system=system,
infrastructure_class=infrastructure_class, *args, **kwargs
)
self.pending_prompts = Queue()
self.iteration_lock = Lock()
[docs] def handle_prompt(self, prompt):
self.run_followers(prompt)
[docs] def run_followers(self, prompt):
"""
First caller adds a prompt to queue and
runs followers until there are no more
pending prompts.
Subsequent callers just add a prompt
to the queue, avoiding recursion.
"""
assert isinstance(prompt, Prompt)
# Put the prompt on the queue.
self.pending_prompts.put(prompt)
if self.iteration_lock.acquire(False):
start_time = time.time()
i = 0
try:
while True:
try:
prompt = self.pending_prompts.get(False)
except Empty:
break
else:
followers = self.system.followers[prompt.process_name]
for follower_name in followers:
follower = self.system.processes[follower_name]
follower.run(prompt)
i += 1
self.pending_prompts.task_done()
finally:
run_frequency = i / (time.time() - start_time)
# print(f"Run frequency: {run_frequency}")
self.iteration_lock.release()
# This is the old way of doing it, with recursion.
# def run_followers_with_recursion(self, prompt):
# followers = self.system.followers[prompt.process_name]
# for follower_name in followers:
# follower = self.processes[follower_name]
# follower.run(prompt)
#
[docs]class MultiThreadedRunner(InProcessRunner):
"""
Runs a system with a thread for each process.
"""
def __init__(self, system: System, poll_interval=None, clock_speed=None):
super(MultiThreadedRunner, self).__init__(system=system)
self.poll_interval = poll_interval or DEFAULT_POLL_INTERVAL
assert isinstance(system, System)
self.threads = {}
self.clock_speed = clock_speed
if self.clock_speed:
self.clock_event = Event()
self.stop_clock_event = Event()
else:
self.clock_event = None
self.stop_clock_event = None
[docs] def start(self):
super(MultiThreadedRunner, self).start()
assert not self.threads, "Already started"
self.inboxes = {}
self.outboxes = {}
self.clock_events = []
# Setup queues.
for process_name, upstream_names in self.system.followings.items():
inbox_id = process_name.lower()
if inbox_id not in self.inboxes:
self.inboxes[inbox_id] = Queue()
for upstream_class_name in upstream_names:
outbox_id = upstream_class_name.lower()
if outbox_id not in self.outboxes:
self.outboxes[outbox_id] = PromptOutbox()
if inbox_id not in self.outboxes[outbox_id].downstream_inboxes:
self.outboxes[outbox_id].downstream_inboxes[inbox_id] = self.inboxes[inbox_id]
# Construct application threads.
for process_name, process in self.system.processes.items():
process_instance_id = process_name
if self.clock_event:
process.clock_event = self.clock_event
process.tick_interval = 1 / self.clock_speed
thread = PromptQueuedApplicationThread(
process=process,
poll_interval=self.poll_interval,
inbox=self.inboxes[process_instance_id],
outbox=self.outboxes[process_instance_id],
# Todo: Is it better to clock the prompts or the notifications?
# clock_event=clock_event
)
self.threads[process_instance_id] = thread
# Start application threads.
for thread in self.threads.values():
thread.start()
# Start clock.
if self.clock_speed:
self.start_clock()
[docs] def start_clock(self):
tick_interval = 1 / self.clock_speed
# print(f"Tick interval: {tick_interval:.6f}s")
self.last_tick = None
self.this_tick = None
self.tick_adjustment = 0
def set_clock_event():
if self.stop_clock_event.is_set():
return
self.this_tick = time.process_time()
if self.last_tick:
tick_size = self.this_tick - self.last_tick
tick_oversize = tick_size - tick_interval
tick_oversize_percentage = 100 * (tick_oversize) / tick_interval
if tick_oversize_percentage > 300:
print(f"Warning: Tick over size: {tick_size :.6f}s {tick_oversize_percentage:.2f}%")
if abs(tick_oversize_percentage) < 300:
self.tick_adjustment += 0.5 * tick_interval * tick_oversize
max_tick_adjustment = 0.5 * tick_interval
min_tick_adjustment = 0
self.tick_adjustment = min(self.tick_adjustment, max_tick_adjustment)
self.tick_adjustment = max(self.tick_adjustment, min_tick_adjustment)
self.last_tick = self.this_tick
self.clock_event.set()
self.clock_event.clear()
if not self.stop_clock_event.is_set():
set_timer()
def set_timer():
# print(f"Tick adjustment: {self.tick_adjustment:.6f}")
if self.last_tick is not None:
time_since_last_tick = time.process_time() - self.last_tick
time_remaining = tick_interval - time_since_last_tick
timer_interval = time_remaining - self.tick_adjustment
if timer_interval < 0:
timer_interval = 0
# print("Warning: clock thread is running flat out!")
else:
timer_interval = 0
timer = Timer(timer_interval, set_clock_event)
timer.start()
set_timer()
[docs] def handle_prompt(self, prompt):
self.broadcast_prompt(prompt)
[docs] def broadcast_prompt(self, prompt):
outbox_id = prompt.process_name
assert outbox_id in self.outboxes, (outbox_id, self.outboxes.keys())
self.outboxes[outbox_id].put(prompt)
[docs] def close(self):
super(MultiThreadedRunner, self).close()
if self.clock_event is not None:
self.clock_event.set()
if self.stop_clock_event is not None:
self.stop_clock_event.set()
for thread in self.threads.values():
thread.inbox.put('QUIT')
for thread in self.threads.values():
thread.join(timeout=10)
self.threads.clear()
[docs]class PromptQueuedApplicationThread(Thread):
"""
Application thread which uses queues of prompts.
It loops on an "inbox" queue of prompts, and
adds its prompts to an "outbox" queue.
"""
def __init__(self, process: ProcessApplication, poll_interval=DEFAULT_POLL_INTERVAL,
inbox=None, outbox=None, clock_event=None):
super(PromptQueuedApplicationThread, self).__init__(daemon=True)
self.process = process
self.poll_interval = poll_interval
self.inbox = inbox
self.outbox = outbox
self.clock_event = clock_event
[docs] def run(self):
self.loop_on_prompts()
[docs] def loop_on_prompts(self):
# Loop on getting prompts.
while True:
try:
# Todo: Make the poll interval gradually increase if there are only timeouts?
prompt = self.inbox.get(timeout=self.poll_interval)
except Empty:
# Basically, we're polling after a timeout.
if self.clock_event is None:
self.run_process()
else:
self.inbox.task_done()
if prompt == 'QUIT':
self.process.close()
break
else:
if self.clock_event is not None:
self.clock_event.wait()
started = time.time()
self.run_process(prompt)
if self.clock_event is not None:
ended = time.time()
duration = ended - started
if self.clock_event.is_set():
print(f"Warning: Process {self.process.name} overran clock cycle: {duration}")
else:
print(f"Info: Process {self.process.name} ran within clock cycle: {duration}")
[docs] def run_process(self, prompt=None):
try:
self._run_process(prompt)
except CausalDependencyFailed:
pass
except EventSourcingError:
pass
@retry(CausalDependencyFailed, max_attempts=100, wait=0.2)
@retry((OperationalError, RecordConflictError), max_attempts=100, wait=0.01)
def _run_process(self, prompt=None):
self.process.run(prompt)
[docs]class PromptOutbox(object):
"""
Has a collection of downstream prompt inboxes.
"""
def __init__(self):
self.downstream_inboxes = {}
[docs] def put(self, prompt):
"""
Puts prompt in each downstream inbox (an actual queue).
"""
for queue in self.downstream_inboxes.values():
queue.put(prompt)
[docs]class SteppingRunner(InProcessRunner):
def __init__(self, normal_speed=1, scale_factor=1, is_verbose=False, *args, **kwargs):
super(SteppingRunner, self).__init__(*args, **kwargs)
self.normal_speed = normal_speed
self.scale_factor = scale_factor
self.is_verbose = is_verbose
if scale_factor:
self.tick_interval = 1 / (normal_speed * scale_factor)
else:
self.tick_interval = 0
if self.is_verbose:
print(f"Tick interval: {self.tick_interval:.6f}s")
self.clock_thread = None
[docs] def call_in_future(self, cmd, ticks_delay):
self.clock_thread.call_in_future(cmd, ticks_delay)
[docs]class SteppingSingleThreadedRunner(SteppingRunner):
def __init__(self, *args, **kwargs):
super(SteppingSingleThreadedRunner, self).__init__(*args, **kwargs)
self.seen_prompt_events = {}
self.stop_event = Event()
[docs] def start(self):
super(SteppingSingleThreadedRunner, self).start()
self.clock_thread = ProcessRunningClockThread(
normal_speed=self.normal_speed,
scale_factor=self.scale_factor,
stop_event=self.stop_event,
is_verbose=self.is_verbose,
seen_prompt_events=self.seen_prompt_events,
processes=self.system.processes
)
self.clock_thread.start()
[docs] def handle_prompt(self, prompt):
pass
[docs] def close(self):
self.stop_event.set()
super(SteppingSingleThreadedRunner, self).close()
self.clock_thread.join(5)
if self.clock_thread.isAlive():
print(f"Warning: clock thread was still alive")
[docs]class ClockThread(Thread):
def __init__(self, *args, **kwargs):
super(ClockThread, self).__init__()
self.future_cmds = defaultdict(list)
self.tick_count = 0
[docs] def call_in_future(self, cmd, ticks_delay):
ticks_delay = max(ticks_delay, 1)
self.future_cmds[ticks_delay + self.tick_count].append(cmd)
[docs] def call_commands(self):
for future_cmd in self.future_cmds.get(self.tick_count, []):
future_cmd()
[docs]class ProcessRunningClockThread(ClockThread):
def __init__(self, normal_speed, scale_factor, stop_event: Event,
is_verbose=False, seen_prompt_events=None, processes=None):
super(ProcessRunningClockThread, self).__init__(daemon=True)
self.normal_speed = normal_speed
self.scale_factor = scale_factor
self.stop_event = stop_event
self.seen_prompt_events = seen_prompt_events
self.processes = processes
self.last_tick_time = None
self.last_process_time = None
self.all_tick_durations = deque()
self.tick_adjustment = 0.0
self.is_verbose = is_verbose
if normal_speed and scale_factor:
self.tick_interval = 1 / normal_speed / scale_factor
else:
self.tick_interval = None
if self.tick_interval:
self.tick_durations_window_size = max(100, int(round(1 / self.tick_interval, 0)))
else:
self.tick_durations_window_size = 1000
# Construct lists of followers for each process.
self.followers = {}
for process_name, process in self.processes.items():
self.followers[process_name] = []
for process_name, process in self.processes.items():
for upstream_process_name in process.readers:
self.followers[upstream_process_name].append(process_name)
# Construct a notification log reader for each process.
self.readers = {}
for process_name, process in self.processes.items():
reader = NotificationLogReader(
notification_log=process.notification_log,
use_direct_query_if_available=True
)
self.readers[process_name] = reader
@property
def actual_clock_speed(self):
if self.all_tick_durations:
# Todo: Might need to lock this, to avoid "RuntimeError: deque mutated during iteration".
durations = list(self.all_tick_durations)
return len(durations) / sum(durations)
else:
return 0
[docs] def run(self):
# Get new notifications once.
# loop_counter = 0
while not self.stop_event.is_set():
# print("Loop count: {}".format(loop_counter))
try:
# Get all notifications.
all_notifications = {}
for process_name in self.processes:
# seen_prompt = self.seen_prompt_events[process_name]
# if seen_prompt.is_set():
# seen_prompt.clear()
reader = self.readers[process_name]
notifications = reader.read_list()
all_notifications[process_name] = notifications
# Process all notifications.
all_events = {}
for process_name, notifications in all_notifications.items():
events = []
for notification in notifications:
# print(process_name, notification)
process = self.processes[process_name]
# It's not the follower process, but the method does the same thing.
event = process.get_event_from_notification(notification)
notification_id = notification['id']
events.append((notification_id, event))
all_events[process_name] = events
for process_name, events in all_events.items():
# print(f"Process: {process_name}")
for follower_name in self.followers[process_name]:
follower_process = self.processes[follower_name]
# print(f"Follower: {follower_name}")
for notification_id, event in events:
# print(f"Notification: {notification_id}, {event}")
follower_process.process_upstream_event(event, notification_id, process_name)
# Call commands delayed until this clock tick.
self.call_commands()
except:
if self.stop_event.is_set():
break
else:
# print("Error... Loop count: {}".format(loop_counter))
self.stop_event.set()
raise
else:
tick_time = time.time()
process_time = time.process_time()
if self.last_tick_time is not None:
tick_duration = tick_time - self.last_tick_time
self.all_tick_durations.append(tick_duration)
if len(self.all_tick_durations) > self.tick_durations_window_size:
self.all_tick_durations.popleft()
if self.is_verbose:
process_duration = process_time - self.last_process_time
intensity = 100 * process_duration / tick_duration
clock_speed = 1 / tick_duration
real_time = self.tick_count / self.normal_speed
print(f"Tick {self.tick_count:4}: {real_time:4.2f}s {tick_duration:.6f}s, "
f"{intensity:6.2f}%, {clock_speed:6.1f}Hz, "
f"{self.actual_clock_speed:6.1f}Hz, {self.tick_adjustment:.6f}s"
)
if self.tick_interval:
tick_oversize = tick_duration - self.tick_interval
tick_oversize_percentage = 100 * (tick_oversize) / self.tick_interval
# if tick_oversize_percentage > 300:
# print(f"Warning: Tick over size: { tick_duration :.6f}s {tick_oversize_percentage:.2f}%")
if abs(tick_oversize_percentage) < 300:
# Weight falls from 1 as reciprocal of count, to tick interval.
# weight = max(1 / self.tick_count, min(.1, self.tick_interval))
weight = 1 / (1 + self.tick_count * self.tick_interval) ** 2
# print(f"Weight: {weight:.4f}")
self.tick_adjustment += weight * tick_oversize
max_tick_adjustment = 1.0 * self.tick_interval
min_tick_adjustment = 0
self.tick_adjustment = min(self.tick_adjustment, max_tick_adjustment)
self.tick_adjustment = max(self.tick_adjustment, min_tick_adjustment)
self.last_tick_time = tick_time
self.last_process_time = process_time
self.tick_count += 1
if self.tick_interval:
sleep_interval = self.tick_interval - self.tick_adjustment
sleep(max(sleep_interval, 0))
[docs]class SteppingMultiThreadedRunner(SteppingRunner):
"""
Has a clock thread, and a thread for each application process
in the system. The clock thread loops until stopped, waiting
for a barrier, after sleeping for remaining tick interval timer.
Application threads loop until stopped, waiting for the same
barrier. Then, after all threads are waiting at the barrier,
the barrier is lifted. The clock thread proceeds by sleeping
for the clock tick interval. The application threads proceed by
getting new notifications and processing all of them.
There are actually two barriers, so that each application thread
waits before getting notifications, and then waits for all processes
to complete getting notification before processing the notifications
through the application policy. This avoids events created by a process
application "bleeding" into the notifications of another process
application in the same clock cycle.
Todo:
Receive prompts, but set an event for the prompting process, to avoid unnecessary runs.
Allow commands to be scheduled at future clock tick number, and execute when reached.
"""
def __init__(self, *args, **kwargs):
super(SteppingMultiThreadedRunner, self).__init__(*args, **kwargs)
self.seen_prompt_events = {}
self.fetch_barrier = None
self.execute_barrier = None
self.application_threads = {}
self.clock_thread = None
self.stop_event = Event()
[docs] def handle_prompt(self, prompt):
seen_prompt = self.seen_prompt_events[prompt.process_name]
seen_prompt.set()
[docs] def start(self):
super(SteppingMultiThreadedRunner, self).start()
parties = 1 + len(self.system.processes)
self.fetch_barrier = Barrier(parties)
self.execute_barrier = Barrier(parties)
# Create an event for each process.
for process_name in self.system.processes:
self.seen_prompt_events[process_name] = Event()
# Construct application threads.
for process_name, process in self.system.processes.items():
process_instance_id = process_name
thread = BarrierControlledApplicationThread(
process=process,
fetch_barrier=self.fetch_barrier,
execute_barrier=self.execute_barrier,
stop_event=self.stop_event,
)
self.application_threads[process_instance_id] = thread
# Start application threads.
for thread in self.application_threads.values():
thread.start()
# Start clock thread.
self.clock_thread = BarrierControllingClockThread(
normal_speed=self.normal_speed,
scale_factor=self.scale_factor,
tick_interval=self.tick_interval,
fetch_barrier=self.fetch_barrier,
execute_barrier=self.execute_barrier,
stop_event=self.stop_event,
is_verbose=self.is_verbose,
)
self.clock_thread.start()
[docs] def close(self):
self.stop_event.set()
super(SteppingMultiThreadedRunner, self).close()
self.execute_barrier.abort()
self.fetch_barrier.abort()
for thread in self.application_threads.values():
thread.join(timeout=1)
if thread.isAlive():
print(f"Warning: application thread '{thread.process.name}' was still alive: {thread.state}")
self.application_threads.clear()
self.clock_thread.join()
if self.clock_thread.isAlive():
print(f"Warning: clock thread was still alive")
[docs]class BarrierControlledApplicationThread(Thread):
def __init__(self, process: ProcessApplication, fetch_barrier: Barrier,
execute_barrier: Barrier, stop_event: Event):
super(BarrierControlledApplicationThread, self).__init__(daemon=True)
self.process_application = process
self.fetch_barrier = fetch_barrier
self.execute_barrier = execute_barrier
self.stop_event = stop_event
[docs] def run(self):
while not self.stop_event.is_set():
# Isolate "fetch" and "execute" steps, to avoid
# events created in one application being processed by
# another application in the same tick. Race condition
# where one process writes new events before another has
# read all notifications from last tick. Actually, need
# to get all notifications from all upstream applications
# and then process the notifications. The run() method
# gets notifications from one, then processes, then gets
# from another, which makes the race condition probable.
all_notifications = []
try:
self.fetch_barrier.wait()
except BrokenBarrierError:
self.abort()
else:
try:
# Get all notifications.
for upstream_name in self.process_application.readers:
notifications = list(self.process_application.read_reader(upstream_name))
all_notifications.append((upstream_name, notifications))
except:
self.abort()
raise
if self.stop_event.is_set():
break
try:
self.execute_barrier.wait()
except BrokenBarrierError:
self.abort()
else:
try:
# Process all notifications.
for upstream_name, notifications in all_notifications:
for notification in notifications:
event = self.process_application.get_event_from_notification(notification)
self.process_application.process_upstream_event(event, notification['id'], upstream_name)
except:
self.abort()
raise
try:
self.execute_barrier.wait()
except BrokenBarrierError:
self.abort()
[docs] def abort(self):
self.stop_event.set()
self.fetch_barrier.abort()
self.execute_barrier.abort()
[docs]class BarrierControllingClockThread(ClockThread):
def __init__(self, normal_speed, scale_factor, tick_interval,
fetch_barrier: Barrier, execute_barrier: Barrier,
stop_event: Event, is_verbose=False):
super(BarrierControllingClockThread, self).__init__(daemon=True)
# Todo: Remove the redundancy here.
self.normal_speed = normal_speed
self.scale_factor = scale_factor
self.tick_interval = tick_interval
self.fetch_barrier = fetch_barrier
self.execute_barrier = execute_barrier
self.stop_event = stop_event
self.last_tick_time = None
self.last_process_time = None
self.all_tick_durations = deque()
self.tick_adjustment = 0.0
self.is_verbose = is_verbose
if self.tick_interval:
self.tick_durations_window_size = max(1, int(round(1 / self.tick_interval, 0)))
else:
self.tick_durations_window_size = 100
@property
def actual_clock_speed(self):
if self.all_tick_durations:
durations = self.all_tick_durations
return len(durations) / sum(durations)
else:
return 0
[docs] def run(self):
while not self.stop_event.is_set():
try:
self.fetch_barrier.wait()
self.execute_barrier.wait()
self.execute_barrier.wait()
self.call_commands()
except BrokenBarrierError:
self.fetch_barrier.abort()
self.execute_barrier.abort()
self.stop_event.set()
else:
tick_time = time.time()
process_time = time.process_time()
if self.last_tick_time is not None:
tick_duration = tick_time - self.last_tick_time
self.all_tick_durations.append(tick_duration)
if len(self.all_tick_durations) > self.tick_durations_window_size:
self.all_tick_durations.popleft()
if self.is_verbose:
process_duration = process_time - self.last_process_time
intensity = 100 * process_duration / tick_duration
clock_speed = 1 / tick_duration
real_time = self.tick_count / self.normal_speed
print(f"Tick {self.tick_count:4}: {real_time:4.2f}s {tick_duration:.6f}s, "
f"{intensity:6.2f}%, {clock_speed:6.1f}Hz, "
f"{self.actual_clock_speed:6.1f}Hz, {self.tick_adjustment:.6f}s"
)
if self.tick_interval:
tick_oversize = tick_duration - self.tick_interval
tick_oversize_percentage = 100 * (tick_oversize) / self.tick_interval
# if tick_oversize_percentage > 300:
# print(f"Warning: Tick over size: { tick_duration :.6f}s {tick_oversize_percentage:.2f}%")
if abs(tick_oversize_percentage) < 300:
# Weight falls from 1 as reciprocal of count, to tick interval.
# weight = max(1 / self.tick_count, min(.1, self.tick_interval))
weight = 1 / (1 + self.tick_count * self.tick_interval) ** 2
# print(f"Weight: {weight:.4f}")
self.tick_adjustment += weight * tick_oversize
max_tick_adjustment = 1.0 * self.tick_interval
min_tick_adjustment = 0
self.tick_adjustment = min(self.tick_adjustment, max_tick_adjustment)
self.tick_adjustment = max(self.tick_adjustment, min_tick_adjustment)
self.last_tick_time = tick_time
self.last_process_time = process_time
self.tick_count += 1
if self.tick_interval:
sleep_interval = self.tick_interval - self.tick_adjustment
sleep(max(sleep_interval, 0))