Source code for eventsourcing.application.simple

import os
import zlib
from json import JSONDecoder, JSONEncoder
from typing import (
    Any,
    Dict,
    Generic,
    Iterable,
    List,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

from eventsourcing.application.notificationlog import (
    LocalNotificationLog,
    RecordManagerNotificationLog,
)
from eventsourcing.application.pipeline import Pipeable
from eventsourcing.application.policies import PersistencePolicy
from eventsourcing.domain.model.aggregate import BaseAggregateRoot, TAggregateEvent
from eventsourcing.domain.model.entity import (
    TDomainEvent,
    TVersionedEntity,
    TVersionedEvent,
)
from eventsourcing.domain.model.events import DomainEvent, publish
from eventsourcing.exceptions import ProgrammingError, PromptFailed
from eventsourcing.infrastructure.base import (
    AbstractEventStore,
    AbstractRecordManager,
    BaseRecordManager,
    DEFAULT_PIPELINE_ID,
    RecordManagerWithTracking,
    TrackingKwargs,
)
from eventsourcing.infrastructure.datastore import AbstractDatastore
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.factory import InfrastructureFactory
from eventsourcing.infrastructure.sequenceditem import StoredEvent
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.utils.random import decode_bytes
from eventsourcing.whitehead import ActualOccasion, IterableOfEvents, T

PersistEventType = Optional[Union[Type[DomainEvent], Tuple[Type[DomainEvent]]]]

CausalDependencies = Dict[str, int]
ListOfCausalDependencies = List[CausalDependencies]


[docs]class ProcessEvent(ActualOccasion, Generic[TDomainEvent]):
[docs] def __init__( self, domain_events: Iterable[TDomainEvent], tracking_kwargs: Optional[TrackingKwargs] = None, causal_dependencies: Optional[ListOfCausalDependencies] = None, orm_objs_pending_save: Sequence[Any] = (), orm_objs_pending_delete: Sequence[Any] = (), ): self.domain_events = domain_events self.tracking_kwargs = tracking_kwargs self.causal_dependencies = causal_dependencies self.orm_objs_pending_save = orm_objs_pending_save self.orm_objs_pending_delete = orm_objs_pending_delete
[docs]class SimpleApplication(Pipeable, Generic[TVersionedEntity, TVersionedEvent]): """ Base class for event sourced applications. Constructs infrastructure objects such as the repository and event store, and also the notification log which presents the application state as a sequence of events. Needs actual infrastructure classes. """ infrastructure_factory_class: Type[InfrastructureFactory] = InfrastructureFactory is_constructed_with_session: bool = False record_manager_class: Optional[Type[AbstractRecordManager]] = None stored_event_record_class: Optional[type] = None snapshot_record_class: Optional[type] = None sequenced_item_class: Optional[Type[NamedTuple]] = None sequenced_item_mapper_class: Optional[Type[SequencedItemMapper]] = None compressor: Any = None json_encoder_class: Optional[Type[JSONEncoder]] = None sort_keys: bool = False json_decoder_class: Optional[Type[JSONDecoder]] = None persist_event_type: Optional[PersistEventType] = None notification_log_section_size: Optional[int] = None use_cache: bool = False event_store_class: Type[EventStore] = EventStore repository_class: Type[EventSourcedRepository] = EventSourcedRepository use_causal_dependencies = False set_notification_ids = False
[docs] def __init__( self, name: str = "", persistence_policy: Optional[PersistencePolicy] = None, persist_event_type: PersistEventType = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[SequencedItemMapper]] = None, record_manager_class: Optional[Type[AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = DEFAULT_PIPELINE_ID, json_encoder_class: Optional[Type[JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False, ): """ Initialises application object. :param name: Name of application. :param persistence_policy: Persistence policy object. :param persist_event_type: Tuple of domain event classes to be persisted. :param cipher_key: Base64 unicode string cipher key. :param compressor: Compressor used to compress serialized event state. :param sequenced_item_class: Named tuple for mapping and recording events. :param sequenced_item_mapper_class: Object class for mapping stored events. :param record_manager_class: Object class for recording stored events. :param stored_event_record_class: Object class for event records. :param event_store_class: Object class uses to store and retrieve domain events. :param snapshot_record_class: Object class used to represent snapshots. :param setup_table: Option to create database tables when application starts. :param contiguous_record_ids: Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps). :param pipeline_id: ID of instance of system pipeline expressions. :param json_encoder_class: Object class used to encode object as JSON strings. :param json_decoder_class: Object class used to decode JSON strings as objects. :param notification_log_section_size: Number of notification items in a section. :param use_cache: Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory). """ self.name = name or type(self).create_name() self.notification_log_section_size = ( notification_log_section_size or type(self).notification_log_section_size ) sequenced_item_class = sequenced_item_class or type(self).sequenced_item_class sequenced_item_class = sequenced_item_class or StoredEvent # type: ignore self.sequenced_item_class = sequenced_item_class assert self.sequenced_item_class is not None self.sequenced_item_mapper_class: Type[SequencedItemMapper] = ( sequenced_item_mapper_class or type(self).sequenced_item_mapper_class or SequencedItemMapper ) self.record_manager_class = ( record_manager_class or type(self).record_manager_class ) self._stored_event_record_class = stored_event_record_class self._snapshot_record_class = snapshot_record_class self.event_store_class = event_store_class or type(self).event_store_class self.json_encoder_class = json_encoder_class or type(self).json_encoder_class self.sort_keys = sort_keys or type(self).sort_keys self.json_decoder_class = json_decoder_class or type(self).json_decoder_class self.persist_event_type = persist_event_type or type(self).persist_event_type self.contiguous_record_ids = contiguous_record_ids self.pipeline_id = pipeline_id self._persistence_policy = persistence_policy self.cipher = self.construct_cipher(cipher_key) self.compressor = compressor or type(self).compressor # Default to using zlib compression when encrypting. if self.cipher and self.compressor is None: self.compressor = zlib self.infrastructure_factory: Optional[ InfrastructureFactory[TVersionedEvent] ] = None self._datastore: Optional[AbstractDatastore] = None self._event_store: Optional[ AbstractEventStore[TVersionedEvent, BaseRecordManager] ] = None self._repository: Optional[ EventSourcedRepository[TVersionedEntity, TVersionedEvent] ] = None self._notification_log: Optional[LocalNotificationLog] = None self.use_cache = use_cache or type(self).use_cache if ( self.record_manager_class or self.infrastructure_factory_class.record_manager_class ): self.construct_infrastructure() if setup_table: self.setup_table() self.construct_notification_log() if self._persistence_policy is None: self.construct_persistence_policy()
@classmethod def create_name(cls): return cls.__name__.lower() @property def datastore(self) -> AbstractDatastore: if self._datastore is None: self._raise_on_missing_infrastructure("datastore") return self._datastore @property def event_store(self) -> AbstractEventStore[TVersionedEvent, BaseRecordManager]: if self._event_store is None: self._raise_on_missing_infrastructure("event_store") return self._event_store @property def repository(self) -> EventSourcedRepository[TVersionedEntity, TVersionedEvent]: if self._repository is None: self._raise_on_missing_infrastructure("repository") return self._repository @property def notification_log(self) -> LocalNotificationLog: if self._notification_log is None: self._raise_on_missing_infrastructure("notification_log") return self._notification_log @property def persistence_policy(self) -> PersistencePolicy: if self._persistence_policy is None: self._raise_on_missing_infrastructure("persistence_policy") return self._persistence_policy def _raise_on_missing_infrastructure(self, what_is_missing): msg = "Application class %s does not have a %s." % ( type(self).__name__, what_is_missing, ) if not isinstance(self, ApplicationWithConcreteInfrastructure): msg += ( " and is not an ApplicationWithConcreteInfrastructure." " Try using or inheriting from or mixin() an application" " class with concrete infrastructure such as SQLAlchemyApplication" " or DjangoApplication or AxonApplication." ) raise ProgrammingError(msg) def construct_cipher(self, cipher_key_str: Optional[str]) -> Optional[AESCipher]: cipher_key_bytes = decode_bytes( cipher_key_str or os.getenv("CIPHER_KEY", "") or "" ) return AESCipher(cipher_key_bytes) if cipher_key_bytes else None
[docs] def construct_infrastructure(self, *args: Any, **kwargs: Any) -> None: """ Constructs infrastructure for application. """ self.infrastructure_factory = self.construct_infrastructure_factory( *args, **kwargs ) self.construct_datastore() self.construct_event_store() self.construct_repository()
[docs] def construct_infrastructure_factory( self, *args: Any, **kwargs: Any ) -> InfrastructureFactory: """ Constructs infrastructure factory object. """ factory_class = self.infrastructure_factory_class assert issubclass(factory_class, InfrastructureFactory) integer_sequenced_record_class = ( self._stored_event_record_class or self.stored_event_record_class ) snapshot_record_class = ( self._snapshot_record_class or self.snapshot_record_class ) return factory_class( # type:ignore # multiple values for keyword argument record_manager_class=self.record_manager_class, integer_sequenced_record_class=integer_sequenced_record_class, snapshot_record_class=snapshot_record_class, sequenced_item_class=self.sequenced_item_class, sequenced_item_mapper_class=self.sequenced_item_mapper_class, json_encoder_class=self.json_encoder_class, sort_keys=self.sort_keys, json_decoder_class=self.json_decoder_class, contiguous_record_ids=self.contiguous_record_ids, application_name=self.name, pipeline_id=self.pipeline_id, event_store_class=self.event_store_class, *args, **kwargs )
[docs] def construct_datastore(self) -> None: """ Constructs datastore object (which helps by creating and dropping tables). """ assert self.infrastructure_factory self._datastore = self.infrastructure_factory.construct_datastore()
[docs] def construct_event_store(self) -> None: """ Constructs event store object. """ assert self.infrastructure_factory factory = self.infrastructure_factory self._event_store = factory.construct_integer_sequenced_event_store( self.cipher, self.compressor )
[docs] def construct_repository(self, **kwargs: Any) -> None: """ Constructs repository object. """ assert self.repository_class self._repository = self.repository_class( event_store=self.event_store, use_cache=self.use_cache, **kwargs )
[docs] def setup_table(self) -> None: """ Sets up the database table using event store's record class. """ if self._datastore is not None: record_class = self.event_store.record_manager.record_class self.datastore.setup_table(record_class)
[docs] def drop_table(self) -> None: """ Drops the database table using event store's record class. """ if self._datastore is not None: record_class = self.event_store.record_manager.record_class self.datastore.drop_table(record_class)
[docs] def construct_notification_log(self) -> None: """ Constructs notification log object. """ self._notification_log = RecordManagerNotificationLog( self.event_store.record_manager, section_size=self.notification_log_section_size, )
[docs] def construct_persistence_policy(self) -> None: """ Constructs persistence policy object. """ self._persistence_policy = PersistencePolicy( event_store=self.event_store, persist_event_type=self.persist_event_type )
[docs] def change_pipeline(self, pipeline_id: int) -> None: """ Switches pipeline being used by this application object. """ self.pipeline_id = pipeline_id self.event_store.record_manager.pipeline_id = pipeline_id
def close(self) -> None: # Close the persistence policy. if self._persistence_policy is not None: self._persistence_policy.close() # Close database connection. if self._datastore is not None: self._datastore.close_connection()
[docs] def __enter__(self: T) -> T: """ Supports use of application as context manager. """ return self
[docs] def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """ Closes application when exiting context manager. """ self.close()
@classmethod def reset_connection_after_forking(cls) -> None: pass
[docs] @classmethod def mixin(cls: T, infrastructure_class: type) -> T: """ Returns subclass that inherits also from given infrastructure class. """ return type(cls.__name__, (infrastructure_class, cls), {})
def save( self, aggregates=(), orm_objects_pending_save=(), orm_objects_pending_delete=(), ): new_events = [] if isinstance(aggregates, BaseAggregateRoot): aggregates = [aggregates] for aggregate in aggregates: new_events += aggregate.__batch_pending_events__() process_event = ProcessEvent( domain_events=new_events, orm_objs_pending_save=orm_objects_pending_save, orm_objs_pending_delete=orm_objects_pending_delete, ) new_records = self.record_process_event(process_event) # Find the head notification ID. notifiable_events = [e for e in new_events if e.__notifiable__] head_notification_id = None if len(notifiable_events): record_manager = self.event_store.record_manager notification_id_name = record_manager.notification_id_name notifications = [] for record in new_records: if not hasattr(record, notification_id_name): continue if not isinstance(getattr(record, notification_id_name), int): continue notifications.append( record_manager.create_notification_from_record(record) ) if len(notifications): head_notification_id = notifications[-1]["id"] self.publish_prompt(head_notification_id) for aggregate in aggregates: if self.repository.use_cache: self.repository.put_entity_in_cache(aggregate.id, aggregate) def record_process_event(self, process_event: ProcessEvent) -> List: # Construct event records. event_records = self.construct_event_records( process_event.domain_events, process_event.causal_dependencies ) # Write event records with tracking record. record_manager = self.event_store.record_manager assert isinstance(record_manager, RecordManagerWithTracking) record_manager.write_records( records=event_records, tracking_kwargs=process_event.tracking_kwargs, orm_objs_pending_save=process_event.orm_objs_pending_save, orm_objs_pending_delete=process_event.orm_objs_pending_delete, ) return event_records def construct_event_records( self, pending_events: Iterable[TAggregateEvent], causal_dependencies: Optional[ListOfCausalDependencies], ) -> List: # Convert to event records. sequenced_items = self.event_store.items_from_events(pending_events) record_manager = self.event_store.record_manager assert record_manager assert isinstance(record_manager, RecordManagerWithTracking) event_records = list(record_manager.to_records(sequenced_items)) # Set notification log IDs, and causal dependencies. if len(event_records): # Todo: Maybe keep track of what this probably is, to # avoid query. Like log reader, invalidate on error. if self.set_notification_ids: notification_id_name = record_manager.notification_id_name current_max = record_manager.get_max_notification_id() for domain_event, event_record in zip(pending_events, event_records): if type(domain_event).__notifiable__: current_max += 1 setattr(event_record, notification_id_name, current_max) else: setattr( event_record, notification_id_name, "event-not-notifiable" ) if self.use_causal_dependencies: assert hasattr(record_manager.record_class, "causal_dependencies") causal_dependencies_json = self.event_store.event_mapper.json_dumps( causal_dependencies ).decode("utf8") # Only need first event to carry the dependencies. event_records[0].causal_dependencies = causal_dependencies_json return event_records def publish_prompt(self, head_notification_id=None): prompt = PromptToPull(self.name, self.pipeline_id, head_notification_id) try: publish(prompt) except PromptFailed: raise except Exception as e: raise PromptFailed("{}: {}".format(type(e), str(e)))
[docs]class ApplicationWithConcreteInfrastructure(SimpleApplication): """ Base class for application classes that have actual infrastructure. """
[docs]class Prompt(ActualOccasion): pass
def is_prompt_to_pull(events: IterableOfEvents) -> bool: return isinstance(events, PromptToPull)
[docs]class PromptToPull(Prompt):
[docs] def __init__(self, process_name: str, pipeline_id: int, head_notification_id=None): self.process_name: str = process_name self.pipeline_id: int = pipeline_id self.head_notification_id = head_notification_id
[docs] def __eq__(self, other: object) -> bool: return bool( other and isinstance(other, type(self)) and self.process_name == other.process_name and self.pipeline_id == other.pipeline_id )
[docs] def __repr__(self) -> str: return "{}({}={}, {}={})".format( type(self).__name__, "process_name", self.process_name, "pipeline_id", self.pipeline_id, )