infrastructure¶
The infrastructure layer adapts external devices in ways that are useful for the application, such as the way an event store encapsulates a database.
sequenceditem¶
The persistence model for storing events.
-
class
eventsourcing.infrastructure.sequenceditem.
SequencedItem
(sequence_id, position, topic, state)[source]¶ Bases:
tuple
-
sequence_id
¶ Alias for field number 0
-
position
¶ Alias for field number 1
-
topic
¶ Alias for field number 2
-
state
¶ Alias for field number 3
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
static
__new__
(_cls, sequence_id: uuid.UUID, position: int, topic: str, state: bytes)¶ Create new instance of SequencedItem(sequence_id, position, topic, state)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values.
-
classmethod
_make
(iterable)¶ Make a new SequencedItem object from a sequence or iterable
-
_replace
(**kwds)¶ Return a new SequencedItem object replacing specified fields with new values
-
-
class
eventsourcing.infrastructure.sequenceditem.
StoredEvent
(originator_id, originator_version, topic, state)[source]¶ Bases:
tuple
-
originator_id
¶ Alias for field number 0
-
originator_version
¶ Alias for field number 1
-
topic
¶ Alias for field number 2
-
state
¶ Alias for field number 3
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
static
__new__
(_cls, originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes)¶ Create new instance of StoredEvent(originator_id, originator_version, topic, state)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values.
-
classmethod
_make
(iterable)¶ Make a new StoredEvent object from a sequence or iterable
-
_replace
(**kwds)¶ Return a new StoredEvent object replacing specified fields with new values
-
sequenceditemmapper¶
The sequenced item mapper maps sequenced items to application-level objects.
-
class
eventsourcing.infrastructure.sequenceditemmapper.
AbstractSequencedItemMapper
(**kwargs)[source]¶ Bases:
typing.Generic
,abc.ABC
-
item_from_event
(domain_event: TEvent) → NamedTuple[source]¶ Constructs and returns a sequenced item for given domain event.
-
event_from_item
(sequenced_item: NamedTuple) → TEvent[source]¶ Constructs and returns a domain event for given sequenced item.
-
-
class
eventsourcing.infrastructure.sequenceditemmapper.
SequencedItemMapper
(sequenced_item_class: Optional[Type[NamedTuple]] = None, sequence_id_attr_name: Optional[str] = None, position_attr_name: Optional[str] = None, json_encoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONDecoder]] = None, cipher: Optional[eventsourcing.utils.cipher.aes.AESCipher] = None, compressor: Any = None, other_attr_names: Tuple[str, ...] = ())[source]¶ Bases:
eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper
Uses JSON to transcode domain events.
-
__init__
(sequenced_item_class: Optional[Type[NamedTuple]] = None, sequence_id_attr_name: Optional[str] = None, position_attr_name: Optional[str] = None, json_encoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONDecoder]] = None, cipher: Optional[eventsourcing.utils.cipher.aes.AESCipher] = None, compressor: Any = None, other_attr_names: Tuple[str, ...] = ())[source]¶ Initialises mapper.
-
item_from_event
(domain_event: TEvent) → NamedTuple[source]¶ Constructs a sequenced item from a domain event.
-
construct_item_args
(domain_event: TEvent) → Tuple[source]¶ Constructs attributes of a sequenced item from the given domain event.
-
event_from_item
(sequenced_item: NamedTuple) → TEvent[source]¶ Reconstructs domain event from stored event topic and event attrs. Used in the event store when getting domain events.
-
base¶
Abstract base classes for the infrastructure layer.
-
class
eventsourcing.infrastructure.base.
AbstractRecordManager
(**kwargs)[source]¶ Bases:
abc.ABC
-
record_class
¶ Returns record class to be used by the record manager.
-
record_items
(sequenced_items: Iterable[NamedTuple]) → None[source]¶ Writes sequenced items into the datastore.
-
get_item
(sequence_id: uuid.UUID, position: int) → NamedTuple[source]¶ Gets sequenced item from the datastore.
-
get_items
(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Iterator[NamedTuple][source]¶ Iterates over records in sequence.
-
get_record
(sequence_id: uuid.UUID, position: int) → Any[source]¶ Gets record at position in sequence.
-
-
class
eventsourcing.infrastructure.base.
BaseRecordManager
(record_class: type, sequenced_item_class: Type[eventsourcing.infrastructure.sequenceditem.SequencedItem] = <class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids: bool = False, application_name: str = '', pipeline_id: int = 0, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.base.AbstractRecordManager
-
__init__
(record_class: type, sequenced_item_class: Type[eventsourcing.infrastructure.sequenceditem.SequencedItem] = <class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids: bool = False, application_name: str = '', pipeline_id: int = 0, **kwargs)[source]¶ Initialises record manager.
-
record_class
¶ Returns record class to be used by the record manager.
-
get_item
(sequence_id: uuid.UUID, position: int) → eventsourcing.infrastructure.sequenceditem.SequencedItem[source]¶ Gets sequenced item from the datastore.
-
get_items
(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Iterator[eventsourcing.infrastructure.sequenceditem.SequencedItem][source]¶ Returns sequenced item generator.
-
list_items
(*args, **kwargs) → List[eventsourcing.infrastructure.sequenceditem.SequencedItem][source]¶ Returns list of sequenced items.
-
-
class
eventsourcing.infrastructure.base.
RecordManagerWithNotifications
(record_class: type, sequenced_item_class: Type[eventsourcing.infrastructure.sequenceditem.SequencedItem] = <class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids: bool = False, application_name: str = '', pipeline_id: int = 0, **kwargs)[source]¶
-
class
eventsourcing.infrastructure.base.
RecordManagerWithTracking
(tracking_record_class: Optional[type] = None, *args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.base.RecordManagerWithNotifications
ACID record managers can write tracking records and event records in an atomic transaction, needed for atomic processing in process applications.
-
__init__
(tracking_record_class: Optional[type] = None, *args, **kwargs) → None[source]¶ Initialises record manager.
-
write_records
(records: Iterable[Any], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, orm_objs_pending_save: Optional[Sequence[Any]] = None, orm_objs_pending_delete: Optional[Sequence[Any]] = None) → None[source]¶ Writes tracking, event and notification records for a process event. :param orm_objs_pending_delete: :param orm_objs_pending_save:
-
get_max_tracking_record_id
(upstream_application_name: str) → int[source]¶ Return maximum tracking record ID for notification from upstream application in pipeline.
-
-
class
eventsourcing.infrastructure.base.
SQLRecordManager
(*args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.base.RecordManagerWithTracking
Common aspects of SQL record managers, such as SQLAlchemy and Django record managers.
-
record_items
(sequenced_items: Iterable[NamedTuple]) → None[source]¶ Writes sequenced items into the datastore.
-
insert_select_max
¶ SQL statement that inserts records with contiguous IDs, by selecting max ID from indexed table records.
-
_prepare_insert
(tmpl: str, record_class: type, field_names: List[str], placeholder_for_id: bool = False) → Any[source]¶ With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, using an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.
-
make_placeholder
(field_name: str) → str[source]¶ Returns “placeholder” string for late binding of values to query.
Depends on record manager’s adapted database system or adapted ORM.
-
insert_values
¶ SQL statement that inserts records without ID.
-
insert_tracking_record
¶ SQL statement that inserts tracking records.
-
-
class
eventsourcing.infrastructure.base.
AbstractEventStore
(record_manager: TRecordManager, event_mapper: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper)[source]¶ Bases:
abc.ABC
,typing.Generic
Abstract base class for event stores. Defines the methods expected of an event store by other classes in the library.
-
__init__
(record_manager: TRecordManager, event_mapper: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper)[source]¶ Initialises event store object.
Parameters: - record_manager – record manager
- event_mapper – sequenced item mapper
-
store_events
(events: Iterable[TEvent]) → None[source]¶ Put domain event in event store for later retrieval.
-
iter_events
(originator_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True, page_size: Optional[int] = None) → Iterable[TEvent][source]¶ Returns iterable of domain events for given entity ID.
-
list_events
(*args, **kwargs) → List[TEvent][source]¶ Returns list of domain events for given entity ID.
-
get_most_recent_event
(originator_id: uuid.UUID, lt: Optional[int] = None, lte: Optional[int] = None) → Optional[TEvent][source]¶ Returns most recent domain event for given entity ID.
-
all_events
() → Iterable[TEvent][source]¶ Returns all domain events in the event store.
This works by iterating over all sequences, so doesn’t return events in order. Use a Notification Log to project application state.
-
get_domain_events
(originator_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True, page_size: Optional[int] = None) → Iterable[TEvent][source]¶ Deprecated. Please use iter_events() instead.
Gets domain events from the sequence identified by originator_id.
Parameters: - originator_id – ID of a sequence of events
- gt – get items after this position
- gte – get items at or after this position
- lt – get items before this position
- lte – get items before or at this position
- limit – get limited number of items
- is_ascending – get items from lowest position
- page_size – restrict and repeat database query
Returns: list of domain events
-
datastore¶
Base classes for concrete datastore classes.
-
class
eventsourcing.infrastructure.datastore.
DatastoreSettings
[source]¶ Bases:
object
Settings for Datastore.
-
class
eventsourcing.infrastructure.datastore.
AbstractDatastore
(settings: TDatastoreSettings)[source]¶ Bases:
abc.ABC
,typing.Generic
-
can_drop_tables
= True¶ Datastores hold stored event records, used by a record manager.
-
-
exception
eventsourcing.infrastructure.datastore.
DatastoreConnectionError
[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreError
-
exception
eventsourcing.infrastructure.datastore.
DatastoreTableError
[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreError
cassandra¶
Classes for event sourcing with Apache Cassandra.
-
class
eventsourcing.infrastructure.cassandra.datastore.
CassandraSettings
(hosts=None, port=None, protocol_version=None, default_keyspace=None, consistency=None, replication_factor=None, username=None, password=None)[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreSettings
-
class
eventsourcing.infrastructure.cassandra.datastore.
CassandraDatastore
(tables, *args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.datastore.AbstractDatastore
-
class
eventsourcing.infrastructure.cassandra.factory.
CassandraInfrastructureFactory
(record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.base.AbstractEventStore]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper]] = None, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, integer_sequenced_record_class: Optional[type] = None, timestamp_sequenced_record_class: Optional[type] = None, snapshot_record_class: Optional[type] = None, contiguous_record_ids: bool = False, application_name: Optional[str] = None, pipeline_id: int = 0)[source]¶ Bases:
eventsourcing.infrastructure.factory.InfrastructureFactory
Infrastructure factory for Cassandra.
-
record_manager_class
¶ alias of
eventsourcing.infrastructure.cassandra.manager.CassandraRecordManager
-
integer_sequenced_record_class
¶ alias of
eventsourcing.infrastructure.cassandra.records.IntegerSequencedRecord
-
timestamp_sequenced_record_class
¶ alias of
eventsourcing.infrastructure.cassandra.records.TimestampSequencedRecord
-
snapshot_record_class
¶ alias of
eventsourcing.infrastructure.cassandra.records.SnapshotRecord
-
-
class
eventsourcing.infrastructure.cassandra.manager.
CassandraRecordManager
(record_class: type, sequenced_item_class: Type[eventsourcing.infrastructure.sequenceditem.SequencedItem] = <class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids: bool = False, application_name: str = '', pipeline_id: int = 0, **kwargs)[source]¶
-
class
eventsourcing.infrastructure.cassandra.records.
IntegerSequencedRecord
(**values)[source]¶ Bases:
cassandra.cqlengine.models.Model
Stores integer-sequenced items in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
exception
-
class
eventsourcing.infrastructure.cassandra.records.
TimestampSequencedRecord
(**values)[source]¶ Bases:
cassandra.cqlengine.models.Model
Stores timestamp-sequenced items in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
exception
-
class
eventsourcing.infrastructure.cassandra.records.
TimeuuidSequencedRecord
(**values)[source]¶ Bases:
cassandra.cqlengine.models.Model
Stores timeuuid-sequenced items in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
exception
-
class
eventsourcing.infrastructure.cassandra.records.
SnapshotRecord
(**values)[source]¶ Bases:
cassandra.cqlengine.models.Model
Stores snapshots in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
exception
-
class
eventsourcing.infrastructure.cassandra.records.
StoredEventRecord
(**values)[source]¶ Bases:
cassandra.cqlengine.models.Model
Stores integer-sequenced items in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
exception
django¶
Infrastructure for event sourcing with the Django ORM. This package functions as a Django application. It can be included in “INSTALLED_APPS” in settings.py in your Django project. There is just one migration, to create tables that do not exist.
-
class
eventsourcing.infrastructure.django.factory.
DjangoInfrastructureFactory
(tracking_record_class: Optional[type] = None, *args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.factory.InfrastructureFactory
Infrastructure factory for Django.
-
record_manager_class
¶ alias of
eventsourcing.infrastructure.django.manager.DjangoRecordManager
-
__init__
(tracking_record_class: Optional[type] = None, *args, **kwargs)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
construct_integer_sequenced_record_manager
(**kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]¶ Constructs Django record manager.
Returns: A Django record manager. Return type: DjangoRecordManager
-
-
class
eventsourcing.infrastructure.django.manager.
DjangoRecordManager
(*args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.base.SQLRecordManager
-
write_records
(records: Iterable[Any], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, orm_objs_pending_save: Optional[Sequence[Any]] = None, orm_objs_pending_delete: Optional[Sequence[Any]] = None) → None[source]¶ Writes tracking, event and notification records for a process event. :param orm_objs_pending_delete: :param orm_objs_pending_save:
-
make_placeholder
(_: str) → str[source]¶ Returns “placeholder” string for late binding of values to query.
Depends on record manager’s adapted database system or adapted ORM.
-
get_record_table_name
(record_class: type) → str[source]¶ Returns table name from SQLAlchemy record class.
-
get_record
(sequence_id: uuid.UUID, position: int) → Any[source]¶ Gets record at position in sequence.
-
get_records
(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Sequence[Any][source]¶ Returns records for a sequence.
-
get_notification_records
(start: Optional[int] = None, stop: Optional[int] = None, *args, **kwargs) → Iterable[T_co][source]¶ Returns all records in the table.
-
get_max_tracking_record_id
(upstream_application_name: str) → int[source]¶ Return maximum tracking record ID for notification from upstream application in pipeline.
-
-
class
eventsourcing.infrastructure.django.models.
IntegerSequencedRecord
(id, sequence_id, position, topic, state)[source]¶ Bases:
django.db.models.base.Model
-
exception
DoesNotExist
¶ Bases:
django.core.exceptions.ObjectDoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
django.core.exceptions.MultipleObjectsReturned
-
exception
-
class
eventsourcing.infrastructure.django.models.
TimestampSequencedRecord
(id, sequence_id, position, topic, state)[source]¶ Bases:
django.db.models.base.Model
-
exception
DoesNotExist
¶ Bases:
django.core.exceptions.ObjectDoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
django.core.exceptions.MultipleObjectsReturned
-
exception
-
class
eventsourcing.infrastructure.django.models.
SnapshotRecord
(uid, sequence_id, position, topic, state)[source]¶ Bases:
django.db.models.base.Model
-
exception
DoesNotExist
¶ Bases:
django.core.exceptions.ObjectDoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
django.core.exceptions.MultipleObjectsReturned
-
exception
-
class
eventsourcing.infrastructure.django.models.
EntitySnapshotRecord
(uid, application_name, originator_id, originator_version, topic, state)[source]¶ Bases:
django.db.models.base.Model
-
exception
DoesNotExist
¶ Bases:
django.core.exceptions.ObjectDoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
django.core.exceptions.MultipleObjectsReturned
-
exception
-
class
eventsourcing.infrastructure.django.models.
StoredEventRecord
(uid, application_name, originator_id, originator_version, pipeline_id, notification_id, topic, state, causal_dependencies)[source]¶ Bases:
django.db.models.base.Model
-
exception
DoesNotExist
¶ Bases:
django.core.exceptions.ObjectDoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
django.core.exceptions.MultipleObjectsReturned
-
exception
-
class
eventsourcing.infrastructure.django.models.
NotificationTrackingRecord
(uid, application_name, upstream_application_name, pipeline_id, notification_id)[source]¶ Bases:
django.db.models.base.Model
-
exception
DoesNotExist
¶ Bases:
django.core.exceptions.ObjectDoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
django.core.exceptions.MultipleObjectsReturned
-
exception
sqlalchemy¶
Classes for event sourcing with SQLAlchemy.
-
class
eventsourcing.infrastructure.sqlalchemy.datastore.
SQLAlchemySettings
(uri: Optional[str] = None, pool_size: Optional[int] = None)[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreSettings
-
class
eventsourcing.infrastructure.sqlalchemy.datastore.
SQLAlchemyDatastore
(settings: eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemySettings, base: sqlalchemy.ext.declarative.api.DeclarativeMeta = <class 'sqlalchemy.ext.declarative.api.Base'>, tables: Optional[Sequence[T_co]] = None, connection_strategy: str = 'plain', session: Union[sqlalchemy.orm.session.Session, sqlalchemy.orm.scoping.scoped_session, None] = None)[source]¶ Bases:
eventsourcing.infrastructure.datastore.AbstractDatastore
-
__init__
(settings: eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemySettings, base: sqlalchemy.ext.declarative.api.DeclarativeMeta = <class 'sqlalchemy.ext.declarative.api.Base'>, tables: Optional[Sequence[T_co]] = None, connection_strategy: str = 'plain', session: Union[sqlalchemy.orm.session.Session, sqlalchemy.orm.scoping.scoped_session, None] = None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
-
class
eventsourcing.infrastructure.sqlalchemy.factory.
SQLAlchemyInfrastructureFactory
(session: Any, uri: Optional[str] = None, pool_size: Optional[int] = None, tracking_record_class: Optional[type] = None, *args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.factory.InfrastructureFactory
Infrastructure factory for SQLAlchemy infrastructure.
-
record_manager_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.manager.SQLAlchemyRecordManager
-
integer_sequenced_record_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord
-
timestamp_sequenced_record_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord
-
snapshot_record_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.SnapshotRecord
-
tracking_record_class
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord
-
__init__
(session: Any, uri: Optional[str] = None, pool_size: Optional[int] = None, tracking_record_class: Optional[type] = None, *args, **kwargs)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
construct_integer_sequenced_record_manager
(**kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]¶ Constructs SQLAlchemy record manager.
Returns: An SQLAlchemy record manager. Return type: SQLAlchemyRecordManager
-
construct_record_manager
(record_class: Optional[type], sequenced_item_class: Optional[Type[NamedTuple]] = None, **kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]¶ Constructs SQLAlchemy record manager.
Returns: An SQLAlchemy record manager. Return type: SQLAlchemyRecordManager
-
construct_datastore
() → Optional[eventsourcing.infrastructure.datastore.AbstractDatastore][source]¶ Constructs SQLAlchemy datastore.
Return type: SQLAlchemyDatastore
-
-
class
eventsourcing.infrastructure.sqlalchemy.manager.
SQLAlchemyRecordManager
(session: Any, *args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.base.SQLRecordManager
-
_prepare_insert
(tmpl: Any, record_class: type, field_names: List[str], placeholder_for_id: bool = False) → Any[source]¶ With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, assumes an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.
-
make_placeholder
(field_name: str) → str[source]¶ Returns “placeholder” string for late binding of values to query.
Depends on record manager’s adapted database system or adapted ORM.
-
write_records
(records: Iterable[Any], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, orm_objs_pending_save: Optional[Sequence[Any]] = None, orm_objs_pending_delete: Optional[Sequence[Any]] = None) → None[source]¶ Writes tracking, event and notification records for a process event. :param orm_objs_pending_delete: :param orm_objs_pending_save:
-
get_records
(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Sequence[Any][source]¶ Returns records for a sequence.
-
get_notification_records
(start: Optional[int] = None, stop: Optional[int] = None, *args, **kwargs) → Iterable[T_co][source]¶ Returns records sequenced by notification ID, from application, for pipeline, in given range.
Args ‘start’ and ‘stop’ are positions in a zero-based integer sequence.
-
get_record
(sequence_id: uuid.UUID, position: int) → Any[source]¶ Gets record at position in sequence.
-
get_max_tracking_record_id
(upstream_application_name: str) → int[source]¶ Return maximum tracking record ID for notification from upstream application in pipeline.
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
IntegerSequencedWithIDRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs
.Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
IntegerSequencedNoIDRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs
.Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
-
eventsourcing.infrastructure.sqlalchemy.records.
IntegerSequencedRecord
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord
-
class
eventsourcing.infrastructure.sqlalchemy.records.
TimestampSequencedWithIDRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs
.Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
TimestampSequencedNoIDRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs
.Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
-
eventsourcing.infrastructure.sqlalchemy.records.
TimestampSequencedRecord
¶ alias of
eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord
-
class
eventsourcing.infrastructure.sqlalchemy.records.
SnapshotRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs
.Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
EntitySnapshotRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs
.Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
StoredEventRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs
.Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
NotificationTrackingRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs
.Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
popo¶
Infrastructure for event sourcing with “plain old Python objects”.
-
class
eventsourcing.infrastructure.popo.factory.
PopoInfrastructureFactory
(record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.base.AbstractEventStore]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper]] = None, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, integer_sequenced_record_class: Optional[type] = None, timestamp_sequenced_record_class: Optional[type] = None, snapshot_record_class: Optional[type] = None, contiguous_record_ids: bool = False, application_name: Optional[str] = None, pipeline_id: int = 0)[source]¶ Bases:
eventsourcing.infrastructure.factory.InfrastructureFactory
-
record_manager_class
¶ alias of
eventsourcing.infrastructure.popo.manager.PopoRecordManager
-
integer_sequenced_record_class
¶ alias of
eventsourcing.infrastructure.popo.records.IntegerSequencedRecord
-
snapshot_record_class
¶ alias of
eventsourcing.infrastructure.popo.records.SnapshotRecord
-
-
class
eventsourcing.infrastructure.popo.manager.
PopoRecordManager
(*args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.base.RecordManagerWithTracking
-
get_notification_records
(start: Optional[int] = None, stop: Optional[int] = None, *args, **kwargs) → Iterable[T_co][source]¶ Returns records sequenced by notification ID, from application, for pipeline, in given range.
Args ‘start’ and ‘stop’ are positions in a zero-based integer sequence.
-
get_max_tracking_record_id
(upstream_application_name: str) → int[source]¶ Return maximum tracking record ID for notification from upstream application in pipeline.
-
get_record
(sequence_id: uuid.UUID, position: int) → Any[source]¶ Gets record at position in sequence.
-
get_records
(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Sequence[Any][source]¶ Returns records for a sequence.
-
has_tracking_record
(upstream_application_name: str, pipeline_id: int, notification_id: int) → bool[source]¶ True if tracking record exists for notification from upstream in pipeline.
-
record_items
(sequenced_items: Iterable[NamedTuple]) → None[source]¶ Writes sequenced items into the datastore.
-
write_records
(records: Iterable[Any], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, orm_objs_pending_save: Optional[Sequence[Any]] = None, orm_objs_pending_delete: Optional[Sequence[Any]] = None) → None[source]¶ Writes tracking, event and notification records for a process event. :param orm_objs_pending_delete: :param orm_objs_pending_save:
-
-
class
eventsourcing.infrastructure.popo.mapper.
SequencedItemMapperForPopo
(sequenced_item_class: Optional[Type[NamedTuple]] = None, sequence_id_attr_name: Optional[str] = None, position_attr_name: Optional[str] = None, json_encoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONDecoder]] = None, cipher: Optional[eventsourcing.utils.cipher.aes.AESCipher] = None, compressor: Any = None, other_attr_names: Tuple[str, ...] = ())[source]¶ Bases:
eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper
-
class
eventsourcing.infrastructure.popo.records.
IntegerSequencedRecord
(sequenced_item: NamedTuple)[source]¶ Bases:
object
Encapsulates sequenced item tuple (containing real event object).
-
class
eventsourcing.infrastructure.popo.records.
SnapshotRecord
(sequenced_item: NamedTuple)[source]¶ Bases:
eventsourcing.infrastructure.popo.records.IntegerSequencedRecord
-
class
eventsourcing.infrastructure.popo.records.
StoredEventRecord
(sequenced_item: NamedTuple)[source]¶ Bases:
eventsourcing.infrastructure.popo.records.IntegerSequencedRecord
Encapsulates sequenced item tuple (containing real event object).
Allows other attributes to be set, such as notification ID.
eventstore¶
The event store provides the interface to the event sourcing persistence mechanism that is used by applications.
-
class
eventsourcing.infrastructure.eventstore.
EventStore
(record_manager: TRecordManager, event_mapper: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper)[source]¶ Bases:
eventsourcing.infrastructure.base.AbstractEventStore
Event store appends domain events to stored sequences. It uses a record manager to map named tuples to database records, and it uses a sequenced item mapper to map named tuples to application-level objects.
-
iterator_class
¶ alias of
eventsourcing.infrastructure.iterators.SequencedItemIterator
-
store_events
(events: Iterable[TEvent]) → None[source]¶ Appends given domain event, or list of domain events, to their sequence.
Parameters: events – domain event, or list of domain events
-
items_from_events
(events: Iterable[TEvent]) → Iterable[NamedTuple][source]¶ Maps domain event to sequenced item namedtuple.
An iterable of events.
-
iter_events
(originator_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True, page_size: Optional[int] = None) → Iterable[TEvent][source]¶ Gets domain events from the sequence identified by originator_id.
Parameters: - originator_id – ID of a sequence of events
- gt – get items after this position
- gte – get items at or after this position
- lt – get items before this position
- lte – get items before or at this position
- limit – get limited number of items
- is_ascending – get items from lowest position
- page_size – restrict and repeat database query
Returns: list of domain events
-
get_event
(originator_id: uuid.UUID, position: int) → TEvent[source]¶ Gets a domain event from the sequence identified by originator_id at position eq.
Parameters: - originator_id – ID of a sequence of events
- position – get item at this position
Returns: domain event
-
get_most_recent_event
(originator_id: uuid.UUID, lt: Optional[int] = None, lte: Optional[int] = None) → Optional[TEvent][source]¶ Gets a domain event from the sequence identified by originator_id at the highest position.
Parameters: - originator_id – ID of a sequence of events
- lt – get highest before this position
- lte – get highest at or before this position
Returns: domain event
-
all_events
() → Iterable[TEvent][source]¶ Yields all domain events in the event store.
This method iterates over the sequence IDs, and returns all the events for each sequence effectively concatenated together.
Use a notification log to propagate events from an application as a stable append-only sequence.
-
eventsourcedrepository¶
Base classes for event sourced repositories (not abstract, can be used directly).
-
class
eventsourcing.infrastructure.eventsourcedrepository.
EventSourcedRepository
(event_store: eventsourcing.infrastructure.base.AbstractEventStore, use_cache: bool = False, snapshot_strategy: Optional[eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy] = None, mutator_func: Optional[Callable[[Optional[TVersionedEntity], TVersionedEvent], Optional[TVersionedEntity]]] = None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.repository.AbstractEntityRepository
-
__init__
(event_store: eventsourcing.infrastructure.base.AbstractEventStore, use_cache: bool = False, snapshot_strategy: Optional[eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy] = None, mutator_func: Optional[Callable[[Optional[TVersionedEntity], TVersionedEvent], Optional[TVersionedEntity]]] = None, **kwargs)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
event_store
¶ Returns event store object used by this repository.
-
__contains__
(entity_id: uuid.UUID) → bool[source]¶ Returns a boolean value according to whether entity with given ID exists.
-
__getitem__
(entity_id: uuid.UUID) → TVersionedEntity[source]¶ Returns entity with given ID.
Parameters: entity_id – ID of entity in the repository. Raises: RepositoryKeyError – If the entity is not found.
-
get_entity
(entity_id: uuid.UUID, at: Optional[int] = None) → Optional[TVersionedEntity][source]¶ Returns entity with given ID, optionally at a version.
Returns None if entity not found.
-
get_and_project_events
(entity_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, initial_state: Optional[TVersionedEntity] = None, query_descending: bool = False) → Optional[TVersionedEntity][source]¶ Reconstitutes requested domain entity from domain events found in event store.
-
project_events
(initial_state: Optional[TVersionedEntity], domain_events: Iterable[TVersionedEvent]) → Optional[TVersionedEntity][source]¶ Evolves initial_state using the domain_events and a mutator function.
Applies a mutator function cumulatively to a sequence of domain events, so as to mutate the initial value to a mutated value.
This class’s mutate() method is used as the default mutator function, but custom behaviour can be introduced by passing in a ‘mutator_func’ argument when constructing this class, or by overridding the mutate() method.
-
static
mutate
(initial: Optional[TVersionedEntity], event: TVersionedEvent) → Optional[TVersionedEntity][source]¶ Default mutator function, which uses __mutate__() method on event object to mutate initial state.
Parameters: - initial – Initial state to be mutated by this function.
- event – Event that causes the initial state to be mutated.
Returns: Returns the mutated state.
-
take_snapshot
(entity_id: uuid.UUID, lt: Optional[int] = None, lte: Optional[int] = None) → Optional[eventsourcing.domain.model.events.AbstractSnapshot][source]¶ Takes a snapshot of the entity as it existed after the most recent event, optionally less than, or less than or equal to, a particular position.
-
iterators¶
Different ways of getting sequenced items from a datastore.
-
class
eventsourcing.infrastructure.iterators.
AbstractSequencedItemIterator
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, page_size: Optional[int] = None, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True)[source]¶ Bases:
collections.abc.Iterable
,typing.Generic
-
__init__
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, page_size: Optional[int] = None, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True)[source]¶ Initialises sequenced item iterator.
Parameters: - record_manager – The record manager used to get sequenced items.
- sequence_id – The id of the sequence being iterated over.
- page_size – The number of items requested from the record manager.
- gt – Exclusive lower bound on position of items returned.
- gte – Inclusive lower bound on position of items returned.
- lt – Exclusive upper bound on position of items returned.
- lte – Inclusive upper bound on position of items returned.
- limit – Limit to the number of items returned.
- is_ascending – Whether or not to iterate in ascending order.
-
_inc_page_counter
() → None[source]¶ Increments the page counter.
Each query result as a page, even if there are no items in the page. This really counts queries.
- it is easy to divide the number of events by the page size if the
“correct” answer is required - there will be a difference in the counts when the number of events can be exactly divided by the page
size, because there is no way to know in advance that a full page is also the last page.
-
-
class
eventsourcing.infrastructure.iterators.
SequencedItemIterator
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, page_size: Optional[int] = None, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True)[source]¶ Bases:
eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator
-
class
eventsourcing.infrastructure.iterators.
ThreadedSequencedItemIterator
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, page_size: Optional[int] = None, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True)[source]¶ Bases:
eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator
-
class
eventsourcing.infrastructure.iterators.
GetEntityEventsThread
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, page_size: Optional[int] = None, is_ascending: bool = True, *args, **kwargs)[source]¶ Bases:
threading.Thread
-
__init__
(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, page_size: Optional[int] = None, is_ascending: bool = True, *args, **kwargs)[source]¶ This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
-
run
() → None[source]¶ Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
-
factory¶
Infrastructure factory.
-
class
eventsourcing.infrastructure.factory.
InfrastructureFactory
(record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.base.AbstractEventStore]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper]] = None, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, integer_sequenced_record_class: Optional[type] = None, timestamp_sequenced_record_class: Optional[type] = None, snapshot_record_class: Optional[type] = None, contiguous_record_ids: bool = False, application_name: Optional[str] = None, pipeline_id: int = 0)[source]¶ Bases:
typing.Generic
Base class for infrastructure factories.
-
__init__
(record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.base.AbstractEventStore]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper]] = None, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, integer_sequenced_record_class: Optional[type] = None, timestamp_sequenced_record_class: Optional[type] = None, snapshot_record_class: Optional[type] = None, contiguous_record_ids: bool = False, application_name: Optional[str] = None, pipeline_id: int = 0)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
sequenced_item_class
¶ alias of
eventsourcing.infrastructure.sequenceditem.SequencedItem
-
sequenced_item_mapper_class
¶ alias of
eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper
-
construct_integer_sequenced_record_manager
(integer_sequenced_record_class=None, **kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]¶ Constructs an integer sequenced record manager.
-
construct_timestamp_sequenced_record_manager
(**kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]¶ Constructs a timestamp sequenced record manager.
-
construct_snapshot_record_manager
(**kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]¶ Constructs a snapshot record manager.
-
construct_record_manager
(record_class: Optional[type], sequenced_item_class: Optional[Type[NamedTuple]] = None, **kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]¶ Constructs an record manager.
-
construct_sequenced_item_mapper
(cipher: Optional[eventsourcing.utils.cipher.aes.AESCipher], compressor: Any) → eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper[source]¶ Constructs sequenced item mapper object.
Returns: Sequenced item mapper object. Return type: eventsourcing.infrastructure.sequenceditemmapper .AbstractSequencedItemMapper
-
snapshotting¶
Snapshotting avoids having to replay an entire sequence of events to obtain the current state of a projection.
-
class
eventsourcing.infrastructure.snapshotting.
AbstractSnapshotStrategy
[source]¶ Bases:
abc.ABC
-
class
eventsourcing.infrastructure.snapshotting.
EventSourcedSnapshotStrategy
(snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager][eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager])[source]¶ Bases:
eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy
Snapshot strategy that uses an event sourced snapshot.
-
__init__
(snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager][eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager])[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
timebucketedlog_reader¶
Reader for timebucketed logs.
repositories¶
Repository base classes for entity classes defined in the library.
-
class
eventsourcing.infrastructure.repositories.array.
ArrayRepository
(array_size=10000, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.array.AbstractArrayRepository
,eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
-
class
eventsourcing.infrastructure.repositories.array.
BigArrayRepository
(array_size: int = 10000, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.array.AbstractBigArrayRepository
,eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
-
subrepo_class
¶ alias of
ArrayRepository
-
__init__
(array_size: int = 10000, *args, **kwargs)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
subrepo
¶ Sub-sequence repository.
-
-
class
eventsourcing.infrastructure.repositories.collection_repo.
CollectionRepository
(event_store: eventsourcing.infrastructure.base.AbstractEventStore, use_cache: bool = False, snapshot_strategy: Optional[eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy] = None, mutator_func: Optional[Callable[[Optional[TVersionedEntity], TVersionedEvent], Optional[TVersionedEntity]]] = None, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
,eventsourcing.domain.model.collection.AbstractCollectionRepository
Event sourced repository for the Collection domain model entity.
-
class
eventsourcing.infrastructure.repositories.timebucketedlog_repo.
TimebucketedlogRepo
(event_store: eventsourcing.infrastructure.base.AbstractEventStore, use_cache: bool = False, snapshot_strategy: Optional[eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy] = None, mutator_func: Optional[Callable[[Optional[TVersionedEntity], TVersionedEvent], Optional[TVersionedEntity]]] = None, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
,eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository
Event sourced repository for the Example domain model entity.
integersequencegenerators¶
Different ways of generating sequences of integers.
-
class
eventsourcing.infrastructure.integersequencegenerators.base.
AbstractIntegerSequenceGenerator
[source]¶ Bases:
object
Abstract base class for generating a sequence of integers.
-
class
eventsourcing.infrastructure.integersequencegenerators.base.
SimpleIntegerSequenceGenerator
(i: int = 0)[source]¶ Bases:
eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator
Generates a sequence of integers, by simply incrementing a Python int.
-
class
eventsourcing.infrastructure.integersequencegenerators.redisincr.
RedisIncr
(redis: Optional[redis.client.Redis] = None, key: Optional[str] = None)[source]¶ Bases:
eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator
Generates a sequence of integers, using Redis’ INCR command.
Maximum number is 2**63, or 9223372036854775807, the maximum value of a 64 bit signed integer.