import os
from eventsourcing.application.policies import PersistencePolicy
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.factory import InfrastructureFactory
from eventsourcing.infrastructure.sequenceditem import StoredEvent
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
from eventsourcing.interface.notificationlog import RecordManagerNotificationLog
from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.utils.random import decode_random_bytes
from eventsourcing.utils.uuids import uuid_from_application_name
[docs]class SimpleApplication(object):
persist_event_type = None
sequenced_item_class = None
sequenced_item_mapper_class = None
infrastructure_factory_class = None
record_manager_class = None
stored_event_record_class = None
snapshot_record_class = None
json_encoder_class = None
json_decoder_class = None
is_constructed_with_session = False
def __init__(self, name='', persistence_policy=None, persist_event_type=None,
cipher_key=None, sequenced_item_class=None, sequenced_item_mapper_class=None,
infrastructure_factory_class=None, record_manager_class=None,
stored_event_record_class=None, snapshot_record_class=None,
setup_table=True, contiguous_record_ids=True, pipeline_id=-1,
json_encoder_class=None, json_decoder_class=None,
notification_log_section_size=None):
self.name = name or type(self).__name__.lower()
self.notification_log_section_size = notification_log_section_size
self.sequenced_item_class = sequenced_item_class \
or type(self).sequenced_item_class \
or StoredEvent
self.sequenced_item_mapper_class = sequenced_item_mapper_class \
or type(self).sequenced_item_mapper_class \
or SequencedItemMapper
self.infrastructure_factory_class = infrastructure_factory_class \
or type(self).infrastructure_factory_class \
or InfrastructureFactory
self.record_manager_class = record_manager_class or type(self).record_manager_class
self.stored_event_record_class = stored_event_record_class or type(self).stored_event_record_class
self.snapshot_record_class = snapshot_record_class or type(self).snapshot_record_class
self.json_encoder_class = json_encoder_class or type(self).json_encoder_class
self.json_decoder_class = json_decoder_class or type(self).json_decoder_class
self.persist_event_type = persist_event_type or type(self).persist_event_type
self.contiguous_record_ids = contiguous_record_ids
self.application_id = uuid_from_application_name(self.name)
self.pipeline_id = pipeline_id
self.setup_cipher(cipher_key)
self.setup_infrastructure()
if setup_table:
self.setup_table()
self.setup_notification_log()
self.persistence_policy = persistence_policy
if self.persistence_policy is None:
self.setup_persistence_policy()
[docs] def setup_cipher(self, cipher_key):
cipher_key = decode_random_bytes(cipher_key or os.getenv('CIPHER_KEY', ''))
self.cipher = AESCipher(cipher_key) if cipher_key else None
[docs] def setup_infrastructure(self, *args, **kwargs):
self.infrastructure_factory = self.construct_infrastructure_factory(*args, **kwargs)
self.datastore = self.infrastructure_factory.construct_datastore()
self.setup_event_store()
self.setup_repository()
[docs] def construct_infrastructure_factory(self, *args, **kwargs):
"""
:rtype: InfrastructureFactory
"""
return self.infrastructure_factory_class(
record_manager_class=self.record_manager_class,
integer_sequenced_record_class=self.stored_event_record_class,
sequenced_item_class=self.sequenced_item_class,
contiguous_record_ids=self.contiguous_record_ids,
application_id=self.application_id,
pipeline_id=self.pipeline_id,
snapshot_record_class=self.snapshot_record_class,
*args, **kwargs
)
[docs] def setup_event_store(self):
# Construct event store.
sequenced_item_mapper = self.sequenced_item_mapper_class(
sequenced_item_class=self.sequenced_item_class,
cipher=self.cipher,
# sequence_id_attr_name=sequence_id_attr_name,
# position_attr_name=position_attr_name,
json_encoder_class=self.json_encoder_class,
json_decoder_class=self.json_decoder_class,
)
record_manager = self.infrastructure_factory.construct_integer_sequenced_record_manager()
self.event_store = EventStore(
record_manager=record_manager,
sequenced_item_mapper=sequenced_item_mapper,
)
[docs] def setup_repository(self, **kwargs):
self.repository = EventSourcedRepository(
event_store=self.event_store,
**kwargs
)
[docs] def setup_table(self):
# Setup the database table using event store's record class.
if self.datastore is not None:
self.datastore.setup_table(
self.event_store.record_manager.record_class
)
[docs] def setup_notification_log(self):
self.notification_log = RecordManagerNotificationLog(
self.event_store.record_manager,
section_size=self.notification_log_section_size
)
[docs] def setup_persistence_policy(self):
self.persistence_policy = PersistencePolicy(
event_store=self.event_store,
event_type=self.persist_event_type
)
[docs] def change_pipeline(self, pipeline_id):
self.pipeline_id = pipeline_id
self.event_store.record_manager.pipeline_id = pipeline_id
[docs] def drop_table(self):
# Drop the database table using event store's record class.
if self.datastore is not None:
self.datastore.drop_table(
self.event_store.record_manager.record_class
)
[docs] def close(self):
# Close the persistence policy.
if self.persistence_policy is not None:
self.persistence_policy.close()
# Close database connection.
if self.datastore is not None:
self.datastore.close_connection()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()