infrastructure

The infrastructure layer adapts external devices in ways that are useful for the application, such as the way an event store encapsulates a database.

base

Abstract base classes for the infrastructure layer.

class eventsourcing.infrastructure.base.AbstractSequencedItemRecordManager(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Bases: abc.ABC

__init__(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Initialize self. See help(type(self)) for accurate signature.

record_sequenced_items(sequenced_item_or_items)[source]

Writes sequenced item(s) into the datastore.

record_sequenced_item(sequenced_item)[source]

Writes sequenced item into the datastore.

get_item(sequence_id, position)[source]

Gets sequenced item from the datastore.

get_record(sequence_id, position)[source]

Gets record at position in sequence.

get_items(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns sequenced item generator.
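The range arguments can be pictured with a small in-memory sketch. This is not the library's implementation: `select_positions` is a hypothetical helper, and it assumes that `query_ascending` decides which end of the range a `limit` keeps, while `results_ascending` only decides the order of the returned values.

```python
from typing import List, Optional


def select_positions(
    positions: List[int],
    gt: Optional[int] = None,
    gte: Optional[int] = None,
    lt: Optional[int] = None,
    lte: Optional[int] = None,
    limit: Optional[int] = None,
    query_ascending: bool = True,
    results_ascending: bool = True,
) -> List[int]:
    """Hypothetical in-memory stand-in for a range query over one sequence."""
    # Apply whichever of the four optional bounds were given.
    selected = [
        p for p in positions
        if (gt is None or p > gt)
        and (gte is None or p >= gte)
        and (lt is None or p < lt)
        and (lte is None or p <= lte)
    ]
    # Assumption: the query direction decides which end the limit keeps.
    selected.sort(reverse=not query_ascending)
    if limit is not None:
        selected = selected[:limit]
    # Assumption: the results direction only orders what is handed back.
    selected.sort(reverse=not results_ascending)
    return selected


positions = list(range(10))
middle = select_positions(positions, gt=2, lte=7)  # [3, 4, 5, 6, 7]
last_three = select_positions(positions, limit=3, query_ascending=False)  # [7, 8, 9]
```

Under these assumptions, querying in descending order with a limit and returning results in ascending order gives the most recent items in their natural order.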

list_items(*args, **kwargs)[source]

Returns list of sequenced items.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

to_record(sequenced_item)[source]

Constructs a record object from given sequenced item object.

from_record(record)[source]

Constructs and returns a sequenced item object, from given ORM object.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Returns records sequenced by notification ID, from application, for pipeline, in given range.

Args ‘start’ and ‘stop’ are positions in a zero-based integer sequence.
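As a rough illustration of a zero-based range, the sketch below assumes that notification IDs are assigned from 1, so that position p corresponds to ID p + 1. That numbering is an assumption made for the example, and `notification_ids_for` is a hypothetical helper, not part of the documented API.

```python
def notification_ids_for(start: int, stop: int) -> range:
    """Map zero-based positions [start, stop) to one-based notification IDs."""
    # Assumption: IDs begin at 1, so position p holds notification ID p + 1.
    return range(start + 1, stop + 1)


first_section = list(notification_ids_for(0, 5))    # [1, 2, 3, 4, 5]
second_section = list(notification_ids_for(5, 10))  # [6, 7, 8, 9, 10]
```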

all_sequence_ids()[source]

Returns all sequence IDs.

delete_record(record)[source]

Permanently removes the given record from the table.

class eventsourcing.infrastructure.base.ACIDRecordManager(tracking_record_class=None, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.AbstractSequencedItemRecordManager

ACID record managers can write tracking records and event records in an atomic transaction, needed for atomic processing in process applications.
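The value of writing both kinds of record in one transaction can be sketched with the standard library's `sqlite3` module. The table names, columns and the `process` function below are hypothetical and are not the library's schema; the point is only that a duplicate tracking record rolls back the event record written alongside it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, state TEXT)")
conn.execute(
    "CREATE TABLE tracking ("
    "upstream TEXT, notification_id INTEGER, "
    "PRIMARY KEY (upstream, notification_id))"
)


def process(upstream: str, notification_id: int, new_state: str) -> bool:
    """Write the event record and the tracking record together, or neither."""
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            conn.execute("INSERT INTO events (state) VALUES (?)", (new_state,))
            conn.execute(
                "INSERT INTO tracking VALUES (?, ?)", (upstream, notification_id)
            )
        return True
    except sqlite3.IntegrityError:
        return False  # this notification was already processed


first = process("orders", 1, "reserved")
second = process("orders", 1, "reserved")  # duplicate delivery is rejected
event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]  # 1
```

Because the second attempt fails on the tracking table's primary key, its event insert is rolled back too, so the notification has an effect only once.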

__init__(tracking_record_class=None, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

write_records(records, tracking_kwargs=None, orm_objs=None)[source]

Writes tracking, event and notification records for a process event.

get_max_record_id()[source]

Return maximum notification ID in pipeline.

get_max_tracking_record_id(upstream_application_name)[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

has_tracking_record(upstream_application_name, pipeline_id, notification_id)[source]

True if tracking record exists for notification from upstream in pipeline.

get_pipeline_and_notification_id(sequence_id, position)[source]

Returns pipeline ID and notification ID for event at given position in given sequence.

class eventsourcing.infrastructure.base.SQLRecordManager(*args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.ACIDRecordManager

This class has code common to (extracted from) the SQLAlchemy and Django record managers.

This makes the subclasses harder to read and probably more brittle, so it might be better to inline this code into the subclasses, so that each looks more like normal Django or SQLAlchemy code. Also, the record manager test cases don’t cover the notification log and tracking record functionality needed by ProcessApplication. They should, so that other record managers can be developed more easily.

__init__(*args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

record_sequenced_items(sequenced_item_or_items)[source]

Writes sequenced item(s) into the datastore.

insert_select_max

SQL statement that inserts records with contiguous IDs, by selecting max ID from indexed table records.
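A minimal sketch of the insert-select-max form, using the standard library's `sqlite3` module. The table and column names are illustrative assumptions, and the statement is not the one the library compiles.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE records ("
    "id INTEGER, sequence_id TEXT, position INTEGER, state TEXT, "
    "UNIQUE (id), UNIQUE (sequence_id, position))"
)

# The next ID is computed by the database from the current maximum,
# in the same statement that inserts the row.
INSERT_SELECT_MAX = (
    "INSERT INTO records (id, sequence_id, position, state) "
    "SELECT COALESCE(MAX(id), 0) + 1, ?, ?, ? FROM records"
)

with conn:
    for position, state in enumerate(["created", "renamed", "discarded"]):
        conn.execute(INSERT_SELECT_MAX, ("entity-1", position, state))

ids = [row[0] for row in conn.execute("SELECT id FROM records ORDER BY id")]
# ids is [1, 2, 3]
```

The unique constraint on `id` is what turns a race between two writers into an error for one of them instead of a duplicate or a gap.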

_prepare_insert(tmpl, record_class, field_names, placeholder_for_id=False)[source]

Compile SQL statement with placeholders for bind parameters.

insert_values

SQL statement that inserts records without ID.

insert_tracking_record

SQL statement that inserts tracking records.

get_record_table_name(record_class)[source]

Returns table name - used in raw queries.

Return type: str

eventstore

The event store provides the application-level interface to the event sourcing persistence mechanism.

class eventsourcing.infrastructure.eventstore.AbstractEventStore[source]

Bases: abc.ABC

Abstract base class for event stores. Defines the methods expected of an event store by other classes in the library.

store(domain_event_or_events)[source]

Put domain event in event store for later retrieval.

get_domain_events(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]

Returns domain events for given entity ID.

get_domain_event(originator_id, position)[source]

Returns a single domain event.

get_most_recent_event(originator_id, lt=None, lte=None)[source]

Returns most recent domain event for given entity ID.

all_domain_events()[source]

Returns all domain events in the event store.

class eventsourcing.infrastructure.eventstore.EventStore(record_manager, sequenced_item_mapper)[source]

Bases: eventsourcing.infrastructure.eventstore.AbstractEventStore

Event store appends domain events to stored sequences. It uses a record manager to map named tuples to database records, and it uses a sequenced item mapper to map named tuples to application-level objects.
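The division of labour described above can be sketched with toy classes. Every name below (`Item`, `Renamed`, `ToyRecordManager`, `ToyMapper`, `ToyEventStore`) is a hypothetical stand-in written for this example, not a class from the library.

```python
import json
from collections import namedtuple

Item = namedtuple("Item", ["sequence_id", "position", "topic", "state"])
Renamed = namedtuple("Renamed", ["originator_id", "originator_version", "name"])


class ToyRecordManager:
    """Keeps items in a dict keyed by (sequence_id, position)."""

    def __init__(self):
        self.records = {}

    def record(self, item):
        key = (item.sequence_id, item.position)
        if key in self.records:
            raise KeyError("position already taken: %r" % (key,))
        self.records[key] = item

    def get_items(self, sequence_id):
        keys = sorted(k for k in self.records if k[0] == sequence_id)
        return [self.records[k] for k in keys]


class ToyMapper:
    """Maps Renamed events to items and back, using JSON for the state."""

    def item_from_event(self, event):
        state = json.dumps({"name": event.name})
        return Item(event.originator_id, event.originator_version, "Renamed", state)

    def event_from_item(self, item):
        return Renamed(item.sequence_id, item.position, **json.loads(item.state))


class ToyEventStore:
    def __init__(self, record_manager, sequenced_item_mapper):
        self.record_manager = record_manager
        self.mapper = sequenced_item_mapper

    def store(self, event):
        # Event -> named tuple (mapper) -> stored record (record manager).
        self.record_manager.record(self.mapper.item_from_event(event))

    def get_domain_events(self, originator_id):
        items = self.record_manager.get_items(originator_id)
        return [self.mapper.event_from_item(item) for item in items]


store = ToyEventStore(ToyRecordManager(), ToyMapper())
store.store(Renamed("entity-1", 0, "first"))
store.store(Renamed("entity-1", 1, "second"))
names = [event.name for event in store.get_domain_events("entity-1")]
```

In this sketch, storing a second event at an occupied position raises an error, which is one simple way to picture a conflict between concurrent writers.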

iterator_class

alias of eventsourcing.infrastructure.iterators.SequencedItemIterator

__init__(record_manager, sequenced_item_mapper)[source]

Initialises event store object.

Parameters:
  • record_manager – record manager
  • sequenced_item_mapper – sequenced item mapper

store(domain_event_or_events)[source]

Appends given domain event, or list of domain events, to their sequence.

Parameters: domain_event_or_events – domain event, or list of domain events

item_from_event(domain_event_or_events)[source]

Maps domain event to sequenced item namedtuple.

Parameters: domain_event_or_events – application-level object (or list)
Returns: sequenced item namedtuple (or list)

get_domain_events(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]

Gets domain events from the sequence identified by originator_id.

Parameters:
  • originator_id – ID of a sequence of events
  • gt – get items after this position
  • gte – get items at or after this position
  • lt – get items before this position
  • lte – get items before or at this position
  • limit – get limited number of items
  • is_ascending – get items from lowest position
  • page_size – restrict and repeat database query
Returns:

list of domain events

get_domain_event(originator_id, position)[source]

Gets a domain event from the sequence identified by originator_id at the given position.

Parameters:
  • originator_id – ID of a sequence of events
  • position – get item at this position
Returns:

domain event

get_most_recent_event(originator_id, lt=None, lte=None)[source]

Gets a domain event from the sequence identified by originator_id at the highest position.

Parameters:
  • originator_id – ID of a sequence of events
  • lt – get highest before this position
  • lte – get highest at or before this position
Returns:

domain event

all_domain_events()[source]

Yields all domain events in the event store.

cassandra

Classes for event sourcing with Apache Cassandra.

class eventsourcing.infrastructure.cassandra.datastore.CassandraSettings(hosts=None, port=None, protocol_version=None, default_keyspace=None, consistency=None, replication_factor=None, username=None, password=None)[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreSettings

__init__(hosts=None, port=None, protocol_version=None, default_keyspace=None, consistency=None, replication_factor=None, username=None, password=None)[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.infrastructure.cassandra.datastore.CassandraDatastore(tables, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.datastore.AbstractDatastore

__init__(tables, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

setup_connection()[source]

Sets up a connection to a datastore.

close_connection()[source]

Drops connection to a datastore.

setup_tables()[source]

Sets up tables used to store events.

drop_tables()[source]

Drops tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

class eventsourcing.infrastructure.cassandra.factory.CassandraInfrastructureFactory(record_manager_class=None, sequenced_item_class=None, event_store_class=None, sequenced_item_mapper_class=None, json_encoder_class=None, json_decoder_class=None, integer_sequenced_record_class=None, timestamp_sequenced_record_class=None, snapshot_record_class=None, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

Infrastructure factory for Cassandra.

record_manager_class

alias of eventsourcing.infrastructure.cassandra.manager.CassandraRecordManager

integer_sequenced_record_class

alias of eventsourcing.infrastructure.cassandra.records.IntegerSequencedRecord

timestamp_sequenced_record_class

alias of eventsourcing.infrastructure.cassandra.records.TimestampSequencedRecord

snapshot_record_class

alias of eventsourcing.infrastructure.cassandra.records.SnapshotRecord

class eventsourcing.infrastructure.cassandra.manager.CassandraRecordManager(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Bases: eventsourcing.infrastructure.base.AbstractSequencedItemRecordManager

record_sequenced_items(sequenced_item_or_items)[source]

Writes sequenced item(s) into the datastore.

get_record(sequence_id, position)[source]

Gets record at position in sequence.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Not implemented.

all_sequence_ids()[source]

Returns all sequence IDs.

delete_record(record)[source]

Permanently removes the given record from the table.

class eventsourcing.infrastructure.cassandra.records.IntegerSequencedRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores integer-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

class eventsourcing.infrastructure.cassandra.records.TimestampSequencedRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores timestamp-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

class eventsourcing.infrastructure.cassandra.records.TimeuuidSequencedRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores timeuuid-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

class eventsourcing.infrastructure.cassandra.records.SnapshotRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores snapshots in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

class eventsourcing.infrastructure.cassandra.records.StoredEventRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores integer-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

datastore

Base classes for concrete datastore classes.

class eventsourcing.infrastructure.datastore.DatastoreSettings[source]

Bases: object

Base class for settings for database connection used by a stored event repository.

class eventsourcing.infrastructure.datastore.AbstractDatastore(settings)[source]

Bases: abc.ABC

__init__(settings)[source]

Initialize self. See help(type(self)) for accurate signature.

setup_connection()[source]

Sets up a connection to a datastore.

close_connection()[source]

Drops connection to a datastore.

setup_tables()[source]

Sets up tables used to store events.

drop_tables()[source]

Drops tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

exception eventsourcing.infrastructure.datastore.DatastoreError[source]

Bases: Exception

exception eventsourcing.infrastructure.datastore.DatastoreConnectionError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

exception eventsourcing.infrastructure.datastore.DatastoreTableError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

django

A Django application for event sourcing with the Django ORM.

class eventsourcing.infrastructure.django.factory.DjangoInfrastructureFactory(tracking_record_class=None, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

Infrastructure factory for Django.

record_manager_class

alias of eventsourcing.infrastructure.django.manager.DjangoRecordManager

__init__(tracking_record_class=None, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

construct_integer_sequenced_record_manager(**kwargs)[source]

Constructs Django record manager.

Returns: A Django record manager.
Return type: DjangoRecordManager

class eventsourcing.infrastructure.django.manager.DjangoRecordManager(*args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.SQLRecordManager

write_records(records, tracking_kwargs=None, orm_objs=None)[source]

Writes tracking, event and notification records for a process event.

_prepare_insert(tmpl, record_class, field_names, placeholder_for_id=False)[source]

With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, using an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.

get_record_table_name(record_class)[source]

Returns table name from Django record class.

get_record(sequence_id, position)[source]

Gets record at position in sequence.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Returns all records in the table.

delete_record(record)[source]

Permanently removes record from table.

get_max_record_id()[source]

Return maximum notification ID in pipeline.

get_max_tracking_record_id(upstream_application_name)[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

has_tracking_record(upstream_application_name, pipeline_id, notification_id)[source]

True if tracking record exists for notification from upstream in pipeline.

all_sequence_ids()[source]

Returns all sequence IDs.

class eventsourcing.infrastructure.django.models.IntegerSequencedRecord(id, sequence_id, position, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.TimestampSequencedRecord(id, sequence_id, position, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.SnapshotRecord(uid, sequence_id, position, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.EntitySnapshotRecord(uid, application_name, originator_id, originator_version, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.StoredEventRecord(uid, application_name, originator_id, originator_version, pipeline_id, notification_id, topic, state, causal_dependencies)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

class eventsourcing.infrastructure.django.models.NotificationTrackingRecord(uid, application_name, upstream_application_name, pipeline_id, notification_id)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

eventplayer

Base classes for event players of different kinds.

eventsourcedrepository

Base classes for event sourced repositories (not abstract, can be used directly).

integersequencegenerators

Different ways of generating sequences of integers.

class eventsourcing.infrastructure.integersequencegenerators.redisincr.RedisIncr(redis=None, key=None)[source]

Bases: eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator

Generates a sequence of integers, using Redis’ INCR command.

Maximum number is 2**63 - 1, or 9223372036854775807, the maximum value of a 64-bit signed integer.

__init__(redis=None, key=None)[source]

Initialize self. See help(type(self)) for accurate signature.

__next__()[source]

Returns the next item in the container.
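The idea of delegating a counter to an INCR-style command can be sketched without a Redis server. `FakeRedis` and `IncrSequence` below are hypothetical stand-ins written for this example; the fake only mimics INCR's behaviour of treating a missing key as 0 before incrementing, and the sketch makes no claim about whether the library offsets the value it returns.

```python
class FakeRedis:
    """Hypothetical in-memory stand-in exposing only an INCR-like method."""

    def __init__(self):
        self._values = {}

    def incr(self, key):
        # Like Redis INCR: a missing key counts as 0 before incrementing.
        self._values[key] = self._values.get(key, 0) + 1
        return self._values[key]


class IncrSequence:
    """Iterator that returns whatever the store's counter returns."""

    def __init__(self, redis, key):
        self.redis = redis
        self.key = key

    def __iter__(self):
        return self

    def __next__(self):
        return self.redis.incr(self.key)


shared = FakeRedis()
sequence = IncrSequence(shared, "integer-sequence")
first_three = [next(sequence) for _ in range(3)]  # [1, 2, 3]
```

Because the counter lives in the shared store rather than in the iterator, a second iterator on the same key continues the sequence instead of restarting it.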

iterators

Different ways of getting sequenced items from a datastore.

class eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: abc.ABC

__init__(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Initialize self. See help(type(self)) for accurate signature.

_inc_page_counter()[source]

Increments the page counter.

Each query result counts as a page, even if there are no items in the page, so this really counts queries.
  • It is easy to divide the number of events by the page size if the “correct” answer is required.
  • There will be a difference in the counts when the number of events is exactly divisible by the page size, because there is no way to know in advance that a full page is also the last page.
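The difference between counting queries and counting pages can be seen in a small paging loop. This is a sketch with a hypothetical `read_all` function, where a list slice stands in for one database query.

```python
from typing import List, Tuple


def read_all(items: List[int], page_size: int) -> Tuple[List[int], int]:
    """Page through items; return what was read and how many queries ran."""
    result: List[int] = []
    queries = 0
    offset = 0
    while True:
        page = items[offset:offset + page_size]  # stands in for one query
        queries += 1
        result.extend(page)
        if len(page) < page_size:
            # Only a short (possibly empty) page proves the end was reached.
            return result, queries
        offset += page_size


_, queries_for_10 = read_all(list(range(10)), page_size=5)  # 3 queries
_, queries_for_11 = read_all(list(range(11)), page_size=5)  # 3 queries
```

Ten items in pages of five need a third, empty query to discover the end, whereas eleven items end on a short third page, so both cases run three queries.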
_inc_query_counter()[source]

Increments the query counter.

__iter__()[source]

Yields a continuous sequence of items.

class eventsourcing.infrastructure.iterators.SequencedItemIterator(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

__iter__()[source]

Yields a continuous sequence of items from “pages” of sequenced items retrieved using the record manager.

class eventsourcing.infrastructure.iterators.ThreadedSequencedItemIterator(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

__iter__()[source]

Yields a continuous sequence of items.

class eventsourcing.infrastructure.iterators.GetEntityEventsThread(record_manager, sequence_id, gt=None, gte=None, lt=None, lte=None, page_size=None, is_ascending=True, *args, **kwargs)[source]

Bases: threading.Thread

__init__(record_manager, sequence_id, gt=None, gte=None, lt=None, lte=None, page_size=None, is_ascending=True, *args, **kwargs)[source]

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

repositories

Repository base classes for entity classes defined in the library.

class eventsourcing.infrastructure.repositories.array.ArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

class eventsourcing.infrastructure.repositories.array.BigArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractBigArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

subrepo_class

alias of ArrayRepository

__init__(array_size=10000, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

subrepo

Sub-sequence repository.

class eventsourcing.infrastructure.repositories.collection_repo.CollectionRepository(event_store, use_cache=False, **kwargs)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.domain.model.collection.AbstractCollectionRepository

Event sourced repository for the Collection domain model entity.

class eventsourcing.infrastructure.repositories.timebucketedlog_repo.TimebucketedlogRepo(event_store, use_cache=False, **kwargs)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository

Event sourced repository for the Timebucketedlog domain model entity.

sequenceditem

The persistence model for storing events.

class eventsourcing.infrastructure.sequenceditem.SequencedItem(sequence_id, position, topic, state)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, sequence_id, position, topic, state)

Create new instance of SequencedItem(sequence_id, position, topic, state)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable)

Make a new SequencedItem object from a sequence or iterable

_replace(**kwds)

Return a new SequencedItem object replacing specified fields with new values

position

Alias for field number 1

sequence_id

Alias for field number 0

state

Alias for field number 3

topic

Alias for field number 2
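The methods and field aliases listed above are those of a standard named tuple. The sketch below builds a local stand-in with the same field names and order using `collections.namedtuple`; the topic and state values are made up for the example.

```python
from collections import namedtuple

# Local stand-in with the documented field names and order.
SequencedItem = namedtuple(
    "SequencedItem", ["sequence_id", "position", "topic", "state"]
)

item = SequencedItem("entity-1", 0, "mymodule#Created", '{"name": "a"}')

same_field = item[1] == item.position  # field number 1 is position
moved = item._replace(position=1)      # returns a new tuple, item is unchanged
as_dict = item._asdict()               # maps field names to values
rebuilt = SequencedItem._make(list(item))
```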

class eventsourcing.infrastructure.sequenceditem.StoredEvent(originator_id, originator_version, topic, state)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, originator_id, originator_version, topic, state)

Create new instance of StoredEvent(originator_id, originator_version, topic, state)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable)

Make a new StoredEvent object from a sequence or iterable

_replace(**kwds)

Return a new StoredEvent object replacing specified fields with new values

originator_id

Alias for field number 0

originator_version

Alias for field number 1

state

Alias for field number 3

topic

Alias for field number 2

sequenceditemmapper

The sequenced item mapper maps sequenced items to application-level objects.

class eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper[source]

Bases: abc.ABC

item_from_event(domain_event)[source]

Constructs and returns a sequenced item for given domain event.

event_from_item(sequenced_item)[source]

Constructs and returns a domain event for given sequenced item.

class eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper(sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=None, json_decoder_class=None, cipher=None, other_attr_names=())[source]

Bases: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper

Uses JSON to transcode domain events.

__init__(sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=None, json_decoder_class=None, cipher=None, other_attr_names=())[source]

Initialize self. See help(type(self)) for accurate signature.

item_from_event(domain_event)[source]

Constructs a sequenced item from a domain event.

construct_item_args(domain_event)[source]

Constructs attributes of a sequenced item from the given domain event.

event_from_item(sequenced_item)[source]

Reconstructs domain event from stored event topic and event attrs. Used in the event store when getting domain events.
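The round trip between a domain event and a sequenced item can be sketched as follows. The `NameChanged` class, the `TOPICS` registry and both functions are hypothetical; in particular, the registry is a simplification standing in for whatever mechanism resolves a topic string to an event class.

```python
import json
from collections import namedtuple

Item = namedtuple("Item", ["sequence_id", "position", "topic", "state"])


class NameChanged:
    def __init__(self, originator_id, originator_version, name):
        self.originator_id = originator_id
        self.originator_version = originator_version
        self.name = name


# Assumption: a plain dict is enough to resolve topics in this sketch.
TOPICS = {"NameChanged": NameChanged}


def item_from_event(event):
    attrs = dict(vars(event))
    # The sequence ID and position become fields of the item;
    # the remaining attributes are serialised into the state.
    sequence_id = attrs.pop("originator_id")
    position = attrs.pop("originator_version")
    state = json.dumps(attrs, sort_keys=True)
    return Item(sequence_id, position, type(event).__name__, state)


def event_from_item(item):
    cls = TOPICS[item.topic]
    event = cls.__new__(cls)  # skip __init__; restore attributes directly
    event.__dict__.update(json.loads(item.state))
    event.originator_id = item.sequence_id
    event.originator_version = item.position
    return event


item = item_from_event(NameChanged("entity-1", 3, "new"))
restored = event_from_item(item)
```

Keeping the sequence ID and position out of the serialised state means a store can index and query them without decoding the JSON.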

snapshotting

Snapshotting avoids having to replay an entire sequence of events to obtain the current state of a projection.
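The saving can be illustrated with a toy replay. The `Snapshot` shape, the `(version, amount)` event tuples and the `replay` function are assumptions made for this sketch, not the library's types.

```python
from collections import namedtuple

Snapshot = namedtuple("Snapshot", ["version", "state"])  # hypothetical shape


def replay(events, snapshot=None):
    """Fold (version, amount) events into a running total."""
    if snapshot is None:
        total, after = 0, -1
    else:
        total, after = snapshot.state, snapshot.version
    applied = 0
    for version, amount in events:
        # A real store would query only events after the snapshot version;
        # here the full list is filtered to keep the example self-contained.
        if version > after:
            total += amount
            applied += 1
    return total, applied


events = [(version, 10) for version in range(100)]
full_total, full_applied = replay(events)            # 100 events applied
snapshot = Snapshot(version=89, state=900)           # state after versions 0..89
fast_total, fast_applied = replay(events, snapshot)  # 10 events applied
```

Both calls arrive at the same total, but the second applies only the ten events recorded after the snapshot.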

class eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy[source]

Bases: abc.ABC

get_snapshot(entity_id, lt=None, lte=None)[source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type: Snapshot

take_snapshot(entity_id, entity, last_event_version)[source]

Takes a snapshot of entity, using given ID, state and version number.

Return type: AbstractSnapshop

class eventsourcing.infrastructure.snapshotting.EventSourcedSnapshotStrategy(snapshot_store)[source]

Bases: eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy

Snapshot strategy that uses an event sourced snapshot.

__init__(snapshot_store)[source]

Initialize self. See help(type(self)) for accurate signature.

get_snapshot(entity_id, lt=None, lte=None)[source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type: Snapshot

take_snapshot(entity_id, entity, last_event_version)[source]

Creates a Snapshot from the given state, and appends it to the snapshot store.

Return type: Snapshot

eventsourcing.infrastructure.snapshotting.entity_from_snapshot(snapshot)[source]

Reconstructs domain entity from given snapshot.

sqlalchemy

Classes for event sourcing with SQLAlchemy.

class eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemySettings(uri=None, pool_size=None)[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreSettings

__init__(uri=None, pool_size=None)[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemyDatastore(base=<class 'sqlalchemy.ext.declarative.api.Base'>, tables=None, connection_strategy='plain', session=None, **kwargs)[source]

Bases: eventsourcing.infrastructure.datastore.AbstractDatastore

__init__(base=<class 'sqlalchemy.ext.declarative.api.Base'>, tables=None, connection_strategy='plain', session=None, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

setup_connection()[source]

Sets up a connection to a datastore.

setup_tables(tables=None)[source]

Sets up tables used to store events.

drop_tables()[source]

Drops tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

close_connection()[source]

Drops connection to a datastore.

class eventsourcing.infrastructure.sqlalchemy.factory.SQLAlchemyInfrastructureFactory(session, uri=None, pool_size=None, tracking_record_class=None, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

Infrastructure factory for SQLAlchemy infrastructure.

record_manager_class

alias of eventsourcing.infrastructure.sqlalchemy.manager.SQLAlchemyRecordManager

integer_sequenced_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord

timestamp_sequenced_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord

snapshot_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.SnapshotRecord

tracking_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord

__init__(session, uri=None, pool_size=None, tracking_record_class=None, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

construct_integer_sequenced_record_manager(**kwargs)[source]

Constructs SQLAlchemy record manager.

Returns: An SQLAlchemy record manager.
Return type: SQLAlchemyRecordManager

construct_record_manager(record_class, **kwargs)[source]

Constructs SQLAlchemy record manager.

Returns: An SQLAlchemy record manager.
Return type: SQLAlchemyRecordManager

construct_datastore()[source]

Constructs SQLAlchemy datastore.

Return type: SQLAlchemyDatastore

class eventsourcing.infrastructure.sqlalchemy.manager.SQLAlchemyRecordManager(session, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.SQLRecordManager

__init__(session, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

_prepare_insert(tmpl, record_class, field_names, placeholder_for_id=False)[source]

With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, using an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.

write_records(records, tracking_kwargs=None, orm_objs=None)[source]

Writes tracking, event and notification records for a process event.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Returns records sequenced by notification ID, from application, for pipeline, in given range.

Args ‘start’ and ‘stop’ are positions in a zero-based integer sequence.

get_record(sequence_id, position)[source]

Gets record at position in sequence.

get_max_record_id()[source]

Return maximum notification ID in pipeline.

get_max_tracking_record_id(upstream_application_name)[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

has_tracking_record(upstream_application_name, pipeline_id, notification_id)[source]

True if tracking record exists for notification from upstream in pipeline.
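One way such a check might be used is to skip notifications that have already been processed. The sketch below is an in-memory stand-in, not the SQLAlchemy implementation: the class InMemoryTracking, its write_tracking_record method and the process function are hypothetical names introduced for illustration.

```python
class InMemoryTracking:
    """Hypothetical stand-in that keeps tracking records in a set."""

    def __init__(self):
        self._seen = set()

    def has_tracking_record(self, upstream_application_name, pipeline_id, notification_id):
        key = (upstream_application_name, pipeline_id, notification_id)
        return key in self._seen

    def write_tracking_record(self, upstream_application_name, pipeline_id, notification_id):
        self._seen.add((upstream_application_name, pipeline_id, notification_id))


def process(tracking, upstream_application_name, pipeline_id, notification_ids, handled):
    """Handle each notification at most once, using the tracking records."""
    for notification_id in notification_ids:
        if tracking.has_tracking_record(upstream_application_name, pipeline_id, notification_id):
            continue  # already processed, skip the duplicate
        handled.append(notification_id)
        tracking.write_tracking_record(upstream_application_name, pipeline_id, notification_id)


tracking = InMemoryTracking()
handled = []
process(tracking, "orders", 0, [1, 2, 3], handled)
process(tracking, "orders", 0, [2, 3, 4], handled)  # overlapping redelivery
```

In a real datastore the tracking record would normally be written in the same transaction as the resulting records; the sketch only shows the lookup.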

all_sequence_ids()[source]

Returns all sequence IDs.

delete_record(record)[source]

Permanently removes record from table.

get_record_table_name(record_class)[source]

Returns the table name, for use in raw queries.

Return type: str

class eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
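The behaviour described above can be approximated in plain Python. The sketch below is not SQLAlchemy's implementation; the class KwargsRecord and its attribute names are hypothetical, chosen only to show how unknown keys are rejected.

```python
class KwargsRecord:
    """Plain-Python imitation of a kwargs constructor (illustrative only)."""

    # Class attributes stand in for mapped columns.
    sequence_id = None
    position = None
    state = None

    def __init__(self, **kwargs):
        cls = type(self)
        for key, value in kwargs.items():
            # Only names already defined on the class are accepted.
            if not hasattr(cls, key):
                raise TypeError(
                    f"{key!r} is an invalid keyword argument for {cls.__name__}"
                )
            setattr(self, key, value)


record = KwargsRecord(sequence_id="a", position=0, state="{}")

try:
    KwargsRecord(unknown_field=1)
    rejected = False
except TypeError:
    rejected = True
```

Attributes not passed as keyword arguments keep their class-level default in this sketch.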

class eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedNoIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedRecord

alias of eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord

class eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedWithIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedRecord

alias of eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord

class eventsourcing.infrastructure.sqlalchemy.records.SnapshotRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class eventsourcing.infrastructure.sqlalchemy.records.EntitySnapshotRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class eventsourcing.infrastructure.sqlalchemy.records.StoredEventRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

class eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

timebucketedlog_reader

Reader for timebucketed logs.

eventsourcing.infrastructure.timebucketedlog_reader.get_timebucketedlog_reader(log, event_store)[source]

Return type: TimebucketedlogReader

factory

Infrastructure factory.

class eventsourcing.infrastructure.factory.InfrastructureFactory(record_manager_class=None, sequenced_item_class=None, event_store_class=None, sequenced_item_mapper_class=None, json_encoder_class=None, json_decoder_class=None, integer_sequenced_record_class=None, timestamp_sequenced_record_class=None, snapshot_record_class=None, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Bases: object

Base class for infrastructure factories.

__init__(record_manager_class=None, sequenced_item_class=None, event_store_class=None, sequenced_item_mapper_class=None, json_encoder_class=None, json_decoder_class=None, integer_sequenced_record_class=None, timestamp_sequenced_record_class=None, snapshot_record_class=None, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Initialize self. See help(type(self)) for accurate signature.

event_store_class

alias of eventsourcing.infrastructure.eventstore.EventStore

sequenced_item_class

alias of eventsourcing.infrastructure.sequenceditem.SequencedItem

sequenced_item_mapper_class

alias of eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper

construct_integer_sequenced_record_manager(**kwargs)[source]

Constructs an integer sequenced record manager.

construct_timestamp_sequenced_record_manager(**kwargs)[source]

Constructs a timestamp sequenced record manager.

construct_snapshot_record_manager(**kwargs)[source]

Constructs a snapshot record manager.

construct_record_manager(record_class, sequenced_item_class=None, **kwargs)[source]

Constructs a record manager.

construct_sequenced_item_mapper(cipher)[source]

Constructs sequenced item mapper object.

Returns: Sequenced item mapper object.
Return type: AbstractSequencedItemMapper

construct_integer_sequenced_event_store(cipher)[source]

Constructs an integer sequenced event store.

construct_datastore()[source]

Constructs datastore object.

Returns: Concrete datastore object.
Return type: AbstractDatastore