Source code for eventsourcing.system.definition

import weakref
from abc import ABC, abstractmethod
from collections import OrderedDict
from copy import deepcopy
from typing import Any, Dict, List, Optional, Type, TypeVar

from _weakref import ReferenceType
from eventsourcing.application.popo import PopoApplication
from eventsourcing.application.process import ProcessApplication
from eventsourcing.application.simple import ApplicationWithConcreteInfrastructure
from eventsourcing.exceptions import ProgrammingError
from eventsourcing.infrastructure.base import DEFAULT_PIPELINE_ID

TProcessApplication = TypeVar("TProcessApplication", bound=ProcessApplication)
TSystemRunner = TypeVar("TSystemRunner", bound="AbstractSystemRunner")
TSystem = TypeVar("TSystem", bound="System")


[docs]class System(object): """ A system object has a set of pipeline expressions, which involve process application classes. A system object can be run using a system runner. """
[docs] def __init__(self, *pipeline_exprs: Any, **kwargs: Any): """ Initialises a "process network" system object. Each pipeline expression of process classes shows directly which process following which other process in the system. For example, the pipeline expression (A | B | C) shows that B following A, and C following B. The pipeline expression (A | A) shows that A following A. The pipeline expression (A | B | A) shows that B following A, and A following B. The pipeline expressions ((A | B | A), (A | C | A)) are equivalent to (A | B | A | C | A). :param pipeline_exprs: Pipeline expressions involving process application classes. """ self.pipelines_exprs = pipeline_exprs self.setup_tables = kwargs.get("setup_tables", False) self.infrastructure_class = kwargs.get("infrastructure_class", None) self.use_direct_query_if_available = kwargs.get( "use_direct_query_if_available", False ) self.session = kwargs.get("session", None) self.shared_session = None self.process_classes: OrderedDict[str, Type[ProcessApplication]] = OrderedDict() for pipeline_expr in self.pipelines_exprs: for process_class in pipeline_expr: process_name = process_class.create_name() if process_name not in self.process_classes: self.process_classes[process_name] = process_class self.runner: Optional[ReferenceType] = None self.is_session_shared = True self.shared_session = None # Construct pipeline graph. self.downstream_names: OrderedDict[str, List[str]] = OrderedDict() self.upstream_names: OrderedDict[str, List[str]] = OrderedDict() edges = [] nodes = [] for pipeline_expr in self.pipelines_exprs: previous_name = None for process_class in pipeline_expr: process_name = process_class.create_name() if process_name not in nodes: nodes.append(process_name) if previous_name: edges.append((previous_name, process_name)) previous_name = process_name for process_name in nodes: self.downstream_names[process_name] = [] self.upstream_names[process_name] = [] for upstream_name, downstream_name in edges: self.downstream_names[upstream_name].append(downstream_name) self.upstream_names[downstream_name].append(upstream_name) self.nodes_of_pipeline_spec = nodes self.edges_of_pipeline_spec = edges
[docs] def construct_app( self, process_class: Type[TProcessApplication], infrastructure_class: Optional[ Type[ApplicationWithConcreteInfrastructure] ] = None, **kwargs: Any, ) -> TProcessApplication: """ Constructs process application from given ``process_class``. """ # If process class isn't already an infrastructure class, then # subclass the process class with concrete infrastructure. if not issubclass(process_class, ApplicationWithConcreteInfrastructure): # Default to PopoApplication infrastructure. if infrastructure_class is None: infrastructure_class = self.infrastructure_class or PopoApplication # Assert that we now have an application with concrete infrastructure. if not issubclass( infrastructure_class, ApplicationWithConcreteInfrastructure ): raise ProgrammingError( "Given infrastructure_class {} is not subclass of {}" "".format( infrastructure_class, ApplicationWithConcreteInfrastructure ) ) # Subclass the process application class with the infrastructure class. process_class = process_class.mixin(infrastructure_class) assert issubclass(process_class, ApplicationWithConcreteInfrastructure) # Set 'session' and 'setup_table' in kwargs. kwargs = dict(kwargs) if "session" not in kwargs and process_class.is_constructed_with_session: kwargs["session"] = self.session or self.shared_session if "setup_tables" not in kwargs and self.setup_tables: kwargs["setup_table"] = self.setup_tables # Construct the process application. app = process_class(**kwargs) # Catch the session, so it can be shared. if self.session is None and self.shared_session is None: if process_class.is_constructed_with_session and self.is_session_shared: if self.shared_session is None: self.shared_session = app.session assert isinstance(app, ProcessApplication), app return app
[docs] def __enter__(self) -> "AbstractSystemRunner": """ Supports running a system object directly as a context manager. The system is run with the SingleThreadedRunner. """ from eventsourcing.system.runner import SingleThreadedRunner if self.runner: raise ProgrammingError("System is already running: {}".format(self.runner)) runner = SingleThreadedRunner( system=self, use_direct_query_if_available=self.use_direct_query_if_available, ) runner.start() self.runner = runner return runner
[docs] def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """ Supports usage of a system object as a context manager. """ if self.runner: runner: Optional[AbstractSystemRunner] = self.runner if runner is not None: runner.close() self.runner = None
[docs] def bind( self: TSystem, infrastructure_class: Type[ApplicationWithConcreteInfrastructure] ) -> TSystem: """ Constructs a system object that has an infrastructure class from a system object constructed without infrastructure class. Raises ProgrammingError if already have an infrastructure class. :param infrastructure_class: :return: System object that has an infrastructure class. :rtype: System """ # Check system doesn't already have an infrastructure class. if self.infrastructure_class: raise ProgrammingError("System already has an infrastructure class") # Clone the system object, and set the infrastructure class. system = type(self).__new__(type(self)) system.__dict__.update(dict(deepcopy(self.__dict__))) system.__dict__.update(infrastructure_class=infrastructure_class) return system
[docs]class AbstractSystemRunner(ABC):
[docs] def __init__( self, system: System, infrastructure_class: Optional[ Type[ApplicationWithConcreteInfrastructure] ] = None, setup_tables: bool = False, use_direct_query_if_available: bool = False, ): self.system = system self.infrastructure_class = ( infrastructure_class or self.system.infrastructure_class ) self.setup_tables = setup_tables self.use_direct_query_if_available = ( use_direct_query_if_available or system.use_direct_query_if_available ) self.processes: Dict[str, Any] = {}
[docs] def __enter__(self: TSystemRunner) -> TSystemRunner: """ Supports usage of a system runner as a context manager. """ self.start() return self
[docs] def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """ Supports usage of a system runner as a context manager. """ self.close()
[docs] @abstractmethod def start(self) -> None: """ Starts running the system. Abstract method which must be implemented on concrete descendants. """
[docs] def close(self) -> None: """ Closes a running system. """ self.system.shared_session = None if self.processes: process: ProcessApplication for process in self.processes.values(): process.close() self.processes.clear()
def get_class(self, process_name: str) -> Type[ProcessApplication]: return self.system.process_classes[process_name] def get( self, process_class: Type[TProcessApplication], pipeline_id=DEFAULT_PIPELINE_ID ) -> TProcessApplication: process_name = process_class.create_name() try: process = self.processes[(process_name, pipeline_id)] except KeyError: process = self._construct_app_by_class(process_class, pipeline_id) return process def _construct_app_by_class( self, process_class: Type[TProcessApplication], pipeline_id: int ) -> TProcessApplication: process = self.system.construct_app( process_class=process_class, pipeline_id=pipeline_id, infrastructure_class=self.infrastructure_class, setup_table=self.setup_tables or self.system.setup_tables, use_direct_query_if_available=self.use_direct_query_if_available, ) self.processes[(process.name, pipeline_id)] = process return process