# Source code for eventsourcing.example.application

from eventsourcing.application.base import ApplicationWithPersistencePolicies
from eventsourcing.example.domainmodel import create_new_example
from eventsourcing.example.infrastructure import ExampleRepository
from eventsourcing.infrastructure.snapshotting import EventSourcedSnapshotStrategy


class ExampleApplication(ApplicationWithPersistencePolicies):
    """
    Example event sourced application with entity factory and repository.

    On construction the application builds an ``ExampleRepository`` over
    ``self.entity_event_store``. When ``self.snapshot_event_store`` is
    truthy, an ``EventSourcedSnapshotStrategy`` is also built and handed to
    the repository; otherwise the repository receives ``None`` as its
    snapshot strategy.
    """

    def __init__(self, **kwargs):
        """
        Set up the snapshot strategy and the example repository.

        :param kwargs: Passed unchanged to the base class constructor.
        :raises AssertionError: If ``self.entity_event_store`` is ``None``
            after the base class constructor has run.
        """
        super(ExampleApplication, self).__init__(**kwargs)

        # Snapshotting is optional: the strategy stays None unless a
        # snapshot event store is present.
        self.snapshot_strategy = None
        if self.snapshot_event_store:
            self.snapshot_strategy = EventSourcedSnapshotStrategy(
                event_store=self.snapshot_event_store,
            )

        # Raise explicitly rather than using a bare ``assert`` statement, so
        # that the check is still performed when Python runs with -O.
        if self.entity_event_store is None:
            raise AssertionError("entity_event_store must not be None")

        self.example_repository = ExampleRepository(
            event_store=self.entity_event_store,
            snapshot_strategy=self.snapshot_strategy,
        )

    def create_new_example(self, foo='', a='', b=''):
        """
        Entity object factory.

        Delegates to the module-level ``create_new_example()`` function
        (imported from ``eventsourcing.example.domainmodel``) and returns
        whatever it returns.

        :param foo: Forwarded as the ``foo`` keyword argument.
        :param a: Forwarded as the ``a`` keyword argument.
        :param b: Forwarded as the ``b`` keyword argument.
        """
        return create_new_example(foo=foo, a=a, b=b)
def construct_example_application(**kwargs):
    """
    Application object factory.

    Creates a new ``ExampleApplication``, forwarding every keyword argument
    to its constructor, and returns the new instance.
    """
    application = ExampleApplication(**kwargs)
    return application
# "Global" variable for single instance of application. _application = None
def init_example_application(**kwargs):
    """
    Constructs single global instance of application.

    To be called when initialising a worker process. The keyword arguments
    are forwarded to ``construct_example_application()`` and the result is
    stored in the module-level ``_application`` variable.

    :raises AssertionError: If the global instance has already been set.
    """
    global _application

    already_initialised = _application is not None
    if already_initialised:
        raise AssertionError("init_example_application() has already been called")

    _application = construct_example_application(**kwargs)
def get_example_application():
    """
    Returns single global instance of application.

    To be called when handling a worker request, if required.

    :raises AssertionError: If the global instance has not been set, or if
        it is not an ``ExampleApplication``.
    """
    application = _application

    # Guard clause: nothing has been initialised yet.
    if application is None:
        raise AssertionError("init_example_application() must be called first")

    assert isinstance(application, ExampleApplication)
    return application
def close_example_application():
    """
    Shuts down single global instance of application.

    To be called when tearing down, perhaps between tests, in order to allow
    a subsequent call to init_example_application().

    Does nothing if no instance is currently set. Any exception raised by
    the instance's ``close()`` method propagates to the caller, but the
    global reference is cleared regardless.
    """
    global _application

    if _application is None:
        return

    try:
        _application.close()
    finally:
        # Always drop the reference, even when close() raises; otherwise a
        # failed shutdown would make every later init_example_application()
        # call fail with "has already been called".
        _application = None