Source code for eventsourcing.system.grpc.processor

import json
import logging
import sys
import traceback
from concurrent import futures
from datetime import datetime
from json.decoder import JSONDecodeError
from logging import DEBUG
from queue import Queue
from signal import SIGINT, signal
from threading import Event, Lock, Thread
from time import sleep
from typing import Dict, Type

# Todo: Check connection and reconnect if necessary - somehow.


import grpc
from grpc._channel import _InactiveRpcError

from eventsourcing.application import notificationlog
from eventsourcing.application.notificationlog import (
    AbstractNotificationLog,
    LocalNotificationLog,
    NotificationLogReader,
    Section,
)
from eventsourcing.application.process import ProcessApplication
from eventsourcing.application.simple import (
    ApplicationWithConcreteInfrastructure,
    is_prompt_to_pull,
)
from eventsourcing.domain.model.events import subscribe
from eventsourcing.system.grpc.processor_pb2 import (
    CallReply,
    CallRequest,
    Empty,
    LeadRequest,
    NotificationsReply,
    NotificationsRequest,
    PromptRequest,
)
from eventsourcing.system.grpc.processor_pb2_grpc import (
    ProcessorServicer,
    ProcessorStub,
    add_ProcessorServicer_to_server,
)
from eventsourcing.utils.topic import resolve_topic
from eventsourcing.utils.transcoding import ObjectJSONDecoder, ObjectJSONEncoder


[docs]class NotificationLogView(object): """ Presents sections of notification log for gRPC server. """
[docs] def __init__( self, notification_log: LocalNotificationLog, json_encoder: ObjectJSONEncoder ): self.notification_log = notification_log self.json_encoder = json_encoder
def present_resource(self, section_id: str) -> bytes: section = self.notification_log[section_id] return self.json_encoder.encode(section.__dict__)
class ProcessorClient(object): def __init__(self): self.channel = None self.json_encoder = ObjectJSONEncoder() self.json_decoder = ObjectJSONDecoder() def connect(self, address, timeout=5): """ Connect to client to server at given address. Calls ping() until it gets a response, or timeout is reached. """ self.close() self.channel = grpc.insecure_channel(address) self.stub = ProcessorStub(self.channel) timer_started = datetime.now() while True: # Ping until get a response. try: self.ping() except _InactiveRpcError: if timeout is not None: timer_duration = (datetime.now() - timer_started).total_seconds() if timer_duration > 15: raise Exception("Timed out trying to connect to %s" % address) else: continue else: break # def __enter__(self): # return self # # def __exit__(self, exc_type, exc_val, exc_tb): # self.close() # def close(self): """ Closes the client's GPRC channel. """ if self.channel is not None: self.channel.close() def ping(self): """ Sends a Ping request to the server. """ request = Empty() response = self.stub.Ping(request, timeout=5) # def follow(self, upstream_name, upstream_address): # request = FollowRequest( # upstream_name=upstream_name, upstream_address=upstream_address # ) # response = self.stub.Follow(request, timeout=5,) def prompt(self, upstream_name): """ Prompts downstream server with upstream name, so that downstream process and promptly pull new notifications from upstream process. """ request = PromptRequest(upstream_name=upstream_name) response = self.stub.Prompt(request, timeout=5) def get_notifications(self, section_id): """ Gets a section of event notifications from server. """ request = NotificationsRequest(section_id=section_id) notifications_reply = self.stub.GetNotifications(request, timeout=5) assert isinstance(notifications_reply, NotificationsReply) return notifications_reply.section def lead(self, application_name, address): """ Requests a process to connect and then send prompts to given address. """ request = LeadRequest( downstream_name=application_name, downstream_address=address ) response = self.stub.Lead(request, timeout=5) def call_application(self, method_name, *args, **kwargs): """ Calls named method on server's application with given args. """ request = CallRequest( method_name=method_name, args=self.json_encoder.encode(args), kwargs=self.json_encoder.encode(kwargs), ) response = self.stub.CallApplicationMethod(request, timeout=5) return self.json_decoder.decode(response.data)
[docs]class StartClient(Thread): """ Thread that creates a gRPC client and connects to a gRPC server. """
[docs] def __init__(self, clients, name, address): super(StartClient, self).__init__() self.clients = clients self.name = name self.address = address self.error = None
[docs] def run(self): """ Creates client and connects to address. """ client = ProcessorClient() try: client.connect(self.address) except Exception as e: self.error = e logging.error(e) else: self.clients[self.name] = client
[docs]class PullNotifications(Thread): """ Thread the pulls notifications from upstream process application. """
[docs] def __init__( self, prompt_event: Event, reader: NotificationLogReader, process_application: ProcessApplication, event_queue: Queue, upstream_name: str, has_been_stopped: Event, ): super(PullNotifications, self).__init__() self.prompt_event = prompt_event self.reader = reader self.process_application = process_application self.event_queue = event_queue self.upstream_name = upstream_name self.has_been_stopped = has_been_stopped
[docs] def run(self) -> None: """ Loops over waiting for prompt event to be set, reads event notifications from reader, gets domain events from notifications, and puts domain events on the queue of unprocessed events. """ # logging.info("started pull notifications thread") self.set_reader_position() while not self.has_been_stopped.is_set(): self.prompt_event.wait() self.prompt_event.clear() try: for notification in self.reader.read(): if self.has_been_stopped.is_set(): break domain_event = self.process_application.event_from_notification( notification ) self.event_queue.put( (domain_event, notification["id"], self.upstream_name) ) except Exception as e: logging.error(traceback.format_exc(e)) logging.error("Error reading notification log: %s" % e) logging.error("Retrying...") self.set_reader_position() sleep(1)
[docs] def set_reader_position(self): """ Sets reader position from recorded position. """ recorded_position = self.process_application.get_recorded_position( self.upstream_name ) self.reader.seek(recorded_position)
[docs]class RemoteNotificationLog(AbstractNotificationLog): """ Notification log that get notification log sections using gRPC client. """
[docs] def __init__( self, client: ProcessorClient, json_decoder: ObjectJSONDecoder, section_size: int, ): self.client = client self.json_decoder = json_decoder self._section_size = section_size
@property def section_size(self) -> int: return self._section_size
[docs] def __getitem__(self, section_id: str) -> Section: section = self.client.get_notifications(section_id) try: obj = self.json_decoder.decode(section) except JSONDecodeError: raise ValueError("Couldn't decode section: %s" % section) return Section(**obj)
[docs]class ProcessorServer(ProcessorServicer):
[docs] def __init__( self, application_topic, pipeline_id, infrastructure_topic, setup_table, address, upstreams, downstreams, push_prompt_interval, ): super(ProcessorServer, self).__init__() # Make getting notifications more efficient. notificationlog.USE_REGULAR_SECTIONS = False notificationlog.DEFAULT_SECTION_SIZE = 100 self.has_been_stopped = Event() signal(SIGINT, self.stop) self.application_class: Type[ProcessApplication] = resolve_topic( application_topic ) self.pipeline_id = pipeline_id self.application_name = self.application_class.create_name() infrastructure_class: Type[ ApplicationWithConcreteInfrastructure ] = resolve_topic(infrastructure_topic) self.application = self.application_class.mixin( infrastructure_class=infrastructure_class )(pipeline_id=self.pipeline_id, setup_table=setup_table) self.address = address self.json_encoder = ObjectJSONEncoder() self.json_decoder = ObjectJSONDecoder() self.upstreams = upstreams self.downstreams = downstreams self.prompt_events = {} self.push_prompt_interval = push_prompt_interval self.notification_log_view = NotificationLogView( self.application.notification_log, json_encoder=ObjectJSONEncoder(), ) for upstream_name in self.upstreams: self.prompt_events[upstream_name] = Event() # self.prompt_events[upstream_name].set() self.downstream_prompt_event = Event() subscribe(self._set_downstream_prompt_event, is_prompt_to_pull) self.serve() self.clients: Dict[str, ProcessorClient] = {} self.clients_lock = Lock() start_client_threads = [] remotes = {} remotes.update(self.upstreams) remotes.update(self.downstreams) for name, address in remotes.items(): thread = StartClient(self.clients, name, address) thread.setDaemon(True) thread.start() start_client_threads.append(thread) for thread in start_client_threads: thread.join() # logging.info("%s connected to %s" % (self.application_name, thread.name)) self.push_prompts_thread = Thread(target=self._push_prompts) self.push_prompts_thread.setDaemon(True) self.push_prompts_thread.start() # self.count_of_events = 0 self.pull_notifications_threads = {} self.unprocessed_domain_event_queue = Queue() for upstream_name, upstream_address in self.upstreams.items(): thread = PullNotifications( prompt_event=self.prompt_events[upstream_name], reader=NotificationLogReader( RemoteNotificationLog( client=self.clients[upstream_name], json_decoder=ObjectJSONDecoder(), section_size=self.application.notification_log.section_size, ) ), process_application=self.application, event_queue=self.unprocessed_domain_event_queue, upstream_name=upstream_name, has_been_stopped=self.has_been_stopped, ) thread.setDaemon(True) self.pull_notifications_threads[upstream_name] = thread self.process_events_thread = Thread(target=self._process_events) self.process_events_thread.setDaemon(True) self.process_events_thread.start() # Start the threads. for thread in self.pull_notifications_threads.values(): thread.start() # Wait for termination. self.wait_for_termination()
def _set_downstream_prompt_event(self, event): # logging.info( # "Setting downstream prompt event on %s for %s" # % (self.application_name, event) # ) self.downstream_prompt_event.set() def _push_prompts(self) -> None: # logging.info("Started push prompts thread") while not self.has_been_stopped.is_set(): try: self.__push_prompts() sleep(self.push_prompt_interval) except Exception as e: if not self.has_been_stopped.is_set(): logging.error(traceback.format_exc()) logging.error( "Continuing after error in 'push prompts' thread: %s", e ) sleep(1) def __push_prompts(self): self.downstream_prompt_event.wait() self.downstream_prompt_event.clear() # logging.info("Pushing prompts from %s" % self.application_name) for downstream_name in self.downstreams: client = self.clients[downstream_name] if not self.has_been_stopped.is_set(): client.prompt(self.application_name) def _process_events(self) -> None: while not self.has_been_stopped.is_set(): try: self.__process_events() except Exception as e: logging.error(traceback.format_exc()) logging.error("Continuing after error in 'process events' thread:", e) sleep(1) def __process_events(self): unprocessed_item = self.unprocessed_domain_event_queue.get() self.unprocessed_domain_event_queue.task_done() if unprocessed_item is None: return else: # Process domain event. domain_event, notification_id, upstream_name = unprocessed_item # logging.info("Unprocessed event: %s" % domain_event) new_events, new_records = self.application.process_upstream_event( domain_event, notification_id, upstream_name ) # Publish a prompt if there are new notifications. if any([event.__notifiable__ for event in new_events]): self.application.publish_prompt()
[docs] def serve(self): """ Starts gRPC server. """ self.executor = futures.ThreadPoolExecutor(max_workers=10) self.server = grpc.server(self.executor) # logging.info(self.application_class) add_ProcessorServicer_to_server(self, self.server) self.server.add_insecure_port(self.address) self.server.start()
[docs] def wait_for_termination(self): """ Runs until termination of process. """ self.server.wait_for_termination()
[docs] def Ping(self, request, context): return Empty()
# def Follow(self, request, context): # upstream_name = request.upstream_name # upstream_address = request.upstream_address # self.follow(upstream_name, upstream_address) # return Empty() # # def follow(self, upstream_name, upstream_address): # """""" # # logging.debug("%s is following %s" % (self.application_name, upstream_name)) # self.clients[upstream_name].lead(self.application_name, self.address)
[docs] def Lead(self, request, context): downstream_name = request.downstream_name downstream_address = request.downstream_address self.lead(downstream_name, downstream_address) return Empty()
[docs] def lead(self, downstream_name, downstream_address): """ Starts client and registers downstream to receive prompts. """ # logging.debug("%s is leading %s" % (self.application_name, downstream_name)) thread = StartClient(self.clients, downstream_name, downstream_address) thread.setDaemon(True) thread.start() thread.join() if thread.error: raise Exception( "Couldn't lead '%s' on address '%s': %s" % (downstream_name, downstream_address, thread.error) ) else: self.downstreams[downstream_name] = downstream_address
[docs] def start_client(self, name, address): """ Starts client connected to given address. """ if name not in self.clients: self.clients[name] = ProcessorClient() self.clients[name].connect(address)
[docs] def Prompt(self, request, context): upstream_name = request.upstream_name self.prompt(upstream_name) return Empty()
[docs] def prompt(self, upstream_name): """ Set prompt event for upstream name. """ self.prompt_events[upstream_name].set()
[docs] def GetNotifications(self, request, context): section_id = request.section_id section = self.get_notification_log_section(section_id) return NotificationsReply(section=section)
[docs] def get_notification_log_section(self, section_id): """ Returns section for given section ID. """ return self.notification_log_view.present_resource(section_id=section_id)
[docs] def CallApplicationMethod(self, request, context): method_name = request.method_name # logging.info("Call application method: %s" % method_name) args = self.json_decoder.decode(request.args) kwargs = self.json_decoder.decode(request.kwargs) method = getattr(self.application, method_name) return_value = method(*args, **kwargs) return CallReply(data=self.json_encoder.encode(return_value))
[docs] def stop(self, *args): """ Stops the gRPC server. """ # logging.debug("Stopping....") self.has_been_stopped.set() self.server.stop(grace=1)
if __name__ == "__main__": logging.basicConfig(level=DEBUG) application_topic = sys.argv[1] pipeline_id = json.loads(sys.argv[2]) infrastructure_topic = sys.argv[3] setup_table = (json.loads(sys.argv[4]),) address = sys.argv[5] upstreams = json.loads(sys.argv[6]) downstreams = json.loads(sys.argv[7]) push_prompt_interval = json.loads(sys.argv[8]) processor = ProcessorServer( application_topic, pipeline_id, infrastructure_topic, setup_table, address, upstreams, downstreams, push_prompt_interval, )