Source code for eventsourcing.example.application

from eventsourcing.application.base import ApplicationWithPersistencePolicies
from eventsourcing.example.domainmodel import create_new_example
from eventsourcing.example.infrastructure import ExampleRepository
from eventsourcing.infrastructure.snapshotting import EventSourcedSnapshotStrategy

class ExampleApplication(ApplicationWithPersistencePolicies):
    """
    Example event sourced application with entity factory and repository.

    After construction the instance exposes:

    * ``snapshot_strategy`` -- an ``EventSourcedSnapshotStrategy`` when the
      application has a truthy ``snapshot_event_store``, otherwise ``None``.
    * ``example_repository`` -- an ``ExampleRepository`` reading from
      ``entity_event_store``.
    """

    def __init__(self, **kwargs):
        """
        Initialise the application and wire up its repository.

        :param kwargs: passed through unchanged to the base class constructor.
        :raises AssertionError: if ``entity_event_store`` is ``None`` after
            the base class has been initialised.
        """
        super(ExampleApplication, self).__init__(**kwargs)

        # Snapshotting is optional: a strategy is only created when a
        # snapshot event store is available on the application.
        self.snapshot_strategy = None
        if self.snapshot_event_store:
            self.snapshot_strategy = EventSourcedSnapshotStrategy(
                event_store=self.snapshot_event_store,
            )

        # Raised explicitly rather than via ``assert`` so that the check is
        # not stripped when Python runs with -O. The exception type is the
        # same one an ``assert`` would raise.
        if self.entity_event_store is None:
            raise AssertionError("entity_event_store must not be None")

        self.example_repository = ExampleRepository(
            event_store=self.entity_event_store,
            snapshot_strategy=self.snapshot_strategy,
        )

    def create_new_example(self, foo='', a='', b=''):
        """
        Entity object factory.

        Delegates to the domain model's ``create_new_example`` function,
        forwarding ``foo``, ``a`` and ``b`` as keyword arguments, and
        returns whatever that function returns.
        """
        return create_new_example(foo=foo, a=a, b=b)
def construct_example_application(**kwargs):
    """
    Application object factory.

    Builds a new ``ExampleApplication``, forwarding every keyword argument
    to its constructor, and returns it.
    """
    application = ExampleApplication(**kwargs)
    return application


# Module-level ("global") slot holding the single application instance.
# It stays None until init_example_application() has been called.
_application = None
def init_example_application(**kwargs):
    """
    Construct the single global instance of the application.

    To be called when initialising a worker process. All keyword arguments
    are forwarded to ``construct_example_application()``.

    :raises AssertionError: if the global instance has already been set.
    """
    global _application

    # Happy path first: nothing registered yet, so build and store it.
    if _application is None:
        _application = construct_example_application(**kwargs)
        return

    raise AssertionError("init_example_application() has already been called")
def get_example_application():
    """
    Return the single global instance of the application.

    To be called when handling a worker request, if required.

    :raises AssertionError: if ``init_example_application()`` has not been
        called yet (the global instance is still ``None``).
    """
    application = _application
    if application is None:
        raise AssertionError("init_example_application() must be called first")

    # Sanity check only: the global is expected to hold an ExampleApplication.
    assert isinstance(application, ExampleApplication)
    return application
def close_example_application():
    """
    Shut down the single global instance of the application.

    To be called when tearing down, perhaps between tests, in order to allow
    a subsequent call to ``init_example_application()``.

    Does nothing beyond resetting the global when no instance is set. If the
    instance's ``close()`` raises, the exception propagates to the caller,
    but the global is still reset so that the application can be
    initialised again.
    """
    global _application
    try:
        if _application is not None:
            _application.close()
    finally:
        # Always clear the slot; otherwise a failing close() would leave
        # init_example_application() permanently refusing to run.
        _application = None