Domain model

The library’s domain layer has base classes for domain events and entities. These classes show how to write a domain model that uses the library’s event sourcing infrastructure. They can also be used to develop an event-sourced application as a domain driven design.

Domain events

The purpose of a domain event is to be published when something happens, normally the results from the work of a command. The library has a base class for domain events called DomainEvent.

Domain events can be freely constructed from the DomainEvent class. Attributes are set directly from the constructor keyword arguments.

from eventsourcing.domain.model.events import DomainEvent

domain_event = DomainEvent(a=1)
assert domain_event.a == 1

The attributes of domain events are read-only. New values cannot be assigned to existing objects. Domain events are immutable in that sense.

# Fail to set attribute of already-existing domain event.
try:
    domain_event.a = 2
except AttributeError:
    pass
else:
    raise Exception("Shouldn't get here")

Domain events can be compared for equality as value objects, instances are equal if they have the same type and the same attributes.

DomainEvent(a=1) == DomainEvent(a=1)

DomainEvent(a=1) != DomainEvent(a=2)

DomainEvent(a=1) != DomainEvent(b=1)

Publish-subscribe

Domain events can be published, using the library’s publish-subscribe mechanism.

The publish() function is used to publish events. The event arg is required.

from eventsourcing.domain.model.events import publish

publish(event=domain_event)

The subscribe() function is used to subscribe a handler that will receive events.

The optional predicate arg can be used to provide a function that will decide whether or not the subscribed handler will actually be called when an event is published.

from eventsourcing.domain.model.events import subscribe

received_events = []

def receive_event(event):
    received_events.append(event)

def is_domain_event(event):
    return isinstance(event, DomainEvent)

subscribe(handler=receive_event, predicate=is_domain_event)

# Publish the domain event.
publish(domain_event)

assert len(received_events) == 1
assert received_events[0] == domain_event

The unsubscribe() function can be used to stop the handler receiving further events.

from eventsourcing.domain.model.events import unsubscribe

unsubscribe(handler=receive_event, predicate=is_domain_event)

# Clean up.
del received_events[:]  # received_events.clear()

Event library

The library has a small collection of domain event subclasses, such as EventWithOriginatorID, EventWithOriginatorVersion, EventWithTimestamp, EventWithTimeuuid, Created, AttributeChanged, Discarded.

Some of these classes provide useful defaults for particular attributes, such as a timestamp. Timestamps can be used to sequence events.

from eventsourcing.domain.model.events import EventWithTimestamp
from eventsourcing.domain.model.events import EventWithTimeuuid
from decimal import Decimal
from uuid import UUID

# Automatic timestamp.
assert isinstance(EventWithTimestamp().timestamp, Decimal)

# Automatic UUIDv1.
assert isinstance(EventWithTimeuuid().event_id, UUID)

Some classes require particular arguments when constructed. The originator_id can be used to identify a sequence to which an event belongs. The originator_version can be used to position the event in a sequence.

from eventsourcing.domain.model.events import EventWithOriginatorVersion
from eventsourcing.domain.model.events import EventWithOriginatorID
from uuid import uuid4

# Requires originator_id.
EventWithOriginatorID(originator_id=uuid4())

# Requires originator_version.
EventWithOriginatorVersion(originator_version=0)

Some are just useful for their distinct type, for example in subscription predicates.

from eventsourcing.domain.model.events import Created, AttributeChanged, Discarded

def is_created(event):
    return isinstance(event, Created)

def is_attribute_changed(event):
    return isinstance(event, AttributeChanged)

def is_discarded(event):
    return isinstance(event, Discarded)

assert is_created(Created()) is True
assert is_created(Discarded()) is False
assert is_created(DomainEvent()) is False

assert is_discarded(Created()) is False
assert is_discarded(Discarded()) is True
assert is_discarded(DomainEvent()) is False

assert is_domain_event(Created()) is True
assert is_domain_event(Discarded()) is True
assert is_domain_event(DomainEvent()) is True

Custom events

Custom domain events can be coded by subclassing the library’s domain event classes.

Domain events are normally named using the past participle of a common verb, for example a regular past participle such as “started”, “paused”, “stopped”, or an irregular past participle such as “chosen”, “done”, “found”, “paid”, “quit”, “seen”.

class SomethingHappened(DomainEvent):
    """
    Published whenever something happens.
    """

It is possible to code domain events as inner or nested classes.

class Job(object):

    class Seen(EventWithTimestamp):
        """
        Published when the job is seen.
        """

    class Done(EventWithTimestamp):
        """
        Published when the job is done.
        """

Inner or nested classes can be used, and are used in the library, to define the domain events of a domain entity on the entity class itself.

seen = Job.Seen(job_id='#1')
done = Job.Done(job_id='#1')

assert done.timestamp > seen.timestamp

So long as the entity event classes inherit ultimately from library class QualnameABC, which DomainEvent does, the utility functions get_topic() and resolve_topic() can work with domain events defined as inner or nested classes in all versions of Python. These functions are used in the DomainEntity.Created event class, and in the infrastructure class SequencedItemMapper. The requirement to inherit from QualnameABC actually only applies when using nested classes in Python 2.7 with the utility functions get_topic() and resolve_topic(). Events classes that are not nested, or that will not be run with Python 2.7, do not need to inherit from QualnameABC in order to work with these two functions (and hence the library domain and infrastructure classes which use those functions).

Domain entities

A domain entity is an object that is not defined by its attributes, but rather by a thread of continuity and its identity. The attributes of a domain entity can change, directly by assignment, or indirectly by calling a method of the object.

The library has a base class for domain entities called DomainEntity, which has an id attribute.

from eventsourcing.domain.model.entity import DomainEntity

entity_id = uuid4()

entity = DomainEntity(id=entity_id)

assert entity.id == entity_id

Entity library

The library also has a domain entity class called VersionedEntity, which extends the DomainEntity class with a __version__ attribute.

from eventsourcing.domain.model.entity import VersionedEntity

entity = VersionedEntity(id=entity_id, __version__=1)

assert entity.id == entity_id
assert entity.__version__ == 1

The library also has a domain entity class called TimestampedEntity, which extends the DomainEntity class with attributes __created_on__ and __last_modified__.

from eventsourcing.domain.model.entity import TimestampedEntity

entity = TimestampedEntity(id=entity_id, __created_on__=123)

assert entity.id == entity_id
assert entity.__created_on__ == 123
assert entity.__last_modified__ == 123

There is also a TimestampedVersionedEntity that has id, __version__, __created_on__, and __last_modified__ attributes.

from eventsourcing.domain.model.entity import TimestampedVersionedEntity

entity = TimestampedVersionedEntity(id=entity_id, __version__=1, __created_on__=123)

assert entity.id == entity_id
assert entity.__created_on__ == 123
assert entity.__last_modified__ == 123
assert entity.__version__ == 1

A timestamped, versioned entity is both a timestamped entity and a versioned entity.

assert isinstance(entity, TimestampedEntity)
assert isinstance(entity, VersionedEntity)

Naming style

The double leading and trailing underscore naming style, seen above, is used consistently in the library’s domain entity and event base classes for attribute and method names, so that developers can begin with a clean namespace. The intention is that the library functionality is included in the application by aliasing these library names with names that work within the project’s ubiquitous language.

This style breaks PEP8, but it seems worthwhile in order to keep the “normal” Python object namespace free for domain modelling. It is a style used by other libraries (such as SQLAlchemy and Django) for similar reasons.

The exception is the id attribute of the domain entity base class, which is assumed to be required by all domain entities (or aggregates) in all domains.

Entity events

The library’s domain entity classes have domain events defined as inner classes: Event, Created, AttributeChanged, and Discarded.

DomainEntity.Event
DomainEntity.Created
DomainEntity.AttributeChanged
DomainEntity.Discarded

The domain event class DomainEntity.Event is a super type of the others. The others also inherit from the library base classes Created, AttributeChanged, and Discarded. All these domain events classes are subclasses of DomainEvent.

assert issubclass(DomainEntity.Created, DomainEntity.Event)
assert issubclass(DomainEntity.AttributeChanged, DomainEntity.Event)
assert issubclass(DomainEntity.Discarded, DomainEntity.Event)

assert issubclass(DomainEntity.Created, Created)
assert issubclass(DomainEntity.AttributeChanged, AttributeChanged)
assert issubclass(DomainEntity.Discarded, Discarded)

assert issubclass(DomainEntity.Event, DomainEvent)

These entity event classes can be freely constructed, with suitable arguments.

All events need an originator_id. Events of versioned entities also need an originator_version. Events of timestamped entities generate a current timestamp value, unless one is given. Created events also need an originator_topic. The other events need an __previous_hash__. AttributeChanged events also need name and value.

All the events of DomainEntity use SHA-256 to generate an event_hash from the event attribute values when constructed for the first time. Events can be chained together by constructing each subsequent event to have its __previous_hash__ as the event_hash of the previous event.

from eventsourcing.utils.topic import get_topic

entity_id = UUID('b81d160d-d7ef-45ab-a629-c7278082a845')

created = VersionedEntity.Created(
    originator_version=0,
    originator_id=entity_id,
    originator_topic=get_topic(VersionedEntity)
)

attribute_a_changed = VersionedEntity.AttributeChanged(
    name='a',
    value=1,
    originator_version=1,
    originator_id=entity_id,
    __previous_hash__=created.__event_hash__,
)

attribute_b_changed = VersionedEntity.AttributeChanged(
    name='b',
    value=2,
    originator_version=2,
    originator_id=entity_id,
    __previous_hash__=attribute_a_changed.__event_hash__,
)

entity_discarded = VersionedEntity.Discarded(
    originator_version=3,
    originator_id=entity_id,
    __previous_hash__=attribute_b_changed.__event_hash__,
)

The events have a __mutate__() function, which can be used to mutate the state of a given object appropriately.

For example, the DomainEntity.Created event mutates to an entity instance. The class that is instantiated is determined by the originator_topic attribute of the DomainEntity.Created event.

A domain event’s __mutate__() method normally requires an obj argument, but that is not required for DomainEntity.Created events. The default is None, but if a value is provided it must be callable that returns an object, such as a domain entity class. If a domain entity class is provided, the originator_topic will be ignored.

entity = created.__mutate__()

assert entity.id == entity_id

As another example, when a versioned entity is mutated by an event of the VersionedEntity class, the entity version number is set to the event originator_version.

assert entity.__version__ == 0

entity = attribute_a_changed.__mutate__(entity)
assert entity.__version__ == 1
assert entity.a == 1

entity = attribute_b_changed.__mutate__(entity)
assert entity.__version__ == 2
assert entity.b == 2

Similarly, when a timestamped entity is mutated by an event of the TimestampedEntity class, the __last_modified__ attribute of the entity is set to have the event’s timestamp value.

Factory method

The DomainEntity has a class method __create__() which can return new entity objects. When called, it constructs the Created event of the concrete class with suitable arguments such as a unique ID, and a topic representing the concrete entity class, and then it projects that event into an entity object using the event’s __mutate__() method. Then it publishes the event, and then it returns the new entity to the caller. This technique works correctly for subclasses of both the entity and the event class.

entity = DomainEntity.__create__()
assert entity.id
assert entity.__class__ is DomainEntity


entity = VersionedEntity.__create__()
assert entity.id
assert entity.__version__ == 0
assert entity.__class__ is VersionedEntity


entity = TimestampedEntity.__create__()
assert entity.id
assert entity.__created_on__
assert entity.__last_modified__
assert entity.__class__ is TimestampedEntity


entity = TimestampedVersionedEntity.__create__()
assert entity.id
assert entity.__created_on__
assert entity.__last_modified__
assert entity.__version__ == 0
assert entity.__class__ is TimestampedVersionedEntity

Triggering events

Commands methods will construct, apply, and publish events, using the results from working on command arguments. The events need to be constructed with suitable arguments.

To help trigger events in an extensible manner, the DomainEntity class has a method called __trigger_event__(), that is extended by subclasses in the library, which can be used in command methods to construct, apply, and publish events with suitable arguments. The events’ __mutate__() methods update the entity appropriately.

For example, triggering an AttributeChanged event on a timestamped, versioned entity will cause the attribute value to be updated, but it will also cause the version number to increase, and it will update the last modified time.

entity = TimestampedVersionedEntity.__create__()
assert entity.__version__ == 0
assert entity.__created_on__ == entity.__last_modified__

# Trigger domain event.
entity.__trigger_event__(entity.AttributeChanged, name='c', value=3)

# Check the event was applied.
assert entity.c == 3
assert entity.__version__ == 1
assert entity.__last_modified__ > entity.__created_on__

The command method __change_attribute__() triggers an AttributeChanged event. In the code below, the attribute full_name is set to ‘Mr Boots’. A subscriber receives the event.

subscribe(handler=receive_event, predicate=is_domain_event)
assert len(received_events) == 0

entity = VersionedEntity.__create__(entity_id)

# Change an attribute.
entity.__change_attribute__(name='full_name', value='Mr Boots')

# Check the event was applied.
assert entity.full_name == 'Mr Boots'

# Check two events were published.
assert len(received_events) == 2

first_event = received_events[0]
assert first_event.__class__ == VersionedEntity.Created
assert first_event.originator_id == entity_id
assert first_event.originator_version == 0

last_event = received_events[1]
assert last_event.__class__ == VersionedEntity.AttributeChanged
assert last_event.name == 'full_name'
assert last_event.value == 'Mr Boots'
assert last_event.originator_version == 1

# Check the event hash is the current entity head.
assert last_event.__event_hash__ == entity.__head__

# Clean up.
unsubscribe(handler=receive_event, predicate=is_domain_event)
del received_events[:]  # received_events.clear()

Data integrity

Domain events that are triggered in this way are hash-chained together by default.

The state of each event, including the hash of the last event, is hashed using SHA-256. Before an event is applied to an entity, it is validated in itself (the event hash represents the state of the event) and as a part of the chain (the previous event hash is included in the next event state). If the sequence of events is accidentally damaged in any way, then a DataIntegrityError will almost certainly be raised from the domain layer when the sequence is replayed.

The hash of the last event applied to an entity is available as an attribute called __head__.

# Entity's head hash is determined exclusively
# by the entire sequence of events and SHA-256.
assert entity.__head__ == 'ae7688000c38b2bd504b3eb3cd8e015144dd9a3c4992951c87cef9cce047f86c'

# Entity's head hash is simply the event hash
# of the last event that mutated the entity.
assert entity.__head__ == last_event.__event_hash__

A different sequence of events will almost certainly result a different head hash. So the entire history of an entity can be verified by checking the head hash. This feature could be used to protect against tampering.

The hashes can be salted by setting environment variable SALT_FOR_DATA_INTEGRITY, perhaps with random bytes encoded as Base64.

from eventsourcing.utils.random import encode_random_bytes

# Keep this safe.
salt = encode_random_bytes(num_bytes=32)

# Configure environment (before importing library).
import os
os.environ['SALT_FOR_DATA_INTEGRITY'] = salt

Discarding entities

The entity method __discard__() can be used to discard the entity, by triggering a Discarded event, after which the entity is unavailable for further changes.

from eventsourcing.exceptions import EntityIsDiscarded

entity.__discard__()

# Fail to change an attribute after entity was discarded.
try:
    entity.__change_attribute__('full_name', 'Mr Boots')
except EntityIsDiscarded:
    pass
else:
    raise Exception("Shouldn't get here")

Custom entities

The library entity classes can be subclassed.

class User(VersionedEntity):
    def __init__(self, full_name, *args, **kwargs):
        super(User, self).__init__(*args, **kwargs)
        self.full_name = full_name

Subclasses can extend the entity base classes, by adding event-based properties and methods.

Custom attributes

The library’s @attribute decorator provides a property getter and setter, which will triggers an AttributeChanged event when the property is assigned. Simple mutable attributes can be coded as decorated functions without a body, such as the full_name function of User below.

from eventsourcing.domain.model.decorators import attribute


class User(VersionedEntity):

    def __init__(self, full_name, *args, **kwargs):
        super(User, self).__init__(*args, **kwargs)
        self._full_name = full_name

    @attribute
    def full_name(self):
        """Full name of the user."""

In the code below, after the entity has been created, assigning to the full_name attribute causes the entity to be updated. An AttributeChanged event is published. Both the Created and AttributeChanged events are received by a subscriber.

assert len(received_events) == 0
subscribe(handler=receive_event, predicate=is_domain_event)

# Publish a Created event.
user = User.__create__(full_name='Mrs Boots')

# Publish an AttributeChanged event.
user.full_name = 'Mr Boots'

assert len(received_events) == 2
assert received_events[0].__class__ == VersionedEntity.Created
assert received_events[0].full_name == 'Mrs Boots'
assert received_events[0].originator_version == 0
assert received_events[0].originator_id == user.id

assert received_events[1].__class__ == VersionedEntity.AttributeChanged
assert received_events[1].value == 'Mr Boots'
assert received_events[1].name == '_full_name'
assert received_events[1].originator_version == 1
assert received_events[1].originator_id == user.id

# Clean up.
unsubscribe(handler=receive_event, predicate=is_domain_event)
del received_events[:]  # received_events.clear()

Custom commands

The entity base classes can be extended with custom command methods. In general, the arguments of a command will be used to perform some work. Then, the result of the work will be used to trigger a domain event that represents what happened. Please note, command methods normally have no return value.

For example, the set_password() method of the User entity below is given a raw password. It creates an encoded string from the raw password, and then uses the __change_attribute__() method to trigger an AttributeChanged event for the _password attribute with the encoded password.

from eventsourcing.domain.model.decorators import attribute


class User(VersionedEntity):

    def __init__(self, *args, **kwargs):
        super(User, self).__init__(*args, **kwargs)
        self._password = None

    def set_password(self, raw_password):
        # Do some work using the arguments of a command.
        password = self._encode_password(raw_password)

        # Change private _password attribute.
        self.__change_attribute__('_password', password)

    def check_password(self, raw_password):
        password = self._encode_password(raw_password)
        return self._password == password

    def _encode_password(self, password):
        return ''.join(reversed(password))


user = User(id='1', __version__=0)

user.set_password('password')
assert user.check_password('password')

Custom events

Custom events can be defined as inner or nested classes of the custom entity class. In the code below, the entity class World has a custom event called SomethingHappened.

Custom event classes can extend the __mutate__() method, so it affects entities in a way that is specific to that type of event. More conveniently, event classes can implement a mutate() method, which avoids the need to call the super method and return the obj. For example, the SomethingHappened event class has a mutate() method which simply appends the event object to the entity’s history attribute.

Custom events are normally triggered by custom commands. In the example below, the command method make_it_so() triggers the custom event SomethingHappened.

from eventsourcing.domain.model.decorators import mutator

class World(VersionedEntity):

    def __init__(self, *args, **kwargs):
        super(World, self).__init__(*args, **kwargs)
        self.history = []

    def make_it_so(self, something):
        # Do some work using the arguments of a command.
        what_happened = something

        # Trigger event with the results of the work.
        self.__trigger_event__(World.SomethingHappened, what=what_happened)

    class SomethingHappened(VersionedEntity.Event):
        """Published when something happens in the world."""
        def mutate(self, obj):
            obj.history.append(self)

A new world can now be created, using the __create__() method. The command make_it_so() can be used to make things happen in this world. When something happens, the history of the world is augmented with the new event.

world = World.__create__()

world.make_it_so('dinosaurs')
world.make_it_so('trucks')
world.make_it_so('internet')

assert world.history[0].what == 'dinosaurs'
assert world.history[1].what == 'trucks'
assert world.history[2].what == 'internet'

Aggregate root

Eric Evans’ book Domain Driven Design describes an abstraction called “aggregate”:

“An aggregate is a cluster of associated objects that we treat as a unit for the purpose of data changes. Each aggregate has a root and a boundary.”

Therefore,

“Cluster the entities and value objects into aggregates and define boundaries around each. Choose one entity to be the root of each aggregate, and control all access to the objects inside the boundary through the root. Allow external objects to hold references to the root only.”

In this situation, one aggregate command may result in many events. In order to construct a consistency boundary, we need to prevent the situation where other threads pick up only some of the events, but not all of them, which could present the aggregate in an inconsistent, or unusual, and perhaps unworkable state.

In other words, we need to avoid the situation where some of the events have been stored successfully but others have not been. If the events from a command were stored in a series of independent database transactions, then some would be written before others. If another thread needs the aggregate and gets its events whilst a series of new event are being written, it would not receive some of the events, but not the events that have not yet been written. Worse still, events could be lost due to an inconvenient database server problem, or sudden termination of the client. Even worse, later events in the series could fall into conflict because another thread has started appending events to the same sequence, potentially causing an incoherent state that would be difficult to repair.

Therefore, to implement the aggregate as a consistency boundary, all the events from a command on an aggregate must be appended to the event store in a single atomic transaction, so that if some of the events resulting from executing a command cannot be stored then none of them will be stored. If all the events from an aggregate are to be written to a database as a single atomic operation, then they must have been published by the entity as a single list.

The library has a domain entity class called AggregateRoot that can be useful in a domain driven design, especially where a single command can cause many events to be published. The AggregateRoot entity class extends TimestampedVersionedEntity. It overrides the __publish__() method of the base class, so that triggered events are published only to a private list of pending events, rather than directly to the publish-subscribe mechanism. It also adds a method called __save__(), which publishes all pending events to the publish-subscribe mechanism as a single list.

It can be subclassed by custom aggregate root entities. In the example below, the entity class World inherits from AggregateRoot.

from eventsourcing.domain.model.aggregate import AggregateRoot


class World(AggregateRoot):
    """
    Example domain entity, with mutator function on domain event.
    """
    def __init__(self, *args, **kwargs):
        super(World, self).__init__(*args, **kwargs)
        self.history = []

    def make_things_so(self, *somethings):
        for something in somethings:
            self.__trigger_event__(World.SomethingHappened, what=something)

    class SomethingHappened(AggregateRoot.Event):
        def mutate(self, obj):
            obj.history.append(self)

The World aggregate root has a command method make_things_so() which publishes SomethingHappened events. The mutate() method of the SomethingHappened class simply appends the event (self) to the aggregate object obj.

We can see the events that are published by subscribing to the handler receive_events().

assert len(received_events) == 0
subscribe(handler=receive_event)

# Create new world.
world = World.__create__()
assert isinstance(world, World)

# Command that publishes many events.
world.make_things_so('dinosaurs', 'trucks', 'internet')

# State of aggregate object has changed
# but no events have been published yet.
assert len(received_events) == 0
assert world.history[0].what == 'dinosaurs'
assert world.history[1].what == 'trucks'
assert world.history[2].what == 'internet'

Events are pending, and will not be published until the __save__() method is called.

# Has pending events.
assert len(world.__pending_events__) == 4

# Publish pending events.
world.__save__()

# Pending events published as a list.
assert len(received_events) == 1
assert len(received_events[0]) == 4

# No longer any pending events.
assert len(world.__pending_events__) == 0

# Clean up.
unsubscribe(handler=receive_event)
del received_events[:]  # received_events.clear()