application
The application layer brings together the domain and infrastructure layers.
simple
-
class
eventsourcing.application.simple.
SimpleApplication
(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]¶ Bases:
eventsourcing.application.pipeline.Pipeable
,typing.Generic
Base class for event sourced applications.
Constructs infrastructure objects such as the repository and event store, and also the notification log which presents the application state as a sequence of events.
Needs actual infrastructure classes.
-
infrastructure_factory_class
¶ alias of
eventsourcing.infrastructure.factory.InfrastructureFactory
-
repository_class
¶ alias of
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
-
__init__
(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class used to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode objects as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
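The trade-off behind use_cache can be pictured with a minimal sketch. Everything in it (SketchRepository, replay_count, integer events summed into a state) is hypothetical and only illustrates skipping the replay on repeated access. It is not the library's repository, and a real cache would also need to stay in step with newly stored events, which this sketch ignores.

```python
from typing import Dict, List


class SketchRepository:
    """Hypothetical repository that rebuilds state by replaying stored events."""

    def __init__(self, events_by_id: Dict[str, List[int]], use_cache: bool = False) -> None:
        self.events_by_id = events_by_id
        self.use_cache = use_cache
        self.cache: Dict[str, int] = {}
        self.replay_count = 0  # how many times events were replayed

    def __getitem__(self, aggregate_id: str) -> int:
        if self.use_cache and aggregate_id in self.cache:
            return self.cache[aggregate_id]  # no replay needed
        self.replay_count += 1
        # "Replay": fold the stored events into the current state.
        state = sum(self.events_by_id[aggregate_id])
        if self.use_cache:
            self.cache[aggregate_id] = state
        return state


events = {"a1": [1, 2, 3]}
uncached = SketchRepository(events)
cached = SketchRepository(events, use_cache=True)
for _ in range(3):
    uncached["a1"]
    cached["a1"]
```

Comparing uncached.replay_count with cached.replay_count after the loop shows the cost that the cache removes, at the price of holding the state in memory.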
-
event_store_class
-
construct_infrastructure
(*args, **kwargs) → None[source]¶ Constructs infrastructure for application.
-
construct_infrastructure_factory
(*args, **kwargs) → eventsourcing.infrastructure.factory.InfrastructureFactory[source]¶ Constructs infrastructure factory object.
-
construct_datastore
() → None[source]¶ Constructs datastore object (which helps by creating and dropping tables).
-
change_pipeline
(pipeline_id: int) → None[source]¶ Switches pipeline being used by this application object.
-
-
class
eventsourcing.application.simple.
ApplicationWithConcreteInfrastructure
(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]¶ Bases:
eventsourcing.application.simple.SimpleApplication
Base class for application classes that have actual infrastructure.
policies
-
class
eventsourcing.application.policies.
PersistencePolicy
(event_store: eventsourcing.infrastructure.base.AbstractEventStore, persist_event_type: Union[type, Tuple, None] = None)[source]¶ Bases:
object
Stores events of given type to given event store, whenever they are published.
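The behaviour described above can be sketched without the library. The publish function, the subscribers list, InMemoryEventStore and SketchPersistencePolicy below are all hypothetical stand-ins, assumed only for illustration; the sketch shows a policy that subscribes to published events and stores those matching a given type.

```python
from typing import Callable, List, Tuple, Type


class Event:
    """Hypothetical base class for domain events."""


class Created(Event):
    pass


class Logged(Event):
    pass


class InMemoryEventStore:
    """Hypothetical event store that keeps events in a list."""

    def __init__(self) -> None:
        self.stored: List[Event] = []

    def store(self, event: Event) -> None:
        self.stored.append(event)


# Hypothetical publish-subscribe mechanism.
subscribers: List[Callable[[Event], None]] = []


def publish(event: Event) -> None:
    for handler in list(subscribers):
        handler(event)


class SketchPersistencePolicy:
    def __init__(self, event_store: InMemoryEventStore,
                 persist_event_type: Tuple[Type[Event], ...]) -> None:
        self.event_store = event_store
        self.persist_event_type = persist_event_type
        subscribers.append(self.on_event)  # start receiving published events

    def on_event(self, event: Event) -> None:
        # Store only events of the configured type(s).
        if isinstance(event, self.persist_event_type):
            self.event_store.store(event)

    def close(self) -> None:
        subscribers.remove(self.on_event)  # stop receiving published events


store = InMemoryEventStore()
policy = SketchPersistencePolicy(store, persist_event_type=(Created,))
publish(Created())  # stored
publish(Logged())   # ignored, not of the given type
```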
-
class
eventsourcing.application.policies.
SnapshottingPolicy
(repository: eventsourcing.infrastructure.base.AbstractEntityRepository, snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.infrastructure.base.AbstractSnapshop, eventsourcing.infrastructure.base.AbstractRecordManager], persist_event_type: Union[type, Tuple, None] = (<class 'eventsourcing.domain.model.events.EventWithOriginatorVersion'>,), period: int = 2)[source]¶ Bases:
typing.Generic
-
__init__
(repository: eventsourcing.infrastructure.base.AbstractEntityRepository, snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.infrastructure.base.AbstractSnapshop, eventsourcing.infrastructure.base.AbstractRecordManager], persist_event_type: Union[type, Tuple, None] = (<class 'eventsourcing.domain.model.events.EventWithOriginatorVersion'>,), period: int = 2)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
notificationlog
-
class
eventsourcing.application.notificationlog.
Section
(section_id: str, items: List[T], previous_id: Optional[str] = None, next_id: Optional[str] = None)[source]¶ Bases:
object
Section of a notification log.
Contains items, and has an ID.
May also have the IDs of the previous and next sections of the notification log.
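A minimal sketch of how linked sections can be followed. It assumes one-based section IDs of the form "first,last" and an in-memory list of notifications; SketchSection, format_section_id and get_section are hypothetical names used for illustration, not the library's implementation.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchSection:
    section_id: str
    items: List[str]
    previous_id: Optional[str] = None
    next_id: Optional[str] = None


def format_section_id(first: int, last: int) -> str:
    return "{},{}".format(first, last)


def get_section(notifications: List[str], section_id: str, section_size: int) -> SketchSection:
    # Assumed ID format: "first,last", both one-based and inclusive.
    first, last = (int(part) for part in section_id.split(","))
    items = notifications[first - 1:last]
    previous_id = format_section_id(first - section_size, first - 1) if first > 1 else None
    # Link to a next section only when there are notifications beyond this one.
    next_id = format_section_id(last + 1, last + section_size) if last < len(notifications) else None
    return SketchSection(section_id, items, previous_id, next_id)


notifications = ["n{}".format(i) for i in range(1, 8)]  # n1 .. n7
section_id: Optional[str] = "1,3"
collected: List[str] = []
while section_id is not None:
    section = get_section(notifications, section_id, section_size=3)
    collected.extend(section.items)
    section_id = section.next_id  # follow the link to the next section
```

Following next_id from the first section until it is None visits every notification once, with the last section only partly filled.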
-
class
eventsourcing.application.notificationlog.
AbstractNotificationLog
[source]¶ Bases:
abc.ABC
Presents a sequence of sections from a sequence of notifications.
-
__getitem__
(section_id: str) → eventsourcing.application.notificationlog.Section[source]¶ Get section of notification log.
Parameters: section_id – ID of a section of the notification log.
-
section_size
¶ Size of section of notification log.
-
-
class
eventsourcing.application.notificationlog.
LocalNotificationLog
(section_size: Optional[int] = None)[source]¶ Bases:
eventsourcing.application.notificationlog.AbstractNotificationLog
Presents a sequence of sections from a sequence of notifications.
-
__init__
(section_size: Optional[int] = None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
section_size
¶ Size of section of notification log.
-
__getitem__
(section_id: str) → eventsourcing.application.notificationlog.Section[source]¶ Get section of notification log.
Parameters: section_id – ID of a section of the notification log.
-
get_next_position
() → int[source]¶ Returns next unoccupied position in zero-based sequence.
Since the notification IDs are one-based, the next position in the zero-based sequence equals the current max notification ID which is 1-based. If there are no records, the max notification ID will be None, and the next position is zero.
Returns: Non-negative integer. Return type: int
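The arithmetic can be written out as a small sketch. The function name next_position and the sample IDs are assumptions made for illustration only.

```python
from typing import Optional


def next_position(max_notification_id: Optional[int]) -> int:
    """Next free zero-based position, given the highest one-based notification ID."""
    if max_notification_id is None:
        return 0  # no records yet
    return max_notification_id


# Three records with IDs 1, 2, 3 occupy zero-based positions 0, 1, 2.
ids = [1, 2, 3]
occupied_positions = [notification_id - 1 for notification_id in ids]
```

With these sample IDs, the next free position is one past the highest occupied position, which is the same number as the highest ID.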
-
-
class
eventsourcing.application.notificationlog.
RecordManagerNotificationLog
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, section_size: Optional[int] = None)[source]¶ Bases:
eventsourcing.application.notificationlog.LocalNotificationLog
Local notification log that gets notifications from a record manager.
-
__init__
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, section_size: Optional[int] = None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
get_items
(start: int, stop: Optional[int]) → Sequence[Any][source]¶ Returns notifications in log.
Parameters: - start – Inclusive start position in log.
- stop – Inclusive stop position in log.
Returns:
-
get_next_position
() → int[source]¶ Returns next unoccupied position in zero-based sequence.
Since the notification IDs are one-based, the next position in the zero-based sequence equals the current max notification ID which is 1-based. If there are no records, the max notification ID will be None, and the next position is zero.
Returns: Non-negative integer. Return type: int
-
-
class
eventsourcing.application.notificationlog.
BigArrayNotificationLog
(big_array: eventsourcing.domain.model.array.BigArray, section_size: int)[source]¶ Bases:
eventsourcing.application.notificationlog.LocalNotificationLog
Notification log that uses the BigArray class.
-
class
eventsourcing.application.notificationlog.
NotificationLogReader
(notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog, use_direct_query_if_available: bool = False)[source]¶ Bases:
abc.ABC
-
__init__
(notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog, use_direct_query_if_available: bool = False)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
seek
(position: int) → None[source]¶ Sets position of reader in notification log sequence.
This represents the position of the last notification read by the reader. The next notification returned by the reader will be the next position.
Parameters: position (int) – Position in notification log sequence. Raises: ValueError – if the position is less than zero.
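A minimal sketch of these position semantics, assuming an in-memory list of notifications with one-based IDs. SketchReader and its read method are hypothetical and stand in for the reader only to show how the position relates to what is returned next.

```python
from typing import Iterator, List


class SketchReader:
    """Hypothetical reader over an in-memory list of notifications."""

    def __init__(self, notifications: List[str]) -> None:
        self.notifications = notifications  # index 0 holds notification ID 1
        self.position = 0  # ID of the last notification read (0 means none yet)

    def seek(self, position: int) -> None:
        if position < 0:
            raise ValueError("Position less than zero: {}".format(position))
        self.position = position

    def read(self) -> Iterator[str]:
        while self.position < len(self.notifications):
            item = self.notifications[self.position]  # its ID is position + 1
            self.position += 1
            yield item


reader = SketchReader(["n1", "n2", "n3", "n4"])
reader.seek(2)  # notifications 1 and 2 count as already read
remaining = list(reader.read())
```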
-
initial_section_id
¶ Returns initial section ID used to start getting linked sections from the notification log.
Departs slightly from Vaughn Vernon’s design: instead of using ‘current’ as the initial section ID, it uses a section ID that just includes the “next” position, which the notification log can use to return the section containing this position. This avoids lengthy backtracking when the reader has a lot of notifications to catch up on.
This property has been extracted in order to allow a subclass to adjust this default behaviour.
It would be possible to calculate the actual section ID from the current reader position. Using the section ID of an actual section may hit a cache and avoid troubling the server, but the reader would need to know the section size of the notification log it is reading. If the section size is not known (perhaps because it is a remote notification log), ‘current’ can be used to hit a cache. In future, it might be possible to ask the notification log to disclose its section size, or to compute an actual section ID for a given position.
Returns: A notification log section ID. Return type: str
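The idea can be sketched as follows. Both functions are hypothetical: the two-item "first,last" ID built from the reader position, and the way the log aligns it to a real section, are assumptions made for illustration and may differ from the library's code.

```python
def initial_section_id(position: int) -> str:
    """Hypothetical: a short section ID that starts at the next unread notification."""
    return "{},{}".format(position + 1, position + 2)


def resolve_section_id(section_id: str, section_size: int) -> str:
    """Hypothetical: map any section ID onto the aligned section holding its first item."""
    first = int(section_id.split(",")[0])
    start = ((first - 1) // section_size) * section_size + 1
    return "{},{}".format(start, start + section_size - 1)


# A reader that has read 22 notifications asks for a section including ID 23.
requested = initial_section_id(22)
resolved = resolve_section_id(requested, section_size=10)
```

In this sketch the reader never needs to know the section size: it asks for an ID that includes the next position, and the log side decides which section that falls in.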
-
django
-
class
eventsourcing.application.django.
DjangoApplication
(tracking_record_class: Any = None, *args, **kwargs)[source]¶ Bases:
eventsourcing.application.simple.ApplicationWithConcreteInfrastructure
-
infrastructure_factory_class
¶ alias of
eventsourcing.infrastructure.django.factory.DjangoInfrastructureFactory
-
__init__
(tracking_record_class: Any = None, *args, **kwargs)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class used to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode objects as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
-
popo
-
class
eventsourcing.application.popo.
PopoApplication
(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]¶ Bases:
eventsourcing.application.simple.ApplicationWithConcreteInfrastructure
-
infrastructure_factory_class
¶ alias of
eventsourcing.infrastructure.popo.factory.PopoInfrastructureFactory
-
sequenced_item_mapper_class
¶ alias of
eventsourcing.infrastructure.popo.mapper.SequencedItemMapperForPopo
-
sqlalchemy
-
class
eventsourcing.application.sqlalchemy.
SQLAlchemyApplication
(uri: Optional[str] = None, session: Optional[Any] = None, tracking_record_class: Any = None, **kwargs)[source]¶ Bases:
eventsourcing.application.simple.ApplicationWithConcreteInfrastructure
-
infrastructure_factory_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.factory.SQLAlchemyInfrastructureFactory
-
stored_event_record_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.StoredEventRecord
-
snapshot_record_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.EntitySnapshotRecord
-
__init__
(uri: Optional[str] = None, session: Optional[Any] = None, tracking_record_class: Any = None, **kwargs)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class used to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode objects as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
-
construct_infrastructure
(*args, **kwargs) → None[source]¶ Constructs infrastructure for application.
-
process
-
class
eventsourcing.application.process.
ProcessEvent
(domain_events: Iterable[TDomainEvent], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, causal_dependencies: Optional[List[Dict[str, int]]] = None, orm_objs_pending_save: Sequence[Any] = (), orm_objs_pending_delete: Sequence[Any] = ())[source]¶ Bases:
eventsourcing.whitehead.ActualOccasion
,typing.Generic
-
__init__
(domain_events: Iterable[TDomainEvent], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, causal_dependencies: Optional[List[Dict[str, int]]] = None, orm_objs_pending_save: Sequence[Any] = (), orm_objs_pending_delete: Sequence[Any] = ())[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.application.process.
PromptToPull
(process_name: str, pipeline_id: int)[source]¶
-
class
eventsourcing.application.process.
WrappedRepository
(repository: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository[TAggregate, TAggregateEvent])[source]¶ Bases:
typing.Generic
Used to wrap an event sourced repository for use in process application policy so that use of, and changes to, domain model aggregates can be automatically detected and recorded.
Implements a “dictionary like” interface, so that aggregates can be accessed by ID.
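A minimal sketch of the wrapping idea, assuming the underlying repository behaves like a dict keyed by aggregate ID. SketchWrappedRepository, retrieved_aggregates and the policy function are hypothetical names for illustration; the sketch only records which aggregates a policy touched.

```python
from typing import Any, Dict


class SketchWrappedRepository:
    """Hypothetical wrapper that records which aggregates were accessed."""

    def __init__(self, repository: Dict[str, Any]) -> None:
        self.repository = repository
        self.retrieved_aggregates: Dict[str, Any] = {}

    def __getitem__(self, aggregate_id: str) -> Any:
        # Remember each aggregate the first time it is accessed.
        if aggregate_id not in self.retrieved_aggregates:
            self.retrieved_aggregates[aggregate_id] = self.repository[aggregate_id]
        return self.retrieved_aggregates[aggregate_id]

    def __contains__(self, aggregate_id: str) -> bool:
        return aggregate_id in self.repository


def policy(repository: SketchWrappedRepository, event: Dict[str, str]) -> None:
    # A policy uses the wrapper exactly as it would use a dictionary.
    order = repository[event["order_id"]]
    order["status"] = "paid"


plain = {"order-1": {"status": "new"}, "order-2": {"status": "new"}}
wrapped = SketchWrappedRepository(plain)
policy(wrapped, {"order_id": "order-1"})
```

After the policy returns, wrapped.retrieved_aggregates holds only the aggregates the policy used, so a caller could inspect them for changes that need recording.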
-
__init__
(repository: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository[TAggregate, TAggregateEvent]) → None[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.application.process.
ProcessApplication
(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]¶ Bases:
eventsourcing.application.simple.SimpleApplication
-
__init__
(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class used to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode objects as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
-
notification_log_reader_class
¶ alias of
eventsourcing.application.notificationlog.NotificationLogReader
-
publish_prompt
(_: Optional[Iterable[eventsourcing.whitehead.ActualOccasion]] = None) → None[source]¶ Publishes prompt for a given event.
Used to prompt downstream process application when an event is published by this application’s model, which can happen when application command methods, rather than the process policy, are called.
Wraps exceptions with PromptFailed, to avoid application policy exceptions being seen directly in other applications when running synchronously in single thread.
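The wrapping described above can be sketched with plain Python exception chaining. PromptFailedSketch, publish_prompt_sketch and failing_policy are hypothetical names, assumed for illustration; the library's own PromptFailed class and handler mechanism are not reproduced here.

```python
from typing import Callable, Iterable


class PromptFailedSketch(Exception):
    """Hypothetical stand-in for the PromptFailed exception named above."""


def publish_prompt_sketch(handlers: Iterable[Callable[[str], None]], prompt: str) -> None:
    for handler in handlers:
        try:
            handler(prompt)
        except Exception as error:
            # Wrap the downstream failure, keeping the original as the cause.
            message = "{}: {}".format(type(error).__name__, error)
            raise PromptFailedSketch(message) from error


def failing_policy(prompt: str) -> None:
    raise ValueError("downstream policy broke on " + prompt)


try:
    publish_prompt_sketch([failing_policy], "orders")
except PromptFailedSketch as wrapped:
    caught = wrapped  # keep a reference; the "as" name is cleared after the block
```

The upstream caller sees only the wrapper type, while the original exception remains available through the __cause__ attribute.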
-
-
class
eventsourcing.application.process.
ProcessApplicationWithSnapshotting
(snapshot_period: Optional[int] = None, **kwargs)[source]¶ Bases:
eventsourcing.application.snapshotting.SnapshottingApplication
,eventsourcing.application.process.ProcessApplication
command
-
class
eventsourcing.application.command.
CommandProcess
(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]¶ Bases:
eventsourcing.application.process.ProcessApplication
-
persist_event_type
-
pipeline
-
class
eventsourcing.application.pipeline.
PipelineExpression
(left: Union[PipelineExpression, PipeableMetaclass], right: Union[PipelineExpression, PipeableMetaclass])[source]¶ Bases:
object
Implements a left-to-right association between two objects.
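A minimal sketch of a left-to-right association built with the | operator on classes. SketchExpression, SketchPipeableMeta and the three example classes are hypothetical; the signature above suggests a similar pairing of an expression class with a metaclass, but this is not the library's code.

```python
from typing import Any, Iterator


class SketchExpression:
    """Hypothetical pairing of a left and a right operand."""

    def __init__(self, left: Any, right: Any) -> None:
        self.left = left
        self.right = right

    def __or__(self, other: Any) -> "SketchExpression":
        # Chaining nests to the left: (A | B) | C.
        return SketchExpression(self, other)

    def __iter__(self) -> Iterator[Any]:
        # Flatten nested expressions in left-to-right order.
        for term in (self.left, self.right):
            if isinstance(term, SketchExpression):
                yield from term
            else:
                yield term


class SketchPipeableMeta(type):
    """Hypothetical metaclass that lets classes themselves be joined with |."""

    def __or__(cls, other: Any) -> SketchExpression:
        return SketchExpression(cls, other)


class Orders(metaclass=SketchPipeableMeta):
    pass


class Reservations(metaclass=SketchPipeableMeta):
    pass


class Payments(metaclass=SketchPipeableMeta):
    pass


expression = Orders | Reservations | Payments
names = [cls.__name__ for cls in expression]
```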
snapshotting
-
class
eventsourcing.application.snapshotting.
SnapshottingApplication
(snapshot_period: Optional[int] = None, **kwargs)[source]¶ Bases:
eventsourcing.application.simple.SimpleApplication
-
__init__
(snapshot_period: Optional[int] = None, **kwargs)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class used to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode objects as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
-