import time
from collections import OrderedDict, defaultdict
from threading import Lock
from eventsourcing.application.pipeline import Pipeable
from eventsourcing.application.simple import SimpleApplication
from eventsourcing.application.snapshotting import SnapshottingApplication
from eventsourcing.domain.model.events import publish, subscribe, unsubscribe
from eventsourcing.exceptions import CausalDependencyFailed, PromptFailed
from eventsourcing.infrastructure.base import ACIDRecordManager
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
from eventsourcing.interface.notificationlog import NotificationLogReader
from eventsourcing.utils.transcoding import json_dumps, json_loads
[docs]class ProcessEvent(object):
def __init__(self, new_events, tracking_kwargs=None, causal_dependencies=None):
self.new_events = new_events
self.tracking_kwargs = tracking_kwargs
self.causal_dependencies = causal_dependencies
[docs]class ProcessApplication(Pipeable, SimpleApplication):
set_notification_ids = False
use_causal_dependencies = False
def __init__(self, name=None, policy=None, setup_table=False, **kwargs):
self.policy_func = policy
self.readers = OrderedDict()
self.is_reader_position_ok = defaultdict(bool)
self._notification_generators = {}
self._policy_lock = Lock()
self.clock_event = None
self.tick_interval = None
super(ProcessApplication, self).__init__(name=name, setup_table=setup_table, **kwargs)
# Publish prompts for any domain events that we persist.
if self.persistence_policy:
subscribe(
predicate=self.persistence_policy.is_event,
handler=self.publish_prompt,
)
[docs] def close(self):
if self.persistence_policy:
unsubscribe(
predicate=self.persistence_policy.is_event,
handler=self.publish_prompt,
)
super(ProcessApplication, self).close()
[docs] def publish_prompt(self, event=None):
"""
Publishes prompt for a given event.
Used to prompt downstream process application when an event
is published by this application's model, which can happen
when application command methods, rather than the process policy,
are called.
Wraps exceptions with PromptFailed, to avoid application policy exceptions being
seen directly in other applications when running synchronously in single thread.
"""
prompt = Prompt(self.name, self.pipeline_id)
try:
publish(prompt)
except PromptFailed:
raise
except Exception as e:
raise PromptFailed("{}: {}".format(type(e), str(e)))
[docs] def follow(self, upstream_application_name, notification_log):
# Create a reader.
reader = NotificationLogReader(notification_log, use_direct_query_if_available=True)
self.readers[upstream_application_name] = reader
[docs] def run(self, prompt=None, advance_by=None):
if prompt:
assert isinstance(prompt, Prompt)
upstream_names = [prompt.process_name]
else:
upstream_names = self.readers.keys()
notification_count = 0
for upstream_name in upstream_names:
if not self.is_reader_position_ok[upstream_name]:
self.del_notification_generator(upstream_name)
self.set_reader_position_from_tracking_records(upstream_name)
self.is_reader_position_ok[upstream_name] = True
while True:
with self._policy_lock:
# Get notification generator.
generator = self.get_notification_generator(upstream_name, advance_by)
try:
notification = next(generator)
except StopIteration:
self.del_notification_generator(upstream_name)
break
notification_count += 1
# Get domain event from notification.
event = self.get_event_from_notification(notification)
# Decode causal dependencies of the domain event.
causal_dependencies = notification.get('causal_dependencies') or '[]'
causal_dependencies = json_loads(causal_dependencies) or []
# Check causal dependencies are satisfied.
for causal_dependency in causal_dependencies:
pipeline_id = causal_dependency['pipeline_id']
notification_id = causal_dependency['notification_id']
_manager = self.event_store.record_manager
has_tracking_record = _manager.has_tracking_record(
upstream_application_name=upstream_name,
pipeline_id=pipeline_id,
notification_id=notification_id
)
if not has_tracking_record:
# Invalidate reader position.
self.is_reader_position_ok[upstream_name] = False
# Raise exception.
raise CausalDependencyFailed({
'application_name': self.name,
'upstream_name': upstream_name,
'pipeline_id': pipeline_id,
'notification_id': notification_id
})
# Wait on the clock event, if there is one.
if self.clock_event is not None:
self.clock_event.wait()
# print("Processing upstream event: ", event)
new_events = self.process_upstream_event(event, notification['id'], upstream_name)
self.take_snapshots(new_events)
# Publish a prompt if there are new notifications.
# Todo: Optionally send events as prompts, saves pulling event if it arrives in order.
if any([event.__notifiable__ for event in new_events]):
self.publish_prompt()
return notification_count
[docs] def process_upstream_event(self, event, notification_id, upstream_name):
if self.tick_interval is not None:
cycle_started = time.process_time()
else:
cycle_started = None
# Call policy with the upstream event.
all_aggregates, causal_dependencies = self.call_policy(event)
# Collect pending events.
new_events = self.collect_pending_events(all_aggregates)
# Record process event.
try:
tracking_kwargs = self.construct_tracking_kwargs(
notification_id, upstream_name
)
process_event = ProcessEvent(
new_events, tracking_kwargs, causal_dependencies
)
self.record_process_event(process_event)
# Todo: Maybe write one tracking record at the end of a run, if
# necessary, or only during a period of time when nothing happens?
except Exception as exc:
# Need to invalidate reader position, so it is refreshed.
self.is_reader_position_ok[upstream_name] = False
# Need to purge from the cache relevant entities that
# have evolved their state past what has been recorded,
# otherwise strange errors (about version mismatches, or
# when identifying causal dependencies) can arise.
if self.repository._cache:
originator_ids = set([event.originator_id for event in new_events])
for originator_id in originator_ids:
try:
del self.repository._cache[originator_id]
except KeyError:
pass
raise exc
else:
if self.tick_interval is not None:
# Todo: Change this to use the full cycle time (improve getting notifications first).
cycle_ended = time.process_time()
cycle_time = cycle_ended - cycle_started
cycle_perc = 100 * (cycle_time) / self.tick_interval
if cycle_perc > 100:
msg = f"Warning: {self.name} cycle exceeded tick interval by: {cycle_perc - 100:.2f}%"
print(msg)
return new_events
[docs] def get_event_from_notification(self, notification):
return self.event_store.mapper.event_from_topic_and_state(
topic=notification['topic'],
state=notification['state']
)
[docs] def get_notification_generator(self, upstream_name, advance_by):
# Dict avoids re-entrant calls to run() starting their own generator,
# so that notifications are only received once. Was needed in
# single-threaded runner before it was changed to use iteration not
# recursion. Hence, probably no longer needed - use reader directly.
try:
generator = self._notification_generators[upstream_name]
except KeyError:
# Todo: Rename as 'iterator'? We use an iterator, doesn't matter whether or not it is a generator.
generator = self.read_reader(upstream_name, advance_by)
self._notification_generators[upstream_name] = generator
return generator
[docs] def read_reader(self, upstream_name, advance_by=None):
return self.readers[upstream_name].read(advance_by=advance_by)
[docs] def del_notification_generator(self, upstream_name):
try:
del self._notification_generators[upstream_name]
except KeyError:
pass
[docs] def take_snapshots(self, new_events):
pass
[docs] def set_reader_position_from_tracking_records(self, upstream_name):
max_record_id = self.event_store.record_manager.get_max_tracking_record_id(upstream_name)
reader = self.readers[upstream_name]
reader.seek(max_record_id or 0)
[docs] def call_policy(self, event):
# Get the application policy.
policy = self.policy_func or self.policy
# Wrap the actual repository, so we can collect aggregates.
repository = RepositoryWrapper(self.repository)
# Actually call the policy.
new_aggregates = policy(repository, event)
# Collect all aggregates.
repo_aggregates = list(repository.retrieved_aggregates.values())
all_aggregates = repo_aggregates[:]
if new_aggregates is not None:
if not isinstance(new_aggregates, (list, tuple)):
new_aggregates = [new_aggregates]
if self.repository._use_cache:
for new_aggregate in new_aggregates:
self.repository._cache[new_aggregate.id] = new_aggregate
all_aggregates += new_aggregates
# Identify causal dependencies.
causal_dependencies = []
if self.use_causal_dependencies:
highest = defaultdict(int)
for entity_id, entity_version in repository.causal_dependencies:
pipeline_id, notification_id = self.event_store.record_manager.get_pipeline_and_notification_id(
entity_id, entity_version
)
if pipeline_id is not None and pipeline_id != self.pipeline_id:
highest[pipeline_id] = max(notification_id, highest[pipeline_id])
causal_dependencies = []
for pipeline_id, notification_id in highest.items():
causal_dependencies.append({
'pipeline_id': pipeline_id,
'notification_id': notification_id
})
# Todo: Optionally reference causal dependencies in current pipeline.
# Todo: Support processing notification from a single pipeline in parallel, according to dependencies.
return all_aggregates, causal_dependencies
[docs] @staticmethod
def policy(repository, event):
"""Empty method, can be overridden in subclasses to implement concrete policy."""
[docs] def collect_pending_events(self, aggregates):
pending_events = []
num_changed_aggregates = 0
# This doesn't necessarily obtain events in causal order...
for aggregate in aggregates:
batch = aggregate.__batch_pending_events__()
if len(batch):
num_changed_aggregates += 1
pending_events += batch
# ...so sort pending events across all aggregates.
if num_changed_aggregates > 1:
# Sort the events by timestamp.
# - this method is intended to establish the correct
# causal ordering of all these new events across all aggregates. It
# should work if all events are timestamped, all their timestamps
# are from the same clock, and none have the same value. If this
# doesn't work properly, for some reason, it would be possible when
# several aggregates publish events that depend on each other that
# concatenating pending events taken from each in turn will be incorrect
# and could potentially cause processing errors in a downstream process
# application that depends on the correct causal ordering of events. In
# the worst case, the events will still be placed correctly in the
# aggregate sequence, but if timestamps are skewed and so do not correctly
# order the events, the events may be out of order in their notification log.
# It is expected in normal usage that these events are created in the same
# operating system thread, with timestamps from the same operating system clock,
# and so the timestamps will provide the correct order. However, if somehow
# different events are timestamped from different clocks, then problems may occur
# if those clocks give timestamps that skew the correct causal order.
pending_events.sort(key=lambda x: x.timestamp)
return pending_events
[docs] def construct_tracking_kwargs(self, notification_id, upstream_application_name):
return {
'application_name': self.name,
'upstream_application_name': upstream_application_name,
'pipeline_id': self.pipeline_id,
'notification_id': notification_id,
}
[docs] def record_process_event(self, process_event):
# Construct event records.
event_records = self.construct_event_records(process_event.new_events,
process_event.causal_dependencies)
# Write event records with tracking record.
record_manager = self.event_store.record_manager
assert isinstance(record_manager, ACIDRecordManager)
record_manager.write_records(records=event_records,
tracking_kwargs=process_event.tracking_kwargs)
[docs] def construct_event_records(self, pending_events, causal_dependencies=None):
# Convert to event records.
sequenced_items = self.event_store.item_from_event(pending_events)
event_records = self.event_store.record_manager.to_records(sequenced_items)
# Set notification log IDs, and causal dependencies.
if len(event_records):
# Todo: Maybe keep track of what this probably is, to avoid query. Like log reader, invalidate on error.
if self.set_notification_ids:
current_max = self.event_store.record_manager.get_max_record_id() or 0
for domain_event, event_record in zip(pending_events, event_records):
if type(domain_event).__notifiable__:
current_max += 1
event_record.id = current_max
else:
event_record.id = 'event-not-notifiable'
if self.use_causal_dependencies:
assert hasattr(self.event_store.record_manager.record_class, 'causal_dependencies')
causal_dependencies = json_dumps(causal_dependencies)
# Only need first event to carry the dependencies.
event_records[0].causal_dependencies = causal_dependencies
return event_records
[docs] def setup_table(self):
super(ProcessApplication, self).setup_table()
if self.datastore is not None:
self.datastore.setup_table(
self.event_store.record_manager.tracking_record_class
)
[docs] def drop_table(self):
super(ProcessApplication, self).drop_table()
if self.datastore is not None:
self.datastore.drop_table(
self.event_store.record_manager.tracking_record_class
)
[docs]class RepositoryWrapper(object):
def __init__(self, repository):
self.retrieved_aggregates = {}
assert isinstance(repository, EventSourcedRepository)
self.repository = repository
self.causal_dependencies = []
def __getitem__(self, entity_id):
try:
return self.retrieved_aggregates[entity_id]
except KeyError:
entity = self.repository.__getitem__(entity_id)
self.retrieved_aggregates[entity_id] = entity
self.causal_dependencies.append((entity.id, entity.__version__))
return entity
def __contains__(self, entity_id):
return self.repository.__contains__(entity_id)
[docs]class Prompt(object):
def __init__(self, process_name, pipeline_id):
self.process_name = process_name
self.pipeline_id = pipeline_id
def __eq__(self, other):
return (
other
and isinstance(other, type(self))
and self.process_name == other.process_name
and self.pipeline_id == other.pipeline_id
)
def __repr__(self):
return "{}({}={}, {}={})".format(
type(self).__name__,
'process_name', self.process_name,
'pipeline_id', self.pipeline_id
)
[docs]class ProcessApplicationWithSnapshotting(SnapshottingApplication, ProcessApplication):
[docs] def take_snapshots(self, new_events):
for event in new_events:
if self.snapshotting_policy.condition(event):
self.snapshotting_policy.take_snapshot(event)