import os
from eventsourcing.application.notificationlog import RecordManagerNotificationLog
from eventsourcing.application.pipeline import Pipeable
from eventsourcing.application.policies import PersistencePolicy
from eventsourcing.infrastructure.base import DEFAULT_PIPELINE_ID
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.factory import InfrastructureFactory
from eventsourcing.infrastructure.sequenceditem import StoredEvent
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.utils.random import decode_bytes
[docs]class SimpleApplication(Pipeable):
"""
Base class for event sourced applications.
Constructs infrastructure objects such as the repository and
event store, and also the notification log which presents the
application state as a sequence of events.
Needs actual infrastructure classes.
"""
infrastructure_factory_class = InfrastructureFactory
is_constructed_with_session = False
record_manager_class = None
stored_event_record_class = None
snapshot_record_class = None
sequenced_item_class = None
sequenced_item_mapper_class = None
json_encoder_class = None
json_decoder_class = None
persist_event_type = None
notification_log_section_size = None
use_cache = False
event_store_class = EventStore
repository_class = EventSourcedRepository
[docs] def __init__(self, name='', persistence_policy=None, persist_event_type=None,
cipher_key=None, sequenced_item_class=None, sequenced_item_mapper_class=None,
record_manager_class=None, stored_event_record_class=None, event_store_class=None,
snapshot_record_class=None, setup_table=True,
contiguous_record_ids=True, pipeline_id=DEFAULT_PIPELINE_ID, json_encoder_class=None,
json_decoder_class=None, notification_log_section_size=None):
self._datastore = None
self._event_store = None
self._repository = None
self.infrastructure_factory = None
self.name = name or type(self).__name__.lower()
self.notification_log_section_size = notification_log_section_size
self.sequenced_item_class = sequenced_item_class \
or type(self).sequenced_item_class \
or StoredEvent
self.sequenced_item_mapper_class = sequenced_item_mapper_class \
or type(self).sequenced_item_mapper_class \
or SequencedItemMapper
self.record_manager_class = record_manager_class or type(self).record_manager_class
self.event_store_class = event_store_class or type(self).event_store_class
self._stored_event_record_class = stored_event_record_class
self._snapshot_record_class = snapshot_record_class
self.json_encoder_class = json_encoder_class or type(self).json_encoder_class
self.json_decoder_class = json_decoder_class or type(self).json_decoder_class
self.persist_event_type = persist_event_type or type(self).persist_event_type
self.contiguous_record_ids = contiguous_record_ids
self.pipeline_id = pipeline_id
self.persistence_policy = persistence_policy
self.construct_cipher(cipher_key)
if self.record_manager_class or self.infrastructure_factory_class.record_manager_class:
self.construct_infrastructure()
if setup_table:
self.setup_table()
self.construct_notification_log()
if self.persistence_policy is None:
self.construct_persistence_policy()
@property
def datastore(self):
return self._datastore
@property
def event_store(self) -> EventStore:
return self._event_store
@property
def repository(self) -> EventSourcedRepository:
return self._repository
def construct_cipher(self, cipher_key):
cipher_key = decode_bytes(cipher_key or os.getenv('CIPHER_KEY', ''))
self.cipher = AESCipher(cipher_key) if cipher_key else None
def construct_infrastructure(self, *args, **kwargs):
self.infrastructure_factory = self.construct_infrastructure_factory(*args, **kwargs)
self.construct_datastore()
self.construct_event_store()
self.construct_repository()
[docs] def construct_infrastructure_factory(self, *args, **kwargs):
"""
:rtype: InfrastructureFactory
"""
factory_class = self.infrastructure_factory_class
assert issubclass(factory_class, InfrastructureFactory)
integer_sequenced_record_class = self._stored_event_record_class or self.stored_event_record_class
snapshot_record_class = self._snapshot_record_class or self.snapshot_record_class
return factory_class(
record_manager_class=self.record_manager_class,
integer_sequenced_record_class=integer_sequenced_record_class,
snapshot_record_class=snapshot_record_class,
sequenced_item_class=self.sequenced_item_class,
sequenced_item_mapper_class=self.sequenced_item_mapper_class,
contiguous_record_ids=self.contiguous_record_ids,
application_name=self.name,
pipeline_id=self.pipeline_id,
event_store_class=self.event_store_class,
*args, **kwargs
)
def construct_datastore(self):
self._datastore = self.infrastructure_factory.construct_datastore()
def construct_event_store(self):
self._event_store = self.infrastructure_factory.construct_integer_sequenced_event_store(self.cipher)
def construct_repository(self, **kwargs):
self._repository = self.repository_class(
event_store=self.event_store,
use_cache=self.use_cache,
**kwargs
)
def setup_table(self):
# Setup the database table using event store's record class.
if self.datastore is not None:
self.datastore.setup_table(
self.event_store.record_manager.record_class
)
def drop_table(self):
# Drop the database table using event store's record class.
if self.datastore is not None:
self.datastore.drop_table(
self.event_store.record_manager.record_class
)
def construct_notification_log(self):
self.notification_log = RecordManagerNotificationLog(
self.event_store.record_manager,
section_size=self.notification_log_section_size
)
def construct_persistence_policy(self):
self.persistence_policy = PersistencePolicy(
event_store=self.event_store,
persist_event_type=self.persist_event_type
)
def change_pipeline(self, pipeline_id):
self.pipeline_id = pipeline_id
self.event_store.record_manager.pipeline_id = pipeline_id
def close(self):
# Close the persistence policy.
if self.persistence_policy is not None:
self.persistence_policy.close()
# Close database connection.
if self.datastore is not None:
self.datastore.close_connection()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@classmethod
def reset_connection_after_forking(cls):
pass
@classmethod
def mixin(cls, infrastructure_class):
return type(cls.__name__, (infrastructure_class, cls), {})
[docs]class ApplicationWithConcreteInfrastructure(SimpleApplication):
"""
Subclasses have actual infrastructure.
"""