domain.model

The domain model package contains classes and functions that can help develop an event sourced domain model.

events

Base classes for domain events of different kinds.

class eventsourcing.domain.model.events.DomainEvent(**kwargs)[source]

Bases: eventsourcing.domain.model.versioning.Upcastable, eventsourcing.whitehead.ActualOccasion, typing.Generic

Base class for domain model events.

Implements methods to make instances read-only, comparable for equality in Python, and have recognisable representations. To make domain events hashable, this class also implements a method to create a cryptographic hash of the state of the event.
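
The behaviour described above can be sketched in plain Python. This is a minimal illustration and not the library's code: the class name SketchEvent is hypothetical, and the hashing shown here is Python's built-in hash() rather than the cryptographic hash.

```python
class SketchEvent:
    """Illustrative read-only event; not the library's DomainEvent."""

    def __init__(self, **kwargs):
        # Write directly to __dict__, bypassing the blocked __setattr__.
        self.__dict__.update(kwargs)

    def __setattr__(self, key, value):
        raise AttributeError("event attributes are read-only")

    def __eq__(self, other):
        return type(self) is type(other) and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Hash the type and the sorted attribute items (values must be hashable).
        return hash((type(self), tuple(sorted(self.__dict__.items()))))

    def __repr__(self):
        args = ", ".join(f"{k}={v!r}" for k, v in sorted(self.__dict__.items()))
        return f"{type(self).__name__}({args})"


e1 = SketchEvent(a=1, b="x")
e2 = SketchEvent(a=1, b="x")
```

Two events built from the same values compare equal and hash equally, while assignment such as e1.a = 2 raises AttributeError.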

__init__(**kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__repr__() → str[source]

Creates a string representing the type and attribute values of the event.

Return type:str
__mutate__(obj: Optional[TEntity]) → Optional[TEntity][source]

Updates ‘obj’ with values from ‘self’.

Calls the ‘mutate()’ method.

Can be extended, but subclasses must call super and return an object to their caller.

Parameters:obj – object (normally a domain entity) to be mutated
Returns:mutated object
mutate(obj: TEntity) → None[source]

Updates (“mutates”) given ‘obj’.

Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).

The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.

Parameters:obj – domain entity to be mutated
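
The relationship between __mutate__() and mutate() might look roughly like the following sketch. The classes SketchEvent, Renamed and Thing are hypothetical and only illustrate the division of labour: __mutate__() returns the object, mutate() just changes it.

```python
class SketchEvent:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __mutate__(self, obj):
        # Template method: delegate to mutate(), then return the object.
        if obj is not None:
            self.mutate(obj)
        return obj

    def mutate(self, obj):
        # Default projection does nothing; subclasses override this.
        pass


class Renamed(SketchEvent):
    def mutate(self, obj):
        # No call to super and no return value needed here.
        obj.name = self.name


class Thing:
    def __init__(self, name):
        self.name = name


thing = Renamed(name="new").__mutate__(Thing(name="old"))
```
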
__setattr__(key: Any, value: Any) → None[source]

Inhibits event attributes from being updated by assignment.

__eq__(other: object) → bool[source]

Tests for equality of two event objects.

Return type:bool
__ne__(other: object) → bool[source]

Negates the equality test.

Return type:bool
__hash__() → int[source]

Computes a Python integer hash for an event.

Supports Python equality and inequality comparisons.

Returns:Python integer hash
Return type:int
classmethod __hash_object_v2__(obj: dict) → str[source]

Calculates SHA-256 hash of JSON encoded ‘obj’.

Parameters:obj – Object to be hashed.
Returns:SHA-256 as hexadecimal string.
Return type:str
classmethod __hash_object_v1__(obj: dict) → str[source]

Calculates SHA-256 hash of JSON encoded ‘obj’.

Parameters:obj – Object to be hashed.
Returns:SHA-256 as hexadecimal string.
Return type:str
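
As a rough illustration of hashing a JSON-encoded object with SHA-256, consider the sketch below. The function name hash_object and the JSON options (sorted keys, compact separators) are assumptions; this section does not say how the v1 and v2 methods encode the object or how they differ.

```python
import hashlib
import json


def hash_object(obj: dict) -> str:
    # Sort keys and strip whitespace so equal dicts give equal JSON text.
    encoded = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(encoded.encode("utf8")).hexdigest()


digest = hash_object({"b": 2, "a": 1})
```

A deterministic encoding matters here: without sorted keys, two dicts with the same items could serialise differently and so hash differently.
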
class eventsourcing.domain.model.events.EventWithHash(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Base class for domain events with a cryptographic event hash.

Extends DomainEvent by setting a cryptographic event hash when the event is originated, and checking the event hash whenever its default projection mutates an object.

__init__(**kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

__event_hash__

Returns SHA-256 hash of the original state of the event.

Returns:SHA-256 as hexadecimal string.
Return type:str
__hash__() → int[source]

Computes a Python integer hash for an event, using its pre-computed event hash.

Supports Python equality and inequality comparisons only.

Returns:Python integer hash
Return type:int
__mutate__(obj: Optional[TEntity]) → Optional[TEntity][source]

Updates ‘obj’ with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
__check_hash__() → None[source]

Raises EventHashError, unless self.__event_hash__ can be derived from the current state of the event object.
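
The idea of recording a hash at origination and checking it later can be sketched as follows. HashedEvent, HashError and the attribute name event_hash are hypothetical stand-ins, and the JSON encoding is an assumption rather than the library's own.

```python
import hashlib
import json


class HashError(Exception):
    """Hypothetical stand-in for the library's EventHashError."""


class HashedEvent:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)
        # Record the hash of the state as originated.
        self.__dict__["event_hash"] = self._compute_hash()

    def _compute_hash(self) -> str:
        state = {k: v for k, v in self.__dict__.items() if k != "event_hash"}
        encoded = json.dumps(state, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(encoded.encode("utf8")).hexdigest()

    def check_hash(self) -> None:
        # Recompute from the current state and compare with the recorded hash.
        if self._compute_hash() != self.event_hash:
            raise HashError("event state does not match its hash")


event = HashedEvent(amount=10)
event.check_hash()  # passes: state is unchanged
```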

class eventsourcing.domain.model.events.EventWithOriginatorID(originator_id: uuid.UUID, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have an originator ID.

__init__(originator_id: uuid.UUID, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

originator_id

Originator ID is the identity of the object that originated this event.

Returns:A UUID representing the identity of the originator.
Return type:UUID
class eventsourcing.domain.model.events.EventWithTimestamp(timestamp: Optional[decimal.Decimal] = None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have a timestamp value.

__init__(timestamp: Optional[decimal.Decimal] = None, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

timestamp

A UNIX timestamp as a Decimal object.

class eventsourcing.domain.model.events.EventWithOriginatorVersion(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have an originator version number.

__init__(originator_version: int, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

originator_version

Originator version is the version of the object that originated this event.

Returns:An integer representing the version of the originator.
class eventsourcing.domain.model.events.EventWithTimeuuid(event_id: Optional[uuid.UUID] = None, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

For events that have a UUIDv1 event ID.

__init__(event_id: Optional[uuid.UUID] = None, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

class eventsourcing.domain.model.events.CreatedEvent(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Happens when something is created.

class eventsourcing.domain.model.events.AttributeChangedEvent(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Happens when the value of an attribute changes.

class eventsourcing.domain.model.events.DiscardedEvent(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Happens when something is discarded.

class eventsourcing.domain.model.events.LoggedEvent(**kwargs)[source]

Bases: eventsourcing.domain.model.events.DomainEvent

Happens when something is logged.

eventsourcing.domain.model.events.subscribe(handler: Callable[[Sequence[TEvent]], None], predicate: Optional[Callable[[Sequence[TEvent]], bool]] = None) → None[source]

Adds ‘handler’ to list of event handlers to be called if ‘predicate’ is satisfied.

If predicate is None, the handler will be called whenever an event is published.

Parameters:
  • handler (callable) – Will be called when an event is published.
  • predicate (callable) – Conditions whether the handler will be called.
eventsourcing.domain.model.events.unsubscribe(handler: Callable[[Sequence[TEvent]], None], predicate: Optional[Callable[[Sequence[TEvent]], bool]] = None) → None[source]

Removes ‘handler’ from list of event handlers to be called if ‘predicate’ is satisfied.

Parameters:
  • handler (callable) – Previously subscribed handler.
  • predicate (callable) – Previously subscribed predicate.
eventsourcing.domain.model.events.publish(events: Sequence[TEvent]) → None[source]

Publishes the given ‘events’ by calling subscribed event handlers with them, except those handlers with predicates that are not satisfied by the events.

Handlers are called in the order they are subscribed.

Parameters:events (sequence of DomainEvent) – Domain events to be published.
exception eventsourcing.domain.model.events.EventHandlersNotEmptyError[source]

Bases: Exception

eventsourcing.domain.model.events.assert_event_handlers_empty() → None[source]

Raises EventHandlersNotEmptyError, unless there are no event handlers subscribed.

eventsourcing.domain.model.events.clear_event_handlers() → None[source]

Removes all previously subscribed event handlers.

class eventsourcing.domain.model.events.AbstractSnapshot[source]

Bases: eventsourcing.whitehead.ActualOccasion

topic

Path to the class of the snapshotted entity.

state

State of the snapshotted entity.

originator_id

ID of the snapshotted entity.

originator_version

Version of the last event applied to the entity.

__mutate__(obj: Optional[TEntity]) → Optional[TEntity][source]

Reconstructs the snapshotted entity.
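
Reconstructing an object from a topic and a state dict might be sketched as below. The topic format "module#ClassName" and the helper names are assumptions, and types.SimpleNamespace is used only because it is an importable standard-library class that keeps its state in __dict__.

```python
import importlib


def resolve_topic(topic: str) -> type:
    # Assumed topic format: "<module path>#<class name>".
    module_name, _, class_name = topic.partition("#")
    return getattr(importlib.import_module(module_name), class_name)


def reconstruct(topic: str, state: dict):
    cls = resolve_topic(topic)
    obj = cls.__new__(cls)          # bypass __init__
    obj.__dict__.update(state)      # restore the recorded state
    return obj


restored = reconstruct("types#SimpleNamespace", {"id": 7, "name": "example"})
```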

entity

Base classes for domain model entities.

class eventsourcing.domain.model.entity.MetaDomainEntity(name: str, *args, **kwargs)[source]

Bases: abc.ABCMeta

class eventsourcing.domain.model.entity.DomainEntity(id: uuid.UUID)[source]

Bases: eventsourcing.domain.model.versioning.Upcastable, eventsourcing.whitehead.EnduringObject

Supertype for domain model entity.

class Event(originator_id: uuid.UUID, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorID

Supertype for events of domain model entities.

__check_obj__(obj: TDomainEntity) → None[source]

Checks state of obj before mutating.

Parameters:obj – Domain entity to be checked.
Raises:OriginatorIDError – if the originator_id is mismatched
classmethod __create__(originator_id: Optional[uuid.UUID] = None, event_class: Optional[Type[DomainEntity.Created[TDomainEntity]]] = None, **kwargs) → TDomainEntity[source]

Creates a new domain entity.

Constructs a “created” event, constructs the entity object from the event, publishes the “created” event, and returns the new domain entity object.

Parameters:
  • cls (DomainEntity) – Class of domain entity
  • originator_id – ID of the new domain entity (defaults to uuid4()).
  • event_class – Domain event class to be used for the “created” event.
  • kwargs – Other named attribute values of the “created” event.
Returns:

New domain entity object.

Return type:

DomainEntity
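
The four steps described for __create__() can be sketched as follows. The names Created, Entity, create and published are simplified, hypothetical stand-ins; in particular, appending to a list stands in for publishing to subscribers.

```python
import uuid

published = []  # stand-in for the publish-subscribe mechanism


class Created:
    def __init__(self, originator_id, **kwargs):
        self.originator_id = originator_id
        self.kwargs = kwargs

    def __mutate__(self, obj=None, entity_class=None):
        # Construct the entity from the event's attribute values.
        return entity_class(id=self.originator_id, **self.kwargs)


class Entity:
    def __init__(self, id, name):
        self.id = id
        self.name = name

    @classmethod
    def create(cls, originator_id=None, **kwargs):
        event = Created(originator_id or uuid.uuid4(), **kwargs)  # 1. event
        obj = event.__mutate__(None, entity_class=cls)            # 2. entity
        published.append(event)                                   # 3. publish
        return obj                                                # 4. return


entity = Entity.create(name="example")
```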

class Created(originator_topic: str, **kwargs)[source]

Bases: eventsourcing.domain.model.events.CreatedEvent, eventsourcing.domain.model.entity.Event

Triggered when an entity is created.

originator_topic

Topic (a string) representing the class of the originating domain entity.

Return type:str
__mutate__(obj: Optional[TDomainEntity]) → Optional[TDomainEntity][source]

Constructs object from an entity class, which is obtained by resolving the originator topic, unless it is given as method argument entity_class.

Parameters:entity_class – Class of domain entity to be constructed.
id

The immutable ID of the domain entity.

This value is set using the originator_id of the “created” event constructed by __create__().

An entity ID allows an instance to be referenced and distinguished from others, even though its state may change over time.

This attribute has the normal “public” format for a Python object attribute name, because by definition all domain entities have an ID.

__change_attribute__(name: str, value: Any, **kwargs) → None[source]

Changes named attribute with the given value, by triggering an AttributeChanged event.

class AttributeChanged(originator_id: uuid.UUID, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.AttributeChangedEvent

Triggered when a named attribute is assigned a new value.

__mutate__(obj: Optional[TDomainEntity]) → Optional[TDomainEntity][source]

Updates ‘obj’ with values from ‘self’.

Calls the ‘mutate()’ method.

Can be extended, but subclasses must call super and return an object to their caller.

Parameters:obj – object (normally a domain entity) to be mutated
Returns:mutated object
__discard__(**kwargs) → None[source]

Discards self, by triggering a Discarded event.

class Discarded(originator_id: uuid.UUID, **kwargs)[source]

Bases: eventsourcing.domain.model.events.DiscardedEvent, eventsourcing.domain.model.entity.Event

Triggered when a DomainEntity is discarded.

__mutate__(obj: Optional[TDomainEntity]) → Optional[TDomainEntity][source]

Updates ‘obj’ with values from ‘self’.

Calls the ‘mutate()’ method.

Can be extended, but subclasses must call super and return an object to their caller.

Parameters:obj – object (normally a domain entity) to be mutated
Returns:mutated object
__assert_not_discarded__() → None[source]

Asserts that this entity has not been discarded.

Raises EntityIsDiscarded exception if entity has been discarded already.

__trigger_event__(event_class: Type[TDomainEvent], **kwargs) → None[source]

Constructs, applies, and publishes a domain event.

__mutate__(event: TDomainEvent) → None[source]

Mutates this entity with the given event.

This method calls on the event object to mutate this entity, because the mutation behaviour of different types of events was usefully factored onto the event classes, and the event mutate() method is the most convenient way to define behaviour in domain models.

However, as an alternative to implementing the mutate() method on domain model events, this method can be extended with a method that is capable of mutating an entity for all the domain event classes introduced by the entity class.

Similarly, this method can be overridden entirely in subclasses, so long as all of the mutation behaviour is implemented in the mutator function, including the mutation behaviour of the events defined on the library event classes that would no longer be invoked.

However, if the entity class defines a mutator function, or if a separate mutator function is used, then it must be involved in the event sourced repository used to replay events, which by default knows nothing about the domain entity class. In practice, this means having a repository for each kind of entity, rather than the application just having one repository, with each repository having a mutator function that can project the entity events into an entity.

__publish__(event: Sequence[TDomainEvent]) → None[source]

Publishes given event for subscribers in the application.

Parameters:event – domain event or list of events
__publish_to_subscribers__(events: Sequence[TDomainEvent]) → None[source]

Actually dispatches given event to publish-subscribe mechanism.

Parameters:events – list of domain events
__eq__(other: object) → bool[source]

Return self==value.

__ne__(other: object) → bool[source]

Return self!=value.

class eventsourcing.domain.model.entity.EntityWithHashchain(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithHash, eventsourcing.domain.model.entity.Event

Supertype for events of domain entities.

__mutate__(obj: Optional[TEntityWithHashchain]) → Optional[TEntityWithHashchain][source]

Updates ‘obj’ with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
__check_obj__(obj: TEntityWithHashchain) → None[source]

Extends superclass method by checking the __previous_hash__ of this event matches the __head__ hash of the entity obj.
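
A hash chain of this kind can be sketched in a few lines. ChainedEvent, ChainedEntity and the attribute names are hypothetical, and the hashing scheme is an assumption; the point is only that each event must link to the entity's current head hash.

```python
import hashlib
import json


def sha256_hex(state: dict) -> str:
    encoded = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(encoded.encode("utf8")).hexdigest()


class ChainedEvent:
    def __init__(self, previous_hash: str, **state):
        self.previous_hash = previous_hash
        self.state = state
        # The event hash covers the previous hash, linking the chain.
        self.event_hash = sha256_hex({"previous_hash": previous_hash, **state})


class ChainedEntity:
    def __init__(self):
        self.head = ""          # hash of the last applied event
        self.values = {}

    def apply(self, event: ChainedEvent) -> None:
        # Reject events that do not link to the current head of the chain.
        if event.previous_hash != self.head:
            raise ValueError("event does not follow the entity's head hash")
        self.values.update(event.state)
        self.head = event.event_hash


entity = ChainedEntity()
first = ChainedEvent(entity.head, a=1)
entity.apply(first)
second = ChainedEvent(entity.head, b=2)
entity.apply(second)
```

Replaying the first event again fails, because its previous hash no longer matches the head of the entity.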

class Created(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Created

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

__mutate__(obj: Optional[TEntityWithHashchain]) → Optional[TEntityWithHashchain][source]

Updates ‘obj’ with values from self.

Can be extended, but subclasses must call super method, and return an object.

Parameters:obj – object to be mutated
Returns:mutated object
classmethod __create__(originator_id: Optional[uuid.UUID] = None, event_class: Optional[Type[DomainEntity.Created[TEntityWithHashchain]]] = None, **kwargs) → TEntityWithHashchain[source]

Creates a new domain entity.

Constructs a “created” event, constructs the entity object from the event, publishes the “created” event, and returns the new domain entity object.

Parameters:
  • cls (DomainEntity) – Class of domain entity
  • originator_id – ID of the new domain entity (defaults to uuid4()).
  • event_class – Domain event class to be used for the “created” event.
  • kwargs – Other named attribute values of the “created” event.
Returns:

New domain entity object.

Return type:

DomainEntity

__trigger_event__(event_class: Type[TDomainEvent], **kwargs) → None[source]

Constructs, applies, and publishes a domain event.

class eventsourcing.domain.model.entity.VersionedEntity(__version__: int, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

__trigger_event__(event_class: Type[TDomainEvent], **kwargs) → None[source]

Increments the version number when an event is triggered.

The event carries the version number that the originator will have when the originator is mutated with this event. (The event’s “originator” version isn’t the version of the originator before the event was triggered, but represents the result of the work of incrementing the version, which is then set in the event as normal.) The Created event has version 0, and a newly created instance is at version 0. The second event has originator version 1, and so will the originator when the second event has been applied.
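
The version numbering described above can be traced with a small sketch. VersionedSketch is a hypothetical class that tracks only version numbers, assuming the "created" event has already been applied at version 0.

```python
class VersionedSketch:
    def __init__(self):
        self.version = 0       # state after the "created" event (version 0)
        self.applied = [0]     # originator versions of applied events

    def trigger(self) -> int:
        # The new event carries the version the entity will have afterwards.
        originator_version = self.version + 1
        self.apply(originator_version)
        return originator_version

    def apply(self, originator_version: int) -> None:
        # The event's version must follow (1 +) the entity's version.
        if originator_version != self.version + 1:
            raise ValueError("event version does not follow entity version")
        self.version = originator_version
        self.applied.append(originator_version)


entity = VersionedSketch()
second_event_version = entity.trigger()
```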

class Event(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.entity.Event

Supertype for events of versioned entities.

__mutate__(obj: Optional[TVersionedEntity]) → Optional[TVersionedEntity][source]

Updates ‘obj’ with values from ‘self’.

Calls the ‘mutate()’ method.

Can be extended, but subclasses must call super and return an object to their caller.

Parameters:obj – object (normally a domain entity) to be mutated
Returns:mutated object
__check_obj__(obj: TVersionedEntity) → None[source]

Extends superclass method by checking the event’s originator version follows (1 +) this entity’s version.

class Created(originator_version: int = 0, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a VersionedEntity is created.

class AttributeChanged(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a VersionedEntity is changed.

class Discarded(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a VersionedEntity is discarded.

class eventsourcing.domain.model.entity.EntityWithECC(*, event_id, correlation_id, causation_id, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

Entity whose events have event ID, correlation ID, and causation ID.

class Event(*, processed_event=None, application_name, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

class Created(originator_topic: str, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

class AttributeChanged(*, processed_event=None, application_name, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

class Discarded(*, processed_event=None, application_name, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

class eventsourcing.domain.model.entity.TimestampedEntity(__created_on__: decimal.Decimal, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class Event(originator_id: uuid.UUID, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.events.EventWithTimestamp

Supertype for events of timestamped entities.

__mutate__(obj: Optional[TTimestampedEntity]) → Optional[TTimestampedEntity][source]

Updates ‘obj’ with values from self.

class Created(originator_topic: str, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a TimestampedEntity is created.

class AttributeChanged(originator_id: uuid.UUID, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedEntity is changed.

class Discarded(originator_id: uuid.UUID, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedEntity is discarded.

class eventsourcing.domain.model.entity.TimeuuidedEntity(event_id: uuid.UUID, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.DomainEntity

class eventsourcing.domain.model.entity.TimestampedVersionedEntity(__created_on__: decimal.Decimal, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedEntity, eventsourcing.domain.model.entity.VersionedEntity

class Event(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Event

Supertype for events of timestamped, versioned entities.

class Created(originator_version: int = 0, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.entity.Event

Published when a TimestampedVersionedEntity is created.

class AttributeChanged(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.AttributeChanged, eventsourcing.domain.model.entity.AttributeChanged

Published when a TimestampedVersionedEntity is changed.

class Discarded(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.entity.Discarded, eventsourcing.domain.model.entity.Discarded

Published when a TimestampedVersionedEntity is discarded.

class eventsourcing.domain.model.entity.TimeuuidedVersionedEntity(event_id: uuid.UUID, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimeuuidedEntity, eventsourcing.domain.model.entity.VersionedEntity

aggregate

Base classes for aggregates in a domain driven design.

class eventsourcing.domain.model.aggregate.BaseAggregateRoot(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity, typing.Generic

Root entity for an aggregate in a domain driven design.

class Event(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for base aggregate root events.

class Created(originator_version: int = 0, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.aggregate.Event

Triggered when an aggregate root is created.

class AttributeChanged(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.AttributeChanged

Triggered when an aggregate root attribute is changed.

class Discarded(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Discarded

Triggered when an aggregate root is discarded.

__init__(**kwargs) → None[source]

Initialize self. See help(type(self)) for accurate signature.

__publish__(event: Sequence[TDomainEvent]) → None[source]

Defers publishing event(s) to subscribers, by adding event to internal collection of pending events.

__save__() → None[source]

Publishes all pending events to subscribers.
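
Deferred publishing can be sketched as a list of pending events that is flushed on save. PendingAggregate and the method names publish and save are hypothetical simplifications of __publish__() and __save__(), and the published list stands in for subscribers.

```python
published = []  # stand-in for subscribers receiving published events


class PendingAggregate:
    def __init__(self):
        self.pending_events = []

    def publish(self, event) -> None:
        # Defer publishing: just remember the event.
        self.pending_events.append(event)

    def save(self) -> None:
        # Publish all pending events together, then clear the collection.
        batch, self.pending_events = self.pending_events, []
        if batch:
            published.append(batch)


aggregate = PendingAggregate()
aggregate.publish("created")
aggregate.publish("renamed")
before_save = list(published)
aggregate.save()
```

Publishing the batch as one list allows the events of a single command to be handled together by subscribers.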

class eventsourcing.domain.model.aggregate.AggregateRootWithHashchainedEvents(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.EntityWithHashchain, eventsourcing.domain.model.aggregate.BaseAggregateRoot

Extends aggregate root base class with hash-chained events.

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event, eventsourcing.domain.model.aggregate.Event

Supertype for aggregate events.

class Created(originator_version: int = 0, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.aggregate.Created, eventsourcing.domain.model.aggregate.Event

Triggered when an aggregate root is created.

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.AttributeChanged

Triggered when an aggregate root attribute is changed.

class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.entity.Discarded, eventsourcing.domain.model.aggregate.Discarded

Triggered when an aggregate root is discarded.

class eventsourcing.domain.model.aggregate.AggregateRoot(*args, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.AggregateRootWithHashchainedEvents

Original name for aggregate root base class with hash-chained events.

class Event(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event

Supertype for aggregate events.

class Created(originator_version: int = 0, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.Created

Triggered when an aggregate root is created.

class AttributeChanged(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.AttributeChanged

Triggered when an aggregate root attribute is changed.

class Discarded(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event, eventsourcing.domain.model.aggregate.Discarded

Triggered when an aggregate root is discarded.

command

Commands as aggregates.

class eventsourcing.domain.model.command.Command(**kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.BaseAggregateRoot

__init__(**kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

class Event(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.aggregate.Event

class Created(originator_version: int = 0, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.command.Event, eventsourcing.domain.model.aggregate.Created

class AttributeChanged(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.command.Event, eventsourcing.domain.model.aggregate.AttributeChanged

class Discarded(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.command.Event, eventsourcing.domain.model.aggregate.Discarded

class Done(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.command.Event

mutate(obj: eventsourcing.domain.model.command.Command) → None[source]

Updates (“mutates”) given ‘obj’.

Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).

The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.

Parameters:obj – domain entity to be mutated

decorators

Decorators useful in domain models based on the classes in this library.

eventsourcing.domain.model.decorators.subscribe_to(*args) → Callable[source]

Decorator for making a custom event handler function subscribe to a certain class of event.

The decorated function will be called once for each matching event that is published, and will be given one argument, the event. If events are published in lists (for example, the AggregateRoot publishes a list of pending events when its __save__() method is called), then the decorated function will be called once for each event in the list that is an instance of the given event_class.

Please note, this decorator isn’t suitable for use with methods of a class. In Python 3 the decorator receives a plain function rather than a bound method, and it subscribes a handler that calls the decorated function for each matching event. Because the handler isn’t called on an object instance, no value for ‘self’ is available, so a normal instance method can’t be called this way.

Parameters:event_class – type used to match published events, an event matches if it is an instance of this type.

The following example shows a custom handler that reacts to a Todo.Created event and saves a projection of a Todo model object.

@subscribe_to(Todo.Created)
def new_todo_projection(event):
    todo = TodoProjection(id=event.originator_id, title=event.title)
    todo.save()
eventsourcing.domain.model.decorators.mutator(arg: Optional[Callable] = None) → Callable[source]

Structures mutator functions by allowing handlers to be registered for different types of event. When the decorated function is called with an initial value and an event, it will call the handler that has been registered for that type of event.

It works like singledispatch, which it uses. The difference is that when the decorated function is called, this decorator dispatches according to the type of the last call argument, which fits better with reduce(). The Python function reduce() is used by the library to replay a sequence of events against an initial state. If a mutator function is given to reduce(), along with a list of events and an initializer, reduce() will call the mutator function once for each event in the list, with the initializer (or the previous result) as the first argument and the event as the last argument, and we want to dispatch according to the type of the event. It happens that singledispatch is coded to switch on the type of the first argument, which makes it unsuitable for structuring a mutator function without the modifications introduced here.

The other aspect introduced by this decorator function is the option to set the type of the handled entity in the decorator. When an entity is replayed from scratch, in other words when all its events are replayed, the initial state is None. The handler which handles the first event in the sequence will probably construct an object instance. It is possible to write the type into the handler, but that makes the entity more difficult to subclass because you will also need to write a handler for it. If the decorator is invoked with the type, when the initial value passed as a call arg to the mutator function is None, the handler will instead receive the type of the entity, which it can use to construct the entity object.

class Entity(object):
    class Created(object):
        pass

@mutator(Entity)
def mutate(initial, event):
    raise NotImplementedError(type(event))

@mutate.register(Entity.Created)
def _(initial, event):
    return initial(**event.__dict__)

entity = mutate(None, Entity.Created())
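
To illustrate only the dispatch-on-last-argument idea together with reduce(), here is a self-contained sketch built on functools.singledispatch. The helper dispatch_on_last_arg and the event classes are hypothetical; this sketch leaves out the decorator's option of passing the entity type.

```python
from functools import reduce, singledispatch, wraps


def dispatch_on_last_arg(func):
    """Like singledispatch, but selects the handler by the last argument's type."""
    dispatcher = singledispatch(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        return dispatcher.dispatch(type(args[-1]))(*args, **kwargs)

    wrapper.register = dispatcher.register
    return wrapper


class Created:
    def __init__(self, value):
        self.value = value


class Incremented:
    pass


@dispatch_on_last_arg
def mutate(state, event):
    raise NotImplementedError(type(event))


@mutate.register(Created)
def _(state, event):
    return event.value


@mutate.register(Incremented)
def _(state, event):
    return state + 1


events = [Created(10), Incremented(), Incremented()]
# reduce() passes the accumulated state first and the event last.
final_state = reduce(mutate, events, None)
```
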
eventsourcing.domain.model.decorators.attribute(getter: Callable) → property[source]

When used as a method decorator, returns a property object with the method as the getter and a setter defined to call instance method __change_attribute__(), which publishes an AttributeChanged event.
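
A decorator of this shape might be written roughly as follows. Storing the value under an underscore-prefixed name, and the Person class with its changes list, are assumptions made for illustration; a real entity would trigger an AttributeChanged event instead of appending to a list.

```python
def attribute(getter):
    """Sketch: property whose setter routes through __change_attribute__()."""
    name = "_" + getter.__name__

    def fget(self):
        return getattr(self, name)

    def fset(self, value):
        self.__change_attribute__(name=name, value=value)

    return property(fget, fset, doc=getter.__doc__)


class Person:
    def __init__(self, full_name):
        self._full_name = full_name
        self.changes = []  # stand-in for triggered AttributeChanged events

    def __change_attribute__(self, name, value):
        self.changes.append((name, value))
        setattr(self, name, value)

    @attribute
    def full_name(self):
        """Full name of the person."""


person = Person("Ann")
person.full_name = "Ann Lee"
```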

eventsourcing.domain.model.decorators.retry(exc: Union[Type[Exception], Sequence[Type[Exception]]] = <class 'Exception'>, max_attempts: int = 1, wait: float = 0, stall: float = 0, verbose: bool = False) → Callable[source]

Retry decorator.

Parameters:
  • exc – List of exceptions that will cause the call to be retried if raised.
  • max_attempts – Maximum number of attempts to try.
  • wait – Amount of time to wait before retrying after an exception.
  • stall – Amount of time to wait before the first attempt.
  • verbose – If True, prints a message to STDOUT when retries occur.
Returns:

Returns the value returned by decorated function.
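
The parameters above suggest behaviour along the lines of this sketch. It is an assumption about how such a decorator could work, not the library's code: it omits the verbose option and must be applied with parentheses.

```python
import time
from functools import wraps


def retry(exc=Exception, max_attempts=1, wait=0.0, stall=0.0):
    exc_types = tuple(exc) if isinstance(exc, (list, tuple)) else (exc,)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if stall:
                time.sleep(stall)          # delay before the first attempt
            attempts = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except exc_types:
                    attempts += 1
                    if attempts >= max_attempts:
                        raise              # out of attempts: re-raise
                    time.sleep(wait)       # delay before the next attempt

        return wrapper

    return decorator


calls = []


@retry(ValueError, max_attempts=3)
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("not yet")
    return "ok"


result = flaky()
```
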

eventsourcing.domain.model.decorators.subclassevents(cls: type) → type[source]

Decorator that avoids “boilerplate” subclassing of domain events.

For example, with this decorator you can do this:

@subclassevents
class Example(AggregateRoot):
    class SomethingHappened(DomainEvent): pass

rather than this:

class Example(AggregateRoot):
    class Event(AggregateRoot.Event): pass
    class Created(Event, AggregateRoot.Created): pass
    class Discarded(Event, AggregateRoot.Discarded): pass
    class AttributeChanged(Event, AggregateRoot.AttributeChanged): pass
    class SomethingHappened(Event): pass

You can apply this to a tree of domain event classes by defining the base class with attribute ‘__subclassevents__ = True’.

snapshot

Snapshotting is implemented in the domain layer as an event.

class eventsourcing.domain.model.snapshot.Snapshot(originator_id: uuid.UUID, originator_version: int, topic: str, state: Optional[Dict[KT, VT]])[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.events.EventWithOriginatorVersion, eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.events.AbstractSnapshot

__init__(originator_id: uuid.UUID, originator_version: int, topic: str, state: Optional[Dict[KT, VT]])[source]

Initialises event attribute values directly from constructor kwargs.

topic

Path to the class of the snapshotted entity.

state

State of the snapshotted entity.

__mutate__(obj: Optional[TEntity]) → Optional[TEntity][source]

Updates ‘obj’ with values from ‘self’.

Calls the ‘mutate()’ method.

Can be extended, but subclasses must call super and return an object to their caller.

Parameters:obj – object (normally a domain entity) to be mutated
Returns:mutated object

versioning

Support for upcasting the state of older versions of domain events is implemented in base class Upcastable.

class eventsourcing.domain.model.versioning.Upcastable[source]

Bases: eventsourcing.whitehead.Event

For things that are upcastable.

http://code.fed.wiki.org/view/wyatt-software/view/remote-database-schema-migration

“I was sure that we could not get the schema for WyCash Plus right on the first try. I was familiar with Smalltalk-80’s object migration mechanisms. I designed a version that could serve us in a commercial software distribution environment.

“I chose to version each class independently and write that as a sequential integer in the serialized versions. Objects would be mutated to the current version on read. We supported all versions we ever had forever.

“I recorded mutation vectors for each version to the present. These could add, remove and reorder fields within an object. One-off mutation methods handled the rare case where the vectors were not enough description.

“We shipped migrations to our customers with each release to be performed on their own machines when needed without any intervention.”

Ward Cunningham

__init__()[source]

Initialize self. See help(type(self)) for accurate signature.

classmethod __upcast_state__(obj_state: Dict[KT, VT]) → Dict[KT, VT][source]

Upcasts obj_state from the version of the class when the object state was recorded, to be compatible with current version of the class.

classmethod __upcast__(obj_state: Dict[KT, VT], class_version: int) → Dict[KT, VT][source]

Must be overridden in domain event classes that set class attribute ‘__class_version__’ to a positive value.

This method is expected to upcast obj_state from method arg ‘class_version’ to the next version, and return obj_state.

One style of implementation is an if-else block, in which the conditional expressions match on the value of ‘class_version’.

The implementation of this method must support upcasting each version of the class from 0 to one less than the current version. For example: a domain class with “__class_version__ = 1” will need to support upcasting “class_version == 0” to version 1; and a domain class with “__class_version__ = 2” will need to support both upcasting “class_version == 0” to version 1, and also upcasting “class_version == 1” to version 2.
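
The if-else style and the version-by-version requirement described above can be sketched as follows. This is an illustration only: SomethingHappened, its attributes, and upcast_state_sketch are hypothetical, and the assumption that the recorded state carries its version under the key ‘__class_version__’ (with a missing key meaning version 0) is made for the sake of the example.

```python
from typing import Any, Dict


class SomethingHappened:
    # Hypothetical event class at its third version (0, 1, 2).
    __class_version__ = 2

    @classmethod
    def __upcast__(cls, obj_state: Dict[str, Any], class_version: int) -> Dict[str, Any]:
        if class_version == 0:
            # Version 1 added 'colour'; supply a default for old records.
            obj_state["colour"] = "unknown"
        elif class_version == 1:
            # Version 2 added 'size'; supply a default for old records.
            obj_state["size"] = 0
        return obj_state


def upcast_state_sketch(cls: Any, obj_state: Dict[str, Any]) -> Dict[str, Any]:
    # Assumption: a missing version key means the state was recorded at version 0.
    recorded = obj_state.get("__class_version__", 0)
    # Apply one upcast step per version until the current version is reached.
    for version in range(recorded, cls.__class_version__):
        obj_state = cls.__upcast__(obj_state, version)
        obj_state["__class_version__"] = version + 1
    return obj_state


old_state = {"what": "launch"}
new_state = upcast_state_sketch(SomethingHappened, dict(old_state))
```

Each branch handles exactly one step, so state recorded at version 0 passes through both branches, while state recorded at version 1 passes only through the second.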

To support backward compatibility, the recorded state of old versions of an event class will need to be upcast to work with new versions of the software. Commonly, this involves supplying default values for attributes missing on old versions of events, after an event class has been changed by adding a new attribute.

In a situation where a new version of an event class needs to be used by existing software, it is also necessary to support forward compatibility. Examples include deploying with rolling updates, and cases where consumers receive and attempt to handle the new version of the event but cannot be updated before the new event class version is deployed.

To support forward compatibility, it is necessary to restrict changes to be merely additive: either adding new attributes to an event, or adding new behaviours to the type of an existing attribute. Event attributes shouldn’t be removed (or renamed), existing attributes should not have aspects of their behaviour removed, and the semantics of an existing attribute should not change. If the type of an attribute value is changed, the new type should be substitutable for the old type. The old software will then simply ignore new attributes, and it will simply ignore new aspects of new types of value.

For example, the type of an attribute value can be changed to use a subtype of the previous attribute value type. The possible range of a value can be reduced when changing the type of an attribute value, since all the values of the new type will be supported by the old software.

Increasing the possible range of a value will introduce values that cannot be used by the old software, and changing the type of an attribute value to a supertype will remove behaviour that the old software may depend on.

Of course, if the old software doesn’t in fact depend on aspects that are removed, then those aspects can be removed without actually breaking anything.

If both backward and forward compatibility are required, and it is felt that an event class needs a change that would remove an attribute, make the type of an attribute more general, or increase the range of values of an attribute, then according to Greg Young’s book ‘Versioning in an Event Sourced System’ it is better to add a new event type. However, if the old software would break because this new event type is not supported, then forward compatibility would remain elusive.

In summary, supporting forward compatibility restricts model changes to adding attributes to existing model classes (which implies the versioned event class’ upcast method will supply default values when upcasting the state of old versions of the event).

timebucketedlog

Time-bucketed logs allow a sequence of items that is ordered by timestamp to be split across a number of different database partitions, which avoids any one partition becoming very large (and then unworkable).
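
The idea of splitting by time can be illustrated by deriving a partition key from a log name and a timestamp. This is a minimal sketch, not the module's implementation: bucket_key_sketch, the BUCKET_FORMATS table, and the key layout are all hypothetical.

```python
from datetime import datetime, timezone

# Hypothetical bucket sizes mapped to strftime patterns.
BUCKET_FORMATS = {
    "year": "%Y",
    "month": "%Y-%m",
    "day": "%Y-%m-%d",
    "hour": "%Y-%m-%d_%H",
}


def bucket_key_sketch(log_name: str, timestamp: float, bucket_size: str) -> str:
    # Items whose timestamps fall in the same bucket share a key,
    # and so would be stored in the same partition.
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return log_name + "_" + moment.strftime(BUCKET_FORMATS[bucket_size])


key_a = bucket_key_sketch("app-log", 0.0, "day")          # start of the first day
key_b = bucket_key_sketch("app-log", 3600.0 * 23, "day")  # 23 hours later, same day
key_c = bucket_key_sketch("app-log", 3600.0 * 25, "day")  # 25 hours later, next day
```

A smaller bucket size spreads the same messages over more partitions, which keeps each partition small at the cost of reading more buckets when scanning a time range.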

class eventsourcing.domain.model.timebucketedlog.Timebucketedlog(name: uuid.UUID, bucket_size: Optional[str] = None, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

class Event(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for events of time-bucketed log.

class Started(originator_version: int = 0, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Created, eventsourcing.domain.model.timebucketedlog.Event

class BucketSizeChanged(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.timebucketedlog.Event, eventsourcing.domain.model.entity.AttributeChanged

__init__(name: uuid.UUID, bucket_size: Optional[str] = None, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

class eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository[source]

Bases: eventsourcing.domain.model.repository.AbstractEntityRepository

get_or_create(log_name: uuid.UUID, bucket_size: str) → eventsourcing.domain.model.timebucketedlog.Timebucketedlog[source]

Gets or creates a log.

class eventsourcing.domain.model.timebucketedlog.MessageLogged(message: str, originator_id: uuid.UUID)[source]

Bases: eventsourcing.domain.model.events.EventWithTimestamp, eventsourcing.domain.model.events.EventWithOriginatorID, eventsourcing.domain.model.events.LoggedEvent

__init__(message: str, originator_id: uuid.UUID)[source]

Initialises event attribute values directly from constructor kwargs.

collection

Collections.

class eventsourcing.domain.model.collection.Collection(**kwargs)[source]

Bases: eventsourcing.domain.model.entity.TimestampedVersionedEntity

class Event(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Supertype for events of collection entities.

class Created(originator_version: int = 0, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.Event, eventsourcing.domain.model.entity.Created

Published when collection is created.

class Discarded(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.Event, eventsourcing.domain.model.entity.Discarded

Published when collection is discarded.

class EventWithItem(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.Event

__init__(**kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

class ItemAdded(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.EventWithItem

mutate(obj: eventsourcing.domain.model.collection.Collection) → None[source]

Updates (“mutates”) given ‘obj’.

Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).

The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.

Parameters:obj – domain entity to be mutated
class ItemRemoved(originator_version: int, **kwargs)[source]

Bases: eventsourcing.domain.model.collection.EventWithItem

mutate(obj: eventsourcing.domain.model.collection.Collection) → None[source]

Updates (“mutates”) given ‘obj’.

Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity).

The advantage of implementing a default projection using this method rather than __mutate__ is that you don’t need to call super or return a value.

Parameters:obj – domain entity to be mutated
class eventsourcing.domain.model.collection.AbstractCollectionRepository[source]

Bases: eventsourcing.domain.model.repository.AbstractEntityRepository

array

A kind of collection, indexed by integer. Doesn’t need to replay all events to exist.

class eventsourcing.domain.model.array.ItemAssigned(item, index, **kwargs)[source]

Bases: eventsourcing.domain.model.entity.Event

Occurs when an item is set at a position in an array.

__init__(item, index, **kwargs)[source]

Initialises event attribute values directly from constructor kwargs.

class eventsourcing.domain.model.array.BigArray(array_id, repo)[source]

Bases: eventsourcing.domain.model.array.Array

A virtual array holding items in indexed positions, across a number of Array instances.

Getting and setting items at index position is supported. Slices are supported, and operate across the underlying arrays. Appending is also supported.

BigArray is designed to overcome the concern of needing a single large sequence that may not be suitably stored in any single partition. In simple terms, if events of an aggregate can fit in a partition, we can use the same size partition to make a tree of arrays that will certainly be capable of sequencing all the events of the application in a single stream.

With normal size base arrays, enterprise applications can expect read and write time to be approximately constant with respect to the number of items in the array.

The array is composed of a tree of arrays, which gives a capacity equal to the size of each array raised to the power of the size of each array. If the arrays are limited to about the maximum size of an aggregate event stream (a large number, but not so large that there would be too much data in any one partition; let’s say 1000s to be safe), then it would be possible to fit such a large number of aggregates in the corresponding BigArray that we can be confident it would never be full.

Write access time in the worst case, and the time to identify the index of the last item in the big array, is proportional to the logarithm of the highest assigned index, taken to the base of the underlying array size. Write time on average, and read time given an index, is constant with respect to the number of items in a BigArray.
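
The capacity and the logarithmic bound can be checked with a little arithmetic. The functions below are hypothetical helpers written for this illustration; they are not part of BigArray and say nothing about how it computes these values internally.

```python
def capacity_sketch(base_size: int) -> int:
    # Capacity as stated in the text: array size to the power of array size.
    return base_size ** base_size


def levels_needed_sketch(highest_index: int, base_size: int) -> int:
    # Smallest number of tree levels whose span covers the index. This grows
    # with the logarithm of the index, taken to the base of the array size.
    levels = 1
    span = base_size
    while span <= highest_index:
        span *= base_size
        levels += 1
    return levels


small_capacity = capacity_sketch(10)
levels_for_million = levels_needed_sketch(1_000_000, 1000)
```

With base arrays of 1000 items, an index of one million is reached with three levels, which is why the worst-case cost grows so slowly in practice.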

Items can be appended in log time in a single thread. However, the time between reading the current last index and claiming the next position leads to contention and retries when there are lots of threads of execution all attempting to append items, which inherently limits throughput.

Todo: Not possible in Cassandra, but maybe do it in a transaction in SQLAlchemy?

An alternative to reading the last item before writing the next is to use an integer sequence generator to generate a stream of integers. Items can be assigned to index positions in a big array, according to the integers that are issued. Throughput will then be much better, and will be limited only by the rate at which the database can have events written to it (unless the number generator is quite slow).

An external integer sequence generator, such as Redis’ INCR command, or an auto-incrementing database column, may constitute a single point of failure.
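
The alternative described above, in which positions are issued by a generator rather than discovered by reading the last item, can be sketched in-process. Everything here is an assumption made for illustration: SequenceGeneratorSketch stands in for an external generator such as a database sequence, and a plain dict stands in for the big array.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict


class SequenceGeneratorSketch:
    """Hypothetical in-process stand-in for an external integer sequence."""

    def __init__(self) -> None:
        self._counter = itertools.count()
        self._lock = threading.Lock()

    def next_position(self) -> int:
        # The lock ensures each integer is issued exactly once.
        with self._lock:
            return next(self._counter)


generator = SequenceGeneratorSketch()
slots: Dict[int, str] = {}  # stands in for the big array


def append(item: str) -> int:
    # Claim a position first, then assign; no read of the last index is
    # needed, so writers do not contend for the same position or retry.
    position = generator.next_position()
    slots[position] = item
    return position


with ThreadPoolExecutor(max_workers=8) as pool:
    positions = list(pool.map(append, ["item-%d" % n for n in range(100)]))
```

The trade-off noted in the text still applies: if the generator is unavailable, no position can be claimed, so it becomes a single point of failure.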

__init__(array_id, repo)[source]

Initialize self. See help(type(self)) for accurate signature.

get_last_array()[source]

Returns last array in compound.

Return type:CompoundSequenceReader
__getitem__(item)[source]

Returns item at index, or items in slice.

__setitem__(position, item)[source]

Sets item in array, at given index.

Won’t overrun the end of the array, because the position is fixed to be less than base_size.

__len__()[source]

Returns length of array.

calc_parent(i, j, h)[source]

Returns the start and end of the span of the parent sequence that contains the given child.

class eventsourcing.domain.model.array.AbstractArrayRepository(array_size=10000, *args, **kwargs)[source]

Bases: eventsourcing.domain.model.repository.AbstractEntityRepository

Repository for sequence objects.

__init__(array_size=10000, *args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

__getitem__(entity_id) → Any[source]

Returns sequence for given ID.

class eventsourcing.domain.model.array.AbstractBigArrayRepository[source]

Bases: eventsourcing.domain.model.repository.AbstractEntityRepository

Repository for compound sequence objects.

subrepo

Sub-sequence repository.

__getitem__(entity_id) → Any[source]

Returns sequence for given ID.