Source code for eventsourcing.domain.model.events

import os
from collections import OrderedDict
from uuid import uuid1

from eventsourcing.exceptions import EventHashError
from eventsourcing.utils.hashing import hash_object
from eventsourcing.utils.times import decimaltimestamp
from eventsourcing.utils.topic import get_topic
from eventsourcing.utils.transcoding import ObjectJSONEncoder

GENESIS_HASH = os.getenv('GENESIS_HASH', '')


[docs]def create_timesequenced_event_id(): return uuid1()
[docs]class DomainEvent(object): """ Base class for domain events. Implements methods to make instances read-only, comparable for equality in Python, and have recognisable representations. To make domain events hashable, this class also implements a method to create a cryptographic hash of the state of the event. """ __json_encoder_class__ = ObjectJSONEncoder __notifiable__ = True
[docs] def __init__(self, **kwargs): """ Initialises event attribute values directly from constructor kwargs. """ self.__dict__.update(kwargs)
[docs] def __repr__(self): """ Creates a string representing the type and attribute values of the event. :rtype: str """ sorted_items = tuple(sorted(self.__dict__.items())) args_strings = ("{0}={1!r}".format(*item) for item in sorted_items) args_string = ', '.join(args_strings) return "{}({})".format(self.__class__.__qualname__, args_string)
[docs] def __mutate__(self, obj): """ Updates 'obj' with values from 'self'. Calls the 'mutate()' method. Can be extended, but subclasses must call super and return an object to their caller. :param obj: object (normally a domain entity) to be mutated :return: mutated object """ self.mutate(obj) return obj
[docs] def mutate(self, obj): """ Updates ("mutates") given 'obj'. Intended to be overridden by subclasses, as the most concise way of coding a default projection of the event (for example into the state of a domain entity). The advantage of implementing a default projection using this method rather than __mutate__ is that you don't need to call super or return a value. :param obj: domain entity to be mutated """
[docs] def __setattr__(self, key, value): """ Inhibits event attributes from being updated by assignment. """ raise AttributeError("DomainEvent attributes are read-only")
[docs] def __eq__(self, other): """ Tests for equality of two event objects. :rtype: bool """ return isinstance(other, DomainEvent) and self.__hash__() == other.__hash__()
[docs] def __ne__(self, other): """ Negates the equality test. :rtype: bool """ return not (self == other)
[docs] def __hash__(self): """ Computes a Python integer hash for an event. Supports Python equality and inequality comparisons. :return: Python integer hash :rtype: int """ state = self.__dict__.copy() state['__event_topic__'] = get_topic(type(self)) # Calculate the cryptographic hash of the event. sha256_hash = self.__hash_object__(state) # Return the Python hash of the cryptographic hash. return hash(sha256_hash)
[docs] @classmethod def __hash_object__(cls, obj): """ Calculates SHA-256 hash of JSON encoded 'obj'. :param obj: Object to be hashed. :return: SHA-256 as hexadecimal string. :rtype: str """ return hash_object(cls.__json_encoder_class__, obj)
[docs]class EventWithHash(DomainEvent): """ Base class for domain events with a cryptographic event hash. Extends DomainEvent by setting a cryptographic event hash when the event is originated, and checking the event hash whenever its default projection mutates an object. """
[docs] def __init__(self, **kwargs): super(EventWithHash, self).__init__(**kwargs) # Set __event_topic__ to differentiate events of # different types with otherwise equal attributes. self.__dict__['__event_topic__'] = get_topic(type(self)) # Set __event_hash__ with a SHA-256 hash of the event. self.__dict__['__event_hash__'] = self.__hash_object__(self.__dict__)
@property def __event_hash__(self): """ Returns SHA-256 hash of the original state of the event. :return: SHA-256 as hexadecimal string. :rtype: str """ return self.__dict__.get('__event_hash__')
[docs] def __hash__(self): """ Computes a Python integer hash for an event, using its pre-computed event hash. Supports Python equality and inequality comparisons only. :return: Python integer hash :rtype: int """ # Return the Python hash of the cryptographic hash. return hash(self.__event_hash__)
[docs] def __mutate__(self, obj): """ Updates 'obj' with values from self. Can be extended, but subclasses must call super method, and return an object. :param obj: object to be mutated :return: mutated object """ # Check the hash. self.__check_hash__() # Call super and return value. return super(EventWithHash, self).__mutate__(obj)
[docs] def __check_hash__(self): """ Raises EventHashError, unless self.__event_hash__ can be derived from the current state of the event object. """ state = self.__dict__.copy() event_hash = state.pop('__event_hash__') if event_hash != self.__hash_object__(state): raise EventHashError()
[docs]class EventWithOriginatorID(DomainEvent): """ For events that have an originator ID. """
[docs] def __init__(self, originator_id, **kwargs): kwargs['originator_id'] = originator_id super(EventWithOriginatorID, self).__init__(**kwargs)
@property def originator_id(self): """ Originator ID is the identity of the object that originated this event. :return: A UUID representing the identity of the originator. :rtype: UUID """ return self.__dict__['originator_id']
[docs]class EventWithTimestamp(DomainEvent): """ For events that have a timestamp value. """
[docs] def __init__(self, timestamp=None, **kwargs): kwargs['timestamp'] = timestamp or decimaltimestamp() super(EventWithTimestamp, self).__init__(**kwargs)
@property def timestamp(self): """ A UNIX timestamp as a Decimal object. :rtype: Decimal """ return self.__dict__['timestamp']
[docs]class EventWithOriginatorVersion(DomainEvent): """ For events that have an originator version number. """
[docs] def __init__(self, originator_version, **kwargs): if not isinstance(originator_version, int): raise TypeError("Version must be an integer: {}".format(originator_version)) kwargs['originator_version'] = originator_version super(EventWithOriginatorVersion, self).__init__(**kwargs)
@property def originator_version(self): """ Originator version is the version of the object that originated this event. :return: A integer representing the version of the originator. :rtype: int """ return self.__dict__['originator_version']
[docs]class EventWithTimeuuid(DomainEvent): """ For events that have an UUIDv1 event ID. """
[docs] def __init__(self, event_id=None, **kwargs): kwargs['event_id'] = event_id or uuid1() super(EventWithTimeuuid, self).__init__(**kwargs)
@property def event_id(self): return self.__dict__['event_id']
[docs]class Created(DomainEvent): """ Can be originated when something is created. """
[docs]class AttributeChanged(DomainEvent): """ Can be originated when the value of an attribute changes. """ @property def name(self): return self.__dict__['name'] @property def value(self): return self.__dict__['value']
[docs]class Discarded(DomainEvent): """ Can be originated when something is discarded. """
[docs]class Logged(DomainEvent): """ Can be originated when something is logged. """
_subscriptions = []
[docs]def subscribe(handler, predicate=None): """ Adds 'handler' to list of event handlers to be called if 'predicate' is satisfied. If predicate is None, the handler will be called whenever an event is published. :param handler: Will be called when an event is published. :param predicate: Conditions whether the handler will be called. """ if (predicate, handler) not in _subscriptions: _subscriptions.append((predicate, handler))
[docs]def unsubscribe(handler, predicate=None): """ Removes 'handler' from list of event handlers to be called if 'predicate' is satisfied. :param handler: :param predicate: """ if (predicate, handler) in _subscriptions: _subscriptions.remove((predicate, handler))
[docs]def publish(event): """ Calls subscribed event handlers with the given 'event', except those with predicates that are not satisfied by the event. :param event: """ for predicate, handler in _subscriptions[:]: if predicate is None or predicate(event): handler(event)
[docs]class EventHandlersNotEmptyError(Exception): pass
[docs]def assert_event_handlers_empty(): """ Raises EventHandlersNotEmptyError, unless there are no event handlers subscribed. """ if len(_subscriptions): msg = "subscriptions still exist: %s" % _subscriptions raise EventHandlersNotEmptyError(msg)
[docs]def clear_event_handlers(): """ Removes all previously subscribed event handlers. """ _subscriptions.clear()