eventsourcing

Interface layer

eventsourcing.interface.notificationlog

class eventsourcing.interface.notificationlog.AbstractNotificationLog[source]

Bases: object

Presents a sequence of sections from a sequence of notifications.

class eventsourcing.interface.notificationlog.NotificationLog(big_array, section_size)[source]

Bases: eventsourcing.interface.notificationlog.AbstractNotificationLog

static format_section_id(first_item_number, last_item_number)[source]
class eventsourcing.interface.notificationlog.NotificationLogReader(notification_log)[source]

Bases: object

get_items(stop_index=None)[source]
seek(position)[source]
class eventsourcing.interface.notificationlog.RemoteNotificationLog(base_url, notification_log_id)[source]

Bases: eventsourcing.interface.notificationlog.AbstractNotificationLog

get_json(section_id)[source]
get_resource(doc_url)[source]
make_notification_log_url(notification_log_id)[source]
class eventsourcing.interface.notificationlog.Section(section_id, items, previous_id=None, next_id=None)[source]

Bases: object

Section of a notification log.

Contains items, and has an ID.

May also have either IDs of previous and next sections of the notification log.

eventsourcing.interface.notificationlog.deserialize_section(section_json)[source]
eventsourcing.interface.notificationlog.present_section(big_array, section_id, section_size)[source]
eventsourcing.interface.notificationlog.serialize_section(section)[source]

Application layer

eventsourcing.application.base

class eventsourcing.application.base.ApplicationWithEventStores(entity_active_record_strategy=None, log_active_record_strategy=None, snapshot_active_record_strategy=None, always_encrypt=False, cipher=None)[source]

Bases: object

close()[source]
construct_event_store(event_sequence_id_attr, event_position_attr, active_record_strategy, always_encrypt=False, cipher=None)[source]
construct_sequenced_item_mapper(sequenced_item_class, event_sequence_id_attr, event_position_attr, json_encoder_class=<class 'eventsourcing.infrastructure.transcoding.ObjectJSONEncoder'>, json_decoder_class=<class 'eventsourcing.infrastructure.transcoding.ObjectJSONDecoder'>, always_encrypt=False, cipher=None)[source]
class eventsourcing.application.base.ApplicationWithPersistencePolicies(**kwargs)[source]

Bases: eventsourcing.application.base.ApplicationWithEventStores

close()[source]
construct_entity_persistence_policy()[source]
construct_log_persistence_policy()[source]
construct_snapshot_persistence_policy()[source]

eventsourcing.application.policies

class eventsourcing.application.policies.PersistencePolicy(event_store, event_type=None)[source]

Bases: object

Stores events of given type to given event store, whenever they are published.

close()[source]
is_event(event)[source]
store_event(event)[source]

Domain layer

eventsourcing.domain.model.aggregate

class eventsourcing.domain.model.aggregate.AggregateRoot(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

Root entity for an aggregate in a domain driven design.

class AttributeChanged(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when an AggregateRoot is changed.

class Created(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Created

Published when an AggregateRoot is created.

class Discarded(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Discarded

Published when an AggregateRoot is discarded.

class Event(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Layer supertype.

save()[source]

Publishes pending events for others in application.

eventsourcing.domain.model.array

class eventsourcing.domain.model.array.AbstractArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

Repository for sequence objects.

class eventsourcing.domain.model.array.AbstractBigArrayRepository(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

Repository for compound sequence objects.

subrepo

Sub-sequence repository.

class eventsourcing.domain.model.array.Array(array_id, repo)[source]

Bases: object

append(item)[source]

Sets item in next position after the last item.

get_item_assigned(index)[source]
get_items_assigned(start_index=None, stop_index=None, limit=None, is_ascending=True)[source]
get_last_item_and_next_position()[source]
get_next_position()[source]
class eventsourcing.domain.model.array.BigArray(array_id, repo)[source]

Bases: eventsourcing.domain.model.array.Array

A virtual array holding items in indexed positions, across a number of Array instances.

Getting and setting items at index position is supported. Slices are supported, and operate across the underlying arrays. Appending is also supported.

BigArray is designed to overcome the concern of needing a single large sequence that may not be suitably stored in any single partiton. In simple terms, if events of an aggregate can fit in a partition, we can use the same size partition to make a tree of arrays that will certainly be capable of sequencing all the events of the application in a single stream.

With normal size base arrays, enterprise applications can expect read and write time to be approximately constant with respect to the number of items in the array.

The array is composed of a tree of arrays, which gives the capacity equal to the size of each array to the power of the size of each array. If the arrays are limited to be about the maximum size of an aggregate event stream (a large number but not too many that would cause there to be too much data in any one partition, let’s say 1000s to be safe) then it would be possible to fit such a large number of aggregates in the corresponding BigArray, that we can be confident it would be full.

Write access time in the worst case, and the time to identify the index of the last item in the big array, is proportional to the log of the highest assigned index to the base of the underlying array size. Write time on average, and read time given an index, is contant with respect to the number of items in a BigArray.

Items can be appended in log time in a single thread. However, the time between reading the current last index and claiming the next position leads to contention and retries when there are lots of threads of execution all attempting to append items, which inherently limits throughput.

Todo: Not possible in Cassandra, but maybe do it in a transaction in SQLAlchemy?

An alternative to reading the last item before writing the next is to use an integer sequence generator to generate a stream of integers. Items can be assigned to index positions in a big array, according to the integers that are issued. Throughput will then be much better, and will be limited only by the rate at which the database can have events written to it (unless the number generator is quite slow).

An external integer sequence generator, such as Redis’ INCR command, or an auto-incrementing database column, may constitute a single point of failure.

calc_parent(i, j, h)[source]

Returns get_big_array and end of span of parent sequence that contains given child.

calc_required_height(n, size)[source]
create_array_id(i, j)[source]
get_item(position)[source]
get_last_array()[source]

Returns last array in compound.

Return type:CompoundSequenceReader
get_last_item_and_next_position()[source]
get_slice(start, stop)[source]
class eventsourcing.domain.model.array.ItemAssigned(item, index, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Occurs when an item is set at a position in an array.

index
item

eventsourcing.domain.model.decorators

eventsourcing.domain.model.decorators.attribute(getter)[source]

When used as a method decorator, returns a property object with the method as the getter and a setter defined to call instance method change_attribute(), which publishes an AttributeChanged event.

eventsourcing.domain.model.decorators.mutator(arg=None)[source]

Structures mutator functions by allowing handlers to be registered for different types of event. When the decorated function is called with an initial value and an event, it will call the handler that has been registered for that type of event.

It works like singledispatch, which it uses. The difference is that when the decorated function is called, this decorator dispatches according to the type of last call arg, which fits better with reduce(). The builtin Python function reduce() is used by the library to replay a sequence of events against an initial state. If a mutator function is given to reduce(), along with a list of events and an initializer, reduce() will call the mutator function once for each event in the list, but the initializer will be the first value, and the event will be the last argument, and we want to dispatch according to the type of the event. It happens that singledispatch is coded to switch on the type of the first argument, which makes it unsuitable for structuring a mutator function without the modifications introduced here.

The other aspect introduced by this decorator function is the option to set the type of the handled entity in the decorator. When an entity is replayed from scratch, in other words when all its events are replayed, the initial state is None. The handler which handles the first event in the sequence will probably construct an object instance. It is possible to write the type into the handler, but that makes the entity more difficult to subclass because you will also need to write a handler for it. If the decorator is invoked with the type, when the initial value passed as a call arg to the mutator function is None, the handler will instead receive the type of the entity, which it can use to construct the entity object.

class Entity(object):
    class Created(object):
        pass

@mutator(Entity)
def mutate(initial, event):
    raise NotImplementedError(type(event))

@mutate.register(Entity.Created)
def _(initial, event):
    return initial(**event.__dict__)

entity = mutate(None, Entity.Created())
eventsourcing.domain.model.decorators.random() → x in the interval [0, 1).
eventsourcing.domain.model.decorators.retry(exc=<class 'Exception'>, max_retries=1, wait=0)[source]
eventsourcing.domain.model.decorators.subscribe_to(event_class)[source]

Decorator for making a custom event handler function subscribe to a certain event type

event_class: DomainEvent class or its child classes that the handler function should subscribe to

The following example shows a custom handler that reacts to Todo.Created event and saves a projection of a Todo model object.

@subscribe_to(Todo.Created)
def new_todo_projection(event):
    todo = TodoProjection(id=event.originator_id, title=event.title)
    todo.save()

eventsourcing.domain.model.entity

class eventsourcing.domain.model.entity.AbstractEntityRepository(*args, **kwargs)[source]

Bases: object

event_store

Returns event store object used by this repository.

get_entity(entity_id)[source]

Returns entity for given ID.

class eventsourcing.domain.model.entity.DomainEntity(originator_id)[source]

Bases: eventsourcing.domain.model.events.QualnameABC

class AttributeChanged(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.AttributeChanged

Published when a DomainEntity is discarded.

class Created(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.Created

Published when a DomainEntity is created.

class Discarded(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.Discarded

Published when a DomainEntity is discarded.

class Event(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.events.DomainEvent

Layer supertype.

change_attribute(name, value, **kwargs)[source]

Changes given attribute of the entity, by constructing and applying an AttributeChanged event.

discard(**kwargs)[source]
id
class eventsourcing.domain.model.entity.TimestampedEntity(timestamp, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class AttributeChanged(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedEntity is changed.

class Created(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Created

Published when a TimestampedEntity is created.

class Discarded(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedEntity is discarded.

class Event(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.entity.Event

Layer supertype.

created_on
last_modified_on
class eventsourcing.domain.model.entity.TimestampedVersionedEntity(timestamp, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedEntity, eventsourcing.domain.model.entity.VersionedEntity

class AttributeChanged(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedVersionedEntity is created.

class Created(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Created

Published when a TimestampedVersionedEntity is created.

class Discarded(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedVersionedEntity is discarded.

class Event(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Event

Layer supertype.

class eventsourcing.domain.model.entity.TimeuuidedEntity(event_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

created_on
last_modified_on
class eventsourcing.domain.model.entity.TimeuuidedVersionedEntity(event_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimeuuidedEntity, eventsourcing.domain.model.entity.VersionedEntity

class eventsourcing.domain.model.entity.VersionedEntity(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a VersionedEntity is changed.

class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Created

Published when a VersionedEntity is created.

class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a VersionedEntity is discarded.

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.entity.Event

Layer supertype.

change_attribute(name, value, **kwargs)[source]
discard(**kwargs)[source]
version
class eventsourcing.domain.model.entity.WithReflexiveMutator(originator_id)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

Implements an entity mutator function by dispatching to the event itself all calls to mutate an entity with an event.

This is an alternative to using an independent mutator function implemented with the @mutator decorator, or an if-else block.

eventsourcing.domain.model.entity.mutate_entity(initial, event)[source]

Entity mutator function. Mutates initial state by the event.

Different handlers are registered for different types of event.

eventsourcing.domain.model.events

class eventsourcing.domain.model.events.AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Can be published when an attribute of an entity is created.

name
value
class eventsourcing.domain.model.events.Created(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Can be published when an entity is created.

class eventsourcing.domain.model.events.Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Published when something is discarded.

class eventsourcing.domain.model.events.DomainEvent(**kwargs)[source]

Bases: eventsourcing.domain.model.events.QualnameABC

Base class for domain events.

Implements methods to make instances read-only, comparable for equality, have recognisable representations, and hashable.

exception eventsourcing.domain.model.events.EventHandlersNotEmptyError[source]

Bases: Exception

class eventsourcing.domain.model.events.EventWithOriginatorID(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

originator_id
class eventsourcing.domain.model.events.EventWithOriginatorVersion(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have an originator version number.

originator_version
class eventsourcing.domain.model.events.EventWithTimestamp(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have a timestamp value.

timestamp
class eventsourcing.domain.model.events.EventWithTimeuuid(event_id=None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have an UUIDv1 event ID.

event_id
class eventsourcing.domain.model.events.Logged(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Published when something is logged.

class eventsourcing.domain.model.events.QualnameABC[source]

Bases: object

Base class that introduces __qualname__ for objects in Python 2.7.

class eventsourcing.domain.model.events.QualnameABCMeta[source]

Bases: abc.ABCMeta

Supplies __qualname__ to object classes with this metaclass.

eventsourcing.domain.model.events.assert_event_handlers_empty()[source]
eventsourcing.domain.model.events.create_timesequenced_event_id()[source]
eventsourcing.domain.model.events.publish(event)[source]
eventsourcing.domain.model.events.reconstruct_object(obj_class, obj_state)[source]
eventsourcing.domain.model.events.resolve_attr(obj, path)[source]

A recursive version of getattr for navigating dotted paths.

Args:
obj: An object for which we want to retrieve a nested attribute. path: A dot separated string containing zero or more attribute names.
Returns:
The attribute referred to by obj.a1.a2.a3…
Raises:
AttributeError: If there is no such attribute.
eventsourcing.domain.model.events.resolve_domain_topic(topic)[source]

Return domain class described by given topic.

Args:
topic: A string describing a domain class.
Returns:
A domain class.
Raises:
TopicResolutionError: If there is no such domain class.
eventsourcing.domain.model.events.subscribe(handler, predicate=None)[source]
eventsourcing.domain.model.events.topic_from_domain_class(domain_class)[source]

Returns a string describing a domain event class.

Args:
domain_class: A domain entity or event class.
Returns:
A string describing the class.
eventsourcing.domain.model.events.unsubscribe(handler, predicate=None)[source]

eventsourcing.domain.model.snapshot

class eventsourcing.domain.model.snapshot.AbstractSnapshop[source]

Bases: object

originator_id

ID of the snapshotted entity.

originator_version

Version of the last event applied to the entity.

state

State of the snapshotted entity.

topic

Path to the class of the snapshotted entity.

class eventsourcing.domain.model.snapshot.Snapshot(originator_id, originator_version, topic, state)[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.snapshot.AbstractSnapshop

state

State of the snapshotted entity.

topic

Path to the class of the snapshotted entity.

eventsourcing.domain.services.aes_cipher

eventsourcing.domain.services.cipher

class eventsourcing.domain.services.cipher.AbstractCipher[source]

Bases: object

decrypt(ciphertext)[source]

Return plaintext for given ciphertext.

encrypt(plaintext)[source]

Return ciphertext for given plaintext.

Infrastructure layer

eventsourcing.infrastructure.eventsourcedrepository

class eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository(event_store, mutator=None, snapshot_strategy=None, use_cache=False, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

event_store
get_entity(entity_id, lt=None, lte=None)[source]

Returns entity with given ID, optionally until position.

mutator(initial, event)

Entity mutator function. Mutates initial state by the event.

Different handlers are registered for different types of event.

take_snapshot(entity_id, lt=None, lte=None)[source]

eventsourcing.infrastructure.eventplayer

class eventsourcing.infrastructure.eventplayer.EventPlayer(event_store, mutator, page_size=None, is_short=False, snapshot_strategy=None)[source]

Bases: object

Reconstitutes domain entities from domain events retrieved from the event store, optionally with snapshots.

get_domain_events(entity_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Returns domain events for given entity ID.

get_most_recent_event(entity_id, lt=None, lte=None)[source]

Returns the most recent event for the given entity ID.

get_snapshot(entity_id, lt=None, lte=None)[source]

Returns a snapshot for given entity ID, according to the snapshot strategy.

replay_entity(entity_id, gt=None, gte=None, lt=None, lte=None, limit=None, initial_state=None, query_descending=False)[source]

Reconstitutes requested domain entity from domain events found in event store.

replay_events(initial_state, domain_events)[source]

Mutates initial state using the sequence of domain events.

take_snapshot(entity_id, lt=None, lte=None)[source]

Takes a snapshot of the entity as it existed after the most recent event, optionally less than, or less than or equal to, a particular position.

eventsourcing.infrastructure.snapshotting

class eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy[source]

Bases: object

get_snapshot(entity_id, lt=None, lte=None)[source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type:Snapshot
take_snapshot(entity_id, entity, last_event_version)[source]

Takes a snapshot of entity, using given ID, state and version number.

Return type:AbstractSnapshop
class eventsourcing.infrastructure.snapshotting.EventSourcedSnapshotStrategy(event_store)[source]

Bases: eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy

Snapshot strategy that uses an event sourced snapshot.

get_snapshot(entity_id, lt=None, lte=None)[source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type:Snapshot
take_snapshot(entity_id, entity, last_event_version)[source]

Takes a snapshot by instantiating and publishing a Snapshot domain event.

Return type:Snapshot
eventsourcing.infrastructure.snapshotting.entity_from_snapshot(snapshot)[source]

Reconstructs domain entity from given snapshot.

eventsourcing.infrastructure.eventstore

class eventsourcing.infrastructure.eventstore.AbstractEventStore[source]

Bases: object

all_domain_events()[source]

Returns all domain events in the event store.

append(domain_event_or_events)[source]

Put domain event in event store for later retrieval.

get_domain_event(originator_id, eq)[source]

Returns a single domain event.

get_domain_events(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]

Returns domain events for given entity ID.

get_most_recent_event(originator_id, lt=None, lte=None)[source]

Returns most recent domain event for given entity ID.

class eventsourcing.infrastructure.eventstore.EventStore(active_record_strategy, sequenced_item_mapper=None)[source]

Bases: eventsourcing.infrastructure.eventstore.AbstractEventStore

all_domain_events()[source]
append(domain_event_or_events)[source]
get_domain_event(originator_id, eq)[source]
get_domain_events(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]
get_most_recent_event(originator_id, lt=None, lte=None)[source]
iterator_class

alias of SequencedItemIterator

to_sequenced_item(domain_event)[source]

eventsourcing.infrastructure.sequenceditem

class eventsourcing.infrastructure.sequenceditem.SequencedItem(sequence_id, position, topic, data)

Bases: tuple

data

Alias for field number 3

position

Alias for field number 1

sequence_id

Alias for field number 0

topic

Alias for field number 2

class eventsourcing.infrastructure.sequenceditem.SequencedItemFieldNames(sequenced_item_class)[source]

Bases: object

data
position
sequence_id
topic
class eventsourcing.infrastructure.sequenceditem.StoredEvent(originator_id, originator_version, event_type, state)

Bases: tuple

event_type

Alias for field number 2

originator_id

Alias for field number 0

originator_version

Alias for field number 1

state

Alias for field number 3

eventsourcing.infrastructure.sequenceditemmapper

class eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper[source]

Bases: object

from_sequenced_item(sequenced_item)[source]

Return domain event from given sequenced item.

to_sequenced_item(domain_event)[source]

Returns sequenced item for given domain event.

class eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper(sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=<class 'eventsourcing.infrastructure.transcoding.ObjectJSONEncoder'>, json_decoder_class=<class 'eventsourcing.infrastructure.transcoding.ObjectJSONDecoder'>, always_encrypt=False, cipher=None, other_attr_names=())[source]

Bases: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper

Uses JSON to transcode domain events.

POSITION_FIELD_INDEX = 1
SEQUENCE_ID_FIELD_INDEX = 0
construct_item_args(domain_event)[source]

Constructs attributes of a sequenced item from the given domain event.

construct_sequenced_item(item_args)[source]
deserialize_event_attrs(event_attrs, is_encrypted)[source]

Deserialize event attributes from JSON, optionally with decryption.

from_sequenced_item(sequenced_item)[source]

Reconstructs domain event from stored event topic and event attrs. Used in the event store when getting domain events.

is_encrypted(domain_event_class)[source]
serialize_event_attrs(event_attrs, is_encrypted=False)[source]
to_sequenced_item(domain_event)[source]

Constructs a sequenced item from a domain event.

eventsourcing.infrastructure.transcoding

class eventsourcing.infrastructure.transcoding.ObjectJSONDecoder(object_hook=None, **kwargs)[source]

Bases: json.decoder.JSONDecoder

classmethod from_jsonable(d)[source]
class eventsourcing.infrastructure.transcoding.ObjectJSONEncoder(skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: json.encoder.JSONEncoder

default(obj)[source]

eventsourcing.infrastructure.activerecord

class eventsourcing.infrastructure.activerecord.AbstractActiveRecordStrategy(active_record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>)[source]

Bases: object

all_items()[source]

Returns all stored items from all sequences (possibly in chronological order, depending on database).

all_records(resume=None, *arg, **kwargs)[source]

Returns all records in the table (possibly in chronological order, depending on database).

append(sequenced_item_or_items)[source]

Writes sequenced item into the datastore.

delete_record(record)[source]

Removes permanently given record from the table.

get_field_kwargs(item)[source]
get_item(sequence_id, eq)[source]

Reads sequenced item from the datastore.

get_items(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Reads sequenced items from the datastore.

raise_index_error(eq)[source]
raise_sequenced_item_error(sequenced_item, e)[source]

eventsourcing.infrastructure.datastore

class eventsourcing.infrastructure.datastore.Datastore(settings)[source]

Bases: object

drop_connection()[source]

Drops connection to a datastore.

drop_tables()[source]

Drops tables used to store events.

setup_connection()[source]

Sets up a connection to a datastore.

setup_tables()[source]

Sets up tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

exception eventsourcing.infrastructure.datastore.DatastoreConnectionError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

exception eventsourcing.infrastructure.datastore.DatastoreError[source]

Bases: Exception

class eventsourcing.infrastructure.datastore.DatastoreSettings[source]

Bases: object

Base class for settings for database connection used by a stored event repository.

exception eventsourcing.infrastructure.datastore.DatastoreTableError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

eventsourcing.infrastructure.cassandra.activerecords

eventsourcing.infrastructure.cassandra.datastore

eventsourcing.infrastructure.sqlalchemy.activerecords

eventsourcing.infrastructure.sqlalchemy.datastore

eventsourcing.infrastructure.iterators

class eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator(active_record_strategy, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: object

DEFAULT_PAGE_SIZE = 1000
class eventsourcing.infrastructure.iterators.GetEntityEventsThread(active_record_strategy, sequence_id, gt=None, gte=None, lt=None, lte=None, page_size=None, is_ascending=True, *args, **kwargs)[source]

Bases: threading.Thread

run()[source]
class eventsourcing.infrastructure.iterators.SequencedItemIterator(active_record_strategy, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

class eventsourcing.infrastructure.iterators.ThreadedSequencedItemIterator(active_record_strategy, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

start_thread()[source]

eventsourcing.infrastructure.repositories.array

class eventsourcing.infrastructure.repositories.array.ArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

class eventsourcing.infrastructure.repositories.array.BigArrayRepository(base_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractBigArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

subrepo
subrepo_class

alias of ArrayRepository

eventsourcing.infrastructure.integersequencegenerators.base

class eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator[source]

Bases: object

next()[source]

Python 2.7 version of the iterator protocol.

class eventsourcing.infrastructure.integersequencegenerators.base.SimpleIntegerSequenceGenerator(i=0)[source]

Bases: eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator

eventsourcing.infrastructure.integersequencegenerators.redisincr

Exception classes

exception eventsourcing.exceptions.ArrayIndexError[source]

Bases: IndexError, eventsourcing.exceptions.EventSourcingError

Raised when appending item to an array that is full.

exception eventsourcing.exceptions.ConcurrencyError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when appending events at the wrong version to a versioned stream.

exception eventsourcing.exceptions.ConsistencyError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when applying an event stream to a versioned entity.

exception eventsourcing.exceptions.DatasourceSettingsError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when an error is detected in settings for a datasource.

exception eventsourcing.exceptions.EntityIsDiscarded[source]

Bases: AssertionError

Raised when access to a recently discarded entity object is attempted.

exception eventsourcing.exceptions.EntityVersionNotFound[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raise when accessing an entity version that does not exist.

exception eventsourcing.exceptions.EventSourcingError[source]

Bases: Exception

Base eventsourcing exception.

exception eventsourcing.exceptions.MismatchedOriginatorError[source]

Bases: eventsourcing.exceptions.ConsistencyError

Raised when applying an event to an inappropriate object.

exception eventsourcing.exceptions.MismatchedOriginatorIDError[source]

Bases: eventsourcing.exceptions.MismatchedOriginatorError

Raised when applying an event to the wrong entity or aggregate.

exception eventsourcing.exceptions.MismatchedOriginatorVersionError[source]

Bases: eventsourcing.exceptions.MismatchedOriginatorError

Raised when applying an event to the wrong version of an entity or aggregate.

exception eventsourcing.exceptions.MutatorRequiresTypeNotInstance[source]

Bases: eventsourcing.exceptions.ConsistencyError

Raised when mutator function received a class rather than an entity.

exception eventsourcing.exceptions.ProgrammingError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when programming errors are encountered.

exception eventsourcing.exceptions.RepositoryKeyError[source]

Bases: KeyError, eventsourcing.exceptions.EventSourcingError

Raised when using entity repository’s dictionary like interface to get an entity that does not exist.

exception eventsourcing.exceptions.SequencedItemError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when an integer sequence error occurs e.g. trying to save a version that already exists.

exception eventsourcing.exceptions.TimeSequenceError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when a time sequence error occurs e.g. trying to save a timestamp that already exists.

exception eventsourcing.exceptions.TopicResolutionError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when unable to resolve a topic to a Python class.

Example application

eventsourcing.example.interface.flaskapp

eventsourcing.example.application

class eventsourcing.example.application.ExampleApplication(**kwargs)[source]

Bases: eventsourcing.application.base.ApplicationWithPersistencePolicies

Example event sourced application with entity factory and repository.

create_new_example(foo='', a='', b='')[source]

Entity object factory.

eventsourcing.example.application.close_example_application()[source]

Shuts down single global instance of application.

To be called when tearing down, perhaps between tests, in order to allow a subsequent call to init_example_application().

eventsourcing.example.application.construct_example_application(**kwargs)[source]

Application object factory.

eventsourcing.example.application.get_example_application()[source]

Returns single global instance of application.

To be called when handling a worker request, if required.

eventsourcing.example.application.init_example_application(**kwargs)[source]

Constructs single global instance of application.

To be called when initialising a worker process.

eventsourcing.example.domainmodel

class eventsourcing.example.domainmodel.AbstractExampleRepository(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

class eventsourcing.example.domainmodel.Example(foo='', a='', b='', **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

An example event sourced domain model entity.

class AttributeChanged(timestamp=None, **kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when an Example is created.

class Created(timestamp=None, **kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.Created

Published when an Example is created.

class Discarded(timestamp=None, **kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.Discarded

Published when an Example is discarded.

class Event(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Layer supertype.

class Heartbeat(timestamp=None, **kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.Event

Published when a heartbeat in the entity occurs (see below).

a

An example attribute.

b

Another example attribute.

beat_heart(number_of_beats=1)[source]
count_heartbeats()[source]
foo

An example attribute.

eventsourcing.example.domainmodel.create_new_example(foo='', a='', b='')[source]

Factory method for example entities.

Return type:Example
eventsourcing.example.domainmodel.example_mutator(initial, event)[source]
eventsourcing.example.domainmodel.heartbeat_mutator(self, event)[source]

eventsourcing.example.infrastructure

class eventsourcing.example.infrastructure.ExampleRepository(event_store, mutator=None, snapshot_strategy=None, use_cache=False, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.example.domainmodel.AbstractExampleRepository

Event sourced repository for the Example domain model entity.

mutator(initial, event)