domain.model

The domain model package contains classes and functions that can help develop an event sourced domain model.

events

Base classes for domain events of different kinds.

class eventsourcing.domain.model.events.DomainEvent(**kwargs)[source]

Bases: object

Base class for domain model events.

Implements methods to make instances read-only, comparable for equality in Python, and have recognisable representations. To make domain events hashable, this class also implements a method to create a cryptographic hash of the state of the event.

__json_encoder_class__

alias of eventsourcing.utils.transcoding.ObjectJSONEncoder

__init__(**kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__repr__()[source]

Creates a string representing the type and attribute values of the event.

Return type:str
__mutate__(obj)[source]

Updates ‘obj’ with values from ‘self’.

Calls the ‘mutate()’ method.

Can be extended, but subclasses must call super and return an object to their caller.

Parameters:obj – object (normally a domain entity) to be mutated
Returns:mutated object
mutate(obj)[source]

Updates (“mutates”) given ‘obj’.

Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).

The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.

Parameters:obj – domain entity to be mutated
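The relationship between __mutate__() and mutate() can be illustrated with a minimal standalone sketch. It does not import the library; SketchEvent, Counter and Incremented are hypothetical names used only for illustration, and the real DomainEvent does more than this.

```python
class SketchEvent:
    """Hypothetical stand-in for a domain event; not the library class."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __mutate__(self, obj):
        # Delegates to mutate() and takes care of returning the object.
        self.mutate(obj)
        return obj

    def mutate(self, obj):
        # Default projection does nothing; subclasses override this.
        pass


class Counter:
    """Hypothetical entity with a single piece of state."""

    def __init__(self):
        self.value = 0


class Incremented(SketchEvent):
    def mutate(self, obj):
        # No super call and no return value are needed here.
        obj.value += self.amount


counter = Incremented(amount=3).__mutate__(Counter())
```

Overriding mutate() keeps the subclass short, while a subclass that overrides __mutate__() in this sketch would have to call super and return the object itself.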
__setattr__(key, value)[source]

Inhibits event attributes from being updated by assignment.

__eq__(other)[source]

Tests for equality of two event objects.

Return type:bool
__ne__(other)[source]

Negates the equality test.

Return type:bool
__hash__()[source]

Computes a Python integer hash for an event.

Supports Python equality and inequality comparisons.

Returns:Python integer hash
Return type:int
classmethod __hash_object__(obj)[source]

Calculates SHA-256 hash of JSON encoded ‘obj’.

Parameters:obj – Object to be hashed.
Returns:SHA-256 as hexadecimal string.
Return type:str
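As a rough illustration of hashing a JSON encoded object with SHA-256, the following standalone sketch uses only the standard library. The function name hash_object and the encoder settings (sorted keys, compact separators) are assumptions of this sketch; the library uses its own JSON encoder class, so the digests will not necessarily match.

```python
import hashlib
import json


def hash_object(obj):
    """Returns the SHA-256 hex digest of a JSON encoding of obj."""
    # Sorting keys and fixing separators makes the encoding deterministic,
    # so equal objects always produce equal digests.
    encoded = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(encoded.encode("utf8")).hexdigest()


digest = hash_object({"b": 2, "a": 1})
```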
class eventsourcing.domain.model.events.EventWithHash(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Base class for domain events with a cryptographic event hash.

Extends DomainEvent by setting a cryptographic event hash when the event is originated, and checking the event hash whenever its default projection mutates an object.

__init__(**kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__event_hash__

Returns SHA-256 hash of the original state of the event.

Returns:SHA-256 as hexadecimal string.
Return type:str
__hash__()[source]

Computes a Python integer hash for an event, using its pre-computed event hash.

Supports Python equality and inequality comparisons only.

Returns:Python integer hash
Return type:int
__mutate__(obj)[source]

Updates ‘obj’ with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
__check_hash__()[source]

Raises EventHashError, unless self.__event_hash__ can be derived from the current state of the event object.
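The idea of recording a hash when an event is originated and checking it later can be sketched as follows. This is a standalone approximation, not the library's code: HashedEvent and _hash_state are hypothetical names, EventHashError is redefined locally, and the sketch event is deliberately left writable so that tampering can be demonstrated.

```python
import hashlib
import json


class EventHashError(Exception):
    """Local stand-in for the library's exception of the same name."""


class HashedEvent:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)
        # Record the hash of the original state of the event.
        self.__dict__["__event_hash__"] = self._hash_state()

    def _hash_state(self):
        # Hash everything except the stored hash itself.
        state = {k: v for k, v in self.__dict__.items() if k != "__event_hash__"}
        encoded = json.dumps(state, sort_keys=True)
        return hashlib.sha256(encoded.encode("utf8")).hexdigest()

    def __check_hash__(self):
        # The stored hash must be derivable from the current state.
        if self._hash_state() != self.__dict__["__event_hash__"]:
            raise EventHashError()


event = HashedEvent(amount=10)
event.__check_hash__()  # passes: state is unchanged

event.amount = 11  # tamper with the state
try:
    event.__check_hash__()
    tampering_detected = False
except EventHashError:
    tampering_detected = True
```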

class eventsourcing.domain.model.events.EventWithOriginatorID(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have an originator ID.

__init__(originator_id, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

originator_id

Originator ID is the identity of the object that originated this event.

Returns:A UUID representing the identity of the originator.
Return type:UUID
class eventsourcing.domain.model.events.EventWithTimestamp(timestamp=None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have a timestamp value.

__init__(timestamp=None, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

timestamp

A UNIX timestamp as a Decimal object.

Return type:Decimal
class eventsourcing.domain.model.events.EventWithOriginatorVersion(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have an originator version number.

__init__(originator_version, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

originator_version

Originator version is the version of the object that originated this event.

Returns:An integer representing the version of the originator.
Return type:int
class eventsourcing.domain.model.events.EventWithTimeuuid(event_id=None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have a UUIDv1 event ID.

__init__(event_id=None, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

class eventsourcing.domain.model.events.Created(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Happens when something is created.

class eventsourcing.domain.model.events.AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Happens when the value of an attribute changes.

class eventsourcing.domain.model.events.Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Happens when something is discarded.

class eventsourcing.domain.model.events.Logged(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Happens when something is logged.

eventsourcing.domain.model.events.subscribe(handler, predicate=None)[source]

Adds ‘handler’ to list of event handlers to be called if ‘predicate’ is satisfied.

If predicate is None, the handler will be called whenever an event is published.

Parameters:
  • handler (callable) – Will be called when an event is published.
  • predicate (callable) – Conditions whether the handler will be called.
eventsourcing.domain.model.events.unsubscribe(handler, predicate=None)[source]

Removes ‘handler’ from list of event handlers to be called if ‘predicate’ is satisfied.

Parameters:
  • handler (callable) – Previously subscribed handler.
  • predicate (callable) – Previously subscribed predicate.
eventsourcing.domain.model.events.publish(event)[source]

Publishes given ‘event’ by calling subscribed event handlers with the given ‘event’, except those with predicates that are not satisfied by the event.

Handlers are called in the order they are subscribed.

Parameters:event (DomainEvent) – Domain event to be published.
exception eventsourcing.domain.model.events.EventHandlersNotEmptyError[source]

Bases: Exception

eventsourcing.domain.model.events.assert_event_handlers_empty()[source]

Raises EventHandlersNotEmptyError, unless there are no event handlers subscribed.

eventsourcing.domain.model.events.clear_event_handlers()[source]

Removes all previously subscribed event handlers.

entity

Base classes for domain model entities.

class eventsourcing.domain.model.entity.DomainEntity(id)[source]

Bases: object

Supertype for domain model entity.

class Event(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.events.DomainEvent

Supertype for events of domain model entities.

__check_obj__(obj: eventsourcing.domain.model.entity.DomainEntity)[source]

Checks state of obj before mutating.

Parameters:obj – Domain entity to be checked.
Raises:OriginatorIDError – if the originator_id is mismatched
classmethod __create__(originator_id=None, event_class=None, **kwargs)[source]

Creates a new domain entity.

Constructs a “created” event, constructs the entity object from the event, publishes the “created” event, and returns the new domain entity object.

Parameters:
  • originator_id – ID of the new domain entity (defaults to uuid4()).
  • event_class – Domain event class to be used for the “created” event.
  • kwargs – Other named attribute values of the “created” event.
Returns:

New domain entity object.

Return type:

DomainEntity

class Created(originator_topic, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.Created

Triggered when an entity is created.

originator_topic

Topic (a string) representing the class of the originating domain entity.

Return type:str
__mutate__(entity_class=None)[source]

Constructs object from an entity class, which is obtained by resolving the originator topic, unless it is given as method argument entity_class.

Parameters:entity_class – Class of domain entity to be constructed.
id

The immutable ID of the domain entity.

This value is set using the originator_id of the “created” event constructed by __create__().

An entity ID allows an instance to be referenced and distinguished from others, even though its state may change over time.

This attribute has the normal “public” format for a Python object attribute name, because by definition all domain entities have an ID.

__change_attribute__(name, value)[source]

Changes named attribute with the given value, by triggering an AttributeChanged event.

class AttributeChanged(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.AttributeChanged

Triggered when a named attribute is assigned a new value.

__mutate__(obj)[source]

Updates ‘obj’ with values from ‘self’.

Calls the ‘mutate()’ method.

Can be extended, but subclasses must call super and return an object to their caller.

Parameters:obj – object (normally a domain entity) to be mutated
Returns:mutated object
__discard__()[source]

Discards self, by triggering a Discarded event.

class Discarded(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.events.Discarded, eventsourcing.domain.model.entity.Event

Triggered when a DomainEntity is discarded.

__mutate__(obj)[source]

Updates ‘obj’ with values from ‘self’.

Calls the ‘mutate()’ method.

Can be extended, but subclasses must call super and return an object to their caller.

Parameters:obj – object (normally a domain entity) to be mutated
Returns:mutated object
__assert_not_discarded__()[source]

Raises exception if entity has been discarded already.

__trigger_event__(event_class, **kwargs)[source]

Constructs, applies, and publishes a domain event.

__publish__(event)[source]

Publishes given event for subscribers in the application.

Parameters:event – domain event or list of events
__publish_to_subscribers__(event)[source]

Actually dispatches given event to publish-subscribe mechanism.

Parameters:event – domain event or list of events
__eq__(other)[source]

Return self==value.

__ne__(other)[source]

Return self!=value.

class eventsourcing.domain.model.entity.EntityWithHashchain(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithHash, eventsourcing.domain.model.entity.Event

Supertype for events of domain entities.

__mutate__(obj)[source]

Updates ‘obj’ with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
__check_obj__(obj)[source]

Extends superclass method by checking the __previous_hash__ of this event matches the __head__ hash of the entity obj.
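The hash chain check can be pictured with a standalone sketch in which each event carries the hash of its predecessor. Events are plain dictionaries here, and ChainedEntity, trigger, apply and the empty initial head value are all assumptions made for illustration rather than the library's implementation.

```python
import hashlib
import json


def event_hash(state):
    """SHA-256 hex digest of a deterministic JSON encoding of the state."""
    encoded = json.dumps(state, sort_keys=True)
    return hashlib.sha256(encoded.encode("utf8")).hexdigest()


class ChainedEntity:
    def __init__(self):
        self.head = ""  # hash of the last applied event (assumed start value)

    def trigger(self, **attrs):
        # A new event records the current head as its previous hash.
        state = dict(attrs, previous_hash=self.head)
        event = dict(state, event_hash=event_hash(state))
        self.apply(event)
        return event

    def apply(self, event):
        # Reject events that do not follow on from the current head.
        if event["previous_hash"] != self.head:
            raise ValueError("event does not follow the entity's head hash")
        self.head = event["event_hash"]


entity = ChainedEntity()
first = entity.trigger(name="a")
second = entity.trigger(name="b")

try:
    entity.apply(first)  # an out-of-sequence event breaks the chain
    rejected = False
except ValueError:
    rejected = True
```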

class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Created

__mutate__(entity_class=None)[source]

Updates ‘obj’ with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

__mutate__(obj)[source]

Updates ‘obj’ with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
classmethod __create__(*args, **kwargs)[source]

Creates a new domain entity.

Constructs a “created” event, constructs the entity object from the event, publishes the “created” event, and returns the new domain entity object.

Parameters:
  • originator_id – ID of the new domain entity (defaults to uuid4()).
  • event_class – Domain event class to be used for the “created” event.
  • kwargs – Other named attribute values of the “created” event.
Returns:

New domain entity object.

Return type:

DomainEntity

__trigger_event__(event_class, **kwargs)[source]

Constructs, applies, and publishes a domain event.

class eventsourcing.domain.model.entity.VersionedEntity(__version__=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

__trigger_event__(event_class, **kwargs)[source]

Triggers domain event with entity’s next version number.

The event carries the version number that the originator will have when the originator is mutated with this event. (The event’s originator version isn’t the version of the originator that triggered the event. The Created event has version 0, and so a newly created instance is at version 0. The second event has originator version 1, and so will the originator when the second event has been applied.)
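The version numbering described above can be sketched in a few lines of standalone Python. VersionedSketch and its methods are hypothetical; events are plain dictionaries, and the entity is assumed to start at version 0 as if its "created" event had already been applied.

```python
class VersionedSketch:
    def __init__(self):
        self.version = 0  # version after the "created" event
        self.history = []

    def trigger(self, name):
        # The event carries the version the entity will have once mutated.
        event = {"name": name, "originator_version": self.version + 1}
        self.apply(event)
        return event

    def apply(self, event):
        # The event's version must follow directly from the entity's version.
        if event["originator_version"] != self.version + 1:
            raise ValueError("version mismatch")
        self.version = event["originator_version"]
        self.history.append(event["name"])


entity = VersionedSketch()
second_event = entity.trigger("renamed")
third_event = entity.trigger("renamed again")
```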

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.entity.Event

Supertype for events of versioned entities.

__mutate__(obj)[source]

Updates ‘obj’ with values from ‘self’.

Calls the ‘mutate()’ method.

Can be extended, but subclasses must call super and return an object to their caller.

Parameters:obj – object (normally a domain entity) to be mutated
Returns:mutated object
__check_obj__(obj)[source]

Extends superclass method by checking the event’s originator version follows (1 +) this entity’s version.

class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a VersionedEntity is created.

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a VersionedEntity is changed.

class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a VersionedEntity is discarded.

class eventsourcing.domain.model.entity.TimestampedEntity(__created_on__=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class Event(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.EventWithTimestamp

Supertype for events of timestamped entities.

__mutate__(obj)[source]

Updates ‘obj’ with values from self.

class Created(originator_topic, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a TimestampedEntity is created.

class AttributeChanged(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedEntity is changed.

class Discarded(originator_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedEntity is discarded.

class eventsourcing.domain.model.entity.TimeuuidedEntity(event_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class eventsourcing.domain.model.entity.TimestampedVersionedEntity(__created_on__=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedEntity, eventsourcing.domain.model.entity.VersionedEntity

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Event

Supertype for events of timestamped, versioned entities.

class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a TimestampedVersionedEntity is created.

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedVersionedEntity is changed.

class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedVersionedEntity is discarded.

class eventsourcing.domain.model.entity.TimeuuidedVersionedEntity(event_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimeuuidedEntity, eventsourcing.domain.model.entity.VersionedEntity

aggregate

Base classes for aggregates in a domain driven design.

class eventsourcing.domain.model.aggregate.BaseAggregateRoot(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

Root entity for an aggregate in a domain driven design.

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for base aggregate root events.

class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Created

Triggered when an aggregate root is created.

class AttributeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.AttributeChanged

Triggered when an aggregate root attribute is changed.

class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Discarded

Triggered when an aggregate root is discarded.

__init__(**kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__publish__(event)[source]

Overrides super method by adding event to internal collection of pending events, rather than actually publishing the event.

__save__()[source]

Actually publishes all pending events.
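The deferral of publishing until the aggregate is saved can be sketched as below. This standalone example uses plain method names (publish, save) and a module-level list in place of the publish-subscribe mechanism; these names are assumptions, and the real methods are __publish__() and __save__().

```python
published = []  # stands in for the publish-subscribe mechanism


class AggregateSketch:
    def __init__(self):
        self.pending_events = []

    def publish(self, event):
        # Defer: collect the event rather than actually publishing it.
        self.pending_events.append(event)

    def save(self):
        # Publish all pending events together as one list, then clear them.
        batch = self.pending_events[:]
        self.pending_events.clear()
        if batch:
            published.append(batch)


aggregate = AggregateSketch()
aggregate.publish("created")
aggregate.publish("renamed")
published_before_save = list(published)
aggregate.save()
```

Collecting events this way means that several changes to one aggregate reach subscribers together, as a single list.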

class eventsourcing.domain.model.aggregate.AggregateRootWithHashchainedEvents(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.EntityWithHashchain, eventsourcing.domain.model.aggregate.BaseAggregateRoot

Extends aggregate root base class with hash-chained events.

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.aggregate.Event

Supertype for aggregate events.

class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.aggregate.Created

Triggered when an aggregate root is created.

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.AttributeChanged

Triggered when an aggregate root attribute is changed.

class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Discarded, eventsourcing.domain.model.aggregate.Discarded

Triggered when an aggregate root is discarded.

class eventsourcing.domain.model.aggregate.AggregateRoot(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.AggregateRootWithHashchainedEvents

Original name for aggregate root base class with hash-chained events.

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event

Supertype for aggregate events.

class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.Created

Triggered when an aggregate root is created.

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.AttributeChanged

Triggered when an aggregate root attribute is changed.

class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.Discarded

Triggered when an aggregate root is discarded.

command

Commands as aggregates.

class eventsourcing.domain.model.command.Command(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.AggregateRoot

__init__(**kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event

class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.command.Event, eventsourcing.domain.model.aggregate.Created

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.command.Event, eventsourcing.domain.model.aggregate.AttributeChanged

class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.command.Event, eventsourcing.domain.model.aggregate.Discarded

class Done(**kwargs)[source]

Bases: eventsourcing.domain.model.command.Event

mutate(obj)[source]

Updates (“mutates”) given ‘obj’.

Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).

The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.

Parameters:obj – domain entity to be mutated

decorators

Decorators useful in domain models based on the classes in this library.

eventsourcing.domain.model.decorators.subscribe_to(*event_classes)[source]

Decorator for making a custom event handler function subscribe to a certain class of event.

The decorated function will be called once for each matching event that is published, and will be given one argument, the event, when it is called. If events are published in lists, for example the AggregateRoot publishes a list of pending events when its __save__() method is called, then the decorated function will be called once for each event that is an instance of the given event_class.

Please note, this decorator isn’t suitable for use with object class methods. In Python 3 the decorator receives an unbound function, and subscribes a handler that calls the decorated function for each matching event. Because the handler isn’t called on an object, no object instance is available to the decorator, so it can’t call a normal object method, which needs a value for ‘self’.

event_classes: types used to match published events; an event matches if it is an instance of one of these types

The following example shows a custom handler that reacts to Todo.Created event and saves a projection of a Todo model object.

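from eventsourcing.domain.model.decorators import subscribe_to

# Todo and TodoProjection are assumed to be defined elsewhere in the application.
@subscribe_to(Todo.Created)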
def new_todo_projection(event):
    todo = TodoProjection(id=event.originator_id, title=event.title)
    todo.save()
eventsourcing.domain.model.decorators.mutator(arg=None)[source]

Structures mutator functions by allowing handlers to be registered for different types of event. When the decorated function is called with an initial value and an event, it will call the handler that has been registered for that type of event.

It works like singledispatch, which it uses. The difference is that when the decorated function is called, this decorator dispatches according to the type of last call arg, which fits better with reduce(). The builtin Python function reduce() is used by the library to replay a sequence of events against an initial state. If a mutator function is given to reduce(), along with a list of events and an initializer, reduce() will call the mutator function once for each event in the list, but the initializer will be the first value, and the event will be the last argument, and we want to dispatch according to the type of the event. It happens that singledispatch is coded to switch on the type of the first argument, which makes it unsuitable for structuring a mutator function without the modifications introduced here.
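The way reduce() drives a mutator function can be seen in a standalone sketch that does not use the decorator at all. The classes Created, Renamed and Person are hypothetical, and the explicit isinstance() checks stand in for the dispatch on the type of the last argument that the decorator provides.

```python
from functools import reduce


class Created:
    def __init__(self, name):
        self.name = name


class Renamed:
    def __init__(self, name):
        self.name = name


class Person:
    def __init__(self, name):
        self.name = name


def mutate(state, event):
    # reduce() passes the accumulated state first and the event last,
    # so the branch is chosen by the type of the last argument.
    if isinstance(event, Created):
        return Person(event.name)
    if isinstance(event, Renamed):
        state.name = event.name
        return state
    raise NotImplementedError(type(event))


# Replay the events against an initial state of None.
person = reduce(mutate, [Created("Ann"), Renamed("Bob")], None)
```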

The other aspect introduced by this decorator function is the option to set the type of the handled entity in the decorator. When an entity is replayed from scratch, in other words when all its events are replayed, the initial state is None. The handler which handles the first event in the sequence will probably construct an object instance. It is possible to write the type into the handler, but that makes the entity more difficult to subclass because you will also need to write a handler for it. If the decorator is invoked with the type, when the initial value passed as a call arg to the mutator function is None, the handler will instead receive the type of the entity, which it can use to construct the entity object.

from eventsourcing.domain.model.decorators import mutator

class Entity(object):
    class Created(object):
        pass

@mutator(Entity)
def mutate(initial, event):
    raise NotImplementedError(type(event))

@mutate.register(Entity.Created)
def _(initial, event):
    return initial(**event.__dict__)

entity = mutate(None, Entity.Created())
eventsourcing.domain.model.decorators.attribute(getter)[source]

When used as a method decorator, returns a property object with the method as the getter and a setter defined to call instance method __change_attribute__(), which publishes an AttributeChanged event.
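A property of this kind can be sketched in standalone Python as follows. The storage convention (an underscore-prefixed instance attribute) and the simplified __change_attribute__() that records the change instead of triggering an event are assumptions of this sketch; Person is a hypothetical class.

```python
def attribute(getter):
    # Assumed convention: the value lives in "_<name>" on the instance.
    name = "_" + getter.__name__

    def get(self):
        return getattr(self, name)

    def set_(self, value):
        # Route assignment through the entity's change method.
        self.__change_attribute__(name, value)

    return property(get, set_, doc=getter.__doc__)


class Person:
    def __init__(self, full_name):
        self._full_name = full_name
        self.changes = []

    def __change_attribute__(self, name, value):
        # The library would trigger an AttributeChanged event here; this
        # sketch just records the change and applies it directly.
        self.changes.append((name, value))
        setattr(self, name, value)

    @attribute
    def full_name(self):
        """Full name of the person."""


person = Person("Ann")
person.full_name = "Bob"
```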

eventsourcing.domain.model.decorators.retry(exc=<class 'Exception'>, max_attempts=1, wait=0, stall=0, verbose=False)[source]

Retry decorator.

Parameters:
  • exc – List of exceptions that will cause the call to be retried if raised.
  • max_attempts – Maximum number of attempts to try.
  • wait – Amount of time to wait before retrying after an exception.
  • stall – Amount of time to wait before the first attempt.
  • verbose – If True, prints a message to STDOUT when retries occur.
Returns:

Returns the value returned by decorated function.
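A retry decorator with these parameters might look roughly like the standalone sketch below. It is not the library's implementation: the verbose option is omitted, and exc is passed as a tuple because that is what Python's except clause accepts. The function flaky is hypothetical.

```python
import time
from functools import wraps


def retry(exc=Exception, max_attempts=1, wait=0, stall=0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if stall:
                time.sleep(stall)  # delay before the first attempt
            attempts = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except exc:
                    attempts += 1
                    if attempts >= max_attempts:
                        raise  # give up and propagate the last exception
                    time.sleep(wait)  # delay before retrying

        return wrapper

    return decorator


calls = []


@retry(exc=(ValueError,), max_attempts=3)
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("not yet")
    return "ok"


result = flaky()
```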

snapshot

Snapshotting is implemented in the domain layer as an event.

class eventsourcing.domain.model.snapshot.AbstractSnapshop[source]

Bases: abc.ABC

topic

Path to the class of the snapshotted entity.

state

State of the snapshotted entity.

originator_id

ID of the snapshotted entity.

originator_version

Version of the last event applied to the entity.

class eventsourcing.domain.model.snapshot.Snapshot(originator_id, originator_version, topic, state)[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.snapshot.AbstractSnapshop

__init__(originator_id, originator_version, topic, state)[source]

Initialises event attribute values directly from constructor kwargs.

topic

Path to the class of the snapshotted entity.

state

State of the snapshotted entity.

timebucketedlog

Time-bucketed logs allow a sequence of items that is ordered by timestamp to be split across a number of different database partitions, which avoids one partition becoming very large (and then unworkable).
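One way to picture time bucketing is to derive a bucket identifier from the log name and the timestamp, so that each period of time maps to its own partition. The function bucket_id, the supported bucket sizes and the naming scheme below are assumptions of this sketch and are not taken from the library.

```python
from datetime import datetime, timezone

# Assumed bucket sizes and the timestamp format used for each of them.
BUCKET_FORMATS = {
    "year": "%Y",
    "month": "%Y-%m",
    "day": "%Y-%m-%d",
    "hour": "%Y-%m-%d_%H",
}


def bucket_id(log_name, timestamp, bucket_size="day"):
    """Returns an identifier for the bucket containing the timestamp."""
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return "{}_{}".format(log_name, moment.strftime(BUCKET_FORMATS[bucket_size]))


first_day = bucket_id("app", 0)
second_day = bucket_id("app", 86400)
second_hour = bucket_id("app", 3600, bucket_size="hour")
```

Messages logged on different days then fall into different buckets, so no single partition has to hold the whole log.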

class eventsourcing.domain.model.timebucketedlog.Timebucketedlog(name, bucket_size=None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for events of time-bucketed log.

class Started(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.timebucketedlog.Event

class BucketSizeChanged(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.timebucketedlog.Event, eventsourcing.domain.model.entity.AttributeChanged

__init__(name, bucket_size=None, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.domain.model.timebucketedlog.MessageLogged(message, originator_id)[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.events.Logged

__init__(message, originator_id)[source]

Initialises event attribute values directly from constructor kwargs.

collection

Collections.

class eventsourcing.domain.model.collection.Collection(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

class Event(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for events of collection entities.

class Created(originator_version=0, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.Event, eventsourcing.domain.model.entity.Created

Published when collection is created.

class Discarded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.Event, eventsourcing.domain.model.entity.Discarded

Published when collection is discarded.

class EventWithItem(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.Event

class ItemAdded(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.EventWithItem

__mutate__(obj)[source]

Updates ‘obj’ with values from self.

class ItemRemoved(originator_version, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.EventWithItem

__mutate__(obj)[source]

Updates ‘obj’ with values from self.

__init__(**kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

array

A kind of collection, indexed by integer. Doesn’t need to replay all events to exist.

class eventsourcing.domain.model.array.ItemAssigned(item, index, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Occurs when an item is set at a position in an array.

__init__(item, index, *args, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

class eventsourcing.domain.model.array.BigArray(array_id, repo)[source]

Bases: eventsourcing.domain.model.array.Array

A virtual array holding items in indexed positions, across a number of Array instances.

Getting and setting items at index position is supported. Slices are supported, and operate across the underlying arrays. Appending is also supported.

BigArray is designed to overcome the concern of needing a single large sequence that may not be suitably stored in any single partition. In simple terms, if events of an aggregate can fit in a partition, we can use the same size partition to make a tree of arrays that will certainly be capable of sequencing all the events of the application in a single stream.

With normal size base arrays, enterprise applications can expect read and write time to be approximately constant with respect to the number of items in the array.

The array is composed of a tree of arrays, which gives a capacity equal to the size of each array raised to the power of the size of each array. If the arrays are limited to about the maximum size of an aggregate event stream (a large number, but not so large that there would be too much data in any one partition, let’s say 1000s to be safe) then it would be possible to fit such a large number of aggregates in the corresponding BigArray, that we can be confident it would never be full.

Write access time in the worst case, and the time to identify the index of the last item in the big array, is proportional to the log of the highest assigned index to base the underlying array size. Write time on average, and read time given an index, is constant with respect to the number of items in a BigArray.
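The capacity and logarithmic access-time claims can be checked with a little arithmetic. The sketch below assumes a base array size of 1000 and uses a hypothetical helper, tree_height, that counts how many levels are needed to address a given index; the library's actual tree layout may differ in detail.

```python
base_size = 1000  # assumed size of each underlying array

# Capacity: the size of each array to the power of the size of each array.
capacity = base_size ** base_size


def tree_height(highest_index, base_size):
    """Number of levels needed to address highest_index in base base_size."""
    height = 1
    while highest_index >= base_size ** height:
        height += 1
    return height


one_level = tree_height(999, base_size)
two_levels = tree_height(1000, base_size)
three_levels = tree_height(10 ** 9 - 1, base_size)
```

Even a billion items need only three levels here, which is why the worst-case cost grows so slowly with the highest assigned index.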

Items can be appended in log time in a single thread. However, the time between reading the current last index and claiming the next position leads to contention and retries when there are lots of threads of execution all attempting to append items, which inherently limits throughput.

Todo: Not possible in Cassandra, but maybe do it in a transaction in SQLAlchemy?

An alternative to reading the last item before writing the next is to use an integer sequence generator to generate a stream of integers. Items can be assigned to index positions in a big array, according to the integers that are issued. Throughput will then be much better, and will be limited only by the rate at which the database can have events written to it (unless the number generator is quite slow).

An external integer sequence generator, such as Redis’ INCR command, or an auto-incrementing database column, may constitute a single point of failure.

__init__(array_id, repo)[source]

Initialize self. See help(type(self)) for accurate signature.

get_last_array()[source]

Returns last array in compound.

Return type:CompoundSequenceReader
__getitem__(item)[source]

Returns item at index, or items in slice.

__setitem__(position, item)[source]

Sets item in array, at given index.

Won’t overrun the end of the array, because the position is fixed to be less than base_size.

__len__()[source]

Returns length of array.

calc_parent(i, j, h)[source]

Returns get_big_array and end of span of parent sequence that contains given child.

class eventsourcing.domain.model.array.AbstractArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

Repository for sequence objects.

__init__(array_size=10000, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__getitem__(array_id)[source]

Returns sequence for given ID.

class eventsourcing.domain.model.array.AbstractBigArrayRepository[source]

Bases: eventsourcing.domain.model.entity.AbstractEntityRepository

Repository for compound sequence objects.

subrepo

Sub-sequence repository.

__getitem__(array_id)[source]

Returns sequence for given ID.