Source code for eventsourcing.system.multiprocess

import multiprocessing
from multiprocessing import Manager
from queue import Empty, Queue
from time import sleep
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, Type

from eventsourcing.application.notificationlog import RecordManagerNotificationLog
from eventsourcing.application.popo import PopoApplication
from eventsourcing.application.process import ProcessApplication, PromptToQuit
from eventsourcing.application.simple import (
    ApplicationWithConcreteInfrastructure,
    Prompt,
    PromptToPull,
    is_prompt_to_pull,
)
from eventsourcing.domain.model.decorators import retry
from eventsourcing.domain.model.events import subscribe, unsubscribe
from eventsourcing.exceptions import (
    CausalDependencyFailed,
    OperationalError,
    ProgrammingError,
    RecordConflictError,
)
from eventsourcing.infrastructure.base import DEFAULT_PIPELINE_ID
from eventsourcing.system.definition import AbstractSystemRunner, System
from eventsourcing.system.runner import DEFAULT_POLL_INTERVAL, PromptOutbox


class MultiprocessRunner(AbstractSystemRunner):
    def __init__(
        self,
        system: System,
        pipeline_ids: Sequence[int] = (DEFAULT_PIPELINE_ID,),
        poll_interval: Optional[int] = None,
        setup_tables: bool = False,
        sleep_for_setup_tables: int = 0,
        **kwargs: Any
    ):
        super(MultiprocessRunner, self).__init__(system=system, **kwargs)
        self.pipeline_ids = pipeline_ids
        self.poll_interval = poll_interval or DEFAULT_POLL_INTERVAL
        assert isinstance(system, System)
        self.setup_tables = setup_tables or system.setup_tables
        self.sleep_for_setup_tables = sleep_for_setup_tables
        self.os_processes: List[OperatingSystemProcess] = []

    def start(self) -> None:
        self.os_processes = []
        self.manager = Manager()

        if TYPE_CHECKING:
            self.inboxes: Dict[Tuple[int, str], Queue[Prompt]]
            self.outboxes: Dict[Tuple[int, str], PromptOutbox[Tuple[int, str]]]
        self.inboxes = {}
        self.outboxes = {}

        # Set up the queues.
        for pipeline_id in self.pipeline_ids:
            for process_name, upstream_names in self.system.upstream_names.items():
                inbox_id = (pipeline_id, process_name.lower())
                if inbox_id not in self.inboxes:
                    self.inboxes[inbox_id] = self.manager.Queue()
                for upstream_class_name in upstream_names:
                    outbox_id = (pipeline_id, upstream_class_name.lower())
                    if outbox_id not in self.outboxes:
                        self.outboxes[outbox_id] = PromptOutbox()
                    if inbox_id not in self.outboxes[outbox_id].downstream_inboxes:
                        self.outboxes[outbox_id].downstream_inboxes[
                            inbox_id
                        ] = self.inboxes[inbox_id]

        # Check we have the infrastructure classes we need.
        for process_class in self.system.process_classes.values():
            if not issubclass(process_class, ApplicationWithConcreteInfrastructure):
                if not self.infrastructure_class:
                    raise ProgrammingError("infrastructure_class is not set")
                elif issubclass(self.infrastructure_class, PopoApplication):
                    raise ProgrammingError(
                        "Can't use %s with %s"
                        % (type(self), self.infrastructure_class)
                    )
                elif not issubclass(
                    self.infrastructure_class, ApplicationWithConcreteInfrastructure
                ):
                    raise ProgrammingError(
                        "infrastructure_class is not a subclass of {}".format(
                            ApplicationWithConcreteInfrastructure
                        )
                    )

        # Subscribe to broadcast prompts published by a process
        # application in the parent operating system process.
        subscribe(handler=self.broadcast_prompt, predicate=is_prompt_to_pull)

        # Start the operating system processes.
        expect_tables_exist = False
        for pipeline_id in self.pipeline_ids:
            for process_name, upstream_names in self.system.upstream_names.items():
                process_class = self.system.process_classes[process_name]
                inbox = self.inboxes[(pipeline_id, process_name.lower())]
                outbox = self.outboxes.get((pipeline_id, process_name.lower()))
                os_process = OperatingSystemProcess(
                    application_process_class=process_class,
                    infrastructure_class=self.infrastructure_class,
                    upstream_names=upstream_names,
                    poll_interval=self.poll_interval,
                    pipeline_id=pipeline_id,
                    setup_tables=self.setup_tables,
                    inbox=inbox,
                    outbox=outbox,
                )
                os_process.daemon = True
                os_process.start()
                self.os_processes.append(os_process)
                if self.setup_tables and not expect_tables_exist:
                    # Avoid conflicts when creating tables.
                    sleep(self.sleep_for_setup_tables)
                    expect_tables_exist = True

        # Construct the process applications in the local process.
        for process_class in self.system.process_classes.values():
            self.get(process_class)
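
    # Illustrative note (an editorial sketch, not part of the original
    # module): with pipeline_ids=(0,) and a hypothetical system in which
    # "reservations" follows "orders", start() wires the queues roughly
    # like this:
    #
    #     self.inboxes[(0, "orders")] = self.manager.Queue()
    #     self.inboxes[(0, "reservations")] = self.manager.Queue()
    #     self.outboxes[(0, "orders")].downstream_inboxes[
    #         (0, "reservations")
    #     ] = self.inboxes[(0, "reservations")]
    #
    # A prompt broadcast by "orders" is thereby delivered to the inbox of
    # every process that follows it, with one set of queues per pipeline.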

    def broadcast_prompt(self, prompt: PromptToPull) -> None:
        outbox_id = (prompt.pipeline_id, prompt.process_name)
        outbox = self.outboxes.get(outbox_id)
        if outbox:
            outbox.put(prompt)

    def close(self) -> None:
        super(MultiprocessRunner, self).close()
        unsubscribe(handler=self.broadcast_prompt, predicate=is_prompt_to_pull)
        for os_process in self.os_processes:
            os_process.inbox.put(PromptToQuit())
        for os_process in self.os_processes:
            os_process.join(timeout=10)
        for os_process in self.os_processes:
            if os_process.is_alive():
                os_process.terminate()
        self.os_processes.clear()
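
# A minimal usage sketch (editorial addition, not part of the original
# module): Orders, Reservations, and create_new_order() are hypothetical
# process application classes and methods, and SQLAlchemyApplication stands
# in for any concrete infrastructure class. The runner can be used as a
# context manager, which calls start() on entry and close() on exit:
#
#     system = System(Orders | Reservations | Orders)
#     with MultiprocessRunner(
#         system,
#         infrastructure_class=SQLAlchemyApplication,
#         setup_tables=True,
#     ) as runner:
#         orders = runner.get(Orders)
#         orders.create_new_order()  # hypothetical command method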

class OperatingSystemProcess(multiprocessing.Process):
    def __init__(
        self,
        application_process_class: Type[ProcessApplication],
        infrastructure_class: Type[ApplicationWithConcreteInfrastructure],
        upstream_names: List[str],
        inbox: Queue,
        outbox: Optional[PromptOutbox[Tuple[int, str]]] = None,
        pipeline_id: int = DEFAULT_PIPELINE_ID,
        poll_interval: int = DEFAULT_POLL_INTERVAL,
        setup_tables: bool = False,
        *args: Any,
        **kwargs: Any
    ):
        super(OperatingSystemProcess, self).__init__(*args, **kwargs)
        self.application_process_class = application_process_class
        self.infrastructure_class = infrastructure_class
        self.upstream_names = upstream_names
        self.daemon = True
        self.pipeline_id = pipeline_id
        self.poll_interval = poll_interval
        self.inbox = inbox
        self.outbox = outbox
        self.setup_tables = setup_tables

    def run(self) -> None:
        # Construct the process application class.
        process_class = self.application_process_class
        if not issubclass(process_class, ApplicationWithConcreteInfrastructure):
            if self.infrastructure_class:
                process_class = process_class.mixin(self.infrastructure_class)
            else:
                raise ProgrammingError("infrastructure_class is not set")

        # Construct the process application object.
        self.process: ProcessApplication = process_class(
            pipeline_id=self.pipeline_id, setup_table=self.setup_tables
        )

        # Follow upstream notification logs.
        for upstream_name in self.upstream_names:

            # Obtain a notification log object (local or remote) for the
            # upstream process.
            if upstream_name == self.process.name:
                # Upstream is this process's application,
                # so use its own notification log.
                notification_log = self.process.notification_log
            else:
                # For a different application, we need to construct a
                # notification log with a record manager that has the
                # upstream application ID. This currently assumes all
                # applications are using the same database and record manager
                # class. If it weren't the same database, we would need to
                # use a remote notification log, and upstream would need to
                # provide an API from which we can pull. It's not
                # unreasonable to have a fixed number of application
                # processes connecting to the same database.
                record_manager = self.process.event_store.record_manager
                notification_log = RecordManagerNotificationLog(
                    record_manager=record_manager.clone(
                        application_name=upstream_name,
                        # Todo: Check whether setting pipeline_id is
                        # necessary (it's the same?).
                        pipeline_id=self.pipeline_id,
                    ),
                    section_size=self.process.notification_log_section_size,
                )
                # Todo: Support upstream partition IDs different from
                # self.pipeline_id?
                # Todo: Support combining partitions (read from different
                # partitions but write to the same partition): could be one
                # OS process that reads from many logs of the same upstream
                # app, or many processes each reading one partition, with
                # contention writing to the same partition.
                # Todo: Support dividing partitions (read from one but write
                # to many). Maybe one process per upstream partition, with
                # round-robin to pick the partition to write to. Or have many
                # processes reading, with each taking it in turn to skip
                # processing somehow.
                # Todo: Dividing partitions would allow a stream to flow at
                # the same rate through slower process applications.
                # Todo: Support merging results from "replicated state
                # machines": could have a command logging process that takes
                # client commands and presents them in a notification log.
                # Then the system could be deployed in different places,
                # running independently, receiving the same commands, and
                # running the same processes. The command logging process
                # could be accompanied by a result logging process that reads
                # results from replicas as they become available. Not sure
                # what to do if replicas return different things. If one
                # replica goes down, then it could resume by pulling events
                # from another? Not sure what to do. External systems could
                # be modelled as commands.

            # Make the process follow the upstream notification log.
            self.process.follow(upstream_name, notification_log)

        # Subscribe to broadcast prompts published by the process application.
        subscribe(handler=self.broadcast_prompt, predicate=is_prompt_to_pull)

        try:
            self.loop_on_prompts()
        finally:
            unsubscribe(handler=self.broadcast_prompt, predicate=is_prompt_to_pull)

    @retry(CausalDependencyFailed, max_attempts=100, wait=0.1)
    def loop_on_prompts(self) -> None:

        # Run once, in case prompts were missed.
        self.run_process()

        # Loop on getting prompts.
        while True:
            try:
                # Todo: Make the poll interval gradually increase if there
                # are only timeouts?
                item = self.inbox.get(timeout=self.poll_interval)
                self.inbox.task_done()

                if isinstance(item, PromptToQuit):
                    self.process.close()
                    break

                elif isinstance(item, PromptToPull):
                    self.run_process(item)

                else:
                    raise ProgrammingError("Unsupported prompt: {}".format(item))

            except Empty:
                # Basically, we're polling after a timeout.
                self.run_process()

    @retry((OperationalError, RecordConflictError), max_attempts=100, wait=0.1)
    def run_process(self, prompt: Optional[Prompt] = None) -> None:
        self.process.run(prompt)

    def broadcast_prompt(self, prompt: PromptToPull) -> None:
        if self.outbox is not None:
            self.outbox.put(prompt)
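
# How prompts flow at runtime (editorial summary, not part of the original
# module): when a process application commits new events, it publishes a
# PromptToPull in its own OS process. broadcast_prompt() forwards the prompt
# to the outbox, whose downstream_inboxes are the managed queues of the
# processes that follow it:
#
#     events committed -> PromptToPull published
#         -> OperatingSystemProcess.broadcast_prompt() -> outbox.put(prompt)
#         -> downstream inbox.get() in loop_on_prompts()
#         -> run_process(prompt) -> process.run(prompt)
#
# If nothing arrives within poll_interval seconds, inbox.get() raises Empty
# and the loop polls by calling run_process() without a prompt.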