import os
from abc import ABCMeta
from decimal import Decimal
from typing import Any, Dict, Optional, Sequence, Type, TypeVar
from uuid import UUID, uuid4
from eventsourcing.domain.model.decorators import subclassevents
from eventsourcing.domain.model.events import (
AttributeChangedEvent,
CreatedEvent,
DiscardedEvent,
DomainEvent,
EventWithHash,
EventWithOriginatorID,
EventWithOriginatorVersion,
EventWithTimestamp,
publish,
)
from eventsourcing.domain.model.versioning import Upcastable
from eventsourcing.exceptions import (
EntityIsDiscarded,
HeadHashError,
OriginatorIDError,
OriginatorVersionError,
)
from eventsourcing.utils.times import decimaltimestamp_from_uuid
from eventsourcing.utils.topic import get_topic, resolve_topic
from eventsourcing.whitehead import EnduringObject
# Need to deal with the fact that Python3.6 had GenericMeta.
# Todo: Delete this try/except when dropping support for Python 3.6.
try:
from typing import GenericMeta
ABCMeta = GenericMeta # type: ignore
except ImportError:
pass
[docs]class MetaDomainEntity(ABCMeta):
__subclassevents__ = False
# Todo: Delete the '**kwargs' when library no longer supports Python3.6.
# - When we started using typing.Generic, we started getting
# an error in 3.6 (only) "unexpected keyword argument 'tvars'"
# which was cured by adding **kwargs here. It's not needed
# for Python3.7, and only supports backward compatibility.
# So it can be removed when support for Python 3.6 dropped.
def __init__(cls, name: str, *args: Any, **kwargs: Any) -> None:
super().__init__(name, *args, **kwargs)
if name == "_gorg":
# Todo: Also Remove this block when dropping support for Python 3.6.
# Needed in 3.6 only, stops infinite recursion between typing and abc
# doing subclass checks. Don't know why. Seems issue fixed in Python 3.7.
pass
elif cls.__subclassevents__ is True:
# Redefine entity domain events.
subclassevents(cls)
TDomainEntity = TypeVar("TDomainEntity", bound="DomainEntity")
TDomainEvent = TypeVar("TDomainEvent", bound="DomainEntity.Event")
[docs]class DomainEntity(Upcastable, EnduringObject, metaclass=MetaDomainEntity):
"""
Supertype for domain model entity.
"""
__subclassevents__ = False
[docs] class Event(EventWithOriginatorID[TDomainEntity]):
"""
Supertype for events of domain model entities.
"""
[docs] def __check_obj__(self, obj: TDomainEntity) -> None:
"""
Checks state of obj before mutating.
:param obj: Domain entity to be checked.
:raises OriginatorIDError: if the originator_id is mismatched
"""
assert isinstance(obj, DomainEntity) # For PyCharm navigation.
# Assert ID matches originator ID.
if obj.id != self.originator_id:
raise OriginatorIDError(
"'{}' not equal to event originator ID '{}'"
"".format(obj.id, self.originator_id)
)
[docs] @classmethod
def __create__(
cls: Type[TDomainEntity],
originator_id: Optional[UUID] = None,
event_class: Optional[Type["DomainEntity.Created[TDomainEntity]"]] = None,
**kwargs: Any,
) -> TDomainEntity:
"""
Creates a new domain entity.
Constructs a "created" event, constructs the entity object
from the event, publishes the "created" event, and returns
the new domain entity object.
:param cls DomainEntity: Class of domain event
:param originator_id: ID of the new domain entity (defaults to ``uuid4()``).
:param event_class: Domain event class to be used for the "created" event.
:param kwargs: Other named attribute values of the "created" event.
:return: New domain entity object.
:rtype: DomainEntity
"""
if originator_id is None:
originator_id = uuid4()
if event_class is None:
assert issubclass(cls, DomainEntity) # For navigation in PyCharm.
created_event_class: Type[DomainEntity.Created[TDomainEntity]] = cls.Created
else:
created_event_class = event_class
event = created_event_class(
originator_id=originator_id, originator_topic=get_topic(cls), **kwargs
)
obj = event.__mutate__(None)
assert obj is not None, "{} returned None".format(
type(event).__mutate__.__qualname__
)
obj.__publish__([event])
return obj
[docs] class Created(CreatedEvent[TDomainEntity], Event[TDomainEntity]):
"""
Triggered when an entity is created.
"""
def __init__(self, originator_topic: str, **kwargs: Any):
super(DomainEntity.Created, self).__init__(
originator_topic=originator_topic, **kwargs
)
@property
def originator_topic(self) -> str:
"""
Topic (a string) representing the class of the originating domain entity.
:rtype: str
"""
return self.__dict__["originator_topic"]
[docs] def __mutate__(self, obj: Optional[TDomainEntity]) -> Optional[TDomainEntity]:
"""
Constructs object from an entity class,
which is obtained by resolving the originator topic,
unless it is given as method argument ``entity_class``.
:param entity_class: Class of domain entity to be constructed.
"""
entity_class: Type[TDomainEntity] = resolve_topic(self.originator_topic)
return entity_class(**self.__entity_kwargs__)
@property
def __entity_kwargs__(self) -> Dict[str, Any]:
kwargs = self.__dict__.copy()
kwargs["id"] = kwargs.pop("originator_id")
kwargs.pop("originator_topic", None)
kwargs.pop("__event_topic__", None)
kwargs.pop("__event_hash_method_name__", None)
return kwargs
def __init__(self, id: UUID):
self._id = id
self.__is_discarded__ = False
@property
def id(self) -> UUID:
"""The immutable ID of the domain entity.
This value is set using the ``originator_id`` of the
"created" event constructed by ``__create__()``.
An entity ID allows an instance to be
referenced and distinguished from others, even
though its state may change over time.
This attribute has the normal "public" format for a Python object
attribute name, because by definition all domain entities have an ID.
"""
return self._id
[docs] def __change_attribute__(
self: TDomainEntity, name: str, value: Any, **kwargs
) -> None:
"""
Changes named attribute with the given value,
by triggering an AttributeChanged event.
"""
event_class: Type[
"DomainEntity.AttributeChanged[TDomainEntity]"
] = self.AttributeChanged
assert isinstance(self, DomainEntity) # For PyCharm navigation.
self.__trigger_event__(
event_class=event_class, name=name, value=value, **kwargs
)
[docs] class AttributeChanged(Event[TDomainEntity], AttributeChangedEvent[TDomainEntity]):
"""
Triggered when a named attribute is assigned a new value.
"""
[docs] def __mutate__(self, obj: Optional[TDomainEntity]) -> Optional[TDomainEntity]:
obj = super(DomainEntity.AttributeChanged, self).__mutate__(obj)
setattr(obj, self.name, self.value)
return obj
[docs] def __discard__(self: TDomainEntity, **kwargs) -> None:
"""
Discards self, by triggering a Discarded event.
"""
event_class: Type["DomainEntity.Discarded[TDomainEntity]"] = self.Discarded
assert isinstance(self, DomainEntity) # For PyCharm navigation.
self.__trigger_event__(event_class=event_class, **kwargs)
[docs] class Discarded(DiscardedEvent[TDomainEntity], Event[TDomainEntity]):
"""
Triggered when a DomainEntity is discarded.
"""
[docs] def __mutate__(self, obj: Optional[TDomainEntity]) -> Optional[TDomainEntity]:
obj = super(DomainEntity.Discarded, self).__mutate__(obj)
if obj is not None:
assert isinstance(obj, DomainEntity) # For PyCharm navigation.
obj.__is_discarded__ = True
return None
[docs] def __assert_not_discarded__(self) -> None:
"""
Asserts that this entity has not been discarded.
Raises EntityIsDiscarded exception if entity has been discarded already.
"""
if self.__is_discarded__:
raise EntityIsDiscarded("Entity is discarded")
[docs] def __trigger_event__(self, event_class: Type[TDomainEvent], **kwargs: Any) -> None:
"""
Constructs, applies, and publishes a domain event.
"""
self.__assert_not_discarded__()
event: TDomainEvent = event_class(originator_id=self.id, **kwargs)
self.__mutate__(event)
self.__publish__([event])
[docs] def __mutate__(self, event: TDomainEvent) -> None:
"""
Mutates this entity with the given event.
This method calls on the event object to mutate this
entity, because the mutation behaviour of different types
of events was usefully factored onto the event classes, and
the event mutate() method is the most convenient way to
defined behaviour in domain models.
However, as an alternative to implementing the mutate()
method on domain model events, this method can be extended
with a method that is capable of mutating an entity for all
the domain event classes introduced by the entity class.
Similarly, this method can be overridden entirely in subclasses,
so long as all of the mutation behaviour is implemented in the
mutator function, including the mutation behaviour of the events
defined on the library event classes that would no longer be invoked.
However, if the entity class defines a mutator function, or if a
separate mutator function is used, then it must be involved in
the event sourced repository used to replay events, which by default
knows nothing about the domain entity class. In practice, this
means having a repository for each kind of entity, rather than
the application just having one repository, with each repository
having a mutator function that can project the entity events
into an entity.
"""
assert isinstance(event, DomainEntity.Event)
event.__mutate__(self)
[docs] def __publish__(self, event: Sequence[TDomainEvent]) -> None:
"""
Publishes given event for subscribers in the application.
:param event: domain event or list of events
"""
self.__publish_to_subscribers__(event)
[docs] def __publish_to_subscribers__(self, events: Sequence[TDomainEvent]) -> None:
"""
Actually dispatches given event to publish-subscribe mechanism.
:param events: list of domain events
"""
publish(events)
[docs] def __eq__(self, other: object) -> bool:
return type(self) == type(other) and self.__dict__ == other.__dict__
[docs] def __ne__(self, other: object) -> bool:
return not self.__eq__(other)
TEntityWithHashchain = TypeVar("TEntityWithHashchain", bound="EntityWithHashchain")
GENESIS_HASH: str = os.getenv("GENESIS_HASH", "")
[docs]class EntityWithHashchain(DomainEntity):
__genesis_hash__ = GENESIS_HASH
def __init__(self, *args: Any, **kwargs: Any):
super(EntityWithHashchain, self).__init__(*args, **kwargs)
self.__head__: str = type(self).__genesis_hash__
[docs] class Event(
EventWithHash[TEntityWithHashchain], DomainEntity.Event[TEntityWithHashchain]
):
"""
Supertype for events of domain entities.
"""
[docs] def __mutate__(
self, obj: Optional[TEntityWithHashchain]
) -> Optional[TEntityWithHashchain]:
# Call super method.
obj = super(EntityWithHashchain.Event, self).__mutate__(obj)
# Set entity head from event hash.
# - unless just discarded...
if obj is not None:
assert isinstance(obj, EntityWithHashchain)
obj.__head__ = self.__event_hash__
return obj
[docs] def __check_obj__(self, obj: TEntityWithHashchain) -> None:
"""
Extends superclass method by checking the __previous_hash__
of this event matches the __head__ hash of the entity obj.
"""
# Call super method.
super(EntityWithHashchain.Event, self).__check_obj__(obj)
assert isinstance(obj, EntityWithHashchain) # For PyCharm navigation.
# Assert __head__ equals __previous_hash__.
if obj.__head__ != self.__dict__.get("__previous_hash__"):
raise HeadHashError(obj.id, obj.__head__, type(self))
[docs] class Created(
Event[TEntityWithHashchain], DomainEntity.Created[TEntityWithHashchain]
):
@property
def __entity_kwargs__(self) -> Dict[str, Any]:
# Get super property.
kwargs = super(EntityWithHashchain.Created, self).__entity_kwargs__
# Drop the event hashes.
kwargs.pop("__event_hash__", None)
kwargs.pop("__previous_hash__", None)
return kwargs
[docs] class AttributeChanged(
Event[TEntityWithHashchain], DomainEntity.AttributeChanged[TEntityWithHashchain]
):
pass
[docs] class Discarded(
Event[TEntityWithHashchain], DomainEntity.Discarded[TEntityWithHashchain]
):
[docs] def __mutate__(
self, obj: Optional[TEntityWithHashchain]
) -> Optional[TEntityWithHashchain]:
# Set entity head from event hash.
if obj:
assert isinstance(obj, EntityWithHashchain) # For PyCharm navigation.
obj.__head__ = self.__event_hash__
# Call super method.
return super(EntityWithHashchain.Discarded, self).__mutate__(obj)
[docs] @classmethod
def __create__(
cls: Type[TEntityWithHashchain],
originator_id: Optional[UUID] = None,
event_class: Optional[
Type["DomainEntity.Created[TEntityWithHashchain]"]
] = None,
**kwargs: Any,
) -> TEntityWithHashchain:
# Initialise the hash-chain with "genesis hash".
kwargs["__previous_hash__"] = getattr(cls, "__genesis_hash__", GENESIS_HASH)
assert issubclass(cls, EntityWithHashchain) # For PyCharm navigation.
obj = super(EntityWithHashchain, cls).__create__(
originator_id=originator_id, event_class=event_class, **kwargs
)
assert isinstance(obj, EntityWithHashchain) # For PyCharm type checking.
return obj
[docs] def __trigger_event__(self, event_class: Type[TDomainEvent], **kwargs: Any) -> None:
kwargs["__previous_hash__"] = self.__head__
super(EntityWithHashchain, self).__trigger_event__(event_class, **kwargs)
TVersionedEntity = TypeVar("TVersionedEntity", bound="VersionedEntity")
TVersionedEvent = TypeVar("TVersionedEvent", bound="VersionedEntity.Event")
[docs]class VersionedEntity(DomainEntity):
def __init__(self, __version__: int, **kwargs: Any):
super().__init__(**kwargs)
self.___version__: int = __version__
@property
def __version__(self) -> int:
return self.___version__
[docs] def __trigger_event__(self, event_class: Type[TDomainEvent], **kwargs: Any) -> None:
"""
Increments the version number when an event is triggered.
The event carries the version number that the originator
will have when the originator is mutated with this event.
(The event's "originator" version isn't the version of the
originator before the event was triggered, but represents
the result of the work of incrementing the version, which
is then set in the event as normal. The Created event has
version 0, and a newly created instance is at version 0.
The second event has originator version 1, and so will the
originator when the second event has been applied.
"""
# Do the work of incrementing the version number.
next_version = self.__version__ + 1
# Trigger an event with the result of this work.
super(VersionedEntity, self).__trigger_event__(
event_class=event_class, originator_version=next_version, **kwargs
)
[docs] class Event(
EventWithOriginatorVersion[TVersionedEntity],
DomainEntity.Event[TVersionedEntity],
):
"""Supertype for events of versioned entities."""
[docs] def __mutate__(
self, obj: Optional[TVersionedEntity]
) -> Optional[TVersionedEntity]:
obj = super(VersionedEntity.Event, self).__mutate__(obj)
if obj is not None:
assert isinstance(obj, VersionedEntity) # For PyCharm navigation.
obj.___version__ = self.originator_version
return obj
[docs] def __check_obj__(self, obj: TVersionedEntity) -> None:
"""
Extends superclass method by checking the event's
originator version follows (1 +) this entity's version.
"""
super(VersionedEntity.Event, self).__check_obj__(obj)
assert isinstance(obj, VersionedEntity) # For PyCharm navigation.
# Assert the version sequence is correct.
if self.originator_version != obj.__version__ + 1:
raise OriginatorVersionError(
(
"Event takes entity to version {}, "
"but entity is currently at version {}. "
"Event type: '{}', entity type: '{}', entity ID: '{}'"
"".format(
self.originator_version,
obj.__version__,
type(self).__name__,
type(obj).__name__,
obj._id,
)
)
)
[docs] class Created(DomainEntity.Created[TVersionedEntity], Event[TVersionedEntity]):
"""Published when a VersionedEntity is created."""
def __init__(self, originator_version: int = 0, *args: Any, **kwargs: Any):
super(VersionedEntity.Created, self).__init__(
originator_version=originator_version, *args, **kwargs
)
@property
def __entity_kwargs__(self) -> Dict[str, Any]:
# Get super property.
kwargs = super(VersionedEntity.Created, self).__entity_kwargs__
kwargs["__version__"] = kwargs.pop("originator_version")
return kwargs
[docs] class AttributeChanged(
Event[TVersionedEntity], DomainEntity.AttributeChanged[TVersionedEntity]
):
"""Published when a VersionedEntity is changed."""
[docs] class Discarded(Event[TVersionedEntity], DomainEntity.Discarded[TVersionedEntity]):
"""Published when a VersionedEntity is discarded."""
[docs]class EntityWithECC(DomainEntity):
"""
Entity whose events have event ID, correlation ID, and causation ID.
"""
[docs] class Event(DomainEntity.Event):
def __init__(self, *, processed_event=None, application_name, **kwargs):
event_id = kwargs.get("event_id") or "{}:{}:{}".format(
application_name, kwargs["originator_id"], kwargs["originator_version"]
)
kwargs["event_id"] = event_id
if processed_event:
kwargs["causation_id"] = processed_event.event_id
kwargs["correlation_id"] = processed_event.correlation_id
else:
kwargs["causation_id"] = None
kwargs["correlation_id"] = event_id
super().__init__(**kwargs)
@property
def event_id(self):
return self.__dict__["event_id"]
@property
def correlation_id(self):
return self.__dict__["correlation_id"]
@property
def causation_id(self):
return self.__dict__["causation_id"]
[docs] class Created(DomainEntity.Created, Event):
pass
[docs] class AttributeChanged(Event, DomainEntity.AttributeChanged):
pass
[docs] class Discarded(Event, DomainEntity.Discarded):
pass
def __init__(self, *, event_id, correlation_id, causation_id, **kwargs):
_ = event_id, correlation_id, causation_id # accept, but ignore
super(EntityWithECC, self).__init__(**kwargs)
TTimestampedEntity = TypeVar("TTimestampedEntity", bound="TimestampedEntity")
[docs]class TimestampedEntity(DomainEntity):
def __init__(self, __created_on__: Decimal, **kwargs: Any):
super(TimestampedEntity, self).__init__(**kwargs)
self.___created_on__ = __created_on__
self.___last_modified__ = __created_on__
@property
def __created_on__(self) -> Decimal:
return self.___created_on__
@property
def __last_modified__(self) -> Decimal:
return self.___last_modified__
[docs] class Event(
DomainEntity.Event[TTimestampedEntity], EventWithTimestamp[TTimestampedEntity]
):
"""Supertype for events of timestamped entities."""
[docs] def __mutate__(
self, obj: Optional[TTimestampedEntity]
) -> Optional[TTimestampedEntity]:
"""Updates 'obj' with values from self."""
obj = super(TimestampedEntity.Event, self).__mutate__(obj)
if obj is not None:
assert isinstance(obj, TimestampedEntity) # For PyCharm navigation.
obj.___last_modified__ = self.timestamp
return obj
[docs] class Created(DomainEntity.Created[TTimestampedEntity], Event[TTimestampedEntity]):
"""Published when a TimestampedEntity is created."""
@property
def __entity_kwargs__(self) -> Dict[str, Any]:
# Get super property.
kwargs = super(TimestampedEntity.Created, self).__entity_kwargs__
kwargs["__created_on__"] = kwargs.pop("timestamp")
return kwargs
[docs] class AttributeChanged(
Event[TTimestampedEntity], DomainEntity.AttributeChanged[TTimestampedEntity]
):
"""Published when a TimestampedEntity is changed."""
[docs] class Discarded(
Event[TTimestampedEntity], DomainEntity.Discarded[TTimestampedEntity]
):
"""Published when a TimestampedEntity is discarded."""
# Todo: Move stuff from "test_customise_with_alternative_domain_event_type" in here (
# to define event classes
# and update ___last_event_id__ in mutate method).
[docs]class TimeuuidedEntity(DomainEntity):
def __init__(self, event_id: UUID, **kwargs: Any) -> None:
super(TimeuuidedEntity, self).__init__(**kwargs)
self.___initial_event_id__ = event_id
self.___last_event_id__ = event_id
@property
def __created_on__(self) -> Decimal:
return decimaltimestamp_from_uuid(self.___initial_event_id__)
@property
def __last_modified__(self) -> Decimal:
return decimaltimestamp_from_uuid(self.___last_event_id__)
TTimestampedVersionedEntity = TypeVar(
"TTimestampedVersionedEntity", bound="TimestampedVersionedEntity"
)
[docs]class TimestampedVersionedEntity(TimestampedEntity, VersionedEntity):
[docs] class Event(
TimestampedEntity.Event[TTimestampedVersionedEntity],
VersionedEntity.Event[TTimestampedVersionedEntity],
):
"""Supertype for events of timestamped, versioned entities."""
[docs] class Created(
TimestampedEntity.Created[TTimestampedVersionedEntity],
VersionedEntity.Created,
Event[TTimestampedVersionedEntity],
):
"""Published when a TimestampedVersionedEntity is created."""
[docs] class AttributeChanged(
Event[TTimestampedVersionedEntity],
TimestampedEntity.AttributeChanged[TTimestampedVersionedEntity],
VersionedEntity.AttributeChanged[TTimestampedVersionedEntity],
):
"""Published when a TimestampedVersionedEntity is created."""
[docs] class Discarded(
Event[TTimestampedVersionedEntity],
TimestampedEntity.Discarded[TTimestampedVersionedEntity],
VersionedEntity.Discarded[TTimestampedVersionedEntity],
):
"""Published when a TimestampedVersionedEntity is discarded."""
[docs]class TimeuuidedVersionedEntity(TimeuuidedEntity, VersionedEntity):
pass