import os
from abc import ABCMeta
from collections import OrderedDict
from uuid import uuid1
import six
from six import with_metaclass
from eventsourcing.exceptions import EventHashError
from eventsourcing.utils.hashing import hash_for_data_integrity
from eventsourcing.utils.times import decimaltimestamp
from eventsourcing.utils.topic import get_topic
from eventsourcing.utils.transcoding import ObjectJSONEncoder
# Module-level constant read once, at import time, from the GENESIS_HASH
# environment variable; falls back to the empty string when the variable is unset.
GENESIS_HASH = os.getenv('GENESIS_HASH', '')
def create_timesequenced_event_id():
    """
    Creates a new ID for a time-sequenced event.

    :return: a freshly generated version 1 (time-based) UUID
    """
    event_id = uuid1()
    return event_id
class QualnameABC(with_metaclass(QualnameABCMeta)):
    """
    Base class for objects that need __qualname__ in Python 2.7.

    The class body is intentionally empty: it only applies the
    QualnameABCMeta metaclass, using six's with_metaclass() so that
    the same declaration works on both Python 2 and Python 3.
    """
class DomainEvent(QualnameABC):
    """
    Base class for domain events.

    Implements methods to make instances read-only, comparable
    for equality, have recognisable representations, and hashable.

    Attribute values are stored directly in the instance ``__dict__``.
    When ``__with_data_integrity__`` is true, each event also stores its
    topic ('__event_topic__') and a hash of its other values
    ('__event_hash__'), both of which are verified by ``__mutate__()``.
    """
    # Subclasses set this to True to have events sealed with a hash when
    # constructed, and checked against that hash in __mutate__().
    __with_data_integrity__ = False

    # Encoder class passed to hash_for_data_integrity() when hashing state.
    __json_encoder_class__ = ObjectJSONEncoder

    def __init__(self, **kwargs):
        """
        Initialises event attribute values directly from constructor kwargs.

        :param kwargs: names and values of the event attributes
        :raises AssertionError: if data integrity is enabled and the
            caller supplies an '__event_hash__' value
        """
        if self.__with_data_integrity__:
            kwargs['__event_topic__'] = get_topic(type(self))

            # Seal the event with a hash of the other values. This is an
            # explicit check rather than an assert statement, so that it
            # is not stripped when Python runs with the -O option.
            if '__event_hash__' in kwargs:
                raise AssertionError(
                    "'__event_hash__' must not be passed to the constructor"
                )
            event_hash = self.__hash_for_data_integrity__(kwargs)
            kwargs['__event_hash__'] = event_hash

        # Write through __dict__, because __setattr__() always raises.
        self.__dict__.update(kwargs)

    @classmethod
    def __hash_for_data_integrity__(cls, obj):
        """
        Computes the data integrity hash of the given object, using
        the JSON encoder class configured on this event class.

        :param obj: object to be hashed (the event's attribute values)
        :return: value returned by hash_for_data_integrity()
        """
        return hash_for_data_integrity(cls.__json_encoder_class__, obj)

    @property
    def __event_hash__(self):
        """
        Stored event hash, or None if the event has not been sealed.
        """
        return self.__dict__.get('__event_hash__')

    def __check_hash__(self):
        """
        Checks the stored event hash against a hash computed from
        all the other attribute values of the event.

        :raises EventHashError: if the two hashes differ
        :raises KeyError: if the event has no stored '__event_hash__'
        """
        # Work on a copy, so the stored hash stays on the event.
        state = self.__dict__.copy()
        event_hash = state.pop('__event_hash__')
        if event_hash != self.__hash_for_data_integrity__(state):
            raise EventHashError()

    def __mutate__(self, obj):
        """
        Update obj with values from self.

        Can be extended, but subclasses must call super
        method, and return an object.

        :param obj: object to be mutated
        :return: mutated object
        :raises AssertionError: if data integrity is enabled and the
            stored event topic is not the topic of this event class
        :raises EventHashError: if data integrity is enabled and the
            stored event hash does not match the event's values
        """
        # Check the event and the object.
        if self.__with_data_integrity__:
            # Explicit check rather than an assert statement, so that
            # the topic is still verified when Python runs with -O.
            stored_topic = self.__dict__['__event_topic__']
            if stored_topic != get_topic(type(self)):
                raise AssertionError(
                    "Event topic does not match event class: "
                    "{!r}".format(stored_topic)
                )
            self.__check_hash__()

        # Call mutate() method.
        self.mutate(obj)

        return obj

    def mutate(self, obj):
        """
        Convenience for use in custom models, to update
        obj with values from self without needing to call
        super method and return obj (two extra lines).

        Can be overridden by subclasses. Any value returned
        by this method will be ignored.

        This method is called by __mutate__() after the event has
        been checked. The implementation in this base class does
        nothing. To change which object is returned, extend
        __mutate__() instead.

        :param obj: object to be mutated
        """

    def __setattr__(self, key, value):
        """
        Inhibits event attributes from being updated by assignment.

        :raises AttributeError: always
        """
        raise AttributeError("DomainEvent attributes are read-only")

    def __eq__(self, other):
        """
        Tests for equality of two event objects.

        Two events are equal if the other object is also a
        DomainEvent and both have the same Python hash.
        """
        return isinstance(other, DomainEvent) and self.__hash__() == other.__hash__()

    def __ne__(self, other):
        """
        Negates the equality test.
        """
        return not (self == other)

    def __hash__(self):
        """
        Computes a Python integer hash for an event,
        using its event hash string if available.

        If the event has no stored event hash, a data integrity hash is
        computed from the attribute values. The hash is combined with
        the event class, so events of different classes hash differently.

        Supports equality and inequality comparisons.
        """
        return hash((
            self.__event_hash__ or self.__hash_for_data_integrity__(
                self.__dict__
            ), self.__class__
        ))

    def __repr__(self):
        """
        Returns string representing the type and attribute values of the event.

        Attributes are listed in sorted order of their names.
        """
        sorted_items = tuple(sorted(self.__dict__.items()))
        args_strings = ("{0}={1!r}".format(*item) for item in sorted_items)
        args_string = ', '.join(args_strings)
        return "{}({})".format(self.__class__.__qualname__, args_string)
class EventWithOriginatorID(DomainEvent):
    """
    For events that have an originator ID.
    """

    def __init__(self, originator_id, **kwargs):
        """
        :param originator_id: value stored as the 'originator_id' attribute
        :param kwargs: other attribute values, passed on to the super class
        """
        super(EventWithOriginatorID, self).__init__(
            originator_id=originator_id, **kwargs
        )

    @property
    def originator_id(self):
        """
        The originator ID given when the event was constructed.
        """
        return self.__dict__['originator_id']
class EventWithTimestamp(DomainEvent):
    """
    For events that have a timestamp value.
    """

    def __init__(self, timestamp=None, **kwargs):
        """
        :param timestamp: timestamp of the event; when None, the value
            returned by decimaltimestamp() is used instead
        :param kwargs: other attribute values, passed on to the super class
        """
        # Compare with None, rather than testing truthiness, so that an
        # explicitly given zero timestamp is kept and not replaced.
        if timestamp is None:
            timestamp = decimaltimestamp()
        kwargs['timestamp'] = timestamp
        super(EventWithTimestamp, self).__init__(**kwargs)

    @property
    def timestamp(self):
        """
        The timestamp value of the event.
        """
        return self.__dict__['timestamp']
class EventWithOriginatorVersion(DomainEvent):
    """
    For events that carry the version number of their originator.
    """

    def __init__(self, originator_version, **kwargs):
        """
        :param originator_version: integer version number
        :param kwargs: other attribute values, passed on to the super class
        :raises TypeError: if originator_version is not an integer
        """
        version_is_integer = isinstance(originator_version, six.integer_types)
        if not version_is_integer:
            raise TypeError("Version must be an integer: {}".format(originator_version))
        super(EventWithOriginatorVersion, self).__init__(
            originator_version=originator_version, **kwargs
        )

    @property
    def originator_version(self):
        """
        The originator version given when the event was constructed.
        """
        return self.__dict__['originator_version']
class EventWithTimeuuid(DomainEvent):
    """
    For events that are identified by a UUIDv1 event ID.
    """

    def __init__(self, event_id=None, **kwargs):
        """
        :param event_id: ID of the event; a new uuid1() value is
            generated when no (or a falsy) value is given
        :param kwargs: other attribute values, passed on to the super class
        """
        super(EventWithTimeuuid, self).__init__(
            event_id=event_id or uuid1(), **kwargs
        )

    @property
    def event_id(self):
        """
        The event ID given or generated when the event was constructed.
        """
        return self.__dict__['event_id']
class Created(DomainEvent):
    """
    Event that can be published when an entity has been created.
    """
class AttributeChanged(DomainEvent):
    """
    Can be published when an attribute of an entity is changed.

    The 'name' and 'value' attribute values are expected to be
    given as constructor kwargs.
    """

    @property
    def name(self):
        """
        The 'name' attribute value of the event.
        """
        return self.__dict__['name']

    @property
    def value(self):
        """
        The 'value' attribute value of the event.
        """
        return self.__dict__['value']
class Discarded(DomainEvent):
    """
    Event published when something has been discarded.
    """
class Logged(DomainEvent):
    """
    Event published when something has been logged.
    """
# Module-level registry of subscribed event handlers. Maps each predicate
# (or None, for handlers subscribed without a predicate) to the list of
# handlers subscribed with it; predicates are kept in order of first subscription.
_event_handlers = OrderedDict()
def subscribe(handler, predicate=None):
    """
    Registers a handler to be called with published events.

    :param handler: callable that will be called with an event
    :param predicate: optional callable that is called with an event
        and decides whether the handler is called; when None, the
        handler is called for every published event
    """
    # Create the handler list for this predicate on first use.
    _event_handlers.setdefault(predicate, []).append(handler)
def unsubscribe(handler, predicate=None):
    """
    Removes a handler that was registered with subscribe().

    Does nothing if the handler is not subscribed with the given
    predicate. The predicate's entry is dropped from the registry
    when its last handler has been removed.

    :param handler: the handler to be removed
    :param predicate: the predicate the handler was subscribed with
    """
    registered = _event_handlers.get(predicate)
    if registered is None or handler not in registered:
        return
    registered.remove(handler)
    if not registered:
        del _event_handlers[predicate]
def publish(event):
    """
    Calls subscribed handlers with the given event.

    A handler is called if it was subscribed without a predicate, or
    if its predicate returns a true value when called with the event.

    Handlers may call subscribe() or unsubscribe() while an event is
    being published: such changes take effect from the next call to
    publish(), and do not affect which handlers receive this event.

    :param event: the event to pass to the handlers
    """
    # Iterate over snapshots of the registry and of each handler list.
    # Iterating the live objects would skip the next handler when a
    # handler unsubscribes itself, and would raise RuntimeError when a
    # handler adds or removes a predicate during iteration.
    for predicate, handlers in list(_event_handlers.items()):
        for handler in list(handlers):
            if predicate is None or predicate(event):
                handler(event)
class EventHandlersNotEmptyError(Exception):
    """
    Raised when event handlers are found to be still subscribed.
    """
def assert_event_handlers_empty():
    """
    Checks that no event handlers are currently subscribed.

    :raises EventHandlersNotEmptyError: if any handlers are still
        registered; the message includes the current registry
    """
    if not _event_handlers:
        return
    msg = "Event handlers are still subscribed: %s" % _event_handlers
    raise EventHandlersNotEmptyError(msg)