Aggregates in DDD
Eric Evans’ book Domain Driven Design describes an abstraction called “aggregate”:
“An aggregate is a cluster of associated objects that we treat as a unit for the purpose of data changes. Each aggregate has a root and a boundary.”
Therefore,
“Cluster the entities and value objects into aggregates and define boundaries around each. Choose one entity to be the root of each aggregate, and control all access to the objects inside the boundary through the root. Allow external objects to hold references to the root only.”
This suggests that an event sourced aggregate needs a set of events and a mutator function that pertain to a cluster of objects within a boundary. It also needs an entity that can function as the root of that cluster, with an identity that is distinguishable across the application, and methods that operate exclusively on the objects of the aggregate.
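The root-and-boundary rule can be sketched in plain Python, without any event sourcing. The names Order and OrderLine are hypothetical and are not part of the library; the sketch only illustrates that outside code holds a reference to the root and changes the inner objects through the root's methods.

```python
import uuid


class OrderLine(object):
    """Inner entity (hypothetical): only the root creates or changes it."""

    def __init__(self, line_id, quantity):
        self.id = line_id
        self.quantity = quantity


class Order(object):
    """Aggregate root (hypothetical): the only object outside code references."""

    def __init__(self):
        self.id = uuid.uuid4()
        self._lines = {}  # inner objects live inside the boundary

    def add_line(self, quantity):
        # All changes to inner objects go through a root method, so the
        # root can enforce rules for the whole cluster.
        if quantity <= 0:
            raise ValueError("quantity must be positive")
        line_id = uuid.uuid4()
        self._lines[line_id] = OrderLine(line_id, quantity)
        return line_id

    def total_quantity(self):
        return sum(line.quantity for line in self._lines.values())


order = Order()
order.add_line(2)
order.add_line(3)
print(order.total_quantity())
```

Because callers never touch the inner objects directly, a rule such as "quantity must be positive" only needs to be checked in one place.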
Since one command may result in several events, it is important never to persist only some of the events that result from executing a command. The events must therefore be appended to the event store in a single atomic transaction, so that if any of the events resulting from a command cannot be stored then none of them will be stored.
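The all-or-nothing behaviour can be illustrated with the standard library's sqlite3 module. This is a minimal sketch under stated assumptions, not how the library's event store is implemented: the table layout and the function append_events() are hypothetical, chosen only to show a batch being rolled back when one record conflicts.

```python
import sqlite3


def append_events(conn, events):
    """Append all events in one transaction, or none of them (hypothetical)."""
    # Using the connection as a context manager commits on success and
    # rolls back if an exception is raised inside the block.
    with conn:
        conn.executemany(
            "INSERT INTO events (originator_id, originator_version, topic) "
            "VALUES (?, ?, ?)",
            events,
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events ("
    "originator_id TEXT, originator_version INTEGER, topic TEXT, "
    "PRIMARY KEY (originator_id, originator_version))"
)

# Both events are stored together.
append_events(conn, [("a1", 0, "Created"), ("a1", 1, "ExampleCreated")])

# The second event in this batch conflicts with an existing version, so
# the whole batch is rolled back, including the valid first event.
try:
    append_events(conn, [("a1", 2, "ExampleCreated"), ("a1", 1, "ExampleCreated")])
except sqlite3.IntegrityError:
    pass

stored = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(stored)
```

The primary key on the identifier and version is what makes a conflicting write fail, and the enclosing transaction is what prevents the rest of the batch from being stored.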
Aggregate root
Let’s define an aggregate root using the class TimestampedVersionedEntity from the library. The Example class used in the previous section on snapshotting also derives from TimestampedVersionedEntity.
The example aggregate root class below defines (as an inner class) the domain event class ExampleCreated, which will be published by the aggregate when creating “example” objects, and a method count_examples() that can operate on all the “example” objects of the aggregate.
import uuid

from eventsourcing.domain.model.entity import TimestampedVersionedEntity
from eventsourcing.domain.model.events import publish


class ExampleAggregateRoot(TimestampedVersionedEntity):
    """
    Root entity of example aggregate.
    """
    class Event(TimestampedVersionedEntity.Event):
        """Layer supertype."""

    class Created(Event, TimestampedVersionedEntity.Created):
        """Published when aggregate is created."""

    class Discarded(Event, TimestampedVersionedEntity.Discarded):
        """Published when aggregate is discarded."""

    class ExampleCreated(Event):
        """Published when an "example" object in the aggregate is created."""

    def __init__(self, **kwargs):
        super(ExampleAggregateRoot, self).__init__(**kwargs)
        self._pending_events = []
        self._examples = {}

    def count_examples(self):
        return len(self._examples)

    def create_new_example(self):
        assert not self._is_discarded
        event = ExampleAggregateRoot.ExampleCreated(
            example_id=uuid.uuid4(),
            originator_id=self.id,
            originator_version=self.version,
        )
        # mutate_aggregate() is defined further below; the name is
        # resolved when this method is called.
        mutate_aggregate(self, event)
        self._publish(event)

    def _publish(self, event):
        # Collect the event instead of publishing it immediately.
        self._pending_events.append(event)

    def save(self):
        # Publish a copy of all pending events as one list, then reset.
        publish(self._pending_events[:])
        self._pending_events = []


class Example(object):
    """
    Example entity, exists only within the example aggregate boundary.
    """
    def __init__(self, example_id):
        self._id = example_id

    @property
    def id(self):
        return self._id
The methods of the aggregate, and the factory below, are similar to previous examples. But instead of immediately publishing events using the publish() function, the events are appended to an internal list of pending events using the aggregate’s method _publish(). The aggregate then has a save() method which is used to publish all the pending events in a single list using the function publish().
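The buffering pattern can be shown on its own with a toy aggregate that does not depend on the library. The names Counter, publish_batch() and published_batches are hypothetical stand-ins used only for illustration.

```python
published_batches = []  # records what a subscriber would receive


def publish_batch(events):
    """Hypothetical stand-in for a publish function: one call, one batch."""
    published_batches.append(list(events))


class Counter(object):
    """Toy aggregate that buffers events until save() is called."""

    def __init__(self):
        self.value = 0
        self._pending_events = []

    def increment(self):
        self.value += 1
        # Record the event locally instead of publishing it immediately.
        self._pending_events.append(("Incremented", self.value))

    def save(self):
        # Hand over a copy of the buffer in one call, then clear it, so
        # a later save() does not publish the same events again.
        publish_batch(self._pending_events[:])
        self._pending_events = []


counter = Counter()
counter.increment()
counter.increment()
assert published_batches == []  # nothing is published before save()
counter.save()
assert len(published_batches) == 1  # both events arrive as one batch
```

Publishing one list per save() is what gives a subscriber the chance to store all of the events from a command together.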
As before, we’ll also need a factory and a mutator function. The factory function here works in the same way as before.
def create_example_aggregate():
    """
    Factory function for example aggregate.
    """
    # Construct event.
    event = ExampleAggregateRoot.Created(originator_id=uuid.uuid4())

    # Mutate aggregate.
    aggregate = mutate_aggregate(aggregate=None, event=event)

    # Publish event to internal list only.
    aggregate._publish(event)

    # Return the new aggregate object.
    return aggregate
The mutator function mutate_aggregate() below handles the events Created and Discarded similarly to the previous examples. It also handles ExampleCreated, by constructing an object of class Example that it adds to the aggregate’s internal collection of examples.
def mutate_aggregate(aggregate, event):
    """
    Mutator function for example aggregate.
    """
    # Handle "created" events by constructing the aggregate object.
    if isinstance(event, ExampleAggregateRoot.Created):
        aggregate = ExampleAggregateRoot(**event.__dict__)
        aggregate._version += 1
        return aggregate

    # Handle "example entity created" events by adding a new entity
    # to the aggregate's dict of entities.
    elif isinstance(event, ExampleAggregateRoot.ExampleCreated):
        aggregate._assert_not_discarded()
        entity = Example(example_id=event.example_id)
        aggregate._examples[str(entity.id)] = entity
        aggregate._version += 1
        aggregate._last_modified_on = event.timestamp
        return aggregate

    # Handle "discarded" events by returning 'None'.
    elif isinstance(event, ExampleAggregateRoot.Discarded):
        aggregate._assert_not_discarded()
        aggregate._version += 1
        aggregate._is_discarded = True
        return None
    else:
        raise NotImplementedError(type(event))
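A mutator of this shape can rebuild state by being applied to the stored events in order, which amounts to a left fold. The sketch below shows that idea with plain tuples and a dict; mutate() and replay() are hypothetical names, and this is not presented as the library's repository implementation.

```python
from functools import reduce


def mutate(state, event):
    """Toy mutator: returns the state after applying one event."""
    kind, payload = event
    if kind == "Created":
        return {"version": 1, "examples": []}
    elif kind == "ExampleCreated":
        state["examples"].append(payload)
        state["version"] += 1
        return state
    elif kind == "Discarded":
        return None
    else:
        raise NotImplementedError(kind)


def replay(events, initial_state=None):
    """Left fold of the mutator over the events, oldest first."""
    return reduce(mutate, events, initial_state)


events = [("Created", None), ("ExampleCreated", "e1"), ("ExampleCreated", "e2")]
state = replay(events)
print(state["version"], len(state["examples"]))
```

Starting the fold from None mirrors the factory above, which passes aggregate=None with a "created" event, and a final "discarded" event leaves the fold with None again.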
Application and infrastructure
Set up a database table using library classes.
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemySettings, SQLAlchemyDatastore
from eventsourcing.infrastructure.sqlalchemy.activerecords import IntegerSequencedItemRecord

datastore = SQLAlchemyDatastore(
    settings=SQLAlchemySettings(uri='sqlite:///:memory:'),
    tables=(IntegerSequencedItemRecord,),
)

datastore.setup_connection()
datastore.setup_tables()
Define an application class that uses the domain model code above, and infrastructure and policy classes from the library.
import uuid
import time

from eventsourcing.application.policies import PersistencePolicy
from eventsourcing.domain.model.events import publish
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
from eventsourcing.infrastructure.sqlalchemy.activerecords import SQLAlchemyActiveRecordStrategy


class ExampleDDDApplication(object):
    def __init__(self, session):
        self.event_store = EventStore(
            active_record_strategy=SQLAlchemyActiveRecordStrategy(
                session=session,
                active_record_class=IntegerSequencedItemRecord,
            ),
            sequenced_item_mapper=SequencedItemMapper(
                sequence_id_attr_name='originator_id',
                position_attr_name='originator_version',
            )
        )
        self.aggregate_repository = EventSourcedRepository(
            event_store=self.event_store,
            mutator=mutate_aggregate,
        )
        self.persistence_policy = PersistencePolicy(
            event_store=self.event_store,
            event_type=ExampleAggregateRoot.Event
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.persistence_policy.close()
Run the code
The application can be used to create new aggregates, and aggregates can be used to create new entities. Events are published in batches when the aggregate’s save() method is called.
with ExampleDDDApplication(datastore.session) as app:

    # Create a new aggregate.
    aggregate = create_example_aggregate()
    aggregate.save()

    # Check it exists in the repository.
    assert aggregate.id in app.aggregate_repository, aggregate.id

    # Check the aggregate has zero entities.
    assert aggregate.count_examples() == 0

    # Ask the aggregate to create an entity within itself.
    aggregate.create_new_example()

    # Check the aggregate has one entity.
    assert aggregate.count_examples() == 1

    # Check the aggregate in the repo still has zero entities.
    assert app.aggregate_repository[aggregate.id].count_examples() == 0

    # Call save().
    aggregate.save()

    # Check the aggregate in the repo now has one entity.
    assert app.aggregate_repository[aggregate.id].count_examples() == 1

    # Create two more entities within the aggregate.
    aggregate.create_new_example()
    aggregate.create_new_example()

    # Save both "entity created" events in one atomic transaction.
    aggregate.save()

    # Check the aggregate in the repo now has three entities.
    assert app.aggregate_repository[aggregate.id].count_examples() == 3

    # Discard the aggregate, but don't call save() yet.
    aggregate.discard()

    # Check the aggregate still exists in the repo.
    assert aggregate.id in app.aggregate_repository

    # Call save().
    aggregate.save()

    # Check the aggregate no longer exists in the repo.
    assert aggregate.id not in app.aggregate_repository
The library has an AggregateRoot class that is slightly more developed than the code in this example.