Source code for eventsourcing.application.simple

import os
import zlib
from json import JSONDecoder, JSONEncoder
from typing import (
    Any,
    Dict,
    Generic,
    Iterable,
    List,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

from eventsourcing.application.notificationlog import (
    LocalNotificationLog,
    RecordManagerNotificationLog,
)
from eventsourcing.application.pipeline import Pipeable
from eventsourcing.application.policies import PersistencePolicy
from eventsourcing.domain.model.aggregate import BaseAggregateRoot, TAggregateEvent
from eventsourcing.domain.model.entity import (
    TDomainEvent,
    TVersionedEntity,
    TVersionedEvent,
)
from eventsourcing.domain.model.events import DomainEvent, publish
from eventsourcing.exceptions import ProgrammingError, PromptFailed
from eventsourcing.infrastructure.base import (
    AbstractEventStore,
    AbstractRecordManager,
    BaseRecordManager,
    DEFAULT_PIPELINE_ID,
    EVENT_NOT_NOTIFIABLE,
    RecordManagerWithNotifications,
    RecordManagerWithTracking,
    TrackingKwargs,
)
from eventsourcing.infrastructure.datastore import AbstractDatastore
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.factory import InfrastructureFactory
from eventsourcing.infrastructure.sequenceditem import StoredEvent
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.utils.random import decode_bytes
from eventsourcing.whitehead import ActualOccasion, IterableOfEvents, T

PersistEventType = Optional[Union[Type[DomainEvent], Tuple[Type[DomainEvent], ...]]]

CausalDependencies = Dict[str, int]
ListOfCausalDependencies = List[CausalDependencies]
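
# Illustrative only: each causal dependency is a mapping of names to integer
# positions. A sketch, with key names assumed rather than confirmed by this
# module:
#
#     causal_dependencies = [{"pipeline_id": 1, "notification_id": 42}]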


class ProcessEvent(ActualOccasion, Generic[TDomainEvent]):
    def __init__(
        self,
        domain_events: Iterable[TDomainEvent],
        tracking_kwargs: Optional[TrackingKwargs] = None,
        causal_dependencies: Optional[ListOfCausalDependencies] = None,
        orm_objs_pending_save: Sequence[Any] = (),
        orm_objs_pending_delete: Sequence[Any] = (),
    ):
        self.domain_events = domain_events
        self.tracking_kwargs = tracking_kwargs
        self.causal_dependencies = causal_dependencies
        self.orm_objs_pending_save = orm_objs_pending_save
        self.orm_objs_pending_delete = orm_objs_pending_delete
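
# Usage sketch: a ProcessEvent gathers an aggregate's pending events so they
# can be recorded atomically ("aggregate" is assumed to be a BaseAggregateRoot
# instance created elsewhere):
#
#     events = aggregate.__batch_pending_events__()
#     process_event = ProcessEvent(domain_events=events)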


class SimpleApplication(Pipeable, Generic[TVersionedEntity, TVersionedEvent]):
    """
    Base class for event sourced applications.

    Constructs infrastructure objects such as the repository and
    event store, and also the notification log, which presents the
    application state as a sequence of events.

    Needs actual infrastructure classes.
    """

    infrastructure_factory_class: Type[InfrastructureFactory] = InfrastructureFactory
    is_constructed_with_session: bool = False

    record_manager_class: Optional[Type[AbstractRecordManager]] = None
    stored_event_record_class: Optional[type] = None
    snapshot_record_class: Optional[type] = None

    sequenced_item_class: Optional[Type[NamedTuple]] = None
    sequenced_item_mapper_class: Optional[Type[SequencedItemMapper]] = None
    compressor: Any = None
    json_encoder_class: Optional[Type[JSONEncoder]] = None
    sort_keys: bool = False
    json_decoder_class: Optional[Type[JSONDecoder]] = None
    persist_event_type: Optional[PersistEventType] = None
    notification_log_section_size: Optional[int] = None
    use_cache: bool = False

    event_store_class: Type[EventStore] = EventStore
    repository_class: Type[EventSourcedRepository] = EventSourcedRepository

    use_causal_dependencies = False
    set_notification_ids = False
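
    # Configuration sketch: subclasses typically override these class
    # attributes rather than pass constructor arguments (the values here
    # are illustrative, not defaults):
    #
    #     class MyApplication(SimpleApplication):
    #         persist_event_type = BaseAggregateRoot.Event
    #         notification_log_section_size = 20
    #         use_cache = True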

    def __init__(
        self,
        name: str = "",
        persistence_policy: Optional[PersistencePolicy] = None,
        persist_event_type: PersistEventType = None,
        cipher_key: Optional[str] = None,
        compressor: Any = None,
        sequenced_item_class: Optional[Type[NamedTuple]] = None,
        sequenced_item_mapper_class: Optional[Type[SequencedItemMapper]] = None,
        record_manager_class: Optional[Type[AbstractRecordManager]] = None,
        stored_event_record_class: Optional[type] = None,
        event_store_class: Optional[Type[EventStore]] = None,
        snapshot_record_class: Optional[type] = None,
        setup_table: bool = True,
        contiguous_record_ids: bool = True,
        pipeline_id: int = DEFAULT_PIPELINE_ID,
        json_encoder_class: Optional[Type[JSONEncoder]] = None,
        sort_keys: bool = False,
        json_decoder_class: Optional[Type[JSONDecoder]] = None,
        notification_log_section_size: Optional[int] = None,
        use_cache: bool = False,
    ):
        """
        Initialises application object.

        :param name: Name of application.
        :param persistence_policy: Persistence policy object.
        :param persist_event_type: Domain event class, or tuple of classes,
            to be persisted.
        :param cipher_key: Base64 unicode string cipher key.
        :param compressor: Compressor used to compress serialized event state.
        :param sequenced_item_class: Named tuple for mapping and recording events.
        :param sequenced_item_mapper_class: Object class for mapping stored events.
        :param record_manager_class: Object class for recording stored events.
        :param stored_event_record_class: Object class for event records.
        :param event_store_class: Object class used to store and retrieve domain
            events.
        :param snapshot_record_class: Object class used to represent snapshots.
        :param setup_table: Option to create database tables when application starts.
        :param contiguous_record_ids: Whether or not to delegate notification ID
            generation to the record manager (to guarantee there will be no gaps).
        :param pipeline_id: ID of instance of system pipeline expressions.
        :param json_encoder_class: Object class used to encode objects as JSON
            strings.
        :param sort_keys: Option to sort JSON keys when encoding.
        :param json_decoder_class: Object class used to decode JSON strings as
            objects.
        :param notification_log_section_size: Number of notification items in a
            section.
        :param use_cache: Whether or not to keep aggregates in memory (saves
            replaying when accessing again, but uses memory).
        """
        self.name = name or type(self).create_name()
        self.notification_log_section_size = (
            notification_log_section_size or type(self).notification_log_section_size
        )
        sequenced_item_class = sequenced_item_class or type(self).sequenced_item_class
        sequenced_item_class = sequenced_item_class or StoredEvent  # type: ignore
        self.sequenced_item_class = sequenced_item_class
        assert self.sequenced_item_class is not None
        self.sequenced_item_mapper_class: Type[SequencedItemMapper] = (
            sequenced_item_mapper_class
            or type(self).sequenced_item_mapper_class
            or SequencedItemMapper
        )
        self.record_manager_class = (
            record_manager_class or type(self).record_manager_class
        )
        self._stored_event_record_class = stored_event_record_class
        self._snapshot_record_class = snapshot_record_class
        self.event_store_class = event_store_class or type(self).event_store_class
        self.json_encoder_class = json_encoder_class or type(self).json_encoder_class
        self.sort_keys = sort_keys or type(self).sort_keys
        self.json_decoder_class = json_decoder_class or type(self).json_decoder_class
        self.persist_event_type = persist_event_type or type(self).persist_event_type
        self.contiguous_record_ids = contiguous_record_ids
        self.pipeline_id = pipeline_id
        self._persistence_policy = persistence_policy

        self.cipher = self.construct_cipher(cipher_key)
        self.compressor = compressor or type(self).compressor

        # Default to using zlib compression when encrypting.
        if self.cipher and self.compressor is None:
            self.compressor = zlib

        self.infrastructure_factory: Optional[
            InfrastructureFactory[TVersionedEvent]
        ] = None
        self._datastore: Optional[AbstractDatastore] = None
        self._event_store: Optional[
            AbstractEventStore[TVersionedEvent, BaseRecordManager]
        ] = None
        self._repository: Optional[
            EventSourcedRepository[TVersionedEntity, TVersionedEvent]
        ] = None
        self._notification_log: Optional[LocalNotificationLog] = None
        self.use_cache = use_cache or type(self).use_cache

        if (
            self.record_manager_class
            or self.infrastructure_factory_class.record_manager_class
        ):
            self.construct_infrastructure()

            if setup_table:
                self.setup_table()
            self.construct_notification_log()

            if self._persistence_policy is None:
                self.construct_persistence_policy()

    @classmethod
    def create_name(cls):
        return cls.__name__.lower()

    @property
    def datastore(self) -> AbstractDatastore:
        if self._datastore is None:
            self._raise_on_missing_infrastructure("datastore")
        return self._datastore

    @property
    def event_store(self) -> AbstractEventStore[TVersionedEvent, BaseRecordManager]:
        if self._event_store is None:
            self._raise_on_missing_infrastructure("event_store")
        return self._event_store

    @property
    def repository(self) -> EventSourcedRepository[TVersionedEntity, TVersionedEvent]:
        if self._repository is None:
            self._raise_on_missing_infrastructure("repository")
        return self._repository

    @property
    def notification_log(self) -> LocalNotificationLog:
        if self._notification_log is None:
            self._raise_on_missing_infrastructure("notification_log")
        return self._notification_log

    @property
    def persistence_policy(self) -> PersistencePolicy:
        if self._persistence_policy is None:
            self._raise_on_missing_infrastructure("persistence_policy")
        return self._persistence_policy

    def _raise_on_missing_infrastructure(self, what_is_missing):
        if not isinstance(self, ApplicationWithConcreteInfrastructure):
            msg = (
                "Application class %s is not a subclass of %s."
                " Try using or inheriting from or mixin() an application"
                " class with concrete infrastructure such as"
                " SQLAlchemyApplication or DjangoApplication or AxonApplication."
            ) % (type(self), ApplicationWithConcreteInfrastructure)
        else:
            msg = "Application class %s does not have a %s" % (
                type(self).__name__,
                what_is_missing,
            )
        raise ProgrammingError(msg)

    def construct_cipher(self, cipher_key_str: Optional[str]) -> Optional[AESCipher]:
        cipher_key_bytes = decode_bytes(
            cipher_key_str or os.getenv("CIPHER_KEY", "") or ""
        )
        return AESCipher(cipher_key_bytes) if cipher_key_bytes else None
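
    # Sketch of supplying a cipher key: construct_cipher() expects a Base64
    # unicode string (or reads the CIPHER_KEY environment variable). An AES
    # key (e.g. 16 bytes) can be generated with the standard library:
    #
    #     import base64, os
    #     cipher_key = base64.b64encode(os.urandom(16)).decode("utf8")
    #     app = MyApplication(cipher_key=cipher_key)  # MyApplication is hypothetical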

    def construct_infrastructure(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructs infrastructure for application.
        """
        self.infrastructure_factory = self.construct_infrastructure_factory(
            *args, **kwargs
        )
        self.construct_datastore()
        self.construct_event_store()
        self.construct_repository()

    def construct_infrastructure_factory(
        self, *args: Any, **kwargs: Any
    ) -> InfrastructureFactory:
        """
        Constructs infrastructure factory object.
        """
        factory_class = self.infrastructure_factory_class
        assert issubclass(factory_class, InfrastructureFactory)
        integer_sequenced_record_class = (
            self._stored_event_record_class or self.stored_event_record_class
        )
        snapshot_record_class = (
            self._snapshot_record_class or self.snapshot_record_class
        )
        return factory_class(  # type: ignore  # multiple values for keyword argument
            record_manager_class=self.record_manager_class,
            integer_sequenced_record_class=integer_sequenced_record_class,
            snapshot_record_class=snapshot_record_class,
            sequenced_item_class=self.sequenced_item_class,
            sequenced_item_mapper_class=self.sequenced_item_mapper_class,
            json_encoder_class=self.json_encoder_class,
            sort_keys=self.sort_keys,
            json_decoder_class=self.json_decoder_class,
            contiguous_record_ids=self.contiguous_record_ids,
            application_name=self.name,
            pipeline_id=self.pipeline_id,
            event_store_class=self.event_store_class,
            *args,
            **kwargs
        )

    def construct_datastore(self) -> None:
        """
        Constructs datastore object (used to create and drop database tables).
        """
        assert self.infrastructure_factory
        self._datastore = self.infrastructure_factory.construct_datastore()

    def construct_event_store(self) -> None:
        """
        Constructs event store object.
        """
        assert self.infrastructure_factory
        factory = self.infrastructure_factory
        self._event_store = factory.construct_integer_sequenced_event_store(
            self.cipher, self.compressor
        )

    def construct_repository(self, **kwargs: Any) -> None:
        """
        Constructs repository object.
        """
        assert self.repository_class
        self._repository = self.repository_class(
            event_store=self.event_store, use_cache=self.use_cache, **kwargs
        )

    def setup_table(self) -> None:
        """
        Sets up the database table using the event store's record class.
        """
        if self._datastore is not None:
            record_class = self.event_store.record_manager.record_class
            self.datastore.setup_table(record_class)

    def drop_table(self) -> None:
        """
        Drops the database table using the event store's record class.
        """
        if self._datastore is not None:
            record_class = self.event_store.record_manager.record_class
            self.datastore.drop_table(record_class)

    def construct_notification_log(self) -> None:
        """
        Constructs notification log object.
        """
        self._notification_log = RecordManagerNotificationLog(
            self.event_store.record_manager,
            section_size=self.notification_log_section_size,
        )
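
    # Usage sketch: the notification log presents recorded events as a linked
    # sequence of sections ("current" as a section ID is assumed from the
    # library's linked-section convention):
    #
    #     section = app.notification_log["current"]
    #     for item in section.items:
    #         ...  # each item is a notification of a recorded event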

    def construct_persistence_policy(self) -> None:
        """
        Constructs persistence policy object.
        """
        self._persistence_policy = PersistencePolicy(
            event_store=self.event_store, persist_event_type=self.persist_event_type
        )

    def change_pipeline(self, pipeline_id: int) -> None:
        """
        Switches the pipeline used by this application object.
        """
        self.pipeline_id = pipeline_id
        self.event_store.record_manager.pipeline_id = pipeline_id

    def close(self) -> None:
        """
        Closes the application for further use.

        The persistence policy is closed, and the application's
        connection to the database is closed.
        """
        # Close the persistence policy.
        if self._persistence_policy is not None:
            self._persistence_policy.close()

        # Close database connection.
        if self._datastore is not None:
            self._datastore.close_connection()

    def __enter__(self: T) -> T:
        """
        Supports use of application as context manager.
        """
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """
        Closes application when exiting context manager.
        """
        self.close()
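
    # Usage sketch: the application works as a context manager, so the
    # persistence policy and database connection are closed on exit
    # (ConcreteApplication stands in for a subclass with concrete
    # infrastructure):
    #
    #     with ConcreteApplication(persist_event_type=BaseAggregateRoot.Event) as app:
    #         ...  # use app.repository, app.event_store, etc.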

    @classmethod
    def reset_connection_after_forking(cls) -> None:
        pass

    @classmethod
    def mixin(cls: T, infrastructure_class: type) -> T:
        """
        Returns subclass that inherits also from given infrastructure class.
        """
        return type(cls.__name__, (infrastructure_class, cls), {})
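
    # For example, a sketch combining an application class with one of the
    # concrete infrastructure classes named in the error message above
    # (MyApplication is hypothetical):
    #
    #     from eventsourcing.application.sqlalchemy import SQLAlchemyApplication
    #
    #     App = MyApplication.mixin(SQLAlchemyApplication)
    #     app = App(setup_table=True)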

    def save(
        self,
        aggregates=(),
        orm_objects_pending_save=(),
        orm_objects_pending_delete=(),
    ) -> None:
        """
        Saves state of aggregates, and ORM objects.

        All of the pending events of the aggregates, along with
        the ORM objects, are recorded atomically as a process event.

        Then a "prompt to pull" is published, and, if the repository
        cache is in use, the aggregates are put in the cache.

        :param aggregates: One or many aggregates.
        :param orm_objects_pending_save: Sequence of ORM objects to be saved.
        :param orm_objects_pending_delete: Sequence of ORM objects to be deleted.
        """
        # Collect pending events from the aggregates.
        new_events = []
        if isinstance(aggregates, BaseAggregateRoot):
            aggregates = [aggregates]

        # Todo: Make sure record manager supports writing events from more than
        #  one aggregate atomically (e.g. Cassandra and EventStore don't).
        for aggregate in aggregates:
            new_events += aggregate.__batch_pending_events__()

        process_event = ProcessEvent(
            domain_events=new_events,
            orm_objs_pending_save=orm_objects_pending_save,
            orm_objs_pending_delete=orm_objects_pending_delete,
        )
        new_records = self.record_process_event(process_event)

        record_manager = self.event_store.record_manager
        if isinstance(record_manager, RecordManagerWithNotifications):
            # Find the head notification ID.
            notifiable_events = [e for e in new_events if e.__notifiable__]
            head_notification_id = None
            if len(notifiable_events):
                notification_id_name = record_manager.notification_id_name
                notifications = []
                for record in new_records:
                    if not hasattr(record, notification_id_name):
                        continue
                    if not isinstance(getattr(record, notification_id_name), int):
                        continue
                    notifications.append(
                        record_manager.create_notification_from_record(record)
                    )
                if len(notifications):
                    head_notification_id = notifications[-1]["id"]

            self.publish_prompt(head_notification_id)

        if self.repository.use_cache:
            for aggregate in aggregates:
                self.repository.put_entity_in_cache(aggregate.id, aggregate)
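
    # Usage sketch (MyAggregate and its SomethingHappened event are
    # hypothetical; app is an application with concrete infrastructure):
    #
    #     aggregate = MyAggregate.__create__()
    #     aggregate.__trigger_event__(MyAggregate.SomethingHappened)
    #     app.save(aggregate)  # records pending events atomically, then prompts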

    def record_process_event(self, process_event: ProcessEvent) -> List:
        """
        Records a process event.

        Converts the domain events of the process event to event record
        objects, and writes the event records and the ORM objects to the
        database using the application's event store's record manager.

        :param process_event: An instance of
            :class:`~eventsourcing.application.simple.ProcessEvent`
        :return: A list of event records.
        """
        # Construct event records.
        event_records = self.construct_event_records(
            process_event.domain_events, process_event.causal_dependencies
        )

        # Write event records with tracking record.
        record_manager = self.event_store.record_manager
        assert isinstance(record_manager, RecordManagerWithTracking)
        record_manager.write_records(
            records=event_records,
            tracking_kwargs=process_event.tracking_kwargs,
            orm_objs_pending_save=process_event.orm_objs_pending_save,
            orm_objs_pending_delete=process_event.orm_objs_pending_delete,
        )
        return event_records

    def construct_event_records(
        self,
        pending_events: Iterable[TAggregateEvent],
        causal_dependencies: Optional[ListOfCausalDependencies],
    ) -> List:
        """
        Constructs event records from domain events.

        :param pending_events: An iterable of domain events.
        :param causal_dependencies: A list of causal dependencies.
        :return: A list of event records.
        """
        # Convert to event records.
        sequenced_items = self.event_store.items_from_events(pending_events)
        record_manager = self.event_store.record_manager
        assert record_manager
        assert isinstance(record_manager, RecordManagerWithTracking)
        event_records = list(record_manager.to_records(sequenced_items))

        # Set notification log IDs, and causal dependencies.
        if len(event_records):
            # Todo: Maybe keep track of what this probably is, to
            #  avoid query. Like log reader, invalidate on error.
            if self.set_notification_ids:
                notification_id_name = record_manager.notification_id_name
                current_max = record_manager.get_max_notification_id()
                for domain_event, event_record in zip(pending_events, event_records):
                    if type(domain_event).__notifiable__:
                        current_max += 1
                        setattr(event_record, notification_id_name, current_max)
                    else:
                        setattr(
                            event_record, notification_id_name, EVENT_NOT_NOTIFIABLE
                        )
            else:
                if any((not e.__notifiable__ for e in pending_events)):
                    raise Exception(
                        "Can't set __notifiable__=False without "
                        "set_notification_ids=True"
                    )

            if self.use_causal_dependencies:
                assert hasattr(record_manager.record_class, "causal_dependencies")
                causal_dependencies_json = self.event_store.event_mapper.json_dumps(
                    causal_dependencies
                ).decode("utf8")
                # Only need first event to carry the dependencies.
                event_records[0].causal_dependencies = causal_dependencies_json

        return event_records

    def publish_prompt(self, head_notification_id=None):
        """
        Publishes a "prompt to pull" (instance of
        :class:`~eventsourcing.application.simple.PromptToPull`).

        :param head_notification_id: Maximum notification ID of event
            records to be pulled.
        """
        prompt = PromptToPull(self.name, self.pipeline_id, head_notification_id)
        try:
            publish(prompt)
        except PromptFailed:
            raise
        except Exception as e:
            raise PromptFailed("{}: {}".format(type(e), str(e)))
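
# Sketch of reacting to prompts: a handler can be registered with the
# library's publish-subscribe mechanism (subscribe() is assumed from
# eventsourcing.domain.model.events; is_prompt_to_pull() is defined below):
#
#     from eventsourcing.domain.model.events import subscribe
#
#     def pull_on_prompt(prompt):
#         ...  # e.g. read the prompting application's notification log
#
#     subscribe(handler=pull_on_prompt, predicate=is_prompt_to_pull)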


class ApplicationWithConcreteInfrastructure(SimpleApplication):
    """
    Base class for application classes that have actual infrastructure.
    """


class Prompt(ActualOccasion):
    pass


def is_prompt_to_pull(events: IterableOfEvents) -> bool:
    return isinstance(events, PromptToPull)


class PromptToPull(Prompt):
    def __init__(
        self, process_name: str, pipeline_id: int, head_notification_id=None
    ):
        self.process_name: str = process_name
        self.pipeline_id: int = pipeline_id
        self.head_notification_id = head_notification_id

    def __eq__(self, other: object) -> bool:
        return bool(
            other
            and isinstance(other, type(self))
            and self.process_name == other.process_name
            and self.pipeline_id == other.pipeline_id
        )

    def __repr__(self) -> str:
        return "{}({}={}, {}={}, {}={})".format(
            type(self).__name__,
            "process_name",
            self.process_name,
            "pipeline_id",
            self.pipeline_id,
            "head_notification_id",
            self.head_notification_id,
        )