infrastructure

The infrastructure layer adapts external devices in ways that are useful for the application, such as the way an event store encapsulates a database.

sequenceditem

The persistence model for storing events.

class eventsourcing.infrastructure.sequenceditem.SequencedItem(sequence_id, position, topic, state)[source]

Bases: tuple

sequence_id

Alias for field number 0

position

Alias for field number 1

topic

Alias for field number 2

state

Alias for field number 3

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, sequence_id: uuid.UUID, position: int, topic: str, state: bytes)

Create new instance of SequencedItem(sequence_id, position, topic, state)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable)

Make a new SequencedItem object from a sequence or iterable

_replace(**kwds)

Return a new SequencedItem object replacing specified fields with new values

class eventsourcing.infrastructure.sequenceditem.StoredEvent(originator_id, originator_version, topic, state)[source]

Bases: tuple

originator_id

Alias for field number 0

originator_version

Alias for field number 1

topic

Alias for field number 2

state

Alias for field number 3

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes)

Create new instance of StoredEvent(originator_id, originator_version, topic, state)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable)

Make a new StoredEvent object from a sequence or iterable

_replace(**kwds)

Return a new StoredEvent object replacing specified fields with new values

sequenceditemmapper

The sequenced item mapper maps sequenced items to application-level objects.

class eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper(**kwargs)[source]

Bases: typing.Generic, abc.ABC

__init__(**kwargs)[source]

Initialises mapper.

item_from_event(domain_event: TEvent) → NamedTuple[source]

Constructs and returns a sequenced item for given domain event.

event_from_item(sequenced_item: NamedTuple) → TEvent[source]

Constructs and returns a domain event for given sequenced item.

json_dumps(o: object) → bytes[source]

Encodes given object as JSON.

json_loads(s: str) → object[source]

Decodes given JSON as object.

event_from_topic_and_state(topic: str, state: bytes) → TEvent[source]

Resolves topic to an event class, decodes state, and constructs an event.

class eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper(sequenced_item_class: Optional[Type[NamedTuple]] = None, sequence_id_attr_name: Optional[str] = None, position_attr_name: Optional[str] = None, json_encoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONDecoder]] = None, cipher: Optional[eventsourcing.utils.cipher.aes.AESCipher] = None, compressor: Any = None, other_attr_names: Tuple[str, ...] = ())[source]

Bases: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper

Uses JSON to transcode domain events.

__init__(sequenced_item_class: Optional[Type[NamedTuple]] = None, sequence_id_attr_name: Optional[str] = None, position_attr_name: Optional[str] = None, json_encoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONDecoder]] = None, cipher: Optional[eventsourcing.utils.cipher.aes.AESCipher] = None, compressor: Any = None, other_attr_names: Tuple[str, ...] = ())[source]

Initialises mapper.

item_from_event(domain_event: TEvent) → NamedTuple[source]

Constructs a sequenced item from a domain event.

construct_item_args(domain_event: TEvent) → Tuple[source]

Constructs attributes of a sequenced item from the given domain event.

json_dumps(o: object) → bytes[source]

Encodes given object as JSON.

event_from_item(sequenced_item: NamedTuple) → TEvent[source]

Reconstructs domain event from stored event topic and event attrs. Used in the event store when getting domain events.

event_from_topic_and_state(topic: str, state: bytes) → TEvent[source]

Resolves topic to an event class, decodes state, and constructs an event.

json_loads(s: str) → Dict[KT, VT][source]

Decodes given JSON as object.

base

Abstract base classes for the infrastructure layer.

class eventsourcing.infrastructure.base.AbstractRecordManager(**kwargs)[source]

Bases: abc.ABC

__init__(**kwargs)[source]

Initialises record manager.

record_class

Returns record class to be used by the record manager.

record_items(sequenced_items: Iterable[NamedTuple]) → None[source]

Writes sequenced items into the datastore.

record_item(sequenced_item: NamedTuple) → None[source]

Writes sequenced item into the datastore.

get_item(sequence_id: uuid.UUID, position: int) → NamedTuple[source]

Gets sequenced item from the datastore.

get_items(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Iterator[NamedTuple][source]

Iterates over records in sequence.

get_record(sequence_id: uuid.UUID, position: int) → Any[source]

Gets record at position in sequence.

get_records(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Sequence[Any][source]

Returns records for a sequence.

all_sequence_ids() → Iterable[uuid.UUID][source]

Returns all sequence IDs.

delete_record(record: Any) → None[source]

Removes permanently given record from the table.

class eventsourcing.infrastructure.base.BaseRecordManager(record_class: type, sequenced_item_class: Type[NamedTuple] = <class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids: bool = False, application_name: str = '', pipeline_id: int = 0, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.AbstractRecordManager

__init__(record_class: type, sequenced_item_class: Type[NamedTuple] = <class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids: bool = False, application_name: str = '', pipeline_id: int = 0, **kwargs)[source]

Initialises record manager.

record_class

Returns record class to be used by the record manager.

get_item(sequence_id: uuid.UUID, position: int) → NamedTuple[source]

Gets sequenced item from the datastore.

get_items(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Iterator[NamedTuple][source]

Returns sequenced item generator.

list_items(*args, **kwargs) → List[NamedTuple][source]

Returns list of sequenced items.

to_record(sequenced_item: NamedTuple) → object[source]

Constructs a record object from given sequenced item object.

from_record(record: object) → NamedTuple[source]

Constructs and returns a sequenced item object, from given ORM object.

class eventsourcing.infrastructure.base.RecordManagerWithNotifications(record_class: type, sequenced_item_class: Type[NamedTuple] = <class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids: bool = False, application_name: str = '', pipeline_id: int = 0, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.BaseRecordManager

get_max_notification_id() → int[source]

Return maximum notification ID in pipeline.

get_notification_records(start: Optional[int] = None, stop: Optional[int] = None, *args, **kwargs) → Iterable[T_co][source]

Returns records sequenced by notification ID, from application, for pipeline, in given range.

Args ‘start’ and ‘stop’ are positions in a zero-based integer sequence.

class eventsourcing.infrastructure.base.RecordManagerWithTracking(tracking_record_class: Optional[type] = None, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.RecordManagerWithNotifications

ACID record managers can write tracking records and event records in an atomic transaction, needed for atomic processing in process applications.

__init__(tracking_record_class: Optional[type] = None, *args, **kwargs) → None[source]

Initialises record manager.

write_records(records: Iterable[Any], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, orm_objs_pending_save: Optional[Sequence[Any]] = None, orm_objs_pending_delete: Optional[Sequence[Any]] = None) → None[source]

Writes tracking, event and notification records for a process event. :param orm_objs_pending_delete: :param orm_objs_pending_save:

get_max_tracking_record_id(upstream_application_name: str) → int[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

has_tracking_record(upstream_application_name: str, pipeline_id: int, notification_id: int) → bool[source]

True if tracking record exists for notification from upstream in pipeline.

get_pipeline_and_notification_id(sequence_id: uuid.UUID, position: int) → Tuple[source]

Returns pipeline ID and notification ID for event at given position in given sequence.

class eventsourcing.infrastructure.base.SQLRecordManager(*args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.RecordManagerWithTracking

Common aspects of SQL record managers, such as SQLAlchemy and Django record managers.

__init__(*args, **kwargs)[source]

Initialises record manager.

record_items(sequenced_items: Iterable[NamedTuple]) → None[source]

Writes sequenced items into the datastore.

insert_select_max

SQL statement that inserts records with contiguous IDs, by selecting max ID from indexed table records.

_prepare_insert(tmpl: str, record_class: type, field_names: List[str], placeholder_for_id: bool = False) → Any[source]

With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, using an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.

make_placeholder(field_name: str) → str[source]

Returns “placeholder” string for late binding of values to query.

Depends on record manager’s adapted database system or adapted ORM.

insert_values

SQL statement that inserts records without ID.

insert_tracking_record

SQL statement that inserts tracking records.

get_record_table_name(record_class: type) → str[source]

Returns table name - used in raw queries.

Return type:str
class eventsourcing.infrastructure.base.AbstractEventStore(record_manager: TRecordManager, event_mapper: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper)[source]

Bases: abc.ABC, typing.Generic

Abstract base class for event stores. Defines the methods expected of an event store by other classes in the library.

__init__(record_manager: TRecordManager, event_mapper: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper)[source]

Initialises event store object.

Parameters:
  • record_manager – record manager
  • event_mapper – sequenced item mapper
store_events(events: Iterable[TEvent]) → None[source]

Put domain event in event store for later retrieval.

iter_events(originator_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True, page_size: Optional[int] = None) → Iterable[TEvent][source]

Returns iterable of domain events for given entity ID.

list_events(*args, **kwargs) → List[TEvent][source]

Returns list of domain events for given entity ID.

get_event(originator_id: uuid.UUID, position: int) → TEvent[source]

Returns a single domain event.

get_most_recent_event(originator_id: uuid.UUID, lt: Optional[int] = None, lte: Optional[int] = None) → Optional[TEvent][source]

Returns most recent domain event for given entity ID.

all_events() → Iterable[TEvent][source]

Returns all domain events in the event store.

This works by iterating over all sequences, so doesn’t return events in order. Use a Notification Log to project application state.

get_domain_events(originator_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True, page_size: Optional[int] = None) → Iterable[TEvent][source]

Deprecated. Please use iter_events() instead.

Gets domain events from the sequence identified by originator_id.

Parameters:
  • originator_id – ID of a sequence of events
  • gt – get items after this position
  • gte – get items at or after this position
  • lt – get items before this position
  • lte – get items before or at this position
  • limit – get limited number of items
  • is_ascending – get items from lowest position
  • page_size – restrict and repeat database query
Returns:

list of domain events

items_from_events(events: Iterable[TEvent]) → Iterable[NamedTuple][source]

Maps domain event to sequenced item namedtuple.

Parameters:events – An iterable of events.

datastore

Base classes for concrete datastore classes.

class eventsourcing.infrastructure.datastore.DatastoreSettings[source]

Bases: object

Settings for Datastore.

class eventsourcing.infrastructure.datastore.AbstractDatastore(settings: TDatastoreSettings)[source]

Bases: abc.ABC, typing.Generic

can_drop_tables = True

Datastores hold stored event records, used by a record manager.

__init__(settings: TDatastoreSettings)[source]

Initialize self. See help(type(self)) for accurate signature.

setup_connection() → None[source]

Sets up a connection to a datastore.

close_connection() → None[source]

Drops connection to a datastore.

setup_tables() → None[source]

Sets up tables used to store events.

setup_table(table: Any) → None[source]

Sets up given table.

drop_tables() → None[source]

Drops tables used to store events.

drop_table(table: Any) → None[source]

Drops given table.

truncate_tables() → None[source]

Truncates tables used to store events.

exception eventsourcing.infrastructure.datastore.DatastoreError[source]

Bases: Exception

exception eventsourcing.infrastructure.datastore.DatastoreConnectionError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

exception eventsourcing.infrastructure.datastore.DatastoreTableError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

cassandra

Classes for event sourcing with Apache Cassandra.

class eventsourcing.infrastructure.cassandra.datastore.CassandraSettings(hosts=None, port=None, protocol_version=None, default_keyspace=None, consistency=None, replication_factor=None, username=None, password=None)[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreSettings

__init__(hosts=None, port=None, protocol_version=None, default_keyspace=None, consistency=None, replication_factor=None, username=None, password=None)[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.infrastructure.cassandra.datastore.CassandraDatastore(tables, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.datastore.AbstractDatastore

__init__(tables, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

setup_connection()[source]

Sets up a connection to a datastore.

close_connection()[source]

Drops connection to a datastore.

setup_tables()[source]

Sets up tables used to store events.

setup_table(table) → None[source]

Sets up given table.

drop_tables()[source]

Drops tables used to store events.

drop_table(*_)[source]

Drops given table.

truncate_tables()[source]

Truncates tables used to store events.

class eventsourcing.infrastructure.cassandra.factory.CassandraInfrastructureFactory(record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.base.AbstractEventStore]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper]] = None, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, integer_sequenced_record_class: Optional[type] = None, timestamp_sequenced_record_class: Optional[type] = None, snapshot_record_class: Optional[type] = None, contiguous_record_ids: bool = False, application_name: Optional[str] = None, pipeline_id: int = 0)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

Infrastructure factory for Cassandra.

record_manager_class

alias of eventsourcing.infrastructure.cassandra.manager.CassandraRecordManager

integer_sequenced_record_class

alias of eventsourcing.infrastructure.cassandra.records.IntegerSequencedRecord

timestamp_sequenced_record_class

alias of eventsourcing.infrastructure.cassandra.records.TimestampSequencedRecord

snapshot_record_class

alias of eventsourcing.infrastructure.cassandra.records.SnapshotRecord

class eventsourcing.infrastructure.cassandra.manager.CassandraRecordManager(record_class: type, sequenced_item_class: Type[NamedTuple] = <class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids: bool = False, application_name: str = '', pipeline_id: int = 0, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.BaseRecordManager

record_items(sequenced_items)[source]

Writes sequenced items into the datastore.

get_record(sequence_id, position)[source]

Gets record at position in sequence.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

all_sequence_ids()[source]

Returns all sequence IDs.

delete_record(record)[source]

Removes permanently given record from the table.

class eventsourcing.infrastructure.cassandra.records.IntegerSequencedRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores integer-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

class eventsourcing.infrastructure.cassandra.records.TimestampSequencedRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores timestamp-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

class eventsourcing.infrastructure.cassandra.records.TimeuuidSequencedRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores timeuuid-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

class eventsourcing.infrastructure.cassandra.records.SnapshotRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores snapshots in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

class eventsourcing.infrastructure.cassandra.records.StoredEventRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores integer-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

django

Infrastructure for event sourcing with the Django ORM. This package functions as a Django application. It can be included in “INSTALLED_APPS” in settings.py in your Django project. There is just one migration, to create tables that do not exist.

class eventsourcing.infrastructure.django.factory.DjangoInfrastructureFactory(tracking_record_class: Optional[type] = None, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

Infrastructure factory for Django.

record_manager_class

alias of eventsourcing.infrastructure.django.manager.DjangoRecordManager

__init__(tracking_record_class: Optional[type] = None, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

construct_integer_sequenced_record_manager(**kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]

Constructs Django record manager.

Returns:A Django record manager.
Return type:DjangoRecordManager
class eventsourcing.infrastructure.django.manager.DjangoRecordManager(*args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.SQLRecordManager

write_records(records: Iterable[Any], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, orm_objs_pending_save: Optional[Sequence[Any]] = None, orm_objs_pending_delete: Optional[Sequence[Any]] = None) → None[source]

Writes tracking, event and notification records for a process event. :param orm_objs_pending_delete: :param orm_objs_pending_save:

make_placeholder(_: str) → str[source]

Returns “placeholder” string for late binding of values to query.

Depends on record manager’s adapted database system or adapted ORM.

get_record_table_name(record_class: type) → str[source]

Returns table name from SQLAlchemy record class.

get_record(sequence_id: uuid.UUID, position: int) → Any[source]

Gets record at position in sequence.

get_records(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Sequence[Any][source]

Returns records for a sequence.

get_notification_records(start: Optional[int] = None, stop: Optional[int] = None, *args, **kwargs) → Iterable[T_co][source]

Returns all records in the table.

delete_record(record: Any) → None[source]

Permanently removes record from table.

get_max_notification_id() → int[source]

Return maximum notification ID in pipeline.

get_max_tracking_record_id(upstream_application_name: str) → int[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

has_tracking_record(upstream_application_name: str, pipeline_id: int, notification_id: int) → bool[source]

True if tracking record exists for notification from upstream in pipeline.

all_sequence_ids() → Iterable[uuid.UUID][source]

Returns all sequence IDs.

class eventsourcing.infrastructure.django.models.IntegerSequencedRecord(id, sequence_id, position, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.TimestampSequencedRecord(id, sequence_id, position, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.SnapshotRecord(uid, sequence_id, position, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.EntitySnapshotRecord(uid, application_name, originator_id, originator_version, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.StoredEventRecord(uid, application_name, originator_id, originator_version, pipeline_id, notification_id, topic, state, causal_dependencies)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.NotificationTrackingRecord(uid, application_name, upstream_application_name, pipeline_id, notification_id)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

sqlalchemy

Classes for event sourcing with SQLAlchemy.

class eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemySettings(uri: Optional[str] = None, pool_size: Optional[int] = None)[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreSettings

__init__(uri: Optional[str] = None, pool_size: Optional[int] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemyDatastore(settings: eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemySettings, base: sqlalchemy.ext.declarative.api.DeclarativeMeta = <class 'sqlalchemy.ext.declarative.api.Base'>, tables: Optional[Sequence[T_co]] = None, connection_strategy: str = 'plain', session: Union[sqlalchemy.orm.session.Session, sqlalchemy.orm.scoping.scoped_session, None] = None)[source]

Bases: eventsourcing.infrastructure.datastore.AbstractDatastore

__init__(settings: eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemySettings, base: sqlalchemy.ext.declarative.api.DeclarativeMeta = <class 'sqlalchemy.ext.declarative.api.Base'>, tables: Optional[Sequence[T_co]] = None, connection_strategy: str = 'plain', session: Union[sqlalchemy.orm.session.Session, sqlalchemy.orm.scoping.scoped_session, None] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

setup_connection() → None[source]

Sets up a connection to a datastore.

setup_tables() → None[source]

Sets up tables used to store events.

setup_table(table: Any) → None[source]

Sets up given table.

drop_tables() → None[source]

Drops tables used to store events.

drop_table(table: Any) → None[source]

Drops given table.

truncate_tables() → None[source]

Truncates tables used to store events.

close_connection() → None[source]

Drops connection to a datastore.

class eventsourcing.infrastructure.sqlalchemy.factory.SQLAlchemyInfrastructureFactory(session: Any, uri: Optional[str] = None, pool_size: Optional[int] = None, tracking_record_class: Optional[type] = None, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

Infrastructure factory for SQLAlchemy infrastructure.

record_manager_class

alias of eventsourcing.infrastructure.sqlalchemy.manager.SQLAlchemyRecordManager

integer_sequenced_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord

timestamp_sequenced_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord

snapshot_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.SnapshotRecord

tracking_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord

__init__(session: Any, uri: Optional[str] = None, pool_size: Optional[int] = None, tracking_record_class: Optional[type] = None, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

construct_integer_sequenced_record_manager(**kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]

Constructs SQLAlchemy record manager.

Returns:An SQLAlchemy record manager.
Return type:SQLAlchemyRecordManager
construct_record_manager(record_class: Optional[type], sequenced_item_class: Optional[Type[NamedTuple]] = None, **kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]

Constructs SQLAlchemy record manager.

Returns:An SQLAlchemy record manager.
Return type:SQLAlchemyRecordManager
construct_datastore() → Optional[eventsourcing.infrastructure.datastore.AbstractDatastore][source]

Constructs SQLAlchemy datastore.

Return type:SQLAlchemyDatastore
class eventsourcing.infrastructure.sqlalchemy.manager.SQLAlchemyRecordManager(session: Any, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.SQLRecordManager

__init__(session: Any, *args, **kwargs)[source]

Initialises record manager.

_prepare_insert(tmpl: Any, record_class: type, field_names: List[str], placeholder_for_id: bool = False) → Any[source]

With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, assumes an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.

make_placeholder(field_name: str) → str[source]

Returns “placeholder” string for late binding of values to query.

Depends on record manager’s adapted database system or adapted ORM.

write_records(records: Iterable[Any], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, orm_objs_pending_save: Optional[Sequence[Any]] = None, orm_objs_pending_delete: Optional[Sequence[Any]] = None) → None[source]

Writes tracking, event and notification records for a process event. :param orm_objs_pending_delete: :param orm_objs_pending_save:

get_records(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Sequence[Any][source]

Returns records for a sequence.

get_notification_records(start: Optional[int] = None, stop: Optional[int] = None, *args, **kwargs) → Iterable[T_co][source]

Returns records sequenced by notification ID, from application, for pipeline, in given range.

Args ‘start’ and ‘stop’ are positions in a zero-based integer sequence.

get_record(sequence_id: uuid.UUID, position: int) → Any[source]

Gets record at position in sequence.

get_max_notification_id() → int[source]

Return maximum notification ID in pipeline.

get_max_tracking_record_id(upstream_application_name: str) → int[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

has_tracking_record(upstream_application_name: str, pipeline_id: int, notification_id: int) → bool[source]

True if tracking record exists for notification from upstream in pipeline.

all_sequence_ids() → Iterable[uuid.UUID][source]

Returns all sequence IDs.

delete_record(record: Any) → None[source]

Permanently removes record from table.

get_record_table_name(record_class: type) → str[source]

Returns table name - used in raw queries.

Return type:str
class eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedNoIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedRecord

alias of eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord

class eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedWithIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedRecord

alias of eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord

class eventsourcing.infrastructure.sqlalchemy.records.SnapshotRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class eventsourcing.infrastructure.sqlalchemy.records.EntitySnapshotRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class eventsourcing.infrastructure.sqlalchemy.records.StoredEventRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

popo

Infrastructure for event sourcing with “plain old Python objects”.

class eventsourcing.infrastructure.popo.factory.PopoInfrastructureFactory(record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.base.AbstractEventStore]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper]] = None, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, integer_sequenced_record_class: Optional[type] = None, timestamp_sequenced_record_class: Optional[type] = None, snapshot_record_class: Optional[type] = None, contiguous_record_ids: bool = False, application_name: Optional[str] = None, pipeline_id: int = 0)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

record_manager_class

alias of eventsourcing.infrastructure.popo.manager.PopoRecordManager

class eventsourcing.infrastructure.popo.manager.PopoRecordManager(*args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.RecordManagerWithTracking

__init__(*args, **kwargs)[source]

Initialises record manager.

all_sequence_ids() → List[uuid.UUID][source]

Returns all sequence IDs.

delete_record(record: Any) → None[source]

Removes permanently given record from the table.

get_max_notification_id() → int[source]

Return maximum notification ID in pipeline.

get_notification_records(start: Optional[int] = None, stop: Optional[int] = None, *args, **kwargs) → Iterable[T_co][source]

Returns records sequenced by notification ID, from application, for pipeline, in given range.

Args ‘start’ and ‘stop’ are positions in a zero-based integer sequence.

get_max_tracking_record_id(upstream_application_name: str) → int[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

get_record(sequence_id: uuid.UUID, position: int) → Any[source]

Gets record at position in sequence.

get_records(sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, query_ascending: bool = True, results_ascending: bool = True) → Sequence[Any][source]

Returns records for a sequence.

has_tracking_record(upstream_application_name: str, pipeline_id: int, notification_id: int) → bool[source]

True if tracking record exists for notification from upstream in pipeline.

record_items(sequenced_items: Iterable[NamedTuple]) → None[source]

Writes sequenced items into the datastore.

write_records(records: Iterable[Any], tracking_kwargs: Optional[Dict[str, Union[str, int]]] = None, orm_objs_pending_save: Optional[Sequence[Any]] = None, orm_objs_pending_delete: Optional[Sequence[Any]] = None) → None[source]

Writes tracking, event and notification records for a process event. :param orm_objs_pending_delete: :param orm_objs_pending_save:

class eventsourcing.infrastructure.popo.manager.PopoStoredEventRecord(sequenced_item: NamedTuple)[source]

Bases: object

Encapsulates sequenced item tuple (containing real event object).

Allows other attributes to be set, such as notification ID.

__init__(sequenced_item: NamedTuple)[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.infrastructure.popo.mapper.SequencedItemMapperForPopo(sequenced_item_class: Optional[Type[NamedTuple]] = None, sequence_id_attr_name: Optional[str] = None, position_attr_name: Optional[str] = None, json_encoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[eventsourcing.utils.transcoding.ObjectJSONDecoder]] = None, cipher: Optional[eventsourcing.utils.cipher.aes.AESCipher] = None, compressor: Any = None, other_attr_names: Tuple[str, ...] = ())[source]

Bases: eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper

eventstore

The event store provides the interface to the event sourcing persistence mechanism that is used by applications.

class eventsourcing.infrastructure.eventstore.EventStore(record_manager: TRecordManager, event_mapper: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper)[source]

Bases: eventsourcing.infrastructure.base.AbstractEventStore

Event store appends domain events to stored sequences. It uses a record manager to map named tuples to database records, and it uses a sequenced item mapper to map named tuples to application-level objects.

iterator_class

alias of eventsourcing.infrastructure.iterators.SequencedItemIterator

store_events(events: Iterable[TEvent]) → None[source]

Appends given domain event, or list of domain events, to their sequence.

Parameters:events – domain event, or list of domain events
items_from_events(events: Iterable[TEvent]) → Iterable[NamedTuple][source]

Maps domain event to sequenced item namedtuple.

An iterable of events.

iter_events(originator_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True, page_size: Optional[int] = None) → Iterable[TEvent][source]

Gets domain events from the sequence identified by originator_id.

Parameters:
  • originator_id – ID of a sequence of events
  • gt – get items after this position
  • gte – get items at or after this position
  • lt – get items before this position
  • lte – get items before or at this position
  • limit – get limited number of items
  • is_ascending – get items from lowest position
  • page_size – restrict and repeat database query
Returns:

list of domain events

get_event(originator_id: uuid.UUID, position: int) → TEvent[source]

Gets a domain event from the sequence identified by originator_id at position eq.

Parameters:
  • originator_id – ID of a sequence of events
  • position – get item at this position
Returns:

domain event

get_most_recent_event(originator_id: uuid.UUID, lt: Optional[int] = None, lte: Optional[int] = None) → Optional[TEvent][source]

Gets a domain event from the sequence identified by originator_id at the highest position.

Parameters:
  • originator_id – ID of a sequence of events
  • lt – get highest before this position
  • lte – get highest at or before this position
Returns:

domain event

all_events() → Iterable[TEvent][source]

Yields all domain events in the event store.

This method iterates over the sequence IDs, and returns all the events for each sequence effectively concatenated together.

Use a notification log to propagate events from an application as a stable append-only sequence.

eventsourcedrepository

Base classes for event sourced repositories (not abstract, can be used directly).

class eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository(event_store: eventsourcing.infrastructure.base.AbstractEventStore, use_cache: bool = False, snapshot_strategy: Optional[eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy] = None, mutator_func: Optional[Callable[[Optional[TVersionedEntity], TVersionedEvent], Optional[TVersionedEntity]]] = None, **kwargs)[source]

Bases: eventsourcing.domain.model.repository.AbstractEntityRepository

__init__(event_store: eventsourcing.infrastructure.base.AbstractEventStore, use_cache: bool = False, snapshot_strategy: Optional[eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy] = None, mutator_func: Optional[Callable[[Optional[TVersionedEntity], TVersionedEvent], Optional[TVersionedEntity]]] = None, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

event_store

Returns event store object used by this repository.

__contains__(entity_id: uuid.UUID) → bool[source]

Returns a boolean value according to whether entity with given ID exists.

__getitem__(entity_id: uuid.UUID) → TVersionedEntity[source]

Returns entity with given ID.

Parameters:entity_id – ID of entity in the repository.
Raises:RepositoryKeyError – If the entity is not found.
get_entity(entity_id: uuid.UUID, at: Optional[int] = None) → Optional[TVersionedEntity][source]

Returns entity with given ID, optionally at a version.

Returns None if entity not found.

get_and_project_events(entity_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, initial_state: Optional[TVersionedEntity] = None, query_descending: bool = False) → Optional[TVersionedEntity][source]

Reconstitutes requested domain entity from domain events found in event store.

project_events(initial_state: Optional[TVersionedEntity], domain_events: Iterable[TVersionedEvent]) → Optional[TVersionedEntity][source]

Evolves initial_state using the domain_events and a mutator function.

Applies a mutator function cumulatively to a sequence of domain events, so as to mutate the initial value to a mutated value.

This class’s mutate() method is used as the default mutator function, but custom behaviour can be introduced by passing in a ‘mutator_func’ argument when constructing this class, or by overridding the mutate() method.

static mutate(initial: Optional[TVersionedEntity], event: TVersionedEvent) → Optional[TVersionedEntity][source]

Default mutator function, which uses __mutate__() method on event object to mutate initial state.

Parameters:
  • initial – Initial state to be mutated by this function.
  • event – Event that causes the initial state to be mutated.
Returns:

Returns the mutated state.

take_snapshot(entity_id: uuid.UUID, lt: Optional[int] = None, lte: Optional[int] = None) → Optional[eventsourcing.domain.model.events.AbstractSnapshot][source]

Takes a snapshot of the entity as it existed after the most recent event, optionally less than, or less than or equal to, a particular position.

iterators

Different ways of getting sequenced items from a datastore.

class eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, page_size: Optional[int] = None, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True)[source]

Bases: collections.abc.Iterable, typing.Generic

__init__(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, page_size: Optional[int] = None, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True)[source]

Initialises sequenced item iterator.

Parameters:
  • record_manager – The record manager used to get sequenced items.
  • sequence_id – The id of the sequence being iterated over.
  • page_size – The number of items requested from the record manager.
  • gt – Exclusive lower bound on position of items returned.
  • gte – Inclusive lower bound on position of items returned.
  • lt – Exclusive upper bound on position of items returned.
  • lte – Inclusive upper bound on position of items returned.
  • limit – Limit to the number of items returned.
  • is_ascending – Whether or not to iterate in ascending order.
_inc_page_counter() → None[source]

Increments the page counter.

Each query result as a page, even if there are no items in the page. This really counts queries.

  • it is easy to divide the number of events by the page size if the

“correct” answer is required - there will be a difference in the counts when the number of events can be exactly divided by the page

size, because there is no way to know in advance that a full page is also the last page.
_inc_query_counter() → None[source]

Increments the query counter.

__iter__() → Iterator[NamedTuple][source]

Yields a continuous sequence of items.

class eventsourcing.infrastructure.iterators.SequencedItemIterator(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, page_size: Optional[int] = None, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

__iter__() → Iterator[NamedTuple][source]

Yields a continuous sequence of items from “pages” of sequenced items retrieved using the record manager.

class eventsourcing.infrastructure.iterators.ThreadedSequencedItemIterator(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, page_size: Optional[int] = None, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

__iter__() → Iterator[NamedTuple][source]

Yields a continuous sequence of items.

class eventsourcing.infrastructure.iterators.GetEntityEventsThread(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, page_size: Optional[int] = None, is_ascending: bool = True, *args, **kwargs)[source]

Bases: threading.Thread

__init__(record_manager: eventsourcing.infrastructure.base.AbstractRecordManager, sequence_id: uuid.UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, page_size: Optional[int] = None, is_ascending: bool = True, *args, **kwargs)[source]

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run() → None[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

factory

Infrastructure factory.

class eventsourcing.infrastructure.factory.InfrastructureFactory(record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.base.AbstractEventStore]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper]] = None, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, integer_sequenced_record_class: Optional[type] = None, timestamp_sequenced_record_class: Optional[type] = None, snapshot_record_class: Optional[type] = None, contiguous_record_ids: bool = False, application_name: Optional[str] = None, pipeline_id: int = 0)[source]

Bases: typing.Generic

Base class for infrastructure factories.

__init__(record_manager_class: Optional[Type[eventsourcing.infrastructure.base.AbstractRecordManager]] = None, sequenced_item_class: Optional[Type[NamedTuple]] = None, event_store_class: Optional[Type[eventsourcing.infrastructure.base.AbstractEventStore]] = None, sequenced_item_mapper_class: Optional[Type[eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper]] = None, json_encoder_class: Optional[Type[json.encoder.JSONEncoder]] = None, sort_keys: bool = False, json_decoder_class: Optional[Type[json.decoder.JSONDecoder]] = None, integer_sequenced_record_class: Optional[type] = None, timestamp_sequenced_record_class: Optional[type] = None, snapshot_record_class: Optional[type] = None, contiguous_record_ids: bool = False, application_name: Optional[str] = None, pipeline_id: int = 0)[source]

Initialize self. See help(type(self)) for accurate signature.

sequenced_item_class

alias of eventsourcing.infrastructure.sequenceditem.SequencedItem

sequenced_item_mapper_class

alias of eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper

construct_integer_sequenced_record_manager(**kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]

Constructs an integer sequenced record manager.

construct_timestamp_sequenced_record_manager(**kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]

Constructs a timestamp sequenced record manager.

construct_snapshot_record_manager(**kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]

Constructs a snapshot record manager.

construct_record_manager(record_class: Optional[type], sequenced_item_class: Optional[Type[NamedTuple]] = None, **kwargs) → eventsourcing.infrastructure.base.AbstractRecordManager[source]

Constructs an record manager.

construct_sequenced_item_mapper(cipher: Optional[eventsourcing.utils.cipher.aes.AESCipher], compressor: Any) → eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper[source]

Constructs sequenced item mapper object.

Returns:Sequenced item mapper object.
Return type:eventsourcing.infrastructure.sequenceditemmapper

.AbstractSequencedItemMapper

construct_integer_sequenced_event_store(cipher: Optional[eventsourcing.utils.cipher.aes.AESCipher], compressor: Any) → eventsourcing.infrastructure.base.AbstractEventStore[source]

Constructs an integer sequenced event store.

construct_datastore() → Optional[eventsourcing.infrastructure.datastore.AbstractDatastore][source]

Constructs datastore object.

Returns:Concrete datastore object object.

snapshotting

Snapshotting avoids having to replay an entire sequence of events to obtain the current state of a projection.

class eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy[source]

Bases: abc.ABC

get_snapshot(entity_id: uuid.UUID, lt: Optional[int] = None, lte: Optional[int] = None) → Optional[eventsourcing.domain.model.events.AbstractSnapshot][source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type:Snapshot
take_snapshot(entity_id: uuid.UUID, entity: object, last_event_version: int) → eventsourcing.domain.model.snapshot.Snapshot[source]

Takes a snapshot of entity, using given ID, state and version number.

class eventsourcing.infrastructure.snapshotting.EventSourcedSnapshotStrategy(snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager][eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager])[source]

Bases: eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy

Snapshot strategy that uses an event sourced snapshot.

__init__(snapshot_store: eventsourcing.infrastructure.base.AbstractEventStore[eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager][eventsourcing.domain.model.events.AbstractSnapshot, eventsourcing.infrastructure.base.AbstractRecordManager])[source]

Initialize self. See help(type(self)) for accurate signature.

get_snapshot(entity_id: uuid.UUID, lt: Optional[int] = None, lte: Optional[int] = None) → Optional[eventsourcing.domain.model.events.AbstractSnapshot][source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type:Snapshot
take_snapshot(entity_id: uuid.UUID, entity: object, last_event_version: int) → eventsourcing.domain.model.snapshot.Snapshot[source]

Creates a Snapshot from the given state, and appends it to the snapshot store.

Return type:Snapshot

timebucketedlog_reader

Reader for timebucketed logs.

repositories

Repository base classes for entity classes defined in the library.

class eventsourcing.infrastructure.repositories.array.ArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

class eventsourcing.infrastructure.repositories.array.BigArrayRepository(array_size: int = 10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractBigArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

subrepo_class

alias of ArrayRepository

__init__(array_size: int = 10000, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

subrepo

Sub-sequence repository.

class eventsourcing.infrastructure.repositories.collection_repo.CollectionRepository(event_store: eventsourcing.infrastructure.base.AbstractEventStore, use_cache: bool = False, snapshot_strategy: Optional[eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy] = None, mutator_func: Optional[Callable[[Optional[TVersionedEntity], TVersionedEvent], Optional[TVersionedEntity]]] = None, **kwargs)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.domain.model.collection.AbstractCollectionRepository

Event sourced repository for the Collection domain model entity.

class eventsourcing.infrastructure.repositories.timebucketedlog_repo.TimebucketedlogRepo(event_store: eventsourcing.infrastructure.base.AbstractEventStore, use_cache: bool = False, snapshot_strategy: Optional[eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy] = None, mutator_func: Optional[Callable[[Optional[TVersionedEntity], TVersionedEvent], Optional[TVersionedEntity]]] = None, **kwargs)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository

Event sourced repository for the Example domain model entity.

integersequencegenerators

Different ways of generating sequences of integers.

class eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator[source]

Bases: object

Abstract base class for generating a sequence of integers.

__next__() → int[source]

Returns the next item in the container.

class eventsourcing.infrastructure.integersequencegenerators.base.SimpleIntegerSequenceGenerator(i: int = 0)[source]

Bases: eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator

Generates a sequence of integers, by simply incrementing a Python int.

__init__(i: int = 0)[source]

Initialize self. See help(type(self)) for accurate signature.

__next__() → int[source]

Returns the next item in the container.

class eventsourcing.infrastructure.integersequencegenerators.redisincr.RedisIncr(redis: Optional[redis.client.Redis] = None, key: Optional[str] = None)[source]

Bases: eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator

Generates a sequence of integers, using Redis’ INCR command.

Maximum number is 2**63, or 9223372036854775807, the maximum value of a 64 bit signed integer.

__init__(redis: Optional[redis.client.Redis] = None, key: Optional[str] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

__next__() → int[source]

Returns the next item in the container.