Module docs¶
This document describes the packages, modules, classes, functions and other code details of the library.
eventsourcing¶
The eventsourcing package contains packages for the application layer, the domain layer, the infrastructure layer, and the interface layer. There is also a module for exceptions, an example package, and a utils package.
application¶
The application layer brings together the domain and infrastructure layers.
base¶
-
class
eventsourcing.application.base.
ApplicationWithEventStores
(entity_record_manager=None, log_record_manager=None, snapshot_record_manager=None, always_encrypt=False, cipher=None)[source]¶ Bases:
object
Event sourced application object class.
Can construct event stores using given database records. Supports three different event stores: for log events, for entity events, and for snapshot events.
-
construct_event_store
(event_sequence_id_attr, event_position_attr, record_manager, always_encrypt=False, cipher=None)[source]¶
-
construct_sequenced_item_mapper
(sequenced_item_class, event_sequence_id_attr, event_position_attr, json_encoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONEncoder'>, json_decoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONDecoder'>, always_encrypt=False, cipher=None)[source]¶
-
-
class
eventsourcing.application.base.
ApplicationWithPersistencePolicies
(**kwargs)[source]¶ Bases:
eventsourcing.application.base.ApplicationWithEventStores
policies¶
simple¶
-
class
eventsourcing.application.simple.
SimpleApplication
(persist_event_type=None, uri=None, session=None, cipher_key=None, stored_event_record_class=None, setup_table=True, contiguous_record_ids=True)[source]¶ Bases:
object
domain.model¶
The domain layer contains a domain model, and optionally services that work across different entities or aggregates.
The domain model package contains classes and functions that can help develop an event sourced domain model.
aggregate¶
Base classes for aggregates in a domain driven design.
-
class
eventsourcing.domain.model.aggregate.
AggregateRoot
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedVersionedEntity
Root entity for an aggregate in a domain driven design.
-
class
AttributeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
,eventsourcing.domain.model.entity.AttributeChanged
Published when an AggregateRoot is changed.
-
class
Created
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
,eventsourcing.domain.model.entity.Created
Published when an AggregateRoot is created.
-
class
Discarded
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
,eventsourcing.domain.model.entity.Discarded
Published when an AggregateRoot is discarded.
-
class
array¶
A kind of collection, indexed by integer. Doesn’t need to replay all events to exist.
-
class
eventsourcing.domain.model.array.
AbstractArrayRepository
(array_size=10000, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEntityRepository
Repository for sequence objects.
-
class
eventsourcing.domain.model.array.
AbstractBigArrayRepository
[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEntityRepository
Repository for compound sequence objects.
-
subrepo
¶ Sub-sequence repository.
-
-
class
eventsourcing.domain.model.array.
Array
(array_id, repo)[source]¶ Bases:
object
-
class
eventsourcing.domain.model.array.
BigArray
(array_id, repo)[source]¶ Bases:
eventsourcing.domain.model.array.Array
A virtual array holding items in indexed positions, across a number of Array instances.
Getting and setting items at index position is supported. Slices are supported, and operate across the underlying arrays. Appending is also supported.
BigArray is designed to overcome the concern of needing a single large sequence that may not be suitably stored in any single partition. In simple terms, if the events of an aggregate can fit in a partition, we can use the same size partition to make a tree of arrays that will certainly be capable of sequencing all the events of the application in a single stream.
With normal size base arrays, enterprise applications can expect read and write time to be approximately constant with respect to the number of items in the array.
The array is composed of a tree of arrays, which gives a capacity equal to the size of each array raised to the power of the size of each array. If the arrays are limited to about the maximum size of an aggregate event stream (a large number, but not so large that there would be too much data in any one partition, let’s say 1000s to be safe), then it would be possible to fit such a large number of aggregates in the corresponding BigArray that we can be confident it would never be full.
Write access time in the worst case, and the time to identify the index of the last item in the big array, is proportional to the logarithm of the highest assigned index, taken to the base of the underlying array size. Write time on average, and read time given an index, is constant with respect to the number of items in a BigArray.
Items can be appended in log time in a single thread. However, the time between reading the current last index and claiming the next position leads to contention and retries when there are lots of threads of execution all attempting to append items, which inherently limits throughput.
Todo: Not possible in Cassandra, but maybe do it in a transaction in SQLAlchemy?
An alternative to reading the last item before writing the next is to use an integer sequence generator to generate a stream of integers. Items can be assigned to index positions in a big array, according to the integers that are issued. Throughput will then be much better, and will be limited only by the rate at which the database can have events written to it (unless the number generator is quite slow).
An external integer sequence generator, such as Redis’ INCR command, or an auto-incrementing database column, may constitute a single point of failure.
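The capacity and worst-case depth described above can be checked with a little arithmetic. The sketch below is not the library's implementation: the function names `big_array_capacity` and `tree_height_for_index` are hypothetical, and the only assumption is that every array in the tree holds `array_size` items.

```python
def big_array_capacity(array_size):
    """Capacity of a tree of arrays: array_size to the power of array_size."""
    return array_size ** array_size


def tree_height_for_index(index, array_size):
    """Smallest height h such that array_size ** h exceeds the given index.

    This grows with the logarithm of the index to base array_size, which is
    the worst-case cost described for writes and for finding the last item.
    """
    height = 1
    while array_size ** height <= index:
        height += 1
    return height


# With arrays of 10 items, indexes 0..9 fit in one array, 10..99 need two levels.
small_capacity = big_array_capacity(10)
heights = [tree_height_for_index(i, 10) for i in (0, 9, 10, 99, 100)]
```

With the documented default `array_size=10000`, the capacity is 10000 to the power 10000, which is why running out of positions is not a practical concern.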
collection¶
decorators¶
Decorators useful in domain models based on the classes in this library.
-
eventsourcing.domain.model.decorators.
attribute
(getter)[source]¶ When used as a method decorator, returns a property object with the method as the getter and a setter defined to call the instance method __change_attribute__(), which publishes an AttributeChanged event.
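The property-with-setter behaviour described here can be sketched in plain Python. This is an illustrative stand-in, not the library's code: `SketchEntity`, its `changes` list, and the private attribute naming are assumptions, and the setter method is named `__change_attribute__` to match the DomainEntity method documented in the entity module.

```python
def attribute(getter):
    """Illustrative stand-in for the documented decorator (an assumption)."""
    private_name = "_" + getter.__name__

    def get_value(self):
        return getattr(self, private_name, None)

    def set_value(self, value):
        # Delegate to the entity, which records the change as an event.
        self.__change_attribute__(private_name, value)

    return property(get_value, set_value, doc=getter.__doc__)


class SketchEntity(object):
    """Hypothetical entity that records changes instead of publishing events."""

    def __init__(self):
        self.changes = []

    def __change_attribute__(self, name, value):
        self.changes.append((name, value))
        setattr(self, name, value)

    @attribute
    def title(self):
        """Title of the entity."""


entity = SketchEntity()
entity.title = "first"
entity.title = "second"
```

Each assignment goes through the setter, so every change leaves a record that could be published as an event.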
-
eventsourcing.domain.model.decorators.
mutator
(arg=None)[source]¶ Structures mutator functions by allowing handlers to be registered for different types of event. When the decorated function is called with an initial value and an event, it will call the handler that has been registered for that type of event.
It works like singledispatch, which it uses. The difference is that when the decorated function is called, this decorator dispatches according to the type of last call arg, which fits better with reduce(). The builtin Python function reduce() is used by the library to replay a sequence of events against an initial state. If a mutator function is given to reduce(), along with a list of events and an initializer, reduce() will call the mutator function once for each event in the list, but the initializer will be the first value, and the event will be the last argument, and we want to dispatch according to the type of the event. It happens that singledispatch is coded to switch on the type of the first argument, which makes it unsuitable for structuring a mutator function without the modifications introduced here.
The other aspect introduced by this decorator function is the option to set the type of the handled entity in the decorator. When an entity is replayed from scratch, in other words when all its events are replayed, the initial state is None. The handler which handles the first event in the sequence will probably construct an object instance. It is possible to write the type into the handler, but that makes the entity more difficult to subclass because you will also need to write a handler for it. If the decorator is invoked with the type, when the initial value passed as a call arg to the mutator function is None, the handler will instead receive the type of the entity, which it can use to construct the entity object.
class Entity(object):
    class Created(object):
        pass

@mutator(Entity)
def mutate(initial, event):
    raise NotImplementedError(type(event))

@mutate.register(Entity.Created)
def _(initial, event):
    return initial(**event.__dict__)

entity = mutate(None, Entity.Created())
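The interaction between reduce() and dispatching on the last argument can be tried with the standard library alone. The sketch below is a simplified stand-in for the decorator, not its actual implementation: `dispatch_on_last_arg`, `Created`, and `Renamed` are hypothetical names, and the entity state is a plain dict.

```python
from functools import reduce, singledispatch


def dispatch_on_last_arg(func):
    """Wrap singledispatch so that the type of the LAST argument selects the handler."""
    dispatcher = singledispatch(func)

    def wrapper(*args):
        return dispatcher.dispatch(type(args[-1]))(*args)

    wrapper.register = dispatcher.register
    return wrapper


class Created(object):
    def __init__(self, name):
        self.name = name


class Renamed(object):
    def __init__(self, name):
        self.name = name


@dispatch_on_last_arg
def mutate(state, event):
    raise NotImplementedError(type(event))


@mutate.register(Created)
def _(state, event):
    return {"name": event.name, "version": 0}


@mutate.register(Renamed)
def _(state, event):
    return {"name": event.name, "version": state["version"] + 1}


# reduce() passes the accumulated state first and the event last.
events = [Created("a"), Renamed("b"), Renamed("c")]
final_state = reduce(mutate, events, None)
```

Plain singledispatch would switch on the type of `state` here, which is `None` for the first call and a dict afterwards, so it could not tell the events apart.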
-
eventsourcing.domain.model.decorators.
random
() → x in the interval [0, 1).¶
-
eventsourcing.domain.model.decorators.
retry
(exc=<class 'Exception'>, max_attempts=1, wait=0)[source]¶
-
eventsourcing.domain.model.decorators.
subscribe_to
(event_class)[source]¶ Decorator for making a custom event handler function subscribe to a certain event type.
event_class: DomainEvent class, or one of its child classes, that the handler function should subscribe to
The following example shows a custom handler that reacts to a Todo.Created event and saves a projection of a Todo model object.
@subscribe_to(Todo.Created)
def new_todo_projection(event):
    todo = TodoProjection(id=event.originator_id, title=event.title)
    todo.save()
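To see how a handler registered this way might be reached, here is a self-contained sketch of a publish-subscribe registry. It is an assumption about the general mechanism, not the library's code: `_subscriptions`, `publish`, `TodoCreated`, and the `projections` dict are all hypothetical.

```python
_subscriptions = {}  # maps event class -> list of handler functions


def subscribe_to(event_class):
    """Register the decorated function as a handler for event_class."""
    def decorator(handler):
        _subscriptions.setdefault(event_class, []).append(handler)
        return handler
    return decorator


def publish(event):
    """Call every handler subscribed to the event's class or a base class."""
    for event_class, handlers in _subscriptions.items():
        if isinstance(event, event_class):
            for handler in handlers:
                handler(event)


class TodoCreated(object):
    def __init__(self, originator_id, title):
        self.originator_id = originator_id
        self.title = title


projections = {}  # stands in for a saved projection table


@subscribe_to(TodoCreated)
def new_todo_projection(event):
    projections[event.originator_id] = event.title


publish(TodoCreated("todo-1", "Write docs"))
```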
entity¶
Base classes for domain entities of different kinds.
The entity module provides base classes for domain entities.
-
class
eventsourcing.domain.model.entity.
AbstractEntityRepository
[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEventPlayer
-
event_store
¶ Returns event store object used by this repository.
-
-
class
eventsourcing.domain.model.entity.
DomainEntity
(id)[source]¶ Bases:
eventsourcing.domain.model.events.QualnameABC
Base class for domain entities.
-
class
AttributeChanged
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.events.AttributeChanged
Published when a DomainEntity is changed.
-
class
Created
(originator_topic, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.events.Created
Published when an entity is created.
-
originator_topic
¶
-
-
class
Discarded
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.Discarded
,eventsourcing.domain.model.entity.Event
Published when a DomainEntity is discarded.
-
class
Event
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithOriginatorID
,eventsourcing.domain.model.events.DomainEvent
Supertype for events of domain entities.
-
__change_attribute__
(name, value)[source]¶ Changes named attribute with the given value, by triggering an AttributeChanged event.
-
__publish__
(event)[source]¶ Publishes given event for subscribers in the application.
Parameters: event – domain event or list of events
-
__publish_to_subscribers__
(event)[source]¶ Actually dispatches given event to publish-subscribe mechanism.
Parameters: event – domain event or list of events
-
__trigger_event__
(event_class, **kwargs)[source]¶ Constructs, applies, and publishes a domain event.
-
id
¶ Entity ID allows an entity instance to be referenced and distinguished from others, even though its state may change over time.
-
class
-
class
eventsourcing.domain.model.entity.
TimestampedEntity
(__created_on__, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.DomainEntity
-
class
AttributeChanged
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.AttributeChanged
Published when a TimestampedEntity is changed.
-
class
Created
(originator_topic, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
Published when a TimestampedEntity is created.
-
class
Discarded
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.Discarded
Published when a TimestampedEntity is discarded.
-
class
Event
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.events.EventWithTimestamp
Supertype for events of timestamped entities.
-
class
-
class
eventsourcing.domain.model.entity.
TimestampedVersionedEntity
(__created_on__, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedEntity
,eventsourcing.domain.model.entity.VersionedEntity
-
class
AttributeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.AttributeChanged
,eventsourcing.domain.model.entity.AttributeChanged
Published when a TimestampedVersionedEntity is changed.
-
class
Created
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
Published when a TimestampedVersionedEntity is created.
-
class
-
class
eventsourcing.domain.model.entity.
TimeuuidedVersionedEntity
(event_id, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimeuuidedEntity
,eventsourcing.domain.model.entity.VersionedEntity
-
class
eventsourcing.domain.model.entity.
VersionedEntity
(__version__=None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.DomainEntity
-
class
AttributeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.AttributeChanged
Published when a VersionedEntity is changed.
-
class
Created
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
Published when a VersionedEntity is created.
-
class
Discarded
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.Discarded
Published when a VersionedEntity is discarded.
-
class
Event
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithOriginatorVersion
,eventsourcing.domain.model.entity.Event
Supertype for events of versioned entities.
-
class
events¶
Base classes for domain events of different kinds.
-
class
eventsourcing.domain.model.events.
AttributeChanged
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Can be published when an attribute of an entity is changed.
-
name
¶
-
value
¶
-
-
class
eventsourcing.domain.model.events.
Created
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Can be published when an entity is created.
-
class
eventsourcing.domain.model.events.
Discarded
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Published when something is discarded.
-
class
eventsourcing.domain.model.events.
DomainEvent
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.QualnameABC
Base class for domain events.
Implements methods to make instances read-only, comparable for equality, have recognisable representations, and hashable.
-
__hash__
()[source]¶ Computes a Python integer hash for an event, using its event hash string if available.
Supports equality and inequality comparisons.
-
__mutate__
(obj)[source]¶ Update obj with values from self.
Can be extended, but subclasses must call super method, and return an object.
Parameters: obj – object to be mutated Returns: mutated object
-
mutate
(obj)[source]¶ Convenience for use in custom models, to update obj with values from self without needing to call super method and return obj (two extra lines).
Can be overridden by subclasses. Any value returned by this method will be ignored.
Please note, subclasses that extend __mutate__() might not have fully completed that method before this method is called. To ensure all base classes have completed their mutate behaviour before mutating an event in a concrete class, extend __mutate__() instead of overriding this method.
Parameters: obj – object to be mutated
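The relationship between the two methods can be sketched as a template method. This is a minimal illustration under the assumption that `__mutate__()` calls `mutate()` and then returns the object; `SketchEvent`, `Counter`, and `Incremented` are hypothetical classes, and the read-only behaviour of real events is left out.

```python
class SketchEvent(object):
    """Hypothetical event base class (real events are read-only)."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __mutate__(self, obj):
        # Extension point: subclasses call super and must return an object.
        self.mutate(obj)
        return obj

    def mutate(self, obj):
        # Convenience hook: any return value is ignored.
        pass


class Counter(object):
    def __init__(self):
        self.value = 0


class Incremented(SketchEvent):
    def mutate(self, obj):
        obj.value += self.amount


counter = Incremented(amount=2).__mutate__(Counter())
counter = Incremented(amount=3).__mutate__(counter)
```

Overriding only `mutate()` keeps the subclass to a single statement, because calling the super method and returning the object are handled in the base class.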
-
-
class
eventsourcing.domain.model.events.
EventWithOriginatorID
(originator_id, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
-
originator_id
¶
-
-
class
eventsourcing.domain.model.events.
EventWithOriginatorVersion
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have an originator version number.
-
originator_version
¶
-
-
class
eventsourcing.domain.model.events.
EventWithTimestamp
(timestamp=None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have a timestamp value.
-
timestamp
¶
-
-
class
eventsourcing.domain.model.events.
EventWithTimeuuid
(event_id=None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have a UUIDv1 event ID.
-
event_id
¶
-
-
class
eventsourcing.domain.model.events.
Logged
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Published when something is logged.
-
class
eventsourcing.domain.model.events.
QualnameABC
[source]¶ Bases:
object
Base class that introduces __qualname__ for objects in Python 2.7.
snapshot¶
Snapshotting is implemented in the domain layer as an event.
-
class
eventsourcing.domain.model.snapshot.
AbstractSnapshop
[source]¶ Bases:
object
-
originator_id
¶ ID of the snapshotted entity.
-
originator_version
¶ Version of the last event applied to the entity.
-
state
¶ State of the snapshotted entity.
-
topic
¶ Path to the class of the snapshotted entity.
-
-
class
eventsourcing.domain.model.snapshot.
Snapshot
(originator_id, originator_version, topic, state)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithTimestamp
,eventsourcing.domain.model.events.EventWithOriginatorVersion
,eventsourcing.domain.model.events.EventWithOriginatorID
,eventsourcing.domain.model.snapshot.AbstractSnapshop
-
state
¶ State of the snapshotted entity.
-
topic
¶ Path to the class of the snapshotted entity.
-
timebucketedlog¶
Time-bucketed logs allow a sequence of items that is ordered by timestamp to be split across a number of different database partitions, which avoids one partition becoming very large (and then unworkable).
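One way to picture the bucketing is to derive a partition key from the log ID and the start of the time bucket that contains a timestamp. The sketch below is an assumption for illustration only: the function name `sketch_timebucket_id`, the bucket size names, and the ID format are hypothetical and may differ from what make_timebucket_id() actually produces.

```python
from datetime import datetime, timezone

# Hypothetical bucket sizes mapped to the timestamp precision they keep.
BUCKET_FORMATS = {
    "year": "%Y",
    "month": "%Y-%m",
    "day": "%Y-%m-%d",
    "hour": "%Y-%m-%d_%H",
}


def sketch_timebucket_id(log_id, timestamp, bucket_size):
    """Return one ID per (log, bucket), so each bucket can live in its own partition."""
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return "{}_{}".format(log_id, moment.strftime(BUCKET_FORMATS[bucket_size]))


# Two messages logged on different days fall into different day buckets,
# but into the same year bucket.
first_day = sketch_timebucket_id("log", 0, "day")
second_day = sketch_timebucket_id("log", 86400, "day")
same_year = sketch_timebucket_id("log", 0, "year") == sketch_timebucket_id("log", 86400, "year")
```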
-
class
eventsourcing.domain.model.timebucketedlog.
MessageLogged
(message, originator_id)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithTimestamp
,eventsourcing.domain.model.events.EventWithOriginatorID
,eventsourcing.domain.model.events.Logged
-
message
¶
-
-
class
eventsourcing.domain.model.timebucketedlog.
Timebucketedlog
(name, bucket_size=None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedVersionedEntity
-
class
BucketSizeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.timebucketedlog.Event
,eventsourcing.domain.model.entity.AttributeChanged
-
class
Event
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
Supertype for events of time-bucketed log.
-
class
Started
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.timebucketedlog.Event
-
bucket_size
¶
-
name
¶
-
started_on
¶
-
class
-
class
eventsourcing.domain.model.timebucketedlog.
TimebucketedlogRepository
[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEntityRepository
-
get_or_create
(log_name, bucket_size)[source]¶ Gets or creates a log.
Return type: Timebucketedlog
-
-
eventsourcing.domain.model.timebucketedlog.
make_timebucket_id
(log_id, timestamp, bucket_size)[source]¶
infrastructure¶
The infrastructure layer adapts external devices in ways that are useful for the application, such as the way an event store encapsulates a database.
base¶
Abstract base classes for the infrastructure layer.
-
class
eventsourcing.infrastructure.base.
AbstractRecordManager
(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False)[source]¶ Bases:
object
-
all_items
()[source]¶ Returns all stored items from all sequences (possibly in chronological order, depending on database).
-
all_records
(start=None, stop=None, *args, **kwargs)[source]¶ Returns all records in the table (possibly in chronological order, depending on database).
-
-
class
eventsourcing.infrastructure.base.
RelationalRecordManager
(*args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.base.AbstractRecordManager
-
get_items
(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]¶ Returns items of a sequence.
-
get_records
(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]¶ Returns records for a sequence.
-
insert_select_max
¶ SQL statement that inserts records with contiguous IDs, by selecting max ID from indexed table records.
-
insert_values
¶ SQL statement that inserts records without ID.
-
record_table_name
¶ Returns table name - used in raw queries, and to detect record ID conflicts.
Return type: str
-
cassandra¶
Classes for event sourcing with Apache Cassandra.
-
class
eventsourcing.infrastructure.cassandra.datastore.
CassandraDatastore
(tables, *args, **kwargs)[source]¶
-
class
eventsourcing.infrastructure.cassandra.datastore.
CassandraSettings
(hosts=None, port=None, protocol_version=None, default_keyspace=None, consistency=None, replication_factor=None, username=None, password=None)[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreSettings
-
CONSISTENCY_LEVEL
= 'LOCAL_QUORUM'¶
-
DEFAULT_KEYSPACE
= 'eventsourcing'¶
-
HOSTS
= ['127.0.0.1']¶
-
PORT
= 9042¶
-
PROTOCOL_VERSION
= 3¶
-
REPLICATION_FACTOR
= 1¶
-
-
class
eventsourcing.infrastructure.cassandra.factory.
CassandraInfrastructureFactory
(record_manager_class=None, sequenced_item_class=None, integer_sequenced_record_class=None, timestamp_sequenced_record_class=None, snapshot_record_class=None, contiguous_record_ids=False, session=None)[source]¶ Bases:
eventsourcing.infrastructure.factory.InfrastructureFactory
-
integer_sequenced_record_class
¶ alias of
IntegerSequencedRecord
-
record_manager_class
¶ alias of
CassandraRecordManager
-
snapshot_record_class
¶ alias of
SnapshotRecord
-
timestamp_sequenced_record_class
¶ alias of
TimestampSequencedRecord
-
-
class
eventsourcing.infrastructure.cassandra.manager.
CassandraRecordManager
(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False)[source]¶ Bases:
eventsourcing.infrastructure.base.AbstractRecordManager
datastore¶
Base classes for concrete datastore classes.
-
exception
eventsourcing.infrastructure.datastore.
DatastoreConnectionError
[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreError
-
class
eventsourcing.infrastructure.datastore.
DatastoreSettings
[source]¶ Bases:
object
Base class for settings for database connection used by a stored event repository.
-
exception
eventsourcing.infrastructure.datastore.
DatastoreTableError
[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreError
django¶
A Django application for event sourcing with the Django ORM.
-
class
eventsourcing.infrastructure.django.manager.
DjangoRecordManager
(convert_position_float_to_decimal=False, *args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.base.RelationalRecordManager
-
_prepare_insert
(tmpl)[source]¶ With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, using an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.
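The insert-select-from form with the SQL max function can be tried with the standard library's sqlite3 module. This sketch swaps SQLite in for the Django ORM and does not model the “read committed” isolation level; the table name, column names, and the `append_record` helper are assumptions.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE stored_events ("
    "id INTEGER PRIMARY KEY, sequence_id TEXT, position INTEGER, data TEXT, "
    "UNIQUE (sequence_id, position))"
)

# The new ID is computed by the database as max(id) + 1 in the same statement.
INSERT_SELECT_MAX = (
    "INSERT INTO stored_events (id, sequence_id, position, data) "
    "SELECT COALESCE(MAX(id), 0) + 1, ?, ?, ? FROM stored_events"
)


def append_record(sequence_id, position, data):
    with connection:  # commits on success, rolls back on error
        connection.execute(INSERT_SELECT_MAX, (sequence_id, position, data))


append_record("a", 0, "{}")
append_record("b", 0, "{}")
append_record("a", 1, "{}")
record_ids = [row[0] for row in connection.execute("SELECT id FROM stored_events ORDER BY id")]

# Optimistic concurrency control: a second write to the same position is rejected.
try:
    append_record("a", 1, "{}")
    conflict_detected = False
except sqlite3.IntegrityError:
    conflict_detected = True
```

If two writers compute the same next ID concurrently, the primary key constraint rejects one of them, and that writer can retry.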
-
get_records
(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]¶
-
record_table_name
¶
-
eventplayer¶
Base classes for event players of different kinds.
-
class
eventsourcing.infrastructure.eventplayer.
EventPlayer
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEventPlayer
-
event_store
¶
-
eventsourcedrepository¶
Base classes for event sourced repositories (not abstract, can be used directly).
-
class
eventsourcing.infrastructure.eventsourcedrepository.
EventSourcedRepository
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.infrastructure.eventplayer.EventPlayer
,eventsourcing.domain.model.entity.AbstractEntityRepository
-
__contains__
(entity_id)[source]¶ Returns a boolean value according to whether entity with given ID exists.
-
eventstore¶
The event store provides the application-level interface to the event sourcing persistence mechanism.
-
class
eventsourcing.infrastructure.eventstore.
AbstractEventStore
[source]¶ Bases:
object
Abstract base class for event stores. Defines the methods expected of an event store by other classes in the library.
-
class
eventsourcing.infrastructure.eventstore.
EventStore
(record_manager, sequenced_item_mapper)[source]¶ Bases:
eventsourcing.infrastructure.eventstore.AbstractEventStore
Event store appends domain events to stored sequences. It uses a record manager to map named tuples to database records, and it uses a sequenced item mapper to map named tuples to application-level objects.
-
__init__
(record_manager, sequenced_item_mapper)[source]¶ Initialises event store object.
Parameters: - record_manager – record manager
- sequenced_item_mapper – sequenced item mapper
-
all_domain_events
()[source]¶ Gets all domain events in the event store.
Returns: map object, yielding a sequence of domain events
-
append
(domain_event_or_events)[source]¶ Appends given domain event, or list of domain events, to their sequence.
Parameters: domain_event_or_events – domain event, or list of domain events
-
get_domain_event
(originator_id, eq)[source]¶ Gets a domain event from the sequence identified by originator_id at position eq.
Parameters: - originator_id – ID of a sequence of events
- eq – get item at this position
Returns: domain event
-
get_domain_events
(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]¶ Gets domain events from the sequence identified by originator_id.
Parameters: - originator_id – ID of a sequence of events
- gt – get items after this position
- gte – get items at or after this position
- lt – get items before this position
- lte – get items before or at this position
- limit – get limited number of items
- is_ascending – get items from lowest position
- page_size – restrict and repeat database query
Returns: list of domain events
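The meaning of the position arguments can be illustrated on a plain list of integer positions. This is a sketch of the documented semantics, not the event store's query code; `select_positions` is a hypothetical helper, and applying `limit` after ordering is an assumption.

```python
def select_positions(positions, gt=None, gte=None, lt=None, lte=None,
                     limit=None, is_ascending=True):
    """Filter and order positions in the way the parameters above describe."""
    selected = [
        p for p in sorted(positions, reverse=not is_ascending)
        if (gt is None or p > gt)
        and (gte is None or p >= gte)
        and (lt is None or p < lt)
        and (lte is None or p <= lte)
    ]
    return selected if limit is None else selected[:limit]


positions = range(6)  # positions 0..5 of one sequence
after_one_up_to_four = select_positions(positions, gt=1, lte=4)
last_two = select_positions(positions, limit=2, is_ascending=False)
```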
-
get_most_recent_event
(originator_id, lt=None, lte=None)[source]¶ Gets a domain event from the sequence identified by originator_id at the highest position.
Parameters: - originator_id – ID of a sequence of events
- lt – get highest before this position
- lte – get highest at or before this position
Returns: domain event
-
iterator_class
¶ alias of
SequencedItemIterator
-
integersequencegenerators¶
Different ways of generating sequences of integers.
-
class
eventsourcing.infrastructure.integersequencegenerators.base.
AbstractIntegerSequenceGenerator
[source]¶ Bases:
object
-
class
eventsourcing.infrastructure.integersequencegenerators.base.
SimpleIntegerSequenceGenerator
(i=0)[source]¶ Bases:
eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator
-
class
eventsourcing.infrastructure.integersequencegenerators.redisincr.
RedisIncr
(redis=None, key=None)[source]¶ Bases:
eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator
Generates a sequence of integers, using Redis’ INCR command.
Maximum number is 2**63 - 1, or 9223372036854775807, the maximum value of a 64 bit signed integer.
iterators¶
Different ways of getting sequenced items from a datastore.
-
class
eventsourcing.infrastructure.iterators.
AbstractSequencedItemIterator
(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]¶ Bases:
object
-
DEFAULT_PAGE_SIZE
= 1000¶
-
_inc_page_counter
()[source]¶ Increments the page counter.
- Each query result counts as a page, even if there are no items in the page. This really counts queries.
- It is easy to divide the number of events by the page size if the “correct” answer is required.
- There will be a difference in the counts when the number of events can be exactly divided by the page size, because there is no way to know in advance that a full page is also the last page.
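The difference in counts can be reproduced with a small paging loop. This is a sketch under the assumption that the iterator stops only when a query returns fewer items than the page size; `count_queries` is a hypothetical helper and the events are just integers in a list.

```python
def count_queries(number_of_events, page_size):
    """Count the queries needed to read all events one page at a time."""
    events = list(range(number_of_events))
    query_count = 0
    start = 0
    while True:
        page = events[start:start + page_size]
        query_count += 1  # every query counts, even if the page is empty
        if len(page) < page_size:
            break  # a short page is known to be the last page
        start += page_size
    return query_count


# 7 events in pages of 5: two queries, matching ceil(7 / 5).
uneven = count_queries(7, 5)
# 10 events in pages of 5: a third, empty query is needed to find the end.
exact_multiple = count_queries(10, 5)
```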
-
-
class
eventsourcing.infrastructure.iterators.
GetEntityEventsThread
(record_manager, sequence_id, gt=None, gte=None, lt=None, lte=None, page_size=None, is_ascending=True, *args, **kwargs)[source]¶ Bases:
threading.Thread
-
class
eventsourcing.infrastructure.iterators.
SequencedItemIterator
(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]¶ Bases:
eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator
-
class
eventsourcing.infrastructure.iterators.
ThreadedSequencedItemIterator
(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]¶ Bases:
eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator
repositories¶
Repository base classes for entity classes defined in the library.
-
class
eventsourcing.infrastructure.repositories.array.
ArrayRepository
(array_size=10000, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.array.AbstractArrayRepository
,eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
-
class
eventsourcing.infrastructure.repositories.array.
BigArrayRepository
(array_size=10000, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.array.AbstractBigArrayRepository
,eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
-
subrepo
¶
-
subrepo_class
¶ alias of
ArrayRepository
-
-
class
eventsourcing.infrastructure.repositories.collection_repo.
CollectionRepository
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
,eventsourcing.domain.model.collection.AbstractCollectionRepository
Event sourced repository for the Collection domain model entity.
-
class
eventsourcing.infrastructure.repositories.timebucketedlog_repo.
TimebucketedlogRepo
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
,eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository
Event sourced repository for the Timebucketedlog domain model entity.
sequenceditem¶
The persistence model for storing events.
-
class
eventsourcing.infrastructure.sequenceditem.
SequencedItem
(sequence_id, position, topic, data)¶ Bases:
tuple
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
static
__new__
(_cls, sequence_id, position, topic, data)¶ Create new instance of SequencedItem(sequence_id, position, topic, data)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values.
-
classmethod
_make
(iterable, new=<built-in method __new__ of type object at 0xa385c0>, len=<built-in function len>)¶ Make a new SequencedItem object from a sequence or iterable
-
_replace
(_self, **kwds)¶ Return a new SequencedItem object replacing specified fields with new values
-
data
¶ Alias for field number 3
-
position
¶ Alias for field number 1
-
sequence_id
¶ Alias for field number 0
-
topic
¶ Alias for field number 2
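The methods and field aliases listed for SequencedItem are those of a standard named tuple. The sketch below rebuilds an equivalent type with `collections.namedtuple` from the documented field order; the example values, including the topic string, are made up for illustration.

```python
from collections import namedtuple

# Same field names and order as documented: 0, 1, 2, 3.
SequencedItem = namedtuple("SequencedItem", ["sequence_id", "position", "topic", "data"])

item = SequencedItem(sequence_id="entity-1", position=0, topic="example#Created", data="{}")

# Fields can be read by name or by index, and the tuple itself is immutable.
next_item = item._replace(position=1)
as_dict = item._asdict()
from_list = SequencedItem._make(["entity-2", 0, "example#Created", "{}"])
```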
-
-
class
eventsourcing.infrastructure.sequenceditem.
SequencedItemFieldNames
(sequenced_item_class)[source]¶ Bases:
object
-
data
¶
-
other_names
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
-
class
eventsourcing.infrastructure.sequenceditem.
StoredEvent
(originator_id, originator_version, event_type, state)¶ Bases:
tuple
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
static
__new__
(_cls, originator_id, originator_version, event_type, state)¶ Create new instance of StoredEvent(originator_id, originator_version, event_type, state)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values.
-
classmethod
_make
(iterable, new=<built-in method __new__ of type object at 0xa385c0>, len=<built-in function len>)¶ Make a new StoredEvent object from a sequence or iterable
-
_replace
(_self, **kwds)¶ Return a new StoredEvent object replacing specified fields with new values
-
event_type
¶ Alias for field number 2
-
originator_id
¶ Alias for field number 0
-
originator_version
¶ Alias for field number 1
-
state
¶ Alias for field number 3
-
sequenceditemmapper¶
The sequenced item mapper maps sequenced items to application-level objects.
-
class
eventsourcing.infrastructure.sequenceditemmapper.
AbstractSequencedItemMapper
[source]¶ Bases:
object
-
class
eventsourcing.infrastructure.sequenceditemmapper.
SequencedItemMapper
(sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=None, json_decoder_class=None, cipher=None, other_attr_names=())[source]¶ Bases:
eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper
Uses JSON to transcode domain events.
-
construct_item_args
(domain_event)[source]¶ Constructs attributes of a sequenced item from the given domain event.
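The idea of constructing sequenced item attributes from a domain event can be sketched roughly as follows. This is not the library's implementation: the Created event class, the "#" topic separator, and the choice to remove the sequence ID and position from the JSON data are all assumptions made for illustration.

```python
import json
from collections import namedtuple

# Stand-in for the documented named tuple.
SequencedItem = namedtuple(
    "SequencedItem", ["sequence_id", "position", "topic", "data"]
)


class Created(object):
    """Hypothetical domain event used only for this sketch."""

    def __init__(self, originator_id, originator_version, **kwargs):
        self.__dict__.update(kwargs)
        self.originator_id = originator_id
        self.originator_version = originator_version


def construct_item_args(event, sequence_id_attr_name, position_attr_name):
    """Split an event into (sequence_id, position, topic, data)."""
    state = dict(vars(event))
    # Assumption: the two sequencing attributes are stored in their own
    # fields, so they are taken out of the serialised state.
    sequence_id = state.pop(sequence_id_attr_name)
    position = state.pop(position_attr_name)
    # Assumption: a topic is "<module>#<class name>".
    topic = type(event).__module__ + "#" + type(event).__name__
    data = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return sequence_id, position, topic, data


event = Created(originator_id="entity-1", originator_version=0, foo="bar")
item = SequencedItem(
    *construct_item_args(event, "originator_id", "originator_version")
)
```

The attribute names passed in play the role of sequence_id_attr_name and position_attr_name in the constructor signature above.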
-
snapshotting¶
Snapshotting avoids having to replay an entire sequence of events to obtain the current state of a projection.
-
class
eventsourcing.infrastructure.snapshotting.
AbstractSnapshotStrategy
[source]¶ Bases:
object
-
get_snapshot
(entity_id, lt=None, lte=None)[source]¶ Gets the last snapshot for entity, optionally until a particular version number.
Return type: Snapshot
-
take_snapshot
(entity_id, entity, last_event_version)[source]¶ Takes a snapshot of entity, using given ID, state and version number.
Return type: AbstractSnapshot
-
-
class
eventsourcing.infrastructure.snapshotting.
EventSourcedSnapshotStrategy
(event_store)[source]¶ Bases:
eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy
Snapshot strategy that uses an event sourced snapshot.
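To illustrate why a snapshot avoids replaying the whole sequence, here is a small self-contained sketch. The Snapshot tuple, the integer "events", and the helper functions are hypothetical; only the argument names of take_snapshot echo the signature documented above.

```python
from collections import namedtuple

# Hypothetical snapshot record: entity state as of a given event version.
Snapshot = namedtuple("Snapshot", ["entity_id", "version", "state"])

# Each event is just an amount to add; the event at index i takes the
# entity to version i.
events = [5, 3, -2, 10, 1]


def apply_events(state, events):
    """Fold a sequence of events into a state."""
    for amount in events:
        state += amount
    return state


def take_snapshot(entity_id, entity, last_event_version):
    """Record the entity state together with the version it reflects."""
    return Snapshot(entity_id, last_event_version, entity)


def get_state(events, snapshot=None):
    """Rebuild state, replaying only events after the snapshot if given."""
    if snapshot is None:
        return apply_events(0, events)
    return apply_events(snapshot.state, events[snapshot.version + 1:])


snapshot = take_snapshot(
    "entity-1", apply_events(0, events[:3]), last_event_version=2
)
full = get_state(events)            # replays all five events
fast = get_state(events, snapshot)  # replays only the last two
```

Both paths arrive at the same state; the second simply starts from the snapshot and applies the events with a later version.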
sqlalchemy¶
Classes for event sourcing with SQLAlchemy.
-
class
eventsourcing.infrastructure.sqlalchemy.datastore.
SQLAlchemyDatastore
(base=<class 'sqlalchemy.ext.declarative.api.Base'>, tables=None, connection_strategy='plain', session=None, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.datastore.Datastore
-
session
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.datastore.
SQLAlchemySettings
(uri=None)[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreSettings
-
class
eventsourcing.infrastructure.sqlalchemy.factory.
SQLAlchemyInfrastructureFactory
(session, *args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.factory.InfrastructureFactory
-
integer_sequenced_record_class
¶ alias of
IntegerSequencedWithIDRecord
-
record_manager_class
¶ alias of
SQLAlchemyRecordManager
-
snapshot_record_class
¶ alias of
SnapshotRecord
-
timestamp_sequenced_record_class
¶ alias of
TimestampSequencedNoIDRecord
-
-
eventsourcing.infrastructure.sqlalchemy.factory.
construct_sqlalchemy_eventstore
(session, sequenced_item_class=None, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=None, json_decoder_class=None, cipher=None, record_class=None, contiguous_record_ids=False)[source]¶
-
class
eventsourcing.infrastructure.sqlalchemy.manager.
SQLAlchemyRecordManager
(session, *args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.base.RelationalRecordManager
-
_prepare_insert
(tmpl)[source]¶ With a transaction isolation level of “read committed”, this should generate records with a contiguous sequence of integer IDs. It assumes an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.
-
all_records
(start=None, stop=None, *args, **kwargs)[source]¶ Returns all records in the table.
Intended to support getting all application domain events in order, especially if the records have contiguous IDs.
-
get_items
(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]¶
-
get_records
(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]¶
-
query
¶
-
record_table_name
¶
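The insert-select-from form mentioned in the _prepare_insert description can be sketched with the standard library's sqlite3 module. This only shows the shape of such a statement; the table layout and names are assumptions, SQLite does not offer a “read committed” isolation level, and the statement the library actually generates is not reproduced here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE records ("
    " id INTEGER PRIMARY KEY,"
    " sequence_id TEXT, position INTEGER, topic TEXT, data TEXT,"
    " UNIQUE (sequence_id, position))"
)

# The next ID is computed by the database from max(id) in the same
# statement that inserts the row. An empty table yields NULL, hence COALESCE.
INSERT_SELECT_MAX = (
    "INSERT INTO records (id, sequence_id, position, topic, data) "
    "SELECT COALESCE(MAX(id), 0) + 1, ?, ?, ?, ? FROM records"
)


def append(sequence_id, position, topic, data):
    # "with conn" commits on success and rolls back on error.
    with conn:
        conn.execute(INSERT_SELECT_MAX, (sequence_id, position, topic, data))


append("a", 0, "t", "{}")
append("b", 0, "t", "{}")
append("a", 1, "t", "{}")

# Optimistic concurrency: a second writer at the same position is rejected
# by the unique constraint instead of being blocked by a lock.
try:
    append("a", 1, "t", "{}")
    conflict = False
except sqlite3.IntegrityError:
    conflict = True

ids = [row[0] for row in conn.execute("SELECT id FROM records ORDER BY id")]
```

The rejected insert consumes no ID, so the IDs of the stored records stay contiguous.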
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
IntegerSequencedNoIDRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
-
eventsourcing.infrastructure.sqlalchemy.records.
IntegerSequencedRecord
¶ alias of
IntegerSequencedWithIDRecord
-
class
eventsourcing.infrastructure.sqlalchemy.records.
IntegerSequencedWithIDRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
id
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
SnapshotRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
StoredEventRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
event_type
¶
-
id
¶
-
originator_id
¶
-
originator_version
¶
-
state
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.records.
TimestampSequencedNoIDRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
-
eventsourcing.infrastructure.sqlalchemy.records.
TimestampSequencedRecord
¶ alias of
TimestampSequencedNoIDRecord
-
class
eventsourcing.infrastructure.sqlalchemy.records.
TimestampSequencedWithIDRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
id
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
timebucketedlog_reader¶
Reader for timebucketed logs.
-
class
eventsourcing.infrastructure.timebucketedlog_reader.
TimebucketedlogReader
(log, event_store, page_size=50)[source]¶ Bases:
object
-
eventsourcing.infrastructure.timebucketedlog_reader.
get_timebucketedlog_reader
(log, event_store)[source]¶ Return type: TimebucketedlogReader
interface¶
The interface layer uses an application to service client requests.
notificationlog¶
Notification log is a pull-based mechanism for updating other applications.
-
class
eventsourcing.interface.notificationlog.
AbstractNotificationLog
[source]¶ Bases:
object
Presents a sequence of sections from a sequence of notifications.
-
class
eventsourcing.interface.notificationlog.
BigArrayNotificationLog
(big_array, section_size)[source]¶ Bases:
eventsourcing.interface.notificationlog.LocalNotificationLog
-
class
eventsourcing.interface.notificationlog.
LocalNotificationLog
(section_size)[source]¶ Bases:
eventsourcing.interface.notificationlog.AbstractNotificationLog
Presents a sequence of sections from a sequence of notifications.
-
class
eventsourcing.interface.notificationlog.
NotificationLogReader
(notification_log)[source]¶ Bases:
object
-
class
eventsourcing.interface.notificationlog.
NotificationLogView
(notification_log, json_encoder_class=None)[source]¶ Bases:
object
-
class
eventsourcing.interface.notificationlog.
RecordManagerNotificationLog
(record_manager, section_size)[source]¶ Bases:
eventsourcing.interface.notificationlog.LocalNotificationLog
-
class
eventsourcing.interface.notificationlog.
RemoteNotificationLog
(base_url, json_decoder_class=None)[source]¶ Bases:
eventsourcing.interface.notificationlog.AbstractNotificationLog
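Presenting a sequence of notifications as a sequence of sections can be sketched as below. The "first,last" section ID format, the "current" alias, and the dictionary layout are assumptions for illustration; they are not taken from the class definitions above.

```python
def format_section_id(first, last):
    """Section IDs are written as 'first,last' (an assumption here)."""
    return "{},{}".format(first, last)


def get_section(notifications, section_id, section_size):
    """Return one section of a list of notifications (IDs start at 1)."""
    total = len(notifications)
    if section_id == "current":
        # Start of the section that holds the most recent notification.
        first = max(total - 1, 0) // section_size * section_size + 1
    else:
        first = int(section_id.split(",")[0])
    last = first + section_size - 1
    previous_id = None
    if first > 1:
        previous_id = format_section_id(first - section_size, first - 1)
    next_id = None
    if last < total:
        next_id = format_section_id(last + 1, last + section_size)
    return {
        "section_id": format_section_id(first, last),
        "items": notifications[first - 1:last],
        "previous_id": previous_id,
        "next_id": next_id,
    }


def read_all(notifications, section_size):
    """Pull every notification by following links between sections."""
    section = get_section(notifications, "current", section_size)
    # Walk back to the first section, then forward to the current one.
    while section["previous_id"] is not None:
        section = get_section(
            notifications, section["previous_id"], section_size
        )
    collected = list(section["items"])
    while section["next_id"] is not None:
        section = get_section(notifications, section["next_id"], section_size)
        collected.extend(section["items"])
    return collected


log = list(range(1, 13))  # twelve notifications, numbered 1 to 12
current = get_section(log, "current", 5)
everything = read_all(log, 5)
```

A reader that remembers the last position it processed only needs to walk back as far as the section containing that position, which is what makes the mechanism pull-based.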
utils¶
The utils package contains common functions that are used in more than one layer.
cipher¶
time¶
-
eventsourcing.utils.times.
datetime_from_timestamp
(t)[source]¶ Returns a datetime from a decimal UNIX timestamp.
Parameters: t – timestamp, either Decimal or float
Returns: datetime.datetime object
-
eventsourcing.utils.times.
decimaltimestamp
(t=None)[source]¶ A UNIX timestamp as a Decimal object (exact number type).
Returns the current time when called without arguments; otherwise converts the given floating point number t to a Decimal with 6 decimal places.
Parameters: t – Floating point UNIX timestamp (“seconds since epoch”).
Returns: A Decimal with 6 decimal places, representing the given floating point or the value returned by time.time().
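The two conversions can be sketched with the standard library alone. The function names below are hypothetical stand-ins, not the library functions; the sketch follows the Returns description in using six decimal places, and returning a timezone-aware UTC datetime is a choice made here rather than something the documentation states.

```python
import datetime
import time
from decimal import Decimal


def to_decimal_timestamp(t=None):
    """Return a UNIX timestamp as a Decimal with six decimal places."""
    t = time.time() if t is None else t
    # Formatting first avoids carrying binary float noise into the Decimal.
    return Decimal("{:.6f}".format(t))


def to_datetime(t):
    """Return a UTC datetime from a Decimal or float UNIX timestamp."""
    return datetime.datetime.fromtimestamp(float(t), tz=datetime.timezone.utc)


fixed = to_decimal_timestamp(1.5)
now = to_decimal_timestamp()
epoch = to_datetime(Decimal("0"))
```

A Decimal compares and sorts exactly, which matters when timestamps are used as positions in a sequence.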
topic¶
-
eventsourcing.utils.topic.
get_topic
(domain_class)[source]¶ Returns a string describing a class.
- Args:
- domain_class: A class.
- Returns:
- A string describing the class.
-
eventsourcing.utils.topic.
resolve_attr
(obj, path)[source]¶ A recursive version of getattr for navigating dotted paths.
- Args:
- obj: An object for which we want to retrieve a nested attribute.
- path: A dot separated string containing zero or more attribute names.
- Returns:
- The attribute referred to by obj.a1.a2.a3…
- Raises:
- AttributeError: If there is no such attribute.
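A rough sketch of both ideas, describing a class as a string and navigating a dotted path with a recursive getattr, is given below. The function names are hypothetical stand-ins, and the "module#ClassName" topic format is an assumption; the documentation above only says that the topic is a string describing the class.

```python
import collections
import importlib


def topic_of(domain_class):
    """Describe a class as '<module>#<qualified name>' (assumed format)."""
    return domain_class.__module__ + "#" + domain_class.__qualname__


def resolve_dotted(obj, path):
    """Recursive getattr: follow zero or more dot-separated names."""
    if not path:
        return obj
    head, _, tail = path.partition(".")
    # getattr raises AttributeError if a name along the path is missing.
    return resolve_dotted(getattr(obj, head), tail)


def class_from_topic(topic):
    """Import the module named in the topic, then resolve the class path."""
    module_name, _, class_path = topic.partition("#")
    return resolve_dotted(importlib.import_module(module_name), class_path)


topic = topic_of(collections.OrderedDict)
resolved = class_from_topic(topic)
nested = resolve_dotted(collections, "OrderedDict.fromkeys")
unchanged = resolve_dotted(collections, "")
```

Storing a topic string alongside serialised state is what lets a stored item be turned back into an object of the right class later.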
exceptions¶
A few exception classes are defined by the library to indicate particular kinds of error.
-
exception
eventsourcing.exceptions.
ArrayIndexError
[source]¶ Bases:
IndexError
,eventsourcing.exceptions.EventSourcingError
Raised when appending item to an array that is full.
-
exception
eventsourcing.exceptions.
ConcurrencyError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when appending events at the wrong version to a versioned stream.
-
exception
eventsourcing.exceptions.
ConsistencyError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when an error occurs while applying an event stream to a versioned entity.
-
exception
eventsourcing.exceptions.
DataIntegrityError
[source]¶ Bases:
ValueError
,eventsourcing.exceptions.EventSourcingError
Raised when a sequenced item’s data is damaged (hash doesn’t match data).
-
exception
eventsourcing.exceptions.
DatasourceSettingsError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when an error is detected in settings for a datasource.
-
exception
eventsourcing.exceptions.
EntityIsDiscarded
[source]¶ Bases:
AssertionError
Raised when access to a recently discarded entity object is attempted.
-
exception
eventsourcing.exceptions.
EntityVersionNotFound
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when accessing an entity version that does not exist.
-
exception
eventsourcing.exceptions.
EventHashError
[source]¶ Bases:
eventsourcing.exceptions.DataIntegrityError
Raised when an event’s seal hash doesn’t match the hash of the state of the event.
-
exception
eventsourcing.exceptions.
EventSourcingError
[source]¶ Bases:
Exception
Base eventsourcing exception.
-
exception
eventsourcing.exceptions.
HeadHashError
[source]¶ Bases:
eventsourcing.exceptions.DataIntegrityError
,eventsourcing.exceptions.MismatchedOriginatorError
Raised when applying an event with hash different from aggregate head.
-
exception
eventsourcing.exceptions.
MismatchedOriginatorError
[source]¶ Bases:
eventsourcing.exceptions.ConsistencyError
Raised when applying an event to an inappropriate object.
-
exception
eventsourcing.exceptions.
MutatorRequiresTypeNotInstance
[source]¶ Bases:
eventsourcing.exceptions.ConsistencyError
Raised when mutator function received a class rather than an entity.
-
exception
eventsourcing.exceptions.
OriginatorIDError
[source]¶ Bases:
eventsourcing.exceptions.MismatchedOriginatorError
Raised when applying an event to the wrong entity or aggregate.
-
exception
eventsourcing.exceptions.
OriginatorVersionError
[source]¶ Bases:
eventsourcing.exceptions.MismatchedOriginatorError
Raised when applying an event to the wrong version of an entity or aggregate.
-
exception
eventsourcing.exceptions.
ProgrammingError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when programming errors are encountered.
-
exception
eventsourcing.exceptions.
RecordIDConflict
[source]¶ Bases:
eventsourcing.exceptions.SequencedItemConflict
Raised when a record ID conflict is detected.
-
exception
eventsourcing.exceptions.
RepositoryKeyError
[source]¶ Bases:
KeyError
,eventsourcing.exceptions.EventSourcingError
Raised when using the entity repository’s dictionary-like interface to get an entity that does not exist.
-
exception
eventsourcing.exceptions.
SequencedItemConflict
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when a sequence error occurs, e.g. when trying to save a version that already exists.
-
exception
eventsourcing.exceptions.
TimeSequenceError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when a time sequence error occurs, e.g. when trying to save a timestamp that already exists.
-
exception
eventsourcing.exceptions.
TopicResolutionError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when unable to resolve a topic to a Python class.
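The base classes listed above determine which except clauses catch which errors. The sketch below redefines a few of the exceptions locally, with the same bases as documented, so that it runs without the library installed; the classify function is hypothetical.

```python
# Local stand-ins mirroring the documented bases.
class EventSourcingError(Exception):
    pass


class RepositoryKeyError(KeyError, EventSourcingError):
    pass


class SequencedItemConflict(EventSourcingError):
    pass


class RecordIDConflict(SequencedItemConflict):
    pass


class ConcurrencyError(EventSourcingError):
    pass


def classify(exc):
    """Show which handler is reached first for a given exception."""
    try:
        raise exc
    except KeyError:
        # RepositoryKeyError also behaves like an ordinary KeyError.
        return "missing entity"
    except SequencedItemConflict:
        # Catches RecordIDConflict too, since it is a subclass.
        return "conflict"
    except EventSourcingError:
        # Anything else raised by the library's own hierarchy.
        return "library error"


results = [
    classify(RepositoryKeyError("entity-1")),
    classify(RecordIDConflict()),
    classify(ConcurrencyError()),
]
```

Mixing in built-in types such as KeyError or IndexError means code written against plain dictionaries or lists keeps working when handed a repository or array from the library.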
example¶
A simple, unit-tested, event sourced application.
application¶
-
class
eventsourcing.example.application.
ExampleApplication
(**kwargs)[source]¶ Bases:
eventsourcing.application.base.ApplicationWithPersistencePolicies
Example event sourced application with entity factory and repository.
-
eventsourcing.example.application.
close_example_application
()[source]¶ Shuts down single global instance of application.
To be called when tearing down, perhaps between tests, in order to allow a subsequent call to init_example_application().
-
eventsourcing.example.application.
construct_example_application
(**kwargs)[source]¶ Application object factory.
domainmodel¶
-
class
eventsourcing.example.domainmodel.
AbstractExampleRepository
[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEntityRepository
-
class
eventsourcing.example.domainmodel.
Example
(foo='', a='', b='', **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedVersionedEntity
An example event sourced domain model entity.
-
class
AttributeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.example.domainmodel.Event
,eventsourcing.domain.model.entity.AttributeChanged
Published when an attribute of an Example is changed.
-
class
Created
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.example.domainmodel.Event
,eventsourcing.domain.model.entity.Created
Published when an Example is created.
-
class
Discarded
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.example.domainmodel.Event
,eventsourcing.domain.model.entity.Discarded
Published when an Example is discarded.
-
class
Event
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
Supertype for events of example entities.
-
class
Heartbeat
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.example.domainmodel.Event
,eventsourcing.domain.model.entity.Event
Published when a heartbeat in the entity occurs (see below).
-
a
¶ An example attribute.
-
b
¶ Another example attribute.
-
foo
¶ An example attribute.
infrastructure¶
-
class
eventsourcing.example.infrastructure.
ExampleRepository
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
,eventsourcing.example.domainmodel.AbstractExampleRepository
Event sourced repository for the Example domain model entity.
interface¶
-
class
eventsourcing.example.interface.flaskapp.
IntegerSequencedItem
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Model
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
id
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-