
The application layer brings together the domain and infrastructure layers.


class eventsourcing.application.simple.ProcessEvent(domain_events: Iterable[TDomainEvent], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, causal_dependencies: Optional[List[Dict[str, int]]] = None, orm_objs_pending_save: Sequence[Any] = (), orm_objs_pending_delete: Sequence[Any] = ())[source]

Bases: eventsourcing.whitehead.ActualOccasion, typing.Generic

__init__(domain_events: Iterable[TDomainEvent], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, causal_dependencies: Optional[List[Dict[str, int]]] = None, orm_objs_pending_save: Sequence[Any] = (), orm_objs_pending_delete: Sequence[Any] = ())[source]

class eventsourcing.application.simple.SimpleApplication(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[], Tuple[Type[]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]

Bases: eventsourcing.application.pipeline.Pipeable, typing.Generic

Base class for event sourced applications.

Constructs infrastructure objects such as the repository and event store, and also the notification log which presents the application state as a sequence of events.

Needs actual infrastructure classes.


alias of eventsourcing.infrastructure.factory.InfrastructureFactory


alias of eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

__init__(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[], Tuple[Type[]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]

Initialises application object.

  • name – Name of application.
  • persistence_policy – Persistence policy object.
  • persist_event_type – Tuple of domain event classes to be persisted.
  • cipher_key – Base64 unicode string cipher key.
  • compressor – Compressor used to compress serialized event state.
  • sequenced_item_class – Named tuple for mapping and recording events.
  • sequenced_item_mapper_class – Object class for mapping stored events.
  • record_manager_class – Object class for recording stored events.
  • stored_event_record_class – Object class for event records.
  • event_store_class – Object class uses to store and retrieve domain events.
  • snapshot_record_class – Object class used to represent snapshots.
  • setup_table – Option to create database tables when application starts.
  • contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
  • pipeline_id – ID of instance of system pipeline expressions.
  • json_encoder_class – Object class used to encode object as JSON strings.
  • json_decoder_class – Object class used to decode JSON strings as objects.
  • notification_log_section_size – Number of notification items in a section.
  • use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).

alias of eventsourcing.infrastructure.eventstore.EventStore

construct_infrastructure(*args, **kwargs) → None[source]

Constructs infrastructure for application.

construct_infrastructure_factory(*args, **kwargs) → eventsourcing.infrastructure.factory.InfrastructureFactory[source]

Constructs infrastructure factory object.

construct_datastore() → None[source]

Constructs datastore object (which helps by creating and dropping tables).

construct_event_store() → None[source]

Constructs event store object.

construct_repository(**kwargs) → None[source]

Constructs repository object.

setup_table() → None[source]

Sets up the database table using event store’s record class.

drop_table() → None[source]

Drops the database table using event store’s record class.

construct_notification_log() → None[source]

Constructs notification log object.

construct_persistence_policy() → None[source]

Constructs persistence policy object.

change_pipeline(pipeline_id: int) → None[source]

Switches pipeline being used by this application object.

__enter__() → T[source]

Supports use of application as context manager.

__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) → None[source]

Closes application when exiting context manager.

classmethod mixin(infrastructure_class: type) → T[source]

Returns subclass that inherits also from given infrastructure class.

class eventsourcing.application.simple.ApplicationWithConcreteInfrastructure(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[], Tuple[Type[]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]

Bases: eventsourcing.application.simple.SimpleApplication

Base class for application classes that have actual infrastructure.

class eventsourcing.application.simple.Prompt[source]

Bases: eventsourcing.whitehead.ActualOccasion

class eventsourcing.application.simple.PromptToPull(process_name: str, pipeline_id: int, head_notification_id=None)[source]

Bases: eventsourcing.application.simple.Prompt

__init__(process_name: str, pipeline_id: int, head_notification_id=None)[source]

__eq__(other: object) → bool[source]

__repr__() → str[source]

class eventsourcing.application.policies.PersistencePolicy(event_store: eventsourcing.infrastructure.base.AbstractEventStore, persist_event_type: Union[type, Tuple, None] = None)[source]

Bases: object

Stores events of given type to given event store, whenever they are published.

__init__(event_store: eventsourcing.infrastructure.base.AbstractEventStore, persist_event_type: Union[type, Tuple, None] = None)[source]

class eventsourcing.application.policies.SnapshottingPolicy(repository: eventsourcing.domain.model.repository.AbstractEntityRepository, snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[, eventsourcing.infrastructure.base.AbstractRecordManager][, eventsourcing.infrastructure.base.AbstractRecordManager], persist_event_type: Union[type, Tuple, None] = (<class ''>,), period: int = 0)[source]

Bases: typing.Generic

__init__(repository: eventsourcing.domain.model.repository.AbstractEntityRepository, snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[, eventsourcing.infrastructure.base.AbstractRecordManager][, eventsourcing.infrastructure.base.AbstractRecordManager], persist_event_type: Union[type, Tuple, None] = (<class ''>,), period: int = 0)[source]

Initialize self. See help(type(self)) for accurate signature.


class eventsourcing.application.notificationlog.Section(section_id: str, items: List[T], previous_id: Optional[str] = None, next_id: Optional[str] = None)[source]

Bases: object

Section of a notification log.

Contains items, and has an ID.

May also have either IDs of previous and next sections of the notification log.

__init__(section_id: str, items: List[T], previous_id: Optional[str] = None, next_id: Optional[str] = None)[source]

class eventsourcing.application.notificationlog.AbstractNotificationLog[source]

Bases: abc.ABC

Presents a sequence of sections from a sequence of notifications.

__getitem__(section_id: str) → eventsourcing.application.notificationlog.Section[source]

Get section of notification log.

Parameters:section_id – ID of a section of the notification log.

Size of section of notification log.

class eventsourcing.application.notificationlog.LocalNotificationLog(section_size: Optional[int] = None)[source]

Bases: eventsourcing.application.notificationlog.AbstractNotificationLog

Presents a sequence of sections from a sequence of notifications.

__init__(section_size: Optional[int] = None)[source]

Size of section of notification log.

__getitem__(section_id: str) → eventsourcing.application.notificationlog.Section[source]

Get section of notification log.

Parameters:section_id – ID of a section of the notification log.
get_next_position() → int[source]

Returns next unoccupied position in zero-based sequence.

Since the notification IDs are one-based, the next position in the zero-based sequence equals the current max notification ID which is 1-based. If there are no records, the max notification ID will be None, and the next position is zero.

Returns:Non-negative integer.
Return type:int
get_items(start: int, stop: int) → Sequence[Any][source]

Returns items for section.

class eventsourcing.application.notificationlog.RecordManagerNotificationLog(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, section_size: Optional[int] = None)[source]

Bases: eventsourcing.application.notificationlog.LocalNotificationLog

Local notification log that gets notifications from a record manager.

__init__(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, section_size: Optional[int] = None)[source]

get_items(start: int, stop: Optional[int]) → Sequence[Any][source]

Returns notification in log.

  • start – Inclusive start position in log.
  • stop – Inclusive stop position in log.

get_next_position() → int[source]

Returns next unoccupied position in zero-based sequence.

Since the notification IDs are one-based, the next position in the zero-based sequence equals the current max notification ID which is 1-based. If there are no records, the max notification ID will be None, and the next position is zero.

Returns:Non-negative integer.
Return type:int
class eventsourcing.application.notificationlog.BigArrayNotificationLog(big_array: eventsourcing.domain.model.array.BigArray, section_size: int)[source]

Bases: eventsourcing.application.notificationlog.LocalNotificationLog

Notification log that uses the BigArray class.

__init__(big_array: eventsourcing.domain.model.array.BigArray, section_size: int)[source]

get_items(start: int, stop: int) → Sequence[Any][source]

Returns items for section.

get_next_position() → int[source]

Returns next unoccupied position in zero-based sequence.

Returns:Non-negative integer.
Return type:int
class eventsourcing.application.notificationlog.NotificationLogReader(notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog, use_direct_query_if_available: bool = False)[source]

Bases: abc.ABC

__init__(notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog, use_direct_query_if_available: bool = False)[source]

seek(position: int) → None[source]

Sets position of reader in notification log sequence.

This represents the position of the last notification read by the reader. The next notification returned by the reader will be the next position.

Parameters:position (int) – Position is notification log sequence.
Raises:ValueError – if the position is less than zero

Returns initial section ID used to start getting linked sections from the notification log.

Slight departure from Vaughn Vernon’s design by not using ‘current’ as initial section ID, but a section ID that just includes the “next” position, which the notification log can use to return the section containing this position. This avoids lengthy back- tracking when reader has a lot of notifications to catch-up on.

This property has been extracted in order to allow a subclass to adjust this default behaviour.

It would be possible to calculate the actual section ID from the current reader position. Using section ID of an actual section may hit a cache and avoid troubling the server, but the reader would need to know the section size of the notification log it is reading. If we don’t know section size, perhaps it is a remote notification log, we can use ‘current’ to hit a cache. In future, it might be possible to ask the notification log to disclose it’s section size, or compute an actual section ID for a given position.

Returns:A notification log section ID.
Return type:str
read_list(advance_by: Optional[int] = None) → List[Dict[str, Any]][source]

Deprecated in 8.0.0.

Please use list_notifications() instead.

read_items(stop_index: Optional[int] = None, advance_by: Optional[int] = None) → Iterator[Dict[str, Any]][source]

Deprecated in 8.0.0.

Please use iter_notifications() instead.


class eventsourcing.application.django.DjangoApplication(tracking_record_class: Any = None, *args, **kwargs)[source]

Bases: eventsourcing.application.simple.ApplicationWithConcreteInfrastructure


alias of eventsourcing.infrastructure.django.factory.DjangoInfrastructureFactory

__init__(tracking_record_class: Any = None, *args, **kwargs)[source]

Initialises application object.

construct_infrastructure(*args, **kwargs) → None[source]

Constructs infrastructure for application.

classmethod reset_connection_after_forking() → None[source]

Resets database connection after forking.


class eventsourcing.application.popo.PopoApplication(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[], Tuple[Type[]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]

Bases: eventsourcing.application.simple.ApplicationWithConcreteInfrastructure


alias of eventsourcing.infrastructure.popo.factory.PopoInfrastructureFactory


alias of eventsourcing.infrastructure.popo.mapper.SequencedItemMapperForPopo


class eventsourcing.application.sqlalchemy.SQLAlchemyApplication(uri: Optional[str] = None, session: Optional[Any] = None, tracking_record_class: Any = None, **kwargs)[source]

Bases: eventsourcing.application.simple.ApplicationWithConcreteInfrastructure


alias of eventsourcing.infrastructure.sqlalchemy.factory.SQLAlchemyInfrastructureFactory


alias of eventsourcing.infrastructure.sqlalchemy.records.StoredEventRecord


alias of eventsourcing.infrastructure.sqlalchemy.records.EntitySnapshotRecord

__init__(uri: Optional[str] = None, session: Optional[Any] = None, tracking_record_class: Any = None, **kwargs)[source]

Initialises application object.

construct_infrastructure(*args, **kwargs) → None[source]

Constructs infrastructure for application.

construct_infrastructure_factory(*args, **kwargs) → eventsourcing.infrastructure.factory.InfrastructureFactory[source]

Constructs infrastructure factory object.

construct_datastore() → None[source]

Constructs datastore object (which helps by creating and dropping tables).


class eventsourcing.application.process.PromptToQuit[source]

Bases: eventsourcing.application.simple.Prompt

class eventsourcing.application.process.WrappedRepository(repository: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository[~TAggregate, ~TAggregateEvent][TAggregate, TAggregateEvent])[source]

Bases: typing.Generic

Used to wrap an event sourced repository for use in process application policy so that use of, and changes to, domain model aggregates can be automatically detected and recorded.

Implements a “dictionary like” interface, so that aggregates can be accessed by ID.

__init__(repository: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository[~TAggregate, ~TAggregateEvent][TAggregate, TAggregateEvent]) → None[source]

save_orm_obj(orm_obj: Any) → None[source]

Includes orm_obj in “process event”, so that projections into custom ORM objects is as reliable with respect to sudden restarts as “normal” domain event processing in a process application.

delete_orm_obj(orm_obj: Any) → None[source]

Includes orm_obj in “process event”, so that projections into custom ORM objects is as reliable with respect to sudden restarts as “normal” domain event processing in a process application.

class eventsourcing.application.process.ProcessApplication(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]

Bases: eventsourcing.application.simple.SimpleApplication

__init__(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]

Initialises application object.

alias of eventsourcing.application.notificationlog.NotificationLogReader

publish_prompt_for_events(_: Optional[Iterable[eventsourcing.whitehead.ActualOccasion]] = None) → None[source]

Publishes prompt for a given event.

Used to prompt downstream process application when an event is published by this application’s model, which can happen when application command methods, rather than the process policy, are called.

Wraps exceptions with PromptFailed, to avoid application policy exceptions being seen directly in other applications when running synchronously in single thread.

policy(repository: eventsourcing.application.process.WrappedRepository[~TAggregate, ~TAggregateEvent][TAggregate, TAggregateEvent], event: TAggregateEvent) → Union[TAggregate, Sequence[TAggregate], None][source]

Empty method, can be overridden in subclasses to implement concrete policy.

setup_table() → None[source]

Sets up the database table using event store’s record class.

drop_table() → None[source]

Drops the database table using event store’s record class.

class eventsourcing.application.process.ProcessApplicationWithSnapshotting(snapshot_period: int = 0, **kwargs)[source]

Bases: eventsourcing.application.snapshotting.SnapshottingApplication, eventsourcing.application.process.ProcessApplication


class eventsourcing.application.command.CommandProcess(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]

Bases: eventsourcing.application.process.ProcessApplication


alias of eventsourcing.domain.model.command.Command.Event


class eventsourcing.application.pipeline.PipelineExpression(left: Union[PipelineExpression, PipeableMetaclass], right: Union[PipelineExpression, PipeableMetaclass])[source]

Bases: object

Implements a left-to-right association between two objects.

__init__(left: Union[PipelineExpression, PipeableMetaclass], right: Union[PipelineExpression, PipeableMetaclass])[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.application.pipeline.PipeableMetaclass[source]

Bases: abc.ABCMeta

Meta class for pipeable classes.

__or__(other: Union[PipelineExpression, PipeableMetaclass]) → eventsourcing.application.pipeline.PipelineExpression[source]

Implements bitwise or operator ‘|’ as a pipe between pipeable classes.

class eventsourcing.application.pipeline.Pipeable[source]

Bases: object

Base class for pipeable classes.


class eventsourcing.application.snapshotting.SnapshottingApplication(snapshot_period: int = 0, **kwargs)[source]

Bases: eventsourcing.application.simple.SimpleApplication

__init__(snapshot_period: int = 0, **kwargs)[source]

Initialises application object.

construct_event_store() → None[source]

Constructs event store object.

construct_repository(**kwargs) → None[source]

Constructs repository object.

construct_persistence_policy() → None[source]

Constructs persistence policy object.

setup_table() → None[source]

Sets up the database table using event store’s record class.

drop_table() → None[source]

Drops the database table using event store’s record class.