Source code for eventsourcing.domain.model.aggregate

from collections import deque

from eventsourcing.domain.model.entity import TimestampedVersionedEntity, WithReflexiveMutator
from eventsourcing.domain.model.events import publish


[docs]class AggregateRoot(WithReflexiveMutator, TimestampedVersionedEntity): """ Root entity for an aggregate in a domain driven design. """
[docs] class Event(TimestampedVersionedEntity.Event): """Layer supertype."""
[docs] class Created(Event, TimestampedVersionedEntity.Created): """Published when an AggregateRoot is created."""
[docs] class AttributeChanged(Event, TimestampedVersionedEntity.AttributeChanged): """Published when an AggregateRoot is changed."""
[docs] class Discarded(Event, TimestampedVersionedEntity.Discarded): """Published when an AggregateRoot is discarded."""
def __init__(self, **kwargs): super(AggregateRoot, self).__init__(**kwargs) self._pending_events = deque()
[docs] def save(self): """ Publishes pending events for others in application. """ batch_of_events = [] try: while True: batch_of_events.append(self._pending_events.popleft()) except IndexError: pass if batch_of_events: publish(batch_of_events)
def _trigger(self, event_class, **kwargs): """ Constructs, applies, and publishes domain event of given class, with given kwargs. """ domain_event = event_class( originator_id=self.id, originator_version=self.version, **kwargs ) self._apply_and_publish(domain_event) def _publish(self, event): """ Appends event to internal collection of pending events. """ self._pending_events.append(event)