Source code for eventsourcing.domain.model.entity

from abc import ABCMeta, abstractmethod, abstractproperty

from six import with_metaclass

from eventsourcing.domain.model.decorators import mutator
from eventsourcing.domain.model.events import AttributeChanged, Created, Discarded, DomainEvent, \
    EventWithOriginatorID, EventWithOriginatorVersion, EventWithTimestamp, QualnameABC, publish
from eventsourcing.exceptions import EntityIsDiscarded, MismatchedOriginatorIDError, \
    MismatchedOriginatorVersionError, MutatorRequiresTypeNotInstance
from eventsourcing.utils.time import timestamp_from_uuid


[docs]class DomainEntity(QualnameABC):
[docs] class Event(EventWithOriginatorID, DomainEvent): """Layer supertype."""
[docs] class Created(Event, Created): """Published when a DomainEntity is created."""
[docs] class AttributeChanged(Event, AttributeChanged): """Published when a DomainEntity is discarded."""
[docs] class Discarded(Event, Discarded): """Published when a DomainEntity is discarded."""
def __init__(self, originator_id): self._id = originator_id self._is_discarded = False def __eq__(self, other): return type(self) == type(other) and self.__dict__ == other.__dict__ def __ne__(self, other): return not self.__eq__(other) @property def id(self): return self._id
[docs] def change_attribute(self, name, value, **kwargs): """ Changes given attribute of the entity, by constructing and applying an AttributeChanged event. """ self._assert_not_discarded() event = self.AttributeChanged( name=name, value=value, originator_id=self._id, **kwargs ) self._apply_and_publish(event)
[docs] def discard(self, **kwargs): self._assert_not_discarded() event = self.Discarded(originator_id=self._id, **kwargs) self._apply_and_publish(event)
def _validate_originator(self, event): """ Checks the event originated from (was published by) this entity. """ self._validate_originator_id(event) def _validate_originator_id(self, event): """ Checks the event's entity ID matches this entity's ID. """ if self._id != event.originator_id: raise MismatchedOriginatorIDError( "'{}' not equal to event originator ID '{}'" "".format(self.id, event.originator_id) ) def _assert_not_discarded(self): if self._is_discarded: raise EntityIsDiscarded("Entity is discarded") def _apply_and_publish(self, event): """ Applies event, by mutating self with event and then publishing event. Must be an object method, since subclass AggregateRoot._publish() will append events to a list internal to the entity object, hence it needs to work with an instance rather than the type. """ self._mutate(initial=self, event=event) self._publish(event) @classmethod def _mutate(cls, initial, event): """ Calls a mutator function with given entity and event. Passes cls if initial is None, so that Created event handler can construct an entity object with the correct subclass. Please override or extend in subclasses that extend or replace the mutate_entity() function, so that the correct mutator function will be invoked. """ return mutate_entity(initial or cls, event) def _publish(self, event): """ Publishes event for subscribers in the application. """ publish(event)
[docs]class WithReflexiveMutator(DomainEntity): """ Implements an entity mutator function by dispatching to the event itself all calls to mutate an entity with an event. This is an alternative to using an independent mutator function implemented with the @mutator decorator, or an if-else block. """ @classmethod def _mutate(cls, initial, event): """ Calls the mutate() method of the event. Passes cls if initial is None, so that handler of Created events can construct an entity object with the subclass. """ return event.mutate(initial or cls)
[docs]class VersionedEntity(DomainEntity):
[docs] class Event(EventWithOriginatorVersion, DomainEntity.Event): """Layer supertype."""
[docs] class Created(Event, DomainEntity.Created): """Published when a VersionedEntity is created.""" def __init__(self, originator_version=0, **kwargs): super(Created, self).__init__(originator_version=originator_version, **kwargs)
[docs] class AttributeChanged(Event, DomainEntity.AttributeChanged): """Published when a VersionedEntity is changed."""
[docs] class Discarded(Event, DomainEntity.Discarded): """Published when a VersionedEntity is discarded."""
def __init__(self, originator_version, **kwargs): super(VersionedEntity, self).__init__(**kwargs) self._version = originator_version @property def version(self): return self._version def _increment_version(self): if self._version is not None: self._version += 1 def _validate_originator(self, event): super(VersionedEntity, self)._validate_originator(event) self._validate_originator_version(event) def _validate_originator_version(self, event): """ Checks the event's entity version matches this entity's version. """ if self._version != event.originator_version: raise MismatchedOriginatorVersionError( ("Event originated from entity at version {}, " "but entity is currently at version {}. " "Event type: '{}', entity type: '{}', entity ID: '{}'" "".format(self._version, event.originator_version, type(event).__name__, type(self).__name__, self._id) ) )
[docs] def change_attribute(self, name, value, **kwargs): return super(VersionedEntity, self).change_attribute( name, value, originator_version=self._version, **kwargs)
[docs] def discard(self, **kwargs): return super(VersionedEntity, self).discard( originator_version=self._version, **kwargs)
[docs]class TimestampedEntity(DomainEntity):
[docs] class Event(EventWithTimestamp, DomainEntity.Event): """Layer supertype."""
[docs] class Created(Event, DomainEntity.Created): """Published when a TimestampedEntity is created."""
[docs] class AttributeChanged(Event, DomainEntity.AttributeChanged): """Published when a TimestampedEntity is changed."""
[docs] class Discarded(Event, DomainEntity.Discarded): """Published when a TimestampedEntity is discarded."""
def __init__(self, timestamp, **kwargs): super(TimestampedEntity, self).__init__(**kwargs) self._created_on = timestamp self._last_modified_on = timestamp @property def created_on(self): return self._created_on @property def last_modified_on(self): return self._last_modified_on
[docs]class TimeuuidedEntity(DomainEntity): def __init__(self, event_id, **kwargs): super(TimeuuidedEntity, self).__init__(**kwargs) self._initial_event_id = event_id self._last_event_id = event_id @property def created_on(self): return timestamp_from_uuid(self._initial_event_id) @property def last_modified_on(self): return timestamp_from_uuid(self._last_event_id)
[docs]class TimestampedVersionedEntity(TimestampedEntity, VersionedEntity):
[docs] class Event(TimestampedEntity.Event, VersionedEntity.Event): """Layer supertype."""
[docs] class Created(Event, TimestampedEntity.Created, VersionedEntity.Created): """Published when a TimestampedVersionedEntity is created."""
[docs] class AttributeChanged(Event, TimestampedEntity.AttributeChanged, VersionedEntity.AttributeChanged): """Published when a TimestampedVersionedEntity is created."""
[docs] class Discarded(Event, TimestampedEntity.Discarded, VersionedEntity.Discarded): """Published when a TimestampedVersionedEntity is discarded."""
[docs]class TimeuuidedVersionedEntity(TimeuuidedEntity, VersionedEntity): pass
[docs]@mutator def mutate_entity(initial, event): """Entity mutator function. Mutates initial state by the event. Different handlers are registered for different types of event. """ raise NotImplementedError("Event type not supported: {}".format(type(event)))
@mutate_entity.register(DomainEntity.Created) def _(cls, event): assert isinstance(event, Created), event if not isinstance(cls, type): msg = ("Mutator for Created event requires object type: {}".format(type(cls))) raise MutatorRequiresTypeNotInstance(msg) try: self = cls(**event.__dict__) except TypeError as e: raise TypeError("Class {} {}. Given {} from event type {}" "".format(cls, e, event.__dict__, type(event))) if isinstance(event, VersionedEntity.Created): self._increment_version() return self @mutate_entity.register(DomainEntity.AttributeChanged) def _(self, event): self._validate_originator(event) setattr(self, event.name, event.value) if isinstance(event, TimestampedEntity.AttributeChanged): self._last_modified_on = event.timestamp if isinstance(event, VersionedEntity.AttributeChanged): self._increment_version() return self @mutate_entity.register(DomainEntity.Discarded) def _(self, event): assert isinstance(self, DomainEntity), self self._validate_originator(event) self._is_discarded = True if isinstance(event, TimestampedEntity.Discarded): self._last_modified_on = event.timestamp if isinstance(event, VersionedEntity.Discarded): self._increment_version() return None
[docs]class AbstractEntityRepository(with_metaclass(ABCMeta)): def __init__(self, *args, **kwargs): pass @abstractmethod def __getitem__(self, entity_id): """ Returns entity for given ID. """
[docs] @abstractmethod def get_entity(self, entity_id): """ Returns entity for given ID. """
@abstractmethod def __contains__(self, entity_id): """ Returns True or False, according to whether or not entity exists. """ @abstractproperty def event_store(self): """ Returns event store object used by this repository. """