from collections import deque
from typing import Any, Deque, Generic, List, Sequence, TypeVar
from eventsourcing.domain.model.entity import (
DomainEntity,
EntityWithHashchain,
TDomainEvent,
TimestampedVersionedEntity,
)
TAggregate = TypeVar("TAggregate", bound="BaseAggregateRoot")
TAggregateEvent = TypeVar("TAggregateEvent", bound="BaseAggregateRoot.Event")
[docs]class BaseAggregateRoot(TimestampedVersionedEntity, Generic[TAggregateEvent]):
"""
Root entity for an aggregate in a domain driven design.
"""
[docs] class Event(TimestampedVersionedEntity.Event[TAggregate]):
"""Supertype for base aggregate root events."""
[docs] class Created(TimestampedVersionedEntity.Created[TAggregate], Event[TAggregate]):
"""Triggered when an aggregate root is created."""
[docs] class AttributeChanged(
Event[TAggregate], TimestampedVersionedEntity.AttributeChanged[TAggregate]
):
"""Triggered when an aggregate root attribute is changed."""
[docs] class Discarded(
Event[TAggregate], TimestampedVersionedEntity.Discarded[TAggregate]
):
"""Triggered when an aggregate root is discarded."""
[docs] def __init__(self, **kwargs: Any) -> None:
super(BaseAggregateRoot, self).__init__(**kwargs)
self.__pending_events__: Deque[DomainEntity.Event] = deque()
[docs] def __publish__(self, event: Sequence[TDomainEvent]) -> None:
"""
Defers publishing event(s) to subscribers, by adding
event to internal collection of pending events.
"""
self.__pending_events__.extend(event)
[docs] def __save__(self) -> None:
"""
Publishes all pending events to subscribers.
"""
batch_of_events = self.__batch_pending_events__()
if batch_of_events:
self.__publish_to_subscribers__(batch_of_events)
# Don't catch exception and put the events back on the queue.
# Losing them here is consistent with the behaviour of DomainEntity
# when an event cannot be stored: the event is effectively lost, the
# state of the entity must be reset, and the operation repeated.
# In case of an aggregate sequence conflict, developers need to
# know what has happened since the last save, so can retry only the
# command(s) that have caused conflict. Best to save once per command,
# so the command can be retried cleanly. The purpose of the save
# method is to allow many events from one command to be persisted
# together. The save command can be used to persist all the events
# from many commands, but in case of a failed save after several
# commands have been executed, it is important to know which
# commands to retry.
def __batch_pending_events__(self) -> List[DomainEntity.Event]:
batch_of_events: List[DomainEntity.Event] = []
try:
while True:
batch_of_events.append(self.__pending_events__.popleft())
except IndexError:
pass
return batch_of_events
TAggregateRootWithHashchainedEvents = TypeVar(
"TAggregateRootWithHashchainedEvents", bound="AggregateRootWithHashchainedEvents"
)
[docs]class AggregateRootWithHashchainedEvents(EntityWithHashchain, BaseAggregateRoot):
"""Extends aggregate root base class with hash-chained events."""
[docs] class Event(
EntityWithHashchain.Event[TAggregateRootWithHashchainedEvents],
BaseAggregateRoot.Event[TAggregateRootWithHashchainedEvents],
):
"""Supertype for aggregate events."""
[docs] class Created(
EntityWithHashchain.Created[TAggregateRootWithHashchainedEvents],
BaseAggregateRoot.Created[TAggregateRootWithHashchainedEvents],
Event[TAggregateRootWithHashchainedEvents],
):
"""Triggered when an aggregate root is created."""
[docs] class AttributeChanged(
Event[TAggregateRootWithHashchainedEvents],
BaseAggregateRoot.AttributeChanged[TAggregateRootWithHashchainedEvents],
):
"""Triggered when an aggregate root attribute is changed."""
[docs] class Discarded(
Event[TAggregateRootWithHashchainedEvents],
EntityWithHashchain.Discarded[TAggregateRootWithHashchainedEvents],
BaseAggregateRoot.Discarded[TAggregateRootWithHashchainedEvents],
):
"""Triggered when an aggregate root is discarded."""
# For backwards compatibility.
[docs]class AggregateRoot(AggregateRootWithHashchainedEvents):
"""Original name for aggregate root base class with hash-chained events."""
[docs] class Event(
AggregateRootWithHashchainedEvents.Event[TAggregateRootWithHashchainedEvents]
):
"""Supertype for aggregate events."""
[docs] class Created(
Event[TAggregateRootWithHashchainedEvents],
AggregateRootWithHashchainedEvents.Created[TAggregateRootWithHashchainedEvents],
):
"""Triggered when an aggregate root is created."""
[docs] class AttributeChanged(
Event[TAggregateRootWithHashchainedEvents],
AggregateRootWithHashchainedEvents.AttributeChanged[
TAggregateRootWithHashchainedEvents
],
):
"""Triggered when an aggregate root attribute is changed."""
[docs] class Discarded(
Event[TAggregateRootWithHashchainedEvents],
AggregateRootWithHashchainedEvents.Discarded[
TAggregateRootWithHashchainedEvents
],
):
"""Triggered when an aggregate root is discarded."""