Module docs

This document describes the packages, modules, classes, functions and other code details of the library.

eventsourcing

The eventsourcing package contains packages for the application layer, the domain layer, the infrastructure layer, and the interface layer. There is also a module for exceptions, an example package, and a utils package.

application

The application layer brings together the domain and infrastructure layers.

actors

class eventsourcing.application.actors.ActorModelRunner(system: eventsourcing.application.system.System, pipeline_ids, system_actor_name='system', shutdown_on_close=False, **kwargs)[source]

Bases: eventsourcing.application.system.SystemRunner

Uses an actor model framework to run a system of process applications.

actor_system
close()[source]

Stops all the actors running a system of process applications.

forward_prompt(prompt)[source]
static is_prompt(event)[source]
shutdown()[source]
start()[source]

Starts all the actors to run a system of process applications.

class eventsourcing.application.actors.PipelineActor[source]

Bases: thespian.actors.Actor

forward_prompt(msg)[source]
init_pipeline(msg)[source]
receiveMessage(msg, sender)[source]

Main entry point handling a request received by this Actor. Runs without interruption and may access locals to this Actor (only) without concern that these locals will be modified externally.

class eventsourcing.application.actors.PipelineInitRequest(process_classes, infrastructure_class, system_followings, pipeline_id)[source]

Bases: object

class eventsourcing.application.actors.ProcessInitRequest(process_application_class, infrastructure_class, pipeline_id, upstream_application_names, downstream_actors, pipeline_actor)[source]

Bases: object

class eventsourcing.application.actors.ProcessMaster[source]

Bases: thespian.actors.Actor

consume_prompt(prompt)[source]
handle_slave_run_response()[source]
init_process(msg)[source]
receiveMessage(msg, sender)[source]

Main entry point handling a request received by this Actor. Runs without interruption and may access locals to this Actor (only) without concern that these locals will be modified externally.

run_slave()[source]
class eventsourcing.application.actors.ProcessSlave[source]

Bases: thespian.actors.Actor

close()[source]
init_process(msg)[source]
is_my_prompt(prompt)[source]
receiveMessage(msg, sender)[source]

Main entry point handling a request received by this Actor. Runs without interruption and may access locals to this Actor (only) without concern that these locals will be modified externally.

run_process(msg)[source]
send_prompt(prompt)[source]
class eventsourcing.application.actors.SlaveRunRequest(last_prompts, master)[source]

Bases: object

class eventsourcing.application.actors.SlaveRunResponse[source]

Bases: object

class eventsourcing.application.actors.SystemActor[source]

Bases: thespian.actors.Actor

init_pipelines(msg)[source]
receiveMessage(msg, sender)[source]

Main entry point handling a request received by this Actor. Runs without interruption and may access locals to this Actor (only) without concern that these locals will be modified externally.

class eventsourcing.application.actors.SystemInitRequest(process_classes, infrastructure_class, system_followings, pipeline_ids)[source]

Bases: object

class eventsourcing.application.actors.SystemInitResponse(pipeline_actors)[source]

Bases: object

eventsourcing.application.actors.shutdown_actor_system()[source]
eventsourcing.application.actors.start_actor_system(system_base=None, logcfg={'formatters': {'normal': {'format': '%(levelname)-8s %(message)s'}}, 'handlers': {}, 'loggers': {}, 'version': 1})[source]
eventsourcing.application.actors.start_multiproc_tcp_base_system()[source]

command

class eventsourcing.application.command.CommandProcess(name=None, policy=None, setup_table=False, **kwargs)[source]

Bases: eventsourcing.application.process.ProcessApplication

persist_event_type

alias of eventsourcing.domain.model.command.Command.Event

django

class eventsourcing.application.django.DjangoApplication(name='', persistence_policy=None, persist_event_type=None, cipher_key=None, sequenced_item_class=None, sequenced_item_mapper_class=None, record_manager_class=None, stored_event_record_class=None, snapshot_record_class=None, setup_table=True, contiguous_record_ids=True, pipeline_id=0, json_encoder_class=None, json_decoder_class=None, notification_log_section_size=None)[source]

Bases: eventsourcing.application.simple.ApplicationWithConcreteInfrastructure

record_manager_class

alias of eventsourcing.infrastructure.django.manager.DjangoRecordManager

classmethod reset_connection_after_forking()[source]

Resets database connection after forking.

snapshot_record_class

alias of eventsourcing.infrastructure.django.models.EntitySnapshotRecord

stored_event_record_class

alias of eventsourcing.infrastructure.django.models.StoredEventRecord

multiprocess

class eventsourcing.application.multiprocess.MultiprocessRunner(system: eventsourcing.application.system.System, pipeline_ids=(0, ), poll_interval=None, setup_tables=False, sleep_for_setup_tables=0, *args, **kwargs)[source]

Bases: eventsourcing.application.system.SystemRunner

broadcast_prompt(prompt)[source]
close()[source]
static is_prompt(event)[source]
start()[source]
class eventsourcing.application.multiprocess.OperatingSystemProcess(application_process_class, infrastructure_class, upstream_names, pipeline_id=0, poll_interval=5, setup_tables=False, inbox=None, outbox=None, *args, **kwargs)[source]

Bases: multiprocessing.context.Process

broadcast_prompt(prompt)[source]
static is_prompt(event)[source]
loop_on_prompts()[source]
run()[source]

Method to be run in sub-process; can be overridden in sub-class

run_process(prompt=None)[source]

pipeline

class eventsourcing.application.pipeline.Pipeable[source]

Bases: object

class eventsourcing.application.pipeline.PipeableMetaclass[source]

Bases: abc.ABCMeta

class eventsourcing.application.pipeline.PipelineExpression(left, right)[source]

Bases: object

policies

class eventsourcing.application.policies.PersistencePolicy(event_store, persist_event_type=None)[source]

Bases: object

Stores events of given type to given event store, whenever they are published.

close()[source]
is_event(event)[source]
store_event(event)[source]
class eventsourcing.application.policies.SnapshottingPolicy(repository, snapshot_store, persist_event_type=<class 'eventsourcing.domain.model.events.EventWithOriginatorVersion'>, period=2)[source]

Bases: object

close()[source]
condition(event)[source]
take_snapshot(event)[source]

popo

class eventsourcing.application.popo.PopoApplication(name='', persistence_policy=None, persist_event_type=None, cipher_key=None, sequenced_item_class=None, sequenced_item_mapper_class=None, record_manager_class=None, stored_event_record_class=None, snapshot_record_class=None, setup_table=True, contiguous_record_ids=True, pipeline_id=0, json_encoder_class=None, json_decoder_class=None, notification_log_section_size=None)[source]

Bases: eventsourcing.application.simple.ApplicationWithConcreteInfrastructure

infrastructure_factory_class

alias of eventsourcing.infrastructure.popo.factory.PopoInfrastructureFactory

sequenced_item_mapper_class

alias of eventsourcing.infrastructure.popo.mapper.SequencedItemMapperForPopo

stored_event_record_class

alias of eventsourcing.infrastructure.popo.records.StoredEventRecord

process

class eventsourcing.application.process.ProcessApplication(name=None, policy=None, setup_table=False, **kwargs)[source]

Bases: eventsourcing.application.pipeline.Pipeable, eventsourcing.application.simple.SimpleApplication

call_policy(event)[source]
close()[source]
collect_pending_events(aggregates)[source]
construct_event_records(pending_events, causal_dependencies=None)[source]
construct_tracking_kwargs(notification_id, upstream_application_name)[source]
del_notification_generator(upstream_name)[source]
drop_table()[source]
follow(upstream_application_name, notification_log)[source]
get_event_from_notification(notification)[source]
get_notification_generator(upstream_name, advance_by)[source]
static policy(repository, event)[source]

Empty method. Can be overridden in subclasses to implement a concrete policy.

process_upstream_event(event, notification_id, upstream_name)[source]
publish_prompt(event=None)[source]

Publishes prompt for a given event.

Used to prompt downstream process application when an event is published by this application’s model, which can happen when application command methods, rather than the process policy, are called.

Wraps exceptions with PromptFailed, to avoid application policy exceptions being seen directly in other applications when running synchronously in a single thread.

read_reader(upstream_name, advance_by=None)[source]
record_process_event(process_event)[source]
run(prompt=None, advance_by=None)[source]
set_notification_ids = False
set_reader_position_from_tracking_records(upstream_name)[source]
setup_table()[source]
take_snapshots(new_events)[source]
use_causal_dependencies = False
class eventsourcing.application.process.ProcessApplicationWithSnapshotting(snapshot_period=None, snapshot_record_class=None, **kwargs)[source]

Bases: eventsourcing.application.snapshotting.SnapshottingApplication, eventsourcing.application.process.ProcessApplication

take_snapshots(new_events)[source]
class eventsourcing.application.process.ProcessEvent(new_events, tracking_kwargs=None, causal_dependencies=None)[source]

Bases: object

class eventsourcing.application.process.Prompt(process_name, pipeline_id)[source]

Bases: object

class eventsourcing.application.process.RepositoryWrapper(repository)[source]

Bases: object

simple

class eventsourcing.application.simple.ApplicationWithConcreteInfrastructure(name='', persistence_policy=None, persist_event_type=None, cipher_key=None, sequenced_item_class=None, sequenced_item_mapper_class=None, record_manager_class=None, stored_event_record_class=None, snapshot_record_class=None, setup_table=True, contiguous_record_ids=True, pipeline_id=0, json_encoder_class=None, json_decoder_class=None, notification_log_section_size=None)[source]

Bases: eventsourcing.application.simple.SimpleApplication

Subclasses have actual infrastructure.

class eventsourcing.application.simple.SimpleApplication(name='', persistence_policy=None, persist_event_type=None, cipher_key=None, sequenced_item_class=None, sequenced_item_mapper_class=None, record_manager_class=None, stored_event_record_class=None, snapshot_record_class=None, setup_table=True, contiguous_record_ids=True, pipeline_id=0, json_encoder_class=None, json_decoder_class=None, notification_log_section_size=None)[source]

Bases: abc.ABC

Base class for event sourced applications.

Constructs infrastructure objects such as the repository and event store, and also the notification log which presents the application state as a sequence of events.

Needs actual infrastructure classes.

classmethod bind(*bases, **kwargs)[source]
change_pipeline(pipeline_id)[source]
close()[source]
construct_cipher(cipher_key)[source]
construct_datastore()[source]
construct_event_store()[source]
construct_infrastructure(*args, **kwargs)[source]
construct_infrastructure_factory(*args, **kwargs)[source]
Return type: InfrastructureFactory
construct_notification_log()[source]
construct_persistence_policy()[source]
construct_repository(**kwargs)[source]
datastore
drop_table()[source]
event_store
event_store_class

alias of eventsourcing.infrastructure.eventstore.EventStore

infrastructure_factory_class

alias of eventsourcing.infrastructure.factory.InfrastructureFactory

is_constructed_with_session = False
json_decoder_class = None
json_encoder_class = None
classmethod mixin(*bases)[source]
notification_log_section_size = None
persist_event_type = None
record_manager_class = None
repository
repository_class

alias of eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

classmethod reset_connection_after_forking()[source]
sequenced_item_class = None
sequenced_item_mapper_class = None
setup_table()[source]
snapshot_record_class = None
stored_event_record_class = None
use_cache = False

snapshotting

class eventsourcing.application.snapshotting.SnapshottingApplication(snapshot_period=None, snapshot_record_class=None, **kwargs)[source]

Bases: eventsourcing.application.simple.SimpleApplication

close()[source]
construct_event_store()[source]
construct_persistence_policy()[source]
construct_repository(**kwargs)[source]
drop_table()[source]
setup_table()[source]
snapshot_period = 2

sqlalchemy

class eventsourcing.application.sqlalchemy.SQLAlchemyApplication(uri=None, session=None, **kwargs)[source]

Bases: eventsourcing.application.simple.ApplicationWithConcreteInfrastructure

construct_infrastructure(*args, **kwargs)[source]
infrastructure_factory_class

alias of eventsourcing.infrastructure.sqlalchemy.factory.SQLAlchemyInfrastructureFactory

is_constructed_with_session = True
snapshot_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.EntitySnapshotRecord

stored_event_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.StoredEventRecord

system

class eventsourcing.application.system.BarrierControlledApplicationThread(process: eventsourcing.application.process.ProcessApplication, fetch_barrier: threading.Barrier, execute_barrier: threading.Barrier, stop_event: threading.Event)[source]

Bases: threading.Thread

abort()[source]
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class eventsourcing.application.system.BarrierControllingClockThread(normal_speed, scale_factor, tick_interval, fetch_barrier: threading.Barrier, execute_barrier: threading.Barrier, stop_event: threading.Event, is_verbose=False)[source]

Bases: eventsourcing.application.system.ClockThread

actual_clock_speed
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class eventsourcing.application.system.ClockThread(*args, **kwargs)[source]

Bases: threading.Thread

call_commands()[source]
call_in_future(cmd, ticks_delay)[source]
class eventsourcing.application.system.InProcessRunner(system: eventsourcing.application.system.System, infrastructure_class=None, setup_tables=False)[source]

Bases: eventsourcing.application.system.SystemRunner

Runs a system in the current process, either in the current thread, or with one thread for each process in the system.

close()[source]
handle_prompt(prompt)[source]
start()[source]
class eventsourcing.application.system.MultiThreadedRunner(system: eventsourcing.application.system.System, poll_interval=None, clock_speed=None)[source]

Bases: eventsourcing.application.system.InProcessRunner

Runs a system with a thread for each process.

broadcast_prompt(prompt)[source]
close()[source]
handle_prompt(prompt)[source]
start()[source]
start_clock()[source]
class eventsourcing.application.system.ProcessRunningClockThread(normal_speed, scale_factor, stop_event: threading.Event, is_verbose=False, seen_prompt_events=None, processes=None)[source]

Bases: eventsourcing.application.system.ClockThread

actual_clock_speed
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class eventsourcing.application.system.PromptOutbox[source]

Bases: object

Has a collection of downstream prompt inboxes.

put(prompt)[source]

Puts prompt in each downstream inbox (an actual queue).
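
As a rough illustration of the fan-out described for put(), here is a minimal sketch using the standard library queue module. The class name SketchPromptOutbox and the downstream_inboxes attribute are assumptions made for this example, not a statement of how the library stores its inboxes.

```python
import queue


class SketchPromptOutbox:
    """Hypothetical outbox holding a collection of downstream inbox queues."""

    def __init__(self):
        self.downstream_inboxes = {}  # assumed mapping: name -> queue.Queue

    def put(self, prompt):
        # Fan the prompt out to every downstream inbox.
        for inbox in self.downstream_inboxes.values():
            inbox.put(prompt)


outbox = SketchPromptOutbox()
outbox.downstream_inboxes["orders"] = queue.Queue()
outbox.downstream_inboxes["payments"] = queue.Queue()
outbox.put("prompt-1")

# Each downstream inbox received its own copy of the prompt.
received = {
    name: inbox.get_nowait()
    for name, inbox in outbox.downstream_inboxes.items()
}
```
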

class eventsourcing.application.system.PromptQueuedApplicationThread(process: eventsourcing.application.process.ProcessApplication, poll_interval=5, inbox=None, outbox=None, clock_event=None)[source]

Bases: threading.Thread

Application thread which uses queues of prompts.

It loops on an “inbox” queue of prompts, and adds its prompts to an “outbox” queue.

loop_on_prompts()[source]
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

run_process(prompt=None)[source]
class eventsourcing.application.system.SingleThreadedRunner(system: eventsourcing.application.system.System, infrastructure_class=<class 'eventsourcing.application.popo.PopoApplication'>, *args, **kwargs)[source]

Bases: eventsourcing.application.system.InProcessRunner

Runs a system in the current thread.

handle_prompt(prompt)[source]
run_followers(prompt)[source]

The first caller adds a prompt to the queue and runs followers until there are no more pending prompts.

Subsequent callers just add a prompt to the queue, avoiding recursion.
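
The queue-draining behaviour described for run_followers() can be sketched as follows. This is a standalone illustration under assumed names (SketchRunner, pending_prompts, is_draining and a followers mapping of prompt names to callables); it is not the library's code.

```python
from collections import deque


class SketchRunner:
    """Hypothetical runner that drains a prompt queue without recursion."""

    def __init__(self, followers):
        # followers: maps a prompt name to the callables that follow it.
        self.followers = followers
        self.pending_prompts = deque()
        self.is_draining = False  # True while the first caller is draining

    def run_followers(self, prompt):
        self.pending_prompts.append(prompt)
        if self.is_draining:
            # A subsequent (re-entrant) caller: just enqueue and return.
            return
        self.is_draining = True
        try:
            # The first caller keeps going until nothing is pending.
            while self.pending_prompts:
                current = self.pending_prompts.popleft()
                for follower in self.followers.get(current, []):
                    follower(self)
        finally:
            self.is_draining = False


handled = []


def b_follows_a(runner):
    handled.append("B")
    # B publishes something, which prompts again while A's call is running.
    runner.run_followers("B")


def c_follows_b(runner):
    handled.append("C")


runner = SketchRunner({"A": [b_follows_a], "B": [c_follows_b]})
runner.run_followers("A")
```

The nested call made by b_follows_a returns immediately, and the outer loop picks up the "B" prompt afterwards, so the call stack never grows with the length of the chain.
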

class eventsourcing.application.system.SteppingMultiThreadedRunner(*args, **kwargs)[source]

Bases: eventsourcing.application.system.SteppingRunner

Has a clock thread, and a thread for each application process in the system. The clock thread loops until stopped: it sleeps for the remainder of the tick interval, then waits at a barrier. Application threads also loop until stopped, waiting at the same barrier. Once all threads are waiting at the barrier, the barrier is lifted. The clock thread then proceeds by sleeping for the clock tick interval, and the application threads proceed by getting new notifications and processing all of them.

There are actually two barriers, so that each application thread waits before getting notifications, and then waits for all processes to complete getting notifications before processing the notifications through the application policy. This avoids events created by a process application “bleeding” into the notifications of another process application in the same clock cycle.

Todo: Receive prompts, but set an event for the prompting process, to avoid unnecessary runs.

Allow commands to be scheduled at future clock tick number, and execute when reached.
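
The two-barrier scheme described above can be illustrated with a small sketch built on threading.Barrier from the standard library. Everything here is an assumption made for illustration: there is one producer, one consumer and one clock thread, each loops for a fixed number of ticks instead of waiting on a stop event, and the clock does not sleep.

```python
import threading

TICKS = 3
log = []   # events created by the upstream (producer) application
seen = []  # how many events the downstream (consumer) fetched on each tick

# Two application threads plus the clock thread meet at each barrier.
fetch_barrier = threading.Barrier(3)
execute_barrier = threading.Barrier(3)


def producer():
    for tick in range(TICKS):
        fetch_barrier.wait()    # phase 1: this application fetches nothing
        execute_barrier.wait()  # phase 2 starts only after everyone has fetched
        log.append(("event", tick))


def consumer():
    for _ in range(TICKS):
        fetch_barrier.wait()
        fetched = list(log)     # phase 1: snapshot the upstream notifications
        execute_barrier.wait()
        seen.append(len(fetched))  # phase 2: process what was fetched


def clock():
    for _ in range(TICKS):
        fetch_barrier.wait()
        execute_barrier.wait()
        # A real clock thread would sleep for the tick interval here.


threads = [threading.Thread(target=f) for f in (producer, consumer, clock)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Because the producer cannot pass the second barrier until the consumer has finished fetching, an event created in one clock cycle is only seen by the consumer in the next cycle.
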

close()[source]
handle_prompt(prompt)[source]
start()[source]
class eventsourcing.application.system.SteppingRunner(normal_speed=1, scale_factor=1, is_verbose=False, *args, **kwargs)[source]

Bases: eventsourcing.application.system.InProcessRunner

call_in_future(cmd, ticks_delay)[source]
class eventsourcing.application.system.SteppingSingleThreadedRunner(*args, **kwargs)[source]

Bases: eventsourcing.application.system.SteppingRunner

close()[source]
handle_prompt(prompt)[source]
start()[source]
class eventsourcing.application.system.System(*pipeline_exprs, **kwargs)[source]

Bases: object

__init__(*pipeline_exprs, **kwargs)[source]

Initialises a “process network” system object.

Parameters: pipeline_exprs – Pipeline expressions involving process application classes.

Each pipeline expression of process classes shows directly which process follows which other process in the system.

For example, the pipeline expression (A | B | C) shows that B follows A, and C follows B.

The pipeline expression (A | A) shows that A follows A.

The pipeline expression (A | B | A) shows that B follows A, and A follows B.

The pipeline expressions ((A | B | A), (A | C | A)) are equivalent to (A | B | A | C | A).
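
The pipeline expression examples above can be checked with a minimal standalone sketch. It assumes one plausible way of supporting the | operator on classes (a metaclass defining __or__); the names SketchPipeableMeta, SketchPipelineExpression and followings() are hypothetical and are not the library's implementation.

```python
class SketchPipelineExpression:
    """Hypothetical pair of (left, right) terms built by the | operator."""

    def __init__(self, left, right):
        self.left = left
        self.right = right

    def __or__(self, other):
        return SketchPipelineExpression(self, other)

    def __iter__(self):
        # Yield the process classes in order, from left to right.
        for term in (self.left, self.right):
            if isinstance(term, SketchPipelineExpression):
                yield from term
            else:
                yield term


class SketchPipeableMeta(type):
    """Hypothetical metaclass so that classes themselves support |."""

    def __or__(cls, other):
        return SketchPipelineExpression(cls, other)


def followings(*pipeline_exprs):
    """Map each process name to the set of process names it follows."""
    result = {}
    for expr in pipeline_exprs:
        classes = list(expr)
        for upstream, downstream in zip(classes, classes[1:]):
            result.setdefault(downstream.__name__, set()).add(upstream.__name__)
    return result


class A(metaclass=SketchPipeableMeta):
    pass


class B(metaclass=SketchPipeableMeta):
    pass


class C(metaclass=SketchPipeableMeta):
    pass


chain = followings(A | B | C)                 # B follows A, C follows B
loop = followings(A | A)                      # A follows A
two_exprs = followings(A | B | A, A | C | A)  # two expressions
one_expr = followings(A | B | A | C | A)      # the equivalent single one
```
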

construct_app(process_class, infrastructure_class=None, **kwargs)[source]
is_prompt(event)[source]
class eventsourcing.application.system.SystemRunner(system: eventsourcing.application.system.System, infrastructure_class=None, setup_tables=False)[source]

Bases: abc.ABC

close()[source]
start()[source]

domain.model

The domain layer contains a domain model, and optionally services that work across different entities or aggregates.

The domain model package contains classes and functions that can help develop an event sourced domain model.

aggregate

Base classes for aggregates in a domain driven design.

class eventsourcing.domain.model.aggregate.AggregateRoot(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.AggregateRootWithHashchainedEvents

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.AttributeChanged

Published when an AggregateRoot is changed.

__module__ = 'eventsourcing.domain.model.aggregate'
class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.Created

Published when an AggregateRoot is created.

__module__ = 'eventsourcing.domain.model.aggregate'
class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.Discarded

Published when an AggregateRoot is discarded.

__module__ = 'eventsourcing.domain.model.aggregate'
class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event

Supertype for aggregate events.

__module__ = 'eventsourcing.domain.model.aggregate'
__module__ = 'eventsourcing.domain.model.aggregate'
class eventsourcing.domain.model.aggregate.AggregateRootWithHashchainedEvents(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.EntityWithHashchain, eventsourcing.domain.model.aggregate.BaseAggregateRoot

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.AttributeChanged

Published when an AggregateRoot is changed.

__module__ = 'eventsourcing.domain.model.aggregate'
class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.aggregate.Created

Published when an AggregateRoot is created.

__module__ = 'eventsourcing.domain.model.aggregate'
class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Discarded, eventsourcing.domain.model.aggregate.Discarded

Published when an AggregateRoot is discarded.

__module__ = 'eventsourcing.domain.model.aggregate'
class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.aggregate.Event

Supertype for aggregate events.

__module__ = 'eventsourcing.domain.model.aggregate'
__module__ = 'eventsourcing.domain.model.aggregate'
class eventsourcing.domain.model.aggregate.BaseAggregateRoot(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

Root entity for an aggregate in a domain driven design.

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when an AggregateRoot is changed.

__module__ = 'eventsourcing.domain.model.aggregate'
class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Created

Published when an AggregateRoot is created.

__module__ = 'eventsourcing.domain.model.aggregate'
class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Discarded

Published when an AggregateRoot is discarded.

__module__ = 'eventsourcing.domain.model.aggregate'
class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for aggregate events.

__module__ = 'eventsourcing.domain.model.aggregate'
__batch_pending_events__()[source]
__init__(**kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'eventsourcing.domain.model.aggregate'
__publish__(event)[source]

Appends event to internal collection of pending events.

__save__()[source]

Publishes pending events for others in application.
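
The relationship between __publish__() and __save__() described above amounts to batching events until the aggregate is saved. A minimal sketch of that idea follows; it uses plain method names and a hypothetical publish_batch callback, and the exact order of publishing and clearing is an assumption.

```python
class SketchAggregate:
    """Hypothetical aggregate that collects events until it is saved."""

    def __init__(self):
        self.pending_events = []

    def publish(self, event):
        # Plays the role of __publish__(): only append, nothing leaves yet.
        self.pending_events.append(event)

    def save(self, publish_batch):
        # Plays the role of __save__(): hand over all pending events at once.
        batch = list(self.pending_events)
        self.pending_events.clear()
        if batch:
            publish_batch(batch)


published_batches = []

aggregate = SketchAggregate()
aggregate.publish("Created")
aggregate.publish("AttributeChanged")
aggregate.save(published_batches.append)  # one batch containing two events
aggregate.save(published_batches.append)  # nothing pending, nothing published
```
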

array

A kind of collection, indexed by integer. Doesn’t need to replay all events to exist.

class eventsourcing.domain.model.array.AbstractArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

Repository for sequence objects.

__getitem__(array_id)[source]

Returns sequence for given ID.

__init__(array_size=10000, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'eventsourcing.domain.model.array'
class eventsourcing.domain.model.array.AbstractBigArrayRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

Repository for compound sequence objects.

__getitem__(array_id)[source]

Returns sequence for given ID.

__module__ = 'eventsourcing.domain.model.array'
subrepo

Sub-sequence repository.

class eventsourcing.domain.model.array.Array(array_id, repo)[source]

Bases: object

__eq__(other)[source]

Return self==value.

__getitem__(item)[source]

Returns item at index, or items in slice.

__hash__ = None
__init__(array_id, repo)[source]

Initialize self. See help(type(self)) for accurate signature.

__len__()[source]

Returns length of array.

__module__ = 'eventsourcing.domain.model.array'
__ne__(other)[source]

Return self!=value.

__setitem__(index, item)[source]

Sets item in array, at given index.

Won’t overrun the end of the array, because the position is fixed to be less than base_size.

append(item)[source]

Sets item in next position after the last item.
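
The bounds check in __setitem__() and the behaviour of append() can be sketched with an in-memory stand-in. SketchArray, its size argument and its dict of items are hypothetical; the library's Array works against a repository, not a dict.

```python
class SketchArray:
    """Hypothetical fixed-size array storing items by integer position."""

    def __init__(self, size):
        self.size = size
        self.items = {}  # position -> item (stand-in for stored events)

    def __setitem__(self, index, item):
        # Positions must stay below the fixed size, so the end is never overrun.
        if not 0 <= index < self.size:
            raise IndexError("position out of range: {}".format(index))
        self.items[index] = item

    def get_next_position(self):
        # The position after the highest assigned one, or 0 when empty.
        return max(self.items) + 1 if self.items else 0

    def append(self, item):
        self[self.get_next_position()] = item


array = SketchArray(size=3)
array.append("a")
array[2] = "c"

try:
    array.append("d")  # next position would be 3, which is out of range
    overran = True
except IndexError:
    overran = False
```
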

get_item_assigned(index)[source]
get_items_assigned(start_index=None, stop_index=None, limit=None, is_ascending=True)[source]
get_last_item_and_next_position()[source]
get_next_position()[source]
class eventsourcing.domain.model.array.BigArray(array_id, repo)[source]

Bases: eventsourcing.domain.model.array.Array

A virtual array holding items in indexed positions, across a number of Array instances.

Getting and setting items at index position is supported. Slices are supported, and operate across the underlying arrays. Appending is also supported.

BigArray is designed to overcome the concern of needing a single large sequence that may not be suitably stored in any single partition. In simple terms, if events of an aggregate can fit in a partition, we can use the same size partition to make a tree of arrays that will certainly be capable of sequencing all the events of the application in a single stream.

With normal size base arrays, enterprise applications can expect read and write time to be approximately constant with respect to the number of items in the array.

The array is composed of a tree of arrays, which gives a capacity equal to the size of each array raised to the power of the size of each array. If the arrays are limited to about the maximum size of an aggregate event stream (a large number, but not so large that there would be too much data in any one partition; let’s say 1000s to be safe), then it would be possible to fit such a large number of aggregates in the corresponding BigArray that we can be confident it would never be full.

Write access time in the worst case, and the time to identify the index of the last item in the big array, is proportional to the logarithm of the highest assigned index, with the underlying array size as the base. Write time on average, and read time given an index, is constant with respect to the number of items in a BigArray.
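
To make the capacity and logarithmic-height claims concrete, here is a small arithmetic sketch. The helper names capacity() and levels_needed() are hypothetical, and the convention used for counting levels is an assumption; the library's calc_required_height() may count differently.

```python
def capacity(array_size):
    """Capacity as stated above: the array size to the power of itself."""
    return array_size ** array_size


def levels_needed(highest_index, array_size):
    """Smallest number of levels h with array_size ** h > highest_index.

    Uses integer arithmetic only, so it grows with the logarithm of
    highest_index to the base array_size without floating-point error.
    """
    levels = 1
    span = array_size
    while span <= highest_index:
        span *= array_size
        levels += 1
    return levels


small_capacity = capacity(10)                # ten billion positions
within_one = levels_needed(999, 1000)        # fits in a single base array
needs_two = levels_needed(999_999, 1000)     # needs one level above the base
needs_three = levels_needed(1_000_000, 1000)
```

With base arrays of size 1000, each extra level multiplies the addressable range by 1000, which is why the number of levels stays small even for very large indexes.
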

Items can be appended in log time in a single thread. However, the time between reading the current last index and claiming the next position leads to contention and retries when there are lots of threads of execution all attempting to append items, which inherently limits throughput.

Todo: Not possible in Cassandra, but maybe do it in a transaction in SQLAlchemy?

An alternative to reading the last item before writing the next is to use an integer sequence generator to generate a stream of integers. Items can be assigned to index positions in a big array, according to the integers that are issued. Throughput will then be much better, and will be limited only by the rate at which the database can have events written to it (unless the number generator is quite slow).

An external integer sequence generator, such as Redis’ INCR command, or an auto-incrementing database column, may constitute a single point of failure.
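
The alternative described above, issuing positions from an integer sequence generator instead of reading the last item before each write, can be sketched in a single process. The class SketchSequenceGenerator and the positions dict are hypothetical stand-ins; a real deployment would use something external such as the Redis INCR command or an auto-incrementing column, as noted above.

```python
import itertools
import threading


class SketchSequenceGenerator:
    """Hypothetical in-process generator issuing each integer exactly once."""

    def __init__(self):
        self._counter = itertools.count()
        self._lock = threading.Lock()

    def next_position(self):
        with self._lock:
            return next(self._counter)


positions = {}  # stand-in for a big array: position -> item
generator = SketchSequenceGenerator()


def append_many(worker_id, count):
    for k in range(count):
        # Claim a position first, then write: no read of the last item,
        # so concurrent writers never contend for the same position.
        position = generator.next_position()
        positions[position] = (worker_id, k)


workers = [
    threading.Thread(target=append_many, args=(worker_id, 100))
    for worker_id in range(4)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
```

Every writer gets a distinct position, so no retries are needed; the trade-off is that the generator itself becomes a shared dependency.
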

__getitem__(item)[source]

Returns item at index, or items in slice.

__init__(array_id, repo)[source]

Initialize self. See help(type(self)) for accurate signature.

__len__()[source]

Returns length of array.

__module__ = 'eventsourcing.domain.model.array'
__setitem__(position, item)[source]

Sets item in array, at given index.

Won’t overrun the end of the array, because the position is fixed to be less than base_size.

calc_parent(i, j, h)[source]

Returns start and end of span of parent sequence that contains given child.

calc_required_height(n, size)[source]
create_array_id(i, j)[source]
get_item(position)[source]
get_last_array()[source]

Returns last array in compound.

Return type: CompoundSequenceReader
get_last_item_and_next_position()[source]
get_slice(start, stop)[source]
class eventsourcing.domain.model.array.ItemAssigned(item, index, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Occurs when an item is set at a position in an array.

__init__(item, index, *args, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.array'
index
item

collection

Collections.

class eventsourcing.domain.model.collection.AbstractCollectionRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

__module__ = 'eventsourcing.domain.model.collection'
class eventsourcing.domain.model.collection.Collection(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.Event, eventsourcing.domain.model.entity.Created

Published when collection is created.

__module__ = 'eventsourcing.domain.model.collection'
class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.Event, eventsourcing.domain.model.entity.Discarded

Published when collection is discarded.

__module__ = 'eventsourcing.domain.model.collection'
class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for events of collection entities.

__module__ = 'eventsourcing.domain.model.collection'
class EventWithItem(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.Event

__module__ = 'eventsourcing.domain.model.collection'
item
class ItemAdded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.EventWithItem

__module__ = 'eventsourcing.domain.model.collection'
__mutate__(obj)[source]

Update obj with values from self.

class ItemRemoved(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.EventWithItem

__module__ = 'eventsourcing.domain.model.collection'
__mutate__(obj)[source]

Update obj with values from self.

__init__(**kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__iter__()[source]
__module__ = 'eventsourcing.domain.model.collection'
add_item(item)[source]
items
remove_item(item)[source]
eventsourcing.domain.model.collection.register_new_collection(collection_id=None)[source]

command

Commands as aggregates.

class eventsourcing.domain.model.command.Command(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.AggregateRoot

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.command.Event, eventsourcing.domain.model.aggregate.AttributeChanged

__module__ = 'eventsourcing.domain.model.command'
class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.command.Event, eventsourcing.domain.model.aggregate.Created

__module__ = 'eventsourcing.domain.model.command'
class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.command.Event, eventsourcing.domain.model.aggregate.Discarded

__module__ = 'eventsourcing.domain.model.command'
class Done(**kwargs)[source]

Bases: eventsourcing.domain.model.command.Event

__module__ = 'eventsourcing.domain.model.command'
mutate(obj)[source]

Convenience for use in custom models, to update obj with values from self without needing to call super method and return obj (two extra lines).

Can be overridden by subclasses. Any value returned by this method will be ignored.

Please note, subclasses that extend __mutate__() might not have fully completed that method before this method is called. To ensure all base classes have completed their mutate behaviour before mutating an event in a concrete class, extend __mutate__() instead of overriding this method.

Parameters:obj – object to be mutated
class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event

__module__ = 'eventsourcing.domain.model.command'
__init__(**kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'eventsourcing.domain.model.command'
done()[source]
is_done

decorators

Decorators useful in domain models based on the classes in this library.

eventsourcing.domain.model.decorators.attribute(getter)[source]

When used as a method decorator, returns a property object with the method as the getter and a setter defined to call the instance method __change_attribute__(), which publishes an AttributeChanged event.
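
The property-with-setter pattern described here can be sketched without the library. This is only an illustration under stated assumptions: `sketch_attribute`, `Example` and the `changes` list are hypothetical names, and the sketch records each change in a list where the library would publish an AttributeChanged event.

```python
def sketch_attribute(getter):
    """Hypothetical stand-in for the attribute decorator (not the library's code)."""
    name = "_" + getter.__name__

    def fget(self):
        # Read the stored value; the decorated method body is not used.
        return getattr(self, name, None)

    def fset(self, value):
        # Route every assignment through the change method.
        self.__change_attribute__(name, value)

    return property(fget, fset, doc=getter.__doc__)


class Example(object):
    def __init__(self):
        self.changes = []  # stands in for published events (assumption)

    def __change_attribute__(self, name, value):
        # The library would trigger an AttributeChanged event here; this
        # sketch records the change and sets the value directly.
        self.changes.append((name, value))
        setattr(self, name, value)

    @sketch_attribute
    def title(self):
        """Title of the example."""


example = Example()
example.title = "first"
```

Assigning to `example.title` goes through `__change_attribute__()`, so the change is recorded before the value becomes readable through the property.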

eventsourcing.domain.model.decorators.mutator(arg=None)[source]

Structures mutator functions by allowing handlers to be registered for different types of event. When the decorated function is called with an initial value and an event, it will call the handler that has been registered for that type of event.

It works like singledispatch, which it uses. The difference is that when the decorated function is called, this decorator dispatches according to the type of the last call arg, which fits better with reduce(). The library uses reduce() (functools.reduce() in Python 3) to replay a sequence of events against an initial state. If a mutator function is given to reduce(), along with a list of events and an initializer, reduce() will call the mutator function once for each event in the list, passing the accumulated state (starting with the initializer) as the first argument and the event as the last argument, and we want to dispatch according to the type of the event. Because singledispatch switches on the type of the first argument, it is unsuitable for structuring a mutator function without the modifications introduced here.

The other aspect introduced by this decorator function is the option to set the type of the handled entity in the decorator. When an entity is replayed from scratch, in other words when all its events are replayed, the initial state is None. The handler which handles the first event in the sequence will probably construct an object instance. It is possible to write the type into the handler, but that makes the entity more difficult to subclass because you will also need to write a handler for it. If the decorator is invoked with the type, when the initial value passed as a call arg to the mutator function is None, the handler will instead receive the type of the entity, which it can use to construct the entity object.

from eventsourcing.domain.model.decorators import mutator

class Entity(object):
    class Created(object):
        pass

@mutator(Entity)
def mutate(initial, event):
    raise NotImplementedError(type(event))

@mutate.register(Entity.Created)
def _(initial, event):
    return initial(**event.__dict__)

entity = mutate(None, Entity.Created())
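
The dispatch-on-last-argument idea can also be sketched with the standard library alone. This is not the library's implementation: `sketch_mutator`, `Incremented` and `Doubled` are hypothetical names, and the sketch leaves out the option of passing the entity type to the decorator.

```python
from functools import reduce, singledispatch


def sketch_mutator(func):
    """Hypothetical sketch: dispatch on the type of the last positional arg."""
    dispatcher = singledispatch(func)

    def wrapper(initial, event):
        # singledispatch switches on the first argument, so look up the
        # handler by the event's type and call it with the original order.
        return dispatcher.dispatch(type(event))(initial, event)

    wrapper.register = dispatcher.register
    return wrapper


class Incremented(object):
    pass


class Doubled(object):
    pass


@sketch_mutator
def mutate(state, event):
    raise NotImplementedError(type(event))


@mutate.register(Incremented)
def _(state, event):
    return state + 1


@mutate.register(Doubled)
def _(state, event):
    return state * 2


# reduce() passes the accumulated state first and the event last.
final = reduce(mutate, [Incremented(), Doubled(), Incremented()], 0)
```

Here the state is a plain integer, so the replay computes ((0 + 1) * 2) + 1.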
eventsourcing.domain.model.decorators.random() → x in the interval [0, 1).
eventsourcing.domain.model.decorators.retry(exc=<class 'Exception'>, max_attempts=1, wait=0, stall=0, verbose=False)[source]
eventsourcing.domain.model.decorators.subscribe_to(*event_classes)[source]

Decorator for making a custom event handler function subscribe to a certain class of event.

The decorated function will be called once for each matching event that is published, and will be given one argument, the event, when it is called. If events are published in lists, for example the AggregateRoot publishes a list of pending events when its __save__() method is called, then the decorated function will be called once for each event that is an instance of the given event_class.

Please note, this decorator isn’t suitable for use with object class methods. In Python 3 the decorator receives a plain (unbound) function, and it subscribes a handler that calls the decorated function for each matching event. The function isn’t called on an object, so no instance is ever available to the decorator, and it can’t call a normal object method because it has no value for ‘self’.

event_classes: types used to match published events; an event matches if it is an instance of one of these types

The following example shows a custom handler that reacts to Todo.Created event and saves a projection of a Todo model object.

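from eventsourcing.domain.model.decorators import subscribe_to

# Todo and TodoProjection are assumed to be defined by the application.
@subscribe_to(Todo.Created)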
def new_todo_projection(event):
    todo = TodoProjection(id=event.originator_id, title=event.title)
    todo.save()

entity

Base classes for domain entities of different kinds.

The entity module provides base classes for domain entities.

class eventsourcing.domain.model.entity.AbstractEntityRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEventPlayer

__contains__(entity_id)[source]

Returns True or False, according to whether or not entity exists.

__getitem__(entity_id)[source]

Returns entity for given ID.

__module__ = 'eventsourcing.domain.model.entity'
event_store

Returns event store object used by this repository.

get_entity(entity_id, at=None)[source]

Returns entity for given ID.

take_snapshot(entity_id, lt=None, lte=None)[source]

Takes snapshot of entity state, using stored events.

Returns:Snapshot

class eventsourcing.domain.model.entity.AbstractEventPlayer[source]

Bases: object

__module__ = 'eventsourcing.domain.model.entity'
__weakref__

list of weak references to the object (if defined)

class eventsourcing.domain.model.entity.DomainEntity(id)[source]

Bases: object

Base class for domain entities.

class AttributeChanged(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.AttributeChanged

Published when an attribute of a DomainEntity is changed.

__module__ = 'eventsourcing.domain.model.entity'
__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
class Created(originator_topic, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.Created

Published when an entity is created.

__entity_kwargs__
__init__(originator_topic, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.entity'
__mutate__(entity_class=None)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
originator_topic
class Discarded(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.Discarded, eventsourcing.domain.model.entity.Event

Published when a DomainEntity is discarded.

__module__ = 'eventsourcing.domain.model.entity'
__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
class Event(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.events.DomainEvent

Supertype for events of domain entities.

__check_obj__(obj)[source]

Checks obj state before mutating.

__module__ = 'eventsourcing.domain.model.entity'
__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
__assert_not_discarded__()[source]

Raises exception if entity has been discarded already.

__change_attribute__(name, value)[source]

Changes named attribute with the given value, by triggering an AttributeChanged event.

classmethod __create__(originator_id=None, event_class=None, **kwargs)[source]
__discard__()[source]

Discards self, by triggering a Discarded event.

__eq__(other)[source]

Return self==value.

__hash__ = None
__init__(id)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'eventsourcing.domain.model.entity'
__ne__(other)[source]

Return self!=value.

__publish__(event)[source]

Publishes given event for subscribers in the application.

Parameters:event – domain event or list of events
__publish_to_subscribers__(event)[source]

Actually dispatches given event to publish-subscribe mechanism.

Parameters:event – domain event or list of events
__trigger_event__(event_class, **kwargs)[source]

Constructs, applies, and publishes a domain event.

__weakref__

list of weak references to the object (if defined)

id

Entity ID allows an entity instance to be referenced and distinguished from others, even though its state may change over time.

class eventsourcing.domain.model.entity.EntityWithHashchain(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Created

__entity_kwargs__
__module__ = 'eventsourcing.domain.model.entity'
__mutate__(entity_class=None)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

__module__ = 'eventsourcing.domain.model.entity'
__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithHash, eventsourcing.domain.model.entity.Event

Supertype for events of domain entities.

__check_obj__(obj)[source]

Extends superclass method by checking the __previous_hash__ of this event matches the __head__ hash of the entity obj.

__module__ = 'eventsourcing.domain.model.entity'
__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
classmethod __create__(*args, **kwargs)[source]
__genesis_hash__ = ''
__init__(*args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'eventsourcing.domain.model.entity'
__trigger_event__(event_class, **kwargs)[source]

Constructs, applies, and publishes a domain event.
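
The hash-chain check described for `__check_obj__()` can be sketched as follows. This is a simplified illustration, not the library's code: `sketch_hash`, `ChainedEvent` and `ChainedEntity` are hypothetical, and the use of SHA-256 over a JSON encoding is an assumption made for the example.

```python
import hashlib
import json


def sketch_hash(previous_hash, payload):
    """Hypothetical hash: SHA-256 over the previous hash and the payload."""
    data = json.dumps([previous_hash, payload], sort_keys=True)
    return hashlib.sha256(data.encode("utf8")).hexdigest()


class ChainedEvent(object):
    def __init__(self, previous_hash, payload):
        self.previous_hash = previous_hash
        self.payload = payload
        self.event_hash = sketch_hash(previous_hash, payload)


class ChainedEntity(object):
    def __init__(self):
        self.head = ""  # genesis hash, an empty string as in __genesis_hash__
        self.payloads = []

    def apply(self, event):
        # Reject events that do not follow the entity's current head.
        if event.previous_hash != self.head:
            raise ValueError("event does not follow the entity's head hash")
        self.payloads.append(event.payload)
        self.head = event.event_hash


entity = ChainedEntity()
first = ChainedEvent(entity.head, {"name": "a"})
entity.apply(first)
second = ChainedEvent(entity.head, {"name": "b"})
entity.apply(second)

try:
    # This event was built against the genesis hash, which is now stale.
    entity.apply(ChainedEvent(first.previous_hash, {"name": "c"}))
    rejected = False
except ValueError:
    rejected = True
```

Because each event carries the hash of its predecessor, an event built against an old head cannot be applied out of order.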

class eventsourcing.domain.model.entity.TimestampedEntity(__created_on__=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class AttributeChanged(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedEntity is changed.

__module__ = 'eventsourcing.domain.model.entity'
class Created(originator_topic, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a TimestampedEntity is created.

__entity_kwargs__
__module__ = 'eventsourcing.domain.model.entity'
class Discarded(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedEntity is discarded.

__module__ = 'eventsourcing.domain.model.entity'
class Event(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.EventWithTimestamp

Supertype for events of timestamped entities.

__module__ = 'eventsourcing.domain.model.entity'
__mutate__(obj)[source]

Update obj with values from self.

__created_on__
__init__(__created_on__=None, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__last_modified__
__module__ = 'eventsourcing.domain.model.entity'
class eventsourcing.domain.model.entity.TimestampedVersionedEntity(__created_on__=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedEntity, eventsourcing.domain.model.entity.VersionedEntity

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedVersionedEntity is changed.

__module__ = 'eventsourcing.domain.model.entity'
class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a TimestampedVersionedEntity is created.

__module__ = 'eventsourcing.domain.model.entity'
class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedVersionedEntity is discarded.

__module__ = 'eventsourcing.domain.model.entity'
class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Event

Supertype for events of timestamped, versioned entities.

__module__ = 'eventsourcing.domain.model.entity'
__module__ = 'eventsourcing.domain.model.entity'
class eventsourcing.domain.model.entity.TimeuuidedEntity(event_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

__created_on__
__init__(event_id, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__last_modified__
__module__ = 'eventsourcing.domain.model.entity'
class eventsourcing.domain.model.entity.TimeuuidedVersionedEntity(event_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimeuuidedEntity, eventsourcing.domain.model.entity.VersionedEntity

__module__ = 'eventsourcing.domain.model.entity'
class eventsourcing.domain.model.entity.VersionedEntity(__version__=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a VersionedEntity is changed.

__module__ = 'eventsourcing.domain.model.entity'
class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a VersionedEntity is created.

__entity_kwargs__
__init__(originator_version=0, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.entity'
class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a VersionedEntity is discarded.

__module__ = 'eventsourcing.domain.model.entity'
class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.entity.Event

Supertype for events of versioned entities.

__check_obj__(obj)[source]

Extends superclass method by checking that the event’s originator version follows this entity’s version (that is, it equals the entity’s version plus one).

__module__ = 'eventsourcing.domain.model.entity'
__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
__init__(__version__=None, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'eventsourcing.domain.model.entity'
__trigger_event__(event_class, **kwargs)[source]

Triggers domain event with entity’s next version number.

The event carries the version number that the originator will have when the originator is mutated with this event. (The event’s originator version isn’t the version of the originator that triggered the event. The Created event has version 0, and so a newly created instance is at version 0. The second event has originator version 1, and so will the originator when the second event has been applied.)

__version__
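
The version numbering described for `__trigger_event__()` can be sketched in a few lines. The classes below are hypothetical and much simpler than the library's entities (in particular, creation is not modelled as constructing the entity from a Created event); they only show that each event carries the version the originator will have once the event is applied.

```python
class SketchVersionedEvent(object):
    def __init__(self, originator_version):
        self.originator_version = originator_version


class SketchVersionedEntity(object):
    def __init__(self):
        self.version = None  # no events applied yet

    def next_version(self):
        # The first (created) event has version 0; later events follow by one.
        return 0 if self.version is None else self.version + 1

    def trigger(self):
        event = SketchVersionedEvent(originator_version=self.next_version())
        self.apply(event)
        return event

    def apply(self, event):
        # Refuse events whose version does not follow the entity's version.
        if event.originator_version != self.next_version():
            raise ValueError("event version does not follow entity version")
        self.version = event.originator_version


entity = SketchVersionedEntity()
created = entity.trigger()
changed = entity.trigger()
```

After the second event is applied, both the event and the entity are at version 1.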

events

Base classes for domain events of different kinds.

class eventsourcing.domain.model.events.AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Can be published when an attribute of an entity is changed.

__module__ = 'eventsourcing.domain.model.events'
name
value
class eventsourcing.domain.model.events.Created(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Can be published when an entity is created.

__module__ = 'eventsourcing.domain.model.events'
class eventsourcing.domain.model.events.Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Published when something is discarded.

__module__ = 'eventsourcing.domain.model.events'
class eventsourcing.domain.model.events.DomainEvent(**kwargs)[source]

Bases: object

Base class for domain events.

Implements methods to make instances read-only, comparable for equality, have recognisable representations, and hashable.

__eq__(other)[source]

Tests for equality of two event objects.

__hash__()[source]

Computes a Python integer hash for an event, using its event hash string.

Supports equality and inequality comparisons.

classmethod __hash_object__(obj)[source]
__init__(**kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__json_encoder_class__

alias of eventsourcing.utils.transcoding.ObjectJSONEncoder

__module__ = 'eventsourcing.domain.model.events'
__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
__ne__(other)[source]

Negates the equality test.

__notifiable__ = True
__repr__()[source]

Returns string representing the type and attribute values of the event.

__setattr__(key, value)[source]

Inhibits event attributes from being updated by assignment.

__weakref__

list of weak references to the object (if defined)

mutate(obj)[source]

Convenience for use in custom models, to update obj with values from self without needing to call super method and return obj (two extra lines).

Can be overridden by subclasses. Any value returned by this method will be ignored.

Please note, subclasses that extend __mutate__() might not have fully completed that method before this method is called. To ensure all base classes have completed their mutate behaviour before mutating an event in a concrete class, extend __mutate__() instead of overriding this method.

Parameters:obj – object to be mutated
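
The relationship between `__mutate__()` and `mutate()` can be sketched as a template method. This assumes, as the descriptions suggest, that `__mutate__()` calls `mutate()`, ignores its return value and returns the object; `HookEvent`, `NameChanged` and `Thing` are hypothetical classes, not part of the library.

```python
class HookEvent(object):
    def __mutate__(self, obj):
        # Template method: run the convenience hook, ignore whatever it
        # returns, and always return the object.
        self.mutate(obj)
        return obj

    def mutate(self, obj):
        """Hook for subclasses; does nothing by default."""


class NameChanged(HookEvent):
    def __init__(self, name):
        self.name = name

    def mutate(self, obj):
        obj.name = self.name  # no super call and no return needed


class Thing(object):
    name = None


thing = Thing()
result = NameChanged("renamed").__mutate__(thing)
```

The subclass only states what changes; the base class takes care of returning the mutated object.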
exception eventsourcing.domain.model.events.EventHandlersNotEmptyError[source]

Bases: Exception

__module__ = 'eventsourcing.domain.model.events'
__weakref__

list of weak references to the object (if defined)

class eventsourcing.domain.model.events.EventWithHash(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Base class for domain events.

Implements methods to make instances read-only, comparable for equality, have recognisable representations, and hashable.

__check_hash__()[source]
__event_hash__
__hash__()[source]

Computes a Python integer hash for an event, using its event hash string.

Supports equality and inequality comparisons.

__init__(**kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.events'
__mutate__(obj)[source]

Update obj with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
class eventsourcing.domain.model.events.EventWithOriginatorID(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

__init__(originator_id, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.events'
originator_id
class eventsourcing.domain.model.events.EventWithOriginatorVersion(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have an originator version number.

__init__(originator_version, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.events'
originator_version
class eventsourcing.domain.model.events.EventWithTimestamp(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have a timestamp value.

__init__(timestamp=None, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.events'
timestamp
class eventsourcing.domain.model.events.EventWithTimeuuid(event_id=None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have a UUIDv1 event ID.

__init__(event_id=None, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.events'
event_id
class eventsourcing.domain.model.events.Logged(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Published when something is logged.

__module__ = 'eventsourcing.domain.model.events'
eventsourcing.domain.model.events.assert_event_handlers_empty()[source]
eventsourcing.domain.model.events.clear_event_handlers()[source]
eventsourcing.domain.model.events.create_timesequenced_event_id()[source]
eventsourcing.domain.model.events.publish(event)[source]
eventsourcing.domain.model.events.subscribe(handler, predicate=None)[source]
eventsourcing.domain.model.events.unsubscribe(handler, predicate=None)[source]

snapshot

Snapshotting is implemented in the domain layer as an event.

class eventsourcing.domain.model.snapshot.AbstractSnapshop[source]

Bases: abc.ABC

__abstractmethods__ = frozenset({'originator_version', 'state', 'originator_id', 'topic'})
__module__ = 'eventsourcing.domain.model.snapshot'
__weakref__

list of weak references to the object (if defined)

originator_id

ID of the snapshotted entity.

originator_version

Version of the last event applied to the entity.

state

State of the snapshotted entity.

topic

Path to the class of the snapshotted entity.

class eventsourcing.domain.model.snapshot.Snapshot(originator_id, originator_version, topic, state)[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.snapshot.AbstractSnapshop

__abstractmethods__ = frozenset()
__init__(originator_id, originator_version, topic, state)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.snapshot'
state

State of the snapshotted entity.

topic

Path to the class of the snapshotted entity.

timebucketedlog

Time-bucketed logs allow a sequence of items that is ordered by timestamp to be split across a number of different database partitions, which avoids any one partition becoming very large (and then unworkable).
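
As a rough sketch of the bucketing idea, the following derives a bucket ID from a log name and a timestamp, so that messages from the same period share an ID that could serve as a partition key. The helper names, the supported bucket sizes ("day" and "hour") and the ID format are assumptions made for this example; they are not the library's functions.

```python
from datetime import datetime, timezone


def sketch_bucket_start(timestamp, bucket_size):
    """Hypothetical helper: start of the UTC bucket containing a POSIX timestamp."""
    dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    if bucket_size == "day":
        start = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    elif bucket_size == "hour":
        start = dt.replace(minute=0, second=0, microsecond=0)
    else:
        raise ValueError("unsupported bucket size: {}".format(bucket_size))
    return start


def sketch_bucket_id(log_name, timestamp, bucket_size):
    # Messages whose timestamps fall in the same bucket share one ID,
    # which can then be used as a partition key.
    start = sketch_bucket_start(timestamp, bucket_size)
    return "{}_{}".format(log_name, start.strftime("%Y-%m-%dT%H"))


same_hour_a = sketch_bucket_id("log1", 3600.0, "hour")
same_hour_b = sketch_bucket_id("log1", 7199.0, "hour")
next_hour = sketch_bucket_id("log1", 7200.0, "hour")
```

Two timestamps within the same hour map to the same ID, while a timestamp one second later starts a new bucket.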

class eventsourcing.domain.model.timebucketedlog.MessageLogged(message, originator_id)[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.events.Logged

__init__(message, originator_id)[source]

Initialises event attribute values directly from constructor kwargs.

__module__ = 'eventsourcing.domain.model.timebucketedlog'
message
class eventsourcing.domain.model.timebucketedlog.Timebucketedlog(name, bucket_size=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

class BucketSizeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.timebucketedlog.Event, eventsourcing.domain.model.entity.AttributeChanged

__module__ = 'eventsourcing.domain.model.timebucketedlog'
class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for events of time-bucketed log.

__module__ = 'eventsourcing.domain.model.timebucketedlog'
class Started(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.timebucketedlog.Event

__module__ = 'eventsourcing.domain.model.timebucketedlog'
__init__(name, bucket_size=None, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'eventsourcing.domain.model.timebucketedlog'
bucket_size
log_message(message)[source]
name
started_on
class eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

__module__ = 'eventsourcing.domain.model.timebucketedlog'
get_or_create(log_name, bucket_size)[source]

Gets or creates a log.

Return type:Timebucketedlog
eventsourcing.domain.model.timebucketedlog.bucket_duration(bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.bucket_starts(timestamp, bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.make_timebucket_id(log_id, timestamp, bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.next_bucket_starts(timestamp, bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.previous_bucket_starts(timestamp, bucket_size)[source]
eventsourcing.domain.model.timebucketedlog.start_new_timebucketedlog(name, bucket_size=None)[source]
eventsourcing.domain.model.timebucketedlog.timestamp_from_datetime(dt)[source]

infrastructure

The infrastructure layer adapts external devices in ways that are useful for the application, such as the way an event store encapsulates a database.

base

Abstract base classes for the infrastructure layer.

class eventsourcing.infrastructure.base.ACIDRecordManager(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Bases: eventsourcing.infrastructure.base.AbstractSequencedItemRecordManager

ACID record managers can write tracking records and event records in an atomic transaction, needed for atomic processing in process applications.

get_max_record_id()[source]

Return maximum notification ID in pipeline.

get_max_tracking_record_id(upstream_application_name)[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

get_pipeline_and_notification_id(sequence_id, position)[source]

Returns pipeline ID and notification ID for event at given position in given sequence.

has_tracking_record(upstream_application_name, pipeline_id, notification_id)[source]

True if tracking record exists for notification from upstream in pipeline.

tracking_record_class = None
tracking_record_field_names = ['application_name', 'upstream_application_name', 'pipeline_id', 'notification_id']
write_records(records, tracking_kwargs=None)[source]

Writes tracking, event and notification records for a process event.

class eventsourcing.infrastructure.base.AbstractSequencedItemRecordManager(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Bases: abc.ABC

all_sequence_ids()[source]

Returns all sequence IDs.

clone(application_name, pipeline_id, **kwargs)[source]
delete_record(record)[source]

Permanently removes the given record from the table.

from_record(record)[source]

Constructs and returns a sequenced item object, from given ORM object.

get_field_kwargs(item)[source]
get_item(sequence_id, position)[source]

Gets sequenced item from the datastore.

get_items(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns sequenced item generator.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Returns records sequenced by notification ID, from application, for pipeline, in given range.

Args ‘start’ and ‘stop’ are positions in a zero-based integer sequence.

get_record(sequence_id, position)[source]

Gets record at position in sequence.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

list_items(*args, **kwargs)[source]

Returns list of sequenced items.

list_sequence_ids()[source]
raise_index_error(position)[source]
raise_operational_error(e)[source]
raise_record_integrity_error(e)[source]
raise_sequenced_item_conflict()[source]
record_sequenced_item(sequenced_item)[source]

Writes sequenced item into the datastore.

record_sequenced_items(sequenced_item_or_items)[source]

Writes sequenced item(s) into the datastore.

to_record(sequenced_item)[source]

Constructs a record object from given sequenced item object.

class eventsourcing.infrastructure.base.SQLRecordManager(*args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.ACIDRecordManager

This has code common to (extracted from) the SQLAlchemy and Django record managers.

This makes the subclasses harder to read and probably more brittle, so it might be better to inline this code into the subclasses, so that each looks more like normal Django or SQLAlchemy code. Also, the record manager test cases don’t cover the notification log and tracking record functionality needed by ProcessApplication; they should, so that other record managers can more easily be developed.

_prepare_insert(tmpl, record_class, field_names, placeholder_for_id=False)[source]

Compile SQL statement with placeholders for bind parameters.

get_record_table_name(record_class)[source]

Returns table name - used in raw queries.

Return type:str
insert_select_max

SQL statement that inserts records with contiguous IDs, by selecting max ID from indexed table records.

insert_tracking_record

SQL statement that inserts tracking records.

insert_values

SQL statement that inserts records without ID.

record_sequenced_items(sequenced_item_or_items)[source]

Writes sequenced item(s) into the datastore.

to_records(sequenced_item_or_items)[source]

cassandra

Classes for event sourcing with Apache Cassandra.

class eventsourcing.infrastructure.cassandra.datastore.CassandraDatastore(tables, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.datastore.Datastore

close_connection()[source]

Drops connection to a datastore.

drop_table(*_)[source]
drop_tables()[source]

Drops tables used to store events.

setup_connection()[source]

Sets up a connection to a datastore.

setup_tables()[source]

Sets up tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

class eventsourcing.infrastructure.cassandra.datastore.CassandraSettings(hosts=None, port=None, protocol_version=None, default_keyspace=None, consistency=None, replication_factor=None, username=None, password=None)[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreSettings

CONSISTENCY_LEVEL = 'LOCAL_QUORUM'
DEFAULT_KEYSPACE = 'eventsourcing'
HOSTS = ['127.0.0.1']
PORT = 9042
PROTOCOL_VERSION = 3
REPLICATION_FACTOR = 1
class eventsourcing.infrastructure.cassandra.factory.CassandraInfrastructureFactory(record_manager_class=None, sequenced_item_class=None, integer_sequenced_record_class=None, timestamp_sequenced_record_class=None, snapshot_record_class=None, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

integer_sequenced_record_class

alias of eventsourcing.infrastructure.cassandra.records.IntegerSequencedRecord

record_manager_class

alias of eventsourcing.infrastructure.cassandra.manager.CassandraRecordManager

snapshot_record_class

alias of eventsourcing.infrastructure.cassandra.records.SnapshotRecord

timestamp_sequenced_record_class

alias of eventsourcing.infrastructure.cassandra.records.TimestampSequencedRecord

class eventsourcing.infrastructure.cassandra.manager.CassandraRecordManager(record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Bases: eventsourcing.infrastructure.base.AbstractSequencedItemRecordManager

all_sequence_ids()[source]

Returns all sequence IDs.

delete_record(record)[source]

Permanently removes the given record from the table.

filter(**kwargs)[source]
get_notifications(start=None, stop=None, *args, **kwargs)[source]

Not implemented.

get_record(sequence_id, position)[source]

Gets record at position in sequence.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

record_sequenced_items(sequenced_item_or_items)[source]

Writes sequenced item(s) into the datastore.

class eventsourcing.infrastructure.cassandra.records.IntegerSequencedRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores integer-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

pk = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
position = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
sequence_id = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
state = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
topic = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
class eventsourcing.infrastructure.cassandra.records.SnapshotRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores snapshots in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

pk = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
position = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
sequence_id = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
state = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
topic = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
class eventsourcing.infrastructure.cassandra.records.StoredEventRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores integer-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

originator_id = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
originator_version = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
pk = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
state = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
topic = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
class eventsourcing.infrastructure.cassandra.records.TimestampSequencedRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores timestamp-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

pk = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
position = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
sequence_id = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
state = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
topic = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
class eventsourcing.infrastructure.cassandra.records.TimeuuidSequencedRecord(**values)[source]

Bases: cassandra.cqlengine.models.Model

Stores timeuuid-sequenced items in Cassandra.

exception DoesNotExist

Bases: cassandra.cqlengine.models.DoesNotExist

exception MultipleObjectsReturned

Bases: cassandra.cqlengine.models.MultipleObjectsReturned

pk = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
position = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
sequence_id = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
state = <cassandra.cqlengine.models.ColumnQueryEvaluator object>
topic = <cassandra.cqlengine.models.ColumnQueryEvaluator object>

datastore

Base classes for concrete datastore classes.

class eventsourcing.infrastructure.datastore.Datastore(settings)[source]

Bases: abc.ABC

close_connection()[source]

Drops connection to a datastore.

drop_tables()[source]

Drops tables used to store events.

setup_connection()[source]

Sets up a connection to a datastore.

setup_tables()[source]

Sets up tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

exception eventsourcing.infrastructure.datastore.DatastoreConnectionError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

exception eventsourcing.infrastructure.datastore.DatastoreError[source]

Bases: Exception

class eventsourcing.infrastructure.datastore.DatastoreSettings[source]

Bases: object

Base class for settings for database connection used by a stored event repository.

exception eventsourcing.infrastructure.datastore.DatastoreTableError[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreError

django

A Django application for event sourcing with the Django ORM.

class eventsourcing.infrastructure.django.factory.DjangoInfrastructureFactory(record_manager_class=None, sequenced_item_class=None, integer_sequenced_record_class=None, timestamp_sequenced_record_class=None, snapshot_record_class=None, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

integer_sequenced_record_class

alias of eventsourcing.infrastructure.django.models.IntegerSequencedRecord

record_manager_class

alias of eventsourcing.infrastructure.django.manager.DjangoRecordManager

snapshot_record_class

alias of eventsourcing.infrastructure.django.models.SnapshotRecord

timestamp_sequenced_record_class

alias of eventsourcing.infrastructure.django.models.TimestampSequencedRecord

class eventsourcing.infrastructure.django.manager.DjangoRecordManager(*args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.SQLRecordManager

_prepare_insert(tmpl, record_class, field_names, placeholder_for_id=False)[source]

With transaction isolation level of “read committed” this should generate records with a contiguous sequence of integer IDs, using an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.
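A minimal sketch of the insert-select-from form, using the standard library's sqlite3 module rather than Django. The table name and columns are hypothetical, and SQLite's isolation behaviour differs from "read committed", so this shows only the shape of the statement, not the library's generated SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (id INTEGER PRIMARY KEY, state TEXT)")

# The new ID is computed on the database side as max(id) + 1, selected
# from the same indexed table that receives the insert.
INSERT_SELECT_MAX = (
    "INSERT INTO records (id, state) "
    "SELECT COALESCE(MAX(id), 0) + 1, ? FROM records"
)

for state in ("a", "b", "c"):
    with conn:  # one transaction per insert
        conn.execute(INSERT_SELECT_MAX, (state,))

ids = [row[0] for row in conn.execute("SELECT id FROM records ORDER BY id")]
```

If two writers compute the same maximum concurrently, the uniqueness constraint on the ID column rejects one of them, and that writer can retry. This is the optimistic concurrency control the description refers to.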

all_sequence_ids()[source]

Returns all sequence IDs.

delete_record(record)[source]

Permanently removes record from table.

get_max_record_id()[source]

Return maximum notification ID in pipeline.

get_max_tracking_record_id(upstream_application_name)[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Returns all records in the table.

get_record(sequence_id, position)[source]

Gets record at position in sequence.

get_record_table_name(record_class)[source]

Returns table name from Django record class.

get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

has_tracking_record(upstream_application_name, pipeline_id, notification_id)[source]

True if tracking record exists for notification from upstream in pipeline.

tracking_record_class

alias of eventsourcing.infrastructure.django.models.NotificationTrackingRecord

write_records(records, tracking_kwargs=None)[source]

Writes tracking, event and notification records for a process event.

class eventsourcing.infrastructure.django.models.EntitySnapshotRecord(uid, application_name, originator_id, originator_version, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

application_name

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

objects = <django.db.models.manager.Manager object>
originator_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

originator_version

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

state

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

topic

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

uid

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class eventsourcing.infrastructure.django.models.IntegerSequencedRecord(id, sequence_id, position, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

objects = <django.db.models.manager.Manager object>
position

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

sequence_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

state

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

topic

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class eventsourcing.infrastructure.django.models.NotificationTrackingRecord(uid, application_name, upstream_application_name, pipeline_id, notification_id)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

application_name

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

notification_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

objects = <django.db.models.manager.Manager object>
pipeline_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

uid

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

upstream_application_name

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class eventsourcing.infrastructure.django.models.SnapshotRecord(uid, sequence_id, position, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

objects = <django.db.models.manager.Manager object>
position

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

sequence_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

state

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

topic

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

uid

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class eventsourcing.infrastructure.django.models.StoredEventRecord(uid, application_name, originator_id, originator_version, pipeline_id, notification_id, topic, state, causal_dependencies)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

application_name

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

causal_dependencies

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

notification_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

objects = <django.db.models.manager.Manager object>
originator_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

originator_version

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

pipeline_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

state

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

topic

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

uid

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class eventsourcing.infrastructure.django.models.TimestampSequencedRecord(id, sequence_id, position, topic, state)[source]

Bases: django.db.models.base.Model

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

objects = <django.db.models.manager.Manager object>
position

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

sequence_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

state

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

topic

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

eventsourcing.infrastructure.django.utils.close_django_connection()[source]
eventsourcing.infrastructure.django.utils.setup_django()[source]

eventplayer

Base classes for event players of different kinds.

class eventsourcing.infrastructure.eventplayer.EventPlayer(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.domain.model.entity.AbstractEventPlayer

event_store
static mutate(initial, event)[source]
project_events(initial_state, domain_events)[source]

Evolves initial state using the sequence of domain events and a mutator function.
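The evolution described above is essentially a left fold over the events. The sketch below illustrates the idea; the Incremented event type and the mutate function are hypothetical and are not taken from the library.

```python
from collections import namedtuple
from functools import reduce

# Hypothetical event type, for illustration only.
Incremented = namedtuple("Incremented", ["amount"])


def mutate(state, event):
    # Hypothetical mutator: returns the state that results from one event.
    return state + event.amount


def project_events(initial_state, domain_events, mutator_func=mutate):
    # Apply each event in order, starting from the initial state.
    return reduce(mutator_func, domain_events, initial_state)


events = [Incremented(1), Incremented(2), Incremented(3)]
final_state = project_events(0, events)
```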

eventsourcedrepository

Base classes for event sourced repositories (not abstract, can be used directly).

class eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.infrastructure.eventplayer.EventPlayer, eventsourcing.domain.model.entity.AbstractEntityRepository

__contains__(entity_id)[source]

Returns a boolean value according to whether entity with given ID exists.

__getitem__(entity_id)[source]

Returns entity with given ID.

get_and_project_events(entity_id, gt=None, gte=None, lt=None, lte=None, limit=None, initial_state=None, query_descending=False)[source]

Reconstitutes requested domain entity from domain events found in event store.

get_entity(entity_id, at=None)[source]

Returns entity with given ID, optionally until position.

take_snapshot(entity_id, lt=None, lte=None)[source]

Takes a snapshot of the entity as it existed after the most recent event, optionally less than, or less than or equal to, a particular position.

eventstore

The event store provides the application-level interface to the event sourcing persistence mechanism.

class eventsourcing.infrastructure.eventstore.AbstractEventStore[source]

Bases: abc.ABC

Abstract base class for event stores. Defines the methods expected of an event store by other classes in the library.

all_domain_events()[source]

Returns all domain events in the event store.

get_domain_event(originator_id, position)[source]

Returns a single domain event.

get_domain_events(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]

Returns domain events for given entity ID.

get_most_recent_event(originator_id, lt=None, lte=None)[source]

Returns most recent domain event for given entity ID.

store(domain_event_or_events)[source]

Put domain event in event store for later retrieval.

class eventsourcing.infrastructure.eventstore.EventStore(record_manager, sequenced_item_mapper)[source]

Bases: eventsourcing.infrastructure.eventstore.AbstractEventStore

Event store appends domain events to stored sequences. It uses a record manager to map named tuples to database records, and it uses a sequenced item mapper to map named tuples to application-level objects.

__init__(record_manager, sequenced_item_mapper)[source]

Initialises event store object.

Parameters:
  • record_manager – record manager
  • sequenced_item_mapper – sequenced item mapper
all_domain_events()[source]

Yields all domain events in the event store.

get_domain_event(originator_id, position)[source]

Gets a domain event from the sequence identified by originator_id at the given position.

Parameters:
  • originator_id – ID of a sequence of events
  • position – get item at this position
Returns:

domain event

get_domain_events(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]

Gets domain events from the sequence identified by originator_id.

Parameters:
  • originator_id – ID of a sequence of events
  • gt – get items after this position
  • gte – get items at or after this position
  • lt – get items before this position
  • lte – get items before or at this position
  • limit – get limited number of items
  • is_ascending – get items from lowest position
  • page_size – restrict and repeat database query
Returns:

list of domain events
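The range parameters listed above can be illustrated with an in-memory stand-in. The select_positions function is hypothetical, and it assumes that limit is applied after ordering, so that a descending query with a limit returns the highest positions.

```python
def select_positions(positions, gt=None, gte=None, lt=None, lte=None,
                     limit=None, is_ascending=True):
    # Hypothetical stand-in for a range query over one sequence.
    selected = [
        p for p in sorted(positions, reverse=not is_ascending)
        if (gt is None or p > gt)
        and (gte is None or p >= gte)
        and (lt is None or p < lt)
        and (lte is None or p <= lte)
    ]
    return selected if limit is None else selected[:limit]


positions = range(10)
after_three = select_positions(positions, gt=3, lte=6)
latest_two = select_positions(positions, limit=2, is_ascending=False)
```

Here after_three holds positions 4 to 6, and latest_two holds the two highest positions in descending order.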

get_most_recent_event(originator_id, lt=None, lte=None)[source]

Gets a domain event from the sequence identified by originator_id at the highest position.

Parameters:
  • originator_id – ID of a sequence of events
  • lt – get highest before this position
  • lte – get highest at or before this position
Returns:

domain event

item_from_event(domain_event_or_events)[source]

Maps domain event to sequenced item namedtuple.

Parameters:domain_event_or_events – application-level object (or list)
Returns:namedtuple: sequenced item namedtuple (or list)
iterator_class

alias of eventsourcing.infrastructure.iterators.SequencedItemIterator

store(domain_event_or_events)[source]

Appends given domain event, or list of domain events, to their sequence.

Parameters:domain_event_or_events – domain event, or list of domain events

integersequencegenerators

Different ways of generating sequences of integers.

class eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator[source]

Bases: object

__next__()[source]

Returns the next item in the container.

class eventsourcing.infrastructure.integersequencegenerators.base.SimpleIntegerSequenceGenerator(i=0)[source]

Bases: eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator

class eventsourcing.infrastructure.integersequencegenerators.redisincr.RedisIncr(redis=None, key=None)[source]

Bases: eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator

Generates a sequence of integers, using Redis’ INCR command.

Maximum number is 2**63 - 1, or 9223372036854775807, the maximum value of a 64 bit signed integer.
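The bound can be checked with the standard library's struct module, which packs 64 bit signed integers with the "q" format. This snippet is independent of Redis and only illustrates the arithmetic.

```python
import struct

# Largest value of a 64 bit signed integer.
MAX_INT64 = 2**63 - 1

# Packing the maximum succeeds and takes eight bytes.
packed = struct.pack("<q", MAX_INT64)

# One more than the maximum is out of range for a signed 64 bit integer.
try:
    struct.pack("<q", MAX_INT64 + 1)
    overflowed = False
except struct.error:
    overflowed = True
```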

iterators

Different ways of getting sequenced items from a datastore.

class eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: abc.ABC

DEFAULT_PAGE_SIZE = 1000
__iter__()[source]

Yields a continuous sequence of items.

_inc_page_counter()[source]

Increments the page counter.

Each query result counts as a page, even if there are no items in the page. This really counts queries.
  • it is easy to divide the number of events by the page size if the “correct” answer is required
  • there will be a difference in the counts when the number of events can be exactly divided by the page size, because there is no way to know in advance that a full page is also the last page.
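The difference noted in the second point can be seen in a small sketch. The count_queries function is a hypothetical pager that stops when a page comes back short. It is not the library's iterator.

```python
def count_queries(num_events, page_size):
    # Hypothetical pager: keeps querying until a page comes back short.
    events = list(range(num_events))
    queries = 0
    position = 0
    while True:
        page = events[position:position + page_size]
        queries += 1
        position += len(page)
        if len(page) < page_size:
            # A short (or empty) page signals the end of the sequence.
            break
    return queries


uneven = count_queries(25, 10)  # pages of 10, 10 and 5 items
exact = count_queries(20, 10)   # pages of 10, 10 and 0 items
```

With 20 events and a page size of 10, a third query is needed to discover that the second full page was the last one, so the count is one more than the number of events divided by the page size.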
_inc_query_counter()[source]

Increments the query counter.

class eventsourcing.infrastructure.iterators.GetEntityEventsThread(record_manager, sequence_id, gt=None, gte=None, lt=None, lte=None, page_size=None, is_ascending=True, *args, **kwargs)[source]

Bases: threading.Thread

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class eventsourcing.infrastructure.iterators.SequencedItemIterator(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

__iter__()[source]

Yields a continuous sequence of items from “pages” of sequenced items retrieved using the record manager.

class eventsourcing.infrastructure.iterators.ThreadedSequencedItemIterator(record_manager, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]

Bases: eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator

start_thread()[source]

repositories

Repository base classes for entity classes defined in the library.

class eventsourcing.infrastructure.repositories.array.ArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

class eventsourcing.infrastructure.repositories.array.BigArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.array.AbstractBigArrayRepository, eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository

subrepo

Sub-sequence repository.

subrepo_class

alias of ArrayRepository

class eventsourcing.infrastructure.repositories.collection_repo.CollectionRepository(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.domain.model.collection.AbstractCollectionRepository

Event sourced repository for the Collection domain model entity.

class eventsourcing.infrastructure.repositories.timebucketedlog_repo.TimebucketedlogRepo(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository

Event sourced repository for the Timebucketedlog domain model entity.

sequenceditem

The persistence model for storing events.

class eventsourcing.infrastructure.sequenceditem.SequencedItem(sequence_id, position, topic, state)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, sequence_id, position, topic, state)

Create new instance of SequencedItem(sequence_id, position, topic, state)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable)

Make a new SequencedItem object from a sequence or iterable

_replace(**kwds)

Return a new SequencedItem object replacing specified fields with new values

position

Alias for field number 1

sequence_id

Alias for field number 0

state

Alias for field number 3

topic

Alias for field number 2
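The members listed above are those of a standard named tuple. As an illustration, the stand-in below is built with collections.namedtuple using the same field order. The field values are made up for the example.

```python
from collections import namedtuple

# Stand-in with the same field order as the documented SequencedItem.
SequencedItem = namedtuple(
    "SequencedItem", ["sequence_id", "position", "topic", "state"]
)

item = SequencedItem(sequence_id="a1", position=0, topic="example#Created", state="{}")

# Fields are available by name and by index.
same_value = item.position == item[1]

# _replace returns a new tuple; the original is unchanged.
moved = item._replace(position=1)

# _make builds an item from any iterable of four values.
rebuilt = SequencedItem._make(["a1", 2, "example#Created", "{}"])

# _asdict maps field names to values, in field order.
as_dict = item._asdict()
```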

class eventsourcing.infrastructure.sequenceditem.SequencedItemFieldNames(sequenced_item_class)[source]

Bases: object

other_names
position
sequence_id
state
topic
class eventsourcing.infrastructure.sequenceditem.StoredEvent(originator_id, originator_version, topic, state)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

static __new__(_cls, originator_id, originator_version, topic, state)

Create new instance of StoredEvent(originator_id, originator_version, topic, state)

__repr__()

Return a nicely formatted representation string

_asdict()

Return a new OrderedDict which maps field names to their values.

classmethod _make(iterable)

Make a new StoredEvent object from a sequence or iterable

_replace(**kwds)

Return a new StoredEvent object replacing specified fields with new values

originator_id

Alias for field number 0

originator_version

Alias for field number 1

state

Alias for field number 3

topic

Alias for field number 2

sequenceditemmapper

The sequenced item mapper maps sequenced items to application-level objects.

class eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper[source]

Bases: abc.ABC

event_from_item(sequenced_item)[source]

Constructs and returns a domain event for given sequenced item.

item_from_event(domain_event)[source]

Constructs and returns a sequenced item for given domain event.

class eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper(sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=None, json_decoder_class=None, cipher=None, other_attr_names=())[source]

Bases: eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper

Uses JSON to transcode domain events.

construct_item_args(domain_event)[source]

Constructs attributes of a sequenced item from the given domain event.

construct_sequenced_item(item_args)[source]
event_from_item(sequenced_item)[source]

Reconstructs domain event from stored event topic and event attrs. Used in the event store when getting domain events.

event_from_topic_and_state(topic, state)[source]
get_event_class_and_attrs(topic, state)[source]
get_item_topic_and_state(domain_event_class, event_attrs)[source]
item_from_event(domain_event)[source]

Constructs a sequenced item from a domain event.

eventsourcing.infrastructure.sequenceditemmapper.reconstruct_object(obj_class, obj_state)[source]
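
The round trip between domain events and sequenced items might look roughly like the sketch below. It uses only the standard library. The Created class, the TOPICS registry and the topic string are hypothetical stand-ins, and the real mapper also supports encryption and custom JSON encoder and decoder classes, which are left out here.

```python
import json
from collections import namedtuple

# Stand-in item type; the field names follow SequencedItemFieldNames.
SequencedItem = namedtuple(
    "SequencedItem", ["sequence_id", "position", "topic", "state"]
)


class Created:
    """Hypothetical domain event, used only for this sketch."""

    def __init__(self, originator_id, originator_version, **attrs):
        self.originator_id = originator_id
        self.originator_version = originator_version
        self.__dict__.update(attrs)


# Hypothetical topic registry; the library resolves topics to classes instead.
TOPICS = {"sketch#Created": Created}


def item_from_event(event):
    # The sequence ID and position are read from two event attributes.
    # In this sketch, all event attributes are serialised into the state.
    state = json.dumps(event.__dict__, sort_keys=True)
    return SequencedItem(
        event.originator_id, event.originator_version, "sketch#Created", state
    )


def event_from_item(item):
    # Look up the class for the topic, then rebuild the event from the state.
    event_class = TOPICS[item.topic]
    return event_class(**json.loads(item.state))


item = item_from_event(Created("entity-1", 0, foo="bar"))
event = event_from_item(item)
```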

snapshotting

Snapshotting avoids having to replay an entire sequence of events to obtain the current state of a projection.

class eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy[source]

Bases: abc.ABC

get_snapshot(entity_id, lt=None, lte=None)[source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type:Snapshot
take_snapshot(entity_id, entity, last_event_version)[source]

Takes a snapshot of entity, using given ID, state and version number.

Return type:AbstractSnapshot
class eventsourcing.infrastructure.snapshotting.EventSourcedSnapshotStrategy(snapshot_store)[source]

Bases: eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy

Snapshot strategy that uses an event sourced snapshot.

get_snapshot(entity_id, lt=None, lte=None)[source]

Gets the last snapshot for entity, optionally until a particular version number.

Return type:Snapshot
take_snapshot(entity_id, entity, last_event_version)[source]

Creates a Snapshot from the given state, and appends it to the snapshot store.

Return type:Snapshot
eventsourcing.infrastructure.snapshotting.entity_from_snapshot(snapshot)[source]

Reconstructs domain entity from given snapshot.
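
The idea behind snapshotting can be sketched without the library. In the example below, the Snapshot tuple, the counter events and the replay function are all hypothetical. The point is only that replaying from a snapshot gives the same state as replaying the whole sequence, while applying fewer events.

```python
from collections import namedtuple

# Hypothetical snapshot record: the entity state as of a given version.
Snapshot = namedtuple("Snapshot", ["entity_id", "version", "state"])

# Hypothetical event stream: each event adds an amount to a counter.
# The list index stands for the event version.
events = [1, 2, 3, 4, 5]


def replay(initial_state, events_to_apply):
    # Apply each event in order, starting from the given state.
    state = initial_state
    for amount in events_to_apply:
        state += amount
    return state


# A full replay applies every event from the start of the sequence.
full = replay(0, events)

# Take a snapshot after the event at version 2, then replay only later events.
snapshot = Snapshot("counter-1", 2, replay(0, events[:3]))
fast = replay(snapshot.state, events[snapshot.version + 1:])
```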

sqlalchemy

Classes for event sourcing with SQLAlchemy.

class eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemyDatastore(base=<class 'sqlalchemy.ext.declarative.api.Base'>, tables=None, connection_strategy='plain', session=None, **kwargs)[source]

Bases: eventsourcing.infrastructure.datastore.Datastore

close_connection()[source]

Drops connection to a datastore.

drop_table(table)[source]
drop_tables()[source]

Drops tables used to store events.

is_sqlite()[source]
session
setup_connection()[source]

Sets up a connection to a datastore.

setup_table(table)[source]
setup_tables(tables=None)[source]

Sets up tables used to store events.

truncate_tables()[source]

Truncates tables used to store events.

class eventsourcing.infrastructure.sqlalchemy.datastore.SQLAlchemySettings(uri=None, pool_size=None)[source]

Bases: eventsourcing.infrastructure.datastore.DatastoreSettings

class eventsourcing.infrastructure.sqlalchemy.factory.SQLAlchemyInfrastructureFactory(session, uri=None, pool_size=None, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.factory.InfrastructureFactory

construct_datastore()[source]
construct_record_manager(**kwargs)[source]
integer_sequenced_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord

record_manager_class

alias of eventsourcing.infrastructure.sqlalchemy.manager.SQLAlchemyRecordManager

snapshot_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.SnapshotRecord

timestamp_sequenced_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord

eventsourcing.infrastructure.sqlalchemy.factory.construct_sqlalchemy_eventstore(session, sequenced_item_class=None, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=None, json_decoder_class=None, cipher=None, record_class=None, contiguous_record_ids=False, application_name=None, pipeline_id=0)[source]
class eventsourcing.infrastructure.sqlalchemy.manager.SQLAlchemyRecordManager(session, *args, **kwargs)[source]

Bases: eventsourcing.infrastructure.base.SQLRecordManager

_prepare_insert(tmpl, record_class, field_names, placeholder_for_id=False)[source]

With a transaction isolation level of “read committed”, this should generate records with a contiguous sequence of integer IDs. It assumes an indexed ID column, the database-side SQL max function, the insert-select-from form, and optimistic concurrency control.
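
The insert-select-from form with a database-side max function can be illustrated with the standard library's sqlite3 module. This is a sketch under assumptions, not the statement the record manager actually generates: the table and column layout are hypothetical, and SQLite is used only because it needs no server, so the isolation level itself is not demonstrated.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical table; the primary key provides the indexed ID column.
conn.execute(
    "CREATE TABLE records ("
    "id INTEGER PRIMARY KEY, sequence_id TEXT, position INTEGER, state TEXT, "
    "UNIQUE (sequence_id, position))"
)

# Insert-select-from form: the database computes the new ID as max(id) + 1
# in the same statement that inserts the row.
INSERT = (
    "INSERT INTO records (id, sequence_id, position, state) "
    "SELECT COALESCE(MAX(id), 0) + 1, ?, ?, ? FROM records"
)

with conn:
    conn.execute(INSERT, ("a", 0, "{}"))
    conn.execute(INSERT, ("b", 0, "{}"))
    conn.execute(INSERT, ("a", 1, "{}"))

# A duplicate (sequence_id, position) violates the unique constraint.
# With optimistic concurrency control, the losing writer sees this error.
try:
    with conn:
        conn.execute(INSERT, ("a", 1, "{}"))
    conflict = False
except sqlite3.IntegrityError:
    conflict = True

ids = [row[0] for row in conn.execute("SELECT id FROM records ORDER BY id")]
```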

all_sequence_ids()[source]

Returns all sequence IDs.

clone(**kwargs)[source]
delete_record(record)[source]

Permanently removes record from table.

filter_by(**kwargs)[source]
get_max_record_id()[source]

Return maximum notification ID in pipeline.

get_max_tracking_record_id(upstream_application_name)[source]

Return maximum tracking record ID for notification from upstream application in pipeline.

get_notifications(start=None, stop=None, *args, **kwargs)[source]

Returns records sequenced by notification ID, from application, for pipeline, in given range.

Args ‘start’ and ‘stop’ are positions in a zero-based integer sequence.

get_record(sequence_id, position)[source]

Gets record at position in sequence.

get_record_table_name(record_class)[source]

Returns table name - used in raw queries.

Return type:str
get_records(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]

Returns records for a sequence.

has_tracking_record(upstream_application_name, pipeline_id, notification_id)[source]

True if tracking record exists for notification from upstream in pipeline.

orm_query()[source]
tracking_record_class

alias of eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord

write_records(records, tracking_kwargs=None)[source]

Writes tracking, event and notification records for a process event.

class eventsourcing.infrastructure.sqlalchemy.records.EntitySnapshotRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

application_name
originator_id
originator_version
state
topic
class eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedNoIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

position
sequence_id
state
topic
eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedRecord

alias of eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord

class eventsourcing.infrastructure.sqlalchemy.records.IntegerSequencedWithIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

id
position
sequence_id
state
topic
class eventsourcing.infrastructure.sqlalchemy.records.NotificationTrackingRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

application_name
notification_id
pipeline_id
upstream_application_name
class eventsourcing.infrastructure.sqlalchemy.records.SnapshotRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

position
sequence_id
state
topic
class eventsourcing.infrastructure.sqlalchemy.records.StoredEventRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

application_name
causal_dependencies
notification_id
originator_id
originator_version
pipeline_id
state
topic
class eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

position
sequence_id
state
topic
eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedRecord

alias of eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedNoIDRecord

class eventsourcing.infrastructure.sqlalchemy.records.TimestampSequencedWithIDRecord(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Base

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

id
position
sequence_id
state
topic

timebucketedlog_reader

Reader for timebucketed logs.

class eventsourcing.infrastructure.timebucketedlog_reader.TimebucketedlogReader(log, event_store, page_size=50)[source]

Bases: object

get_events(gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=False, page_size=None)[source]
get_messages(gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=False, page_size=None)[source]
eventsourcing.infrastructure.timebucketedlog_reader.get_timebucketedlog_reader(log, event_store)[source]
Return type:TimebucketedlogReader

interface

The interface layer uses an application to service client requests.

notificationlog

Notification log is a pull-based mechanism for updating other applications.

class eventsourcing.interface.notificationlog.AbstractNotificationLog[source]

Bases: abc.ABC

Presents a sequence of sections from a sequence of notifications.

__getitem__(section_id)[source]

Get section of notification log.

Return type:Section
class eventsourcing.interface.notificationlog.BigArrayNotificationLog(big_array, section_size)[source]

Bases: eventsourcing.interface.notificationlog.LocalNotificationLog

get_items(start, stop, next_position=None)[source]

Returns items for section.

Return type:list
get_next_position()[source]

Returns the next unoccupied position in the sequence.

Return type:int
class eventsourcing.interface.notificationlog.LocalNotificationLog(section_size=None)[source]

Bases: eventsourcing.interface.notificationlog.AbstractNotificationLog

Presents a sequence of sections from a sequence of notifications.

static format_section_id(first_item_number, last_item_number)[source]
get_items(start, stop, next_position=None)[source]

Returns items for section.

Return type:list
get_next_position()[source]

Returns the next unoccupied position in the sequence.

Return type:int
class eventsourcing.interface.notificationlog.NotificationLogReader(notification_log, use_direct_query_if_available=False)[source]

Bases: abc.ABC

read(advance_by=None)[source]
read_items(stop_index=None, advance_by=None)[source]
read_list(advance_by=None)[source]
seek(position)[source]
class eventsourcing.interface.notificationlog.NotificationLogView(notification_log, json_encoder_class=None)[source]

Bases: object

present_section(section_id)[source]
class eventsourcing.interface.notificationlog.RecordManagerNotificationLog(record_manager, section_size)[source]

Bases: eventsourcing.interface.notificationlog.LocalNotificationLog

get_items(start, stop, next_position=None)[source]

Returns items for section.

Return type:list
get_next_position()[source]

Next unoccupied position in zero-based sequence.

Since the notification IDs are one-based, the next position is the current max notification ID. If there are no records, the max notification ID will be None, and the next position is zero.
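
The arithmetic described above can be written as a one-line function. The function name is hypothetical; only the rule itself comes from the description.

```python
def next_position(max_notification_id):
    # IDs are one-based and positions are zero-based, so when IDs 1..n
    # occupy positions 0..n-1, the next free position is n.
    # With no records the max ID is None, and the next position is 0.
    return max_notification_id or 0


empty_log = next_position(None)
three_records = next_position(3)
```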

class eventsourcing.interface.notificationlog.RemoteNotificationLog(base_url, json_decoder_class=None)[source]

Bases: eventsourcing.interface.notificationlog.AbstractNotificationLog

deserialize_section(section_json)[source]
get_json(section_id)[source]
get_resource(doc_url)[source]
make_notification_log_url(section_id)[source]
class eventsourcing.interface.notificationlog.Section(section_id, items, previous_id=None, next_id=None)[source]

Bases: object

Section of a notification log.

Contains items, and has an ID.

May also have IDs of the previous and next sections of the notification log.
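
A notification log split into linked sections could be modelled as in the sketch below. The "first,last" section ID format, the rule that only a full section links to a next one, and the get_section helper are assumptions made for this example. The Section stand-in only mirrors the documented constructor arguments.

```python
from collections import namedtuple

# Stand-in mirroring the documented constructor arguments of Section.
Section = namedtuple("Section", ["section_id", "items", "previous_id", "next_id"])


def format_section_id(first_item_number, last_item_number):
    # Assumed ID format for this sketch: "first,last", one-based and inclusive.
    return "{},{}".format(first_item_number, last_item_number)


def get_section(notifications, first, section_size):
    """Return the section that starts at one-based item number `first`."""
    last = first + section_size - 1
    items = notifications[first - 1:last]
    # The first section has no predecessor.
    previous_id = None
    if first > 1:
        previous_id = format_section_id(first - section_size, first - 1)
    # In this sketch, a next section is linked only if this one is full.
    next_id = None
    if len(items) == section_size:
        next_id = format_section_id(last + 1, last + section_size)
    return Section(format_section_id(first, last), items, previous_id, next_id)


notifications = ["n{}".format(i) for i in range(1, 8)]  # seven hypothetical items
first_section = get_section(notifications, 1, 5)
second_section = get_section(notifications, 6, 5)
```

A reader can follow next_id from section to section until it reaches a section whose next_id is None.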

utils

The utils package contains common functions that are used in more than one layer.

cipher

class eventsourcing.utils.cipher.aes.AESCipher(cipher_key)[source]

Bases: object

Cipher strategy that uses Crypto library AES cipher in GCM mode.

__init__(cipher_key)[source]

Initialises AES cipher strategy with cipher_key.

Parameters:cipher_key – 16, 24, or 32 random bytes
decrypt(ciphertext)[source]

Return plaintext for given ciphertext.

encrypt(plaintext)[source]

Return ciphertext for given plaintext.

time

eventsourcing.utils.times.datetime_from_timestamp(t)[source]

Returns naive UTC datetime from decimal UNIX timestamps such as time.time().

Parameters:t – timestamp, either Decimal or float
Returns:datetime.datetime object
eventsourcing.utils.times.decimaltimestamp(t=None)[source]

A UNIX timestamp as a Decimal object (exact number type).

Returns current time when called without args, otherwise converts given floating point number t to a Decimal with 6 decimal places.

Parameters:t – Floating point UNIX timestamp (“seconds since epoch”).
Returns:A Decimal with 6 decimal places, representing the given floating point or the value returned by time.time().
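
One way to obtain such a value is to format the float before converting it. The sketch below assumes six decimal places, as stated in the Returns field, and uses a hypothetical function name so that it is not mistaken for the library function.

```python
import time
from decimal import Decimal


def decimaltimestamp_sketch(t=None):
    # Format the float with six decimal places (microseconds) before
    # converting, so the Decimal does not carry binary float noise.
    t = time.time() if t is None else t
    return Decimal("{:.6f}".format(t))


exact = decimaltimestamp_sketch(0.1)
now = decimaltimestamp_sketch()
```

Converting the float directly with Decimal(0.1) would instead give the long binary expansion of the float, which is why the value is formatted first.
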
eventsourcing.utils.times.decimaltimestamp_from_uuid(uuid_arg)[source]

Return a floating point unix timestamp.

Parameters:uuid_arg
Returns:Unix timestamp in seconds, with microsecond precision.
Return type:float
eventsourcing.utils.times.timestamp_long_from_uuid(uuid_arg)[source]

Returns an integer value representing a unix timestamp in tenths of microseconds.

Parameters:uuid_arg
Returns:Unix timestamp integer in tenths of microseconds.
Return type:int
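
Both conversions rest on the fact that a version 1 UUID carries a timestamp counted in 100-nanosecond intervals since 15 October 1582. The sketch below shows the arithmetic with the standard library. The function names are hypothetical, and returning a Decimal from the second function is a choice made for this example.

```python
import uuid
from decimal import Decimal

# Number of 100-nanosecond intervals between the UUID epoch
# (1582-10-15) and the UNIX epoch (1970-01-01).
UUID_EPOCH_OFFSET = 0x01B21DD213814000


def timestamp_long_sketch(uuid_arg):
    # uuid.UUID.time is the 60-bit timestamp of a version 1 UUID,
    # counted in 100-nanosecond intervals (tenths of microseconds).
    return uuid_arg.time - UUID_EPOCH_OFFSET


def decimaltimestamp_sketch(uuid_arg):
    # Scale to seconds, then round to microsecond precision.
    seconds = Decimal(timestamp_long_sketch(uuid_arg)).scaleb(-7)
    return seconds.quantize(Decimal("0.000001"))


u = uuid.uuid1()
ticks = timestamp_long_sketch(u)
seconds = decimaltimestamp_sketch(u)
```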

topic

eventsourcing.utils.topic.get_topic(domain_class)[source]

Returns a string describing a class.

Args:
domain_class: A class.
Returns:
A string describing the class.
eventsourcing.utils.topic.resolve_attr(obj, path)[source]

A recursive version of getattr for navigating dotted paths.

Args:
obj: An object for which we want to retrieve a nested attribute.
path: A dot separated string containing zero or more attribute names.
Returns:
The attribute referred to by obj.a1.a2.a3…
Raises:
AttributeError: If there is no such attribute.
eventsourcing.utils.topic.resolve_topic(topic)[source]

Return class described by given topic.

Args:
topic: A string describing a class.
Returns:
A class.
Raises:
TopicResolutionError: If there is no such class.
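
The three functions fit together as in the sketch below, written with the standard library only. It assumes a topic of the form "module path, then #, then qualified class name", walks the dotted path with reduce rather than recursion, and lets ImportError or AttributeError propagate where the library raises TopicResolutionError.

```python
import importlib
from functools import reduce
from json.decoder import JSONDecoder


def get_topic(domain_class):
    # Assumed format for this sketch: "<module path>#<qualified class name>".
    return domain_class.__module__ + "#" + domain_class.__qualname__


def resolve_attr(obj, path):
    # Follow each dot-separated name in turn; an empty path returns obj.
    if not path:
        return obj
    return reduce(getattr, path.split("."), obj)


def resolve_topic(topic):
    # Import the module part, then walk the attribute path to the class.
    module_name, _, class_path = topic.partition("#")
    module = importlib.import_module(module_name)
    return resolve_attr(module, class_path)


topic = get_topic(JSONDecoder)
resolved = resolve_topic(topic)
nested = resolve_attr(importlib.import_module("json"), "decoder.JSONDecoder")
```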

transcoding

class eventsourcing.utils.transcoding.ObjectJSONDecoder(object_hook=None, **kwargs)[source]

Bases: json.decoder.JSONDecoder

classmethod from_jsonable(d)[source]
class eventsourcing.utils.transcoding.ObjectJSONEncoder(sort_keys=True, *args, **kwargs)[source]

Bases: json.encoder.JSONEncoder

default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
iterencode(o, _one_shot=False)[source]

Encode the given object and yield each string representation as available.

For example:

for chunk in JSONEncoder().iterencode(bigobject):
    mysocket.write(chunk)
eventsourcing.utils.transcoding.json_dumps(obj, cls=None)[source]
eventsourcing.utils.transcoding.json_loads(s, cls=None)[source]
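
An encoder and decoder pair for types that JSON does not support natively might be structured as follows. This is a sketch of the general technique (overriding default() and supplying an object_hook), not the library's implementation. The class names and the "__uuid__" and "__decimal__" keys are hypothetical.

```python
import json
from decimal import Decimal
from uuid import UUID


class SketchEncoder(json.JSONEncoder):
    def default(self, obj):
        # Wrap unsupported types in a single-key dict that names the type.
        if isinstance(obj, UUID):
            return {"__uuid__": obj.hex}
        if isinstance(obj, Decimal):
            return {"__decimal__": str(obj)}
        # Let the base class raise TypeError for anything else.
        return super().default(obj)


class SketchDecoder(json.JSONDecoder):
    def __init__(self, **kwargs):
        super().__init__(object_hook=self.from_jsonable, **kwargs)

    @classmethod
    def from_jsonable(cls, d):
        # Reverse the wrapping applied by SketchEncoder.default().
        if set(d) == {"__uuid__"}:
            return UUID(d["__uuid__"])
        if set(d) == {"__decimal__"}:
            return Decimal(d["__decimal__"])
        return d


original = {"id": UUID(int=1), "amount": Decimal("1.50")}
text = json.dumps(original, cls=SketchEncoder, sort_keys=True)
restored = json.loads(text, cls=SketchDecoder)
```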

exceptions

A few exception classes are defined by the library to indicate particular kinds of error.

exception eventsourcing.exceptions.ArrayIndexError[source]

Bases: IndexError, eventsourcing.exceptions.EventSourcingError

Raised when appending item to an array that is full.

exception eventsourcing.exceptions.CausalDependencyFailed[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when a causal dependency fails (after its tracking record is not found).

exception eventsourcing.exceptions.ConcurrencyError[source]

Bases: eventsourcing.exceptions.RecordConflictError

Raised when a record conflict is due to concurrency.

exception eventsourcing.exceptions.ConsistencyError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when applying an event stream to a versioned entity.

exception eventsourcing.exceptions.DataIntegrityError[source]

Bases: ValueError, eventsourcing.exceptions.EventSourcingError

Raised when a sequenced item is damaged (hash doesn’t match data).

exception eventsourcing.exceptions.DatasourceSettingsError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when an error is detected in settings for a datasource.

exception eventsourcing.exceptions.EntityIsDiscarded[source]

Bases: AssertionError

Raised when access to a recently discarded entity object is attempted.

exception eventsourcing.exceptions.EntityVersionNotFound[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when accessing an entity version that does not exist.

exception eventsourcing.exceptions.EventHashError[source]

Bases: eventsourcing.exceptions.DataIntegrityError

Raised when an event’s seal hash doesn’t match the hash of the state of the event.

exception eventsourcing.exceptions.EventRecordNotFound[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when an event record is not found.

exception eventsourcing.exceptions.EventSourcingError[source]

Bases: Exception

Base eventsourcing exception.

exception eventsourcing.exceptions.HeadHashError[source]

Bases: eventsourcing.exceptions.DataIntegrityError, eventsourcing.exceptions.MismatchedOriginatorError

Raised when applying an event with hash different from aggregate head.

exception eventsourcing.exceptions.MismatchedOriginatorError[source]

Bases: eventsourcing.exceptions.ConsistencyError

Raised when applying an event to an inappropriate object.

exception eventsourcing.exceptions.MutatorRequiresTypeNotInstance[source]

Bases: eventsourcing.exceptions.ConsistencyError

Raised when mutator function received a class rather than an entity.

exception eventsourcing.exceptions.OperationalError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when an operational error is encountered.

exception eventsourcing.exceptions.OriginatorIDError[source]

Bases: eventsourcing.exceptions.MismatchedOriginatorError

Raised when applying an event to the wrong entity or aggregate.

exception eventsourcing.exceptions.OriginatorVersionError[source]

Bases: eventsourcing.exceptions.MismatchedOriginatorError

Raised when applying an event to the wrong version of an entity or aggregate.

exception eventsourcing.exceptions.ProgrammingError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when programming errors are encountered.

exception eventsourcing.exceptions.PromptFailed[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when prompt fails.

exception eventsourcing.exceptions.RecordConflictError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when database raises an integrity error.

exception eventsourcing.exceptions.RepositoryKeyError[source]

Bases: KeyError, eventsourcing.exceptions.EventSourcingError

Raised when using entity repository’s dictionary-like interface to get an entity that does not exist.

exception eventsourcing.exceptions.TimeSequenceError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when a time sequence error occurs, e.g. when trying to save a timestamp that already exists.

exception eventsourcing.exceptions.TopicResolutionError[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when unable to resolve a topic to a Python class.

exception eventsourcing.exceptions.TrackingRecordNotFound[source]

Bases: eventsourcing.exceptions.EventSourcingError

Raised when a tracking record is not found.
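
Because the exceptions form a hierarchy, a handler for a base class also catches its subclasses. The sketch below uses stand-in classes that mirror three of the documented base relationships, and a hypothetical retry helper, to show a ConcurrencyError being handled as a RecordConflictError.

```python
# Stand-in classes that mirror the documented base classes.
class EventSourcingError(Exception):
    pass


class RecordConflictError(EventSourcingError):
    pass


class ConcurrencyError(RecordConflictError):
    pass


def retry_on_conflict(operation, max_attempts=3):
    """Hypothetical helper: retry `operation` when a record conflict occurs."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except RecordConflictError:
            # ConcurrencyError is caught here too, since it is a subclass.
            if attempt == max_attempts:
                raise


attempts = []


def flaky_save():
    # Fails twice with a concurrency error, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConcurrencyError("version already taken")
    return "saved"


result = retry_on_conflict(flaky_save)
```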

example

A simple, unit-tested, event sourced application.

application

class eventsourcing.example.application.ApplicationWithEventStores(entity_record_manager=None, log_record_manager=None, snapshot_record_manager=None, cipher=None, sequenced_item_mapper_class=<class 'eventsourcing.infrastructure.sequenceditemmapper.SequencedItemMapper'>)[source]

Bases: abc.ABC

Event sourced application object class.

Can construct event stores using given database records. Supports three different event stores: for log events, for entity events, and for snapshot events.

close()[source]
construct_event_store(sequenced_item_mapper_class, event_sequence_id_attr, event_position_attr, record_manager, cipher=None)[source]
construct_sequenced_item_mapper(sequenced_item_mapper_class, sequenced_item_class, event_sequence_id_attr, event_position_attr, cipher=None, json_encoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONEncoder'>, json_decoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONDecoder'>)[source]
class eventsourcing.example.application.ApplicationWithPersistencePolicies(**kwargs)[source]

Bases: eventsourcing.example.application.ApplicationWithEventStores

close()[source]
construct_entity_persistence_policy()[source]
construct_log_persistence_policy()[source]
construct_snapshot_persistence_policy()[source]
class eventsourcing.example.application.ExampleApplication(**kwargs)[source]

Bases: eventsourcing.example.application.ApplicationWithPersistencePolicies

Example event sourced application with entity factory and repository.

create_new_example(foo='', a='', b='')[source]

Entity object factory.

eventsourcing.example.application.close_example_application()[source]

Shuts down single global instance of application.

To be called when tearing down, perhaps between tests, in order to allow a subsequent call to init_example_application().

eventsourcing.example.application.construct_example_application(**kwargs)[source]

Application object factory.

eventsourcing.example.application.get_example_application()[source]

Returns single global instance of application.

To be called when handling a worker request, if required.

eventsourcing.example.application.init_example_application(**kwargs)[source]

Constructs single global instance of application.

To be called when initialising a worker process.
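
The init, get and close functions describe a single shared instance with an explicit lifecycle. A minimal sketch of that pattern follows. The SketchApplication class, the function names and the choice to raise AssertionError are assumptions for illustration and are not taken from the library.

```python
# Holder for the single shared instance.
_state = {"application": None}


class SketchApplication:
    """Hypothetical stand-in for the example application."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.closed = False

    def close(self):
        self.closed = True


def init_application(**kwargs):
    # Refuse to construct a second instance while one is active.
    if _state["application"] is not None:
        raise AssertionError("application is already initialised")
    _state["application"] = SketchApplication(**kwargs)
    return _state["application"]


def get_application():
    # Return the shared instance, which must have been initialised first.
    if _state["application"] is None:
        raise AssertionError("application has not been initialised")
    return _state["application"]


def close_application():
    # Close and forget the instance so that init can be called again.
    if _state["application"] is not None:
        _state["application"].close()
    _state["application"] = None


app = init_application(name="worker-1")
same_app = get_application()
close_application()
```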

domainmodel

class eventsourcing.example.domainmodel.AbstractExampleRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

class eventsourcing.example.domainmodel.Example(foo='', a='', b='', **kwargs)[source]

Bases: eventsourcing.domain.model.entity.EntityWithHashchain, eventsourcing.domain.model.entity.TimestampedVersionedEntity

An example event sourced domain model entity.

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when an attribute of an Example is changed.

class Created(**kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Created

Published when an Example is created.

class Discarded(**kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.Discarded

Published when an Example is discarded.

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Event

Supertype for events of example entities.

class Heartbeat(**kwargs)[source]

Bases: eventsourcing.example.domainmodel.Event, eventsourcing.domain.model.entity.Event

Published when a heartbeat in the entity occurs (see below).

mutate(obj)[source]

Update obj with values from self.

a

An example attribute.

b

Another example attribute.

beat_heart(number_of_beats=1)[source]
count_heartbeats()[source]
foo

An example attribute.

eventsourcing.example.domainmodel.create_new_example(foo='', a='', b='')[source]

Factory method for example entities.

Return type:Example

infrastructure

class eventsourcing.example.infrastructure.ExampleRepository(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]

Bases: eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository, eventsourcing.example.domainmodel.AbstractExampleRepository

Event sourced repository for the Example domain model entity.

interface

class eventsourcing.example.interface.flaskapp.IntegerSequencedItem(**kwargs)[source]

Bases: sqlalchemy.ext.declarative.api.Model

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

id
position
sequence_id
state
topic
eventsourcing.example.interface.flaskapp.hello()[source]
eventsourcing.example.interface.flaskapp.init_example_application_with_sqlalchemy()[source]