Domain model¶
The library’s domain layer has base classes for domain events and entities. These classes show how to write a domain model that uses the library’s event sourcing infrastructure. They can also be used to develop an event-sourced application as a domain driven design.
Domain events¶
The purpose of a domain event is to be published when something happens, normally the results from the
work of a command. The library has a base class for domain events called DomainEvent
.
Domain events can be freely constructed from the DomainEvent
class. Attributes are
set directly from the constructor keyword arguments.
from eventsourcing.domain.model.events import DomainEvent
domain_event = DomainEvent(a=1)
assert domain_event.a == 1
The attributes of domain events are read-only. New values cannot be assigned to existing objects. Domain events are immutable in that sense.
# Fail to set attribute of already-existing domain event.
try:
domain_event.a = 2
except AttributeError:
pass
else:
raise Exception("Shouldn't get here")
Domain events can be compared for equality as value objects, instances are equal if they have the same type and the same attributes.
DomainEvent(a=1) == DomainEvent(a=1)
DomainEvent(a=1) != DomainEvent(a=2)
DomainEvent(a=1) != DomainEvent(b=1)
Publish-subscribe¶
Domain events can be published, using the library’s publish-subscribe mechanism.
The publish()
function is used to publish events. The event
arg is required.
from eventsourcing.domain.model.events import publish
publish(event=domain_event)
The subscribe()
function is used to subscribe a handler
that will receive events.
The optional predicate
arg can be used to provide a function that will decide whether
or not the subscribed handler will actually be called when an event is published.
from eventsourcing.domain.model.events import subscribe
received_events = []
def receive_event(event):
received_events.append(event)
def is_domain_event(event):
return isinstance(event, DomainEvent)
subscribe(handler=receive_event, predicate=is_domain_event)
# Publish the domain event.
publish(domain_event)
assert len(received_events) == 1
assert received_events[0] == domain_event
The unsubscribe()
function can be used to stop the handler receiving further events.
from eventsourcing.domain.model.events import unsubscribe
unsubscribe(handler=receive_event, predicate=is_domain_event)
# Clean up.
del received_events[:] # received_events.clear()
Event library¶
The library has a small collection of domain event subclasses, such as EventWithOriginatorID
,
EventWithOriginatorVersion
, EventWithTimestamp
, EventWithTimeuuid
, Created
, AttributeChanged
,
Discarded
.
Some of these classes provide useful defaults for particular attributes, such as a timestamp
.
Timestamps can be used to sequence events.
from eventsourcing.domain.model.events import EventWithTimestamp
from eventsourcing.domain.model.events import EventWithTimeuuid
from uuid import UUID
# Automatic timestamp.
assert isinstance(EventWithTimestamp().timestamp, float)
# Automatic UUIDv1.
assert isinstance(EventWithTimeuuid().event_id, UUID)
Some classes require particular arguments when constructed. The originator_id
can be used
to identify a sequence to which an event belongs. The originator_version
can be used to
position the event in a sequence.
from eventsourcing.domain.model.events import EventWithOriginatorVersion
from eventsourcing.domain.model.events import EventWithOriginatorID
from uuid import uuid4
# Requires originator_id.
EventWithOriginatorID(originator_id=uuid4())
# Requires originator_version.
EventWithOriginatorVersion(originator_version=0)
Some are just useful for their distinct type, for example in subscription predicates.
from eventsourcing.domain.model.events import Created, Discarded
def is_created(event):
return isinstance(event, Created)
def is_discarded(event):
return isinstance(event, Discarded)
assert is_created(Created()) is True
assert is_created(Discarded()) is False
assert is_created(DomainEvent()) is False
assert is_discarded(Created()) is False
assert is_discarded(Discarded()) is True
assert is_discarded(DomainEvent()) is False
assert is_domain_event(Created()) is True
assert is_domain_event(Discarded()) is True
assert is_domain_event(DomainEvent()) is True
Custom events¶
Custom domain events can be coded by subclassing the library’s domain event classes.
Domain events are normally named using the past participle of a common verb, for example a regular past participle such as “started”, “paused”, “stopped”, or an irregular past participle such as “chosen”, “done”, “found”, “paid”, “quit”, “seen”.
class SomethingHappened(DomainEvent):
"""
Published whenever something happens.
"""
It is possible to code domain events as inner or nested classes.
class Job(object):
class Seen(EventWithTimestamp):
"""
Published when the job is seen.
"""
class Done(EventWithTimestamp):
"""
Published when the job is done.
"""
Inner or nested classes can be used, and are used in the library, to define the domain events of a domain entity on the entity class itself.
seen = Job.Seen(job_id='#1')
done = Job.Done(job_id='#1')
assert done.timestamp > seen.timestamp
Domain entities¶
A domain entity is an object that is not defined by its attributes, but rather by a thread of continuity and its identity. The attributes of a domain entity can change, directly by assignment, or indirectly by calling a method of the object.
The library provides a domain entity class VersionedEntity
, which has an id
attribute, and a version
attribute.
from eventsourcing.domain.model.entity import VersionedEntity
entity_id = uuid4()
entity = VersionedEntity(id=entity_id, version=0)
assert entity.id == entity_id
assert entity.version == 0
Entity library¶
There is a TimestampedEntity
that has id
and created_on
attributes. It also has a last_modified
attribute which is normally updated as events are applied.
from eventsourcing.domain.model.entity import TimestampedEntity
entity_id = uuid4()
entity = TimestampedEntity(id=entity_id, timestamp=123456789)
assert entity.id == entity_id
assert entity.created_on == 123456789
assert entity.last_modified == 123456789
There is also a TimestampedVersionedEntity
that has id
, version
, created_on
, and last_modified
attributes.
from eventsourcing.domain.model.entity import TimestampedVersionedEntity
entity_id = uuid4()
entity = TimestampedVersionedEntity(id=entity_id, version=0, timestamp=123456789)
assert entity.id == entity_id
assert entity.version == 0
assert entity.created_on == 123456789
assert entity.last_modified == 123456789
A timestamped, versioned entity is both a timestamped entity and a versioned entity.
assert isinstance(entity, TimestampedEntity)
assert isinstance(entity, VersionedEntity)
Entity events¶
The library’s domain entities have domain events as inner classes: Event
, Created
, AttributeChanged
, and
Discarded
. These inner event classes are all subclasses of DomainEvent
and can be freely constructed, with
suitable arguments.
created = VersionedEntity.Created(
originator_version=0,
originator_id=entity_id,
)
attribute_a_changed = VersionedEntity.AttributeChanged(
name='a',
value=1,
originator_version=1,
originator_id=entity_id
)
attribute_b_changed = VersionedEntity.AttributeChanged(
name='b',
value=2,
originator_version=2,
originator_id=entity_id
)
entity_discarded = VersionedEntity.Discarded(
originator_version=3,
originator_id=entity_id
)
The class VersionedEntity
has a method _increment_version()
which can be used, for example by a mutator
function, to increment the version number each time an event is applied.
entity._increment_version()
assert entity.version == 1
Mutator functions¶
For an application to be event sourced, the state of the application must be mutated by applying domain events.
The entity mutator function mutate_entity()
can be used to apply a domain event to an entity.
from eventsourcing.domain.model.entity import mutate_entity
entity = mutate_entity(entity, attribute_a_changed)
assert entity.a == 1
When a versioned entity is updated in this way, the version number is normally incremented.
assert entity.version == 2
Apply and publish¶
Events are normally published after they are applied. The method _apply_and_publish()
can be used to both apply and then publish, so the event mutates the entity
and is then received by subscribers.
# Apply and publish a domain event.
entity._apply_and_publish(attribute_b_changed)
# Check the event was applied.
assert entity.b == 2
assert entity.version == 3
For example, the method change_attribute()
constructs an AttributeChanged
event and then calls
_apply_and_publish()
. In the code below, the event is received and checked.
entity = VersionedEntity(id=entity_id, version=0)
assert len(received_events) == 0
subscribe(handler=receive_event, predicate=is_domain_event)
# Apply and publish an AttributeChanged event.
entity.change_attribute(name='full_name', value='Mr Boots')
# Check the event was applied.
assert entity.full_name == 'Mr Boots'
# Check the event was published.
assert received_events[0].__class__ == VersionedEntity.AttributeChanged
assert received_events[0].name == 'full_name'
assert received_events[0].value == 'Mr Boots'
# Clean up.
unsubscribe(handler=receive_event, predicate=is_domain_event)
del received_events[:] # received_events.clear()
Discarding entities¶
The entity method discard()
can be used to discard the entity, by applying and publishing
a Discarded
event, after which the entity is unavailable for further changes.
from eventsourcing.exceptions import EntityIsDiscarded
entity.discard()
# Fail to change an attribute after entity was discarded.
try:
entity.change_attribute('full_name', 'Mr Boots')
except EntityIsDiscarded:
pass
else:
raise Exception("Shouldn't get here")
The mutator function will return None
after mutating an entity with a Discarded
event.
entity = VersionedEntity(id=entity_id, version=3)
entity = mutate_entity(entity, entity_discarded)
assert entity is None
That means a sequence of events that ends with a Discarded
event will result in the same
state as an empty sequence of events, when the sequence is replayed by an event player for example.
Custom entities¶
The library entity classes can be subclassed.
from eventsourcing.domain.model.decorators import attribute
class User(VersionedEntity):
def __init__(self, full_name, *args, **kwargs):
super(User, self).__init__(*args, **kwargs)
self.full_name = full_name
An entity factory method can construct, apply, and publish the first event of an entity’s lifetime. After the event is published, the new entity will be returned by the factory method.
def create_user(full_name):
created_event = User.Created(full_name=full_name, originator_id='1')
assert created_event.originator_id
user_entity = mutate_entity(event=created_event, initial=User)
publish(created_event)
return user_entity
user = create_user(full_name='Mrs Boots')
assert user.full_name == 'Mrs Boots'
Subclasses can extend the entity base classes, by adding event-based properties and methods.
Custom attributes¶
The library’s @attribute
decorator provides a property getter and setter, which will apply and publish an
AttributeChanged
event when the property is assigned. Simple mutable attributes can be coded as
decorated functions without a body, such as the full_name
function of User
below.
from eventsourcing.domain.model.decorators import attribute
class User(VersionedEntity):
def __init__(self, full_name, *args, **kwargs):
super(User, self).__init__(*args, **kwargs)
self._full_name = full_name
@attribute
def full_name(self):
pass
In the code below, after the entity has been created, assigning to the full_name
attribute causes the entity to be
updated, and an AttributeChanged
event to be published. Both the Created
and AttributeChanged
events are
received by a subscriber.
assert len(received_events) == 0
subscribe(handler=receive_event, predicate=is_domain_event)
# Publish a Created event.
user = create_user('Mrs Boots')
assert user.full_name == 'Mrs Boots'
# Publish an AttributeChanged event.
user.full_name = 'Mr Boots'
assert user.full_name == 'Mr Boots'
assert len(received_events) == 2
assert received_events[0].__class__ == VersionedEntity.Created
assert received_events[0].full_name == 'Mrs Boots'
assert received_events[1].__class__ == VersionedEntity.AttributeChanged
assert received_events[1].value == 'Mr Boots'
assert received_events[1].name == '_full_name'
# Clean up.
unsubscribe(handler=receive_event, predicate=is_domain_event)
del received_events[:] # received_events.clear()
Custom commands¶
The entity base classes can also be extended by adding “command” methods that publish events. In general, the arguments of a command will be used to perform some work. Then, the result of the work will be used to construct a domain event that represents what happened. And then, the domain event will be applied and published.
Methods like this, for example the set_password()
method of the User
entity below, normally have no return
value. The method creates an encoded string from a raw password, and then uses the change_attribute()
method to
apply and publish an AttributeChanged
event for the _password
attribute with the encoded password.
from eventsourcing.domain.model.decorators import attribute
class User(VersionedEntity):
def __init__(self, *args, **kwargs):
super(User, self).__init__(*args, **kwargs)
self._password = None
def set_password(self, raw_password):
# Do some work using the arguments of a command.
password = self._encode_password(raw_password)
# Construct, apply, and publish an event.
self.change_attribute('_password', password)
def check_password(self, raw_password):
password = self._encode_password(raw_password)
return self._password == password
def _encode_password(self, password):
return ''.join(reversed(password))
user = User(id='1')
user.set_password('password')
assert user.check_password('password')
A custom entity can also have custom methods that publish custom events. In the example below, a method
make_it_so()
publishes a domain event called SomethingHappened
.
Custom mutator¶
To be applied to an entity, custom event classes must be supported by a custom mutator function. In the code below,
the mutate_world()
mutator function extends the library’s mutate_entity
function to support the event
SomethingHappened
. The _mutate()
function of DomainEntity
has been overridden so that mutate_world()
will be called when events are applied.
from eventsourcing.domain.model.decorators import mutator
class World(VersionedEntity):
def __init__(self, *args, **kwargs):
super(World, self).__init__(*args, **kwargs)
self.history = []
class SomethingHappened(VersionedEntity.Event):
"""Published when something happens in the world."""
def make_it_so(self, something):
# Do some work using the arguments of a command.
what_happened = something
# Construct an event with the results of the work.
event = World.SomethingHappened(
what=what_happened,
originator_id=self.id,
originator_version=self.version
)
# Apply and publish the event.
self._apply_and_publish(event)
@classmethod
def _mutate(cls, initial, event):
return mutate_world(event=event, initial=initial)
@mutator
def mutate_world(initial, event):
return mutate_entity(initial, event)
@mutate_world.register(World.SomethingHappened)
def _(self, event):
self.history.append(event)
self._increment_version()
return self
world = World(id='1')
world.make_it_so('dinosaurs')
world.make_it_so('trucks')
world.make_it_so('internet')
assert world.history[0].what == 'dinosaurs'
assert world.history[1].what == 'trucks'
assert world.history[2].what == 'internet'
Reflexive mutator¶
The WithReflexiveMutator
class tries to call a function called mutate()
on the
event class itself. This means each event class can define how an entity is mutated by it.
A custom base entity class, for example Entity
in the code below, may help to adopt
this style across all entity classes in an application.
from eventsourcing.domain.model.entity import WithReflexiveMutator
class Entity(WithReflexiveMutator, VersionedEntity):
"""
Custom base class for domain entities in this example.
"""
class World(Entity):
"""
Example domain entity, with mutator function on domain event.
"""
def __init__(self, *args, **kwargs):
super(World, self).__init__(*args, **kwargs)
self.history = []
def make_it_so(self, something):
what_happened = something
event = World.SomethingHappened(
what=what_happened,
originator_id=self.id,
originator_version=self.version
)
self._apply_and_publish(event)
class SomethingHappened(VersionedEntity.Event):
# Define mutator function for entity on the event class.
def mutate(self, entity):
entity.history.append(self)
entity._increment_version()
world = World(id='1')
world.make_it_so('dinosaurs')
world.make_it_so('trucks')
world.make_it_so('internet')
assert world.history[0].what == 'dinosaurs'
assert world.history[1].what == 'trucks'
assert world.history[2].what == 'internet'
Aggregate root¶
The library has a domain entity class called AggregateRoot
that can be useful in a domain driven design, where a
command can cause many events to be published. The AggregateRoot
class has a save()
method, which publishes
a list of pending events, and overrides the _publish()
method of the base class to append events to a pending list.
The AggregateRoot
class inherits from both TimestampedVersionedEntity
and
WithReflexiveMutator
, and can be subclassed to define custom aggregate root entities.
from eventsourcing.domain.model.aggregate import AggregateRoot
class World(AggregateRoot):
"""
Example domain entity, with mutator function on domain event.
"""
def __init__(self, *args, **kwargs):
super(World, self).__init__(*args, **kwargs)
self.history = []
def make_things_so(self, *somethings):
for something in somethings:
self._trigger(World.SomethingHappened, what=something)
class SomethingHappened(VersionedEntity.Event):
def mutate(self, entity):
entity.history.append(self)
entity._increment_version()
# World factory.
def create_new_world():
created = World.Created(originator_id=1)
world = World._mutate(event=created)
world._publish(created)
return world
An AggregateRoot
entity will postpone the publishing of all events, pending the next call to its
save()
method.
assert len(received_events) == 0
subscribe(handler=receive_event)
# Create new entity.
world = create_new_world()
assert isinstance(world, World)
# Command that publishes many events.
world.make_things_so('dinosaurs', 'trucks', 'internet')
assert world.history[0].what == 'dinosaurs'
assert world.history[1].what == 'trucks'
assert world.history[2].what == 'internet'
When the save()
method is called, all such pending events are published as a
single list of events to the publish-subscribe mechanism.
# Events are pending actual publishing until the save() method is called.
assert len(received_events) == 0
world.save()
# Pending events were published as a single list of events.
assert len(received_events) == 1
assert len(received_events[0]) == 4
Publishing all events from a single command in a single list allows all the events to be written to a database as a single atomic operation.
That avoids the risk that some events will be stored successfully but other events from the same command will fall into conflict and be lost, because another thread has operated on the same aggregate at the same time, causing an inconsistent state that would also be difficult to repair.
It also avoids the risk of other threads picking up only some events caused by a command, presenting the aggregate in an inconsistent or unusual and perhaps unworkable state.
unsubscribe(handler=receive_event)
del received_events[:] # received_events.clear()