Source code for eventsourcing.domain.model.entity

"""
The entity module provides base classes for domain entities.
"""
from abc import ABCMeta, abstractmethod
from uuid import uuid4

from six import with_metaclass

from eventsourcing.domain.model.events import AttributeChanged, Created, Discarded, DomainEvent, \
    EventWithOriginatorID, \
    EventWithOriginatorVersion, EventWithTimestamp, GENESIS_HASH, QualnameABC, publish
from eventsourcing.exceptions import EntityIsDiscarded, HeadHashError, OriginatorIDError, \
    OriginatorVersionError
from eventsourcing.utils.times import decimaltimestamp_from_uuid
from eventsourcing.utils.topic import get_topic, resolve_topic


[docs]class DomainEntity(QualnameABC): """ Base class for domain entities. """ __with_data_integrity__ = True __genesis_hash__ = GENESIS_HASH def __init__(self, id): self._id = id self.__is_discarded__ = False if self.__with_data_integrity__: self.__head__ = type(self).__genesis_hash__ @property def id(self): """Entity ID allows an entity instance to be referenced and distinguished from others, even though its state may change over time. """ return self._id
[docs] class Event(EventWithOriginatorID, DomainEvent): """ Supertype for events of domain entities. """ __with_data_integrity__ = True def __init__(self, **kwargs): super(DomainEntity.Event, self).__init__(**kwargs) def __mutate__(self, obj): # Check the object. self.__check_obj__(obj) # Call super method. obj = super(DomainEntity.Event, self).__mutate__(obj) # Update __head__. if getattr(type(obj), '__with_data_integrity__', True): assert self.__with_data_integrity__ obj.__head__ = self.__event_hash__ return obj
[docs] def __check_obj__(self, obj): """ Checks obj state before mutating. """ # Check ID matches originator ID. if obj.id != self.originator_id: raise OriginatorIDError( "'{}' not equal to event originator ID '{}'" "".format(obj.id, self.originator_id) ) # Check __head__ matches previous hash. if getattr(type(obj), '__with_data_integrity__', True): assert self.__with_data_integrity__ if obj.__head__ != self.__dict__.get('__previous_hash__'): raise HeadHashError(obj.id, obj.__head__, type(self))
@classmethod def __create__(cls, originator_id=None, event_class=None, **kwargs): if originator_id is None: originator_id = uuid4() if getattr(cls, '__with_data_integrity__', True): genesis_hash = getattr(cls, '__genesis_hash__', GENESIS_HASH) kwargs['__previous_hash__'] = genesis_hash event = (event_class or cls.Created)( originator_id=originator_id, originator_topic=get_topic(cls), **kwargs ) obj = event.__mutate__() obj.__publish__(event) return obj
[docs] class Created(Event, Created): """ Published when an entity is created. """ def __init__(self, originator_topic, **kwargs): super(DomainEntity.Created, self).__init__( originator_topic=originator_topic, **kwargs ) @property def originator_topic(self): return self.__dict__['originator_topic'] def __mutate__(self, entity_class=None): if entity_class is None: entity_class = resolve_topic(self.originator_topic) with_data_integrity = getattr(entity_class, '__with_data_integrity__', True) if with_data_integrity: self.__check_hash__() obj = entity_class(**self.__entity_kwargs__) if with_data_integrity: obj.__head__ = self.__event_hash__ return obj @property def __entity_kwargs__(self): kwargs = self.__dict__.copy() kwargs['id'] = kwargs.pop('originator_id') kwargs.pop('originator_topic', None) kwargs.pop('__event_hash__', None) kwargs.pop('__event_topic__', None) kwargs.pop('__previous_hash__', None) return kwargs
[docs] def __change_attribute__(self, name, value): """ Changes named attribute with the given value, by triggering an AttributeChanged event. """ self.__trigger_event__(self.AttributeChanged, name=name, value=value)
[docs] class AttributeChanged(Event, AttributeChanged): """ Published when a DomainEntity is discarded. """ def __mutate__(self, obj): obj = super(DomainEntity.AttributeChanged, self).__mutate__(obj) setattr(obj, self.name, self.value) return obj
[docs] def __discard__(self): """ Discards self, by triggering a Discarded event. """ self.__trigger_event__(self.Discarded)
[docs] class Discarded(Discarded, Event): """ Published when a DomainEntity is discarded. """ def __mutate__(self, obj): obj = super(DomainEntity.Discarded, self).__mutate__(obj) obj.__is_discarded__ = True return None
[docs] def __assert_not_discarded__(self): """ Raises exception if entity has been discarded already. """ if self.__is_discarded__: raise EntityIsDiscarded("Entity is discarded")
[docs] def __trigger_event__(self, event_class, **kwargs): """ Constructs, applies, and publishes a domain event. """ self.__assert_not_discarded__() if type(self).__with_data_integrity__: kwargs['__previous_hash__'] = self.__head__ event = event_class( originator_id=self._id, **kwargs ) event.__mutate__(self) self.__publish__(event)
[docs] def __publish__(self, event): """ Publishes given event for subscribers in the application. :param event: domain event or list of events """ self.__publish_to_subscribers__(event)
[docs] def __publish_to_subscribers__(self, event): """ Actually dispatches given event to publish-subscribe mechanism. :param event: domain event or list of events """ publish(event)
__hash__ = None # For Python 2.7, so hash(obj) raises TypeError. def __eq__(self, other): return type(self) == type(other) and self.__dict__ == other.__dict__ def __ne__(self, other): return not self.__eq__(other)
[docs]class VersionedEntity(DomainEntity): def __init__(self, __version__=None, **kwargs): super(VersionedEntity, self).__init__(**kwargs) self.___version__ = __version__ @property def __version__(self): return self.___version__
[docs] def __trigger_event__(self, event_class, **kwargs): """ Triggers domain event with entity's next version number. """ return super(VersionedEntity, self).__trigger_event__( event_class=event_class, originator_version=self.__version__ + 1, **kwargs )
[docs] class Event(EventWithOriginatorVersion, DomainEntity.Event): """Supertype for events of versioned entities.""" def __mutate__(self, obj): obj = super(VersionedEntity.Event, self).__mutate__(obj) if obj is not None: obj.___version__ = self.originator_version return obj
[docs] def __check_obj__(self, obj): """ Also checks the event's originator version follows this entity's version. """ super(VersionedEntity.Event, self).__check_obj__(obj) if obj.__version__ + 1 != self.originator_version: raise OriginatorVersionError( ("Event originated from entity at version {}, " "but entity is currently at version {}. " "Event type: '{}', entity type: '{}', entity ID: '{}'" "".format(self.originator_version, obj.__version__, type(self).__name__, type(obj).__name__, obj._id) ) )
[docs] class Created(DomainEntity.Created, Event): """Published when a VersionedEntity is created.""" def __init__(self, originator_version=0, **kwargs): super(VersionedEntity.Created, self).__init__(originator_version=originator_version, **kwargs) @property def __entity_kwargs__(self): kwargs = super(VersionedEntity.Created, self).__entity_kwargs__ kwargs['__version__'] = kwargs.pop('originator_version') return kwargs
[docs] class AttributeChanged(Event, DomainEntity.AttributeChanged): """Published when a VersionedEntity is changed."""
[docs] class Discarded(Event, DomainEntity.Discarded): """Published when a VersionedEntity is discarded."""
[docs]class TimestampedEntity(DomainEntity): def __init__(self, __created_on__, **kwargs): super(TimestampedEntity, self).__init__(**kwargs) self.___created_on__ = __created_on__ self.___last_modified__ = __created_on__ @property def __created_on__(self): return self.___created_on__ @property def __last_modified__(self): return self.___last_modified__
[docs] class Event(DomainEntity.Event, EventWithTimestamp): """Supertype for events of timestamped entities."""
[docs] def __mutate__(self, obj): """Update obj with values from self.""" obj = super(TimestampedEntity.Event, self).__mutate__(obj) if obj is not None: assert isinstance(obj, TimestampedEntity), obj obj.___last_modified__ = self.timestamp return obj
[docs] class Created(DomainEntity.Created, Event): """Published when a TimestampedEntity is created.""" @property def __entity_kwargs__(self): kwargs = super(TimestampedEntity.Created, self).__entity_kwargs__ kwargs['__created_on__'] = kwargs.pop('timestamp') return kwargs
[docs] class AttributeChanged(Event, DomainEntity.AttributeChanged): """Published when a TimestampedEntity is changed."""
[docs] class Discarded(Event, DomainEntity.Discarded): """Published when a TimestampedEntity is discarded."""
[docs]class TimeuuidedEntity(DomainEntity): def __init__(self, event_id, **kwargs): super(TimeuuidedEntity, self).__init__(**kwargs) self.___initial_event_id__ = event_id self.___last_event_id__ = event_id @property def __created_on__(self): return decimaltimestamp_from_uuid(self.___initial_event_id__) @property def __last_modified__(self): return decimaltimestamp_from_uuid(self.___last_event_id__)
[docs]class TimestampedVersionedEntity(TimestampedEntity, VersionedEntity):
[docs] class Event(TimestampedEntity.Event, VersionedEntity.Event): """Supertype for events of timestamped, versioned entities."""
[docs] class Created(TimestampedEntity.Created, VersionedEntity.Created, Event): """Published when a TimestampedVersionedEntity is created."""
[docs] class AttributeChanged(Event, TimestampedEntity.AttributeChanged, VersionedEntity.AttributeChanged): """Published when a TimestampedVersionedEntity is created."""
[docs] class Discarded(Event, TimestampedEntity.Discarded, VersionedEntity.Discarded): """Published when a TimestampedVersionedEntity is discarded."""
[docs]class TimeuuidedVersionedEntity(TimeuuidedEntity, VersionedEntity): pass
[docs]class AbstractEventPlayer(with_metaclass(ABCMeta)): pass
[docs]class AbstractEntityRepository(AbstractEventPlayer):
[docs] @abstractmethod def __getitem__(self, entity_id): """ Returns entity for given ID. """
[docs] @abstractmethod def __contains__(self, entity_id): """ Returns True or False, according to whether or not entity exists. """
[docs] @abstractmethod def get_entity(self, entity_id, at=None): """ Returns entity for given ID. """
@property @abstractmethod def event_store(self): """ Returns event store object used by this repository. """
[docs] @abstractmethod def take_snapshot(self, entity_id, lt=None, lte=None): """ Takes snapshot of entity state, using stored events. :return: Snapshot """