application¶
The application layer brings together the domain and infrastructure layers.
simple¶
-
class
eventsourcing.application.simple.
ProcessEvent
(domain_events: Iterable[TDomainEvent], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, causal_dependencies: Optional[List[Dict[str, int]]] = None, orm_objs_pending_save: Sequence[Any] = (), orm_objs_pending_delete: Sequence[Any] = ())[source]¶ Bases:
eventsourcing.whitehead.ActualOccasion
,typing.Generic
-
__init__
(domain_events: Iterable[TDomainEvent], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, causal_dependencies: Optional[List[Dict[str, int]]] = None, orm_objs_pending_save: Sequence[Any] = (), orm_objs_pending_delete: Sequence[Any] = ())[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.application.simple.
SimpleApplication
(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]¶ Bases:
eventsourcing.application.pipeline.Pipeable
,typing.Generic
Base class for event sourced applications.
Constructs infrastructure objects such as the repository and event store, and also the notification log which presents the application state as a sequence of events.
Needs actual infrastructure classes.
-
infrastructure_factory_class
¶ alias of
eventsourcing.infrastructure.factory.InfrastructureFactory
-
repository_class
¶ alias of
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
-
__init__
(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class uses to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode object as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
-
event_store_class
¶
-
construct_infrastructure
(*args, **kwargs) → None[source]¶ Constructs infrastructure for application.
-
construct_infrastructure_factory
(*args, **kwargs) → eventsourcing.infrastructure.factory.InfrastructureFactory[source]¶ Constructs infrastructure factory object.
-
construct_datastore
() → None[source]¶ Constructs datastore object (used to create and drop database tables).
-
change_pipeline
(pipeline_id: int) → None[source]¶ Switches pipeline being used by this application object.
-
close
() → None[source]¶ Closes the application for further use.
The persistence policy is closed, and the application’s connection to the database is closed.
-
__exit__
(exc_type: Any, exc_val: Any, exc_tb: Any) → None[source]¶ Closes application when exiting context manager.
-
classmethod
mixin
(infrastructure_class: type) → T[source]¶ Returns subclass that inherits also from given infrastructure class.
-
save
(aggregates=(), orm_objects_pending_save=(), orm_objects_pending_delete=()) → None[source]¶ Saves state of aggregates, and ORM objects.
All of the pending events of the aggregates, along with the ORM objects, are recorded atomically as a process event.
Then a “prompt to pull” is published, and, if the repository cache is in use, then puts the aggregates in the cache.
Parameters: - aggregates – One or many aggregates.
- orm_objects_pending_save – Sequence of ORM objects to be saved.
- orm_objects_pending_delete – Sequance of ORM objects to be deleted.
-
record_process_event
(process_event: eventsourcing.application.simple.ProcessEvent) → List[T][source]¶ Records a process event.
Converts the domain events of the process event to event record objects, and writes the event records and the ORM objects to the database using the application’s event store’s record manager.
Parameters: process_event – An instance of ProcessEvent
Returns: A list of event records.
-
construct_event_records
(pending_events: Iterable[TAggregateEvent], causal_dependencies: Optional[List[Dict[str, int]]]) → List[T][source]¶ Constructs event records from domain events.
Parameters: - pending_events – An iterable of domain events.
- causal_dependencies – A list of causal dependencies.
Returns: A list of event records.
-
publish_prompt
(head_notification_id=None)[source]¶ Publishes a “prompt to pull” (instance of
PromptToPull
).Parameters: head_notification_id – Maximum notification ID of event records to be pulled.
-
-
class
eventsourcing.application.simple.
ApplicationWithConcreteInfrastructure
(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]¶ Bases:
eventsourcing.application.simple.SimpleApplication
Base class for application classes that have actual infrastructure.
-
class
eventsourcing.application.simple.
PromptToPull
(process_name: str, pipeline_id: int, head_notification_id=None)[source]¶
policies¶
-
class
eventsourcing.application.policies.
PersistencePolicy
(event_store: eventsourcing.infrastructure.base.AbstractEventStore, persist_event_type: Union[type, Tuple, None] = None)[source]¶ Bases:
object
Stores events of given type to given event store, whenever they are published.
-
class
eventsourcing.application.policies.
SnapshottingPolicy
(repository: eventsourcing.domain.model.repository.AbstractEntityRepository, snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager][eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager], persist_event_type: Union[type, Tuple, None] = (<class 'eventsourcing.domain.model.events.EventWithOriginatorVersion'>,), period: int = 0)[source]¶ Bases:
typing.Generic
-
__init__
(repository: eventsourcing.domain.model.repository.AbstractEntityRepository, snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager][eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager], persist_event_type: Union[type, Tuple, None] = (<class 'eventsourcing.domain.model.events.EventWithOriginatorVersion'>,), period: int = 0)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
notificationlog¶
-
class
eventsourcing.application.notificationlog.
Section
(section_id: str, items: List[T], previous_id: Optional[str] = None, next_id: Optional[str] = None)[source]¶ Bases:
object
Section of a notification log.
Contains items, and has an ID.
May also have either IDs of previous and next sections of the notification log.
-
class
eventsourcing.application.notificationlog.
AbstractNotificationLog
[source]¶ Bases:
abc.ABC
Presents a sequence of sections from a sequence of notifications.
-
__getitem__
(section_id: str) → eventsourcing.application.notificationlog.Section[source]¶ Get section of notification log.
Parameters: section_id – ID of a section of the notification log.
-
section_size
¶ Size of section of notification log.
-
-
class
eventsourcing.application.notificationlog.
LocalNotificationLog
(section_size: Optional[int] = None)[source]¶ Bases:
eventsourcing.application.notificationlog.AbstractNotificationLog
Presents a sequence of sections from a sequence of notifications.
-
__init__
(section_size: Optional[int] = None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
section_size
¶ Size of section of notification log.
-
__getitem__
(section_id: str) → eventsourcing.application.notificationlog.Section[source]¶ Get section of notification log.
Parameters: section_id – ID of a section of the notification log.
-
get_next_position
() → int[source]¶ Returns next unoccupied position in zero-based sequence.
Since the notification IDs are one-based, the next position in the zero-based sequence equals the current max notification ID which is 1-based. If there are no records, the max notification ID will be None, and the next position is zero.
Returns: Non-negative integer. Return type: int
-
-
class
eventsourcing.application.notificationlog.
RecordManagerNotificationLog
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, section_size: Optional[int] = None)[source]¶ Bases:
eventsourcing.application.notificationlog.LocalNotificationLog
Local notification log that gets notifications from a record manager.
-
__init__
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, section_size: Optional[int] = None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
get_items
(start: int, stop: Optional[int]) → Sequence[Any][source]¶ Returns notification in log.
Parameters: - start – Inclusive start position in log.
- stop – Inclusive stop position in log.
Returns:
-
get_next_position
() → int[source]¶ Returns next unoccupied position in zero-based sequence.
Since the notification IDs are one-based, the next position in the zero-based sequence equals the current max notification ID which is 1-based. If there are no records, the max notification ID will be None, and the next position is zero.
Returns: Non-negative integer. Return type: int
-
-
class
eventsourcing.application.notificationlog.
BigArrayNotificationLog
(big_array: eventsourcing.domain.model.array.BigArray, section_size: int)[source]¶ Bases:
eventsourcing.application.notificationlog.LocalNotificationLog
Notification log that uses the BigArray class.
-
class
eventsourcing.application.notificationlog.
NotificationLogReader
(notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog, use_direct_query_if_available: bool = False)[source]¶ Bases:
abc.ABC
-
__init__
(notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog, use_direct_query_if_available: bool = False)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
seek
(position: int) → None[source]¶ Sets position of reader in notification log sequence.
This represents the position of the last notification read by the reader. The next notification returned by the reader will be the next position.
Parameters: position (int) – Position is notification log sequence. Raises: ValueError – if the position is less than zero
-
initial_section_id
¶ Returns initial section ID used to start getting linked sections from the notification log.
Slight departure from Vaughn Vernon’s design by not using ‘current’ as initial section ID, but a section ID that just includes the “next” position, which the notification log can use to return the section containing this position. This avoids lengthy back- tracking when reader has a lot of notifications to catch-up on.
This property has been extracted in order to allow a subclass to adjust this default behaviour.
It would be possible to calculate the actual section ID from the current reader position. Using section ID of an actual section may hit a cache and avoid troubling the server, but the reader would need to know the section size of the notification log it is reading. If we don’t know section size, perhaps it is a remote notification log, we can use ‘current’ to hit a cache. In future, it might be possible to ask the notification log to disclose it’s section size, or compute an actual section ID for a given position.
Returns: A notification log section ID. Return type: str
-
django¶
-
class
eventsourcing.application.django.
DjangoApplication
(tracking_record_class: Any = None, *args, **kwargs)[source]¶ Bases:
eventsourcing.application.simple.ApplicationWithConcreteInfrastructure
-
infrastructure_factory_class
¶ alias of
eventsourcing.infrastructure.django.factory.DjangoInfrastructureFactory
-
__init__
(tracking_record_class: Any = None, *args, **kwargs)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class uses to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode object as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
-
popo¶
-
class
eventsourcing.application.popo.
PopoApplication
(name: str = '', persistence_policy: Optional[eventsourcing.application.policies.PersistencePolicy] = None, persist_event_type: Union[Type[eventsourcing.domain.model.events.DomainEvent], Tuple[Type[eventsourcing.domain.model.events.DomainEvent]], None] = None, cipher_key: Optional[str] = None, compressor: Any = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper]] = None, record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, stored_event_record_class: Optional[type] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.eventstore.EventStore]] = None, snapshot_record_class: Optional[type] = None, setup_table: bool = True, contiguous_record_ids: bool = True, pipeline_id: int = 0, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, notification_log_section_size: Optional[int] = None, use_cache: bool = False)[source]¶ Bases:
eventsourcing.application.simple.ApplicationWithConcreteInfrastructure
-
infrastructure_factory_class
¶ alias of
eventsourcing.infrastructure.popo.factory.PopoInfrastructureFactory
-
sequenced_item_mapper_class
¶ alias of
eventsourcing.infrastructure.popo.mapper.SequencedItemMapperForPopo
-
stored_event_record_class
¶ alias of
eventsourcing.infrastructure.popo.records.StoredEventRecord
-
sqlalchemy¶
-
class
eventsourcing.application.sqlalchemy.
SQLAlchemyApplication
(uri: Optional[str] = None, session: Optional[Any] = None, tracking_record_class: Any = None, **kwargs)[source]¶ Bases:
eventsourcing.application.simple.ApplicationWithConcreteInfrastructure
-
infrastructure_factory_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.factory.SQLAlchemyInfrastructureFactory
-
stored_event_record_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.StoredEventRecord
-
snapshot_record_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.EntitySnapshotRecord
-
__init__
(uri: Optional[str] = None, session: Optional[Any] = None, tracking_record_class: Any = None, **kwargs)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class uses to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode object as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
-
construct_infrastructure
(*args, **kwargs) → None[source]¶ Constructs infrastructure for application.
-
process¶
-
class
eventsourcing.application.process.
WrappedRepository
(repository: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository[~TAggregate, ~TAggregateEvent][TAggregate, TAggregateEvent])[source]¶ Bases:
typing.Generic
Used to wrap an event sourced repository for use in process application policy so that use of, and changes to, domain model aggregates can be automatically detected and recorded.
Implements a “dictionary like” interface, so that aggregates can be accessed by ID.
-
__init__
(repository: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository[~TAggregate, ~TAggregateEvent][TAggregate, TAggregateEvent]) → None[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.application.process.
ProcessApplication
(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]¶ Bases:
eventsourcing.application.simple.SimpleApplication
-
__init__
(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class uses to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode object as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
-
notification_log_reader_class
¶ alias of
eventsourcing.application.notificationlog.NotificationLogReader
-
close
() → None[source]¶ Closes the application for further use.
The persistence policy is closed, and the application’s connection to the database is closed.
-
publish_prompt_for_events
(_: Optional[Iterable[eventsourcing.whitehead.ActualOccasion]] = None) → None[source]¶ Publishes prompt for a given event.
Used to prompt downstream process application when an event is published by this application’s model, which can happen when application command methods, rather than the process policy, are called.
Wraps exceptions with PromptFailed, to avoid application policy exceptions being seen directly in other applications when running synchronously in single thread.
-
follow
(upstream_application_name: str, notification_log: eventsourcing.application.notificationlog.AbstractNotificationLog) → None[source]¶ Sets up process application to follow the given notification log of an upstream application.
Parameters: - upstream_application_name – Name of the upstream application.
- notification_log – Notification log that will be processed.
-
run
(prompt: Optional[eventsourcing.application.simple.Prompt] = None, advance_by: Optional[int] = None) → int[source]¶ Pulls event notifications from notification logs being followed by this process application, and processes the contained domain events.
Parameters: - prompt – Optional prompt, specifying a particular notification log.
- advance_by – Maximum event notifications to process.
Returns: Returns number of events that have been processed.
-
check_causal_dependencies
(upstream_name, causal_dependencies_json)[source]¶ Checks the causal dependencies are satisfied (have already been processed).
Parameters: - upstream_name – Name of the upstream application being processed.
- causal_dependencies_json – Pipelines and positions in notification logs.
Raises: CausalDependencyFailed – If causal dependencies are not satisfied.
-
process_upstream_event
(domain_event: TAggregateEvent, notification_id: int, upstream_name: str) → Tuple[List[TAggregateEvent], List[T]][source]¶ Processes given domain event from an upstream notification log.
Calls the process application policy, and then records a process event, hence recording atomically all new domain events created by the call to the policy along with any ORM objects that may result.
Parameters: - domain_event – Domain event to be processed.
- notification_id – Position in notification log.
- upstream_name – Name of upstream application.
Returns: Returns a list of new domain events.
-
event_from_notification
(notification: Dict[str, Any]) → TAggregateEvent[source]¶ Reconstructs domain event from an event notification.
Parameters: notification – The event notification. Returns: A domain event.
-
call_policy
(domain_event: TAggregateEvent) → Tuple[List[TAggregateEvent], List[Dict[str, int]], List[Any], List[Any]][source]¶ Calls the process application policy with the given domain event.
Parameters: domain_event – Domain event that will be given to the policy. Returns: Returns a list of domain events, and a list of causal dependencies.
-
policy
(repository: eventsourcing.application.process.WrappedRepository[~TAggregate, ~TAggregateEvent][TAggregate, TAggregateEvent], event: TAggregateEvent) → Union[TAggregate, Sequence[TAggregate], None][source]¶ Empty method, can be overridden in subclasses to implement concrete policy.
-
-
class
eventsourcing.application.process.
ProcessApplicationWithSnapshotting
(snapshot_period: int = 0, **kwargs)[source]¶ Bases:
eventsourcing.application.snapshotting.SnapshottingApplication
,eventsourcing.application.process.ProcessApplication
Supplements process applications that will use snapshotting.
decorators¶
command¶
-
class
eventsourcing.application.command.
CommandProcess
(name: str = '', policy: Optional[function] = None, setup_table: bool = False, use_direct_query_if_available: bool = False, notification_log_reader_class: Optional[Type[eventsourcing.application.notificationlog.NotificationLogReader]] = None, apply_policy_to_generated_events: bool = False, **kwargs)[source]¶ Bases:
eventsourcing.application.process.ProcessApplication
-
persist_event_type
¶
-
pipeline¶
-
class
eventsourcing.application.pipeline.
PipelineExpression
(left: Union[PipelineExpression, PipeableMetaclass], right: Union[PipelineExpression, PipeableMetaclass])[source]¶ Bases:
object
Implements a left-to-right association between two objects.
snapshotting¶
-
class
eventsourcing.application.snapshotting.
SnapshottingApplication
(snapshot_period: int = 0, **kwargs)[source]¶ Bases:
eventsourcing.application.simple.SimpleApplication
-
__init__
(snapshot_period: int = 0, **kwargs)[source]¶ Initialises application object.
Parameters: - name – Name of application.
- persistence_policy – Persistence policy object.
- persist_event_type – Tuple of domain event classes to be persisted.
- cipher_key – Base64 unicode string cipher key.
- compressor – Compressor used to compress serialized event state.
- sequenced_item_class – Named tuple for mapping and recording events.
- sequenced_item_mapper_class – Object class for mapping stored events.
- record_manager_class – Object class for recording stored events.
- stored_event_record_class – Object class for event records.
- event_store_class – Object class uses to store and retrieve domain events.
- snapshot_record_class – Object class used to represent snapshots.
- setup_table – Option to create database tables when application starts.
- contiguous_record_ids – Whether or not to delegate notification ID generation to the record manager (to guarantee there will be no gaps).
- pipeline_id – ID of instance of system pipeline expressions.
- json_encoder_class – Object class used to encode object as JSON strings.
- json_decoder_class – Object class used to decode JSON strings as objects.
- notification_log_section_size – Number of notification items in a section.
- use_cache – Whether or not to keep aggregates in memory (saves replaying when accessing again, but uses memory).
-