Module docs

This document describes the packages, modules, classes, functions and other code details of the library.

eventsourcing

The eventsourcing package contains packages for the application layer, the domain layer, the infrastructure layer, and the interface layer. There is also a module for exceptions, an example package, and a utils package.

application

The application layer brings together the domain and infrastructure layers.

base

policies

class eventsourcing.application.policies.PersistencePolicy(event_store, event_type=None)[source]

Bases: object

Stores events of given type to given event store, whenever they are published.

close()[source]
is_event(event)[source]
store_event(event)[source]
class eventsourcing.application.policies.SnapshottingPolicy(repository, snapshot_store, persist_event_type=<class 'eventsourcing.domain.model.events.EventWithOriginatorVersion'>, period=2)[source]

Bases: object

close()[source]
condition(event)[source]
take_snapshot(event)[source]

simple

class eventsourcing.application.simple.SimpleApplication(name='', persistence_policy=None, persist_event_type=None, cipher_key=None, sequenced_item_class=None, sequenced_item_mapper_class=None, infrastructure_factory_class=None, record_manager_class=None, stored_event_record_class=None, snapshot_record_class=None, setup_table=True, contiguous_record_ids=True, pipeline_id=-1, json_encoder_class=None, json_decoder_class=None, notification_log_section_size=None)[source]

Bases: object

change_pipeline(pipeline_id)[source]
close()[source]
construct_infrastructure_factory(*args, **kwargs)[source]
Return type:InfrastructureFactory
drop_table()[source]
infrastructure_factory_class = None
is_constructed_with_session = False
json_decoder_class = None
json_encoder_class = None
persist_event_type = None
record_manager_class = None
sequenced_item_class = None
sequenced_item_mapper_class = None
setup_cipher(cipher_key)[source]
setup_event_store()[source]
setup_infrastructure(*args, **kwargs)[source]
setup_notification_log()[source]
setup_persistence_policy()[source]
setup_repository(**kwargs)[source]
setup_table()[source]
snapshot_record_class = None
stored_event_record_class = None

domain.model

The domain layer contains a domain model, and optionally services that work across different entities or aggregates.

The domain model package contains classes and functions that can help develop an event sourced domain model.

aggregate

Base classes for aggregates in a domain driven design.

class eventsourcing.domain.model.aggregate.AggregateRoot(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.AggregateRootWithHashchainedEvents

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.AttributeChanged

Published when an AggregateRoot is changed.

class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.Created

Published when an AggregateRoot is created.

class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.Discarded

Published when an AggregateRoot is discarded.

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event

Supertype for aggregate events.

class eventsourcing.domain.model.aggregate.AggregateRootWithHashchainedEvents(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.EntityWithHashchain, eventsourcing.domain.model.aggregate.BaseAggregateRoot

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.AttributeChanged

Published when an AggregateRoot is changed.

class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.aggregate.Created

Published when an AggregateRoot is created.

class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.Discarded

Published when an AggregateRoot is discarded.

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.aggregate.Event

Supertype for aggregate events.

class eventsourcing.domain.model.aggregate.BaseAggregateRoot(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

Root entity for an aggregate in a domain driven design.

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when an AggregateRoot is changed.

class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Created

Published when an AggregateRoot is created.

class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Discarded

Published when an AggregateRoot is discarded.

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for aggregate events.

__publish__(event)[source]

Appends event to internal collection of pending events.

__save__()[source]

Publishes pending events for others in application.

array

A kind of collection, indexed by integer. Doesn’t need to replay all events to exist.

class eventsourcing.domain.model.array.AbstractArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

Repository for sequence objects.

__getitem__(array_id)[source]

Returns sequence for given ID.

class eventsourcing.domain.model.array.AbstractBigArrayRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

Repository for compound sequence objects.

__getitem__(array_id)[source]

Returns sequence for given ID.

subrepo

Sub-sequence repository.

class eventsourcing.domain.model.array.Array(array_id, repo)[source]

Bases: object

__getitem__(item)[source]

Returns item at index, or items in slice.

__len__()[source]

Returns length of array.

__setitem__(index, item)[source]

Sets item in array, at given index.

Won’t overrun the end of the array, because the position is fixed to be less than base_size.

append(item)[source]

Sets item in next position after the last item.

get_item_assigned(index)[source]
get_items_assigned(start_index=None, stop_index=None, limit=None, is_ascending=True)[source]
get_last_item_and_next_position()[source]
get_next_position()[source]
class eventsourcing.domain.model.array.BigArray(array_id, repo)[source]

Bases: eventsourcing.domain.model.array.Array

A virtual array holding items in indexed positions, across a number of Array instances.

Getting and setting items at index position is supported. Slices are supported, and operate across the underlying arrays. Appending is also supported.

BigArray is designed to overcome the concern of needing a single large sequence that may not be suitably stored in any single partiton. In simple terms, if events of an aggregate can fit in a partition, we can use the same size partition to make a tree of arrays that will certainly be capable of sequencing all the events of the application in a single stream.

With normal size base arrays, enterprise applications can expect read and write time to be approximately constant with respect to the number of items in the array.

The array is composed of a tree of arrays, which gives the capacity equal to the size of each array to the power of the size of each array. If the arrays are limited to be about the maximum size of an aggregate event stream (a large number but not too many that would cause there to be too much data in any one partition, let’s say 1000s to be safe) then it would be possible to fit such a large number of aggregates in the corresponding BigArray, that we can be confident it would be full.

Write access time in the worst case, and the time to identify the index of the last item in the big array, is proportional to the log of the highest assigned index to base the underlying array size. Write time on average, and read time given an index, is constant with respect to the number of items in a BigArray.

Items can be appended in log time in a single thread. However, the time between reading the current last index and claiming the next position leads to contention and retries when there are lots of threads of execution all attempting to append items, which inherently limits throughput.

Todo: Not possible in Cassandra, but maybe do it in a transaction in SQLAlchemy?

An alternative to reading the last item before writing the next is to use an integer sequence generator to generate a stream of integers. Items can be assigned to index positions in a big array, according to the integers that are issued. Throughput will then be much better, and will be limited only by the rate at which the database can have events written to it (unless the number generator is quite slow).

An external integer sequence generator, such as Redis’ INCR command, or an auto-incrementing database column, may constitute a single point of failure.

__len__()[source]

Returns length of array.

calc_parent(i, j, h)[source]

Returns get_big_array and end of span of parent sequence that contains given child.

calc_required_height(n, size)[source]
create_array_id(i, j)[source]
get_item(position)[source]
get_last_array()[source]

Returns last array in compound.

Return type:CompoundSequenceReader
get_last_item_and_next_position()[source]
get_slice(start, stop)[source]
class eventsourcing.domain.model.array.ItemAssigned(item, index, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Occurs when an item is set at a position in an array.

index
item

collection

Decorators useful in domain models based on the classes in this library.

eventsourcing.domain.model.decorators.attribute(getter)[source]

When used as a method decorator, returns a property object with the method as the getter and a setter defined to call instance method change_attribute(), which publishes an AttributeChanged event.

eventsourcing.domain.model.decorators.mutator(arg=None)[source]

Structures mutator functions by allowing handlers to be registered for different types of event. When the decorated function is called with an initial value and an event, it will call the handler that has been registered for that type of event.

It works like singledispatch, which it uses. The difference is that when the decorated function is called, this decorator dispatches according to the type of last call arg, which fits better with reduce(). The builtin Python function reduce() is used by the library to replay a sequence of events against an initial state. If a mutator function is given to reduce(), along with a list of events and an initializer, reduce() will call the mutator function once for each event in the list, but the initializer will be the first value, and the event will be the last argument, and we want to dispatch according to the type of the event. It happens that singledispatch is coded to switch on the type of the first argument, which makes it unsuitable for structuring a mutator function without the modifications introduced here.

The other aspect introduced by this decorator function is the option to set the type of the handled entity in the decorator. When an entity is replayed from scratch, in other words when all its events are replayed, the initial state is None. The handler which handles the first event in the sequence will probably construct an object instance. It is possible to write the type into the handler, but that makes the entity more difficult to subclass because you will also need to write a handler for it. If the decorator is invoked with the type, when the initial value passed as a call arg to the mutator function is None, the handler will instead receive the type of the entity, which it can use to construct the entity object.

class Entity(object):
    class Created(object):
        pass

@mutator(Entity)
def mutate(initial, event):
    raise NotImplementedError(type(event))

@mutate.register(Entity.Created)
def _(initial, event):
    return initial(**event.__dict__)

entity = mutate(None, Entity.Created())
eventsourcing.domain.model.decorators.random() → x in the interval [0, 1).
eventsourcing.domain.model.decorators.retry(exc=<class 'Exception'>, max_attempts=1, wait=0)[source]
eventsourcing.domain.model.decorators.subscribe_to(*event_classes)[source]

Decorator for making a custom event handler function subscribe to a certain class of event.

The decorated function will be called once for each matching event that is published, and will be given one argument, the event, when it is called. If events are published in lists, for example the AggregateRoot publishes a list of pending events when its __save__() method is called, then the decorated function will be called once for each event that is an instance of the given event_class.

Please note, this decorator isn’t suitable for use with object class methods. The decorator receives in Python 3 an unbound function, and defines a handler which it subscribes that calls the decorated function for each matching event. However the method isn’t called on the object, so the object instance is never available in the decorator, so the decorator can’t call a normal object method because it doesn’t have a value for ‘self’.

event_class: type used to match published events, an event matches if it is an instance of this type

The following example shows a custom handler that reacts to Todo.Created event and saves a projection of a Todo model object.

@subscribe_to(Todo.Created)
def new_todo_projection(event):
    todo = TodoProjection(id=event.originator_id, title=event.title)
    todo.save()

entity

Base classes for domain entities of different kinds.

The entity module provides base classes for domain entities.

class eventsourcing.domain.model.entity.AbstractEntityRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEventPlayer

__contains__(entity_id)[source]

Returns True or False, according to whether or not entity exists.

__getitem__(entity_id)[source]

Returns entity for given ID.

event_store

Returns event store object used by this repository.

get_entity(entity_id, at=None)[source]

Returns entity for given ID.

take_snapshot(entity_id, lt=None, lte=None)[source]

Takes snapshot of entity state, using stored events. :return: Snapshot

class eventsourcing.domain.model.entity.AbstractEventPlayer[source]

Bases: object

class eventsourcing.domain.model.entity.DomainEntity(id)[source]

Bases: eventsourcing.domain.model.events.QualnameABC

Base class for domain entities.

class AttributeChanged(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.AttributeChanged

Published when a DomainEntity is discarded.

class Created(originator_topic, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.Created

Published when an entity is created.

originator_topic
class Discarded(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.Discarded, eventsourcing.domain.model.entity.Event

Published when a DomainEntity is discarded.

class Event(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.events.DomainEvent

Supertype for events of domain entities.

__check_obj__(obj)[source]

Checks obj state before mutating.

__assert_not_discarded__()[source]

Raises exception if entity has been discarded already.

__change_attribute__(name, value)[source]

Changes named attribute with the given value, by triggering an AttributeChanged event.

__discard__()[source]

Discards self, by triggering a Discarded event.

__publish__(event)[source]

Publishes given event for subscribers in the application.

Parameters:event – domain event or list of events
__publish_to_subscribers__(event)[source]

Actually dispatches given event to publish-subscribe mechanism.

Parameters:event – domain event or list of events
__trigger_event__(event_class, **kwargs)[source]

Constructs, applies, and publishes a domain event.

id

Entity ID allows an entity instance to be referenced and distinguished from others, even though its state may change over time.

class eventsourcing.domain.model.entity.EntityWithHashchain(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Created

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithHash, eventsourcing.domain.model.entity.Event

Supertype for events of domain entities.

__check_obj__(obj)[source]

Extends superclass method by checking the __previous_hash__ of this event matches the __head__ hash of the entity obj.

class eventsourcing.domain.model.entity.TimestampedEntity(__created_on__=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class AttributeChanged(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedEntity is changed.

class Created(originator_topic, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a TimestampedEntity is created.

class Discarded(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedEntity is discarded.

class Event(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.EventWithTimestamp

Supertype for events of timestamped entities.

__mutate__(obj)[source]

Update obj with values from self.

class eventsourcing.domain.model.entity.TimestampedVersionedEntity(__created_on__=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedEntity, eventsourcing.domain.model.entity.VersionedEntity

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedVersionedEntity is created.

class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a TimestampedVersionedEntity is created.

class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedVersionedEntity is discarded.

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Event

Supertype for events of timestamped, versioned entities.

class eventsourcing.domain.model.entity.TimeuuidedEntity(event_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class eventsourcing.domain.model.entity.TimeuuidedVersionedEntity(event_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimeuuidedEntity, eventsourcing.domain.model.entity.VersionedEntity

class eventsourcing.domain.model.entity.VersionedEntity(__version__=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a VersionedEntity is changed.

class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a VersionedEntity is created.

class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a VersionedEntity is discarded.

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.entity.Event

Supertype for events of versioned entities.

__check_obj__(obj)[source]

Extends superclass method by checking the event’s originator version follows (1 +) this entity’s version.

__trigger_event__(event_class, **kwargs)[source]

Triggers domain event with entity’s next version number.

The event carries the version number that the originator will have when the originator is mutated with this event. (The event’s originator version isn’t the version of the originator that triggered the event. The Created event has version 0, and so a newly created instance is at version 0. The second event has originator version 1, and so will the originator when the second event has been applied.)

events

Base classes for domain events of different kinds.

class eventsourcing.domain.model.events.AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Can be published when an attribute of an entity is created.

name
value
class eventsourcing.domain.model.events.Created(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Can be published when an entity is created.

class eventsourcing.domain.model.events.Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Published when something is discarded.

class eventsourcing.domain.model.events.DomainEvent(**kwargs)[source]

Bases: eventsourcing.domain.model.events.QualnameABC

Base class for domain events.

Implements methods to make instances read-only, comparable for equality, have recognisable representations, and hashable.

__eq__(other)[source]

Tests for equality of two event objects.

__hash__()[source]

Computes a Python integer hash for an event, using its event hash string.

Supports equality and inequality comparisons.

__init__(**kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
__ne__(other)[source]

Negates the equality test.

__repr__()[source]

Returns string representing the type and attribute values of the event.

__setattr__(key, value)[source]

Inhibits event attributes from being updated by assignment.

mutate(obj)[source]

Convenience for use in custom models, to update obj with values from self without needing to call super method and return obj (two extra lines).

Can be overridden by subclasses. Any value returned by this method will be ignored.

Please note, subclasses that extend mutate() might not have fully completed that method before this method is called. To ensure all base classes have completed their mutate behaviour before mutating an event in a concrete class, extend mutate() instead of overriding this method.

Parameters:obj – object to be mutated
exception eventsourcing.domain.model.events.EventHandlersNotEmptyError[source]

Bases: Exception

class eventsourcing.domain.model.events.EventWithHash(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Base class for domain events.

Implements methods to make instances read-only, comparable for equality, have recognisable representations, and hashable.

__hash__()[source]

Computes a Python integer hash for an event, using its event hash string.

Supports equality and inequality comparisons.

__init__(**kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
class eventsourcing.domain.model.events.EventWithOriginatorID(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

originator_id
class eventsourcing.domain.model.events.EventWithOriginatorVersion(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have an originator version number.

originator_version
class eventsourcing.domain.model.events.EventWithTimestamp(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have a timestamp value.

timestamp
class eventsourcing.domain.model.events.EventWithTimeuuid(event_id=None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have an UUIDv1 event ID.

event_id
class eventsourcing.domain.model.events.Logged(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Published when something is logged.

class eventsourcing.domain.model.events.QualnameABC[source]

Bases: object

Base class that introduces __qualname__ for objects in Python 2.7.

class eventsourcing.domain.model.events.QualnameABCMeta[source]

Bases: abc.ABCMeta

Supplies __qualname__ to object classes with this metaclass.

eventsourcing.domain.model.events.assert_event_handlers_empty()[source]
eventsourcing.domain.model.events.clear_event_handlers()[source]
eventsourcing.domain.model.events.create_timesequenced_event_id()[source]
eventsourcing.domain.model.events.publish(event)[source]
eventsourcing.domain.model.events.subscribe(handler, predicate=None)[source]
eventsourcing.domain.model.events.unsubscribe(handler, predicate=None)[source]

snapshot

Snapshotting is implemented in the domain layer as an event.

class eventsourcing.domain.model.snapshot.AbstractSnapshop[source]

Bases: object

originator_id

ID of the snapshotted entity.

originator_version

Version of the last event applied to the entity.

state

State of the snapshotted entity.

topic

Path to the class of the snapshotted entity.

class eventsourcing.domain.model.snapshot.Snapshot(originator_id, originator_version, topic, state)[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.snapshot.AbstractSnapshop

state

State of the snapshotted entity.

topic

Path to the class of the snapshotted entity.

timebucketedlog

Time-bucketed logs allow a sequence of the items that is sequenced by timestamp to be split across a number of different database partitions, which avoids one partition becoming very large (and then unworkable).

class eventsourcing.domain.model.timebucketedlog.MessageLogged(message, originator_id)[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.events.Logged

message
class eventsourcing.domain.model.timebucketedlog.Timebucketedlog(name, bucket_size=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

class BucketSizeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.timebucketedlog.Event, eventsourcing.domain.model.entity.AttributeChanged

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for events of time-bucketed log.

class Started(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.timebucketedlog.Event

append_message(message)[source]
bucket_size
name
started_on
class eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

get_or_create(log_name, bucket_size)[source]

Gets or creates a log.

Return type:Timebucketedlog
eventsourcing.domain.model.timebucketedlog.bucket_duration(bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.bucket_starts(timestamp, bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.make_timebucket_id(log_id, timestamp, bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.next_bucket_starts(timestamp, bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.previous_bucket_starts(timestamp, bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.start_new_timebucketedlog(name, bucket_size=None)[source]
eventsourcing.domain.model.timebucketedlog.timestamp_from_datetime(dt)[source]

infrastructure

The infrastructure layer adapts external devices in ways that are useful for the application, such as the way an event store encapsulates a database.

base

Abstract base classes for the infrastructure layer.

class eventsourcing.infrastructure.base.ACIDRecordManager(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False, application_id=None, pipeline_id=-1)[source]

Bases: eventsourcing.infrastructure.base.AbstractSequencedItemRecordManager

ACID record managers can write tracking records and event records in an atomic transaction, needed for atomic processing in process applications.

get_max_record_id()[source]

Return maximum notification ID in pipeline.

tracking_record_class = None
tracking_record_field_names = ['application_id', 'upstream_application_id', 'pipeline_id', 'notification_id']
class eventsourcing.infrastructure.base.AbstractSequencedItemRecordManager(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False, application_id=None, pipeline_id=-1)[source]

Bases: object

all_sequence_ids()[source]

Returns all sequence IDs.

append(sequenced_item_or_items)[source]

Writes sequenced item into the datastore.

delete_record(record)[source]

Removes permanently given record from the table.

from_record(record)[source]

Constructs and returns a sequenced item object, from given ORM object.

get_field_kwargs(item)[source]
get_item(sequence_id, position)[source]

Gets sequenced item from the datastore.

get_items(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns sequenced items.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Returns records sequenced by notification ID, from application, for pipeline, in given range.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

list_items(*args, **kwargs)[source]
list_sequence_ids()[source]
raise_index_error(position)[source]
raise_operational_error(e)[source]
raise_record_integrity_error(e)[source]
raise_sequenced_item_conflict()[source]
to_record(sequenced_item)[source]

Constructs and returns an ORM object, from given sequenced item object.

class eventsourcing.infrastructure.base.AbstractTrackingRecordManager[source]

Bases: object

get_max_record_id(application_name, upstream_application_name, pipeline_id)[source]

Returns maximum record ID for given application name.

record_class

Returns tracking record class.

class eventsourcing.infrastructure.base.RelationalRecordManager(*args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.ACIDRecordManager

_prepare_insert(tmpl, record_class, field_names, placeholder_for_id=False)[source]

Compile SQL statement with placeholders for bind parameters.

append(sequenced_item_or_items)[source]

Writes sequenced item into the datastore.

clone(application_id, pipeline_id, **kwargs)[source]
get_record_table_name(record_class)[source]

Returns table name - used in raw queries.

Return type:str
insert_select_max

SQL statement that inserts records with contiguous IDs, by selecting max ID from indexed table records.

insert_tracking_record

SQL statement that inserts tracking records.

insert_values

SQL statement that inserts records without ID.

to_records(sequenced_item_or_items)[source]
write_records(records, tracking_kwargs=None)[source]

Creates records in the database. :param tracking_kwargs:

cassandra

Classes for event sourcing with Apache Cassandra.

class eventsourcing.infrastructure.cassandra.datastore.CassandraDatastore(tables, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.datastore.Datastore

close_connection()[source]

Drops connection to a datastore.

drop_table(*_)[source]
drop_tables()[source]

Drops tables used to store events.

setup_connection()[source]

Sets up a connection to a datastore.

setup_tables()[source]

Sets up tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

class eventsourcing.infrastructure.cassandra.datastore.CassandraSettings(hosts=None, port=None, protocol_version=None, default_keyspace=None, consistency=None, replication_factor=None, username=None, password=None)[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreSettings

CONSISTENCY_LEVEL = 'LOCAL_QUORUM'
DEFAULT_KEYSPACE = 'eventsourcing'
HOSTS = ['127.0.0.1']
PORT = 9042
PROTOCOL_VERSION = 3
REPLICATION_FACTOR = 1
class eventsourcing.infrastructure.cassandra.factory.CassandraInfrastructureFactory(record_manager_class=None, sequenced_item_class=None, integer_sequenced_record_class=None, timestamp_sequenced_record_class=None, tracking_record_manager_class=None, snapshot_record_class=None, contiguous_record_ids=False, application_id=None, pipeline_id=-1)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

integer_sequenced_record_class

alias of eventsourcing.infrastructure.cassandra.records.IntegerSequencedRecord

record_manager_class

alias of eventsourcing.infrastructure.cassandra.manager.CassandraRecordManager

snapshot_record_class

alias of eventsourcing.infrastructure.cassandra.records.SnapshotRecord

timestamp_sequenced_record_class

alias of eventsourcing.infrastructure.cassandra.records.TimestampSequencedRecord

class eventsourcing.infrastructure.cassandra.manager.CassandraRecordManager(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False, application_id=None, pipeline_id=-1)[source]

Bases: eventsourcing.infrastructure.base.AbstractSequencedItemRecordManager

all_sequence_ids()[source]

Returns all sequence IDs.

append(sequenced_item_or_items)[source]

Writes sequenced item into the datastore.

delete_record(record)[source]

Removes permanently given record from the table.

filter(**kwargs)[source]
get_item(sequence_id, position)[source]

Gets sequenced item from the datastore.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Not implemented.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

datastore

Base classes for concrete datastore classes.

class eventsourcing.infrastructure.datastore.Datastore(settings)[source]

Bases: object

close_connection()[source]

Drops connection to a datastore.

drop_tables()[source]

Drops tables used to store events.

setup_connection()[source]

Sets up a connection to a datastore.

setup_tables()[source]

Sets up tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

exception eventsourcing.infrastructure.datastore.DatastoreConnectionError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

exception eventsourcing.infrastructure.datastore.DatastoreError[source]

Bases: Exception

class eventsourcing.infrastructure.datastore.DatastoreSettings[source]

Bases: object

Base class for settings for database connection used by a stored event repository.

exception eventsourcing.infrastructure.datastore.DatastoreTableError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

django

A Django application for event sourcing with the Django ORM.

class eventsourcing.infrastructure.django.manager.DjangoRecordManager(*args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.RelationalRecordManager

_prepare_insert(tmpl, record_class, field_names, placeholder_for_id=False)[source]

With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, using an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.

all_sequence_ids()[source]

Returns all sequence IDs.

delete_record(record)[source]

Permanently removes record from table.

get_item(sequence_id, position)[source]

Gets sequenced item from the datastore.

get_max_record_id()[source]

Return maximum notification ID in pipeline.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Returns all records in the table.

get_record_table_name(record_class)[source]

Returns table name from SQLAlchemy record class.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

write_records(records, tracking_kwargs=None)[source]

Creates records in the database. :param tracking_kwargs:

eventplayer

Base classes for event players of different kinds.

class eventsourcing.infrastructure.eventplayer.EventPlayer(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.domain.model.entity.AbstractEventPlayer

event_store
static mutate(initial, event)[source]
replay_events(initial_state, domain_events)[source]

Evolves initial state using the sequence of domain events and a mutator function.

eventsourcedrepository

Base classes for event sourced repositories (not abstract, can be used directly).

class eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.infrastructure.eventplayer.EventPlayer, eventsourcing.domain.model.entity.AbstractEntityRepository

__contains__(entity_id)[source]

Returns a boolean value according to whether entity with given ID exists.

__getitem__(entity_id)[source]

Returns entity with given ID.

get_entity(entity_id, at=None)[source]

Returns entity with given ID, optionally until position.

replay_entity(entity_id, gt=None, gte=None, lt=None, lte=None, limit=None, initial_state=None, query_descending=False)[source]

Reconstitutes requested domain entity from domain events found in event store.

take_snapshot(entity_id, lt=None, lte=None)[source]

Takes a snapshot of the entity as it existed after the most recent event, optionally less than, or less than or equal to, a particular position.

eventstore

The event store provides the application-level interface to the event sourcing persistence mechanism.

class eventsourcing.infrastructure.eventstore.AbstractEventStore[source]

Bases: object

Abstract base class for event stores. Defines the methods expected of an event store by other classes in the library.

all_domain_events()[source]

Returns all domain events in the event store.

append(domain_event_or_events)[source]

Put domain event in event store for later retrieval.

get_domain_event(originator_id, position)[source]

Returns a single domain event.

get_domain_events(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]

Returns domain events for given entity ID.

get_most_recent_event(originator_id, lt=None, lte=None)[source]

Returns most recent domain event for given entity ID.

class eventsourcing.infrastructure.eventstore.EventStore(record_manager, sequenced_item_mapper)[source]

Bases: eventsourcing.infrastructure.eventstore.AbstractEventStore

Event store appends domain events to stored sequences. It uses a record manager to map named tuples to database records, and it uses a sequenced item mapper to map named tuples to application-level objects.

__init__(record_manager, sequenced_item_mapper)[source]

Initialises event store object.

Parameters:
  • record_manager – record manager
  • sequenced_item_mapper – sequenced item mapper
all_domain_events()[source]

Yields all domain events in the event store.

append(domain_event_or_events)[source]

Appends given domain event, or list of domain events, to their sequence.

Parameters:domain_event_or_events – domain event, or list of domain events
get_domain_event(originator_id, position)[source]

Gets a domain event from the sequence identified by originator_id at position eq.

Parameters:
  • originator_id – ID of a sequence of events
  • position – get item at this position
Returns:

domain event

get_domain_events(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]

Gets domain events from the sequence identified by originator_id.

Parameters:
  • originator_id – ID of a sequence of events
  • gt – get items after this position
  • gte – get items at or after this position
  • lt – get items before this position
  • lte – get items before or at this position
  • limit – get limited number of items
  • is_ascending – get items from lowest position
  • page_size – restrict and repeat database query
Returns:

list of domain events

get_most_recent_event(originator_id, lt=None, lte=None)[source]

Gets a domain event from the sequence identified by originator_id at the highest position.

Parameters:
  • originator_id – ID of a sequence of events
  • lt – get highest before this position
  • lte – get highest at or before this position
Returns:

domain event

iterator_class

alias of eventsourcing.infrastructure.iterators.SequencedItemIterator

to_sequenced_item(domain_event_or_events)[source]

Maps domain event to sequenced item namedtuple.

Parameters:domain_event_or_events – application-level object (or list)
Returns:namedtuple: sequence item namedtuple (or list)

integersequencegenerators

Different ways of generating sequences of integers.

class eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator[source]

Bases: object

__next__()[source]

Returns the next item in the container.

next()[source]

Python 2.7 version of the iterator protocol.

class eventsourcing.infrastructure.integersequencegenerators.base.SimpleIntegerSequenceGenerator(i=0)[source]

Bases: eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator

class eventsourcing.infrastructure.integersequencegenerators.redisincr.RedisIncr(redis=None, key=None)[source]

Bases: eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator

Generates a sequence of integers, using Redis’ INCR command.

Maximum number is 2**63, or 9223372036854775807, the maximum value of a 64 bit signed integer.

iterators

Different ways of getting sequenced items from a datastore.

class eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: object

DEFAULT_PAGE_SIZE = 1000
__iter__()[source]

Yields a continuous sequence of items.

_inc_page_counter()[source]

Increments the page counter.

Each query result as a page, even if there are no items in the page. This really counts queries.
  • it is easy to divide the number of events by the page size if the “correct” answer is required
  • there will be a difference in the counts when the number of events can be exactly divided by the page size, because there is no way to know in advance that a full page is also the last page.
_inc_query_counter()[source]

Increments the query counter.

class eventsourcing.infrastructure.iterators.GetEntityEventsThread(record_manager, sequence_id, gt=None, gte=None, lt=None, lte=None, page_size=None, is_ascending=True, *args, **kwargs)[source]

Bases: threading.Thread

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class eventsourcing.infrastructure.iterators.SequencedItemIterator(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

__iter__()[source]

Yields a continuous sequence of items from “pages” of sequenced items retrieved using the record manager.

class eventsourcing.infrastructure.iterators.ThreadedSequencedItemIterator(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

start_thread()[source]

repositories

Repository base classes for entity classes defined in the library.

class eventsourcing.infrastructure.repositories.array.ArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

class eventsourcing.infrastructure.repositories.array.BigArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractBigArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

subrepo

Sub-sequence repository.

subrepo_class

alias of ArrayRepository

class eventsourcing.infrastructure.repositories.collection_repo.CollectionRepository(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.domain.model.collection.AbstractCollectionRepository

Event sourced repository for the Collection domain model entity.

class eventsourcing.infrastructure.repositories.timebucketedlog_repo.TimebucketedlogRepo(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository

Event sourced repository for the Example domain model entity.

sequenceditem

The persistence model for storing events.

class eventsourcing.infrastructure.sequenceditem.SequencedItem(sequence_id, position, topic, data)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, sequence_id, position, topic, data)

Create new instance of SequencedItem(sequence_id, position, topic, data)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable, new=<built-in method __new__ of type object at 0xa385c0>, len=<built-in function len>)

Make a new SequencedItem object from a sequence or iterable

_replace(**kwds)

Return a new SequencedItem object replacing specified fields with new values

data

Alias for field number 3

position

Alias for field number 1

sequence_id

Alias for field number 0

topic

Alias for field number 2

class eventsourcing.infrastructure.sequenceditem.SequencedItemFieldNames(sequenced_item_class)[source]

Bases: object

data
other_names
position
sequence_id
topic
class eventsourcing.infrastructure.sequenceditem.StoredEvent(originator_id, originator_version, event_type, state)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, originator_id, originator_version, event_type, state)

Create new instance of StoredEvent(originator_id, originator_version, event_type, state)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable, new=<built-in method __new__ of type object at 0xa385c0>, len=<built-in function len>)

Make a new StoredEvent object from a sequence or iterable

_replace(**kwds)

Return a new StoredEvent object replacing specified fields with new values

event_type

Alias for field number 2

originator_id

Alias for field number 0

originator_version

Alias for field number 1

state

Alias for field number 3

sequenceditemmapper

The sequenced item mapper maps sequenced items to application-level objects.

class eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper[source]

Bases: object

from_sequenced_item(sequenced_item)[source]

Return domain event from given sequenced item.

to_sequenced_item(domain_event)[source]

Returns sequenced item for given domain event.

class eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper(sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=None, json_decoder_class=None, cipher=None, other_attr_names=())[source]

Bases: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper

Uses JSON to transcode domain events.

construct_item_args(domain_event)[source]

Constructs attributes of a sequenced item from the given domain event.

construct_sequenced_item(item_args)[source]
from_sequenced_item(sequenced_item)[source]

Reconstructs domain event from stored event topic and event attrs. Used in the event store when getting domain events.

from_topic_and_data(topic, data)[source]
to_sequenced_item(domain_event)[source]

Constructs a sequenced item from a domain event.

eventsourcing.infrastructure.sequenceditemmapper.reconstruct_object(obj_class, obj_state)[source]

snapshotting

Snapshotting avoids having to replay an entire sequence of events to obtain the current state of a projection.

class eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy[source]

Bases: object

get_snapshot(entity_id, lt=None, lte=None)[source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type:Snapshot
take_snapshot(entity_id, entity, last_event_version)[source]

Takes a snapshot of entity, using given ID, state and version number.

Return type:AbstractSnapshop
class eventsourcing.infrastructure.snapshotting.EventSourcedSnapshotStrategy(snapshot_store)[source]

Bases: eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy

Snapshot strategy that uses an event sourced snapshot.

get_snapshot(entity_id, lt=None, lte=None)[source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type:Snapshot
take_snapshot(entity_id, entity, last_event_version)[source]

Creates a Snapshot from the given state, and appends it to the snapshot store.

Return type:Snapshot
eventsourcing.infrastructure.snapshotting.entity_from_snapshot(snapshot)[source]

Reconstructs domain entity from given snapshot.

sqlalchemy

Classes for event sourcing with SQLAlchemy.

class eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemyDatastore(base=<class 'sqlalchemy.ext.declarative.api.Base'>, tables=None, connection_strategy='plain', session=None, **kwargs)[source]

Bases: eventsourcing.infrastructure.datastore.Datastore

close_connection()[source]

Drops connection to a datastore.

drop_table(table)[source]
drop_tables()[source]

Drops tables used to store events.

is_sqlite()[source]
session
setup_connection()[source]

Sets up a connection to a datastore.

setup_table(table)[source]
setup_tables(tables=None)[source]

Sets up tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

class eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemySettings(uri=None, pool_size=5)[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreSettings

class eventsourcing.infrastructure.sqlalchemy.factory.SQLAlchemyInfrastructureFactory(session, uri=None, pool_size=5, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

construct_datastore()[source]
construct_record_manager(**kwargs)[source]
construct_tracking_record_manager()[source]
integer_sequenced_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord

record_manager_class

alias of eventsourcing.infrastructure.sqlalchemy.manager.SQLAlchemyRecordManager

snapshot_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.SnapshotRecord

timestamp_sequenced_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord

tracking_record_manager_class

alias of eventsourcing.infrastructure.sqlalchemy.manager.TrackingRecordManager

eventsourcing.infrastructure.sqlalchemy.factory.construct_sqlalchemy_eventstore(session, sequenced_item_class=None, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=None, json_decoder_class=None, cipher=None, record_class=None, contiguous_record_ids=False, application_id=None, pipeline_id=-1)[source]
class eventsourcing.infrastructure.sqlalchemy.manager.SQLAlchemyRecordManager(session, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.RelationalRecordManager

_prepare_insert(tmpl, record_class, field_names, placeholder_for_id=False)[source]

With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, assumes an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.

all_sequence_ids()[source]

Returns all sequence IDs.

clone(**kwargs)[source]
delete_record(record)[source]

Permanently removes record from table.

filter_by(**kwargs)[source]
get_item(sequence_id, position)[source]

Gets sequenced item from the datastore.

get_max_record_id()[source]

Return maximum notification ID in pipeline.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Returns records sequenced by notification ID, from application, for pipeline, in given range.

get_pipeline_and_notification_id(sequence_id, position)[source]
get_record(sequence_id, position)[source]
get_record_table_name(record_class)[source]

Returns table name - used in raw queries.

Return type:str
get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

orm_query()[source]
tracking_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord

write_records(records, tracking_kwargs=None)[source]

Creates records in the database. :param tracking_kwargs:

class eventsourcing.infrastructure.sqlalchemy.manager.TrackingRecordManager(session)[source]

Bases: eventsourcing.infrastructure.base.AbstractTrackingRecordManager

get_max_record_id(application_name, upstream_application_name, pipeline_id)[source]

Returns maximum record ID for given application name.

has_tracking_record(application_id, upstream_application_name, pipeline_id, notification_id)[source]
record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord

class eventsourcing.infrastructure.sqlalchemy.records.EntitySnapshotRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

event_type
originator_id
originator_version
state
class eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedNoIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

data
position
sequence_id
topic
eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedRecord

alias of eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord

class eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

data
id
position
sequence_id
topic
class eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

application_id
notification_id
pipeline_id
upstream_application_id
class eventsourcing.infrastructure.sqlalchemy.records.SnapshotRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

data
position
sequence_id
topic
class eventsourcing.infrastructure.sqlalchemy.records.StoredEventRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

application_id
causal_dependencies
event_type
id
originator_id
originator_version
pipeline_id
state
class eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

data
position
sequence_id
topic
eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedRecord

alias of eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord

class eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedWithIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

data
id
position
sequence_id
topic

timebucketedlog_reader

Reader for timebucketed logs.

class eventsourcing.infrastructure.timebucketedlog_reader.TimebucketedlogReader(log, event_store, page_size=50)[source]

Bases: object

get_events(gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=False, page_size=None)[source]
get_messages(gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=False, page_size=None)[source]
eventsourcing.infrastructure.timebucketedlog_reader.get_timebucketedlog_reader(log, event_store)[source]
Return type:TimebucketedlogReader

interface

The interface layer uses an application to service client requests.

notificationlog

Notification log is a pull-based mechanism for updating other applications.

class eventsourcing.interface.notificationlog.AbstractNotificationLog[source]

Bases: object

Presents a sequence of sections from a sequence of notifications.

__getitem__(section_id)[source]

Get section of notification log.

Return type:Section
class eventsourcing.interface.notificationlog.BigArrayNotificationLog(big_array, section_size)[source]

Bases: eventsourcing.interface.notificationlog.LocalNotificationLog

get_items(start, stop, next_position=None)[source]

Returns items for section.

Return type:list
get_next_position()[source]

Returns items for section.

Return type:int
class eventsourcing.interface.notificationlog.LocalNotificationLog(section_size=None)[source]

Bases: eventsourcing.interface.notificationlog.AbstractNotificationLog

Presents a sequence of sections from a sequence of notifications.

static format_section_id(first_item_number, last_item_number)[source]
get_items(start, stop, next_position=None)[source]

Returns items for section.

Return type:list
get_next_position()[source]

Returns items for section.

Return type:int
class eventsourcing.interface.notificationlog.NotificationLogReader(notification_log)[source]

Bases: object

next()[source]

Python 2.7 version of the iterator protocol.

read(advance_by=None)[source]
read_items(stop_index=None, advance_by=None)[source]
read_list(advance_by=None)[source]
seek(position)[source]
class eventsourcing.interface.notificationlog.NotificationLogView(notification_log, json_encoder_class=None)[source]

Bases: object

present_section(section_id)[source]
class eventsourcing.interface.notificationlog.RecordManagerNotificationLog(record_manager, section_size)[source]

Bases: eventsourcing.interface.notificationlog.LocalNotificationLog

get_end_position()[source]
get_items(start, stop, next_position=None)[source]

Returns items for section.

Return type:list
get_next_position()[source]

Returns items for section.

Return type:int
class eventsourcing.interface.notificationlog.RemoteNotificationLog(base_url, json_decoder_class=None)[source]

Bases: eventsourcing.interface.notificationlog.AbstractNotificationLog

deserialize_section(section_json)[source]
get_json(section_id)[source]
get_resource(doc_url)[source]
make_notification_log_url(section_id)[source]
class eventsourcing.interface.notificationlog.Section(section_id, items, previous_id=None, next_id=None)[source]

Bases: object

Section of a notification log.

Contains items, and has an ID.

May also have either IDs of previous and next sections of the notification log.

utils

The utils package contains common functions that are used in more than one layer.

cipher

class eventsourcing.utils.cipher.aes.AESCipher(cipher_key)[source]

Bases: object

Cipher strategy that uses Crypto library AES cipher in GCM mode.

__init__(cipher_key)[source]

Initialises AES cipher strategy with cipher_key.

Parameters:cipher_key – 16, 24, or 32 random bytes
decrypt(ciphertext)[source]

Return plaintext for given ciphertext.

encrypt(plaintext)[source]

Return ciphertext for given plaintext.

time

eventsourcing.utils.times.datetime_from_timestamp(t)[source]

Returns naive UTC datetime from decimal UNIX timestamps such as time.time().

Parameters:t – timestamp, either Decimal or float
Returns:datetime.datetime object
eventsourcing.utils.times.decimaltimestamp(t=None)[source]

A UNIX timestamp as a Decimal object (exact number type).

Returns current time when called without args, otherwise converts given floating point number t to a Decimal with 9 decimal places.

Parameters:t – Floating point UNIX timestamp (“seconds since epoch”).
Returns:A Decimal with 6 decimal places, representing the given floating point or the value returned by time.time().
eventsourcing.utils.times.decimaltimestamp_from_uuid(uuid_arg)[source]

Return a floating point unix timestamp.

Parameters:uuid_arg
Returns:Unix timestamp in seconds, with microsecond precision.
Return type:float
eventsourcing.utils.times.timestamp_long_from_uuid(uuid_arg)[source]

Returns an integer value representing a unix timestamp in tenths of microseconds.

Parameters:uuid_arg
Returns:Unix timestamp integer in tenths of microseconds.
Return type:int

topic

eventsourcing.utils.topic.get_topic(domain_class)[source]

Returns a string describing a class.

Args:
domain_class: A class.
Returns:
A string describing the class.
eventsourcing.utils.topic.resolve_attr(obj, path)[source]

A recursive version of getattr for navigating dotted paths.

Args:
obj: An object for which we want to retrieve a nested attribute. path: A dot separated string containing zero or more attribute names.
Returns:
The attribute referred to by obj.a1.a2.a3…
Raises:
AttributeError: If there is no such attribute.
eventsourcing.utils.topic.resolve_topic(topic)[source]

Return class described by given topic.

Args:
topic: A string describing a class.
Returns:
A class.
Raises:
TopicResolutionError: If there is no such class.

transcoding

class eventsourcing.utils.transcoding.ObjectJSONDecoder(object_hook=None, **kwargs)[source]

Bases: json.decoder.JSONDecoder

classmethod from_jsonable(d)[source]
class eventsourcing.utils.transcoding.ObjectJSONEncoder(sort_keys=True, *args, **kwargs)[source]

Bases: json.encoder.JSONEncoder

default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
eventsourcing.utils.transcoding.json_dumps(obj, cls=None)[source]
eventsourcing.utils.transcoding.json_loads(s, cls=None)[source]

exceptions

A few exception classes are defined by the library to indicate particular kinds of error.

exception eventsourcing.exceptions.ArrayIndexError[source]

Bases: IndexError, eventsourcing.exceptions.EventSourcingError

Raised when appending item to an array that is full.

exception eventsourcing.exceptions.CausalDependencyFailed[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when a causal dependency fails (after its tracking record not found).

exception eventsourcing.exceptions.ConcurrencyError[source]

Bases: eventsourcing.exceptions.RecordConflictError

Raised when a record conflict is due to concurrency.

exception eventsourcing.exceptions.ConsistencyError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when applying an event stream to a versioned entity.

exception eventsourcing.exceptions.DataIntegrityError[source]

Bases: ValueError, eventsourcing.exceptions.EventSourcingError

Raised when a sequenced item data is damaged (hash doesn’t match data)

exception eventsourcing.exceptions.DatasourceSettingsError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when an error is detected in settings for a datasource.

exception eventsourcing.exceptions.EntityIsDiscarded[source]

Bases: AssertionError

Raised when access to a recently discarded entity object is attempted.

exception eventsourcing.exceptions.EntityVersionNotFound[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raise when accessing an entity version that does not exist.

exception eventsourcing.exceptions.EventHashError[source]

Bases: eventsourcing.exceptions.DataIntegrityError

Raised when an event’s seal hash doesn’t match the hash of the state of the event.

exception eventsourcing.exceptions.EventRecordNotFound[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when an event record is not found.

exception eventsourcing.exceptions.EventSourcingError[source]

Bases: Exception

Base eventsourcing exception.

exception eventsourcing.exceptions.HeadHashError[source]

Bases: eventsourcing.exceptions.DataIntegrityError, eventsourcing.exceptions.MismatchedOriginatorError

Raised when applying an event with hash different from aggregate head.

exception eventsourcing.exceptions.MismatchedOriginatorError[source]

Bases: eventsourcing.exceptions.ConsistencyError

Raised when applying an event to an inappropriate object.

exception eventsourcing.exceptions.MutatorRequiresTypeNotInstance[source]

Bases: eventsourcing.exceptions.ConsistencyError

Raised when mutator function received a class rather than an entity.

exception eventsourcing.exceptions.OperationalError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when an operational error is encountered.

exception eventsourcing.exceptions.OriginatorIDError[source]

Bases: eventsourcing.exceptions.MismatchedOriginatorError

Raised when applying an event to the wrong entity or aggregate.

exception eventsourcing.exceptions.OriginatorVersionError[source]

Bases: eventsourcing.exceptions.MismatchedOriginatorError

Raised when applying an event to the wrong version of an entity or aggregate.

exception eventsourcing.exceptions.ProgrammingError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when programming errors are encountered.

exception eventsourcing.exceptions.PromptFailed[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when prompt fails.

exception eventsourcing.exceptions.RecordConflictError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when database raises an integrity error.

exception eventsourcing.exceptions.RepositoryKeyError[source]

Bases: KeyError, eventsourcing.exceptions.EventSourcingError

Raised when using entity repository’s dictionary like interface to get an entity that does not exist.

exception eventsourcing.exceptions.TimeSequenceError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when a time sequence error occurs e.g. trying to save a timestamp that already exists.

exception eventsourcing.exceptions.TopicResolutionError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when unable to resolve a topic to a Python class.

exception eventsourcing.exceptions.TrackingRecordNotFound[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when a tracking record is not found.

example

A simple, unit-tested, event sourced application.

application

class eventsourcing.example.application.ApplicationWithEventStores(entity_record_manager=None, log_record_manager=None, snapshot_record_manager=None, cipher=None)[source]

Bases: object

Event sourced application object class.

Can construct event stores using given database records. Supports three different event stores: for log events, for entity events, and for snapshot events.

close()[source]
construct_event_store(event_sequence_id_attr, event_position_attr, record_manager, cipher=None)[source]
construct_sequenced_item_mapper(sequenced_item_class, event_sequence_id_attr, event_position_attr, cipher=None, json_encoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONEncoder'>, json_decoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONDecoder'>)[source]
class eventsourcing.example.application.ApplicationWithPersistencePolicies(**kwargs)[source]

Bases: eventsourcing.example.application.ApplicationWithEventStores

close()[source]
construct_entity_persistence_policy()[source]
construct_log_persistence_policy()[source]
construct_snapshot_persistence_policy()[source]
class eventsourcing.example.application.ExampleApplication(**kwargs)[source]

Bases: eventsourcing.example.application.ApplicationWithPersistencePolicies

Example event sourced application with entity factory and repository.

create_new_example(foo='', a='', b='')[source]

Entity object factory.

eventsourcing.example.application.close_example_application()[source]

Shuts down single global instance of application.

To be called when tearing down, perhaps between tests, in order to allow a subsequent call to init_example_application().

eventsourcing.example.application.construct_example_application(**kwargs)[source]

Application object factory.

eventsourcing.example.application.get_example_application()[source]

Returns single global instance of application.

To be called when handling a worker request, if required.

eventsourcing.example.application.init_example_application(**kwargs)[source]

Constructs single global instance of application.

To be called when initialising a worker process.

domainmodel

class eventsourcing.example.domainmodel.AbstractExampleRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

class eventsourcing.example.domainmodel.Example(foo='', a='', b='', **kwargs)[source]

Bases: eventsourcing.domain.model.entity.EntityWithHashchain, eventsourcing.domain.model.entity.TimestampedVersionedEntity

An example event sourced domain model entity.

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when an Example is created.

class Created(**kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Created

Published when an Example is created.

class Discarded(**kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.Discarded

Published when an Example is discarded.

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Event

Supertype for events of example entities.

class Heartbeat(**kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.Event

Published when a heartbeat in the entity occurs (see below).

mutate(obj)[source]

Update obj with values from self.

a

An example attribute.

b

Another example attribute.

beat_heart(number_of_beats=1)[source]
count_heartbeats()[source]
foo

An example attribute.

eventsourcing.example.domainmodel.create_new_example(foo='', a='', b='')[source]

Factory method for example entities.

Return type:Example

infrastructure

class eventsourcing.example.infrastructure.ExampleRepository(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.example.domainmodel.AbstractExampleRepository

Event sourced repository for the Example domain model entity.

interface

class eventsourcing.example.interface.flaskapp.IntegerSequencedItem(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Model

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

data
id
position
sequence_id
topic
eventsourcing.example.interface.flaskapp.hello()[source]
eventsourcing.example.interface.flaskapp.init_example_application_with_sqlalchemy()[source]