application

The application layer brings together the domain and infrastructure layers.

simple

class eventsourcing.application.simple.ProcessEvent(domain_events: Iterable[TDomainEvent], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, causal_dependencies: Optional[List[Dict[str, int]]] = None, orm_objs_pending_save: Sequence[Any] = (), orm_objs_pending_delete: Sequence[Any] = ())[source]

Bases: eventsourcing.whitehead.ActualOccasion, typing.Generic

__init__(domain_events: Iterable[TDomainEvent], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, causal_dependencies: Optional[List[Dict[str, int]]] = None, orm_objs_pending_save: Sequence[Any] = (), orm_objs_pending_delete: Sequence[Any] = ())[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.application.simple.SimpleApplication(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]

Bases: eventsourcing.application.pipeline.Pipeable, typing.Generic

Base class for event sourced applications.

Constructs infrastructure objects such as the repository and event store, and also the notification log which presents the application state as a sequence of events.

Needs actual infrastructure classes.

infrastructure_factory_class

alias of eventsourcing.infrastructure.factory.InfrastructureFactory

repository_class

alias of eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

__init__(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]

Initialises application object.

Parameters:
  • name – Name of application.
  • persistence_policy – Persistence policy object.
  • persist_event_type – Tuple of domain event classes to be persisted.
  • cipher_key – Base64 unicode string cipher key.
  • compressor – Compressor used to compress serialized event state.
  • sequenced_item_class – Named tuple for mapping and recording events.
  • sequenced_item_mapper_class – Object class for mapping stored events.
  • record_manager_class – Object class for recording stored events.
  • stored_event_record_class – Object class for event records.
  • event_store_class – Object class used to store and retrieve domain events.
  • snapshot_record_class – Object class used to represent snapshots.
  • setup_table – Option to create database tables when application starts.
  • contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
  • pipeline_id – ID of instance of system pipeline expressions.
  • json_encoder_class – Object class used to encode objects as JSON strings.
  • json_decoder_class – Object class used to decode JSON strings as objects.
  • notification_log_section_size – Number of notification items in a section.
  • use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
event_store_class

alias of eventsourcing.infrastructure.eventstore.EventStore

construct_infrastructure(*args, **kwargs) → None[source]

Constructs infrastructure for application.

construct_infrastructure_factory(*args, **kwargs) → eventsourcing.infrastructure.factory.InfrastructureFactory[source]

Constructs infrastructure factory object.

construct_datastore() → None[source]

Constructs datastore object (used to create and drop database tables).

construct_event_store() → None[source]

Constructs event store object.

construct_repository(**kwargs) → None[source]

Constructs repository object.

setup_table() → None[source]

Sets up the database table using event store’s record class.

drop_table() → None[source]

Drops the database table using event store’s record class.

construct_notification_log() → None[source]

Constructs notification log object.

construct_persistence_policy() → None[source]

Constructs persistence policy object.

change_pipeline(pipeline_id: int) → None[source]

Switches pipeline being used by this application object.

close() → None[source]

Closes the application, preventing further use.

The persistence policy is closed, and the application’s connection to the database is closed.

__enter__() → T[source]

Supports use of application as context manager.

__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) → None[source]

Closes application when exiting context manager.
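
The context manager behaviour described above can be sketched as follows. This is a minimal illustration only: SketchApplication is a hypothetical stand-in, not the library class, and its close() simply sets a flag where the real method would close the persistence policy and the database connection.

```python
class SketchApplication:
    """Hypothetical stand-in for an application object."""

    def __init__(self) -> None:
        self.is_closed = False

    def close(self) -> None:
        # A real application would release its policy and connection here.
        self.is_closed = True

    def __enter__(self) -> "SketchApplication":
        # Returning self lets the caller bind the application with "as".
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Close whether or not the block raised an exception.
        self.close()


with SketchApplication() as app:
    still_open_inside_block = not app.is_closed
```

Because __exit__ returns None, an exception raised inside the block still propagates after the application has been closed.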

classmethod mixin(infrastructure_class: type) → T[source]

Returns subclass that inherits also from given infrastructure class.
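
One way such a mixin classmethod could work is to build the subclass dynamically with type(). The sketch below assumes the infrastructure class is placed first in the bases; the class names are hypothetical and the library's own implementation may differ.

```python
class SketchApplication:
    """Hypothetical application base class."""

    @classmethod
    def mixin(cls, infrastructure_class: type) -> type:
        # Create a new class, named after cls, that inherits from both
        # the given infrastructure class and cls.
        return type(cls.__name__, (infrastructure_class, cls), {})


class SketchInfrastructure:
    """Hypothetical infrastructure class."""

    infrastructure_name = "in-memory"


class Orders(SketchApplication):
    pass


ConcreteOrders = Orders.mixin(SketchInfrastructure)
```

The result is an ordinary class, so it can be instantiated or subclassed like any other.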

save(aggregates=(), orm_objects_pending_save=(), orm_objects_pending_delete=()) → None[source]

Saves state of aggregates, and ORM objects.

All of the pending events of the aggregates, along with the ORM objects, are recorded atomically as a process event.

Then a “prompt to pull” is published and, if the repository cache is in use, the aggregates are put in the cache.

Parameters:
  • aggregates – One or many aggregates.
  • orm_objects_pending_save – Sequence of ORM objects to be saved.
  • orm_objects_pending_delete – Sequence of ORM objects to be deleted.
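
The save behaviour described above (collect pending events, record them together with ORM objects in one step) can be sketched roughly as below. All names here (SketchAggregate, SketchRecorder, the standalone save function) are hypothetical, and a single list assignment stands in for a database transaction.

```python
from typing import Any, List, Sequence


class SketchAggregate:
    """Hypothetical aggregate that queues events until they are saved."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.pending_events: List[str] = []

    def rename(self, name: str) -> None:
        self.pending_events.append(f"{self.name} renamed to {name}")
        self.name = name


class SketchRecorder:
    """Hypothetical stand-in for a record manager."""

    def __init__(self) -> None:
        self.records: List[Any] = []

    def write_all(self, items: Sequence[Any]) -> None:
        # Build the new state first, then swap it in with one assignment,
        # so either every item is recorded or none is.
        self.records = self.records + list(items)


def save(recorder, aggregates=(), orm_objects_pending_save=()) -> None:
    # Accept one aggregate or many.
    if isinstance(aggregates, SketchAggregate):
        aggregates = [aggregates]
    events: List[Any] = []
    for aggregate in aggregates:
        events.extend(aggregate.pending_events)
    recorder.write_all(events + list(orm_objects_pending_save))
    # Clear pending events only after the write has succeeded.
    for aggregate in aggregates:
        aggregate.pending_events = []


recorder = SketchRecorder()
order = SketchAggregate("draft")
order.rename("confirmed")
save(recorder, order, orm_objects_pending_save=["projection-row"])
```
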
record_process_event(process_event: eventsourcing.application.simple.ProcessEvent) → List[T][source]

Records a process event.

Converts the domain events of the process event to event record objects, and writes the event records and the ORM objects to the database using the application’s event store’s record manager.

Parameters: process_event – An instance of ProcessEvent.
Returns: A list of event records.
construct_event_records(pending_events: Iterable[TAggregateEvent], causal_dependencies: Optional[List[Dict[str, int]]]) → List[T][source]

Constructs event records from domain events.

Parameters:
  • pending_events – An iterable of domain events.
  • causal_dependencies – A list of causal dependencies.
Returns:

A list of event records.

publish_prompt(head_notification_id=None)[source]

Publishes a “prompt to pull” (instance of PromptToPull).

Parameters: head_notification_id – Maximum notification ID of event records to be pulled.
class eventsourcing.application.simple.ApplicationWithConcreteInfrastructure(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]

Bases: eventsourcing.application.simple.SimpleApplication

Base class for application classes that have actual infrastructure.

class eventsourcing.application.simple.Prompt[source]

Bases: eventsourcing.whitehead.ActualOccasion

class eventsourcing.application.simple.PromptToPull(process_name: str, pipeline_id: int, head_notification_id=None)[source]

Bases: eventsourcing.application.simple.Prompt

__init__(process_name: str, pipeline_id: int, head_notification_id=None)[source]

Initialize self. See help(type(self)) for accurate signature.

__eq__(other: object) → bool[source]

Return self==value.

__repr__() → str[source]

Return repr(self).

policies

class eventsourcing.application.policies.PersistencePolicy(event_store: eventsourcing.infrastructure.base.AbstractEventStore, persist_event_type: Union[type, Tuple, None] = None)[source]

Bases: object

Stores events of given type to given event store, whenever they are published.

__init__(event_store: eventsourcing.infrastructure.base.AbstractEventStore, persist_event_type: Union[type, Tuple, None] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.application.policies.SnapshottingPolicy(repository: eventsourcing.domain.model.repository.AbstractEntityRepository, snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager], persist_event_type: Union[type, Tuple, None] = (<class 'eventsourcing.domain.model.events.EventWithOriginatorVersion'>,), period: int = 0)[source]

Bases: typing.Generic

__init__(repository: eventsourcing.domain.model.repository.AbstractEntityRepository, snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager], persist_event_type: Union[type, Tuple, None] = (<class 'eventsourcing.domain.model.events.EventWithOriginatorVersion'>,), period: int = 0)[source]

Initialize self. See help(type(self)) for accurate signature.

notificationlog

class eventsourcing.application.notificationlog.Section(section_id: str, items: List[T], previous_id: Optional[str] = None, next_id: Optional[str] = None)[source]

Bases: object

Section of a notification log.

Contains items, and has an ID.

May also have the IDs of the previous and next sections of the notification log.

__init__(section_id: str, items: List[T], previous_id: Optional[str] = None, next_id: Optional[str] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.application.notificationlog.AbstractNotificationLog[source]

Bases: abc.ABC

Presents a sequence of sections from a sequence of notifications.

__getitem__(section_id: str) → eventsourcing.application.notificationlog.Section[source]

Get section of notification log.

Parameters: section_id – ID of a section of the notification log.
section_size

Size of section of notification log.

class eventsourcing.application.notificationlog.LocalNotificationLog(section_size: Optional[int] = None)[source]

Bases: eventsourcing.application.notificationlog.AbstractNotificationLog

Presents a sequence of sections from a sequence of notifications.

__init__(section_size: Optional[int] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

section_size

Size of section of notification log.

__getitem__(section_id: str) → eventsourcing.application.notificationlog.Section[source]

Get section of notification log.

Parameters: section_id – ID of a section of the notification log.
get_next_position() → int[source]

Returns next unoccupied position in zero-based sequence.

Since the notification IDs are one-based, the next position in the zero-based sequence equals the current max notification ID which is 1-based. If there are no records, the max notification ID will be None, and the next position is zero.

Returns: Non-negative integer.
Return type: int
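
The relationship between the one-based maximum notification ID and the zero-based next position can be written out as a small function. The function name is hypothetical; the logic follows the description above.

```python
from typing import Optional


def next_position(max_notification_id: Optional[int]) -> int:
    # With no records there is no maximum ID, so the first free
    # zero-based position is 0.
    if max_notification_id is None:
        return 0
    # IDs 1..n occupy zero-based positions 0..n-1, so the next free
    # position equals the maximum ID.
    return max_notification_id


empty_log_position = next_position(None)
three_records_position = next_position(3)
```
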
get_items(start: int, stop: int) → Sequence[Any][source]

Returns items for section.
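
How a local notification log might serve sections from get_items() and get_next_position() is sketched below. The section ID format "first,last" (one-based, inclusive) is an assumption made for this illustration, and the Sketch classes are hypothetical stand-ins for Section and LocalNotificationLog.

```python
from typing import Any, List, Optional, Sequence


class SketchSection:
    def __init__(
        self,
        section_id: str,
        items: List[Any],
        previous_id: Optional[str] = None,
        next_id: Optional[str] = None,
    ) -> None:
        self.section_id = section_id
        self.items = items
        self.previous_id = previous_id
        self.next_id = next_id


class SketchNotificationLog:
    def __init__(self, notifications: Sequence[Any], section_size: int) -> None:
        self.notifications = list(notifications)
        self.section_size = section_size

    def get_next_position(self) -> int:
        return len(self.notifications)

    def get_items(self, start: int, stop: int) -> List[Any]:
        # Zero-based slice: start inclusive, stop exclusive.
        return self.notifications[start:stop]

    def __getitem__(self, section_id: str) -> SketchSection:
        # Assumed ID format: "first,last", one-based and inclusive.
        first, last = (int(part) for part in section_id.split(","))
        start, stop = first - 1, last
        items = self.get_items(start, stop)
        size = self.section_size
        previous_id = None if first <= 1 else f"{max(1, first - size)},{first - 1}"
        # Link to a next section only if notifications exist beyond this one.
        next_id = f"{last + 1},{last + size}" if stop < self.get_next_position() else None
        return SketchSection(section_id, items, previous_id, next_id)


log = SketchNotificationLog(list("abcdefg"), section_size=3)
middle = log["4,6"]
last_section = log["7,9"]
```

A reader can follow next_id from section to section until it reaches a section whose next_id is None.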

class eventsourcing.application.notificationlog.RecordManagerNotificationLog(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, section_size: Optional[int] = None)[source]

Bases: eventsourcing.application.notificationlog.LocalNotificationLog

Local notification log that gets notifications from a record manager.

__init__(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, section_size: Optional[int] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

get_items(start: int, stop: Optional[int]) → Sequence[Any][source]

Returns notifications in log.

Parameters:
  • start – Inclusive start position in log.
  • stop – Inclusive stop position in log.
Returns: Sequence of notifications.

get_next_position() → int[source]

Returns next unoccupied position in zero-based sequence.

Since the notification IDs are one-based, the next position in the zero-based sequence equals the current max notification ID which is 1-based. If there are no records, the max notification ID will be None, and the next position is zero.

Returns: Non-negative integer.
Return type: int
class eventsourcing.application.notificationlog.BigArrayNotificationLog(big_array: eventsourcing.domain.model.array.BigArray, section_size: int)[source]

Bases: eventsourcing.application.notificationlog.LocalNotificationLog

Notification log that uses the BigArray class.

__init__(big_array: eventsourcing.domain.model.array.BigArray, section_size: int)[source]

Initialize self. See help(type(self)) for accurate signature.

get_items(start: int, stop: int) → Sequence[Any][source]

Returns items for section.

get_next_position() → int[source]

Returns next unoccupied position in zero-based sequence.

Returns: Non-negative integer.
Return type: int
class eventsourcing.application.notificationlog.NotificationLogReader(notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog, use_direct_query_if_available: bool = False)[source]

Bases: abc.ABC

__init__(notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog, use_direct_query_if_available: bool = False)[source]

Initialize self. See help(type(self)) for accurate signature.

seek(position: int) → None[source]

Sets position of reader in notification log sequence.

This represents the position of the last notification read by the reader. The next notification returned by the reader will be the next position.

Parameters: position (int) – Position in notification log sequence.
Raises: ValueError – If the position is less than zero.
initial_section_id

Returns initial section ID used to start getting linked sections from the notification log.

Slight departure from Vaughn Vernon’s design by not using ‘current’ as initial section ID, but a section ID that just includes the “next” position, which the notification log can use to return the section containing this position. This avoids lengthy backtracking when the reader has a lot of notifications to catch up on.

This property has been extracted in order to allow a subclass to adjust this default behaviour.

It would be possible to calculate the actual section ID from the current reader position. Using the section ID of an actual section may hit a cache and avoid troubling the server, but the reader would need to know the section size of the notification log it is reading. If the section size is unknown (perhaps because it is a remote notification log), ‘current’ can be used to hit a cache. In future, it might be possible to ask the notification log to disclose its section size, or compute an actual section ID for a given position.

Returns: A notification log section ID.
Return type: str
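
A section ID that "just includes the next position" could be computed as in the sketch below. The "first,last" one-based ID format and the function name are assumptions for illustration; the reader position is taken to be the position of the last notification read.

```python
def initial_section_id(position: int, section_size: int) -> str:
    # The next notification to read has the one-based ID position + 1,
    # so the section starts there and spans section_size items.
    return f"{position + 1},{position + section_size}"


fresh_reader_id = initial_section_id(0, 20)
caught_up_reader_id = initial_section_id(45, 20)
```

Starting from such a section, the reader only moves forwards through next-section links instead of walking back from the end of the log.
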
read_list(advance_by: Optional[int] = None) → List[Dict[str, Any]][source]

Deprecated in 8.0.0.

Please use list_notifications() instead.

read_items(stop_index: Optional[int] = None, advance_by: Optional[int] = None) → Iterator[Dict[str, Any]][source]

Deprecated in 8.0.0.

Please use iter_notifications() instead.

django

class eventsourcing.application.django.DjangoApplication(tracking_record_class: Any = None, *args, **kwargs)[source]

Bases: eventsourcing.application.simple.ApplicationWithConcreteInfrastructure

infrastructure_factory_class

alias of eventsourcing.infrastructure.django.factory.DjangoInfrastructureFactory

__init__(tracking_record_class: Any = None, *args, **kwargs)[source]

Initialises application object.

Parameters:
  • name – Name of application.
  • persistence_policy – Persistence policy object.
  • persist_event_type – Tuple of domain event classes to be persisted.
  • cipher_key – Base64 unicode string cipher key.
  • compressor – Compressor used to compress serialized event state.
  • sequenced_item_class – Named tuple for mapping and recording events.
  • sequenced_item_mapper_class – Object class for mapping stored events.
  • record_manager_class – Object class for recording stored events.
  • stored_event_record_class – Object class for event records.
  • event_store_class – Object class used to store and retrieve domain events.
  • snapshot_record_class – Object class used to represent snapshots.
  • setup_table – Option to create database tables when application starts.
  • contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
  • pipeline_id – ID of instance of system pipeline expressions.
  • json_encoder_class – Object class used to encode objects as JSON strings.
  • json_decoder_class – Object class used to decode JSON strings as objects.
  • notification_log_section_size – Number of notification items in a section.
  • use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
construct_infrastructure(*args, **kwargs) → None[source]

Constructs infrastructure for application.

classmethod reset_connection_after_forking() → None[source]

Resets database connection after forking.

popo

class eventsourcing.application.popo.PopoApplication(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]

Bases: eventsourcing.application.simple.ApplicationWithConcreteInfrastructure

infrastructure_factory_class

alias of eventsourcing.infrastructure.popo.factory.PopoInfrastructureFactory

sequenced_item_mapper_class

alias of eventsourcing.infrastructure.popo.mapper.SequencedItemMapperForPopo

stored_event_record_class

alias of eventsourcing.infrastructure.popo.records.StoredEventRecord

sqlalchemy

class eventsourcing.application.sqlalchemy.SQLAlchemyApplication(uri: Optional[str] = None, session: Optional[Any] = None, tracking_record_class: Any = None, **kwargs)[source]

Bases: eventsourcing.application.simple.ApplicationWithConcreteInfrastructure

infrastructure_factory_class

alias of eventsourcing.infrastructure.sqlalchemy.factory.SQLAlchemyInfrastructureFactory

stored_event_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.StoredEventRecord

snapshot_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.EntitySnapshotRecord

__init__(uri: Optional[str] = None, session: Optional[Any] = None, tracking_record_class: Any = None, **kwargs)[source]

Initialises application object.

Parameters:
  • name – Name of application.
  • persistence_policy – Persistence policy object.
  • persist_event_type – Tuple of domain event classes to be persisted.
  • cipher_key – Base64 unicode string cipher key.
  • compressor – Compressor used to compress serialized event state.
  • sequenced_item_class – Named tuple for mapping and recording events.
  • sequenced_item_mapper_class – Object class for mapping stored events.
  • record_manager_class – Object class for recording stored events.
  • stored_event_record_class – Object class for event records.
  • event_store_class – Object class used to store and retrieve domain events.
  • snapshot_record_class – Object class used to represent snapshots.
  • setup_table – Option to create database tables when application starts.
  • contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
  • pipeline_id – ID of instance of system pipeline expressions.
  • json_encoder_class – Object class used to encode objects as JSON strings.
  • json_decoder_class – Object class used to decode JSON strings as objects.
  • notification_log_section_size – Number of notification items in a section.
  • use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
construct_infrastructure(*args, **kwargs) → None[source]

Constructs infrastructure for application.

construct_infrastructure_factory(*args, **kwargs) → eventsourcing.infrastructure.factory.InfrastructureFactory[source]

Constructs infrastructure factory object.

construct_datastore() → None[source]

Constructs datastore object (used to create and drop database tables).

process

class eventsourcing.application.process.PromptToQuit[source]

Bases: eventsourcing.application.simple.Prompt

class eventsourcing.application.process.WrappedRepository(repository: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository[TAggregate, TAggregateEvent])[source]

Bases: typing.Generic

Used to wrap an event sourced repository for use in process application policy so that use of, and changes to, domain model aggregates can be automatically detected and recorded.

Implements a “dictionary like” interface, so that aggregates can be accessed by ID.
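
A dictionary-like wrapper that notes which aggregates a policy has accessed might look like the sketch below. A plain dict stands in for the event sourced repository, and the class and attribute names are hypothetical.

```python
from typing import Any, Dict


class SketchWrappedRepository:
    def __init__(self, repository: Dict[str, Any]) -> None:
        self.repository = repository
        # Aggregates accessed through the wrapper, keyed by ID.
        self.retrieved_aggregates: Dict[str, Any] = {}

    def __getitem__(self, entity_id: str) -> Any:
        try:
            # Return the same object on repeated access.
            return self.retrieved_aggregates[entity_id]
        except KeyError:
            aggregate = self.repository[entity_id]
            self.retrieved_aggregates[entity_id] = aggregate
            return aggregate

    def __contains__(self, entity_id: str) -> bool:
        return entity_id in self.repository


wrapped = SketchWrappedRepository({"order-1": "aggregate-1"})
first = wrapped["order-1"]
```

After the policy has run, the caller can inspect retrieved_aggregates to find aggregates whose pending events need recording.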

__init__(repository: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository[TAggregate, TAggregateEvent]) → None[source]

Initialize self. See help(type(self)) for accurate signature.

save_orm_obj(orm_obj: Any) → None[source]

Includes orm_obj in “process event”, so that projections into custom ORM objects are as reliable with respect to sudden restarts as “normal” domain event processing in a process application.

delete_orm_obj(orm_obj: Any) → None[source]

Includes orm_obj in “process event” as an object pending deletion, so that projections into custom ORM objects are as reliable with respect to sudden restarts as “normal” domain event processing in a process application.

class eventsourcing.application.process.ProcessApplication(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]

Bases: eventsourcing.application.simple.SimpleApplication

__init__(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]

Initialises application object.

Parameters:
  • name – Name of application.
  • persistence_policy – Persistence policy object.
  • persist_event_type – Tuple of domain event classes to be persisted.
  • cipher_key – Base64 unicode string cipher key.
  • compressor – Compressor used to compress serialized event state.
  • sequenced_item_class – Named tuple for mapping and recording events.
  • sequenced_item_mapper_class – Object class for mapping stored events.
  • record_manager_class – Object class for recording stored events.
  • stored_event_record_class – Object class for event records.
  • event_store_class – Object class used to store and retrieve domain events.
  • snapshot_record_class – Object class used to represent snapshots.
  • setup_table – Option to create database tables when application starts.
  • contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
  • pipeline_id – ID of instance of system pipeline expressions.
  • json_encoder_class – Object class used to encode objects as JSON strings.
  • json_decoder_class – Object class used to decode JSON strings as objects.
  • notification_log_section_size – Number of notification items in a section.
  • use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
notification_log_reader_class

alias of eventsourcing.application.notificationlog.NotificationLogReader

close() → None[source]

Closes the application, preventing further use.

The persistence policy is closed, and the application’s connection to the database is closed.

publish_prompt_for_events(_: Optional[Iterable[eventsourcing.whitehead.ActualOccasion]] = None) → None[source]

Publishes prompt for a given event.

Used to prompt downstream process application when an event is published by this application’s model, which can happen when application command methods, rather than the process policy, are called.

Wraps exceptions with PromptFailed, to avoid application policy exceptions being seen directly in other applications when running synchronously in single thread.

follow(upstream_application_name: str, notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog) → None[source]

Sets up process application to follow the given notification log of an upstream application.

Parameters:
  • upstream_application_name – Name of the upstream application.
  • notification_log – Notification log that will be processed.
run(prompt: Optional[eventsourcing.application.simple.Prompt] = None, advance_by: Optional[int] = None) → int[source]

Pulls event notifications from notification logs being followed by this process application, and processes the contained domain events.

Parameters:
  • prompt – Optional prompt, specifying a particular notification log.
  • advance_by – Maximum event notifications to process.
Returns:

Returns number of events that have been processed.
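
The interplay of follow() and run() can be sketched with plain lists standing in for notification logs. The classes below are hypothetical, and in this simplification advance_by limits the notifications taken from each followed log.

```python
from typing import Dict, List, Optional, Tuple


class SketchReader:
    """Remembers how far into a log the follower has read."""

    def __init__(self, log: List[str]) -> None:
        self.log = log
        self.position = 0

    def read(self, advance_by: Optional[int] = None) -> List[str]:
        stop = len(self.log)
        if advance_by is not None:
            stop = min(stop, self.position + advance_by)
        items = self.log[self.position:stop]
        self.position = stop
        return items


class SketchProcess:
    def __init__(self) -> None:
        self.readers: Dict[str, SketchReader] = {}
        self.processed: List[Tuple[str, str]] = []

    def follow(self, upstream_application_name: str, notification_log: List[str]) -> None:
        # One reader per upstream application.
        self.readers[upstream_application_name] = SketchReader(notification_log)

    def run(self, advance_by: Optional[int] = None) -> int:
        count = 0
        for name, reader in self.readers.items():
            for notification in reader.read(advance_by):
                self.processed.append((name, notification))
                count += 1
        return count


process = SketchProcess()
process.follow("orders", ["placed-1", "placed-2", "placed-3"])
first_run = process.run(advance_by=2)
second_run = process.run()
third_run = process.run()
```
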

check_causal_dependencies(upstream_name, causal_dependencies_json)[source]

Checks the causal dependencies are satisfied (have already been processed).

Parameters:
  • upstream_name – Name of the upstream application being processed.
  • causal_dependencies_json – Pipelines and positions in notification logs.
Raises:

CausalDependencyFailed – If causal dependencies are not satisfied.
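
A causal dependency check of this kind could be sketched as follows. The JSON layout (a list of objects with "pipeline_id" and "notification_id" keys), the processed-positions mapping, and the local exception class are all assumptions made for illustration.

```python
import json
from typing import Dict


class CausalDependencyFailed(Exception):
    """Local stand-in for the exception named above."""


def check_causal_dependencies(processed: Dict[int, int], causal_dependencies_json: str) -> None:
    # processed maps pipeline ID to the highest notification ID handled so far.
    for dependency in json.loads(causal_dependencies_json or "[]"):
        pipeline_id = dependency["pipeline_id"]
        notification_id = dependency["notification_id"]
        if processed.get(pipeline_id, 0) < notification_id:
            raise CausalDependencyFailed(dependency)


processed = {0: 5, 1: 2}

# Satisfied: pipeline 0 has already been processed up to notification 5.
check_causal_dependencies(processed, '[{"pipeline_id": 0, "notification_id": 4}]')

try:
    check_causal_dependencies(processed, '[{"pipeline_id": 1, "notification_id": 3}]')
    unsatisfied_detected = False
except CausalDependencyFailed:
    unsatisfied_detected = True
```
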

process_upstream_event(domain_event: TAggregateEvent, notification_id: int, upstream_name: str) → Tuple[List[TAggregateEvent], List[T]][source]

Processes given domain event from an upstream notification log.

Calls the process application policy, and then records a process event, hence recording atomically all new domain events created by the call to the policy along with any ORM objects that may result.

Parameters:
  • domain_event – Domain event to be processed.
  • notification_id – Position in notification log.
  • upstream_name – Name of upstream application.
Returns:

Returns a list of new domain events.

event_from_notification(notification: Dict[str, Any]) → TAggregateEvent[source]

Reconstructs domain event from an event notification.

Parameters: notification – The event notification.
Returns: A domain event.
call_policy(domain_event: TAggregateEvent) → Tuple[List[TAggregateEvent], List[Dict[str, int]], List[Any], List[Any]][source]

Calls the process application policy with the given domain event.

Parameters: domain_event – Domain event that will be given to the policy.
Returns: A tuple containing a list of new domain events, a list of causal dependencies, and two lists of ORM objects (pending save and pending delete).
policy(repository: eventsourcing.application.process.WrappedRepository[TAggregate, TAggregateEvent], event: TAggregateEvent) → Union[TAggregate, Sequence[TAggregate], None][source]

Empty method, can be overridden in subclasses to implement concrete policy.

collect_pending_events(aggregates: Sequence[TAggregate]) → List[TAggregateEvent][source]

Collects all the pending events from the given sequence of aggregates.

Parameters: aggregates – Sequence of aggregates.
Returns: A list of domain events.
setup_table() → None[source]

Sets up the database table using event store’s record class.

drop_table() → None[source]

Drops the database table using event store’s record class.

class eventsourcing.application.process.ProcessApplicationWithSnapshotting(snapshot_period: int = 0, **kwargs)[source]

Bases: eventsourcing.application.snapshotting.SnapshottingApplication, eventsourcing.application.process.ProcessApplication

Supplements process applications that will use snapshotting.

take_snapshots(new_events: Sequence[TAggregateEvent]) → None[source]

Takes snapshot of aggregates, according to the policy.

Parameters: new_events – Domain events used to detect if a snapshot is to be taken.
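
A periodic snapshotting condition could be expressed as in the sketch below. The rule used here (snapshot when the version after the event is a multiple of the period, and never when the period is zero) is an assumption for illustration, not a statement of the library's exact policy.

```python
def should_snapshot(originator_version: int, period: int) -> bool:
    # A period of zero (the default shown in the signatures) disables
    # snapshotting in this sketch.
    if period <= 0:
        return False
    # Versions are zero-based, so version 2 is the third event.
    return (originator_version + 1) % period == 0


snapshot_versions = [v for v in range(10) if should_snapshot(v, period=3)]
```
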

decorators

eventsourcing.application.decorators.applicationpolicy(arg: Callable) → Callable[source]

Decorator for application policy method.

Allows policy to be built up from methods registered for different event classes.
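
Building a policy from functions registered per event class can be approximated with functools.singledispatch from the standard library. This is a sketch of the general technique, not the library's decorator: policy_by_event_type and the event classes are hypothetical, and dispatch is done on the type of the second argument.

```python
from functools import singledispatch


def policy_by_event_type(func):
    # singledispatch keeps the registry of handlers per class.
    dispatcher = singledispatch(func)

    def wrapper(repository, event):
        # Dispatch on the event (second argument), not the repository.
        handler = dispatcher.dispatch(type(event))
        return handler(repository, event)

    wrapper.register = dispatcher.register
    return wrapper


class OrderPlaced:
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id


class OrderCancelled:
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id


@policy_by_event_type
def policy(repository, event):
    # Default: ignore event classes that have no registered handler.
    return None


@policy.register(OrderPlaced)
def _(repository, event):
    return f"reserve stock for {event.order_id}"


@policy.register(OrderCancelled)
def _(repository, event):
    return f"release stock for {event.order_id}"


placed_result = policy({}, OrderPlaced("o1"))
cancelled_result = policy({}, OrderCancelled("o1"))
ignored_result = policy({}, object())
```
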

eventsourcing.application.decorators.applicationpolicy2(arg: Callable) → Callable[source]

Variant of applicationpolicy that doesn’t use weakrefs.

command

class eventsourcing.application.command.CommandProcess(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]

Bases: eventsourcing.application.process.ProcessApplication

persist_event_type

alias of eventsourcing.domain.model.command.Command.Event

pipeline

class eventsourcing.application.pipeline.PipelineExpression(left: Union[PipelineExpression, PipeableMetaclass], right: Union[PipelineExpression, PipeableMetaclass])[source]

Bases: object

Implements a left-to-right association between two objects.

__init__(left: Union[PipelineExpression, PipeableMetaclass], right: Union[PipelineExpression, PipeableMetaclass])[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.application.pipeline.PipeableMetaclass[source]

Bases: abc.ABCMeta

Meta class for pipeable classes.

__or__(other: Union[PipelineExpression, PipeableMetaclass]) → eventsourcing.application.pipeline.PipelineExpression[source]

Implements bitwise or operator ‘|’ as a pipe between pipeable classes.
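
Defining __or__ on a metaclass is what lets classes themselves, rather than instances, be combined with ‘|’. The sketch below uses hypothetical names throughout; the __or__ and __iter__ methods on the expression class are additions for the example and are not taken from the listing above.

```python
from abc import ABCMeta


class SketchExpression:
    """Left-to-right association between two terms."""

    def __init__(self, left, right) -> None:
        self.left = left
        self.right = right

    def __or__(self, other) -> "SketchExpression":
        # Allows chaining: (A | B) | C.
        return SketchExpression(self, other)

    def __iter__(self):
        # Flatten nested expressions into a left-to-right sequence.
        for term in (self.left, self.right):
            if isinstance(term, SketchExpression):
                yield from term
            else:
                yield term


class SketchPipeableMeta(ABCMeta):
    def __or__(cls, other) -> SketchExpression:
        # Called for "SomeClass | other" because the operator is looked
        # up on the type of the left operand, which is this metaclass.
        return SketchExpression(cls, other)


class SketchPipeable(metaclass=SketchPipeableMeta):
    pass


class Orders(SketchPipeable):
    pass


class Reservations(SketchPipeable):
    pass


class Payments(SketchPipeable):
    pass


pipeline = Orders | Reservations | Payments
names = [cls.__name__ for cls in pipeline]
```
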

class eventsourcing.application.pipeline.Pipeable[source]

Bases: object

Base class for pipeable classes.

snapshotting

class eventsourcing.application.snapshotting.SnapshottingApplication(snapshot_period: int = 0, **kwargs)[source]

Bases: eventsourcing.application.simple.SimpleApplication

__init__(snapshot_period: int = 0, **kwargs)[source]

Initialises application object.

Parameters:
  • name – Name of application.
  • persistence_policy – Persistence policy object.
  • persist_event_type – Tuple of domain event classes to be persisted.
  • cipher_key – Base64 unicode string cipher key.
  • compressor – Compressor used to compress serialized event state.
  • sequenced_item_class – Named tuple for mapping and recording events.
  • sequenced_item_mapper_class – Object class for mapping stored events.
  • record_manager_class – Object class for recording stored events.
  • stored_event_record_class – Object class for event records.
  • event_store_class – Object class used to store and retrieve domain events.
  • snapshot_record_class – Object class used to represent snapshots.
  • setup_table – Option to create database tables when application starts.
  • contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
  • pipeline_id – ID of instance of system pipeline expressions.
  • json_encoder_class – Object class used to encode objects as JSON strings.
  • json_decoder_class – Object class used to decode JSON strings as objects.
  • notification_log_section_size – Number of notification items in a section.
  • use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
construct_event_store() → None[source]

Constructs event store object.

construct_repository(**kwargs) → None[source]

Constructs repository object.

construct_persistence_policy() → None[source]

Constructs persistence policy object.

setup_table() → None[source]

Sets up the database table using event store’s record class.

drop_table() → None[source]

Drops the database table using event store’s record class.

close() → None[source]

Closes the application, preventing further use.

The persistence policy is closed, and the application’s connection to the database is closed.