Source code for eventsourcing.domain.model.aggregate

"""
aggregate
~~~~~~~~~

Base classes for aggregates in a domain driven design.
"""
from collections import deque

from eventsourcing.domain.model.entity import TimestampedVersionedEntity, EntityWithHashchain


[docs]class BaseAggregateRoot(TimestampedVersionedEntity): """ Root entity for an aggregate in a domain driven design. """
[docs] class Event(TimestampedVersionedEntity.Event): """Supertype for aggregate events."""
[docs] class Created(Event, TimestampedVersionedEntity.Created): """Published when an AggregateRoot is created."""
[docs] class AttributeChanged(Event, TimestampedVersionedEntity.AttributeChanged): """Published when an AggregateRoot is changed."""
[docs] class Discarded(Event, TimestampedVersionedEntity.Discarded): """Published when an AggregateRoot is discarded."""
[docs] def __init__(self, **kwargs): super(BaseAggregateRoot, self).__init__(**kwargs) self.__pending_events__ = deque()
[docs] def __save__(self): """ Publishes pending events for others in application. """ batch_of_events = self.__batch_pending_events__() if batch_of_events: self.__publish_to_subscribers__(batch_of_events)
# Don't catch exception and put the events back on the queue. # Losing them here is consistent with the behaviour of DomainEntity # when an event cannot be stored: the event is effectively lost, the # state of the entity must be reset, and the operation repeated. # In case of an aggregate sequence conflict, developers need to # know what has happened since the last save, so can retry only the # command(s) that have caused conflict. Best to save once per command, # so the command can be retried cleanly. The purpose of the save # method is to allow many events from one command to be persisted # together. The save command can be used to persist all the events # from many commands, but in case of a failed save after several # commands have been executed, it is important to know which # commands to retry.
[docs] def __batch_pending_events__(self): batch_of_events = [] try: while True: batch_of_events.append(self.__pending_events__.popleft()) except IndexError: pass return batch_of_events
[docs] def __publish__(self, event): """ Appends event to internal collection of pending events. """ self.__pending_events__.append(event)
[docs]class AggregateRootWithHashchainedEvents(EntityWithHashchain, BaseAggregateRoot):
[docs] class Event(EntityWithHashchain.Event, BaseAggregateRoot.Event): """Supertype for aggregate events."""
[docs] class Created(Event, EntityWithHashchain.Created, BaseAggregateRoot.Created): """Published when an AggregateRoot is created."""
[docs] class AttributeChanged(Event, BaseAggregateRoot.AttributeChanged): """Published when an AggregateRoot is changed."""
[docs] class Discarded(Event, EntityWithHashchain.Discarded, BaseAggregateRoot.Discarded): """Published when an AggregateRoot is discarded."""
[docs]class AggregateRoot(AggregateRootWithHashchainedEvents):
[docs] class Event(AggregateRootWithHashchainedEvents.Event): """Supertype for aggregate events."""
[docs] class Created(Event, AggregateRootWithHashchainedEvents.Created): """Published when an AggregateRoot is created."""
[docs] class AttributeChanged(Event, AggregateRootWithHashchainedEvents.AttributeChanged): """Published when an AggregateRoot is changed."""
[docs] class Discarded(Event, AggregateRootWithHashchainedEvents.Discarded): """Published when an AggregateRoot is discarded."""