Event Sourcing in Python¶
A library for event sourcing in Python. This project is hosted on GitHub.
Introduction¶
What is event sourcing?¶
One definition of event sourcing suggests the state of an event-sourced application is determined by a sequence of events. Another definition has event sourcing as a persistence mechanism for domain-driven design.
Whilst the basic event sourcing patterns are quite simple and can be reproduced in code for each project, event sourcing as a persistence mechanism for domain-driven design appears as a “conceptually cohesive mechanism” and so can be partitioned into a “separate lightweight framework”.
Quoting from Eric Evans’ book Domain-Driven Design:
“Partition a conceptually COHESIVE MECHANISM into a separate lightweight framework. Particularly watch for formalisms for well-documented categories of algorithms. Expose the capabilities of the framework with an INTENTION-REVEALING INTERFACE. Now the other elements of the domain can focus on expressing the problem (‘what’), delegating the intricacies of the solution (‘how’) to the framework.”
This library¶
This is a library for event sourcing in Python. At its core, this library supports storing and retrieving sequences of events, such as the domain events of event-sourced aggregates in a domain-driven design, and snapshots of those aggregates. A variety of schemas and technologies can be used for storing events, and this library supports several of these possibilities.
To demonstrate how storing and retrieving domain events can be used effectively as a persistence mechanism in an event-sourced application, this library includes base classes and examples of event-sourced aggregates and event-sourced applications.
It is possible using this library to define an entire event-driven system of event-sourced applications independently of infrastructure and mode of running. That means system behaviours can be rapidly developed whilst running the entire system synchronously in a single thread with a single in-memory database. And then the system can be run asynchronously on a cluster with durable databases, with the system effecting exactly the same behaviour.
Synopsis¶
Use the library's Aggregate base class to define event-sourced aggregates. Use the @event decorator on command methods to define aggregate events.
from eventsourcing.domain import Aggregate, event
class World(Aggregate):
@event('Created')
def __init__(self, name):
self.name = name
self.history = []
@event('SomethingHappened')
def make_it_so(self, what):
self.history.append(what)
Use the library's Application class to define an event-sourced application.
Add command and query methods that use event-sourced aggregates.
from eventsourcing.application import Application
class Universe(Application):
def create_world(self, name):
world = World(name)
self.save(world)
return world.id
def make_it_so(self, world_id, what):
world = self.repository.get(world_id)
world.make_it_so(what)
self.save(world)
def get_history(self, world_id):
world = self.repository.get(world_id)
return world.history
Construct an application object by calling the application class.
application = Universe()
Evolve the state of the application by calling the application command methods.
world_id = application.create_world('Earth')
application.make_it_so(world_id, 'dinosaurs')
application.make_it_so(world_id, 'trucks')
application.make_it_so(world_id, 'internet')
Access the state of the application by calling the application query methods.
history = application.get_history(world_id)
assert history == ['dinosaurs', 'trucks', 'internet']
Configure an application by setting environment variables.
application = Universe(
env={
'FACTORY_TOPIC': 'eventsourcing.sqlite:Factory',
'SQLITE_DBNAME': ':memory:',
}
)
Please follow the Tutorial for more information.
Features¶
Flexible event store — flexible persistence of domain events. Combines an event mapper and an event recorder in ways that can be easily extended. Mapper uses a transcoder that can be easily extended to support custom model object types. Recorders supporting different databases can be easily substituted and configured with environment variables.
Domain models and applications — base classes for domain model aggregates and applications. Suggests how to structure an event-sourced application.
Application-level encryption and compression — encrypts and decrypts events inside the application. This means data will be encrypted in transit across a network (“on the wire”) and at disk level including backups (“at rest”), which is a legal requirement in some jurisdictions when dealing with personally identifiable information (PII) for example the EU’s GDPR. Compression reduces the size of stored domain events and snapshots, usually by around 25% to 50% of the original size. Compression reduces the size of data in the database and decreases transit time across a network.
Snapshotting — reduces access-time for aggregates with many domain events (a minimal sketch is shown after this list).
Versioning - allows domain model changes to be introduced after an application has been deployed. Both domain events and aggregate classes can be versioned. The recorded state of an older version can be upcast to be compatible with a new version. Stored events and snapshots are upcast from older versions to new versions before the event or aggregate object is reconstructed.
Optimistic concurrency control — ensures a distributed or horizontally scaled application doesn’t become inconsistent due to concurrent method execution. Leverages optimistic concurrency controls in adapted database management systems.
Notifications and projections — reliable propagation of application events with pull-based notifications allows the application state to be projected accurately into replicas, indexes, view models, and other applications. Supports materialized views and CQRS.
Event-driven systems — reliable event processing. Event-driven systems can be defined independently of particular persistence infrastructure and mode of running.
Detailed documentation — documentation provides general overview, introduction of concepts, explanation of usage, and detailed descriptions of library classes. All code is annotated with type hints.
Worked examples — includes examples showing how to develop aggregates, applications and systems.
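As a minimal sketch of the snapshotting feature mentioned above, assuming the application class attribute is_snapshotting_enabled and the application method take_snapshot() described in the application module documentation, snapshotting can be enabled by subclassing an application such as the Universe class from the synopsis.
class SnapshottingUniverse(Universe):
    is_snapshotting_enabled = True

app = SnapshottingUniverse()
world_id = app.create_world('Earth')
app.make_it_so(world_id, 'dinosaurs')

# Record a snapshot, so that repository.get() can start from
# the snapshot instead of replaying the full event sequence.
app.take_snapshot(world_id)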
Design overview¶
The design of the library follows the notion of a “layered architecture” in that there are distinct and separate layers for interfaces, application, domain, and infrastructure. It also follows the “onion” or “hexagonal” or “clean” architecture, in that the domain layer has no dependencies on any other layer. The application layer depends on the domain and infrastructure layers, and the interface layer depends only on the application layer.
Register issues¶
This project is hosted on GitHub. Please register any issues, questions, and requests you may have.
Installation guide¶
This version of the library is compatible with Python versions 3.7, 3.8, 3.9, and 3.10. The library’s suite of tests is run against these versions and has 100% line and branch coverage.
You can use pip to install the library from the Python Package Index. It is recommended always to install into a virtual environment.
$ pip install eventsourcing
When including the library in a list of project dependencies, in order to avoid installing future incompatible releases, it is recommended to specify the major and minor version numbers.
As an example, the expression below would install the latest version of the v9.1.x release, allowing future bug fixes released with point version number increments.
eventsourcing<=9.1.99999
Specifying the major and minor version number in this way will avoid any potentially destabilising additional features introduced with minor version number increments, and also any backwards incompatible changes introduced with major version number increments.
This package depends only on modules from the Python Standard Library, except for the extra options described below.
Install options¶
Running the install command with different options will install the extra dependencies associated with that option. If you installed without any options, you can easily install optional dependencies later by running the install command again with the options you want.
For example, if you want to store cryptographically encrypted events, then install with the crypto option. This simply installs PyCryptodome, so feel free to make your project depend on that instead.
$ pip install "eventsourcing[crypto]"
If you want to store events with PostgreSQL, then install with the postgres option. This simply installs Psycopg2, so feel free to make your project depend on that instead. Please note, the binary version psycopg2-binary is a convenient alternative for development and testing, but the main package is recommended by the Psycopg2 developers for production usage.
$ pip install "eventsourcing[postgres]"
Options can be combined, so that if you want to store encrypted events in PostgreSQL, then install with both the crypto and postgres options.
$ pip install "eventsourcing[crypto,postgres]"
Developers¶
If you want to install the code for the purpose of developing the library, then fork and clone the GitHub repository and install from the root folder with the ‘dev’ option. This option will install a number of packages that help with development and documentation, such as the above extra dependencies along with Sphinx, Coverage.py, Black, mypy, Flake8, and isort.
$ pip install ".[dev]"
Alternatively, the project’s Makefile can be used to the same effect with the following command.
$ make install
Once installed, you can check the unit tests pass and the code is 100% covered by the tests with the following command.
$ make test
Before the tests will pass, you will need to set up PostgreSQL. The following commands will install PostgreSQL on macOS and set up the database and database user. If you already have PostgreSQL installed, just create the database and user. If you prefer to run PostgreSQL in a Docker container, feel free to do that too.
$ brew install postgresql
$ brew services start postgresql
$ psql postgres
postgres=# CREATE DATABASE eventsourcing;
postgres=# CREATE USER eventsourcing WITH PASSWORD 'eventsourcing';
You can also check the syntax and static types are correct with the following command (which uses isort, Black, Flake8, and mypy).
$ make lint
The code can be automatically reformatted using the following command (which uses isort and Black). Flake8 and mypy errors will often need to be fixed by hand.
$ make fmt
You can build the docs, and make sure they build, with the following command (which uses Sphinx).
$ make docs
If you wish to submit changes to the library, before submitting a pull request please check all three things (lint, docs, and test) which you can do conveniently with the following command.
$ make prepush
If you wish to submit a pull request on GitHub, please target the main branch. Improvements of any size are always welcome.
Support options¶
Thank you for your interest in this library. It has taken a lot of time to create this library. Similarly, it may take some time to understand the library, and to develop well-designed event-sourced applications.
To supplement the detailed documentation, professional training workshops and development services are available. Friendly community support is also available on the Slack channel.
If you have any issues using the library or reading the documentation, please raise an issue on GitHub, feel free to start a discussion in the Slack channel, or create a pull request.
Please consider supporting the continuing development and maintenance of this library by starring the project on GitHub, by sponsoring the project, or by making a donation.
Community support¶
The library has a growing community that may be able to help.
You can ask questions on the Slack channel.
You can also register issues and requests on our issue tracker.
Training workshops¶
Training workshops are available to help developers more quickly learn how to use the library. Workshop participants will be guided through a series of topics, gradually discovering what the library is capable of doing, and learning how to use the library effectively.
Please contact John Bywater via the Slack channel for more information about training workshops.
Professional support¶
Design and development services are available to help developers and managers with the development and management of their event-sourced applications and systems.
Development of working applications and systems for production use.
Development of sample applications and systems for guidance or demonstration purposes.
Overall assessment of your existing implementation, with recommendations for improvement.
Address specific concerns with how your event-sourced application or system is built and run.
Coaching developers in the use of the library.
Please contact John Bywater via the Slack channel for more information about professional support.
Tutorial¶
A tutorial for event sourcing in Python.
Tutorial - Part 1 - Getting Started¶
Python classes¶
This tutorial depends on a basic understanding of Python classes.
For example, using Python we can define a World class as follows.
class World:
def __init__(self, name):
self.name = name
self.history = []
def make_it_so(self, what):
self.history.append(what)
Having defined a Python class, we can use it to create an instance.
world = World('Earth')
As we might expect, the world object is an instance of the World class.
assert isinstance(world, World)
We can see from the __init__() method that the attributes name and history will be initialised.
assert world.name == 'Earth'
assert world.history == []
A World instance has a method make_it_so() which appends the value of what to history.
world.make_it_so('Python')
assert world.history == ['Python']
This is a basic example of how Python classes work. However, if we want to use this object in future, we will want to save and reconstruct it. We will want it to be ‘persistent’.
Event-sourced aggregate¶
A persistent object that changes through a sequence of decisions
corresponds to the notion of an ‘aggregate’ in Domain-Driven Design.
An ‘event-sourced’ aggregate is persisted by persisting the decisions
as a sequence of ‘events’.
We can use the aggregate base class Aggregate
and the @event
decorator from the domain module to define
event-sourced aggregates.
from eventsourcing.domain import Aggregate, event
Let’s convert World
into an event-sourced aggregate. The changes are highlighted below.
class World(Aggregate):
@event('Created')
def __init__(self, name):
self.name = name
self.history = []
@event('SomethingHappened')
def make_it_so(self, what):
self.history.append(what)
As before, we can call the class to create a new instance.
world = World('Earth')
The object is an instance of World. It is also an Aggregate.
assert isinstance(world, World)
assert isinstance(world, Aggregate)
As we might expect, the attributes name
and history
have been initialised.
assert world.name == 'Earth'
assert world.history == []
The aggregate also has an id
attribute. The ID is used to uniquely identify the
aggregate within a collection of aggregates. It happens to be a UUID.
from uuid import UUID
assert isinstance(world.id, UUID)
We can call the aggregate method make_it_so(). The given value is appended to history.
world.make_it_so('Python')
assert world.history == ['Python']
By redefining the World
class as an event-sourced aggregate in this way,
when we call the class object and the decorated methods, we construct a sequence
of event objects that can be used to reconstruct the aggregate. We can get
the events from the aggregate by calling collect_events().
events = world.collect_events()
We can also reconstruct the aggregate by calling mutate()
on the collected event objects.
copy = None
for e in events:
copy = e.mutate(copy)
assert copy == world
Interactions with aggregates usually occur in an application, where collected events can be persisted and used to reconstruct aggregates.
Event-sourced application¶
An event-sourced application comprises many event-sourced aggregates,
and a persistence mechanism to store and retrieve aggregate events.
We can use the library’s Application
base class to define
event-sourced applications.
from eventsourcing.application import Application
Let’s define a Universe
application that interacts with World
aggregates.
We can add command methods to create and change aggregates,
and query methods to view current state.
We can save aggregates with the application save()
method, and
get previously saved aggregates with the repository get()
method.
class Universe(Application):
def create_world(self, name):
world = World(name)
self.save(world)
return world.id
def make_it_so(self, world_id, what):
world = self.repository.get(world_id)
world.make_it_so(what)
self.save(world)
def get_history(self, world_id):
world = self.repository.get(world_id)
return world.history
We can construct an instance of the application by calling the application class.
application = Universe()
We can then create and update aggregates by calling methods of the application.
world_id = application.create_world('Earth')
application.make_it_so(world_id, 'dinosaurs')
application.make_it_so(world_id, 'trucks')
application.make_it_so(world_id, 'internet')
We can also view the current state of the application by calling the application query method.
history = application.get_history(world_id)
assert history == ['dinosaurs', 'trucks', 'internet']
Any number of different kinds of event-sourced applications can be defined in this way.
Project structure¶
You are free to structure your project files however you wish. You
may wish to put your aggregate classes in a file named
domainmodel.py
and your application class in a file named application.py.
myproject/
myproject/application.py
myproject/domainmodel.py
myproject/tests.py
Writing tests¶
You can get started with your event sourcing project by first writing a failing test in tests.py, then defining your application and aggregate classes in the test module.
You can then refactor by moving aggregate and application classes to separate Python modules.
You can also convert these modules to packages if you want to break things up into smaller
modules.
def test():
# Construct application object.
app = Universe()
# Call application command methods.
world_id = app.create_world('Earth')
app.make_it_so(world_id, 'dinosaurs')
app.make_it_so(world_id, 'trucks')
# Call application query method.
assert app.get_history(world_id) == [
'dinosaurs',
'trucks'
]
Exercise¶
Try it for yourself by copying the code snippets above and running the test.
test()
Next steps¶
For more information about event-sourced aggregates, please read through Part 2 of this tutorial.
Tutorial - Part 2 - Aggregates¶
As we saw in Part 1, we can
use the aggregate base class Aggregate
, combined with the
@event
decorator, to define event-sourced aggregates in Python.
from eventsourcing.domain import Aggregate, event
Let’s look at how event-sourced aggregates work in more detail.
Aggregates in more detail¶
Let's define the simplest possible event-sourced aggregate, by simply subclassing Aggregate.
class World(Aggregate):
pass
In the usual way with Python classes, we can create a new class instance by calling the class object.
world = World()
assert isinstance(world, World)
Normally when a class instance is constructed by calling the class object, Python directly
instantiates and initialises the class instance. However, when a subclass of Aggregate
is called, the class instance is constructed in a slightly indirect way.
Firstly, an event object is constructed. This event object represents the fact the aggregate
was “created”. Then, this event object is used to construct and initialise the aggregate
object. The point being, that same event object can be used again to reconstruct the aggregate
object in future.
To reconstruct the aggregate object from the event object, we firstly need to get hold
of the new event object. Fortunately, the new event object is not lost. It is held by
the aggregate in an internal list. We can collect the event object from our aggregate by
calling the aggregate’s collect_events()
method. This method is kindly provided by the
aggregate base class.
events = world.collect_events()
assert len(events) == 1
The “created” event object can be used to reconstruct the aggregate
object. To reconstruct the aggregate object, we can simply call the
event object’s mutate()
method.
copy = events[0].mutate(None)
assert copy == world
Using events to determine the state of an aggregate is the essence of
event sourcing. Calling the event’s mutate()
method is exactly how
the aggregate object was constructed when the aggregate class was called.
Next, let’s talk about aggregate events in more detail.
“Created” events¶
When the aggregate class code was interpreted by Python, a "created" event class was automatically defined on the aggregate class object. This event class was given the default name "Created".
The general occurrence of creating aggregate objects requires a general name. The term "created" is used here for this purpose. Naturally, we will need to think of suitable names for the particular aggregate events we will define in our domain models, but sadly the library can't help us with that.
assert isinstance(World.Created, type)
The event we collected from the aggregate is an instance of World.Created.
assert isinstance(events[0], World.Created)
We can specify an aggregate event class by decorating an aggregate method
with the @event
decorator. The event specified by the decorator will
be triggered when the decorated method is called. This happens by default
for the __init__()
method. But we can also decorate an __init__()
method to specify the name of the “created” event.
Let’s redefine the event-sourced aggregate above, using the
@event
decorator on an __init__()
method so that we can specify the
name of the “created” event.
Let’s also define the __init__()
method so that it accepts a name
argument and initialises a name
attribute with the given value of the argument.
The changes are highlighted below.
class World(Aggregate):
@event('Started')
def __init__(self, name):
self.name = name
By specifying the name of the “created” event to be 'Started'
, an event
class with this name is defined on the aggregate class.
assert isinstance(World.Started, type)
We can call such events "created" events. They are the initial events in an aggregate's sequence of aggregate events. They inherit from the base class "created" event, which has a mutate() method that knows how to construct and initialise aggregate objects.
assert issubclass(World.Started, Aggregate.Created)
Again, as above, we can create a new aggregate instance by calling
the aggregate class. But this time, we need to provide a value for
the name
argument.
world = World('Earth')
As we might expect, the given name
is used to initialise the name
attribute of the aggregate.
assert world.name == 'Earth'
We can call collect_events()
to get the “created” event from
the aggregate object. We can see the event object is an instance of
the class World.Started.
events = world.collect_events()
assert len(events) == 1
assert isinstance(events[0], World.Started)
The attributes of an event class specified by using the @event
decorator
are derived from the signature of the decorated method. Hence, the event
object has a name
attribute, which follows from the signature of the
aggregate’s __init__()
method.
assert events[0].name == 'Earth'
The “created” event object can be used to reconstruct the initial state of the aggregate.
assert events[0].mutate(None) == world
Subsequent events¶
We can take this further by defining a second method that will be used to change the aggregate object after it has been created.
Let’s firstly adjust the __init__()
to initialise a history
attribute with an empty list. Then let’s also define a make_it_so()
method that appends to this list, and decorate this method with
the @event
decorator. The changes are highlighted below.
from eventsourcing.domain import Aggregate, event
class World(Aggregate):
@event('Started')
def __init__(self, name):
self.name = name
self.history = []
@event('SomethingHappened')
def make_it_so(self, what):
self.history.append(what)
By decorating the make_it_so()
method with the @event
decorator,
an event class SomethingHappened
was automatically defined on the
aggregate class.
assert isinstance(World.SomethingHappened, type)
The event will be triggered when the method is called. The body of the method will be used by the event to mutate the state of the aggregate object.
Let’s create an aggregate instance.
world = World('Earth')
As we might expect, the name of the aggregate object is 'Earth', and the history attribute is an empty list.
assert world.name == 'Earth'
assert world.history == []
Now let’s call make_it_so()
with the value 'Python'
as the argument.
world.make_it_so('Python')
The history list now has one item, 'Python', the value we passed when calling make_it_so().
assert world.history == ['Python']
Creating and updating the aggregate caused two events to occur,
a “started” event and a “something happened” event. We can collect
these two events by calling collect_events().
events = world.collect_events()
assert len(events) == 2
Just like the “started” event has a name
attribute, so the
“something happened” event has a what
attribute.
assert isinstance(events[0], World.Started)
assert events[0].name == 'Earth'
assert isinstance(events[1], World.SomethingHappened)
assert events[1].what == 'Python'
The attributes of the event objects follow from the signatures of the
decorated methods. The __init__()
method has a name
argument
and so the “started” event has a name
attribute. The make_it_so()
method has a what
argument, and so the "something happened" event
has a what
attribute. The arguments of a method decorated with @event
are used to define the attributes of an event class. When the method is called,
the values of the method arguments are used to construct an event object. The
method body is then executed with the attributes of the event. The resulting
state of the aggregate is the same as if the method were not decorated. The
difference is that a sequence of events is generated. The point being, this
sequence of events can be used in future to reconstruct the current state
of the aggregate.
copy = None
for e in events:
copy = e.mutate(copy)
assert copy == world
Calling the aggregate’s collect_events()
method is what happens when
an application’s save()
method is called. Calling the mutate()
methods of saved events is how an application repository reconstructs aggregates from saved events when its get() method is called.
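To see this round trip end to end, here is a minimal sketch that uses a plain Application object directly (the Application class is discussed further in Part 3).
from eventsourcing.application import Application

app = Application()

world = World('Earth')
world.make_it_so('Python')

# save() collects and stores the aggregate's pending events.
app.save(world)

# get() retrieves the stored events and replays them with mutate().
copy = app.repository.get(world.id)
assert copy == world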
You can try all of this for yourself by copying the code snippets above.
Exercise¶
Define a Dog aggregate that has a given name and a list of tricks.
Define a method add_trick()
that adds a new trick. Specify the name of
the “created” event to be 'Named'
and the name of the subsequent event
to be 'TrickAdded'
. Copy the test below and make it pass.
def test():
# Give a dog a name, and some tricks.
fido = Dog(name='Fido')
fido.add_trick('fetch ball')
fido.add_trick('roll over')
fido.add_trick('play dead')
# Check the state of the aggregate.
assert fido.name == 'Fido'
assert fido.tricks == [
'fetch ball',
'roll over',
'play dead',
]
# Check the aggregate events.
events = fido.collect_events()
assert len(events) == 4
assert isinstance(events[0], Dog.Named)
assert events[0].name == 'Fido'
assert isinstance(events[1], Dog.TrickAdded)
assert events[1].trick == 'fetch ball'
assert isinstance(events[2], Dog.TrickAdded)
assert events[2].trick == 'roll over'
assert isinstance(events[3], Dog.TrickAdded)
assert events[3].trick == 'play dead'
# Reconstruct aggregate from events.
copy = None
for e in events:
copy = e.mutate(copy)
assert copy == fido
# Create and test another aggregate.
buddy = Dog(name='Buddy')
assert fido != buddy
events = buddy.collect_events()
assert len(events) == 1
assert isinstance(events[0], Dog.Named)
assert events[0].name == 'Buddy'
assert events[0].mutate(None) == buddy
Next steps¶
For more information about event-sourced applications, please read through Part 3 of this tutorial.
Tutorial - Part 3 - Applications¶
As we saw in Part 1, we can
use the library’s Application
class to define event-sourced
applications.
For example, the Universe
application class, defined below, has a
command method create_world()
that creates and saves a new aggregate.
It has a command method make_it_so()
that retrieves a previously saved
aggregate, calls make_it_so()
on the aggregate, and then saves the
modified aggregate. And it has a query method get_history()
that
retrieves and returns the history
of an aggregate object. The
World
aggregate is used by the application.
from eventsourcing.application import Application
from eventsourcing.domain import Aggregate, event
class Universe(Application):
def create_world(self, name):
world = World(name)
self.save(world)
return world.id
def make_it_so(self, world_id, what):
world = self.repository.get(world_id)
world.make_it_so(what)
self.save(world)
def get_history(self, world_id):
world = self.repository.get(world_id)
return world.history
class World(Aggregate):
@event('Started')
def __init__(self, name):
self.name = name
self.history = []
@event('SomethingHappened')
def make_it_so(self, what):
self.history.append(what)
We can construct an application object and call its methods.
application = Universe()
world_id = application.create_world('Earth')
application.make_it_so(world_id, 'dinosaurs')
application.make_it_so(world_id, 'trucks')
application.make_it_so(world_id, 'internet')
history = application.get_history(world_id)
assert history == ['dinosaurs', 'trucks', 'internet']
Let’s explore how this works in more detail.
Applications in more detail¶
An event-sourced application comprises many event-sourced aggregates, and a persistence mechanism to store and retrieve aggregate events. Constructing an application object constructs a persistence mechanism the application will use to store and retrieve events. The construction of the persistence mechanism can be easily configured, with alternatives constructed instead of the standard defaults.
application = Universe()
assert application.repository
assert application.repository.event_store
assert application.repository.event_store.mapper
assert application.repository.event_store.mapper.transcoder
assert application.repository.event_store.mapper.compressor is None
assert application.repository.event_store.mapper.cipher is None
assert application.repository.event_store.recorder
assert application.log
assert application.log.recorder
To be specific, an application object has a repository object. The repository object has an event store. The event store object has a mapper. The mapper object has a transcoder, an optional compressor, and an optional cipher. The application also has a notification log. The notification log object has a recorder.
The event store converts aggregate events to a common type of object called “stored events”, using the mapper, and then records the stored event objects in the database using the recorder. The mapper uses the transcoder to serialize aggregate events, and optionally to compress and encrypt the serialised state. The recorder adapts a particular database, supporting the recording of stored events in that database.
The repository reconstructs aggregate objects from aggregate event objects that it retrieves from the event store. The event store gets stored events from the recorder, and uses the mapper to reconstruct aggregate event objects. The mapper uses the transcoder to optionally decrypt and decompress the serialised state, and to deserialize stored events to aggregate events.
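For example, the mapper can be used directly to convert an aggregate event to a stored event and back again. This is a minimal sketch, assuming the mapper methods from_domain_event() and to_domain_event() described in the persistence module documentation.
world = World('Earth')
aggregate_event = world.collect_events()[0]

mapper = application.repository.event_store.mapper

# Serialise the aggregate event as a stored event.
stored_event = mapper.from_domain_event(aggregate_event)

# Deserialise the stored event back to an aggregate event.
assert mapper.to_domain_event(stored_event) == aggregate_event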
An application’s recorder also puts the stored events in a total order, and allows this order to be selected from. The notification log selects events from this order as the event notifications of the application.
In addition to these attributes, an application object has a method save()
which is responsible for collecting new aggregate events and putting them in
the event store.
The application save()
method saves aggregates by
collecting and storing pending aggregate events. The save()
method calls the given aggregates’ collect_events()
method and
puts the pending aggregate events in the event store, with a
guarantee that either all of the events will be stored or none of
them will be.
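Since the save() method accepts one or more aggregates, events from several aggregates can be recorded together. A minimal sketch:
earth = World('Earth')
mars = World('Mars')

# The pending events of both aggregates are stored atomically.
application.save(earth, mars)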
The repository has a get()
method which is responsible
for reconstructing aggregates that have been previously saved.
The get()
method is called with an aggregate ID. It retrieves
stored events for an aggregate from an event store, selecting them
using the given ID. It then reconstructs the aggregate object from its
previously stored events calling the mutate()
method of aggregate
event objects, and returns the reconstructed aggregate object to
the caller.
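The following function is a simplified sketch of what the repository's get() method does, assuming the event store's get() method selects stored events by aggregate ID (the actual method also supports snapshots and historical versions).
def reconstruct(application, aggregate_id):
    aggregate = None
    for domain_event in application.repository.event_store.get(aggregate_id):
        aggregate = domain_event.mutate(aggregate)
    return aggregate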
In addition to these attributes and these methods, a subclass of
Application
will usually define command and query methods, which
make use of the application’s save()
method and the repository’s
get()
method.
For example, the Universe
class has a create_world()
method
and a make_it_so()
method, which can be considered command methods.
It also has a get_history()
method, which can be considered a query
method.
Command methods¶
Let’s consider the create_world()
and make_it_so()
methods
of the Universe
application.
Firstly, let’s create a new aggregate by calling the application method create_world()
.
world_id = application.create_world('Earth')
When the application command method create_world()
is called, a new World
aggregate object is created, by calling
the aggregate class. The new aggregate object is saved by calling
the application’s save()
method, and then the ID of the aggregate
is returned to the caller.
We can then evolve the state of the aggregate by calling the
application command method make_it_so()
.
application.make_it_so(world_id, 'dinosaurs')
application.make_it_so(world_id, 'trucks')
application.make_it_so(world_id, 'internet')
When the application command method make_it_so() is called with the ID of an aggregate, the get() method of the repository is used to get the aggregate, the aggregate's make_it_so() method is called with the given value of what, and the aggregate is then saved by calling the application's save() method.
Query methods¶
We can access the state of the application’s aggregate by calling the
application query method get_history()
.
history = application.get_history(world_id)
assert history == ['dinosaurs', 'trucks', 'internet']
When the application query method get_history()
is called with
the ID of an aggregate, the get()
method of the repository
is used to reconstruct the aggregate from saved events, and the value
of the aggregate’s history
attribute is returned to the caller.
Event notifications¶
The Application
class has a log
attribute,
which is a ‘notification log’ (aka the ‘outbox pattern’).
This pattern avoids the “dual writing” problem of recording
application state and separately sending messages about
the changes. Please note, it is equally important to avoid
“dual writing” in the consumption of event notifications.
The notification log can be used to propagate the state of the application in a manner that supports deterministic processing of the application state in event-driven systems. It presents all the aggregate events that have been stored across all the aggregates of an application as a sequence of event notifications.
The log presents the aggregate events in the order in which
they were stored. Each of the event notifications has an integer
ID which increases along the sequence. An event notification is
simply a stored event (see above) that also has an id
attribute.
Therefore, depending on the configuration of the application, it
may be already compressed and encrypted.
The select()
method of the notification log can be used
to obtain a selection of the application’s event notifications.
The argument start
can be used to progressively read all
of a potentially very large number of event notifications.
The limit
argument can be used to restrict the number
of event notifications that will be returned when the method
is called.
notifications = application.log.select(start=1, limit=4)
assert [n.id for n in notifications] == [1, 2, 3, 4]
assert 'World.Started' in notifications[0].topic
assert b'Earth' in notifications[0].state
assert world_id == notifications[0].originator_id
assert 'World.SomethingHappened' in notifications[1].topic
assert b'dinosaurs' in notifications[1].state
assert world_id == notifications[1].originator_id
assert 'World.SomethingHappened' in notifications[2].topic
assert b'trucks' in notifications[2].state
assert world_id == notifications[2].originator_id
assert 'World.SomethingHappened' in notifications[3].topic
assert b'internet' in notifications[3].state
assert world_id == notifications[3].originator_id
Application configuration¶
An application object can be configured to use one of many different ways of storing and retrieving events.
The application object can be configured using environment variables to work with different databases, and optionally to encrypt and compress stored events. By default, the application serialises aggregate events using JSON, and stores them in memory as “plain old Python objects”. The library also supports storing events in SQLite and PostgreSQL databases. Other databases are available. See the library’s extension projects for more information about what is currently supported.
The test()
function below demonstrates the example Universe
application in more detail, by creating many aggregates in one
application, by reading event notifications from the application log,
by retrieving historical versions of an aggregate, and so on. The
optimistic concurrency control, and the compression and encryption
features are also demonstrated. The steps are commented for greater
readability. Below, the test()
function is used several times
with different configurations of persistence for our application
object: with “plain old Python objects”, with SQLite, and then
with PostgreSQL.
from eventsourcing.persistence import IntegrityError
from eventsourcing.system import NotificationLogReader
def test(app: Universe, expect_visible_in_db: bool):
# Check app has zero event notifications.
assert len(app.log['1,10'].items) == 0
# Create a new aggregate.
world_id = app.create_world('Earth')
# Execute application commands.
app.make_it_so(world_id, 'dinosaurs')
app.make_it_so(world_id, 'trucks')
# Check recorded state of the aggregate.
assert app.get_history(world_id) == [
'dinosaurs',
'trucks'
]
# Execute another command.
app.make_it_so(world_id, 'internet')
# Check recorded state of the aggregate.
assert app.get_history(world_id) == [
'dinosaurs',
'trucks',
'internet'
]
# Check values are (or aren't) visible in the database.
values = [b'dinosaurs', b'trucks', b'internet']
if expect_visible_in_db:
expected_num_visible = len(values)
else:
expected_num_visible = 0
actual_num_visible = 0
reader = NotificationLogReader(app.log)
for notification in reader.read(start=1):
for what in values:
if what in notification.state:
actual_num_visible += 1
break
assert expected_num_visible == actual_num_visible
# Get historical state (at version 3, before 'internet' happened).
old = app.repository.get(world_id, version=3)
assert len(old.history) == 2
assert old.history[-1] == 'trucks' # last thing to have happened was 'trucks'
# Check app has four event notifications.
assert len(app.log['1,10'].items) == 4
# Optimistic concurrency control (no branches).
old.make_it_so('future')
try:
app.save(old)
except IntegrityError:
pass
else:
raise Exception("Shouldn't get here")
# Check app still has only four event notifications.
assert len(app.log['1,10'].items) == 4
# Read event notifications.
reader = NotificationLogReader(app.log)
notifications = list(reader.read(start=1))
assert len(notifications) == 4
# Create eight more aggregate events.
world_id = app.create_world('Mars')
app.make_it_so(world_id, 'plants')
app.make_it_so(world_id, 'fish')
app.make_it_so(world_id, 'mammals')
world_id = app.create_world('Venus')
app.make_it_so(world_id, 'morning')
app.make_it_so(world_id, 'afternoon')
app.make_it_so(world_id, 'evening')
# Get the new event notifications from the reader.
last_id = notifications[-1].id
notifications = list(reader.read(start=last_id + 1))
assert len(notifications) == 8
# Get all the event notifications from the application log.
notifications = list(reader.read(start=1))
assert len(notifications) == 12
Development environment¶
We can run the test in a “development” environment using the application’s default “plain old Python objects” infrastructure which keeps stored events in memory. The example below runs without compression or encryption of the stored events. This is how the application objects have been working in this tutorial so far.
# Construct an application object.
app = Universe()
# Run the test.
test(app, expect_visible_in_db=True)
SQLite environment¶
We can also configure an application to use SQLite for storing events.
To use the library's SQLite infrastructure, set INFRASTRUCTURE_FACTORY to the value 'eventsourcing.sqlite:Factory'.
When using the library’s SQLite infrastructure, the environment variable
SQLITE_DBNAME
must also be set. This value will be passed to Python's sqlite3.connect().
import os
# Use SQLite infrastructure.
os.environ['INFRASTRUCTURE_FACTORY'] = 'eventsourcing.sqlite:Factory'
# Configure SQLite database URI. Either use a file-based DB;
os.environ['SQLITE_DBNAME'] = '/path/to/your/sqlite-db'
# or use an in-memory DB with cache not shared, only works with single thread;
os.environ['SQLITE_DBNAME'] = ':memory:'
# or use an in-memory DB with shared cache, works with multiple threads;
os.environ['SQLITE_DBNAME'] = ':memory:?mode=memory&cache=shared'
# or use a named in-memory DB, allows distinct databases in same process.
os.environ['SQLITE_DBNAME'] = 'file:application1?mode=memory&cache=shared'
# Set optional lock timeout (default 5s).
os.environ['SQLITE_LOCK_TIMEOUT'] = '10' # seconds
Having configured the application with these environment variables, we can construct the application and run the test using SQLite.
# Construct an application object.
app = Universe()
# Run the test.
test(app, expect_visible_in_db=True)
In this example, stored events are neither compressed nor encrypted. In consequence, we can expect the recorded values to be visible in the database records.
PostgreSQL environment¶
We can also configure a "production" environment to use PostgreSQL. Using the library's PostgreSQL infrastructure will keep stored events in a PostgreSQL database.
Please note, to use the library's PostgreSQL functionality, please install the library with the postgres option (or just install the psycopg2 package).
$ pip install eventsourcing[postgres]
Please note, the library option postgres_dev will install psycopg2-binary, which is much faster to install, but this option is not recommended for production use. The binary package is a practical choice for development and testing, but in production it is advised to use the package built from source.
The example below also uses zlib and AES to compress and encrypt the stored events (but this is optional). To use the library's encryption functionality with PostgreSQL, please install the library with both the crypto and postgres options (or just install the pycryptodome and psycopg2 packages).
$ pip install eventsourcing[crypto,postgres]
It is assumed for this example that the database and database user have already been created, and the database server is running locally.
import os
from eventsourcing.cipher import AESCipher
# Generate a cipher key (keep this safe).
cipher_key = AESCipher.create_key(num_bytes=32)
# Cipher key.
os.environ['CIPHER_KEY'] = cipher_key
# Cipher topic.
os.environ['CIPHER_TOPIC'] = 'eventsourcing.cipher:AESCipher'
# Compressor topic.
os.environ['COMPRESSOR_TOPIC'] = 'eventsourcing.compressor:ZlibCompressor'
# Use Postgres infrastructure.
os.environ['INFRASTRUCTURE_FACTORY'] = 'eventsourcing.postgres:Factory'
# Configure database connections.
os.environ['POSTGRES_DBNAME'] = 'eventsourcing'
os.environ['POSTGRES_HOST'] = '127.0.0.1'
os.environ['POSTGRES_PORT'] = '5432'
os.environ['POSTGRES_USER'] = 'eventsourcing'
os.environ['POSTGRES_PASSWORD'] = 'eventsourcing'
Having configured the application with these environment variables, we can construct the application and run the test using PostgreSQL.
# Construct an application object.
app = Universe()
# Run the test.
test(app, expect_visible_in_db=False)
In this example, stored events are both compressed and encrypted. In consequence, we can expect the recorded values not to be visible in the database records.
Exercise¶
Follow the steps in this tutorial in your development environment.
Firstly, configure and run the application code you have written with an SQLite database. Secondly, create a PostgreSQL database, and configure and run your application with a PostgreSQL database. Connect to the databases with the command line clients for SQLite and PostgreSQL, and examine the database tables to verify that stored events have been recorded.
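For example, assuming the library's default table name stored_events (please check the schema created by your version), the databases might be inspected with the command line clients like this.
$ sqlite3 /path/to/your/sqlite-db "SELECT * FROM stored_events LIMIT 5;"
$ psql -U eventsourcing -h 127.0.0.1 eventsourcing -c "SELECT * FROM stored_events LIMIT 5;"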
Next steps¶
For more information about event-sourced aggregates, please read through the domain module documentation. For more information about event-sourced applications, please read through the application module documentation. For more information about the persistence mechanism for event-sourced applications, please read through the persistence module documentation.
Modules¶
This library contains several modules that implement event sourcing in Python.
domain — Domain models¶
This module supports the development of event-sourced domain models.
Following the terminology of Domain-Driven Design, an event-sourced domain model has many event-sourced aggregates. The state of an event-sourced aggregate is determined by a sequence of immutable events. The time needed to reconstruct an aggregate from its domain events can be reduced by using snapshots.
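For example, reusing the World aggregate from the tutorial, a snapshot can be taken and used to reconstruct the aggregate. This is a minimal sketch, assuming the Snapshot.take() method described in the Snapshots section.
from eventsourcing.domain import Snapshot

world = World('Earth')
world.make_it_so('Python')
world.collect_events()  # clear pending events for a clean comparison

# Take a snapshot of the aggregate's current state.
snapshot = Snapshot.take(world)

# The snapshot alone can reconstruct the aggregate.
copy = snapshot.mutate(None)
assert copy == world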
The classes in this module were first introduced merely as a way of showing
how the persistence module can be used. The persistence module is the original
core of this library, and is the cohesive mechanism for storing and retrieving
sequences (“streams”) of immutable events. But without showing how this mechanism
can be used to develop a domain model, there is perhaps a gap where application
developers may expect some solid ground on which to build a “domain object” model
for their applications. Through a process of refinement over several years, the
classes in this module have become useful as base classes for a stand-alone
event-sourced domain model. It’s certainly possible to do without these classes,
and alternative ways of coding domain models are possible. However, if you want
to make progress efficiently, you may find that using the Aggregate
base class
and the @event
decorator will speed the development of a
compact and effective event-sourced domain model.
This module appears first in the documentation, not because it is the most important section, but rather because this was overwhelmingly the order preferred by the community of users of the library, as expressed in a vote on the matter in 2021.
Aggregates in DDD¶
Aggregates are enduring objects which enjoy adventures of change. The book Domain-Driven Design by Eric Evans describes a design pattern called ‘aggregate’ in the following way.
“An aggregate is a cluster of associated objects that we treat as a unit for the purpose of data changes. Each aggregate has a root and a boundary… Therefore… Cluster the entities and value objects into aggregates and define boundaries around each. Choose one entity to be the root of each aggregate, and control all access to the objects inside the boundary through the root. Allow external objects to hold references to the root only.”
That is to say, an ‘aggregate’ is a cluster of ‘entities’ and ‘value objects’. An ‘entity’ is an object with a fixed unique identity and other attributes that may vary. A ‘value object’ does not vary, nor does it necessarily have a unique identity. This basic notion of a cluster of software objects is understandable as straightforward object-oriented programming.
An aggregate has a ‘root’. The ‘root’ of an aggregate is an entity. This entity is known as the ‘root entity’ or the ‘aggregate root’. Entities have IDs and the ID of the root entity is used to uniquely identify the cluster of objects in a domain model. Access to the cluster of objects is made through the root entity, which means that the root entity will have those methods that are required to interact with the aggregate.
Changes to the cluster of objects are made using ‘command methods’ defined on the root entity, and the state of the cluster of objects is obtained by using either ‘query methods’ or properties of the root entity. The idea of distinguishing between command methods (methods that change state but do not return values) and query methods (methods that return values but do not change state) is known as ‘command-query separation’ or CQS. CQS was devised by Bertrand Meyer and described in his book Object Oriented Software Construction.
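To illustrate CQS with a small hypothetical (non-library) class:
class Account:
    def __init__(self):
        self._balance = 0

    def credit(self, amount):
        # Command method: changes state, returns nothing.
        self._balance += amount

    def get_balance(self):
        # Query method: returns a value, changes nothing.
        return self._balance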
The ‘boundary’ of the aggregate is defined by the extent of the cluster of objects. The ‘consistency’ of the cluster of objects is maintained by making sure all the changes that result from a single command are recorded atomically. There is only ever one cluster of objects for any given aggregate, so there is no branching, and the atomic changes have a serial order. These two notions of ‘consistency’ and ‘boundary’ are combined in the notion in Domain-Driven Design of ‘consistency boundary’. Whilst we can recognise the cluster of objects as basic object-orientated programming, and we can recognise the use of command and query methods as the more refined pattern called CQS, the ‘consistency boundary’ notion gives to the aggregates in Domain-Driven Design their distinctive character.
Event-sourced aggregates¶
It is in the Zen of Python that explicit is better than implicit. The changes to an aggregate's cluster of objects will always follow from decisions made by the aggregate. It will always be true that a decision itself, having happened, does not change. But the results of such a decision are not always expressed explicitly as an immutable event object. Conventional applications update records of these objects in place as a result of those decisions. For example, consider a bank account with a starting balance of £100. A debit transaction for £20 then occurs, resulting in a new balance of £80. That might be coded as follows:
class BankAccount:
def __init__(self, starting_balance: int):
self.balance = starting_balance
def debit(self, amount):
self.balance = self.balance - amount
account = BankAccount(100)
account.debit(20)
assert account.balance == 80
Note that the event - the reason that the balance changed from 100 to 80 - is
transient. It has a brief existence in the time it takes the debit()
method
to execute, but then is lost. The debit decision itself is implicit; it has no
durable existence. Event-sourced aggregates make these things explicit.
“Explicit is better than implicit.”
To make things explicit, a decision made in the command method of an aggregate can be coded and recorded as an immutable 'domain event' object, and this object can be used to evolve the aggregate's cluster of entities and value objects. For example, bank account statements are comprised of a sequence of transactions. In general, for each event-sourced aggregate, there will be a sequence of domain event objects, and the state of an event-sourced aggregate will be determined by this sequence. The state of an aggregate can change, and its sequence of domain events can be augmented. But once created the individual domain event objects do not change. They are what they are.
The state of an aggregate, event-sourced or not, is changed by calling its command methods. In an event-sourced aggregate, the command methods create new domain event objects. The domain events are used to evolve the state of the aggregate. The domain events can be recorded and used to reconstruct the state of the aggregate in the future and elsewhere.
One command may result in many new domain event objects, and a single client request may result in the execution of many commands. To maintain consistency in the domain model, all the domain events triggered by responding to a single client request must be recorded atomically in the order they were created, otherwise the recorded state of the aggregate may become inconsistent with respect to that which was desired or expected.
Domain events¶
Domain event objects are created but do not change.
The library’s DomainEvent
class is a base
class for domain events. It is defined as a frozen Python data class
to model the immutable character of a domain event. It has an originator_id
attribute which is a Python UUID
that identifies a sequence
(or “stream”) to which an event belongs, an originator_version
attribute which is a Python int
that determines its position
in that sequence, and a timestamp
attribute which is a Python
datetime
that represents when the event occurred.
from datetime import datetime
from uuid import uuid4
from eventsourcing.domain import DomainEvent
originator_id = uuid4()
domain_event = DomainEvent(
originator_id=originator_id,
originator_version=2,
timestamp=datetime(2022, 2, 2),
)
assert domain_event.originator_id == originator_id
assert domain_event.originator_version == 2
assert domain_event.timestamp == datetime(2022, 2, 2)
Domain event objects are ordered in their sequence by their originator_version
,
and not by their timestamp
. The timestamps have no consequences for the operation
of this library, and are included to give an approximate indication of when a
domain event object was created. The reason for ordering a sequence of events by
originator_version
and not timestamp
is that the version numbers can form
a gapless sequence that excludes the possibility for inserting new items before old ones,
whereas timestamps are more likely to have gaps and can in any case suffer from clock skew.
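To illustrate, domain event objects can be placed in their sequence by sorting on originator_version. A minimal sketch, using the DomainEvent class shown above:
from datetime import datetime
from uuid import uuid4
from eventsourcing.domain import DomainEvent

oid = uuid4()
events = [
    DomainEvent(originator_id=oid, originator_version=v, timestamp=datetime(2022, 2, v))
    for v in (3, 1, 2)
]

# Order by version, not by timestamp.
ordered = sorted(events, key=lambda e: e.originator_version)
assert [e.originator_version for e in ordered] == [1, 2, 3]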
In this library, domain event objects are specialised into two different kinds: ‘aggregate event’ and ‘snapshot’.
The domain events that are triggered by an aggregate will be referred to as
‘aggregate events’. The library’s AggregateEvent
class is defined as a subclass of the DomainEvent
class. See the Aggregate events section below for more
information about aggregate events.
The state of an aggregate at a particular point in its evolution
can be recorded as a ‘snapshot’. This library’s Snapshot
class is also defined as a subclass of the DomainEvent
class. See the Snapshots section for more information about snapshots.
Aggregate events¶
Aggregate event objects represent original decisions by a domain model that advance
the state of an application. The library’s AggregateEvent
class is defined as a subclass of DomainEvent
. It can be
used to define domain-specific aggregate event objects in your domain model. Aggregate
events are uniquely identifiable in a domain model by the combination of their
originator_id and originator_version.
from eventsourcing.domain import AggregateEvent
aggregate_event = AggregateEvent(
originator_id=originator_id,
originator_version=2,
timestamp=datetime(2022, 2, 2),
)
assert aggregate_event.originator_id == originator_id
assert aggregate_event.originator_version == 2
assert aggregate_event.timestamp == datetime(2022, 2, 2)
The AggregateEvent class extends DomainEvent by defining two methods, mutate() and apply().
Firstly, the AggregateEvent
class has a
mutate()
method. This method has
a non-optional argument aggregate
which is used to pass into the method
the aggregate object to which the aggregate event object pertains. We will
discuss aggregate objects in more detail below, but for the purpose of the
discussion in this section, all we need to know is that the aggregate
argument is expected to have four attributes: id
,
version
,
created_on
, and
modified_on
. The class A
defined below
will suffice for our current purpose of explaining the aggregate event classes.
class A:
def __init__(self, id, version, created_on):
self.id = id
self.version = version
self.created_on = created_on
self.modified_on = created_on
When mutate() is called on an aggregate event object, several operations are performed. It checks that the event's originator_id equals the aggregate object's id; if not, an OriginatorIDError exception is raised. It checks that the event's originator_version equals the aggregate's version number plus 1; if not, an OriginatorVersionError exception is raised. It then calls the apply() method, which is discussed below. Then, it increments the aggregate's version, assigns the event's timestamp to the aggregate's modified_on property, and returns the modified object to the caller.
a = A(id=originator_id, version=1, created_on=datetime(2011, 1, 1))
a = aggregate_event.mutate(a)
assert a.version == 2
assert a.modified_on == datetime(2022, 2, 2)
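The validation checks can be demonstrated with a small sketch, assuming the OriginatorVersionError exception mentioned above is importable from the eventsourcing.domain module. Calling mutate() with an object whose version does not precede the event’s originator_version raises the exception, and the object remains unmodified.
from eventsourcing.domain import OriginatorVersionError

# This object's version (5) is not one less than the event's
# originator_version (2), so the validation check fails.
stale = A(id=originator_id, version=5, created_on=datetime(2011, 1, 1))
try:
    aggregate_event.mutate(stale)
except OriginatorVersionError:
    pass
else:
    raise AssertionError("shouldn't get here")
assert stale.version == 5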
Secondly, the AggregateEvent class has an apply() method, which is called by the mutate() method. The apply() method has a non-optional argument aggregate, which is used to provide the aggregate object to which the domain event object pertains. The base class’s apply() method body is empty, and so this method can be simply overridden (implemented without a call to the superclass method). It is also not expected to return a value (any value that it does return will be ignored). Hence this method can be simply and conveniently implemented in aggregate event classes to apply an event’s attribute values to an aggregate. The apply() method is called after the validation checks and before the aggregate’s state is modified, so that if it raises an exception, the aggregate will remain unmodified by the mutate() method.
class MyEvent(AggregateEvent):
full_name: str
def apply(self, aggregate):
aggregate.full_name = self.full_name
my_event = MyEvent(
originator_id=originator_id,
originator_version=3,
timestamp=datetime(2033, 3, 3),
full_name="Eric Idle"
)
a = my_event.mutate(a)
assert a.version == 3
assert a.modified_on == datetime(2033, 3, 3)
assert a.full_name == "Eric Idle"
The library also has an AggregateCreated class, which represents the creation of an aggregate. It extends AggregateEvent with an originator_topic attribute, which is a Python str. The value of this attribute is a topic that describes the path to the aggregate object’s class. It has a mutate() method which constructs an aggregate object after resolving the originator_topic value to an aggregate class. Although this method has the same signature as the base class’s method, the argument is expected to be None and is anyway ignored. It does not call apply(), since the aggregate class’s __init__() method receives the “created” event attribute values and can initialise the aggregate object in the usual way. The event’s attributes originator_id, originator_version, and timestamp are actually passed to a method __base_init__(), so that, for simplicity and concision, subclasses can define an __init__() method that neither needs to have the common attributes in its signature nor needs to call super().
The __base_init__() method initialises the aggregate’s id, version, created_on, and modified_on properties. After calling __base_init__(), this mutate() method then calls the aggregate’s __init__() method with the other event object attributes, those which are particular to the given “created” event and aggregate class, and then returns the newly constructed aggregate object to the caller.
from eventsourcing.domain import AggregateCreated
aggregate_created = AggregateCreated(
originator_topic="eventsourcing.domain:Aggregate",
originator_id=originator_id,
originator_version=1,
timestamp=datetime(2011, 1, 1),
)
a = aggregate_created.mutate(None)
assert a.__class__.__name__ == "Aggregate"
assert a.__class__.__module__ == "eventsourcing.domain"
assert a.id == originator_id
assert a.version == 1
assert a.created_on == datetime(2011, 1, 1)
assert a.modified_on == datetime(2011, 1, 1)
The object returned by calling the mutate()
method on one aggregate event object can be passed in when calling the same method on
another aggregate event object. In this way, the mutate()
methods of a sequence of aggregate event objects can be used successively to reconstruct
the current state of an aggregate.
a = aggregate_created.mutate(None)
assert a.id == originator_id
assert a.version == 1
assert a.modified_on == datetime(2011, 1, 1)
a = aggregate_event.mutate(a)
assert a.id == originator_id
assert a.version == 2
assert a.modified_on == datetime(2022, 2, 2)
a = my_event.mutate(a)
assert a.id == originator_id
assert a.version == 3
assert a.modified_on == datetime(2033, 3, 3)
assert a.full_name == "Eric Idle"
Hence, the mutate() and apply() methods of aggregate events can be used effectively to implement an “aggregate projector”, or “mutator function”, by which the state of an aggregate is reconstructed from its history of events. This is essentially how the get() method of an application repository reconstructs an aggregate from stored events when the aggregate is requested by ID.
def reconstruct_aggregate_from_events(events):
a = None
for event in events:
a = event.mutate(a)
return a
events = [
aggregate_created,
aggregate_event,
my_event,
]
a = reconstruct_aggregate_from_events(events)
assert a.id == originator_id
assert a.version == 3
assert a.modified_on == datetime(2033, 3, 3)
assert a.full_name == "Eric Idle"
By making the aggregate argument and return value optional, “created” events can be defined which start from None, and “discarded” events can be defined which return None. For example, an initial “created” event can construct an aggregate object, subsequent events can receive an aggregate and return a modified aggregate, and a final “discarded” event can receive an aggregate and return None.
class AggregateDiscarded(AggregateEvent):
def mutate(self, aggregate):
super().mutate(aggregate)
aggregate._is_discarded = True
return None
aggregate_discarded = AggregateDiscarded(
originator_id=originator_id,
originator_version=4,
timestamp=datetime(2044, 4, 4),
)
events.append(aggregate_discarded)
a = reconstruct_aggregate_from_events(events)
assert a is None
Aggregate base class¶
This library’s Aggregate class is a base class for event-sourced aggregates. It can be used to develop your own event-sourced aggregates. See, for example, the World aggregate in the Simple example below.
from eventsourcing.domain import Aggregate
It has three methods which can be used by and on subclasses: the “private” class method _create(), which will create new aggregate objects; the object method trigger_event(), which can be used to trigger subsequent events; and the object method collect_events(), which returns new events that have just been triggered.
These methods are explained below.
Creating new aggregates¶
Firstly, the Aggregate class has a “private” class method _create() which can be used to create a new aggregate object.
aggregate_id = uuid4()
aggregate = Aggregate._create(Aggregate.Created, id=aggregate_id)
When called, this method creates the first of a new sequence of aggregate event objects, and uses this aggregate event object to construct and initialise an instance of the aggregate class, which it then returns to the caller.
Usually, this generically named “private” method will be called by a “public” class method defined on a subclass of the Aggregate base class. For example, see the class method create() of the World aggregate class in the Simple example below. Your public “create” method should be named after a particular concern in your domain. Nevertheless, we need a name to refer to this sort of thing in general, and the words “create” and “created” are used for that purpose.
The _create() method has a required positional argument event_class, which is used by the caller to pass a domain event class that will represent the fact that an aggregate was “created”. A domain event object of this type will be constructed by this method, and this domain event object will be used to construct and initialise an aggregate object.
The Aggregate class is defined with a nested class Created, which is a subclass of the AggregateCreated base class discussed in the Aggregate events section above. It can be used or extended (and is used in the example above) to represent the fact that an aggregate was “created”. It is defined for convenience, and adds no further attributes or methods.
The Created class can be used directly, but can also be subclassed to define a particular “created” event class for a particular aggregate class, with a suitable name and with suitable extra attributes that represent the particular beginning of a particular type of aggregate. Aggregate event classes should be named using past participles (regular or irregular). And a “created” event class should be named using a past participle that describes the initiation or the beginning of something, such as “Started”, “Opened”, or indeed “Created”.
The _create() method also has an optional id argument, which should be a Python UUID object that will be used to uniquely identify the aggregate in the domain model. It uses the given value of its id argument as the new event’s originator_id. If a value is not provided, by default a version 4 UUID will be created by calling its create_id() method. It will also, by default, set the originator_version to the value of 1. It derives the event’s originator_topic value from the aggregate class itself, using the library’s get_topic() function. And it calls datetime.now() to create the event’s timestamp value, a timezone-aware Python datetime object.
The _create() method also accepts variable keyword arguments, **kwargs, which if given will also be used to construct the event object, in addition to those mentioned above. Since the “created” event object will be constructed with these additional arguments, they must be matched by the particular attributes of your “created” event class.
After creating the event object, the _create() method will construct the aggregate object by calling the “created” event’s mutate() method. If the “created” event class extends the base class by defining any additional event attributes that are particular to itself, attributes that match the extra keyword arguments passed to the _create() method, then an __init__() method must be coded on the concrete aggregate class which accepts these additional attributes, since (as discussed above) they will all be passed to the __init__() method when it is called by the “created” event’s mutate() method. Having constructed the aggregate object from the “created” event object, the _create() method will append the “created” event object to the aggregate object’s list of “pending events”, and then return the aggregate object to the caller.
Having been created, an aggregate object will have an aggregate ID. The ID is presented by its id property, and will be identical to the value passed to the _create() method with the id argument.
assert aggregate.id == aggregate_id
A new aggregate instance has a version number, presented by its version property, which is a Python int. The initial version of a newly created aggregate is, by default, 1. If you want to start at a different number, set INITIAL_VERSION on the aggregate class.
assert aggregate.version == 1
A new aggregate instance has a created_on property, which gives the date and time when the aggregate object was created, and is determined by the timestamp attribute of the first event in the aggregate’s sequence, which is the “created” event. The timestamps are timezone-aware Python datetime objects.
assert isinstance(aggregate.created_on, datetime)
A new aggregate instance also has a modified_on property, which gives the date and time when the aggregate object was last modified, and is determined by the timestamp attribute of the last event in the aggregate’s sequence.
assert isinstance(aggregate.modified_on, datetime)
Initially, since there is only one event in the aggregate’s sequence, the created_on and modified_on values are identical, and equal to the timestamp of the “created” event.
assert aggregate.created_on == aggregate.modified_on
Triggering subsequent events¶
Secondly, the Aggregate class has a method trigger_event() which can be called to trigger a subsequent aggregate event object.
aggregate.trigger_event(Aggregate.Event)
This method can be called by the command methods of an aggregate to
capture the decisions that are made. For example, see the
make_it_so()
method of the World
class in the Simple example
below. The creation of the aggregate event object is the final stage in the
coming-to-be of a private individual occasion of experience within the aggregate,
that begins before the event object is created. The event object is the beginning
of a transition of this private individuality to the publicity of many such things,
one stubborn fact amongst the many which together determine the current state of an
event-sourced application.
Like the _create() method, the trigger_event() method has a required positional argument event_class, which is the type of aggregate event object to be triggered. It uses the id attribute of the aggregate as the originator_id of the new domain event. It uses the current aggregate version to create the next version number (by adding 1) and uses this value as the originator_version of the new aggregate event. It calls datetime.now() to create the timestamp value of the new domain event, a timezone-aware Python datetime object.
The Aggregate class has a nested Event class, defined as a subclass of the AggregateEvent class discussed above in the section on Aggregate events. The Event class can be used as a base class to define the particular aggregate event classes needed in your domain model. For example, see the SomethingHappened class in the Simple example below.
Aggregate event classes are usually named using past participles
to describe what was decided by the command method, such as “Done”,
“Updated”, “Closed”… names that are meaningful in your domain,
expressing and helping to constitute your project’s ubiquitous language.
The trigger_event() method also accepts arbitrary keyword-only arguments, which will be used to construct the aggregate event object. As with the _create() method described above, the event object will be constructed with these arguments, and so any extra arguments must be matched by the expected attributes of the event class. For example, the what attribute on the SomethingHappened event class definition in the Simple example below matches the what=what keyword argument passed in the call to the trigger_event() method in the make_it_so() command.
After creating the aggregate event object, the trigger_event() method will apply the event to the aggregate by calling the event object’s mutate() method, which will call the event’s apply() method and then update the version and modified_on attributes. Finally, it will append the aggregate event to the aggregate object’s list of pending events.
Hence, after calling trigger_event(), the aggregate’s version will have been incremented by 1, and the modified_on time should be greater than the created_on time.
assert aggregate.version == 2
assert aggregate.modified_on > aggregate.created_on
Collecting pending events¶
Thirdly, the Aggregate class has a “public” object method collect_events(), which can be called to collect the aggregate events that have been created since the last call to this method, or since the aggregate object was constructed if the method hasn’t yet been called.
pending_events = aggregate.collect_events()
assert len(pending_events) == 2
assert isinstance(pending_events[0], Aggregate.Created)
assert pending_events[0].originator_id == aggregate.id
assert pending_events[0].originator_version == 1
assert pending_events[0].timestamp == aggregate.created_on
assert isinstance(pending_events[1], Aggregate.Event)
assert pending_events[1].originator_id == aggregate.id
assert pending_events[1].originator_version == 2
assert pending_events[1].timestamp == aggregate.modified_on
The collect_events() method is called without any arguments. In the example above, we can see that two aggregate event objects have been collected, because we created an aggregate and then triggered a subsequent event.
This method will drain the aggregate object’s list of pending events, so that if this method is called again before any further aggregate events have been appended, it will return an empty list.
pending_events = aggregate.collect_events()
assert len(pending_events) == 0
Simple example¶
In the example below, the World
aggregate is defined as a subclass of the Aggregate
class.
class World(Aggregate):
def __init__(self):
self.history = []
@classmethod
def create(cls):
return cls._create(cls.Created, id=uuid4())
def make_it_so(self, what):
self.trigger_event(self.SomethingHappened, what=what)
class SomethingHappened(Aggregate.Event):
what: str
def apply(self, world):
world.history.append(self.what)
The __init__() method initialises a history attribute with an empty Python list object.
The World.create() class method creates and returns a new World aggregate object. It calls the inherited _create() method that we discussed above. It uses its own Created event class as the value of the event_class argument. As above, it uses a version 4 UUID object as the value of the id argument. See the Namespaced IDs section below for a discussion about using version 5 UUIDs.
The make_it_so() method is a command method that triggers a World.SomethingHappened domain event. It calls the inherited trigger_event() method. The event is triggered with the method argument what.
The nested SomethingHappened class is a frozen data class that extends the base aggregate event class Aggregate.Event (also a frozen data class) with a field what, which is defined as a Python str. An apply() method is defined which appends the what value to the aggregate’s history. This method is called when the event is triggered (see Aggregate events).
By defining the event class under the command method which triggers it, and then
defining an apply()
method as part of the event class definition, the story of
calling a command method, triggering an event, and evolving the state of the aggregate
is expressed in three cohesive parts that are neatly co-located.
Having defined the World aggregate class, we can create a new World aggregate object by calling the World.create() class method.
world = World.create()
assert isinstance(world, World)
The aggregate’s attributes created_on and modified_on show when the aggregate was created and when it was modified. Since there has only been one domain event, these are initially equal. These values follow from the timestamp values of the domain event objects, and represent when the aggregate’s first and last domain events were created.
assert world.created_on == world.modified_on
We can now call the aggregate object’s methods. The World aggregate has a command method make_it_so(), which triggers the SomethingHappened event. The apply() method of the SomethingHappened class appends the what of the event to the history of the world. So when we call the make_it_so() command, the argument what will be appended to the history.
# Command methods trigger events.
world.make_it_so("dinosaurs")
world.make_it_so("trucks")
world.make_it_so("internet")
# State of aggregate object has changed.
assert world.history[0] == "dinosaurs"
assert world.history[1] == "trucks"
assert world.history[2] == "internet"
Now that more than one domain event has been created, the aggregate’s modified_on value is greater than its created_on value.
assert world.modified_on > world.created_on
The resulting domain events are now held internally in the aggregate, in a list of pending events. The pending events can be collected by calling the aggregate’s collect_events() method. These events are pending to be saved, and indeed the library’s application object has a save() method which works by calling this method. So far, we have created four domain events and we have not yet collected them, so there will be four pending events: one Created event, and three SomethingHappened events.
# Has four pending events.
assert len(world.pending_events) == 4
# Collect pending events.
pending_events = world.collect_events()
assert len(pending_events) == 4
assert len(world.pending_events) == 0
assert isinstance(pending_events[0], World.Created)
assert isinstance(pending_events[1], World.SomethingHappened)
assert isinstance(pending_events[2], World.SomethingHappened)
assert isinstance(pending_events[3], World.SomethingHappened)
assert pending_events[1].what == "dinosaurs"
assert pending_events[2].what == "trucks"
assert pending_events[3].what == "internet"
assert pending_events[0].timestamp == world.created_on
assert pending_events[3].timestamp == world.modified_on
As discussed above, the event objects can be used to reconstruct the current state of the aggregate, by calling their mutate() methods.
copy = None
for domain_event in pending_events:
copy = domain_event.mutate(copy)
assert isinstance(copy, World)
assert copy.id == world.id
assert copy.version == world.version
assert copy.created_on == world.created_on
assert copy.modified_on == world.modified_on
assert copy.history == world.history
The get()
method of the application repository class
works by calling these methods in this way for this purpose.
Namespaced IDs¶
Aggregates can be created with version 5 UUIDs, so that their IDs can be generated from a given name in a namespace. This can be used, for example, to create IDs for aggregates with fixed names that you want to identify by name, such as a system configuration object. This technique can also be used to create index aggregates that hold the IDs of aggregates with mutable names, or to index other mutable attributes of an event-sourced aggregate.
It isn’t possible to change the ID of an existing aggregate, because its domain events must be stored together in a single sequence. But an index aggregate, whose ID can be recreated from a particular value of a mutable attribute of another aggregate, and which holds the ID of that aggregate, makes it possible to identify that aggregate from that particular value. Such index aggregates can be updated when the mutable attribute changes, or not, as required.
For example, if you have a collection of page aggregates with names that might change,
and you want to be able to identify the pages by name, then you can create index
aggregates with version 5 UUIDs that are generated from the names, and put the IDs
of the page aggregates in the index aggregates. The aggregate classes Page
and Index
in the example code below show how this can be done.
If we imagine we can save these page and index aggregates and retrieve them by ID, we can imagine retrieving a page aggregate using its name: firstly recreate an index ID from the page name, retrieve the index aggregate using that ID, get the page ID from the index aggregate, and then use the page ID to retrieve the page aggregate. When the name is changed, a new index aggregate can be saved along with the page, so that later the page aggregate can be retrieved using the new name. See the discussion about saving multiple aggregates to see an example of how this can work.
from uuid import NAMESPACE_URL, uuid5, UUID
from typing import Optional
from eventsourcing.domain import Aggregate
class Page(Aggregate):
def __init__(self, name: str, body: str):
self.name = name
self.body = body
@classmethod
def create(cls, name: str, body: str = ""):
return cls._create(
id=uuid4(),
event_class=cls.Created,
name=name,
body=body
)
class Created(Aggregate.Created):
name: str
body: str
def update_name(self, name: str):
self.trigger_event(self.NameUpdated, name=name)
class NameUpdated(Aggregate.Event):
name: str
def apply(self, page: "Page"):
page.name = self.name
class Index(Aggregate):
def __init__(self, name: str, ref: UUID):
self.name = name
self.ref = ref
@classmethod
def create(cls, name: str, ref: UUID):
return cls._create(
event_class=cls.Created,
id=cls.create_id(name),
name=name,
ref=ref
)
@staticmethod
def create_id(name: str):
return uuid5(NAMESPACE_URL, f"/pages/{name}")
class Created(Aggregate.Created):
name: str
ref: UUID
def update_ref(self, ref):
self.trigger_event(self.RefUpdated, ref=ref)
class RefUpdated(Aggregate.Event):
ref: Optional[UUID]
def apply(self, index: "Index"):
index.ref = self.ref
We can use the classes above to create a “page” aggregate with a name that we will then change. We can at the same time create an index object for the page.
page = Page.create(name="Erth")
index1 = Index.create(page.name, page.id)
Let’s imagine these two aggregates are saved together, and having been saved can be retrieved by ID. See the discussion about saving multiple aggregates to see how this works in an application object.
We can use the page name to recreate the index ID, and use the index ID to retrieve the index aggregate. We can then obtain the page ID from the index aggregate, and then use the page ID to get the page aggregate.
index_id = Index.create_id("Erth")
assert index_id == index1.id
assert index1.ref == page.id
Now let’s imagine we want to correct the name of the page. We can update the name of the page, and create another index aggregate for the new name, so that later we can retrieve the page using its new name.
page.update_name("Earth")
index2 = Index.create(page.name, page.id)
We can drop the reference from the old index, so that it can be used to refer to a different page.
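index1.update_ref(None)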
We can now use the new name to get the ID of the second index aggregate, and imagine using the second index aggregate to get the ID of the page.
index_id = Index.create_id("Earth")
assert index_id == index2.id
assert index2.ref == page.id
Saving and retrieving aggregates by ID is demonstrated in the discussion about saving multiple aggregates in the applications documentation.
Alternative mutator function¶
An alternative to defining apply() methods on all the aggregate event classes is to define apply methods on the aggregate class. A base Event class can be defined to have an apply() method which simply calls an apply() method on the aggregate object, passing the aggregate event object as an argument. The aggregate’s apply() method can be decorated with the @singledispatchmethod decorator, allowing event-specific parts to be registered, or it can be implemented as a big if-else block. Event-specific parts of the projection can then be defined that apply particular types of events to the aggregate in particular ways.
Defining the aggregate projector with methods on the aggregate class has the advantage of setting values on self, which avoids the inversion of intuition that occurs when writing apply() methods on the events, and makes it legitimate to set values on “private” attributes.
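The following is a minimal sketch of this style; the Dog aggregate, its when() method, and the TrickAdded event are illustrative names, not part of the library.
from functools import singledispatchmethod
from eventsourcing.domain import Aggregate

class Dog(Aggregate):
    def __init__(self):
        self.tricks = []

    class Event(Aggregate.Event):
        def apply(self, aggregate):
            # Delegate application of the event to the aggregate.
            aggregate.when(self)

    class TrickAdded(Event):
        trick: str

    def add_trick(self, trick):
        self.trigger_event(self.TrickAdded, trick=trick)

    @singledispatchmethod
    def when(self, event):
        # Default: events with no registered handler change nothing.
        pass

    @when.register
    def _(self, event: TrickAdded):
        self.tricks.append(event.trick)

dog = Dog()
dog.add_trick("roll over")
assert dog.tricks == ["roll over"]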
See the Cargo
aggregate in the domain model section of the Cargo shipping
example for an example of this alternative.
Another alternative is to use a mutator function defined at the module level. But then the event-specific parts of the aggregate projector become more distant from the definition of the event, the issue of setting “private” attributes on the aggregate returns, and the common aspects may need to be repeated in each part of the projection that handles a particular type of event. However, such a function can be passed into the get() method of an application repository using the projector_func argument of that method.
A further alternative, which is highly recommended, is to use the new declarative syntax, especially the @event decorator, which is used on the project’s README page and explained in detail below. See also the Notes section for a more detailed discussion of these choices.
Declarative syntax¶
You may have noticed a certain amount of repetition in the definitions of the aggregates above. In several places, the same argument was defined in a command method, on an event class, and in an apply() method. The library offers a more concise way to express aggregates by using a declarative syntax. This is possibly the most concise way of expressing an event-sourced domain model across all programming languages.
Create new aggregate by calling the aggregate class directly¶
An aggregate can be “created” by calling the aggregate class directly. You don’t actually need to define a class method to do this, although you may wish to express your project’s ubiquitous language by doing so.
class MyAggregate(Aggregate):
class Started(Aggregate.Created):
pass
# Call the class directly.
agg = MyAggregate()
# There is one pending event.
pending_events = agg.collect_events()
assert len(pending_events) == 1
assert isinstance(pending_events[0], MyAggregate.Started)
# The pending event can be used to reconstruct the aggregate.
copy = pending_events[0].mutate(None)
assert copy.id == agg.id
assert copy.created_on == agg.created_on
Using the init method to define the created event class¶
If a “created” event class is not defined on an aggregate class,
one will be automatically defined. The attributes of this event
class will be derived by inspecting the signature of the
__init__()
method. The example below has an init method that
has a name
argument. Because this example doesn’t have a “created”
event class defined explicitly on the aggregate class, a “created”
event class will be defined automatically to match the signature
of the init method. That is, a “created” event class will be defined
that has an attribute name
.
class MyAggregate(Aggregate):
def __init__(self, name):
self.name = name
# Call the class with a 'name' argument.
agg = MyAggregate(name="foo")
assert agg.name == "foo"
# There is one pending event.
pending_events = agg.collect_events()
assert len(pending_events) == 1
# The pending event is a "created" event.
assert isinstance(pending_events[0], MyAggregate.Created)
# The "created" event is defined on the aggregate class.
assert type(pending_events[0]).__qualname__ == "MyAggregate.Created"
# The "created" event has a 'name' attribute.
assert pending_events[0].name == "foo"
# The "created" event can be used to reconstruct the aggregate.
copy = pending_events[0].mutate(None)
assert copy.name == agg.name
Please note, by default the name “Created” will be used for an automatically defined “created” event class. However, the name of the “created” event class can be specified using the aggregate class argument created_event_name, and it can also be defined by using an @event decorator on the aggregate’s __init__() method.
Dataclass-style init methods¶
Python’s dataclass annotations can be used to define an aggregate’s __init__() method. A “created” event class will then be defined automatically, from the automatically generated __init__() method.
from dataclasses import dataclass
@dataclass
class MyAggregate(Aggregate):
name: str
# Create a new aggregate.
agg = MyAggregate(name="foo")
# The aggregate has a 'name' attribute
assert agg.name == "foo"
# The created event has a 'name' attribute.
pending_events = agg.collect_events()
assert pending_events[0].name == "foo"
Anything that works on a dataclass should work here too. For example, optional arguments can be defined by providing default values on the attribute definitions.
@dataclass
class MyAggregate(Aggregate):
name: str = "bar"
# Call the class without a name.
agg = MyAggregate()
assert agg.name == "bar"
# Call the class with a name.
agg = MyAggregate("foo")
assert agg.name == "foo"
And you can define “non-init argument” attributes, attributes that will be initialised in the __init__() method but that do not appear as arguments of that method, by using the field feature of the dataclasses module.
from dataclasses import field
from typing import List
@dataclass
class MyAggregate(Aggregate):
history: List[str] = field(default_factory=list, init=False)
# Create a new aggregate.
agg = MyAggregate()
# The aggregate has a list.
assert agg.history == []
Please note, when using the dataclass style for defining __init__() methods, applying the @dataclass decorator to your aggregate class will inform your IDE of the method signature, so that code completion works when calling the class. The annotations will in any case be used to create an __init__() method when the class does not already have one. The decorator merely enables code completion and syntax checking: the code will run just the same with or without the @dataclass decorator being applied to aggregate classes that are defined in this style.
Declaring the created event class name¶
To give the “created” event class a particular name, use the class argument created_event_name.
class MyAggregate(Aggregate, created_event_name="Started"):
name: str
# Create a new aggregate.
agg = MyAggregate("foo")
# The created event class is called "Started".
pending_events = agg.collect_events()
assert isinstance(pending_events[0], MyAggregate.Started)
This is equivalent to declaring a “created” event class on the aggregate class.
class MyAggregate(Aggregate):
class Started(Aggregate.Created):
pass
# Create a new aggregate.
agg = MyAggregate()
# The created event class is called "Started".
pending_events = agg.collect_events()
assert isinstance(pending_events[0], MyAggregate.Started)
If more than one “created” event class is defined on the aggregate class, perhaps because the name of the “created” event class was changed and there are stored events created using the old “created” event class that need to be supported, then created_event_name can be used to identify which “created” event class is the one to use when creating new aggregate instances.
class MyAggregate(Aggregate, created_event_name="Started"):
class Created(Aggregate.Created):
pass
class Started(Aggregate.Created):
pass
# Create a new aggregate.
agg = MyAggregate()
# The created event class is called "Started".
pending_events = agg.collect_events()
assert isinstance(pending_events[0], MyAggregate.Started)
If created_event_name is used but the value does not match the name of any of the “created” event classes that are explicitly defined on the aggregate class, then an event class will be automatically defined, and it will be used when creating new aggregate instances.
class MyAggregate(Aggregate, created_event_name="Opened"):
class Created(Aggregate.Created):
pass
class Started(Aggregate.Created):
pass
# Create a new aggregate.
agg = MyAggregate()
# The created event class is called "Opened".
pending_events = agg.collect_events()
assert isinstance(pending_events[0], MyAggregate.Opened)
Defining the aggregate ID¶
By default, the aggregate ID will be a version 4 UUID, automatically generated when a new aggregate is created. However, the aggregate ID can also be defined as a function of the arguments used to create the aggregate. You can do this by defining a create_id() method.
class MyAggregate(Aggregate):
name: str
@staticmethod
def create_id(name: str):
return uuid5(NAMESPACE_URL, f"/my_aggregates/{name}")
# Create a new aggregate.
agg = MyAggregate(name="foo")
assert agg.name == "foo"
# The aggregate ID is a version 5 UUID.
assert agg.id == MyAggregate.create_id("foo")
If a create_id() method is defined on the aggregate class, the base class method create_id() will be overridden. The arguments used in this method must be a subset of the arguments used to create the aggregate. The base class method simply returns a version 4 UUID, which is the default behaviour for generating aggregate IDs.
Alternatively, an id attribute can be declared on the aggregate class, and an id argument supplied when creating new aggregates.
def create_id(name: str):
return uuid5(NAMESPACE_URL, f"/my_aggregates/{name}")
class MyAggregate(Aggregate):
id: UUID
# Create an ID.
agg_id = create_id(name="foo")
# Create an aggregate with the ID.
agg = MyAggregate(id=agg_id)
assert agg.id == agg_id
When defining an explicit __init__() method, the id argument can be set on the object as self._id. Assigning to self.id won’t work, because id is defined as a read-only property on the base aggregate class.
class MyAggregate(Aggregate):
def __init__(self, id: UUID):
self._id = id
# Create an aggregate with the ID.
agg = MyAggregate(id=agg_id)
assert agg.id == agg_id
The @event decorator¶
A more concise way of expressing the concerns around defining, triggering and applying subsequent aggregate events can be achieved by using the library function event() to decorate aggregate command methods. When a method is decorated with the @event decorator, the method signature will be used to automatically define an aggregate event class. And when the method is called, the event will firstly be triggered with the values given when calling the method, so that an event is created and used to mutate the state of the aggregate. The body of the decorated method will be used as the apply() method of the event, both after the event has been triggered and when the aggregate is reconstructed from stored events. The name of the event class can be passed to the decorator as a Python str.
from eventsourcing.domain import event
class MyAggregate(Aggregate):
name: str
@event("NameUpdated")
def update_name(self, name):
self.name = name
# Create an aggregate.
agg = MyAggregate(name="foo")
assert agg.name == "foo"
# Update the name.
agg.update_name("bar")
assert agg.name == "bar"
# There are two pending events.
pending_events = agg.collect_events()
assert len(pending_events) == 2
assert pending_events[0].name == "foo"
# The second pending event is a 'NameUpdated' event.
assert isinstance(pending_events[1], MyAggregate.NameUpdated)
# The second pending event has a 'name' attribute.
assert pending_events[1].name == "bar"
Please also note, if an exception happens to be raised in the decorated method body, then the triggered event will not be appended to the internal list of pending events, as described above. If you are careful, this behaviour (of not appending the event to the list of pending events) can be used to validate the state of the event against the current state of the aggregate. But if you wish to continue using the same aggregate instance after catching a validation exception in the caller of the decorated method, be careful to complete all validation before adjusting the state of the aggregate; otherwise you will need to retrieve a fresh instance from the repository.
This decorator also works with __init__() methods.
class MyAggregate(Aggregate):
@event("Started")
def __init__(self, name):
self.name = name
# Call the class with a 'name' argument.
agg = MyAggregate(name="foo")
assert agg.name == "foo"
# There is one pending event.
pending_events = agg.collect_events()
assert len(pending_events) == 1
# The pending event is a "created" event.
assert isinstance(pending_events[0], MyAggregate.Started)
# The "created" event is defined on the aggregate class.
assert type(pending_events[0]).__qualname__ == "MyAggregate.Started"
# The "created" event has a 'name' attribute.
assert pending_events[0].name == "foo"
# The "created" event can be used to reconstruct the aggregate.
copy = pending_events[0].mutate(None)
assert copy.name == agg.name
Inferring the event class name from the method name¶
The @event decorator can be used without providing the name of an event. If the decorator is used without any arguments, the name of the event will be derived from the method name. The method name is assumed to be lower case and underscore-separated. The name of the event class is constructed by firstly splitting the name of the method by its underscore characters, then capitalising the resulting parts, and then concatenating the capitalised parts to give an “upper camel case” class name. For example, a method name name_updated would give an event class name NameUpdated.
from eventsourcing.domain import event
class MyAggregate(Aggregate):
name: str
@event
def name_updated(self, name):
self.name = name
# Create an aggregate.
agg = MyAggregate(name="foo")
assert agg.name == "foo"
# Update the name.
agg.name_updated("bar")
assert agg.name == "bar"
# There are two pending events.
pending_events = agg.collect_events()
assert len(pending_events) == 2
assert pending_events[0].name == "foo"
# The second pending event is a 'NameUpdated' event.
assert isinstance(pending_events[1], MyAggregate.NameUpdated)
# The second pending event has a 'name' attribute.
assert pending_events[1].name == "bar"
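The derivation rule described above can be sketched in plain Python:
# "name_updated" -> ["name", "updated"] -> ["Name", "Updated"] -> "NameUpdated"
assert "".join(part.capitalize() for part in "name_updated".split("_")) == "NameUpdated"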
This creates a slight tension in the naming conventions, because methods should normally be named using the imperative form whilst event names should normally be past participles. However, deriving the event name from the method name can be useful when naming methods that will only be called by aggregate command methods under certain conditions.
For example, if an attempt is made to update the value of an attribute, but the given value happens to be identical to the existing value, then it might be desirable to skip triggering an event.
class MyAggregate(Aggregate):
name: str
def update_name(self, name):
if name != self.name:
self.name_updated(name)
@event
def name_updated(self, name):
self.name = name
# Create an aggregate.
agg = MyAggregate(name="foo")
assert agg.name == "foo"
# Update the name lots of times.
agg.update_name("foo")
agg.update_name("foo")
agg.update_name("foo")
agg.update_name("bar")
agg.update_name("bar")
agg.update_name("bar")
agg.update_name("bar")
# There are two pending events (not eight).
pending_events = agg.collect_events()
assert len(pending_events) == 2, len(pending_events)
Using an explicitly defined event class¶
Of course, you may wish to define event classes explicitly. You can refer to the event class in the decorator, rather than using a string. The synonymous decorator @triggers can be used instead of the @event decorator (it does the same thing).
from eventsourcing.domain import triggers
class MyAggregate(Aggregate):
name: str
class NameUpdated(Aggregate.Event):
name: str
@triggers(NameUpdated)
def update_name(self, name):
self.name = name
# Create an aggregate.
agg = MyAggregate(name="foo")
assert agg.name == "foo"
# Update the name.
agg.update_name("bar")
assert agg.name == "bar"
# There are two pending events.
pending_events = agg.collect_events()
assert len(pending_events) == 2
assert pending_events[0].name == "foo"
# The second pending event is a 'NameUpdated' event.
assert isinstance(pending_events[1], MyAggregate.NameUpdated)
# The second pending event has a 'name' attribute.
assert pending_events[1].name == "bar"
The World aggregate class revisited¶
Using the declarative syntax described above, the World aggregate in the Simple example above can be expressed more concisely in the following way.
class World(Aggregate):
def __init__(self):
self.history = []
@event("SomethingHappened")
def make_it_so(self, what):
self.history.append(what)
In this example, the World.SomethingHappened event is automatically defined by inspecting the make_it_so() method. The event class name “SomethingHappened” is given to the method’s decorator. The body of the make_it_so() method will be used as the apply() method of the World.SomethingHappened event.
The World aggregate class can be called directly. Calling the make_it_so() method will trigger a World.SomethingHappened event, and this event will be used to mutate the state of the aggregate, such that the make_it_so() method argument what will be appended to the aggregate’s history attribute.
world = World()
world.make_it_so("dinosaurs")
world.make_it_so("trucks")
world.make_it_so("internet")
assert world.history[0] == "dinosaurs"
assert world.history[1] == "trucks"
assert world.history[2] == "internet"
As before, the pending events can be collected and used to reconstruct the aggregate object.
pending_events = world.collect_events()
assert len(pending_events) == 4
copy = reconstruct_aggregate_from_events(pending_events)
assert copy.id == world.id
assert copy.version == world.version
assert copy.created_on == world.created_on
assert copy.modified_on == world.modified_on
assert copy.history[0] == "dinosaurs"
assert copy.history[1] == "trucks"
assert copy.history[2] == "internet"
The Page and Index aggregates revisited¶
The Page and Index aggregates defined in the above discussion about namespaced IDs can be expressed more concisely in the following way.
@dataclass
class Page(Aggregate):
name: str
body: str = ""
@event("NameUpdated")
def update_name(self, name: str):
self.name = name
@dataclass
class Index(Aggregate):
name: str
ref: Optional[UUID]
@staticmethod
def create_id(name: str):
return uuid5(NAMESPACE_URL, f"/pages/{name}")
@event("RefUpdated")
def update_ref(self, ref: Optional[UUID]):
self.ref = ref
# Create new page and index aggregates.
page = Page(name="Erth")
index1 = Index(name=page.name, ref=page.id)
# The page name can be used to recreate
# the index ID. The index ID can be used
# to retrieve the index aggregate, which
# gives the page ID, and then the page ID
# can be used to retrieve the page aggregate.
index_id = Index.create_id(name="Erth")
assert index_id == index1.id
assert index1.ref == page.id
assert index1.name == page.name
# Later, the page name can be updated,
# and a new index created for the page.
page.update_name(name="Earth")
index1.update_ref(ref=None)
index2 = Index(name=page.name, ref=page.id)
# The new page name can be used to recreate
# the new index ID. The new index ID can be
# used to retrieve the new index aggregate,
# which gives the page ID, and then the page
# ID can be used to retrieve the renamed page.
index_id = Index.create_id(name="Earth")
assert index_id == index2.id
assert index2.ref == page.id
assert index2.name == page.name
Non-trivial command methods¶
In the examples above, the work of the command methods is “trivial”, in that the command method arguments are always used directly as the aggregate event attribute values. But often a command method needs to do some work before triggering an event. As a result of this processing, the event attributes may not be the same as the command method arguments. And the logic of the command may be such that, under some conditions, an event should not be triggered.
Any processing of method arguments that should be done only once, and not repeated when reconstructing aggregates from stored events, should be done in an undecorated command method. For example, if the triggered event will have a new UUID, you will either want to use a separate command method, or create this value in the expression used when calling the method, and not generate the UUID in the decorated method body. Otherwise, rather than being fixed in the stored state of the aggregate, a new UUID will be created each time the aggregate is reconstructed.
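To sketch this point in code, consider the following hypothetical Ticket aggregate (an illustrative example, not part of the library). The new UUID is generated once in the undecorated command method, so the same value is recorded in the event and reapplied identically on reconstruction.
from uuid import UUID, uuid4
from eventsourcing.domain import Aggregate, event

class Ticket(Aggregate):
    def __init__(self):
        self.comment_ids = []

    def add_comment(self):
        # Generate the new UUID once, in the undecorated command...
        comment_id = uuid4()
        self._comment_added(comment_id)
        return comment_id

    @event("CommentAdded")
    def _comment_added(self, comment_id: UUID):
        # ...so the stored event carries a fixed value that is
        # reapplied identically whenever the aggregate is reconstructed.
        self.comment_ids.append(comment_id)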
To illustrate this, consider the following Order class. It is an ordinary Python object class. Its __init__() method takes a name argument. The method confirm() sets the attribute confirmed_at. The method pickup() checks that the order has been confirmed before calling the _pickup() method, which sets the attribute pickedup_at.
If the order has not been confirmed, an exception will be raised. That is, whilst the confirm() command method is trivial, in that its arguments are always used as the event attributes, the pickup() method is non-trivial, in that it will only trigger an event if the order has been confirmed. That means we can’t decorate the pickup() method with the @event decorator without triggering an unwanted event.
class Order:
def __init__(self, name):
self.name = name
self.confirmed_at = None
self.pickedup_at = None
def confirm(self, at):
self.confirmed_at = at
def pickup(self, at):
if self.confirmed_at:
self._pickup(at)
else:
raise RuntimeError("Order is not confirmed")
def _pickup(self, at):
self.pickedup_at = at
This ordinary Python class can be used in the usual way. We can construct a new instance of the class, and call its command methods.
# Start a new order, confirm, and pick up.
order = Order("my order")
try:
order.pickup(datetime.now())
except RuntimeError:
pass
else:
raise AssertionError("shouldn't get here")
order.confirm(datetime.now())
order.pickup(datetime.now())
This ordinary Python class can easily be converted into an event-sourced aggregate by applying the library’s @event decorator to the confirm() and _pickup() methods.
Because the command methods are decorated in this way, when the confirm() method is called, an Order.Confirmed event will be triggered. When the _pickup() method is called, an Order.PickedUp event will be triggered. These event classes are defined automatically from the method signatures. The decorating of the _pickup() method, and not of the pickup() method, is a good example of a command method that needs to do some work before an event is triggered. The body of the pickup() method is only executed when the command method is called, whereas the body of the _pickup() method is executed each time the event is applied to evolve the state of the aggregate.
class Order(Aggregate):
def __init__(self, name):
self.name = name
self.confirmed_at = None
self.pickedup_at = None
@event("Confirmed")
def confirm(self, at):
self.confirmed_at = at
def pickup(self, at):
if self.confirmed_at:
self._pickup(at)
else:
raise RuntimeError("Order is not confirmed")
@event("PickedUp")
def _pickup(self, at):
self.pickedup_at = at
We can use the event-sourced Order aggregate in the same way as the undecorated ordinary Python Order class. The event-sourced version has the advantage that using it will trigger a sequence of aggregate events that can be persisted in a database and used in future to determine the state of the order.
order = Order("my order")
order.confirm(datetime.now())
order.pickup(datetime.now())
# Check the state of the order.
assert order.name == "my order"
assert isinstance(order.confirmed_at, datetime)
assert isinstance(order.pickedup_at, datetime)
assert order.pickedup_at > order.confirmed_at
# Check the triggered events determine the state of the order.
pending_events = order.collect_events()
copy = None
for e in pending_events:
copy = e.mutate(copy)
assert copy.name == order.name
assert copy.created_on == order.created_on
assert copy.modified_on == order.modified_on
assert copy.confirmed_at == order.confirmed_at
assert copy.pickedup_at == order.pickedup_at
Raising exceptions in the body of decorated methods¶
It is actually possible to decorate the pickup() command method with the @event decorator. But if a decorated command method has conditional logic that means the state of the aggregate should not be evolved, you must take care to raise an exception rather than returning early, and to raise the exception before changing the state of the aggregate at all. By raising an exception in the body of a decorated method, the triggered event will not in fact be appended to the aggregate’s list of pending events, and it will be as if it never happened. However, the conditional expression will then be evaluated, perhaps needlessly, each time the aggregate is reconstructed from stored events. Of course, this conditional logic may be useful, and considered as validation of the projection of earlier events, for example checking that the Confirmed event is working properly.
If you wish to use this style, just make sure to raise an exception rather than returning early, and make sure not to change the state of the aggregate if an exception may be raised later. Returning early will mean the event is appended to the list of pending events. Changing the state before raising an exception will mean the state differs from the recorded state. So if your method does change state and then raise an exception, make sure to obtain a fresh version of the aggregate before continuing to trigger events.
class Order(Aggregate):
def __init__(self, name):
self.name = name
self.confirmed_at = None
self.pickedup_at = None
@event("Confirmed")
def confirm(self, at):
self.confirmed_at = at
@event("PickedUp")
def pickup(self, at):
if self.confirmed_at:
self.pickedup_at = at
else:
raise RuntimeError("Order is not confirmed")
# Creating the aggregate causes one pending event.
order = Order("name")
assert len(order.pending_events) == 1
# Calling pickup() too early raises an exception.
try:
order.pickup(datetime.now())
except RuntimeError:
pass
else:
raise Exception("Shouldn't get here")
# There is still only one pending event.
assert len(order.pending_events) == 1
Recording command arguments and reprocessing them each time the aggregate is reconstructed is perhaps best described as “command sourcing”.
In many cases, a command will do some work and trigger an aggregate event that has attributes that are different from the command, and in those cases it is necessary to have two different methods with different signatures: a command method that is not decorated and a decorated method that triggers and applies an aggregate event. This second method may arguably be well named by using a past participle rather than the imperative form.
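As a sketch of this pattern, the hypothetical Basket aggregate below (illustrative, not part of the library) has a command method whose arguments differ from the attributes of the event it triggers: the command computes an amount, and only the computed value is recorded.
from eventsourcing.domain import Aggregate, event

class Basket(Aggregate):
    def __init__(self):
        self.total = 0

    def add_items(self, unit_price, quantity):
        # Work done once in the command: compute the amount...
        self._items_added(amount=unit_price * quantity)

    # ...and record only the computed value in the event.
    @event("ItemsAdded")
    def _items_added(self, amount):
        self.total += amount

basket = Basket()
basket.add_items(unit_price=5, quantity=3)
assert basket.total == 15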
The @aggregate decorator¶
Just for fun, the library’s aggregate() function can be used to declare event-sourced aggregate classes. This is equivalent to inheriting from the library’s Aggregate class. The “created” event name can be defined using the created_event_name argument of the decorator. However, it is recommended to inherit from the Aggregate class rather than using the @aggregate decorator, so that the full Aggregate class definition will be visible to your IDE.
from eventsourcing.domain import aggregate
@aggregate(created_event_name="Started")
class Order:
def __init__(self, name):
self.name = name
order = Order("my order")
pending_events = order.collect_events()
assert isinstance(pending_events[0], Order.Started)
Timestamp timezones¶
The timestamp values mentioned above are timezone-aware Python datetime objects, created by calling datetime.now(). By default, the timezone is set to UTC, as defined by timezone.utc in Python’s datetime module. It is generally recommended to store date-times as timezone-aware values with UTC as the timezone, and then localize the values in the interface to the application, according to the local timezone of a particular user. You can localize date-time values by calling astimezone() on a datetime object, passing in a tzinfo object.
from datetime import timezone
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
# Function to localize datetimes. See also the pytz module.
def localize(dt: datetime, tz: str):
return dt.astimezone(ZoneInfo(tz))
usa_user_timezone_setting = 'America/Los_Angeles'
# Summer time in LA.
domain_model_timestamp = datetime(2020, 6, 6, hour=12, tzinfo=timezone.utc)
local_timestamp = localize(domain_model_timestamp, usa_user_timezone_setting)
assert local_timestamp.hour == 5 # Seven hours behind.
# Winter time in LA.
domain_model_timestamp = datetime(2020, 1, 1, hour=12, tzinfo=timezone.utc)
local_timestamp = localize(domain_model_timestamp, usa_user_timezone_setting)
assert local_timestamp.hour == 4 # Eight hours behind (no daylight saving).
china_user_timezone_setting = 'Asia/Shanghai'
# Summer time in Shanghai.
domain_model_timestamp = datetime(2020, 6, 6, hour=12, tzinfo=timezone.utc)
local_timestamp = localize(domain_model_timestamp, china_user_timezone_setting)
assert local_timestamp.hour == 20 # Eight hours ahead.
# Winter time in Shanghai.
domain_model_timestamp = datetime(2020, 1, 1, hour=12, tzinfo=timezone.utc)
local_timestamp = localize(domain_model_timestamp, china_user_timezone_setting)
assert local_timestamp.hour == 20 # No daylight saving time in China.
However, if necessary, this default can be changed by assigning a tzinfo object to the TZINFO attribute of the eventsourcing.domain module. The TZINFO value can also be configured using environment variables, by setting the environment variable TZINFO_TOPIC to a topic string that locates a Python tzinfo object in your code, for example a timezone with an offset value, or a ZoneInfo from the Python Standard Library with a suitable key. You need to set this environment variable before the eventsourcing.domain module is imported, or otherwise assign to the TZINFO attribute after that module has been imported. However, it is probably best to use a timezone with a fixed offset from UTC, in which case you will probably still need to convert to local time in the user interface. So it is strongly recommended to use the default TZINFO.
Please see the Python docs for
more information about timezones, in particular the need to install tzdata
on some systems. Please note, the zoneinfo
package is new in Python 3.9, so users
of earlier versions of Python may wish to install the backports.zoneinfo
package.
$ pip install 'backports.zoneinfo;python_version<"3.9"'
Initial version number¶
By default, aggregates have an initial version number of 1. Sometimes it may be desired, or indeed necessary, to use a different initial version number. In the example below, the initial version number of the class MyAggregate is defined to be 0.
class MyAggregate(Aggregate):
INITIAL_VERSION = 0
aggregate = MyAggregate()
assert aggregate.version == 0
If all aggregates in a domain model need to use the same non-default initial version number,
then a base class on which INITIAL_VERSION is set to the preferred value can be defined
and used by the aggregates of the domain model, as shown in the sketch below. Some people
may wish to set the preferred value on the library's Aggregate class itself.
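For example, a minimal sketch of such a base class; the class names here are illustrative.
class DomainModelAggregate(Aggregate):
    # All aggregates deriving from this base class start at version 0.
    INITIAL_VERSION = 0

class AnotherAggregate(DomainModelAggregate):
    pass

aggregate = AnotherAggregate()
assert aggregate.version == 0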
Topic strings¶
A ‘topic’ in this library is the path to a Python module (e.g. 'eventsourcing.domain'
)
optionally followed by the qualified name of an object in that module (e.g. 'Aggregate'
or 'Aggregate.Created'
), with these two parts joined with a colon character (':'
).
For example, 'eventsourcing.domain'
is the topic of the library’s domain module, and
'eventsourcing.domain:Aggregate'
is the topic of the Aggregate
class.
The library’s utils
module contains the functions
resolve_topic()
and get_topic()
which are used in the library to resolve a given topic to a Python object, and to
construct a topic for a given Python object.
Topics are used when serialising domain events, to create references to domain event class objects. Topic strings are also used in “created” events, to identify an aggregate class object. Topics are also used to identify infrastructure factory class objects, and in other places too, such as identifying the cipher and compressor classes to be used by an application, and to identify the timezone object to be used when creating timestamps.
from eventsourcing.utils import get_topic, resolve_topic
assert get_topic(Aggregate) == "eventsourcing.domain:Aggregate"
assert resolve_topic("eventsourcing.domain:Aggregate") == Aggregate
assert get_topic(Aggregate.Created) == "eventsourcing.domain:Aggregate.Created"
assert resolve_topic("eventsourcing.domain:Aggregate.Created") == Aggregate.Created
Registering old topics¶
The register_topic()
function
can be used to register an old topic for an object that has
been moved or renamed. When a class object is moved or renamed,
unless the old topic is registered, it will not be possible to
resolve the old topic. If an aggregate or event class is renamed,
it won’t be possible to reconstruct instances from previously stored
events, unless the old topic is registered for the renamed class.
This also supports nested classes. For example, by registering an old topic for a renamed aggregate, topics for nested classes that were created using the old enclosing name will resolve to the same nested class on the renamed enclosing class.
This will also work for renaming modules and packages. If a topic for an old module name is registered for a renamed module, topics for classes created under the old module name will resolve to the same classes in the renamed module. And if a topic for an old package name is registered for the renamed package, topics for classes created under the old package name will resolve to the same classes in the same modules in the renamed package.
See the examples below.
from eventsourcing.utils import register_topic
class MyAggregate(Aggregate):
class Started(Aggregate.Created):
pass
# Current topics resolve.
assert get_topic(MyAggregate) == "__main__:MyAggregate"
assert resolve_topic("__main__:MyAggregate") == MyAggregate
assert resolve_topic("__main__:MyAggregate.Started") == MyAggregate.Started
# Aggregate class was renamed.
register_topic("__main__:OldName", MyAggregate)
assert resolve_topic("__main__:OldName") == MyAggregate
assert resolve_topic("__main__:OldName.Started") == MyAggregate.Started
# Nested event class was renamed.
register_topic("__main__:MyAggregate.Created", MyAggregate.Started)
assert resolve_topic("__main__:MyAggregate.Created") == MyAggregate.Started
# Aggregate class was moved from another module.
register_topic("eventsourcing.domain:MyAggregate", MyAggregate)
assert resolve_topic("eventsourcing.domain:MyAggregate") == MyAggregate
assert resolve_topic("eventsourcing.domain:MyAggregate.Created") == MyAggregate.Created
assert resolve_topic("eventsourcing.domain:Aggregate") == Aggregate
# Module was renamed.
import eventsourcing.domain
register_topic("eventsourcing.old", eventsourcing.domain)
assert resolve_topic("eventsourcing.old:Aggregate") == Aggregate
assert resolve_topic("eventsourcing.old:Aggregate.Created") == Aggregate.Created
# Package was renamed.
import eventsourcing
register_topic("old", eventsourcing)
assert resolve_topic("old.domain:Aggregate") == Aggregate
assert resolve_topic("old.domain:Aggregate.Created") == Aggregate.Created
# Current topics still resolve okay.
assert get_topic(MyAggregate) == "__main__:MyAggregate"
assert resolve_topic("__main__:MyAggregate") == MyAggregate
assert resolve_topic("__main__:MyAggregate.Started") == MyAggregate.Started
assert resolve_topic("eventsourcing.domain:Aggregate") == Aggregate
Versioning¶
Versioning allows aggregate and domain event classes to be modified after an application has been deployed.
On both aggregate and domain event classes, the class attribute class_version
can be used to indicate
the version of the class. This attribute is inferred to have a default value of 1
. If the data model is
changed, by adding or removing or renaming or changing the meaning of values of attributes, subsequent
versions should be given a successively higher number than the previously deployed version. Static methods
of the form upcast_vX_vY()
will be called to update the state of a stored aggregate event or snapshot
from a lower version X
to the next higher version Y
. Such upcast methods will be called to upcast
the state from the version of the class with which it was created to the version of the class which will
be reconstructed. For example, upcasting the stored state of an object created at version 2
of a
class that will be used to reconstruct an object at version 4
of the class will involve calling
upcast methods upcast_v2_v3()
, and upcast_v3_v4()
. If you aren’t using snapshots, you don’t
need to define upcast methods or version numbers on the aggregate class.
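To illustrate how the upcast methods chain, here is a hedged sketch, not the library's internal code; the event class and state values are hypothetical.
# A hypothetical event class at class_version 3, with two upcast methods.
class SomeEvent:
    class_version = 3

    @staticmethod
    def upcast_v1_v2(state):
        state["b"] = 0

    @staticmethod
    def upcast_v2_v3(state):
        state["c"] = 0.0

# Upcast a state dict stored at class version 1 by chaining the methods.
state = {"a": "value"}  # hypothetical stored state from version 1
for v in range(1, SomeEvent.class_version):
    getattr(SomeEvent, f"upcast_v{v}_v{v + 1}")(state)
assert state == {"a": "value", "b": 0, "c": 0.0}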
In the example below, version 1
of the class MyAggregate
is defined with an attribute a
.
from uuid import uuid4

class MyAggregate(Aggregate):
def __init__(self, a:str):
self.a = a
@classmethod
def create(cls, a:str):
return cls._create(cls.Created, id=uuid4(), a=a)
class Created(Aggregate.Created):
a: str
After an application that uses the above aggregate class has been deployed, its Created
events
will have been created and stored with the a
attribute defined. If subsequently the attribute b
is added to the definition of the Created
event, in order for the existing stored events to be
constructed in a way that satisfies the new version of the class, the stored events will need to be
upcast to have a value for b
. In the example below, the static method upcast_v1_v2()
defined
on the Created
event sets a default value for b
in the given state
. The class attribute
class_version
is set to 2
. The same treatment is given to the aggregate class as the domain
event class, so that snapshots can be upcast.
class MyAggregate(Aggregate):
def __init__(self, a:str, b:int):
self.a = a
self.b = b
@classmethod
def create(cls, a:str, b: int = 0):
return cls._create(cls.Created, id=uuid4(), a=a, b=b)
class Created(Aggregate.Created):
a: str
b: int
        class_version = 2

        @staticmethod
        def upcast_v1_v2(state):
            state["b"] = 0

    # The aggregate class gets the same versioning treatment, so snapshots can be upcast.
    class_version = 2

    @staticmethod
    def upcast_v1_v2(state):
        state["b"] = 0
After an application that uses the above version 2 aggregate class has been deployed, its Created
events will have been created and stored with both the a
and b
attributes. If subsequently the
attribute c
is added to the definition of the Created
event, in order for the existing stored
events from version 1 to be constructed in a way that satisfies the new version of the class, they
will need to be upcast to include a value for b
and c
. The existing stored events from version 2
will need to be upcast to include a value for c
. The additional static method upcast_v2_v3()
defined on the Created
event sets a default value for c
in the given state
. The class attribute
class_version
is set to 3
. The same treatment is given to the aggregate class as the domain event
class, so that any snapshots will be upcast.
class MyAggregate(Aggregate):
def __init__(self, a:str, b:int, c:float):
self.a = a
self.b = b
self.c = c
@classmethod
def create(cls, a:str, b: int = 0, c: float = 0.0):
return cls._create(cls.Created, id=uuid4(), a=a, b=b, c=c)
class Created(Aggregate.Created):
a: str
b: int
c: float
class_version = 3
@staticmethod
def upcast_v1_v2(state):
state["b"] = 0
@staticmethod
def upcast_v2_v3(state):
state["c"] = 0.0
    # Same treatment on the aggregate class, so snapshots can be upcast.
    class_version = 3

    @staticmethod
    def upcast_v1_v2(state):
        state["b"] = 0

    @staticmethod
    def upcast_v2_v3(state):
        state["c"] = 0.0
If subsequently a new event is added that manipulates a new attribute that is expected to be initialised
when the aggregate is created, in order that snapshots from earlier versions will be upcast, the aggregate
class attribute class_version
will need to be set to 4
and a static method upcast_v3_v4()
defined on the aggregate class which upcasts the state of a previously created snapshot. In the example
below, the new attribute d
is initialised in the __init__()
method, and a domain event which
updates d
is defined. Since the Created
event class has not changed, it remains at version 3
.
class MyAggregate(Aggregate):
def __init__(self, a:str, b:int, c:float):
self.a = a
self.b = b
self.c = c
self.d = False
@classmethod
def create(cls, a:str, b: int = 0, c: float = 0.0):
return cls._create(cls.Created, id=uuid4(), a=a, b=b, c=c)
class Created(Aggregate.Created):
a: str
b: int
c: float
class_version = 3
@staticmethod
def upcast_v1_v2(state):
state["b"] = 0
@staticmethod
def upcast_v2_v3(state):
state["c"] = 0.0
def set_d(self, d: bool):
self.trigger_event(self.DUpdated, d=d)
class DUpdated(Aggregate.Event):
d: bool
def apply(self, aggregate: "Aggregate") -> None:
aggregate.d = self.d
    # Same treatment on the aggregate class; upcast_v3_v4() upcasts snapshots from earlier versions.
    class_version = 4

    @staticmethod
    def upcast_v1_v2(state):
        state["b"] = 0

    @staticmethod
    def upcast_v2_v3(state):
        state["c"] = 0.0

    @staticmethod
    def upcast_v3_v4(state):
        state["d"] = False
If the value objects used by your events also change, you may also need to define new transcodings
with new names. Simply register the new transcodings after the old, and use a modified name
value
for the transcoding. In this way, the existing encoded values will be decoded by the old transcoding,
and the new instances of the value object class will be encoded with the new version of the transcoding.
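As a hedged sketch of this technique, assuming a hypothetical value object class Amount whose encoding gains a currency field, the old transcoding is kept and a new one is registered after it with a modified name:
from eventsourcing.persistence import Transcoding

class Amount:  # hypothetical value object
    def __init__(self, value, currency="GBP"):
        self.value = value
        self.currency = currency

class AmountAsValue(Transcoding):  # old transcoding, kept so old events decode
    type = Amount
    name = "amount"

    def encode(self, obj):
        return obj.value

    def decode(self, data):
        return Amount(data)

class AmountAsDict(Transcoding):  # new transcoding with a modified name
    type = Amount
    name = "amount_v2"

    def encode(self, obj):
        return {"value": obj.value, "currency": obj.currency}

    def decode(self, data):
        return Amount(data["value"], data["currency"])
Registering AmountAsDict after AmountAsValue means new Amount values are encoded with the new transcoding, while values stored under the old name are still decoded by the old one.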
In order to support forward compatibility as well as backward compatibility, so that consumers designed for old versions will not be broken by modifications, it is advisable to restrict changes to existing types to be additions only, so that existing attributes are unchanged. If existing aspects need to be changed, for example by renaming or removing an attribute of an event, then it is advisable to define a new type. This approach depends on consumers overlooking or ignoring new attributes and new types, but they may effectively be broken anyway by such changes if they no longer see any data.
Including model changes in the domain events may help to inform consumers of changes to the model schema, and may allow the domain model itself to be validated, so that classes are marked with new versions if the attributes have changed. This may be addressed by a future version of this library. Considering model code changes as a sequence of immutable events brings the state of the domain model code itself into the same form of event-oriented consideration as the consideration of the state of an application as a sequence of events.
Snapshots¶
Snapshots speed up aggregate access time, by avoiding the need to retrieve
and apply all the domain events when reconstructing an aggregate object instance.
The library’s Snapshot
class can be used to create
and restore snapshots of aggregate object instances. See Snapshotting
in the application module documentation for more information about taking snapshots
in an event-sourced application.
The Snapshot
class is defined as
a subclass of the domain event base class DomainEvent
.
It is defined as a frozen data class and extends the base class with attributes
topic
and state
, which hold the topic of an aggregate object class and
the current state of an aggregate object.
from eventsourcing.domain import Snapshot
The class method take()
can be used to
create a snapshot of an aggregate object.
snapshot = Snapshot.take(world)
assert isinstance(snapshot, Snapshot)
assert snapshot.originator_id == world.id
assert snapshot.originator_version == world.version
assert snapshot.topic == "__main__:World", snapshot.topic
assert snapshot.state["history"] == world.history
assert snapshot.state["_created_on"] == world.created_on
assert snapshot.state["_modified_on"] == world.modified_on
assert len(snapshot.state) == 3
A snapshot’s mutate()
method can be used to reconstruct its
aggregate object instance.
copy = snapshot.mutate(None)
assert isinstance(copy, World)
assert copy.id == world.id
assert copy.version == world.version
assert copy.created_on == world.created_on
assert copy.modified_on == world.modified_on
assert copy.history == world.history
The signature of the mutate()
method is the same as the
domain event object method of the same name, so that when reconstructing an aggregate, a list
that starts with a snapshot and continues with the subsequent domain event objects can be
treated in the same way as a list of all the domain event objects of an aggregate.
This similarity is needed by the application repository, since
some specialist event stores (e.g. AxonDB) return a snapshot as the first domain event.
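For example, here is a minimal sketch of that treatment, continuing from the snapshot taken above; the empty list of subsequent events is hypothetical.
subsequent_events = []  # hypothetical: events recorded after the snapshot
reconstructed = None
for domain_event in [snapshot, *subsequent_events]:
    reconstructed = domain_event.mutate(reconstructed)
assert reconstructed.id == world.id
assert reconstructed.version == world.version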
Notes¶
Why put methods on event objects? There has been much discussion about the best way
to define the aggregate projectors. Of course, there isn’t a “correct” way of doing it,
and alternatives are possible. The reason for settling on the style presented most
prominently in this documentation, where each aggregate event has a method that
defines how it will apply to the aggregate (neatly wrapped up with the
declarative syntax) is the practical reason that it keeps the
parts of projection closest to the event class to which they pertain, and experience has shown
that this makes the domain model core easier to develop and understand. In the first version of
this library, the classical approach to writing domain models described in Martin
Fowler’s book Patterns of Enterprise Application Architecture was followed, so that
there was a repository object for each class of domain object (or aggregate). If the
type of aggregate is known when the domain object is requested, that is because each
repository was constructed for that type, and so we can understand that these
repositories can be constructed with a particular mutator function that will
function as the aggregate projector for that type. When stored domain events are
retrieved, this particular mutator function can be called. As the mutator function
will be coded for a particular aggregate, the aggregate class can be coded into
the function to handle the initial event. However, it became cumbersome to
define a new repository each time we define a new aggregate class. And anyway this
classical design for many repositories was, more or less, an encapsulation of the
database tables which were required to support the different types of domain object.
But in an event-sourced application, all the aggregate events are stored in the same
stored events table, and so it suffices to have one repository to encapsulate this
table in an event-sourced application. But the issue which arises is that a single
repository for all aggregates can’t know which type of aggregate is being requested
when an aggregate is requested by ID. Of course, it would be possible to pass in the
mutator function when requesting an aggregate, or for the repository more simply to
return a list of aggregate events rather than an aggregate. But if we want a single
repository, and we want it to provide the “dictionary-like interface” that is described
in Patterns of Enterprise Application Architecture, so that we give an ID and get an
aggregate, then the issue of identifying the aggregate type remains. To resolve this
issue, we can return to the “headline” definition of event sourcing, which is that
the state is determined by a sequence of events. The events can function to determine
the state of the aggregate by having an aggregate "topic" on the initial event, and by
defining methods on the event classes, methods that project the state of the events into
the state of the aggregate. These methods are as close as possible to the definition of the
event data, and when developing the code we avoid having to scroll around the code
to see how an event is applied to an aggregate. Indeed, Martin Fowler's 2005
event sourcing article
has a UML diagram which shows exactly this design, with a method called ‘process’
on the ShippingEvent class in the section How It Works. A further consideration,
and perhaps criticism of this style, is that this style of writing
projections is special, and other projections will have to be written in a different way.
This is true, but in practice, aggregate events are used overwhelmingly often to reconstruct
the state of aggregates, and in many applications that is the only way they will
be projected. And so, since there is an advantage to coding the aggregate projector
on the aggregate events, that is the way we settled on coding these things. The initial
impetus for coding things this way came from users of the library. The library
documentation was initially written to suggest using a separate mutator function.
However, that isn’t the end of the story. There are three outstanding unsettled feelings.
Firstly, by coding the aggregate projector as methods on the aggregate events, and passing
in the aggregate to these methods, there is a reversal of intuition in these methods,
where self is the thing that has the data and doesn’t change, and changes are
made to a method argument. Secondly, a tension arises between either rudely accessing the
private members of the aggregate, or expanding its public interface, which would
be much better restricted to being only an expression of support for the application.
Thirdly, in the process of defining an event class, passing in the arguments to
trigger an event, and then using the event attributes to mutate the aggregate, we
have to say everything three times. At first, we can enjoy being explicit about
everything. But after some time, the desire grows to find a way to say things once.
These three feelings, of an intuitive reversal of setting values from self onto a
method argument, of accessing private members or expanding the public interface, and
of saying everything three times, were resolved by the introduction of the @event
decorator as a more declarative syntax. One criticism
of the declarative syntax design is that it is “command sourcing” and not “event sourcing”,
because it is the command method arguments that are being used as the attributes of the
event. It may sometimes be "command sourcing" but then it is certainly also event
sourcing. Applications exist to support a domain, and in many cases applications support
the domain by recording decisions that are made in the domain and received by the domain model.
The @event
decorator can also be used on "private" methods, that is, methods that are not part of the
aggregate's "public" command and query interface used by the application. Such decorated methods
can be called by "public" command methods which are not themselves decorated, and which therefore
do not trigger events that are simply comprised of the command method arguments, so then there is
"event sourcing" but not "command sourcing". The important thing here is to separate out the work of the command if
indeed there is any work (work that should happen only once) from the definition of
and construction of an aggregate event object, and from the projection
of the aggregate event into the aggregate state (which will happen many times).
Why mutate the aggregate state? Another question that may arise is about mutating the aggregate object, rather than reconstructing a copy of the aggregate that combines the old unchanged parts and the new. It is certainly possible to adjust the event classes so that the original is unchanged. There are a few good reasons for doing this. The issue is about concurrent access to aggregates that are cached in memory. This library follows the traditional patterns of having commands which do not return values and of mutating aggregate objects in-place. The issue of concurrent access doesn’t arise unless the aggregates are cached and used concurrently without any concurrency controls, such as serialisation of access. Aggregates aren’t cached, by default, and so the issue doesn’t arise unless they are. And because it is in the nature of aggregates that they create a sequence of events, there is no value to allowing concurrent write access. So if aggregates are to be cached then it would make sense either to implement locking, so that readers don’t access a half-updated state and so that writers don’t interact, or for writers to make a deep copy of a cached aggregate before calling aggregate commands, and then updating the cache with their mutated version after the aggregate has been successfully saved. And care would need to be taken to fast-forward an aggregate that is stale because another process has advanced the state of the aggregate. But none of these issues arise in the library because aggregates are not cached.
Another issue arises about the use of "absolute" topics to identify classes. The issue of moving and renaming classes can be resolved by registering the old topics to point to current classes, using the library's register_topic() function described above.
Classes¶
- class eventsourcing.domain.MetaDomainEvent(name: str, bases: Tuple[type, ...], cls_dict: Dict[str, Any])[source]¶
Bases:
abc.ABCMeta
- static __new__(mcs, name: str, bases: Tuple[type, ...], cls_dict: Dict[str, Any]) eventsourcing.domain.MetaDomainEvent [source]¶
- class eventsourcing.domain.DomainEvent(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime)[source]¶
Bases:
abc.ABC
,Generic
[eventsourcing.domain.T
]
Base class for domain events, such as aggregate AggregateEvent and aggregate Snapshot.
- mutate(aggregate: Optional[eventsourcing.domain.T]) Optional[eventsourcing.domain.T] [source]¶
Abstract mutator method.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.domain.AggregateEvent(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime)[source]¶
Bases:
eventsourcing.domain.DomainEvent
[eventsourcing.domain.TAggregate]
Base class for aggregate events. Subclasses will model decisions made by the domain model aggregates.
- mutate(aggregate: Optional[eventsourcing.domain.TAggregate]) Optional[eventsourcing.domain.TAggregate] [source]¶
Changes the state of the aggregate according to domain event attributes.
- apply(aggregate: eventsourcing.domain.TAggregate) None [source]¶
Applies the domain event to the aggregate.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.domain.AggregateCreated(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime, originator_topic: str)[source]¶
Bases:
eventsourcing.domain.AggregateEvent
[eventsourcing.domain.TAggregate]
Domain event for when aggregate is created.
Constructor arguments:
- Parameters
originator_id (UUID) – ID of originating aggregate.
originator_version (int) – version of originating aggregate.
timestamp (datetime) – date-time of the event
originator_topic (str) – topic for the aggregate class
- mutate(aggregate: Optional[eventsourcing.domain.TAggregate]) Optional[eventsourcing.domain.TAggregate] [source]¶
Constructs aggregate instance defined by domain event object attributes.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime, originator_topic: str) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- eventsourcing.domain.event(arg: Optional[Union[str, Type[eventsourcing.domain.AggregateEvent[Any]]]] = None) Callable[[eventsourcing.domain.DecoratedObjType], eventsourcing.domain.DecoratedObjType] [source]¶
- eventsourcing.domain.event(arg: eventsourcing.domain.DecoratedObjType) eventsourcing.domain.DecoratedObjType
Can be used to decorate an aggregate method so that when the method is called an event is triggered. The body of the method will be used to apply the event to the aggregate, both when the event is triggered and when the aggregate is reconstructed from stored events.
class MyAggregate(Aggregate):
    @event("NameChanged")
    def set_name(self, name: str):
        self.name = name
…is equivalent to…
class MyAggregate(Aggregate):
    def set_name(self, name: str):
        self.trigger_event(self.NameChanged, name=name)

    class NameChanged(Aggregate.Event):
        name: str

        def apply(self, aggregate):
            aggregate.name = self.name
In the example above, the event “NameChanged” is defined automatically by inspecting the signature of the set_name() method. If it is preferred to declare the event class explicitly, for example to define upcasting of old events, the event class itself can be mentioned in the event decorator rather than just providing the name of the event as a string.
class MyAggregate(Aggregate):
    class NameChanged(Aggregate.Event):
        name: str

    @event(NameChanged)
    def set_name(self, name: str):
        self.name = name
- eventsourcing.domain.triggers(arg: Union[str, Type[eventsourcing.domain.AggregateEvent[Any]], None, eventsourcing.domain.DecoratedObjType] = None) Union[Callable[[eventsourcing.domain.DecoratedObjType], eventsourcing.domain.DecoratedObjType], eventsourcing.domain.DecoratedObjType] ¶
Can be used to decorate an aggregate method so that when the method is called an event is triggered. The body of the method will be used to apply the event to the aggregate, both when the event is triggered and when the aggregate is reconstructed from stored events.
class MyAggregate(Aggregate):
    @event("NameChanged")
    def set_name(self, name: str):
        self.name = name
…is equivalent to…
class MyAggregate(Aggregate):
    def set_name(self, name: str):
        self.trigger_event(self.NameChanged, name=name)

    class NameChanged(Aggregate.Event):
        name: str

        def apply(self, aggregate):
            aggregate.name = self.name
In the example above, the event “NameChanged” is defined automatically by inspecting the signature of the set_name() method. If it is preferred to declare the event class explicitly, for example to define upcasting of old events, the event class itself can be mentioned in the event decorator rather than just providing the name of the event as a string.
class MyAggregate(Aggregate):
    class NameChanged(Aggregate.Event):
        name: str

    @event(NameChanged)
    def set_name(self, name: str):
        self.name = name
- class eventsourcing.domain.UnboundCommandMethodDecorator(event_decorator: eventsourcing.domain.CommandMethodDecorator)[source]¶
Bases:
object
Wraps an EventDecorator instance when attribute is accessed on an aggregate class.
- class eventsourcing.domain.BoundCommandMethodDecorator(event_decorator: eventsourcing.domain.CommandMethodDecorator, aggregate: eventsourcing.domain.TAggregate)[source]¶
Bases:
object
Wraps an EventDecorator instance when attribute is accessed on an aggregate so that the aggregate methods can be accessed.
- class eventsourcing.domain.DecoratedEvent(*args, **kwds)[source]¶
Bases:
eventsourcing.domain.AggregateEvent
[Any]
- apply(aggregate: eventsourcing.domain.TAggregate) None [source]¶
Applies event to aggregate by calling method decorated by @event.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.domain.MetaAggregate(*args: Any, **kwargs: Any)[source]¶
Bases:
abc.ABCMeta
- class Event(*args, **kwds)[source]¶
Bases:
eventsourcing.domain.AggregateEvent
[eventsourcing.domain.TAggregate]
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class Created(*args, **kwds)[source]¶
Bases:
eventsourcing.domain.MetaAggregate.Event
[eventsourcing.domain.TAggregate
],eventsourcing.domain.AggregateCreated
[eventsourcing.domain.TAggregate]
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime, originator_topic: str) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.domain.Aggregate(*args: Any, **kwargs: Any)[source]¶
Bases:
abc.ABC
Base class for aggregate roots.
- __base_init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime) None [source]¶
Initialises an aggregate object with an id, a version number, and a timestamp.
- property id: uuid.UUID¶
The ID of the aggregate.
- property version: int¶
The version number of the aggregate.
- __hash__ = None¶
- __init__() None ¶
- property created_on: datetime.datetime¶
The date and time when the aggregate was created.
- property modified_on: datetime.datetime¶
The date and time when the aggregate was last modified.
- property pending_events: List[eventsourcing.domain.AggregateEvent[Any]]¶
A list of pending events.
- class Event(*args, **kwds)[source]¶
Bases:
eventsourcing.domain.AggregateEvent
[eventsourcing.domain.TAggregate]
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class Created(*args, **kwds)[source]¶
Bases:
eventsourcing.domain.Aggregate.Event
[eventsourcing.domain.TAggregate
],eventsourcing.domain.AggregateCreated
[eventsourcing.domain.TAggregate]
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime, originator_topic: str) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- trigger_event(event_class: Type[eventsourcing.domain.AggregateEvent[Any]], **kwargs: Any) None [source]¶
Triggers domain event of given type, by creating an event object and using it to mutate the aggregate.
- collect_events() List[eventsourcing.domain.AggregateEvent[Any]] [source]¶
Collects and returns a list of pending aggregate
AggregateEvent
objects.
- eventsourcing.domain.aggregate(cls: Optional[Any] = None, *, created_event_name: Optional[str] = None) Union[Type[eventsourcing.domain.Aggregate], Callable[[Any], Type[eventsourcing.domain.Aggregate]]] [source]¶
Converts the class that was passed in to inherit from Aggregate.
@aggregate
class MyAggregate:
    pass
…is equivalent to…
class MyAggregate(Aggregate):
    pass
- exception eventsourcing.domain.OriginatorIDError[source]¶
Bases:
Exception
Raised when a domain event can’t be applied to an aggregate due to an ID mismatch indicating the domain event is not in the aggregate’s sequence of events.
- exception eventsourcing.domain.OriginatorVersionError[source]¶
Bases:
Exception
Raised when a domain event can’t be applied to an aggregate due to version mismatch indicating the domain event is not the next in the aggregate’s sequence of events.
- exception eventsourcing.domain.VersionError[source]¶
Bases:
eventsourcing.domain.OriginatorVersionError
Old name for ‘OriginatorVersionError’.
This class exists to maintain backwards-compatibility but will be removed in a future version. Please use 'OriginatorVersionError' instead.
- class eventsourcing.domain.Snapshot(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime, topic: str, state: Dict[str, Any])[source]¶
Bases:
eventsourcing.domain.DomainEvent
[eventsourcing.domain.TAggregate]
Snapshots represent the state of an aggregate at a particular version.
Constructor arguments:
- Parameters
originator_id (UUID) – ID of originating aggregate.
originator_version (int) – version of originating aggregate.
timestamp (datetime) – date-time of the event
topic (str) – string that includes a class and its module
state (dict) – state of originating aggregate.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, timestamp: datetime.datetime, topic: str, state: Dict[str, Any]) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- classmethod take(aggregate: eventsourcing.domain.TAggregate) eventsourcing.domain.Snapshot[eventsourcing.domain.TAggregate] [source]¶
Creates a snapshot of the given
Aggregate
object.
- exception eventsourcing.utils.TopicError[source]¶
Bases:
Exception
Raised when topic doesn’t resolve.
- eventsourcing.utils.get_topic(cls: type) str [source]¶
Returns a “topic string” that locates the given class in its module. The string is formed by joining the module name and the class qualname separated by the colon character.
- eventsourcing.utils.resolve_topic(topic: str) Any [source]¶
Returns an object located by the given topic.
This function is used to locate domain event classes and aggregate classes from the topics in stored events and snapshots. It can also be used to locate compression modules, timezone objects, etc.
- eventsourcing.utils.register_topic(topic: str, obj: Any) None [source]¶
Registers a topic with an object, so the object will be returned whenever the topic is resolved.
This function can be used to cache the topic of a class, so that the topic can be resolved faster. It can also be used to register old topics for objects that have been renamed or moved, so that old topics will resolve to the renamed or moved object.
- eventsourcing.utils.retry(exc: Union[Type[Exception], Sequence[Type[Exception]]] = <class 'Exception'>, max_attempts: int = 1, wait: float = 0, stall: float = 0) Callable[[Any], Any] [source]¶
Retry decorator.
- Parameters
exc – List of exceptions that will cause the call to be retried if raised.
max_attempts – Maximum number of attempts to try.
wait – Amount of time to wait before retrying after an exception.
stall – Amount of time to wait before the first attempt.
verbose – If True, prints a message to STDOUT when retries occur.
- Returns
Returns the value returned by decorated function.
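For example, a minimal sketch of using the retry decorator; the decorated function and the chosen exception are illustrative only.
from eventsourcing.utils import retry

@retry(ConnectionError, max_attempts=3, wait=0.5)
def fetch_remote_state():
    ...  # hypothetical operation that may raise ConnectionError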
application — Applications¶
This module helps with developing event-sourced applications.
An event-sourced application object has command and query methods used by clients to interact with its domain model. An application object has an event-sourced repository used to obtain already existing event-sourced aggregates. It also has a notification log that is used to propagate the state of the application as a sequence of domain event notifications.
Domain-driven design¶
The book Domain-Driven Design describes a “layered architecture” with four layers: interface, application, domain, and infrastructure. The application layer depends on the domain and infrastructure layers. The interface layer depends on the application layer.
Generally speaking, the application layer implements commands which change the state of the application, and queries which present the state of the application. The commands and queries (“application services”) are called from the interface layer. By keeping the application and domain logic in the application and domain layers, different interfaces can be developed for different technologies without duplicating application and domain logic.
The discussion below continues these ideas, by combining event-sourced aggregates and persistence objects in an application object that implements “application services” as object methods.
Application objects¶
In general, an application combines a stand-alone domain model and infrastructure that persists the state of that model. In this library, an event-sourced application object combines an event-sourced domain model with a cohesive mechanism for storing and retrieving domain events.
The library’s Application
class brings together objects
from the domain and persistence modules.
It can be used as it is, or subclassed to develop domain-specific event-sourced applications.
The general idea is to name your application object class after the domain supported by its domain model, or after the “bounded context” supporting a “subdomain” if you are doing DDD. And then define command and query methods that allow clients to (let’s say) create, read, update and delete your event-sourced domain model aggregates. Domain model aggregates are discussed in the domain module documentation. The “ubiquitous language” of your project should guide the names of the application’s command and query methods, along with those of its domain model aggregates.
The main features of an application are:
the command and query methods you define, which implement your "application services";
the save() method, used for collecting and recording new aggregate events;
the repository object, with which aggregates are reconstructed;
the notification log object, from which the state of the application can be propagated;
the take_snapshot() method;
the application's env, through which the application can be configured.
The Application
class defines an object method
save()
which can be
used to update the recorded state of one or many
domain model aggregates. The
save()
method works by using
the aggregate’s collect_events()
method to collect
pending domain events; the pending domain events are stored by calling the
put()
method of application’s
event store.
The Application
class defines an
object attribute repository
which holds an event-sourced repository.
The repository’s get()
method can be used by
your application’s command and query methods to obtain already existing aggregates.
The Application
class defines an
object attribute log
which holds a local notification log.
The notification log can be used to propagate the state of an application as a sequence of
domain event notifications.
The Application
class defines an object method
take_snapshot()
which can
be used for snapshotting existing aggregates. Snapshotting
isn’t necessary, but can help to reduce the time it takes to access aggregates with
lots of domain events.
The Application
class has an env
attribute
which can be redefined on your application classes. Application objects also have
an env
attribute which is determined by a combination of the application class
attribute, the operating system environment, and by an optional constructor argument.
Basic example¶
In the example below, the Worlds
application extends the library’s
application object base class. The World
aggregate is defined and discussed
as the Simple example in the domain module documentation.
from typing import List
from uuid import UUID
from eventsourcing.application import Application
class Worlds(Application):
def create_world(self) -> UUID:
world = World.create()
self.save(world)
return world.id
def make_it_so(self, world_id: UUID, what: str):
world = self.repository.get(world_id)
world.make_it_so(what)
self.save(world)
def get_world_history(self, world_id: UUID) -> List[str]:
world = self.repository.get(world_id)
return list(world.history)
The create_world()
method is defined as a command method
that creates and saves new World
aggregates, returning a new world_id
that can be
used to identify the aggregate on subsequent method calls. It saves the new
aggregate by calling the base class save()
method.
The make_it_so()
method is also defined as a command method.
It works by obtaining an existing World
aggregate from the repository. Then it calls the
aggregate’s command method make_it_so()
, and then saves the aggregate by calling the
application’s save()
method.
The application’s get_world_history()
method is defined as a query method
that presents the current history
of an existing World
aggregate. It works by
calling the repository get()
method to
reconstruct a World
aggregate object from previously saved aggregate events, and
then returns the value of its history
attribute.
Having defined an application object, we can use it. Below, an instance of the
Worlds
application is constructed. A new World
aggregate is created
by calling create_world()
. Three items are added to its history
by
calling make_it_so()
three times. The recorded history
of the aggregate
is then obtained by calling the get_world_history()
method.
application = Worlds()
world_id = application.create_world()
application.make_it_so(world_id, "dinosaurs")
application.make_it_so(world_id, "trucks")
application.make_it_so(world_id, "internet")
history = application.get_world_history(world_id)
assert history[0] == "dinosaurs"
assert history[1] == "trucks"
assert history[2] == "internet"
In the example above, the application object uses the library’s “plain old Python objects” infrastructure, which keeps stored event objects in memory. This is the default for all application objects. To store the domain events in a real database, simply configure persistence by setting environment variables for the application.
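For example, a hedged sketch, assuming the SQLite persistence module included with the library; the environment variable names are those used by that module, and the database file name is illustrative.
application = Worlds(env={
    "PERSISTENCE_MODULE": "eventsourcing.sqlite",
    "SQLITE_DBNAME": "worlds.db",
})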
Application environment¶
An application can be configured using environment variables. You
can set the application’s environment either on the env
attribute of the application class, in the
operating system environment,
or by passing settings into the application using the constructor argument env
. You
can use all three ways for configuring an application in combination.
# Configure by setting class attribute.
class MyApplication(Application):
env = {"SETTING_A": "1", "SETTING_B": "1", "SETTING_C": "1"}
# Configure by setting operating system environment.
os.environ["SETTING_B"] = "2"
os.environ["SETTING_C"] = "2"
# Configure by setting constructor argument.
app = MyApplication(env={"SETTING_C": "3"})
The order of precedence is: constructor argument, operating system, class attribute. This means a constructor argument setting will override both an operating system setting and a class attribute setting. And an operating system setting will override a class attribute setting.
assert app.env["SETTING_A"] == "1"
assert app.env["SETTING_B"] == "2"
assert app.env["SETTING_C"] == "3"
The resulting settings can be seen on the env
attribute of the application object.
In the example above, we can see that the settings from the constructor argument have
overridden the settings from the operating system environment, and the settings from
the operating system environment have overridden the settings from the class attribute.
Repository¶
The application repository
is used to get the already existing aggregates of the
application’s domain model. It is an instance of the library’s
Repository
class.
The repository’s get()
method is used to
obtain already existing aggregates. It works by calling the
get()
method of an
event store to retrieve already existing
domain event objects for the requested
aggregate, and then using these to reconstruct an aggregate object.
world_latest = application.repository.get(world_id)
assert len(world_latest.history) == 3
assert world_latest.version == 4
The repository get()
method accepts
three arguments: aggregate_id
, version
, and projector_func
.
The aggregate_id
argument is required, and should be the ID of an already existing
aggregate. If the aggregate is not found, the exception
AggregateNotFound
will be raised.
The version
argument is optional, and represents the required version of the aggregate.
If the requested version is greater than the highest available version of the aggregate, the
highest available version of the aggregate will be returned.
world_v1 = application.repository.get(world_id, version=1)
assert world_v1.version == 1
assert len(world_v1.history) == 0
world_v2 = application.repository.get(world_id, version=2)
assert world_v2.version == 2
assert len(world_v2.history) == 1
assert world_v2.history[-1] == "dinosaurs"
world_v3 = application.repository.get(world_id, version=3)
assert world_v3.version == 3
assert len(world_v3.history) == 2
assert world_v3.history[-1] == "trucks"
world_v4 = application.repository.get(world_id, version=4)
assert world_v4.version == 4
assert len(world_v4.history) == 3
assert world_v4.history[-1] == "internet"
world_v5 = application.repository.get(world_id, version=5)
assert world_v5.version == 4 # There is no version 5.
assert len(world_v5.history) == 3
assert world_v5.history[-1] == "internet"
The projector_func
argument is also optional, and can be used to pass in an alternative
“mutator function” that will be used as the “aggregate projector” to reconstruct
the current state of the aggregate from stored snapshots and domain events.
By default, the repository will use the mutate()
methods of domain event objects to reconstruct the state of the requested aggregate.
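For example, the default projection can be reproduced by passing in a custom function, as in this minimal sketch; the function name is illustrative.
def project_world(aggregate, domain_events):
    # Apply each retrieved domain event to the evolving aggregate state,
    # starting from None, just as the default projector does.
    for domain_event in domain_events:
        aggregate = domain_event.mutate(aggregate)
    return aggregate

world = application.repository.get(world_id, projector_func=project_world)
assert len(world.history) == 3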
Notification log¶
A notification log can be used to propagate the state of an application as a
sequence of domain event notifications. The library’s
LocalNotificationLog
class presents
the event notifications of an application.
The application object attribute log
is an instance of
LocalNotificationLog
.
from eventsourcing.application import LocalNotificationLog
assert isinstance(application.log, LocalNotificationLog)
This class implements an abstract base class NotificationLog
which defines method signatures needed by the NotificationLogReader
class, so that a reader can get event notifications from both “local” and “remote” notification
logs. The event notifications themselves are instances of the library’s
Notification
class, which is
defined in the persistence module and discussed in the
Notification objects section.
from eventsourcing.persistence import Notification
The general idea is that, whilst the aggregate events are recorded in a sequence for
the aggregate, all of the aggregate events are also given a “total order” by being
placed in a sequence of event notifications. When recording aggregate events using an
“application recorder”, which is the default for the
Application
class, event notifications are
atomically and automatically recorded along with the stored events.
Outbox pattern¶
The “notification log pattern” is also referred to as the “outbox pattern”. The general idea is to avoid “dual writing” when recording updates to application state and then messaging those updates to others. The important thing is that either both the stored events and the notifications are recorded together, or nothing is recorded. Unless these two things are recorded atomically, in the same database transaction, it is impossible to exclude the possibility that one will happen but not the other, potentially causing a permanent and perhaps catastrophic inconsistency in the state of downstream applications. But it is equally important to avoid “dual writing” in the consumption of event notifications, which is why “tracking” records need to be written atomically when recording the result of processing event notifications. This is a topic we shall return to when discussing the system module.
Selecting notifications from the log¶
The select()
method of a
notification log can be used to directly select a subsequence of event notifications
from a notification log.
The start
and limit
arguments are used to specify the selection. The
selection of event notifications will have notification IDs which are greater
or equal to the given value of start
. Please note, by default, the first
recorded event notification will have ID equal to 1
. The selection will
contain no more than the specified limit
.
In the example below, the first three event notifications are selected from the
notification log of the application
object. We can see these notifications
represent the facts that a World
aggregate was created, and then two SomethingHappened
events occurred.
notifications = application.log.select(start=1, limit=3)
assert len(notifications) == 3
assert isinstance(notifications[0], Notification)
assert notifications[0].id == 1
assert notifications[1].id == 2
assert notifications[2].id == 3
assert notifications[0].originator_id == world_id
assert notifications[1].originator_id == world_id
assert notifications[2].originator_id == world_id
assert notifications[0].originator_version == 1
assert notifications[1].originator_version == 2
assert notifications[2].originator_version == 3
assert "World.Created" in notifications[0].topic
assert "World.SomethingHappened" in notifications[1].topic
assert "World.SomethingHappened" in notifications[2].topic
assert b"dinosaurs" in notifications[1].state
assert b"trucks" in notifications[2].state
We can continue to select event notifications, by using the ID
of the last event notification received to calculate the next
start
value.
next_start = notifications[-1].id + 1
notifications = application.log.select(start=next_start, limit=3)
assert len(notifications) == 1
assert notifications[0].originator_id == world_id
assert notifications[0].originator_version == 4
assert "World.SomethingHappened" in notifications[0].topic
assert b"internet" in notifications[0].state
This method is used by the NotificationLogReader
class,
which is used by the ProcessApplication
class to
pull event notifications from upstream applications in an event-driven system,
using the max_tracking_id()
method
of a ProcessRecorder
object to calculate the
next start value from which to pull unprocessed event notifications.
See the system and persistence
module documentation for more information.
Linked list of sections¶
The notification log also presents linked sections of
notification objects.
The sections are instances of the library’s Section
class.
Each event notification has an id
attribute that holds the unique integer ID of
the event notification. The event notifications are ordered by their IDs,
with later event notifications having higher values than earlier ones.
A notification log section is identified by a section ID string that comprises
two integers separated by a comma, for example "1,10"
. The first integer
specifies the notification ID of the first event notification included in the
section. The second integer specifies the notification ID of the last event
notification to be included in the section. Sections are requested from the notification
log using the Python square bracket syntax, for example application.log["1,10"]
.
The notification log will return a section that has no more than the requested number of event notifications. Sometimes there will be fewer event notifications in the recorded sequence of event notifications than are needed to fill the section, in which case fewer than the requested number of event notifications will be included in the returned section. On the other hand, there may be gaps in the recorded sequence of event notifications, in which case the last event notification included in the section may have a notification ID that is greater than that which was specified in the requested section ID.
A notification log section has an attribute id
that holds the section
ID. The section ID value will represent the event notification ID of the first
and the last event notification included in the section. If there are no event
notifications, the section ID will be None
.
A notification log section has an attribute items
that has the list of
notification objects included in the section.
A notification log section has an attribute next_id
that has the section ID
of the next section in the notification log. If the notification log section has
fewer event notifications than were requested, the next_id
value will be None
.
In the example above, there are four domain events in the domain model, and so there are four notifications in the notification log.
section = application.log["1,10"]
assert len(section.items) == 4
assert section.id == "1,4"
assert section.next_id is None
assert isinstance(section.items[0], Notification)
assert section.items[0].id == 1
assert section.items[1].id == 2
assert section.items[2].id == 3
assert section.items[3].id == 4
assert section.items[0].originator_id == world_id
assert section.items[1].originator_id == world_id
assert section.items[2].originator_id == world_id
assert section.items[3].originator_id == world_id
assert section.items[0].originator_version == 1
assert section.items[1].originator_version == 2
assert section.items[2].originator_version == 3
assert section.items[3].originator_version == 4
assert "World.Created" in section.items[0].topic
assert "World.SomethingHappened" in section.items[1].topic
assert "World.SomethingHappened" in section.items[2].topic
assert "World.SomethingHappened" in section.items[3].topic
assert b"dinosaurs" in section.items[1].state
assert b"trucks" in section.items[2].state
assert b"internet" in section.items[3].state
A domain event can be reconstructed from an event notification by calling the
application’s mapper method to_domain_event()
.
If the application is configured to encrypt stored events, the event notification
will also be encrypted, but the mapper will decrypt the event notification.
domain_event = application.mapper.to_domain_event(section.items[0])
assert isinstance(domain_event, World.Created)
assert domain_event.originator_id == world_id
domain_event = application.mapper.to_domain_event(section.items[3])
assert isinstance(domain_event, World.SomethingHappened)
assert domain_event.originator_id == world_id
assert domain_event.what == "internet"
Snapshotting¶
If the reconstruction of an aggregate depends on obtaining and replaying a relatively large number of domain event objects, it can take a relatively long time to reconstruct the aggregate. Snapshotting aggregates can help to reduce access time of aggregates with lots of domain events.
Snapshots are stored separately from the aggregate events. When snapshotting
is enabled, the application object will have a snapshot store assigned to the
attribute snapshots
. By default, snapshotting is not enabled, and the application
object’s snapshots
attribute will have the value None
.
assert application.snapshots is None
Enabling snapshotting¶
To enable snapshotting in application objects, the environment variable
IS_SNAPSHOTTING_ENABLED
may be set to a valid “true” value. The
function strtobool()
is used to interpret
the value of this environment variable, so that strings
"y"
, "yes"
, "t"
, "true"
, "on"
and "1"
are considered to
be “true” values, and "n"
, "no"
, "f"
, "false"
, "off"
and "0"
are considered to be “false” values, and other values are considered to be invalid.
The default is for an application’s snapshotting functionality to be not enabled.
Application environment variables can be passed into the application using the
env
argument when constructing an application object. Snapshotting can be
enabled (or disabled) for an individual application object in this way.
application = Worlds(env={"IS_SNAPSHOTTING_ENABLED": "y"})
assert application.snapshots is not None
Application environment variables can also be set in the operating system environment. Setting operating system environment variables will affect all applications created in that environment.
os.environ["IS_SNAPSHOTTING_ENABLED"] = "y"
application = Worlds()
assert application.snapshots is not None
del os.environ["IS_SNAPSHOTTING_ENABLED"]
Values passed into the application object will override operating system environment variables.
os.environ["IS_SNAPSHOTTING_ENABLED"] = "y"
application = Worlds(env={"IS_SNAPSHOTTING_ENABLED": "n"})
assert application.snapshots is None
del os.environ["IS_SNAPSHOTTING_ENABLED"]
Snapshotting can also be enabled for all instances of an application class by setting the boolean attribute ‘is_snapshotting_enabled’ on the application class.
class WorldsWithSnapshottingEnabled(Worlds):
is_snapshotting_enabled = True
application = WorldsWithSnapshottingEnabled()
assert application.snapshots is not None
However, this setting will be overridden both by the constructor argument env and by the operating system environment. The example below demonstrates this with the WorldsWithSnapshottingEnabled class defined above.
application = WorldsWithSnapshottingEnabled(env={"IS_SNAPSHOTTING_ENABLED": "n"})
assert application.snapshots is None
os.environ["IS_SNAPSHOTTING_ENABLED"] = "n"
application = WorldsWithSnapshottingEnabled()
assert application.snapshots is None
del os.environ["IS_SNAPSHOTTING_ENABLED"]
Taking snapshots¶
The application method take_snapshot() can be used to create a snapshot of the state of an aggregate. The ID of the aggregate to be snapshotted must be passed when calling this method. By passing in the ID (and optionally a version number) rather than an actual aggregate object, we avoid the risk of snapshotting a somehow “corrupted” aggregate object that does not represent the actually recorded state of the aggregate.
application = Worlds(env={"IS_SNAPSHOTTING_ENABLED": "y"})
world_id = application.create_world()
application.make_it_so(world_id, "dinosaurs")
application.make_it_so(world_id, "trucks")
application.make_it_so(world_id, "internet")
application.take_snapshot(world_id)
Snapshots are stored separately from the aggregate events, but snapshot objects are implemented as a kind of domain event, and snapshotting uses the same mechanism for storing snapshots as for storing aggregate events. When snapshotting is enabled, the application object attribute snapshots is an event store dedicated to storing snapshots. The snapshots can be retrieved from the snapshot store using the get() method. We can get the latest snapshot by selecting in descending order with a limit of 1.
snapshots = application.snapshots.get(world_id, desc=True, limit=1)
snapshots = list(snapshots)
assert len(snapshots) == 1, len(snapshots)
snapshot = snapshots[0]
assert snapshot.originator_id == world_id
assert snapshot.originator_version == 4
When snapshotting is enabled, the application repository looks for snapshots in this way. If a snapshot is found by the aggregate repository when retrieving an aggregate, then only the snapshot and subsequent aggregate events will be retrieved and used to reconstruct the state of the aggregate.
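For example, continuing from the snapshot taken above, the aggregate can be retrieved from the repository in the usual way. A minimal check (assuming the World aggregate and the version numbering from the examples above):
world = application.repository.get(world_id)
assert world.version == 4
assert world.history[-1] == "internet"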
Automatic snapshotting¶
Automatic snapshotting of aggregates at regular intervals can be enabled by setting the application class attribute ‘snapshotting_intervals’. This attribute should be a mapping of aggregate classes to integers which represent the snapshotting interval. When aggregates are saved, a snapshot will be taken if the version of the aggregate is a multiple of the specified interval. The example below demonstrates this by extending the Worlds application class, with World aggregates snapshotted every 2 events.
class WorldsWithAutomaticSnapshotting(Worlds):
snapshotting_intervals = {World: 2}
application = WorldsWithAutomaticSnapshotting()
world_id = application.create_world()
application.make_it_so(world_id, "dinosaurs")
application.make_it_so(world_id, "trucks")
application.make_it_so(world_id, "internet")
snapshots = application.snapshots.get(world_id)
snapshots = list(snapshots)
assert len(snapshots) == 2
assert snapshots[0].originator_id == world_id
assert snapshots[0].originator_version == 2
assert snapshots[1].originator_id == world_id
assert snapshots[1].originator_version == 4
In practice, a suitable interval would most likely be larger than 2. And ‘snapshotting_intervals’ would be defined on your application class and not a subclass.
Configuring persistence¶
The example above uses the application’s default persistence infrastructure. By default, application objects will be configured to use the library’s “plain old Python objects” infrastructure, and will store events in memory. This is good for development, because it is the fastest option. But the state of the application will be lost when the application object is deleted.
If you want the state of the application object to endure, you will need to use alternative persistence infrastructure. To use alternative persistence infrastructure, set the environment variable INFRASTRUCTURE_FACTORY to the topic of an alternative infrastructure factory class. Using alternative persistence infrastructure may involve setting further environment variables, perhaps to configure access to a real database, such as a database name, a user name, and a password.
For example, to use the library’s SQLite infrastructure, set INFRASTRUCTURE_FACTORY to the value "eventsourcing.sqlite:Factory".
When using the library’s SQLite infrastructure, the environment variable SQLITE_DBNAME must also be set. This value will be passed to Python’s sqlite3.connect().
from tempfile import NamedTemporaryFile
tmpfile = NamedTemporaryFile(suffix="_eventsourcing_test.db")
tmpfile.name
os.environ["INFRASTRUCTURE_FACTORY"] = "eventsourcing.sqlite:Factory"
os.environ["SQLITE_DBNAME"] = tmpfile.name
application = Worlds()
world_id = application.create_world()
application.make_it_so(world_id, "dinosaurs")
application.make_it_so(world_id, "trucks")
application.make_it_so(world_id, "internet")
By using a file on disk, as we did in the example above, the state of the application will endure after the application object has been deleted and reconstructed.
del application
application = Worlds()
history = application.get_world_history(world_id)
assert history[0] == "dinosaurs"
assert history[1] == "trucks"
assert history[2] == "internet"
See also PostgreSQL infrastructure.
Registering custom transcodings¶
The application serialises and deserialises domain events using a transcoder object. By default, the application will use the library’s default JSON transcoder. The library’s application base class registers transcodings for UUID, Decimal, and datetime objects.
If your domain model uses types of object that are not already supported by the transcoder, then custom transcodings for these objects will need to be implemented and registered with the application’s transcoder.
The application method register_transcodings() can be overridden or extended to register the transcodings required by your application. For example, to define and register a Transcoding for the Python date class, define a class such as the DateAsISO class below, and extend the application register_transcodings() method by calling the super() method with the given transcoder argument, and then calling the transcoder’s register() method once for each of your custom transcodings.
from datetime import date
from typing import Union
from eventsourcing.persistence import Transcoder, Transcoding
class MyApplication(Application):
def register_transcodings(self, transcoder: Transcoder):
super().register_transcodings(transcoder)
transcoder.register(DateAsISO())
class DateAsISO(Transcoding):
type = date
name = "date_iso"
def encode(self, o: date) -> str:
return o.isoformat()
def decode(self, d: Union[str, dict]) -> date:
assert isinstance(d, str)
return date.fromisoformat(d)
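As a quick check of the example above, we can construct the application and round-trip a date object through its transcoder (assuming the mapper exposes the transcoder as its transcoder attribute):
application = MyApplication()
data = application.mapper.transcoder.encode(date(2021, 12, 31))
assert application.mapper.transcoder.decode(data) == date(2021, 12, 31)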
Encryption and compression¶
Application-level encryption is useful for encrypting the state of the application “on the wire” and “at rest”. Compression is useful for reducing transport time of domain events and domain event notifications across a network and for reducing the total size of recorded application state.
To enable encryption, set the environment variable CIPHER_TOPIC to the topic of a cipher class, and CIPHER_KEY to a valid encryption key. To enable compression, set the environment variable COMPRESSOR_TOPIC to the topic of a compressor class or module.
The library’s AESCipher class can be used to encrypt stored domain events. The Python zlib module can be used to compress stored domain events.
When using the library’s AESCipher class, you can use its static method create_key() to generate a valid encryption key.
from eventsourcing.cipher import AESCipher
# Generate a cipher key (keep this safe).
cipher_key = AESCipher.create_key(num_bytes=32)
# Configure cipher key.
os.environ["CIPHER_KEY"] = cipher_key
# Configure cipher topic.
os.environ["CIPHER_TOPIC"] = "eventsourcing.cipher:AESCipher"
# Configure compressor topic.
os.environ["COMPRESSOR_TOPIC"] = "eventsourcing.compressor:ZlibCompressor"
Saving multiple aggregates¶
In some cases, it is necessary to save more than one aggregate in the same atomic transaction. The persistence modules included in this library all support atomic recording of aggregate events into more than one aggregate sequence. However, not all databases can support this, and so this isn’t allowed on the library extensions that adapt these databases.
The example below continues the example from the discussion of
namespaced IDs in the domain module documentation. The
aggregate classes Page
and Index
are defined in that section.
We can define a simple wiki application, which creates named pages. Pages can be retrieved by name. Names can be changed and the pages can be retrieved by the new name.
class Wiki(Application):
def create_page(self, name: str, body: str) -> None:
page = Page.create(name, body)
index = Index.create(page)
self.save(page, index)
def rename_page(self, name: str, new_name: str) -> None:
page = self.get_page(name)
page.update_name(new_name)
index = Index.create(page)
self.save(page, index)
def get_page(self, name: str) -> Page:
index_id = Index.create_id(name)
index = self.repository.get(index_id)
page_id = index.ref
return self.repository.get(page_id)
Now let’s construct the application object and create a new page (with a deliberate spelling mistake).
wiki = Wiki()
wiki.create_page(name="Erth", body="Lorem ipsum...")
We can use the page name to retrieve the body of the page.
assert wiki.get_page(name="Erth").body == "Lorem ipsum..."
We can also update the name of the page, and then retrieve the page using the new name.
wiki.rename_page(name="Erth", new_name="Earth")
assert wiki.get_page(name="Earth").body == "Lorem ipsum..."
The uniqueness constraint on the recording of stored domain event objects combined with the atomicity of recording domain events means that name collisions in the index will result in the wiki not being updated.
from eventsourcing.persistence import RecordConflictError
# Can't create another page using an existing name.
try:
wiki.create_page(name="Earth", body="Neque porro quisquam...")
except RecordConflictError:
pass
else:
raise AssertionError("RecordConflictError not raised")
assert wiki.get_page(name="Earth").body == "Lorem ipsum..."
# Can't rename another page to an existing name.
wiki.create_page(name="Mars", body="Neque porro quisquam...")
try:
wiki.rename_page(name="Mars", new_name="Earth")
except RecordConflictError:
pass
else:
raise AssertionError("RecordConflictError not raised")
assert wiki.get_page(name="Earth").body == "Lorem ipsum..."
assert wiki.get_page(name="Mars").body == "Neque porro quisquam..."
A more refined implementation might release old index objects when page names are changed so that they can be reused by other pages, or update the old index to point to the new index, so that redirects can be implemented. See the Wiki application example to see how this can be done.
Classes¶
- eventsourcing.application.mutate_aggregate(aggregate: Optional[eventsourcing.application.T], domain_events: Iterable[eventsourcing.domain.DomainEvent[eventsourcing.application.T]]) Optional[eventsourcing.application.T] [source]¶
Mutator function for aggregate projections, which works by successively calling the mutate() method of each of the given domain events.
- class eventsourcing.application.Repository(event_store: eventsourcing.persistence.EventStore[eventsourcing.domain.AggregateEvent[eventsourcing.domain.TAggregate]], snapshot_store: Optional[eventsourcing.persistence.EventStore[eventsourcing.domain.Snapshot[eventsourcing.domain.TAggregate]]] = None)[source]¶
Bases: Generic[eventsourcing.domain.TAggregate]
Reconstructs aggregates from events in an EventStore, possibly using a snapshot store to avoid replaying all events.
- __init__(event_store: eventsourcing.persistence.EventStore[eventsourcing.domain.AggregateEvent[eventsourcing.domain.TAggregate]], snapshot_store: Optional[eventsourcing.persistence.EventStore[eventsourcing.domain.Snapshot[eventsourcing.domain.TAggregate]]] = None)[source]¶
Initialises repository with given event store (an EventStore for aggregate AggregateEvent objects) and optionally a snapshot store (an EventStore for aggregate Snapshot objects).
- get(aggregate_id: uuid.UUID, version: Optional[int] = None, projector_func: Callable[[Optional[eventsourcing.domain.TAggregate], Iterable[eventsourcing.domain.DomainEvent[eventsourcing.domain.TAggregate]]], Optional[eventsourcing.domain.TAggregate]] = <function mutate_aggregate>) eventsourcing.domain.TAggregate [source]¶
Returns an
Aggregate
for given ID, optionally at the given version.
- class eventsourcing.application.Section(id: Optional[str], items: List[eventsourcing.persistence.Notification], next_id: Optional[str])[source]¶
Bases:
object
Frozen dataclass that represents a section from a NotificationLog. The items attribute contains a list of Notification objects. The id attribute is the section ID, two integers separated by a comma that describe the first and last notification ID included in the section. The next_id attribute is the section ID of the next section, and will be set if the section contains as many notifications as were requested.
Constructor arguments:
- Parameters
id (Optional[str]) – section ID of this section e.g. “1,10”
items (List[Notification]) – a list of event notifications
next_id (Optional[str]) – section ID of the following section
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(id: Optional[str], items: List[eventsourcing.persistence.Notification], next_id: Optional[str]) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.application.NotificationLog[source]¶
Bases:
abc.ABC
Abstract base class for notification logs.
- abstract __getitem__(section_id: str) eventsourcing.application.Section [source]¶
Returns a
Section
ofNotification
objects from the notification log.
- abstract select(start: int, limit: int) List[eventsourcing.persistence.Notification] [source]¶
Returns a selection of Notification objects from the notification log.
- class eventsourcing.application.LocalNotificationLog(recorder: eventsourcing.persistence.ApplicationRecorder, section_size: int = 10)[source]¶
Bases:
eventsourcing.application.NotificationLog
Notification log that presents sections of event notifications retrieved from an ApplicationRecorder.
- __init__(recorder: eventsourcing.persistence.ApplicationRecorder, section_size: int = 10)[source]¶
Initialises a local notification log object with given ApplicationRecorder and an optional section size.
Constructor arguments:
- Parameters
recorder (ApplicationRecorder) – application recorder from which event notifications will be selected
section_size (int) – number of notifications to include in a section
- __getitem__(requested_section_id: str) eventsourcing.application.Section [source]¶
Returns a Section of event notifications based on the requested section ID. The section ID of the returned section will describe the event notifications that are actually contained in the returned section, and may vary from the requested section ID if there are fewer notifications in the recorder than were requested, or if there are gaps in the sequence of recorded event notifications.
- select(start: int, limit: int) List[eventsourcing.persistence.Notification] [source]¶
Returns a selection of Notification objects from the notification log.
- class eventsourcing.application.ProcessEvent(tracking: Optional[eventsourcing.persistence.Tracking] = None)[source]¶
Bases:
object
Keeps together a Tracking object, which represents the position of a domain event notification in the notification log of a particular application, and the new domain events that result from processing that notification.
- __init__(tracking: Optional[eventsourcing.persistence.Tracking] = None)[source]¶
Initialises the process event with the given tracking object.
- save(*aggregates: Union[eventsourcing.domain.Aggregate, eventsourcing.domain.AggregateEvent[eventsourcing.domain.Aggregate]], **kwargs: Any) None [source]¶
Collects pending domain events from the given aggregates.
- class eventsourcing.application.Application(env: Optional[Mapping[str, str]] = None)[source]¶
Bases: abc.ABC, Generic[eventsourcing.domain.TAggregate]
Base class for event-sourced applications.
- __init__(env: Optional[Mapping[str, str]] = None) None [source]¶
Initialises an application with an
InfrastructureFactory
, aMapper
, anApplicationRecorder
, anEventStore
, aRepository
, and aLocalNotificationLog
.
- construct_env(env: Optional[Mapping[str, str]] = None) Mapping[str, str] [source]¶
Constructs environment from which application will be configured.
- construct_factory() eventsourcing.persistence.InfrastructureFactory [source]¶
Constructs an
InfrastructureFactory
for use by the application.
- construct_mapper(application_name: str = '') eventsourcing.persistence.Mapper [source]¶
Constructs a
Mapper
for use by the application.
- construct_transcoder() eventsourcing.persistence.Transcoder [source]¶
Constructs a
Transcoder
for use by the application.
- register_transcodings(transcoder: eventsourcing.persistence.Transcoder) None [source]¶
Registers Transcoding objects on the given JSONTranscoder.
- construct_recorder() eventsourcing.persistence.ApplicationRecorder [source]¶
Constructs an
ApplicationRecorder
for use by the application.
- construct_event_store() eventsourcing.persistence.EventStore[eventsourcing.domain.AggregateEvent[eventsourcing.domain.TAggregate]] [source]¶
Constructs an EventStore for use by the application to store and retrieve aggregate AggregateEvent objects.
- construct_snapshot_store() Optional[eventsourcing.persistence.EventStore[eventsourcing.domain.Snapshot[eventsourcing.domain.TAggregate]]] [source]¶
Constructs an EventStore for use by the application to store and retrieve aggregate Snapshot objects.
- construct_repository() eventsourcing.application.Repository[eventsourcing.domain.TAggregate] [source]¶
Constructs a
Repository
for use by the application.
- construct_notification_log() eventsourcing.application.LocalNotificationLog [source]¶
Constructs a
LocalNotificationLog
for use by the application.
- save(*aggregates: Union[eventsourcing.domain.TAggregate, eventsourcing.domain.AggregateEvent[eventsourcing.domain.Aggregate]], **kwargs: Any) None [source]¶
Collects pending events from given aggregates and puts them in the application’s event store.
- record(process_event: eventsourcing.application.ProcessEvent) None [source]¶
Records given process event in the application’s recorder.
- notify(new_events: List[eventsourcing.domain.AggregateEvent[eventsourcing.domain.Aggregate]]) None [source]¶
Called after new domain events have been saved. This method on this class doesn’t actually do anything, but it may be implemented by subclasses that need to take action when new domain events have been saved.
- exception eventsourcing.application.AggregateNotFound[source]¶
Bases:
Exception
Raised when an Aggregate object is not found in a Repository.
- class eventsourcing.cipher.AESCipher(cipher_key: str)[source]¶
Bases:
eventsourcing.persistence.Cipher
Cipher strategy that uses AES cipher in GCM mode.
- static create_key(num_bytes: int) str [source]¶
Creates AES cipher key, with length num_bytes.
- Parameters
num_bytes – An int value, either 16, 24, or 32.
persistence — Infrastructure¶
This module provides a cohesive mechanism for storing domain events.
The entire mechanism is encapsulated by the library’s event store object class. An event store stores and retrieves domain events. The event store uses a mapper to convert domain events to stored events, and it uses a recorder to insert stored events in a datastore.
A mapper converts domain event objects of various types to
stored event objects when domain events are stored in the event
store. It also converts stored events objects back to domain
event objects when domain events are retrieved from the event
store. A mapper uses an extensible transcoder that can be set up
with additional transcoding objects that serialise and deserialise
particular types of object, such as Python’s UUID
,
datetime
and Decimal
objects.
A mapper may use a compressor to compress and decompress the state
of stored event objects, and may use a cipher to encode and decode
the state of stored event objects. If both a compressor and a cipher
are being used by a mapper, the state of any stored event objects will
be compressed and then encoded when storing domain events, and will be
decoded and then decompressed when retrieving domain events.
A recorder inserts stored event objects in a datastore when domain events are stored in an event store, and selects stored events from a datastore when domain events are retrieved from an event store. Depending on the type of the recorder, it may be possible to select the stored events as event notifications, and it may be possible to record tracking records atomically along with the stored events.
Transcoder¶
A transcoder is used by a mapper to serialise and deserialise the state of domain model event objects.
The library’s JSONTranscoder class can be constructed without any arguments.
from eventsourcing.persistence import JSONTranscoder
transcoder = JSONTranscoder()
The transcoder object has methods encode() and decode() which are used to perform the serialisation and deserialisation. The serialised state is a Python bytes object.
data = transcoder.encode({"a": 1})
copy = transcoder.decode(data)
assert copy == {"a": 1}
The library’s JSONTranscoder uses the Python json module, and so, by default, only the basic object types supported by that module can be encoded and decoded. The transcoder can be extended by registering transcodings for the other types of object used in your domain model’s event objects. A transcoding converts a non-basic type of object to a representation that uses the supported basic types. The transcoder method register() is used to register individual transcodings with the transcoder.
Transcodings¶
In order to encode and decode non-basic types of object that are not supported by the transcoder by default, custom transcodings need to be defined in code and registered with the transcoder using the transcoder object’s register() method. A transcoding will encode an instance of a non-basic type of object into a basic type of object that can be encoded by the transcoder, and will decode that representation back into the original type of object. This makes it possible to transcode custom value objects, including custom types that contain other custom types. The transcoder works recursively through the object, so included custom types do not need to be encoded by the containing type’s transcoding, but will be converted subsequently.
The library includes a limited collection of custom transcoding objects. For example, the library’s UUIDAsHex class transcodes Python UUID objects as hexadecimal strings.
from uuid import uuid4
from eventsourcing.persistence import UUIDAsHex
transcoding = UUIDAsHex()
id1 = uuid4()
data = transcoding.encode(id1)
copy = transcoding.decode(data)
assert copy == id1
The library’s DatetimeAsISO
class
transcodes Python datetime
objects as ISO strings.
from datetime import datetime
from eventsourcing.persistence import (
DatetimeAsISO,
)
transcoding = DatetimeAsISO()
datetime1 = datetime(2021, 12, 31, 23, 59, 59)
data = transcoding.encode(datetime1)
copy = transcoding.decode(data)
assert copy == datetime1
The library’s DecimalAsStr
class
transcodes Python Decimal
objects as decimal strings.
from decimal import Decimal
from eventsourcing.persistence import (
DecimalAsStr,
)
transcoding = DecimalAsStr()
decimal1 = Decimal("1.2345")
data = transcoding.encode(decimal1)
copy = transcoding.decode(data)
assert copy == decimal1
Transcodings are registered with the transcoder using the transcoder object’s
register()
method.
transcoder.register(UUIDAsHex())
transcoder.register(DatetimeAsISO())
transcoder.register(DecimalAsStr())
data = transcoder.encode(id1)
copy = transcoder.decode(data)
assert copy == id1
data = transcoder.encode(datetime1)
copy = transcoder.decode(data)
assert copy == datetime1
data = transcoder.encode(decimal1)
copy = transcoder.decode(data)
assert copy == decimal1
Attempting to serialise an unsupported type will result in a Python TypeError.
from datetime import date
date1 = date(2021, 12, 31)
try:
data = transcoder.encode(date1)
except TypeError as e:
assert e.args[0] == (
"Object of type <class 'datetime.date'> is not serializable. "
"Please define and register a custom transcoding for this type."
)
else:
raise AssertionError("TypeError not raised")
Attempting to deserialise an unsupported type will also result in a Python TypeError.
try:
JSONTranscoder().decode(data)
except TypeError as e:
assert e.args[0] == (
"Data serialized with name 'decimal_str' is not deserializable. "
"Please register a custom transcoding for this type."
)
else:
raise AssertionError("TypeError not raised")
The library’s abstract base class Transcoding can be subclassed to define custom transcodings for other object types. To define a custom transcoding, simply subclass this base class, assign the transcoded class to the class attribute type, and assign a string to the class attribute name. Then define an encode() method that converts an instance of that type to a representation that uses a basic type, and a decode() method that will convert that representation back to an instance of that type.
from eventsourcing.persistence import Transcoding
from typing import Union
class DateAsISO(Transcoding):
type = date
name = "date_iso"
def encode(self, obj: date) -> str:
return obj.isoformat()
def decode(self, data: str) -> date:
return date.fromisoformat(data)
transcoder.register(DateAsISO())
data = transcoder.encode(date1)
copy = transcoder.decode(data)
assert copy == date1
Please note, due to the way the Python json module works, it isn’t currently possible to transcode subclasses of the basic Python types that are supported by default, such as dict, list, tuple, str, int, float, and bool. This behaviour also means an encoded tuple will be decoded as a list. This behaviour is implemented in C, and can’t be suspended without avoiding the use of this C code and thereby incurring a performance penalty in the transcoding of domain event objects.
data = transcoder.encode((1, 2, 3))
copy = transcoder.decode(data)
assert isinstance(copy, list)
assert copy == [1, 2, 3]
Custom or non-basic types that contain other custom or non-basic types can be supported in the transcoder by registering a transcoding for each non-basic type. The transcoding for a type which contains non-basic types must return an object that represents that type by including the non-basic objects, and this representation will subsequently be transcoded by the transcoder using the applicable transcodings for the included non-basic types. In the example below, SimpleCustomValue has a UUID and a date as its id and date attributes. The transcoding for SimpleCustomValue returns a Python dict that includes the non-basic UUID and date objects. The class ComplexCustomValue simply has a SimpleCustomValue object as its value attribute, and its transcoding simply returns that object.
from uuid import UUID
class SimpleCustomValue:
def __init__(self, id: UUID, date: date):
self.id = id
self.date = date
def __eq__(self, other):
return (
isinstance(other, SimpleCustomValue) and
self.id == other.id and self.date == other.date
)
class ComplexCustomValue:
def __init__(self, value: SimpleCustomValue):
self.value = value
def __eq__(self, other):
return (
isinstance(other, ComplexCustomValue) and
self.value == other.value
)
class SimpleCustomValueAsDict(Transcoding):
type = SimpleCustomValue
name = "simple_custom_value"
def encode(self, obj: SimpleCustomValue) -> dict:
return {"id": obj.id, "date": obj.date}
def decode(self, data: dict) -> SimpleCustomValue:
assert isinstance(data, dict)
return SimpleCustomValue(**data)
class ComplexCustomValueAsDict(Transcoding):
type = ComplexCustomValue
name = "complex_custom_value"
def encode(self, obj: ComplexCustomValue) -> SimpleCustomValue:
return obj.value
def decode(self, data: SimpleCustomValue) -> ComplexCustomValue:
assert isinstance(data, SimpleCustomValue)
return ComplexCustomValue(data)
The custom value object transcodings can be registered with the transcoder.
transcoder.register(SimpleCustomValueAsDict())
transcoder.register(ComplexCustomValueAsDict())
We can now transcode an instance of ComplexCustomValue.
obj1 = ComplexCustomValue(
SimpleCustomValue(
id=UUID("b2723fe2c01a40d2875ea3aac6a09ff5"),
date=date(2000, 2, 20)
)
)
data = transcoder.encode(obj1)
copy = transcoder.decode(data)
assert copy == obj1
As you can see from the bytes representation below, the transcoder puts the return value of each transcoding’s encode() method in a Python dict that has two keys, _type_ and _data_. The _data_ value is the return value of the transcoding’s encode() method, and the _type_ value is the name of the transcoding. For this reason, it is necessary to avoid defining model objects as a Python dict that has only the two keys _type_ and _data_, and to avoid defining transcodings that return such a thing.
expected_data = (
b'{"_type_": "complex_custom_value", "_data_": {"_type_": '
b'"simple_custom_value", "_data_": {"id": {"_type_": '
b'"uuid_hex", "_data_": "b2723fe2c01a40d2875ea3aac6a09ff5"},'
b' "date": {"_type_": "date_iso", "_data_": "2000-02-20"}'
b'}}}'
)
assert data == expected_data
Stored event objects¶
A stored event object is a common object type that can be used to represent domain event objects of different types. By using a common object for the representation of different types of domain events objects, the domain event objects can be stored and retrieved in a standard way.
The library’s StoredEvent class is a Python frozen dataclass that can be used to hold information about a domain event object between it being serialised and being recorded in a datastore, and between it being retrieved from an aggregate sequence in a datastore and being deserialised as a domain event object.
from uuid import uuid4
from eventsourcing.persistence import StoredEvent
stored_event = StoredEvent(
originator_id=uuid4(),
originator_version=1,
state="{}",
topic="eventsourcing.model:DomainEvent",
)
Mapper¶
A mapper maps between domain event objects and stored event objects. It brings together a transcoder, and optionally a cipher and a compressor. It is used by an event store.
The library’s Mapper
class
must be constructed with a transcoder object.
from eventsourcing.persistence import Mapper
mapper = Mapper(transcoder=transcoder)
The from_domain_event()
method of the
mapper
object converts DomainEvent
objects to
StoredEvent
objects.
from eventsourcing.domain import DomainEvent, TZINFO
domain_event1 = DomainEvent(
originator_id = id1,
originator_version = 1,
timestamp = datetime.now(tz=TZINFO),
)
stored_event1 = mapper.from_domain_event(domain_event1)
assert isinstance(stored_event1, StoredEvent)
The to_domain_event()
method of the
mapper
object converts StoredEvent
objects to
DomainEvent
objects.
assert mapper.to_domain_event(stored_event1) == domain_event1
Encryption¶
Using a cryptographic cipher with your mapper will make the state of your application encrypted “at rest” and “on the wire”.
Without encryption, the state of the domain event will be visible in the
recorded stored events in your database. For example, the timestamp
of the domain event in the example above (domain_event1
) is visible
in the stored event (stored_event1
).
assert domain_event1.timestamp.isoformat() in str(stored_event1.state)
The library’s AESCipher class can be used to cryptographically encode and decode the state of stored events. It must be constructed with a cipher key. The static method create_key() can be used to generate a cipher key. The AES cipher key must be either 16, 24, or 32 bytes long. Please note, the same cipher key must be used to decrypt stored events as was used to encrypt them.
from eventsourcing.cipher import AESCipher
key = AESCipher.create_key(num_bytes=32) # 16, 24, or 32
cipher = AESCipher(cipher_key=key)
mapper = Mapper(
transcoder=transcoder,
cipher=cipher,
)
stored_event1 = mapper.from_domain_event(domain_event1)
assert isinstance(stored_event1, StoredEvent)
assert mapper.to_domain_event(stored_event1) == domain_event1
With encryption, the state of the domain event will not be visible in the stored event. This feature can be used to implement “application-level encryption” in an event-sourced application.
assert domain_event1.timestamp.isoformat() not in str(stored_event1.state)
The library’s AESCipher class uses the AES cipher from the PyCryptodome library in GCM mode. AES is a very fast and secure symmetric block cipher, and is the de facto standard for symmetric encryption. Galois/Counter Mode (GCM) is a mode of operation for symmetric block ciphers that is designed to provide both data authenticity and confidentiality, and is widely adopted for its performance.
The mapper expects an instance of the abstract base class Cipher, and AESCipher implements this abstract base class, so if you want to use another cipher strategy, simply implement the base class.
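For illustration only, the sketch below shows the shape of a custom cipher strategy. It assumes the Cipher base class requires encrypt() and decrypt() methods that take and return bytes (please check the library’s Cipher class for the exact interface), and the XOR scheme used here provides no real security.
from itertools import cycle
from eventsourcing.persistence import Cipher

class XORCipher(Cipher):
    # Illustration only: XOR with a repeating key is NOT secure.
    def __init__(self, cipher_key: str):
        super().__init__(cipher_key)
        self.key = cipher_key.encode()

    def encrypt(self, plaintext: bytes) -> bytes:
        return bytes(b ^ k for b, k in zip(plaintext, cycle(self.key)))

    def decrypt(self, ciphertext: bytes) -> bytes:
        # XOR is symmetric, so decrypting repeats the same operation.
        return self.encrypt(ciphertext)

cipher = XORCipher(cipher_key="illustration-key")
assert cipher.decrypt(cipher.encrypt(b"some state")) == b"some state"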
Compression¶
A compressor can be used to reduce the size of stored events.
The library’s ZlibCompressor
class
can be used to compress and decompress the state of stored events. The
size of the state of a compressed and encrypted stored event will be
less than or equal to the size of the state of a stored event that is
encrypted but not compressed.
from eventsourcing.compressor import ZlibCompressor
compressor = ZlibCompressor()
mapper = Mapper(
transcoder=transcoder,
cipher=cipher,
compressor=compressor,
)
stored_event2 = mapper.from_domain_event(domain_event1)
assert mapper.to_domain_event(stored_event2) == domain_event1
assert len(stored_event2.state) <= len(stored_event1.state)
The library’s ZlibCompressor class uses Python’s zlib module.
The mapper expects an instance of the abstract base class Compressor, and ZlibCompressor implements this abstract base class, so if you want to use another compression strategy, simply implement the base class.
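Similarly, an alternative compression strategy can be sketched by implementing the Compressor base class, assuming it requires compress() and decompress() methods that take and return bytes (please check the library’s Compressor class for the exact interface).
import bz2
from eventsourcing.persistence import Compressor

class Bz2Compressor(Compressor):
    # Alternative compression strategy, using Python's bz2 module.
    def compress(self, data: bytes) -> bytes:
        return bz2.compress(data)

    def decompress(self, data: bytes) -> bytes:
        return bz2.decompress(data)

compressor = Bz2Compressor()
assert compressor.decompress(compressor.compress(b"some state")) == b"some state"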
Notification objects¶
Event notifications are used to propagate the state of an event-sourced application in a reliable way. The stored events can be placed in a “total order” by giving each new domain event a notification ID that is higher than that of any previously recorded event. By recording the domain events atomically with their notification IDs, there will never be a domain event that is not available to be passed as a message across a network, and there will never be a message passed across a network that doesn’t correspond to a recorded event. This solves the “dual writing” problem that occurs when a domain model is updated and then, separately, a message is put on a message queue.
The library’s Notification
class
is a Python frozen dataclass that can be used to hold information
about a domain event object when being transmitted as an item in a
section of a notification log.
It will be returned when selecting event notifications from a
recorder, and presented in an application by a
notification log.
from uuid import uuid4
from eventsourcing.persistence import Notification
notification = Notification(
id=123,
originator_id=uuid4(),
originator_version=1,
state="{}",
topic="eventsourcing.model:DomainEvent",
)
Tracking objects¶
A tracking object can be used to encapsulate the position of an event notification in an upstream application’s notification log. A tracking object can be passed into a process recorder along with new stored event objects, and recorded atomically with those objects. By ensuring the uniqueness of recorded tracking objects, we can ensure that a domain event notification is never processed twice. By recording the position of the last event notification that has been processed, we can resume processing event notifications at the correct position. This provides “exactly once” semantics when processing event notifications, by solving the “dual writing” problem that occurs when an event notification is consumed from a message queue, updates are made to a materialised view, and then, separately, an acknowledgement is sent back to the message queue.
The library’s Tracking
class
is a Python frozen dataclass that can be used to hold the notification
ID of a notification that has been processed.
from uuid import uuid4
from eventsourcing.persistence import Tracking
tracking = Tracking(
notification_id=123,
application_name="bounded_context1",
)
Recorder¶
A recorder adapts a database management system for the purpose of recording stored events. It is used by an event store.
The library’s Recorder
class
is an abstract base for concrete recorder classes that will insert
stored event objects in a particular datastore.
There are three flavours of recorder: “aggregate recorders” are the simplest and simply store domain events in aggregate sequences; “application recorders” extend aggregate recorders by storing domain events with a total order; “process recorders” extend application recorders by supporting the recording of domain events atomically with “tracking” objects that record the position in a total ordering of domain events that is being processed. The “aggregate recorder” can be used for storing snapshots.
The library includes in its sqlite module recorder classes for SQLite that use the Python sqlite3 module, and in its postgres module recorder classes for PostgreSQL that use the third-party psycopg2 module.
Recorder classes are conveniently constructed by using an infrastructure factory. For illustrative purposes, the direct use of the library’s SQLite recorders is shown below. The other persistence modules follow a similar naming scheme and pattern of use.
from eventsourcing.sqlite import SQLiteAggregateRecorder
from eventsourcing.sqlite import SQLiteApplicationRecorder
from eventsourcing.sqlite import SQLiteProcessRecorder
from eventsourcing.sqlite import SQLiteDatastore
datastore = SQLiteDatastore(db_name=":memory:")
aggregate_recorder = SQLiteAggregateRecorder(datastore, "snapshots")
aggregate_recorder.create_table()
application_recorder = SQLiteApplicationRecorder(datastore)
application_recorder.create_table()
datastore = SQLiteDatastore(db_name=":memory:")
process_recorder = SQLiteProcessRecorder(datastore)
process_recorder.create_table()
The library also includes in the popo module recorders that use “plain old Python objects”, which simply keep stored events in a data structure in memory. This provides the fastest alternative for rapid development of event-sourced applications (~4x faster than using SQLite, and ~20x faster than using PostgreSQL).
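For example, the “plain old Python objects” recorders can be constructed directly, mirroring the SQLite example above. The class names below (POPOAggregateRecorder and POPOApplicationRecorder) are assumptions; please check the eventsourcing.popo module for the exact names. No datastore or table creation is needed, because stored events are simply kept in memory.
from eventsourcing.popo import POPOAggregateRecorder
from eventsourcing.popo import POPOApplicationRecorder

popo_snapshot_recorder = POPOAggregateRecorder()
popo_application_recorder = POPOApplicationRecorder()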
Recorders compatible with this version of the library for popular ORMs such as SQLAlchemy and Django, specialist event stores such as EventStoreDB and AxonDB, and NoSQL databases such as DynamoDB and MongoDB are forthcoming.
Event store¶
An event store provides a common interface for storing and retrieving domain event objects. It combines a mapper and a recorder, so that domain event objects can be converted to stored event objects and then stored event objects can be recorded in a datastore.
The library’s EventStore class must be constructed with a mapper and a recorder.
The EventStore has an object method put() which can be used to store a list of new domain event objects. If any of these domain event objects conflict with an already existing domain event object (because they have the same aggregate ID and version number), an exception will be raised and none of the new events will be stored.
The EventStore has an object method get() which can be used to get a list of domain event objects. Only the originator_id argument is required, which is the ID of the aggregate for which existing events are wanted. The arguments gt, lte, limit, and desc condition the selection of events to be greater than a particular version number, less than or equal to a particular version number, limited in number, or selected in a descending fashion. The selection is by default ascending, unlimited, and otherwise unrestricted, such that all the previously stored domain event objects for a particular aggregate will be returned in the order in which they were created.
from eventsourcing.persistence import EventStore
event_store = EventStore(
mapper=mapper,
recorder=application_recorder,
)
event_store.put([domain_event1])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event1]
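The selection arguments described above can be used, for example, to get the most recent domain event of an aggregate, by selecting in descending order with a limit of 1.
events = list(event_store.get(id1, desc=True, limit=1))
assert events == [domain_event1]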
Infrastructure factory¶
An infrastructure factory helps with the construction of the persistence infrastructure objects mentioned above. By reading and responding to particular environment variables, the persistence infrastructure of an event-sourced application can be easily configured in different ways at different times.
The library’s InfrastructureFactory
class
is a base class for concrete infrastructure factories that help with the construction
of persistence objects that use a particular database in a particular way.
The class method construct() will, by default, construct the library’s “plain old Python objects” infrastructure Factory, which uses recorders that simply keep stored events in a data structure in memory (see eventsourcing.popo).
from eventsourcing.persistence import InfrastructureFactory
factory = InfrastructureFactory.construct()
recorder = factory.application_recorder()
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
mapper=mapper,
recorder=recorder,
)
event_store.put([domain_event1])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event1]
The optional environment variables COMPRESSOR_TOPIC, CIPHER_KEY, and CIPHER_TOPIC may be used to enable compression and encryption of stored events when using POPO infrastructure.
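For example, encryption and compression can be enabled before constructing the factory. A sketch, assuming the mapper exposes its cipher and compressor as attributes (the environment variables are removed afterwards, so the following examples are unaffected):
from eventsourcing.cipher import AESCipher

os.environ["CIPHER_KEY"] = AESCipher.create_key(num_bytes=32)
os.environ["CIPHER_TOPIC"] = "eventsourcing.cipher:AESCipher"
os.environ["COMPRESSOR_TOPIC"] = "eventsourcing.compressor:ZlibCompressor"
factory = InfrastructureFactory.construct()
mapper = factory.mapper(transcoder=transcoder)
assert mapper.cipher is not None
assert mapper.compressor is not None
del os.environ["CIPHER_KEY"]
del os.environ["CIPHER_TOPIC"]
del os.environ["COMPRESSOR_TOPIC"]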
SQLite¶
The library supports storing events in SQLite.
The library’s SQLite Factory uses various environment variables to control the construction and configuration of its persistence infrastructure.
The environment variable SQLITE_DBNAME is required to set the name of a database, normally a file path, but the special name :memory: can be used to create an in-memory database.
Please note, a file-based SQLite database will have its journal mode set to use write-ahead logging (WAL), which allows reading to proceed concurrently with writing. Writing is serialised with a lock. The lock timeout can be adjusted by setting the environment variable SQLITE_LOCK_TIMEOUT. Setting this value to a positive number of seconds will cause attempts to lock the SQLite database for writing to time out after that duration. By default this value is 5 (seconds).
The optional environment variables COMPRESSOR_TOPIC, CIPHER_KEY, and CIPHER_TOPIC may be used to enable compression and encryption of stored events.
The optional environment variable CREATE_TABLE may be used to control whether database tables are created. If the tables already exist, CREATE_TABLE may be set to a “false” value ("n", "no", "f", "false", "off", or "0"). This value is by default “true”, which is normally okay because the tables are created only if they do not exist.
import os
os.environ["INFRASTRUCTURE_FACTORY"] = "eventsourcing.sqlite:Factory"
os.environ["SQLITE_DBNAME"] = ":memory:"
os.environ["SQLITE_LOCK_TIMEOUT"] = "10"
factory = InfrastructureFactory.construct()
recorder = factory.application_recorder()
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
mapper=mapper,
recorder=recorder,
)
event_store.put([domain_event1])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event1]
The SQLite infrastructure is provided by the eventsourcing.sqlite
module.
PostgreSQL¶
The library also supports storing events in PostgreSQL.
The library’s PostgreSQL Factory uses various environment variables to control the construction and configuration of its persistence infrastructure.
The environment variables POSTGRES_DBNAME, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_USER, and POSTGRES_PASSWORD are required to set the name of a database, the database server’s host name and port number, and the database user name and password.
The optional environment variable POSTGRES_CONN_MAX_AGE is used to control the length of time in seconds before a connection is closed. By default this value is not set, and connections will be reused indefinitely (or until an operational database error is encountered). If this value is set to a positive integer, the connection will be closed after this number of seconds from the time it was created, but only when the connection is idle. If this value is set to zero, each connection will only be used for one transaction. Setting this value to an empty string has the same effect as not setting it. Setting this value to anything else will cause an environment error exception to be raised. If your database terminates idle connections after some time, you should set POSTGRES_CONN_MAX_AGE to a lower value, so that attempts are not made to use connections that have been terminated by the database server.
The optional environment variable POSTGRES_PRE_PING may be used to enable pessimistic disconnection handling. Setting this to a “true” value ("y", "yes", "t", "true", "on", or "1") means database connections will be checked to ensure they are usable before executing statements, and remade if the connection is not usable. This value is by default “false”, meaning connections will not be checked before they are reused. Enabling this option will incur a small impact on performance.
The optional environment variable POSTGRES_LOCK_TIMEOUT may be used to enable a timeout on acquiring an ‘EXCLUSIVE’ mode table lock when inserting stored events. To avoid interleaving of inserts when writing events, an ‘EXCLUSIVE’ mode table lock is acquired when inserting events. This avoids a potential issue where insert order and commit order are not the same. Locking the table effectively serialises writing events. It prevents concurrent transactions interleaving inserts, which would potentially cause notification log readers that are tailing the application notification log to miss event notifications. Reading from the table can proceed concurrently with other readers and writers, since selecting acquires an ‘ACCESS SHARE’ lock, which does not block and is not blocked by the ‘EXCLUSIVE’ lock. This issue of interleaving inserts by concurrent writers is not exhibited by SQLite, which locks the entire database for writing, effectively serialising the operations of concurrent writers. When its journal mode is set to use write-ahead logging, reading can proceed concurrently with writing. By default, this timeout has the value of 0 seconds, which means attempts to acquire the lock will not time out. Setting this value to a positive integer number of seconds will cause attempts to obtain this lock to time out after that duration has passed. The lock will be released when the transaction ends.
The optional environment variable POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT may be used to time out sessions that are idle in a transaction. If a transaction cannot be ended for some reason, perhaps because the database server cannot be reached, the transaction may remain in an idle state and any locks will continue to be held. By timing out the session, transactions will be ended, locks will be released, and the connection slot will be freed. By default, this timeout has the value of 0 seconds, which means sessions in an idle transaction will not time out. Setting this value to a positive integer number of seconds will cause sessions in an idle transaction to time out after that duration has passed.
The optional environment variables COMPRESSOR_TOPIC
, CIPHER_KEY
, and CIPHER_TOPIC
may
be used to enable compression and encryption of stored events.
The optional environment variable CREATE_TABLE may be used to control whether database tables are created. If the tables already exist, CREATE_TABLE may be set to a “false” value ("n", "no", "f", "false", "off", or "0"). This value is by default “true”, which is normally okay because the tables are created only if they do not exist.
import os
os.environ["INFRASTRUCTURE_FACTORY"] = "eventsourcing.postgres:Factory"
os.environ["POSTGRES_DBNAME"] = "eventsourcing"
os.environ["POSTGRES_HOST"] = "127.0.0.1"
os.environ["POSTGRES_PORT"] = "5432"
os.environ["POSTGRES_USER"] = "eventsourcing"
os.environ["POSTGRES_PASSWORD"] = "eventsourcing"
os.environ["POSTGRES_CONN_MAX_AGE"] = "10"
os.environ["POSTGRES_PRE_PING"] = "y"
os.environ["POSTGRES_LOCK_TIMEOUT"] = "5"
os.environ["POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT"] = "5"
factory = InfrastructureFactory.construct()
recorder = factory.application_recorder()
mapper = factory.mapper(transcoder=transcoder)
event_store = factory.event_store(
mapper=mapper,
recorder=recorder,
)
event_store.put([domain_event1])
domain_events = list(event_store.get(id1))
assert domain_events == [domain_event1]
The PostgreSQL infrastructure is provided by the eventsourcing.postgres
module.
Classes¶
- class eventsourcing.persistence.Transcoding[source]¶
Bases:
abc.ABC
Abstract base class for custom transcodings.
- abstract property name: str¶
Name of transcoding.
- class eventsourcing.persistence.Transcoder[source]¶
Bases:
abc.ABC
Abstract base class for transcoders.
- register(transcoding: eventsourcing.persistence.Transcoding) None [source]¶
Registers given transcoding with the transcoder.
- class eventsourcing.persistence.JSONTranscoder[source]¶
Bases:
eventsourcing.persistence.Transcoder
Extensible transcoder that uses the Python
json
module.
- class eventsourcing.persistence.UUIDAsHex[source]¶
Bases:
eventsourcing.persistence.Transcoding
Transcoding that represents UUID objects as hex values.
- type¶
alias of uuid.UUID
- class eventsourcing.persistence.DecimalAsStr[source]¶
Bases:
eventsourcing.persistence.Transcoding
Transcoding that represents Decimal objects as strings.
- type¶
alias of decimal.Decimal
- class eventsourcing.persistence.DatetimeAsISO[source]¶
Bases:
eventsourcing.persistence.Transcoding
Transcoding that represents datetime objects as ISO strings.
- type¶
alias of datetime.datetime
- class eventsourcing.persistence.StoredEvent(originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes)[source]¶
Bases:
object
Frozen dataclass that represents DomainEvent objects, such as aggregate Event objects and Snapshot objects.
Constructor parameters:
- Parameters
originator_id (UUID) – ID of the originating aggregate
originator_version (int) – version of the originating aggregate
topic (str) – topic of the domain event object class
state (bytes) – serialised state of the domain event object
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.persistence.Cipher(cipher_key: str)[source]¶
Bases:
abc.ABC
Base class for ciphers.
- class eventsourcing.persistence.Mapper(transcoder: eventsourcing.persistence.Transcoder, compressor: Optional[eventsourcing.persistence.Compressor] = None, cipher: Optional[eventsourcing.persistence.Cipher] = None)[source]¶
Bases:
object
Converts between domain event objects and StoredEvent objects. Uses a Transcoder, and optionally a cryptographic cipher and compressor.
- __init__(transcoder: eventsourcing.persistence.Transcoder, compressor: Optional[eventsourcing.persistence.Compressor] = None, cipher: Optional[eventsourcing.persistence.Cipher] = None)[source]¶
- from_domain_event(domain_event: eventsourcing.domain.DomainEvent[Any]) eventsourcing.persistence.StoredEvent [source]¶
Converts the given domain event to a
StoredEvent
object.
- to_domain_event(stored: eventsourcing.persistence.StoredEvent) eventsourcing.domain.DomainEvent[Any] [source]¶
Converts the given
StoredEvent
to a domain event object.
- exception eventsourcing.persistence.RecordConflictError[source]¶
Bases:
Exception
Legacy exception, replaced with IntegrityError.
- exception eventsourcing.persistence.PersistenceError[source]¶
Bases:
Exception
The base class of the other exceptions in this module.
Exception class names follow https://www.python.org/dev/peps/pep-0249/#exceptions
- exception eventsourcing.persistence.InterfaceError[source]¶
Bases:
eventsourcing.persistence.PersistenceError
Exception raised for errors that are related to the database interface rather than the database itself.
- exception eventsourcing.persistence.DatabaseError[source]¶
Bases:
eventsourcing.persistence.PersistenceError
Exception raised for errors that are related to the database.
- exception eventsourcing.persistence.DataError[source]¶
Bases:
eventsourcing.persistence.DatabaseError
Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range, etc.
- exception eventsourcing.persistence.OperationalError[source]¶
Bases:
eventsourcing.persistence.DatabaseError
Exception raised for errors that are related to the database’s operation and not necessarily under the control of the programmer, e.g. an unexpected disconnect occurs, the data source name is not found, a transaction could not be processed, a memory allocation error occurred during processing, etc.
- exception eventsourcing.persistence.IntegrityError[source]¶
Bases: eventsourcing.persistence.DatabaseError, eventsourcing.persistence.RecordConflictError
Exception raised when the relational integrity of the database is affected, e.g. a foreign key check fails.
- exception eventsourcing.persistence.InternalError[source]¶
Bases:
eventsourcing.persistence.DatabaseError
Exception raised when the database encounters an internal error, e.g. the cursor is not valid anymore, the transaction is out of sync, etc.
- exception eventsourcing.persistence.ProgrammingError[source]¶
Bases:
eventsourcing.persistence.DatabaseError
Exception raised for programming errors, e.g. table not found or already exists, syntax error in the SQL statement, wrong number of parameters specified, etc.
- exception eventsourcing.persistence.NotSupportedError[source]¶
Bases:
eventsourcing.persistence.DatabaseError
Exception raised in case a method or database API was used which is not supported by the database, e.g. calling the rollback() method on a connection that does not support transaction or has transactions turned off.
- class eventsourcing.persistence.Recorder[source]¶
Bases:
abc.ABC
Abstract base class for stored event recorders.
- class eventsourcing.persistence.AggregateRecorder[source]¶
Bases:
eventsourcing.persistence.Recorder
Abstract base class for recorders that record and retrieve stored events for domain model aggregates.
- abstract insert_events(stored_events: List[eventsourcing.persistence.StoredEvent], **kwargs: Any) None [source]¶
Writes stored events into database.
- abstract select_events(originator_id: uuid.UUID, gt: Optional[int] = None, lte: Optional[int] = None, desc: bool = False, limit: Optional[int] = None) List[eventsourcing.persistence.StoredEvent] [source]¶
Reads stored events from database.
- class eventsourcing.persistence.Notification(originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes, id: int)[source]¶
Bases:
eventsourcing.persistence.StoredEvent
Frozen dataclass that represents domain event notifications.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(originator_id: uuid.UUID, originator_version: int, topic: str, state: bytes, id: int) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.persistence.ApplicationRecorder[source]¶
Bases:
eventsourcing.persistence.AggregateRecorder
Abstract base class for recorders that record and retrieve stored events for domain model aggregates.
Extends the behaviour of aggregate recorders by recording aggregate events in a total order that allows the stored events also to be retrieved as event notifications.
- abstract select_notifications(start: int, limit: int) List[eventsourcing.persistence.Notification] [source]¶
Returns a list of event notifications from ‘start’, limited by ‘limit’.
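As a minimal sketch of the two retrieval orders, the in-memory application recorder below returns the same stored event both in its aggregate sequence and, with a notification ID, in the total order (the topic string is a hypothetical example).
from uuid import uuid4
from eventsourcing.persistence import StoredEvent
from eventsourcing.popo import POPOApplicationRecorder

recorder = POPOApplicationRecorder()
originator_id = uuid4()
recorder.insert_events(
    [
        StoredEvent(
            originator_id=originator_id,
            originator_version=1,
            topic="example:SomethingHappened",  # hypothetical topic string
            state=b"{}",
        )
    ]
)
# Aggregate order: the events of one aggregate, by version.
assert recorder.select_events(originator_id)[0].originator_version == 1
# Total order: the events of all aggregates, as notifications with IDs.
assert recorder.select_notifications(start=1, limit=10)[0].id == 1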
- class eventsourcing.persistence.ProcessRecorder[source]¶
Bases:
eventsourcing.persistence.ApplicationRecorder
Abstract base class for recorders that record and retrieve stored events for domain model aggregates.
Extends the behaviour of application recorders by recording aggregate events with tracking information that records the position of a processed event notification in a notification log.
- class eventsourcing.persistence.EventStore(mapper: eventsourcing.persistence.Mapper, recorder: eventsourcing.persistence.AggregateRecorder)[source]¶
Bases:
Generic[eventsourcing.domain.TDomainEvent]
Stores and retrieves domain events.
- __init__(mapper: eventsourcing.persistence.Mapper, recorder: eventsourcing.persistence.AggregateRecorder)[source]¶
- put(events: Sequence[eventsourcing.domain.DomainEvent[Any]], **kwargs: Any) None [source]¶
Stores domain events in aggregate sequence.
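A minimal sketch of constructing and using an event store, assuming a JSON transcoder registered with the library’s UUID and datetime transcodings, the in-memory recorder, and the store’s get() method for retrieval:
from uuid import uuid4
from eventsourcing.domain import Aggregate
from eventsourcing.persistence import (
    DatetimeAsISO,
    EventStore,
    JSONTranscoder,
    Mapper,
    UUIDAsHex,
)
from eventsourcing.popo import POPOAggregateRecorder

transcoder = JSONTranscoder()
transcoder.register(UUIDAsHex())
transcoder.register(DatetimeAsISO())

event_store = EventStore(
    mapper=Mapper(transcoder=transcoder),
    recorder=POPOAggregateRecorder(),
)

# Create an aggregate, store its pending events, and retrieve them.
aggregate = Aggregate._create(event_class=Aggregate.Created, id=uuid4())
event_store.put(aggregate.collect_events())
retrieved = list(event_store.get(aggregate.id))
assert retrieved[0].originator_id == aggregate.id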
- class eventsourcing.persistence.InfrastructureFactory(application_name: str, env: Mapping[str, str])[source]¶
Bases:
abc.ABC
Abstract base class for infrastructure factories.
- classmethod construct(application_name: str = '', env: Optional[Mapping[str, str]] = None) eventsourcing.persistence.TF [source]¶
Constructs concrete infrastructure factory for given named application. Reads and resolves infrastructure factory class topic from environment variable ‘INFRASTRUCTURE_FACTORY’.
- __init__(application_name: str, env: Mapping[str, str])[source]¶
Initialises infrastructure factory object with given application name.
- getenv(key: str, default: Optional[str] = None, application_name: str = '') Optional[str] [source]¶
Returns value of environment variable defined by given key.
- mapper(transcoder: eventsourcing.persistence.Transcoder, application_name: str = '') eventsourcing.persistence.Mapper [source]¶
Constructs a mapper.
- cipher(application_name: str) Optional[eventsourcing.persistence.Cipher] [source]¶
Reads environment variables ‘CIPHER_TOPIC’ and ‘CIPHER_KEY’ to decide whether or not to construct a cipher.
- compressor(application_name: str) Optional[eventsourcing.persistence.Compressor] [source]¶
Reads environment variable ‘COMPRESSOR_TOPIC’ to decide whether or not to construct a compressor.
- static event_store(**kwargs: Any) eventsourcing.persistence.EventStore[eventsourcing.domain.TDomainEvent] [source]¶
Constructs an event store.
- abstract aggregate_recorder(purpose: str = 'events') eventsourcing.persistence.AggregateRecorder [source]¶
Constructs an aggregate recorder.
- abstract application_recorder() eventsourcing.persistence.ApplicationRecorder [source]¶
Constructs an application recorder.
- abstract process_recorder() eventsourcing.persistence.ProcessRecorder [source]¶
Constructs a process recorder.
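As a sketch: when the given environment has no ‘INFRASTRUCTURE_FACTORY’ variable, construct() falls back to the library’s default “plain old Python objects” factory, from which the recorders can then be constructed.
from eventsourcing.persistence import InfrastructureFactory

# Resolve a concrete factory from environment settings; an empty
# environment selects the default in-memory (POPO) factory.
factory = InfrastructureFactory.construct(env={})
recorder = factory.application_recorder()

# A different module can be selected with, e.g.:
#   env={"INFRASTRUCTURE_FACTORY": "eventsourcing.sqlite:Factory", ...}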
- class eventsourcing.persistence.Tracking(application_name: str, notification_id: int)[source]¶
Bases:
object
Frozen dataclass representing the position of a domain event Notification in an application’s notification log.
- __delattr__(name)¶
Implement delattr(self, name).
- __eq__(other)¶
Return self==value.
- __hash__()¶
Return hash(self).
- __init__(application_name: str, notification_id: int) None ¶
- __repr__()¶
Return repr(self).
- __setattr__(name, value)¶
Implement setattr(self, name, value).
- class eventsourcing.popo.POPOAggregateRecorder[source]¶
Bases:
eventsourcing.persistence.AggregateRecorder
- insert_events(stored_events: List[eventsourcing.persistence.StoredEvent], **kwargs: Any) None [source]¶
Writes stored events into database.
- select_events(originator_id: uuid.UUID, gt: Optional[int] = None, lte: Optional[int] = None, desc: bool = False, limit: Optional[int] = None) List[eventsourcing.persistence.StoredEvent] [source]¶
Reads stored events from database.
- class eventsourcing.popo.POPOApplicationRecorder[source]¶
Bases:
eventsourcing.persistence.ApplicationRecorder, eventsourcing.popo.POPOAggregateRecorder
- select_notifications(start: int, limit: int) List[eventsourcing.persistence.Notification] [source]¶
Returns a list of event notifications from ‘start’, limited by ‘limit’.
- class eventsourcing.popo.POPOProcessRecorder[source]¶
Bases:
eventsourcing.persistence.ProcessRecorder, eventsourcing.popo.POPOApplicationRecorder
- class eventsourcing.popo.Factory(application_name: str, env: Mapping[str, str])[source]¶
Bases:
eventsourcing.persistence.InfrastructureFactory
- aggregate_recorder(purpose: str = 'events') eventsourcing.persistence.AggregateRecorder [source]¶
Constructs an aggregate recorder.
- application_recorder() eventsourcing.persistence.ApplicationRecorder [source]¶
Constructs an application recorder.
- process_recorder() eventsourcing.persistence.ProcessRecorder [source]¶
Constructs a process recorder.
- class eventsourcing.sqlite.SQLiteAggregateRecorder(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶
Bases:
eventsourcing.persistence.AggregateRecorder
- __init__(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶
- insert_events(stored_events: List[eventsourcing.persistence.StoredEvent], **kwargs: Any) None [source]¶
Writes stored events into database.
- select_events(originator_id: uuid.UUID, gt: Optional[int] = None, lte: Optional[int] = None, desc: bool = False, limit: Optional[int] = None) List[eventsourcing.persistence.StoredEvent] [source]¶
Reads stored events from database.
- class eventsourcing.sqlite.SQLiteApplicationRecorder(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶
Bases:
eventsourcing.sqlite.SQLiteAggregateRecorder, eventsourcing.persistence.ApplicationRecorder
- __init__(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶
- select_notifications(start: int, limit: int) List[eventsourcing.persistence.Notification] [source]¶
Returns a list of event notifications from ‘start’, limited by ‘limit’.
- class eventsourcing.sqlite.SQLiteProcessRecorder(datastore: eventsourcing.sqlite.SQLiteDatastore, events_table_name: str = 'stored_events')[source]¶
Bases:
eventsourcing.sqlite.SQLiteApplicationRecorder, eventsourcing.persistence.ProcessRecorder
- class eventsourcing.sqlite.Factory(application_name: str, env: Mapping[str, str])[source]¶
Bases:
eventsourcing.persistence.InfrastructureFactory
- __init__(application_name: str, env: Mapping[str, str])[source]¶
Initialises infrastructure factory object with given application name.
- aggregate_recorder(purpose: str = 'events') eventsourcing.persistence.AggregateRecorder [source]¶
Constructs an aggregate recorder.
- application_recorder() eventsourcing.persistence.ApplicationRecorder [source]¶
Constructs an application recorder.
- process_recorder() eventsourcing.persistence.ProcessRecorder [source]¶
Constructs a process recorder.
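For example, an application can be configured to use this SQLite factory through environment variables. This is a sketch, assuming ‘SQLITE_DBNAME’ names the database (here an in-memory database):
import os

from eventsourcing.application import Application

os.environ["INFRASTRUCTURE_FACTORY"] = "eventsourcing.sqlite:Factory"
os.environ["SQLITE_DBNAME"] = ":memory:"

# The application now records its events with the SQLite recorders.
app = Application()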
- class eventsourcing.postgres.PostgresAggregateRecorder(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str)[source]¶
Bases:
eventsourcing.persistence.AggregateRecorder
- insert_events(stored_events: List[eventsourcing.persistence.StoredEvent], **kwargs: Any) None [source]¶
Writes stored events into database.
- select_events(originator_id: uuid.UUID, gt: Optional[int] = None, lte: Optional[int] = None, desc: bool = False, limit: Optional[int] = None) List[eventsourcing.persistence.StoredEvent] [source]¶
Reads stored events from database.
- class eventsourcing.postgres.PostgresApplicationRecorder(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str = 'stored_events')[source]¶
Bases:
eventsourcing.postgres.PostgresAggregateRecorder, eventsourcing.persistence.ApplicationRecorder
- __init__(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str = 'stored_events')[source]¶
- select_notifications(start: int, limit: int) List[eventsourcing.persistence.Notification] [source]¶
Returns a list of event notifications from ‘start’, limited by ‘limit’.
- class eventsourcing.postgres.PostgresProcessRecorder(datastore: eventsourcing.postgres.PostgresDatastore, events_table_name: str, tracking_table_name: str)[source]¶
Bases:
eventsourcing.postgres.PostgresApplicationRecorder, eventsourcing.persistence.ProcessRecorder
- class eventsourcing.postgres.Factory(application_name: str, env: Mapping[str, str])[source]¶
Bases:
eventsourcing.persistence.InfrastructureFactory
- __init__(application_name: str, env: Mapping[str, str])[source]¶
Initialises infrastructure factory object with given application name.
- aggregate_recorder(purpose: str = 'events') eventsourcing.persistence.AggregateRecorder [source]¶
Constructs an aggregate recorder.
- application_recorder() eventsourcing.persistence.ApplicationRecorder [source]¶
Constructs an application recorder.
- process_recorder() eventsourcing.persistence.ProcessRecorder [source]¶
Constructs a process recorder.
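Similarly, the PostgreSQL factory is selected and configured through environment variables; in this sketch, the connection values are placeholders for a local database:
import os

os.environ["INFRASTRUCTURE_FACTORY"] = "eventsourcing.postgres:Factory"
os.environ["POSTGRES_DBNAME"] = "eventsourcing"
os.environ["POSTGRES_HOST"] = "127.0.0.1"
os.environ["POSTGRES_PORT"] = "5432"
os.environ["POSTGRES_USER"] = "eventsourcing"
os.environ["POSTGRES_PASSWORD"] = "eventsourcing"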
system — Event-driven systems¶
This module shows how event-sourced applications can be combined to make an event-driven system.
This page is under development — please check back soon.
System of applications¶
The library’s System class…
from eventsourcing.system import System
from uuid import uuid4
from eventsourcing.domain import Aggregate, AggregateCreated, AggregateEvent
class World(Aggregate):
def __init__(self):
self.history = []
@classmethod
def create(cls):
return cls._create(
event_class=cls.Created,
id=uuid4(),
)
class Created(AggregateCreated):
pass
def make_it_so(self, what):
self.trigger_event(self.SomethingHappened, what=what)
class SomethingHappened(AggregateEvent):
what: str
def apply(self, world):
world.history.append(self.what)
Now let’s define an application…
from eventsourcing.application import Application
class WorldsApplication(Application):
def create_world(self):
world = World.create()
self.save(world)
return world.id
def make_it_so(self, world_id, what):
world = self.repository.get(world_id)
world.make_it_so(what)
self.save(world)
def get_world_history(self, world_id):
world = self.repository.get(world_id)
return list(world.history)
Now let’s define an analytics application…
from uuid import uuid5, NAMESPACE_URL
class Counter(Aggregate):
def __init__(self):
self.count = 0
@classmethod
def create_id(cls, name):
return uuid5(NAMESPACE_URL, f'/counters/{name}')
@classmethod
def create(cls, name):
return cls._create(
event_class=cls.Created,
id=cls.create_id(name),
)
class Created(AggregateCreated):
pass
def increment(self):
self.trigger_event(self.Incremented)
class Incremented(AggregateEvent):
def apply(self, counter):
counter.count += 1
from eventsourcing.application import AggregateNotFound
from eventsourcing.system import ProcessApplication
from eventsourcing.dispatch import singledispatchmethod
class Counters(ProcessApplication):
@singledispatchmethod
def policy(self, domain_event, process_event):
"""Default policy"""
@policy.register(World.SomethingHappened)
def _(self, domain_event, process_event):
what = domain_event.what
counter_id = Counter.create_id(what)
try:
counter = self.repository.get(counter_id)
except AggregateNotFound:
counter = Counter.create(what)
counter.increment()
process_event.save(counter)
def get_count(self, what):
counter_id = Counter.create_id(what)
try:
counter = self.repository.get(counter_id)
except AggregateNotFound:
return 0
return counter.count
system = System(pipes=[[WorldsApplication, Counters]])
Single-threaded runner¶
from eventsourcing.system import SingleThreadedRunner
runner = SingleThreadedRunner(system)
runner.start()
worlds = runner.get(WorldsApplication)
counters = runner.get(Counters)
world_id1 = worlds.create_world()
world_id2 = worlds.create_world()
world_id3 = worlds.create_world()
assert counters.get_count('dinosaurs') == 0
assert counters.get_count('trucks') == 0
assert counters.get_count('internet') == 0
worlds.make_it_so(world_id1, 'dinosaurs')
worlds.make_it_so(world_id2, 'dinosaurs')
worlds.make_it_so(world_id3, 'dinosaurs')
assert counters.get_count('dinosaurs') == 3
assert counters.get_count('trucks') == 0
assert counters.get_count('internet') == 0
worlds.make_it_so(world_id1, 'trucks')
worlds.make_it_so(world_id2, 'trucks')
assert counters.get_count('dinosaurs') == 3
assert counters.get_count('trucks') == 2
assert counters.get_count('internet') == 0
worlds.make_it_so(world_id1, 'internet')
assert counters.get_count('dinosaurs') == 3
assert counters.get_count('trucks') == 2
assert counters.get_count('internet') == 1
Multi-threaded runner¶
from eventsourcing.system import MultiThreadedRunner
runner = MultiThreadedRunner(system)
runner.start()
worlds = runner.get(WorldsApplication)
counters = runner.get(Counters)
world_id1 = worlds.create_world()
world_id2 = worlds.create_world()
world_id3 = worlds.create_world()
worlds.make_it_so(world_id1, 'dinosaurs')
worlds.make_it_so(world_id2, 'dinosaurs')
worlds.make_it_so(world_id3, 'dinosaurs')
worlds.make_it_so(world_id1, 'trucks')
worlds.make_it_so(world_id2, 'trucks')
worlds.make_it_so(world_id1, 'internet')
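# The multi-threaded runner processes events in background threads,
# so wait briefly before checking the downstream counts.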
from time import sleep
sleep(0.01)
assert counters.get_count('dinosaurs') == 3
assert counters.get_count('trucks') == 2
assert counters.get_count('internet') == 1
…
Classes¶
- class eventsourcing.system.Follower[source]¶
Bases:
eventsourcing.application.Application[eventsourcing.domain.TAggregate]
Extends the Application class by using a process recorder as its application recorder, by keeping track of the applications it is following, and by pulling and processing new domain event notifications through its policy() method.
- __init__() None [source]¶
Initialises an application with an InfrastructureFactory, a Mapper, an ApplicationRecorder, an EventStore, a Repository, and a LocalNotificationLog.
- construct_recorder() eventsourcing.persistence.ProcessRecorder [source]¶
Constructs and returns a ProcessRecorder for the application to use as its application recorder.
- follow(name: str, log: eventsourcing.application.NotificationLog) None [source]¶
Constructs a notification log reader and a mapper for the named application, and adds them to its collection of readers.
- pull_and_process(name: str) None [source]¶
Pulls and processes unseen domain event notifications from the notification log reader of the named application.
Converts received event notifications to domain event objects, and then calls the policy() method with a new ProcessEvent object, which contains a Tracking object that keeps track of the name of the application and the position in its notification log from which the domain event notification was pulled. The policy will save aggregates to the process event object, using its save() method, which collects pending domain events using the aggregates’ collect_events() method, and the process event object will then be recorded by calling the record() method.
- abstract policy(domain_event: eventsourcing.domain.AggregateEvent[eventsourcing.domain.TAggregate], process_event: eventsourcing.application.ProcessEvent) None [source]¶
Abstract domain event processing policy method. Must be implemented by event processing applications. When processing the given domain event, event processing applications must use the save() method of the given process event object (instead of the application’s save() method) to collect pending events from changed aggregates, so that the new domain events will be recorded atomically with tracking information about the position of the given domain event’s notification.
- class eventsourcing.system.Promptable[source]¶
Bases:
abc.ABC
Abstract base class for “promptable” objects.
- class eventsourcing.system.Leader[source]¶
Bases:
eventsourcing.application.Application[eventsourcing.domain.TAggregate]
Extends the Application class by also being responsible for keeping track of followers, and for prompting followers when there are new domain event notifications to be pulled and processed.
- __init__() None [source]¶
Initialises an application with an InfrastructureFactory, a Mapper, an ApplicationRecorder, an EventStore, a Repository, and a LocalNotificationLog.
- lead(follower: eventsourcing.system.Promptable) None [source]¶
Adds given follower to a list of followers.
- notify(new_events: List[eventsourcing.domain.AggregateEvent[eventsourcing.domain.Aggregate]]) None [source]¶
Extends the application notify() method by calling prompt_followers() whenever new events have just been saved.
- prompt_followers() None [source]¶
Prompts followers by calling their receive_prompt() methods with the name of the application.
- class eventsourcing.system.ProcessApplication[source]¶
Bases:
eventsourcing.system.Leader[eventsourcing.domain.TAggregate], eventsourcing.system.Follower[eventsourcing.domain.TAggregate], abc.ABC
Base class for event processing applications that are both “leaders” and “followers”.
- class eventsourcing.system.System(pipes: Iterable[Iterable[Type[eventsourcing.application.Application[eventsourcing.domain.Aggregate]]]])[source]¶
Bases:
object
Defines a system of applications.
- __init__(pipes: Iterable[Iterable[Type[eventsourcing.application.Application[eventsourcing.domain.Aggregate]]]])[source]¶
- class eventsourcing.system.Runner(system: eventsourcing.system.System)[source]¶
Bases:
abc.ABC
Abstract base class for system runners.
- __init__(system: eventsourcing.system.System)[source]¶
- exception eventsourcing.system.RunnerAlreadyStarted[source]¶
Bases:
Exception
Raised when runner is already started.
- class eventsourcing.system.SingleThreadedRunner(system: eventsourcing.system.System)[source]¶
Bases:
eventsourcing.system.Runner, eventsourcing.system.Promptable
Runs a System in a single thread. A single-threaded runner is a runner, and so implements the start(), stop(), and get() methods. A single-threaded runner is also a Promptable object, and implements the receive_prompt() method by collecting prompted names.
- __init__(system: eventsourcing.system.System)[source]¶
Initialises runner with the given System.
- start() None [source]¶
Starts the runner. The applications are constructed, and set up to lead and follow each other, according to the system definition. The followers are set up to follow the applications they follow (have a notification log reader with the notification log of the leader), and their leaders are set up to lead the runner itself (send prompts).
- receive_prompt(leader_name: str) None [source]¶
Receives a prompt by appending the name of the leader to a list of prompted names. Unless this method has previously been called but not yet returned, it will then proceed to forward the prompts received to its application by calling the application’s pull_and_process() method for each prompted name.
- class eventsourcing.system.MultiThreadedRunner(system: eventsourcing.system.System)[source]¶
Bases:
eventsourcing.system.Runner
Runs a System with a MultiThreadedRunnerThread for each follower in the system definition. It is a runner, and so implements the start(), stop(), and get() methods.
- __init__(system: eventsourcing.system.System)[source]¶
Initialises runner with the given System.
- start() None [source]¶
Starts the runner.
A multi-threaded runner thread is started for each ‘follower’ application in the system, and an instance of each non-follower leader application is constructed. The followers are then set up to follow the applications they follow (each has a notification log reader with the notification log of its leader), and their leaders are set up to lead the follower’s thread (send prompts).
- class eventsourcing.system.MultiThreadedRunnerThread(app_class: Type[eventsourcing.system.Follower[eventsourcing.domain.Aggregate]], is_stopping: threading.Event)[source]¶
Bases:
eventsourcing.system.Promptable, threading.Thread
Runs one process application for a MultiThreadedRunner.
A multi-threaded runner thread is a Promptable object, and implements the receive_prompt() method by collecting prompted names and setting its threading event ‘is_prompted’.
A multi-threaded runner thread is a Python threading.Thread object, and implements the thread’s run() method by waiting until the ‘is_prompted’ event has been set and then calling its process application’s pull_and_process() method once for each prompted name. It is expected that the process application will have been set up by the runner with a notification log reader from which event notifications will be pulled.
- __init__(app_class: Type[eventsourcing.system.Follower[eventsourcing.domain.Aggregate]], is_stopping: threading.Event)[source]¶
- run() None [source]¶
Begins by constructing an application instance from the given application class, and then loops forever until stopped. The loop blocks on waiting for the ‘is_prompted’ event to be set, then forwards the prompts already received to its application by calling the application’s pull_and_process() method for each prompted name.
- class eventsourcing.system.NotificationLogReader(notification_log: eventsourcing.application.NotificationLog, section_size: int = 10)[source]¶
Bases:
object
Reads domain event notifications from a notification log.
- __init__(notification_log: eventsourcing.application.NotificationLog, section_size: int = 10)[source]¶
Initialises a reader with the given notification log, and optionally a section size integer which determines the requested number of domain event notifications in each section retrieved from the notification log.
- read(*, start: int) Iterator[eventsourcing.persistence.Notification] [source]¶
Returns a generator that yields event notifications from the reader’s notification log, starting from given start position (a notification ID).
This method traverses the linked list of sections presented by a notification log, and yields the individual event notifications that are contained in each section. When all the event notifications from a section have been yielded, the reader will retrieve the next section, and continue yielding event notifications until all subsequent event notifications in the notification log from the start position have been yielded.
- select(*, start: int) Iterator[eventsourcing.persistence.Notification] [source]¶
Returns a generator that yields event notifications from the reader’s notification log, starting from given start position (a notification ID).
This method selects a limited list of notifications from a notification log and yields event notifications individually. When all the event notifications in the list have been yielded, the reader will retrieve another list, and continue yielding event notifications until all subsequent event notifications in the notification log from the start position have been yielded.
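A minimal sketch of reading notifications directly from an application’s notification log:
from uuid import uuid4

from eventsourcing.application import Application
from eventsourcing.domain import Aggregate
from eventsourcing.system import NotificationLogReader

app = Application()
aggregate = Aggregate._create(event_class=Aggregate.Created, id=uuid4())
app.save(aggregate)

# Read all event notifications from position 1 onwards.
reader = NotificationLogReader(app.log, section_size=5)
notifications = list(reader.read(start=1))
assert notifications[0].originator_id == aggregate.id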
interface — Interface¶
This page is under development — please check back soon.
Classes¶
- class eventsourcing.interface.NotificationLogInterface[source]¶
Bases:
abc.ABC
Abstract base class for obtaining serialised sections of a notification log.
- abstract get_log_section(section_id: str) str [source]¶
Returns a serialised Section from a notification log.
- abstract get_notifications(start: int, limit: int) str [source]¶
Returns a serialised list of Notification objects from a notification log.
- class eventsourcing.interface.NotificationLogJSONService(app: eventsourcing.application.TApplication)[source]¶
Bases:
eventsourcing.interface.NotificationLogInterface, Generic[eventsourcing.application.TApplication]
Presents serialised sections of a notification log.
- __init__(app: eventsourcing.application.TApplication)[source]¶
Initialises service with given application.
- get_log_section(section_id: str) str [source]¶
Returns a JSON-serialised Section from a notification log.
- get_notifications(start: int, limit: int) str [source]¶
Returns a serialised list of Notification objects from a notification log.
- class eventsourcing.interface.NotificationLogJSONClient(interface: eventsourcing.interface.NotificationLogInterface)[source]¶
Bases:
eventsourcing.application.NotificationLog
Presents deserialised sections of a notification log.
- __init__(interface: eventsourcing.interface.NotificationLogInterface)[source]¶
Initialises log with a given interface.
- __getitem__(section_id: str) eventsourcing.application.Section [source]¶
Returns a Section of Notification objects from the notification log.
- select(start: int, limit: int) List[eventsourcing.persistence.Notification] [source]¶
Returns a selection of Notification objects from the notification log.
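A minimal sketch wiring the service and client together in one process (in practice, the serialised strings would travel over a network interface):
from uuid import uuid4

from eventsourcing.application import Application
from eventsourcing.domain import Aggregate
from eventsourcing.interface import (
    NotificationLogJSONClient,
    NotificationLogJSONService,
)

app = Application()
aggregate = Aggregate._create(event_class=Aggregate.Created, id=uuid4())
app.save(aggregate)

service = NotificationLogJSONService(app)  # serialises sections
client = NotificationLogJSONClient(service)  # deserialises sections

# Request the section of notifications from position 1 to 10.
section = client["1,10"]
assert section.items[0].originator_id == aggregate.id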
Examples¶
This library contains a few example applications of event sourcing in Python.
Bank accounts application¶
This example demonstrates a straightforward event-sourced application.
Domain model¶
The BankAccount
aggregate class is defined using the
declarative syntax. It has a
balance and an overdraft limit. Accounts can be opened and
closed. Accounts can be credited and debited, which affects
the balance. Neither credits nor debits are allowed if
the account has been closed. Debits are not allowed if the
balance would go below the overdraft limit. The overdraft
limit can be adjusted.
from decimal import Decimal
from eventsourcing.domain import Aggregate, event
class BankAccount(Aggregate):
@event("Opened")
def __init__(self, full_name: str, email_address: str):
self.full_name = full_name
self.email_address = email_address
self.balance = Decimal("0.00")
self.overdraft_limit = Decimal("0.00")
self.is_closed = False
@event("Credited")
def credit(self, amount: Decimal) -> None:
self.check_account_is_not_closed()
self.balance += amount
@event("Debited")
def debit(self, amount: Decimal) -> None:
self.check_account_is_not_closed()
self.check_has_sufficient_funds(amount)
self.balance -= amount
@event("OverdraftLimitSet")
def set_overdraft_limit(self, overdraft_limit: Decimal) -> None:
assert overdraft_limit > Decimal("0.00")
self.check_account_is_not_closed()
self.overdraft_limit = overdraft_limit
@event("Closed")
def close(self) -> None:
self.is_closed = True
def check_account_is_not_closed(self) -> None:
if self.is_closed:
raise AccountClosedError({"account_id": self.id})
def check_has_sufficient_funds(self, amount: Decimal) -> None:
if self.balance - amount < -self.overdraft_limit:
raise InsufficientFundsError({"account_id": self.id})
class TransactionError(Exception):
pass
class AccountClosedError(TransactionError):
pass
class InsufficientFundsError(TransactionError):
pass
Application¶
The BankAccounts
application has command and query methods for interacting
with the domain model. New accounts can be opened. Existing accounts can be
closed. Deposits and withdrawals can be made on open accounts. Transfers can be
made between open accounts, if there are sufficient funds on the debited account.
All actions are atomic, including transfers between accounts.
from decimal import Decimal
from uuid import UUID
from eventsourcing.application import AggregateNotFound, Application
from eventsourcing.examples.bankaccounts.domainmodel import BankAccount
class BankAccounts(Application[BankAccount]):
def open_account(self, full_name: str, email_address: str) -> UUID:
account = BankAccount(
full_name=full_name,
email_address=email_address,
)
self.save(account)
return account.id
def get_account(self, account_id: UUID) -> BankAccount:
try:
return self.repository.get(account_id)
except AggregateNotFound:
raise AccountNotFoundError(account_id)
def get_balance(self, account_id: UUID) -> Decimal:
account = self.get_account(account_id)
return account.balance
def deposit_funds(self, credit_account_id: UUID, amount: Decimal) -> None:
account = self.get_account(credit_account_id)
account.credit(amount)
self.save(account)
def withdraw_funds(self, debit_account_id: UUID, amount: Decimal) -> None:
account = self.get_account(debit_account_id)
account.debit(amount)
self.save(account)
def transfer_funds(
self,
debit_account_id: UUID,
credit_account_id: UUID,
amount: Decimal,
) -> None:
debit_account = self.get_account(debit_account_id)
credit_account = self.get_account(credit_account_id)
debit_account.debit(amount)
credit_account.credit(amount)
self.save(debit_account, credit_account)
def set_overdraft_limit(self, account_id: UUID, overdraft_limit: Decimal) -> None:
account = self.get_account(account_id)
account.set_overdraft_limit(overdraft_limit)
self.save(account)
def get_overdraft_limit(self, account_id: UUID) -> Decimal:
account = self.get_account(account_id)
return account.overdraft_limit
def close_account(self, account_id: UUID) -> None:
account = self.get_account(account_id)
account.close()
self.save(account)
class AccountNotFoundError(Exception):
pass
Test case¶
For the purpose of showing how the application object might be used, the test runs through a scenario that exercises all the methods of the application in one test method.
import unittest
from decimal import Decimal
from uuid import uuid4
from eventsourcing.examples.bankaccounts.application import (
AccountNotFoundError,
BankAccounts,
)
from eventsourcing.examples.bankaccounts.domainmodel import (
AccountClosedError,
InsufficientFundsError,
)
class TestBankAccounts(unittest.TestCase):
def test(self) -> None:
app = BankAccounts()
# Check account not found error.
with self.assertRaises(AccountNotFoundError):
app.get_balance(uuid4())
# Create account #1.
account_id1 = app.open_account(
full_name="Alice",
email_address="alice@example.com",
)
# Check balance of account #1.
self.assertEqual(app.get_balance(account_id1), Decimal("0.00"))
# Deposit funds in account #1.
app.deposit_funds(
credit_account_id=account_id1,
amount=Decimal("200.00"),
)
# Check balance of account #1.
self.assertEqual(app.get_balance(account_id1), Decimal("200.00"))
# Withdraw funds from account #1.
app.withdraw_funds(
debit_account_id=account_id1,
amount=Decimal("50.00"),
)
# Check balance of account #1.
self.assertEqual(app.get_balance(account_id1), Decimal("150.00"))
        # Fail to withdraw funds from account #1 - insufficient funds.
with self.assertRaises(InsufficientFundsError):
app.withdraw_funds(
debit_account_id=account_id1,
amount=Decimal("151.00"),
)
# Check balance of account #1 - should be unchanged.
self.assertEqual(app.get_balance(account_id1), Decimal("150.00"))
# Create account #2.
account_id2 = app.open_account(
full_name="Bob",
email_address="bob@example.com",
)
# Transfer funds from account #1 to account #2.
app.transfer_funds(
debit_account_id=account_id1,
credit_account_id=account_id2,
amount=Decimal("100.00"),
)
# Check balances.
self.assertEqual(app.get_balance(account_id1), Decimal("50.00"))
self.assertEqual(app.get_balance(account_id2), Decimal("100.00"))
# Fail to transfer funds - insufficient funds.
with self.assertRaises(InsufficientFundsError):
app.transfer_funds(
debit_account_id=account_id1,
credit_account_id=account_id2,
amount=Decimal("1000.00"),
)
# Check balances - should be unchanged.
self.assertEqual(app.get_balance(account_id1), Decimal("50.00"))
self.assertEqual(app.get_balance(account_id2), Decimal("100.00"))
# Close account #1.
app.close_account(account_id1)
# Fail to transfer funds - account #1 is closed.
with self.assertRaises(AccountClosedError):
app.transfer_funds(
debit_account_id=account_id1,
credit_account_id=account_id2,
amount=Decimal("50.00"),
)
# Fail to withdraw funds - account #1 is closed.
with self.assertRaises(AccountClosedError):
app.withdraw_funds(
debit_account_id=account_id1,
amount=Decimal("1.00"),
)
# Fail to deposit funds - account #1 is closed.
with self.assertRaises(AccountClosedError):
app.deposit_funds(
credit_account_id=account_id1,
amount=Decimal("1000.00"),
)
# Fail to set overdraft limit on account #1 - account is closed.
with self.assertRaises(AccountClosedError):
app.set_overdraft_limit(
account_id=account_id1,
overdraft_limit=Decimal("500.00"),
)
# Check balances - should be unchanged.
self.assertEqual(app.get_balance(account_id1), Decimal("50.00"))
self.assertEqual(app.get_balance(account_id2), Decimal("100.00"))
# Check overdraft limits - should be unchanged.
self.assertEqual(
app.get_overdraft_limit(account_id1),
Decimal("0.00"),
)
self.assertEqual(
app.get_overdraft_limit(account_id2),
Decimal("0.00"),
)
# Set overdraft limit on account #2.
app.set_overdraft_limit(
account_id=account_id2,
overdraft_limit=Decimal("500.00"),
)
# Can't set negative overdraft limit.
with self.assertRaises(AssertionError):
app.set_overdraft_limit(
account_id=account_id2,
overdraft_limit=Decimal("-500.00"),
)
# Check overdraft limit of account #2.
self.assertEqual(
app.get_overdraft_limit(account_id2),
Decimal("500.00"),
)
# Withdraw funds from account #2.
app.withdraw_funds(
debit_account_id=account_id2,
amount=Decimal("500.00"),
)
# Check balance of account #2 - should be overdrawn.
self.assertEqual(
app.get_balance(account_id2),
Decimal("-400.00"),
)
# Fail to withdraw funds from account #2 - insufficient funds.
with self.assertRaises(InsufficientFundsError):
app.withdraw_funds(
debit_account_id=account_id2,
amount=Decimal("101.00"),
)
Cargo shipping example¶
This example follows the original Cargo Shipping example that figures in the DDD book, as worked up into a running application by the DDD Sample project:
“One of the most requested aids to coming up to speed on DDD has been a running example application. Starting from a simple set of functions and a model based on the cargo example used in Eric Evans’ book, we have built a running application with which to demonstrate a practical implementation of the building block patterns as well as illustrate the impact of aggregates and bounded contexts.”
The original example was not event-sourced and was coded in Java. The example below is an event-sourced version of the original, coded in Python.
Domain model¶
The aggregate Cargo allows new cargo bookings to be made, the destination of the cargo to be changed, a route to be assigned, and handling events to be registered. It is defined in the more verbose style, using explicit definitions of aggregate events, an explicit “created” method, and command methods that explicitly trigger events by calling trigger_event().
An aggregate projector function is implemented on the aggregate object using @singledispatchmethod, with an event-specific method registered to handle each type of aggregate event.
Custom value objects, such as Location and Itinerary, are defined as part of the domain model, and used in the Cargo aggregate events and methods.
For simplicity in this example, a fixed collection of routes between locations is also defined, but in practice these would be editable and could also be modelled as event-sourced aggregates.
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
from uuid import UUID, uuid4
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import TZINFO, Aggregate
class Location(Enum):
"""
Locations in the world.
"""
HAMBURG = "HAMBURG"
HONGKONG = "HONGKONG"
NEWYORK = "NEWYORK"
STOCKHOLM = "STOCKHOLM"
TOKYO = "TOKYO"
NLRTM = "NLRTM"
USDAL = "USDAL"
AUMEL = "AUMEL"
class Leg(object):
"""
Leg of an itinerary.
"""
def __init__(
self,
origin: str,
destination: str,
voyage_number: str,
):
self.origin: str = origin
self.destination: str = destination
self.voyage_number: str = voyage_number
class Itinerary(object):
"""
An itinerary along which cargo is shipped.
"""
def __init__(
self,
origin: str,
destination: str,
legs: Tuple[Leg, ...],
):
self.origin = origin
self.destination = destination
self.legs = legs
class HandlingActivity(Enum):
RECEIVE = "RECEIVE"
LOAD = "LOAD"
UNLOAD = "UNLOAD"
CLAIM = "CLAIM"
# Custom static types.
LegDetails = Dict[str, str]
ItineraryDetails = Dict[str, Union[str, List[LegDetails]]]
NextExpectedActivity = Optional[Tuple[HandlingActivity, Location, str]]
# Some routes from one location to another.
REGISTERED_ROUTES = {
("HONGKONG", "STOCKHOLM"): [
Itinerary(
origin="HONGKONG",
destination="STOCKHOLM",
legs=(
Leg(
origin="HONGKONG",
destination="NEWYORK",
voyage_number="V1",
),
Leg(
origin="NEWYORK",
destination="STOCKHOLM",
voyage_number="V2",
),
),
)
],
("TOKYO", "STOCKHOLM"): [
Itinerary(
origin="TOKYO",
destination="STOCKHOLM",
legs=(
Leg(
origin="TOKYO",
destination="HAMBURG",
voyage_number="V3",
),
Leg(
origin="HAMBURG",
destination="STOCKHOLM",
voyage_number="V4",
),
),
)
],
}
class Cargo(Aggregate):
"""
The Cargo aggregate is an event-sourced domain model aggregate that
specifies the routing from origin to destination, and can track what
happens to the cargo after it has been booked.
"""
def __init__(
self,
origin: Location,
destination: Location,
arrival_deadline: datetime,
):
self._origin: Location = origin
self._destination: Location = destination
self._arrival_deadline: datetime = arrival_deadline
self._transport_status: str = "NOT_RECEIVED"
self._routing_status: str = "NOT_ROUTED"
self._is_misdirected: bool = False
self._estimated_time_of_arrival: Optional[datetime] = None
self._next_expected_activity: NextExpectedActivity = None
self._route: Optional[Itinerary] = None
self._last_known_location: Optional[Location] = None
self._current_voyage_number: Optional[str] = None
@property
def origin(self) -> Location:
return self._origin
@property
def destination(self) -> Location:
return self._destination
@property
def arrival_deadline(self) -> datetime:
return self._arrival_deadline
@property
def transport_status(self) -> str:
return self._transport_status
@property
def routing_status(self) -> str:
return self._routing_status
@property
def is_misdirected(self) -> bool:
return self._is_misdirected
@property
def estimated_time_of_arrival(
self,
) -> Optional[datetime]:
return self._estimated_time_of_arrival
@property
def next_expected_activity(self) -> NextExpectedActivity:
return self._next_expected_activity
@property
def route(self) -> Optional[Itinerary]:
return self._route
@property
def last_known_location(self) -> Optional[Location]:
return self._last_known_location
@property
def current_voyage_number(self) -> Optional[str]:
return self._current_voyage_number
@classmethod
def new_booking(
cls,
origin: Location,
destination: Location,
arrival_deadline: datetime,
) -> "Cargo":
return cls._create(
event_class=cls.BookingStarted,
id=uuid4(),
origin=origin,
destination=destination,
arrival_deadline=arrival_deadline,
)
class BookingStarted(Aggregate.Created["Cargo"]):
origin: Location
destination: Location
arrival_deadline: datetime
class Event(Aggregate.Event["Cargo"]):
def apply(self, aggregate: "Cargo") -> None:
aggregate.apply(self)
@singledispatchmethod
def apply(self, event: Event) -> None:
"""
Default method to apply an aggregate event to the aggregate object.
"""
def change_destination(self, destination: Location) -> None:
self.trigger_event(
self.DestinationChanged,
destination=destination,
)
class DestinationChanged(Event):
destination: Location
@apply.register(DestinationChanged)
def destination_changed(self, event: DestinationChanged) -> None:
self._destination = event.destination
def assign_route(self, itinerary: Itinerary) -> None:
self.trigger_event(self.RouteAssigned, route=itinerary)
class RouteAssigned(Event):
route: Itinerary
@apply.register(RouteAssigned)
def route_assigned(self, event: RouteAssigned) -> None:
self._route = event.route
self._routing_status = "ROUTED"
self._estimated_time_of_arrival = datetime.now(tz=TZINFO) + timedelta(weeks=1)
self._next_expected_activity = (HandlingActivity.RECEIVE, self.origin, "")
self._is_misdirected = False
def register_handling_event(
self,
tracking_id: UUID,
voyage_number: Optional[str],
location: Location,
handling_activity: HandlingActivity,
) -> None:
self.trigger_event(
self.HandlingEventRegistered,
tracking_id=tracking_id,
voyage_number=voyage_number,
location=location,
handling_activity=handling_activity,
)
class HandlingEventRegistered(Event):
tracking_id: UUID
        voyage_number: Optional[str]
location: Location
        handling_activity: HandlingActivity
@apply.register(HandlingEventRegistered)
def handling_event_registered(self, event: HandlingEventRegistered) -> None:
assert self.route is not None
if event.handling_activity == HandlingActivity.RECEIVE:
self._transport_status = "IN_PORT"
self._last_known_location = event.location
self._next_expected_activity = (
HandlingActivity.LOAD,
event.location,
self.route.legs[0].voyage_number,
)
elif event.handling_activity == HandlingActivity.LOAD:
self._transport_status = "ONBOARD_CARRIER"
self._current_voyage_number = event.voyage_number
for leg in self.route.legs:
if leg.origin == event.location.value:
if leg.voyage_number == event.voyage_number:
self._next_expected_activity = (
HandlingActivity.UNLOAD,
Location[leg.destination],
event.voyage_number,
)
break
else:
raise Exception(
"Can't find leg with origin={} and "
"voyage_number={}".format(
event.location,
event.voyage_number,
)
)
elif event.handling_activity == HandlingActivity.UNLOAD:
self._current_voyage_number = None
self._last_known_location = event.location
self._transport_status = "IN_PORT"
if event.location == self.destination:
self._next_expected_activity = (
HandlingActivity.CLAIM,
event.location,
"",
)
elif event.location.value in [leg.destination for leg in self.route.legs]:
for i, leg in enumerate(self.route.legs):
if leg.voyage_number == event.voyage_number:
next_leg: Leg = self.route.legs[i + 1]
assert Location[next_leg.origin] == event.location
self._next_expected_activity = (
HandlingActivity.LOAD,
event.location,
next_leg.voyage_number,
)
break
else:
self._is_misdirected = True
self._next_expected_activity = None
elif event.handling_activity == HandlingActivity.CLAIM:
self._next_expected_activity = None
self._transport_status = "CLAIMED"
else:
raise Exception(
"Unsupported handling event: {}".format(event.handling_activity)
)
Application¶
The application object BookingApplication allows new cargo to be booked, cargo details to be presented, the destination of cargo to be changed, choices of possible routes for cargo to be presented, a route to be assigned, and cargo handling events to be registered.
The BookingApplication class defines and registers custom transcodings for the custom value objects that are defined and used in the domain model.
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import UUID
from eventsourcing.application import Application
from eventsourcing.examples.cargoshipping.domainmodel import (
REGISTERED_ROUTES,
Cargo,
HandlingActivity,
Itinerary,
Leg,
Location,
)
from eventsourcing.persistence import Transcoder, Transcoding
class LocationAsName(Transcoding):
type = Location
name = "location"
def encode(self, obj: Location) -> str:
return obj.name
def decode(self, data: str) -> Location:
assert isinstance(data, str)
return Location[data]
class HandlingActivityAsName(Transcoding):
type = HandlingActivity
name = "handling_activity"
def encode(self, obj: HandlingActivity) -> str:
return obj.name
def decode(self, data: str) -> HandlingActivity:
assert isinstance(data, str)
return HandlingActivity[data]
class ItineraryAsDict(Transcoding):
type = Itinerary
name = "itinerary"
def encode(self, obj: Itinerary) -> Dict[str, Any]:
return obj.__dict__
def decode(self, data: Dict[str, Any]) -> Itinerary:
assert isinstance(data, dict)
return Itinerary(**data)
class LegAsDict(Transcoding):
type = Leg
name = "leg"
def encode(self, obj: Leg) -> Dict[str, Any]:
return obj.__dict__
def decode(self, data: Dict[str, Any]) -> Leg:
assert isinstance(data, dict)
return Leg(**data)
class BookingApplication(Application[Cargo]):
def register_transcodings(self, transcoder: Transcoder) -> None:
        super().register_transcodings(transcoder)
transcoder.register(LocationAsName())
transcoder.register(HandlingActivityAsName())
transcoder.register(ItineraryAsDict())
transcoder.register(LegAsDict())
def book_new_cargo(
self,
origin: Location,
destination: Location,
arrival_deadline: datetime,
) -> UUID:
cargo = Cargo.new_booking(origin, destination, arrival_deadline)
self.save(cargo)
return cargo.id
def change_destination(self, tracking_id: UUID, destination: Location) -> None:
cargo = self.get_cargo(tracking_id)
cargo.change_destination(destination)
self.save(cargo)
def request_possible_routes_for_cargo(self, tracking_id: UUID) -> List[Itinerary]:
cargo = self.get_cargo(tracking_id)
from_location = (cargo.last_known_location or cargo.origin).value
to_location = cargo.destination.value
try:
possible_routes = REGISTERED_ROUTES[(from_location, to_location)]
except KeyError:
raise Exception(
"Can't find routes from {} to {}".format(from_location, to_location)
)
return possible_routes
def assign_route(self, tracking_id: UUID, itinerary: Itinerary) -> None:
cargo = self.get_cargo(tracking_id)
cargo.assign_route(itinerary)
self.save(cargo)
def register_handling_event(
self,
tracking_id: UUID,
voyage_number: Optional[str],
location: Location,
        handling_activity: HandlingActivity,
) -> None:
cargo = self.get_cargo(tracking_id)
cargo.register_handling_event(
tracking_id,
voyage_number,
location,
            handling_activity,
)
self.save(cargo)
def get_cargo(self, tracking_id: UUID) -> Cargo:
return self.repository.get(tracking_id)
Interface¶
The interface object BookingService
repeats the application methods, allowing
new cargo to be booked, cargo details to be presented, the destination of cargo to
be changed, choices of possible routes for cargo to be presented, a route to be
assigned, and cargo handling events to be registered.
It allows clients (e.g. a test case, or Web interface) to deal with simple object
types that can be easily serialised and deserialised. It interacts with the
application using the custom value objects defined in the domain model. For
the purposes of testing, we need to simulate the user selecting a preferred
itinerary from a list, which we do by picking the first in the list of presented
options using the select_preferred_itinerary()
function.
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Union
from uuid import UUID
from eventsourcing.examples.cargoshipping.application import BookingApplication
from eventsourcing.examples.cargoshipping.domainmodel import (
HandlingActivity,
Itinerary,
ItineraryDetails,
LegDetails,
Location,
)
NextExpectedActivityDetails = Optional[Tuple[str, ...]]
CargoDetails = Dict[
str, Optional[Union[str, bool, datetime, NextExpectedActivityDetails]]
]
class BookingService(object):
"""
Presents an application interface that uses
simple types of object (str, bool, datetime).
"""
def __init__(self, app: BookingApplication):
self.app = app
def book_new_cargo(
self,
origin: str,
destination: str,
arrival_deadline: datetime,
) -> str:
tracking_id = self.app.book_new_cargo(
Location[origin],
Location[destination],
arrival_deadline,
)
return str(tracking_id)
def get_cargo_details(self, tracking_id: str) -> CargoDetails:
cargo = self.app.get_cargo(UUID(tracking_id))
# Present 'next_expected_activity'.
next_expected_activity: NextExpectedActivityDetails
if cargo.next_expected_activity is None:
next_expected_activity = None
elif len(cargo.next_expected_activity) == 2:
next_expected_activity = (
cargo.next_expected_activity[0].value,
cargo.next_expected_activity[1].value,
)
elif len(cargo.next_expected_activity) == 3:
next_expected_activity = (
cargo.next_expected_activity[0].value,
cargo.next_expected_activity[1].value,
cargo.next_expected_activity[2],
)
else:
raise Exception(
"Invalid next expected activity: {}".format(
cargo.next_expected_activity
)
)
# Present 'last_known_location'.
if cargo.last_known_location is None:
last_known_location = None
else:
last_known_location = cargo.last_known_location.value
# Present the cargo details.
return {
"id": str(cargo.id),
"origin": cargo.origin.value,
"destination": cargo.destination.value,
"arrival_deadline": cargo.arrival_deadline,
"transport_status": cargo.transport_status,
"routing_status": cargo.routing_status,
"is_misdirected": cargo.is_misdirected,
"estimated_time_of_arrival": cargo.estimated_time_of_arrival,
"next_expected_activity": next_expected_activity,
"last_known_location": last_known_location,
"current_voyage_number": cargo.current_voyage_number,
}
def change_destination(self, tracking_id: str, destination: str) -> None:
self.app.change_destination(UUID(tracking_id), Location[destination])
def request_possible_routes_for_cargo(
self, tracking_id: str
) -> List[ItineraryDetails]:
routes = self.app.request_possible_routes_for_cargo(UUID(tracking_id))
return [self.dict_from_itinerary(route) for route in routes]
def dict_from_itinerary(self, itinerary: Itinerary) -> ItineraryDetails:
legs_details = []
for leg in itinerary.legs:
leg_details: LegDetails = {
"origin": leg.origin,
"destination": leg.destination,
"voyage_number": leg.voyage_number,
}
legs_details.append(leg_details)
route_details: ItineraryDetails = {
"origin": itinerary.origin,
"destination": itinerary.destination,
"legs": legs_details,
}
return route_details
def assign_route(
self,
tracking_id: str,
route_details: ItineraryDetails,
) -> None:
routes = self.app.request_possible_routes_for_cargo(UUID(tracking_id))
for route in routes:
if route_details == self.dict_from_itinerary(route):
self.app.assign_route(UUID(tracking_id), route)
def register_handling_event(
self,
tracking_id: str,
voyage_number: Optional[str],
location: str,
handling_activity: str,
) -> None:
self.app.register_handling_event(
UUID(tracking_id),
voyage_number,
Location[location],
HandlingActivity[handling_activity],
)
# Stub function that picks an itinerary from a list of possible itineraries.
def select_preferred_itinerary(
itineraries: List[ItineraryDetails],
) -> ItineraryDetails:
return itineraries[0]
Test case¶
Following the sample project, the test case has two test methods. One test shows an administrator booking a new cargo, viewing the current state of the cargo, and changing the destination. The other test goes further by assigning a route to a cargo booking, tracking the cargo handling events as it is shipped around the world, recovering by assigning a new route after the cargo was unloaded in the wrong place, until finally the cargo is claimed at its correct destination.
import unittest
from datetime import datetime, timedelta
from eventsourcing.domain import TZINFO
from eventsourcing.examples.cargoshipping.application import BookingApplication
from eventsourcing.examples.cargoshipping.interface import (
BookingService,
select_preferred_itinerary,
)
class TestBookingService(unittest.TestCase):
def setUp(self) -> None:
self.service = BookingService(BookingApplication())
def test_admin_can_book_new_cargo(self) -> None:
arrival_deadline = datetime.now(tz=TZINFO) + timedelta(weeks=3)
cargo_id = self.service.book_new_cargo(
origin="NLRTM",
destination="USDAL",
arrival_deadline=arrival_deadline,
)
cargo_details = self.service.get_cargo_details(cargo_id)
self.assertTrue(cargo_details["id"])
self.assertEqual(cargo_details["origin"], "NLRTM")
self.assertEqual(cargo_details["destination"], "USDAL")
self.service.change_destination(cargo_id, destination="AUMEL")
cargo_details = self.service.get_cargo_details(cargo_id)
self.assertEqual(cargo_details["destination"], "AUMEL")
self.assertEqual(
cargo_details["arrival_deadline"],
arrival_deadline,
)
def test_scenario_cargo_from_hongkong_to_stockholm(
self,
) -> None:
# Test setup: A cargo should be shipped from
# Hongkong to Stockholm, and it should arrive
# in no more than two weeks.
origin = "HONGKONG"
destination = "STOCKHOLM"
arrival_deadline = datetime.now(tz=TZINFO) + timedelta(weeks=2)
# Use case 1: booking.
# A new cargo is booked, and the unique tracking
# id is assigned to the cargo.
tracking_id = self.service.book_new_cargo(origin, destination, arrival_deadline)
# The tracking id can be used to lookup the cargo
# in the repository.
# Important: The cargo, and thus the domain model,
# is responsible for determining the status of the
# cargo, whether it is on the right track or not
# and so on. This is core domain logic. Tracking
# the cargo basically amounts to presenting
# information extracted from the cargo aggregate
# in a suitable way.
cargo_details = self.service.get_cargo_details(tracking_id)
self.assertEqual(
cargo_details["transport_status"],
"NOT_RECEIVED",
)
self.assertEqual(cargo_details["routing_status"], "NOT_ROUTED")
self.assertEqual(cargo_details["is_misdirected"], False)
self.assertEqual(
cargo_details["estimated_time_of_arrival"],
None,
)
self.assertEqual(cargo_details["next_expected_activity"], None)
# Use case 2: routing.
#
# A number of possible routes for this cargo is
# requested and may be presented to the customer
# in some way for him/her to choose from.
# Selection could be affected by things like price
# and time of delivery, but this test simply uses
# an arbitrary selection to mimic that process.
itineraries = self.service.request_possible_routes_for_cargo(tracking_id)
        route_details = select_preferred_itinerary(itineraries)
        # The cargo is then assigned to the selected
        # route, described by an itinerary.
        self.service.assign_route(tracking_id, route_details)
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(
            cargo_details["transport_status"],
            "NOT_RECEIVED",
        )
        self.assertEqual(cargo_details["routing_status"], "ROUTED")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertTrue(cargo_details["estimated_time_of_arrival"])
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("RECEIVE", "HONGKONG", ""),
        )

        # Use case 3: handling
        # A handling event registration attempt will be
        # formed from parsing the data coming in as a
        # handling report either via the web service
        # interface or as an uploaded CSV file. The
        # handling event factory tries to create a
        # HandlingEvent from the attempt, and if the
        # factory decides that this is a plausible
        # handling event, it is stored. If the attempt
        # is invalid, for example if no cargo exists for
        # the specified tracking id, the attempt is
        # rejected.
        #
        # Handling begins: cargo is received in Hongkong.
        self.service.register_handling_event(tracking_id, None, "HONGKONG", "RECEIVE")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(
            cargo_details["last_known_location"],
            "HONGKONG",
        )
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("LOAD", "HONGKONG", "V1"),
        )

        # Load onto voyage V1.
        self.service.register_handling_event(tracking_id, "V1", "HONGKONG", "LOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], "V1")
        self.assertEqual(
            cargo_details["last_known_location"],
            "HONGKONG",
        )
        self.assertEqual(
            cargo_details["transport_status"],
            "ONBOARD_CARRIER",
        )
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("UNLOAD", "NEWYORK", "V1"),
        )

        # Incorrectly unload in Tokyo.
        self.service.register_handling_event(tracking_id, "V1", "TOKYO", "UNLOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(cargo_details["last_known_location"], "TOKYO")
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(cargo_details["is_misdirected"], True)
        self.assertEqual(cargo_details["next_expected_activity"], None)

        # Reroute.
        itineraries = self.service.request_possible_routes_for_cargo(tracking_id)
        route_details = select_preferred_itinerary(itineraries)
        self.service.assign_route(tracking_id, route_details)

        # Load in Tokyo.
        self.service.register_handling_event(tracking_id, "V3", "TOKYO", "LOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], "V3")
        self.assertEqual(cargo_details["last_known_location"], "TOKYO")
        self.assertEqual(
            cargo_details["transport_status"],
            "ONBOARD_CARRIER",
        )
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("UNLOAD", "HAMBURG", "V3"),
        )

        # Unload in Hamburg.
        self.service.register_handling_event(tracking_id, "V3", "HAMBURG", "UNLOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(cargo_details["last_known_location"], "HAMBURG")
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("LOAD", "HAMBURG", "V4"),
        )

        # Load in Hamburg.
        self.service.register_handling_event(tracking_id, "V4", "HAMBURG", "LOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], "V4")
        self.assertEqual(cargo_details["last_known_location"], "HAMBURG")
        self.assertEqual(
            cargo_details["transport_status"],
            "ONBOARD_CARRIER",
        )
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("UNLOAD", "STOCKHOLM", "V4"),
        )

        # Unload in Stockholm.
        self.service.register_handling_event(tracking_id, "V4", "STOCKHOLM", "UNLOAD")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(
            cargo_details["last_known_location"],
            "STOCKHOLM",
        )
        self.assertEqual(cargo_details["transport_status"], "IN_PORT")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(
            cargo_details["next_expected_activity"],
            ("CLAIM", "STOCKHOLM", ""),
        )

        # Finally, cargo is claimed in Stockholm.
        self.service.register_handling_event(tracking_id, None, "STOCKHOLM", "CLAIM")
        cargo_details = self.service.get_cargo_details(tracking_id)
        self.assertEqual(cargo_details["current_voyage_number"], None)
        self.assertEqual(
            cargo_details["last_known_location"],
            "STOCKHOLM",
        )
        self.assertEqual(cargo_details["transport_status"], "CLAIMED")
        self.assertEqual(cargo_details["is_misdirected"], False)
        self.assertEqual(cargo_details["next_expected_activity"], None)
Wiki application¶
This example demonstrates several techniques: using version 5 UUIDs both to discover aggregate IDs and to implement an application-wide rule (or “invariant”); using the declarative syntax for domain models with a “non-trivial” command method; automatic snapshotting; automatically setting a common attribute on all events without mentioning that attribute in the command methods; and a recipe for an event-sourced log.
Domain model¶
In the domain model below, the Page aggregate has a base class Event, which is defined with a user_id dataclass field. This field is defined not to be included in the event's __init__ method, and so does not need to be matched by parameters in the command method signatures. It has a default factory, which gets the attribute's value from a Python context variable. This base aggregate event class is inherited by all the aggregate's concrete event classes.
The update_body() command method does a “non-trivial” amount of work before the BodyUpdated event is triggered: it creates a “diff” between the current version of the body and the new version. It then triggers the event, which carries the diff. The event is applied to the body by “patching” the current version of the body with this diff.
The Index aggregate has a version 5 UUID, which is a function of a slug. The Index and Page aggregates are used in combination to maintain editable pages of text, with editable titles, and with editable “slugs” that can be used in page URLs.
A PageLogged event is also defined, and is used to define a “page log” in the application.
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.domain import Aggregate, AggregateEvent, event
from eventsourcing.examples.wiki.utils import apply_patch, create_diff

user_id_cvar: ContextVar[Optional[UUID]] = ContextVar("user_id", default=None)


@dataclass
class Page(Aggregate):
    title: str
    slug: str
    body: str = ""
    modified_by: Optional[UUID] = field(default=None, init=False)

    class Event(Aggregate.Event["Page"]):
        user_id: Optional[UUID] = field(default_factory=user_id_cvar.get, init=False)

        def apply(self, aggregate: "Page") -> None:
            aggregate.modified_by = self.user_id

    @event("SlugUpdated")
    def update_slug(self, slug: str) -> None:
        self.slug = slug

    @event("TitleUpdated")
    def update_title(self, title: str) -> None:
        self.title = title

    def update_body(self, body: str) -> None:
        self._update_body(create_diff(old=self.body, new=body))

    @event("BodyUpdated")
    def _update_body(self, diff: str) -> None:
        self.body = apply_patch(old=self.body, diff=diff)


@dataclass
class Index(Aggregate):
    slug: str
    ref: Optional[UUID]

    @staticmethod
    def create_id(slug: str) -> UUID:
        return uuid5(NAMESPACE_URL, f"/slugs/{slug}")

    @event("RefChanged")
    def update_ref(self, ref: Optional[UUID]) -> None:
        self.ref = ref


class PageLogged(AggregateEvent[Aggregate]):
    page_id: UUID
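The “discovery” of aggregate IDs rests on version 5 UUIDs being deterministic. As a minimal illustration of the property that Index.create_id() relies on, using only the standard library's uuid module (this sketch is not part of the example code):

from uuid import NAMESPACE_URL, uuid5

# A version 5 UUID is a pure function of the namespace and the name, so an
# index ID can always be recomputed from a slug, without storing a mapping.
assert uuid5(NAMESPACE_URL, "/slugs/welcome") == uuid5(NAMESPACE_URL, "/slugs/welcome")

# Different slugs give different IDs.
assert uuid5(NAMESPACE_URL, "/slugs/welcome") != uuid5(NAMESPACE_URL, "/slugs/other")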
The create_diff() and apply_patch() functions use the Unix command line tools diff and patch.
import os
from tempfile import TemporaryDirectory


def create_diff(old: str, new: str) -> str:
    # Diff the old and new texts, returning the diff as a string.
    return run("diff %s %s > %s", old, new)


def apply_patch(old: str, diff: str) -> str:
    # Patch the old text with the diff, returning the new text.
    return run("patch -s %s %s -o %s", old, diff)


def run(cmd: str, a: str, b: str) -> str:
    # Write the two inputs to temporary files, run the command with the
    # file paths substituted in, and return the content of the output file.
    with TemporaryDirectory() as td:
        a_path = os.path.join(td, "a")
        b_path = os.path.join(td, "b")
        c_path = os.path.join(td, "c")
        with open(a_path, "w") as a_file:
            a_file.write(a)
        with open(b_path, "w") as b_file:
            b_file.write(b)
        os.system(cmd % (a_path, b_path, c_path))
        with open(c_path, "r") as c_file:
            return c_file.read()
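As a quick check of these helpers, patching the old text with the diff recovers the new text. A minimal round-trip sketch, assuming the Unix diff and patch tools are installed and on the PATH:

# Round-trip sketch for the helpers above (assumes 'diff' and 'patch'
# are available, as the functions themselves require).
old = "Welcome to my wiki\n"
new = "Welcome to this wiki\n"
diff = create_diff(old=old, new=new)
assert apply_patch(old=old, diff=diff) == new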
Application¶
The application provides methods to create a new page, get the details of a page by its slug, update the title of a page referenced by its slug, update the body of a page, and change a page's slug. Please note that none of these methods mentions a user_id argument.
To get a page, the slug is used to identify an index, the index is used to get the page ID, and the page ID is used to get the title and body of the page. To change a slug, the index objects for the old and the new slugs are identified; the page ID is removed as the reference from the old index and set as the reference on the new index. The indexes are also used to implement an application-wide rule (or “invariant”) that a slug can be used by only one page: if an attempt is made to change the slug of one page to a slug that is already being used by another page, then a SlugConflictError will be raised, and no changes will be made.
The application also demonstrates the “event-sourced log” recipe: all the IDs of the Page aggregates can be listed, by logging the IDs in a sequence of stored events when new pages are created, and then selecting from this sequence when presenting a list of pages.
from typing import (
    Any,
    Dict,
    Generic,
    Iterator,
    Mapping,
    Optional,
    Type,
    Union,
    cast,
)
from uuid import NAMESPACE_URL, UUID, uuid5

from eventsourcing.application import AggregateNotFound, Application
from eventsourcing.domain import Aggregate, AggregateEvent, TDomainEvent
from eventsourcing.examples.wiki.domainmodel import Index, Page, PageLogged
from eventsourcing.persistence import EventStore

PageDetailsType = Dict[str, Union[str, Any]]


class WikiApplication(Application[Aggregate]):
    env = {"COMPRESSOR_TOPIC": "gzip"}
    snapshotting_intervals = {Page: 5}

    def __init__(self, env: Optional[Mapping[str, str]] = None) -> None:
        super().__init__(env)
        self.page_log: Log[PageLogged] = Log(
            self.events, uuid5(NAMESPACE_URL, "/page_log"), PageLogged
        )

    def create_page(self, title: str, slug: str) -> None:
        page = Page(title=title, slug=slug)
        page_logged = self.page_log.trigger_event(page_id=page.id)
        index_entry = Index(slug, ref=page.id)
        self.save(page, page_logged, index_entry)

    def get_page_details(self, slug: str) -> PageDetailsType:
        page = self._get_page_by_slug(slug)
        return self._details_from_page(page)

    def _details_from_page(self, page: Page) -> PageDetailsType:
        return {
            "title": page.title,
            "slug": page.slug,
            "body": page.body,
            "modified_by": page.modified_by,
        }

    def update_title(self, slug: str, title: str) -> None:
        page = self._get_page_by_slug(slug)
        page.update_title(title=title)
        self.save(page)

    def update_slug(self, old_slug: str, new_slug: str) -> None:
        page = self._get_page_by_slug(old_slug)
        page.update_slug(new_slug)
        old_index = self._get_index(old_slug)
        old_index.update_ref(None)
        try:
            new_index = self._get_index(new_slug)
        except AggregateNotFound:
            new_index = Index(new_slug, page.id)
        else:
            if new_index.ref is None:
                new_index.update_ref(page.id)
            else:
                raise SlugConflictError()
        self.save(page, old_index, new_index)

    def update_body(self, slug: str, body: str) -> None:
        page = self._get_page_by_slug(slug)
        page.update_body(body)
        self.save(page)

    def _get_page_by_slug(self, slug: str) -> Page:
        try:
            index = self._get_index(slug)
        except AggregateNotFound:
            raise PageNotFound(slug)
        if index.ref is None:
            raise PageNotFound(slug)
        page_id = index.ref
        return self._get_page_by_id(page_id)

    def _get_page_by_id(self, page_id: UUID) -> Page:
        return cast(Page, self.repository.get(page_id))

    def _get_index(self, slug: str) -> Index:
        return cast(Index, self.repository.get(Index.create_id(slug)))

    def get_pages(self, limit: int = 10, offset: int = 0) -> Iterator[PageDetailsType]:
        for page_logged in self.page_log.get(limit, offset):
            page = self._get_page_by_id(page_logged.page_id)
            yield self._details_from_page(page)


class Log(Generic[TDomainEvent]):
    def __init__(
        self,
        events: EventStore[AggregateEvent[Aggregate]],
        originator_id: UUID,
        logged_cls: Type[TDomainEvent],
    ):
        self.events = events
        self.originator_id = originator_id
        self.logged_cls = logged_cls

    def trigger_event(self, **kwargs: Any) -> TDomainEvent:
        last_logged = self._get_last_logged()
        if last_logged:
            next_originator_version = last_logged.originator_version + 1
        else:
            next_originator_version = Aggregate.INITIAL_VERSION
        return self.logged_cls(  # type: ignore
            originator_id=self.originator_id,
            originator_version=next_originator_version,
            timestamp=self.logged_cls.create_timestamp(),
            **kwargs,
        )

    def get(self, limit: int = 10, offset: int = 0) -> Iterator[TDomainEvent]:
        # Calculate 'lte': the greatest originator version to select,
        # so that the 'offset' most recent events are skipped.
        lte = None
        if offset > 0:
            last = self._get_last_logged()
            if last:
                lte = last.originator_version - offset
        # Get logged events, most recent first.
        return cast(
            Iterator[TDomainEvent],
            self.events.get(
                originator_id=self.originator_id,
                lte=lte,
                desc=True,
                limit=limit,
            ),
        )

    def _get_last_logged(
        self,
    ) -> Optional[TDomainEvent]:
        events = self.events.get(originator_id=self.originator_id, desc=True, limit=1)
        try:
            return cast(TDomainEvent, next(events))
        except StopIteration:
            return None


class PageNotFound(Exception):
    """
    Raised when a page is not found.
    """


class SlugConflictError(Exception):
    """
    Raised when updating a page to a slug used by another page.
    """
Test case¶
The test case below sets a user ID in the context variable. A page is created and updated in various ways, and the page events are checked to make sure they all have the user ID that was set in the context variable. The user ID is then changed, and a subsequent update is checked to make sure it records the new user ID.
from typing import cast
from unittest import TestCase
from uuid import uuid4

from eventsourcing.examples.wiki.application import (
    PageNotFound,
    SlugConflictError,
    WikiApplication,
)
from eventsourcing.examples.wiki.domainmodel import Index, Page, user_id_cvar
from eventsourcing.system import NotificationLogReader


class TestWiki(TestCase):
    def test(self) -> None:
        # Set user_id context variable.
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Construct application.
        app = WikiApplication()

        # Check the page doesn't exist.
        with self.assertRaises(PageNotFound):
            app.get_page_details(slug="welcome")

        # Check the list of pages is empty.
        pages = list(app.get_pages())
        self.assertEqual(len(pages), 0)
        pages = list(app.get_pages(offset=1))
        self.assertEqual(len(pages), 0)

        # Create a page.
        app.create_page(title="Welcome", slug="welcome")

        # Present page identified by the given slug.
        page = app.get_page_details(slug="welcome")

        # Check we got a dict that has the given title and slug.
        self.assertEqual(page["title"], "Welcome")
        self.assertEqual(page["slug"], "welcome")
        self.assertEqual(page["body"], "")
        self.assertEqual(page["modified_by"], user_id)

        # Update the title.
        app.update_title(slug="welcome", title="Welcome Visitors")

        # Check the title was updated.
        page = app.get_page_details(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Update the slug.
        app.update_slug(old_slug="welcome", new_slug="welcome-visitors")

        # Check the index was updated.
        with self.assertRaises(PageNotFound):
            app.get_page_details(slug="welcome")

        # Check we can get the page by the new slug.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["slug"], "welcome-visitors")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to my wiki")

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to my wiki")

        # Update the body.
        app.update_body(slug="welcome-visitors", body="Welcome to this wiki")

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["body"], "Welcome to this wiki")

        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!
This is a wiki about...
""",
        )

        # Check the body was updated.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(
            page["body"],
            """
Welcome to this wiki!
This is a wiki about...
""",
        )

        # Check all the Page events have the user_id.
        for notification in NotificationLogReader(app.log).read(start=1):
            domain_event = app.mapper.to_domain_event(notification)
            if isinstance(domain_event, Page.Event):
                self.assertEqual(domain_event.user_id, user_id)

        # Change user_id context variable.
        user_id = uuid4()
        user_id_cvar.set(user_id)

        # Update the body.
        app.update_body(
            slug="welcome-visitors",
            body="""
Welcome to this wiki!
This is a wiki about us!
""",
        )

        # Check 'modified_by' changed.
        page = app.get_page_details(slug="welcome-visitors")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)

        # Check a snapshot was created by now.
        assert app.snapshots
        index = cast(Index, app.repository.get(Index.create_id("welcome-visitors")))
        assert index.ref
        self.assertTrue(len(list(app.snapshots.get(index.ref))))

        # Create some more pages and list all the pages.
        app.create_page("Page 2", "page-2")
        app.create_page("Page 3", "page-3")
        app.create_page("Page 4", "page-4")
        app.create_page("Page 5", "page-5")

        pages = list(app.get_pages())
        self.assertEqual(pages[0]["title"], "Page 5")
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["title"], "Page 4")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["title"], "Page 3")
        self.assertEqual(pages[2]["slug"], "page-3")
        self.assertEqual(pages[3]["title"], "Page 2")
        self.assertEqual(pages[3]["slug"], "page-2")
        self.assertEqual(pages[4]["title"], "Welcome Visitors")
        self.assertEqual(pages[4]["slug"], "welcome-visitors")

        pages = list(app.get_pages(limit=3))
        self.assertEqual(len(pages), 3)
        self.assertEqual(pages[0]["slug"], "page-5")
        self.assertEqual(pages[1]["slug"], "page-4")
        self.assertEqual(pages[2]["slug"], "page-3")

        pages = list(app.get_pages(limit=3, offset=3))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        pages = list(app.get_pages(offset=3))
        self.assertEqual(len(pages), 2)
        self.assertEqual(pages[0]["slug"], "page-2")
        self.assertEqual(pages[1]["slug"], "welcome-visitors")

        # Check we can't change the slug of a page to one
        # that is being used by another page.
        with self.assertRaises(SlugConflictError):
            app.update_slug("page-2", "page-3")

        # Check we can change the slug of a page to one
        # that was previously being used.
        app.update_slug("welcome-visitors", "welcome")
        page = app.get_page_details(slug="welcome")
        self.assertEqual(page["title"], "Welcome Visitors")
        self.assertEqual(page["modified_by"], user_id)
Release notes¶
It is the aim of the project that releases with the same major version number are backwards compatible, within the scope of the documented examples. New major versions indicate that backwards-incompatible changes have been introduced since the previous major version. New minor versions indicate that new functionality has been added, or existing functionality extended. New point versions indicate that existing code or documentation has been improved in a way that neither breaks backwards compatibility nor extends the functionality of the library.
Version 9.x¶
Version 9.x series is a rewrite of the library that distills most of the best parts of the previous versions into faster and simpler code. This version is recommended for new projects. It is not backwards compatible with previous major versions. However, the underlying principles are the same, so conversion of code and stored events is certainly possible.
Version 9.1.5 (released 17 November 2021)¶
Improved the documentation, examples, and tests. Fixed the PostgreSQL recorder to use bigint for notification_id in the tracking table, and to lock the table only when inserting stored events into a total order (i.e. not when inserting snapshots). Refactored several things: extracted the register_topic() function; changed the handling of event attributes to pass in what is expected by a decorated method; extracted the aggregate mutator function, allowing a non-default mutator function to be used with the repository get() method; stopped using the deprecated Thread.setDaemon() method. Improved static type hinting.
Version 9.1.4 (released 20 October 2021)¶
Fixed discrepancy between Application save() and Follower record() methods, so that Follower applications will do automatic snapshotting based on their ‘snapshotting_intervals’ after their policy() has been called, as expected.
Version 9.1.3 (released 8 October 2021)¶
Added “trove classifier” for Python 3.10.
Version 9.1.2 (released 1 October 2021)¶
Clarified Postgres configuration options (POSTGRES_LOCK_TIMEOUT and POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT) require integer seconds. Added py.typed file (was missing since v9).
Version 9.1.1 (released 20 August 2021)¶
Changed PostgreSQL schema to use BIGSERIAL (was SERIAL) for notification IDs.
Version 9.1.0 (released 18 August 2021)¶
Added support for setting the environment when constructing an application. Added “eq” and “repr” methods on the aggregate base class. Reinstated explicit definition of the Aggregate.Created class. Added an Invoice example and a Parking Lot example. Fixed a bug when decorating a property setter (use the method argument name). Improved type annotations. Adjusted the order of calling the domain event mutate() and apply() methods, so that apply() is called first: if the apply() method raises an exception, the aggregate object can emerge unscathed, whereas previously its version number and modified time would always be changed. Improved the robustness of the recorder classes, with more attention to connection state, closing connections on certain errors, retrying operations under certain conditions, and especially by changing the postgres recorders to obtain an ‘EXCLUSIVE’ mode table lock when inserting events. Obtaining the table lock in PostgreSQL avoids interleaving of inserts between commits, which prevents event notifications from being committed with lower notification IDs than event notifications that have already been committed, and thereby prevents readers who are tailing the notification log of an application from missing event notifications for this reason. Added various environment variable options: for sqlite, a lock timeout option; and for postgres, a max connection age option which allows connections over a certain age to be closed when idle, a connection pre-ping option, a lock timeout option, and an option to time out sessions that are idle in a transaction, so that locks can be released even if the database client has somehow ceased its interactions with the server in a way that leaves the session open. Improved the exception classes, to follow the standard Python DBAPI class names, and to encapsulate errors from drivers with library errors following this standard. Added methods to the notification log and reader classes to allow notifications to be selected directly. Changed the Follower class to select() rather than read() notifications. Supported defining the initial version number of aggregates on the aggregate class (with the INITIAL_VERSION attribute).
Version 9.0.3 (released 17 May 2021)¶
Changed PostgreSQL queries to use transaction class context manager (transactions were started and not closed). Added possibility to specify a port for Postgres (thanks to Valentin Dion). Added **kwargs to Application.save() method signature, so other things can be passed down the stack. Fixed reference in installing.rst (thanks to Karl Heinrichmeyer). Made properties out of aggregate attributes: ‘modified_on’ and ‘version’. Improved documentation.
Version 9.0.2 (released 16 April 2021)¶
Fixed issue with type hints in PyCharm v2021.1 for methods decorated with the @event decorator.
Version 9.0.1 (released 29 March 2021)¶
Improved documentation. Moved cipher base class to avoid importing cipher module.
Version 9.0.0 (released 13 March 2021)¶
First release of the distilled version of the library. Compared with previous versions, the code and documentation are much simpler. This version focuses directly on expressing the important concerns, without the variations and alternatives that had been accumulated over the past few years of learning and pathfinding.
The highlight is the new declarative syntax for event sourced domain models.
Dedicated persistence modules for SQLite and PostgreSQL have been introduced. Support for SQLAlchemy and Django, and other databases, has been removed, with the plan being to support these in separate package distributions. The default “plain old Python object” infrastructure continues to exist, and now offers event storage and retrieval performance of around 20x the speed of using PostgreSQL, and around 4x the speed of using SQLite in memory.
The event storage format is more efficient, because originator IDs and originator versions are removed from the stored event state before serialisation, and then reinstated on deserialisation.
Rather than using “INSERT SELECT MAX” SQL statements, database sequences are used to generate event notification IDs. This avoids the table conflicts that sometimes caused exceptions and required retries when storing events. Although this leads to notification ID sequences that may have gaps, the use of sequences means there is still no risk of event notifications being inserted in the gaps after later event notifications have been processed, which was the motivation for using gapless sequences in previous versions. The notification log and log reader classes have been adjusted to support the possible existence of gaps in the notification log sequence.
The transcoder is more easily extensible, with the new style for defining and registering individual transcoding objects to support individual types of object that are not supported by default.
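For illustration, the following sketch shows the general shape of this style in the version 9 API: a transcoding names a type and converts instances to and from a serialisable representation, and is registered with the application's transcoder. The DecimalAsStr and MyApplication names here are illustrative (the library itself ships a DecimalAsStr transcoding), not part of this release note:

from decimal import Decimal

from eventsourcing.application import Application
from eventsourcing.persistence import Transcoder, Transcoding


class DecimalAsStr(Transcoding):
    # Encodes Decimal objects as strings, and decodes them back.
    type = Decimal
    name = "decimal_str"

    def encode(self, obj: Decimal) -> str:
        return str(obj)

    def decode(self, data: str) -> Decimal:
        return Decimal(data)


class MyApplication(Application):
    def register_transcodings(self, transcoder: Transcoder) -> None:
        super().register_transcodings(transcoder)
        transcoder.register(DecimalAsStr())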
Domain event classes have been greatly simplified, with the deep hierarchy of entity and event classes removed in favour of the simple aggregate base class.
The repository class has been changed to provide a single get() method. It no longer supports the Python “indexing” square-bracket syntax, so that there is just one way to get an aggregate regardless of whether the requested version is specified or not.
Application configuration of persistence infrastructure is now driven by environment variables rather than constructor parameters, leading to a simpler interface for application object classes. The mechanism for storing aggregates has been simplified, so that aggregates are saved using the application “save” method. A new “notify” method has been added to the application class, to support applications that need to know when new events have just been recorded.
The mechanism by which aggregates published their events, and by which a “persistence subscriber” subscribed to and persisted published domain events, has been completely removed. Aggregates that are saved always need some persistence infrastructure to store the events, and it is the responsibility of the application to bring together the domain model and the infrastructure, so that whenever an aggregate can be saved there is always an application.
Process application policy methods are now given a process event object and will use it to collect domain events, using its “save” method, which has the same method signature as the application “save” method. This allows policies to accumulate new events on the process event object in the order they were generated, whereas previously if new events were generated on one aggregate and then a second and then the first, the events of one aggregate would be stored first and the events of the second aggregate would be stored afterwards, leading to an incorrect ordering of the domain events in the notification log. The process event object existed in previous versions, was used to keep track of the position in a notification log of the event notification that was being processed by a policy, and continues to be used for that purpose.
The system runners have been reduced to the single-threaded and multi-threaded runners, with support for running with Ray and gRPC and so on removed (the plan being to support these in separate package distributions).
Altogether, these changes mean the core library now depends only on the Python Standard Library, apart from the optional extra dependencies on a cryptographic library (PyCryptodome) and a PostgreSQL driver (psycopg2), and the dependencies of development tools. These changes also make the test suite much faster to run (several seconds, rather than several minutes for the previous version), and make the build time on CI services much quicker (around one minute, rather than nearly ten minutes for the previous version). And they make the library more approachable and fun for users and library developers. Test coverage has been increased to 100% line and branch coverage, and mypy and flake8 checks are run.
The documentation has been rewritten to focus more on usage of the library code, and less on explaining surrounding concepts and considerations.
Version 8.x¶
Version 8.x series brings more efficient storage, static type hinting, improved transcoding, event and entity versioning, and integration with Axon Server (a specialist event store) and Ray. Code for defining and running systems of applications, previously in the “application” package, has been moved to a new “system” package.
Version 8.3.0 (released 9 January 2021)¶
Added gRPC runner. Improved Django record manager, so that it supports setting notification log IDs in the application like the SQLAlchemy record manager (this optionally avoids use of the “insert select max” statement and thereby makes it possible to exclude domain events from the notification log at the risk of non-gapless notification log sequences). Also improved documentation.
Version 8.2.5 (released 22 Dec 2020)¶
Increased versions of dependencies on requests, Django, Celery, PyMySQL.
Version 8.2.4 (released 12 Nov 2020)¶
Fixed issue with using Oracle database, where a trailing semicolon in an SQL statement caused the “invalid character” error (ORA-00911).
Version 8.2.3 (released 19 May 2020)¶
Improved interactions with process applications in RayRunner so that they have the same style as interactions with process applications in other runners. This makes the RayRunner more interchangeable with the other runners, so that system client code can be written to work with any runner.
Version 8.2.2 (released 16 May 2020)¶
Improved documentation. Updated dockerization for local development. Added Makefile, to setup development environment, to build and run docker containers, to run the test suite, to format the code, and to build the docs. Reformatted the code.
Version 8.2.1 (released 11 March 2020)¶
Improved documentation.
Version 8.2.0 (released 10 March 2020)¶
Added optional versioning of domain events and entities, so that domain events and entity snapshots can be versioned and old versions of state can be upcast to new versions.
Added optional correlation and causation IDs for domain events, so that a story can be traced through a system of applications.
Added AxonApplication and AxonRecordManager so that Axon Server can be used as an event store by event-sourced applications.
Added RayRunner, which allows a system of applications to be run with the Ray framework.
Version 8.1.0 (released 11 January 2020)¶
Improved documentation. Improved transcoding (e.g. tuples are encoded as tuples also within other collections). Added event hash method name to event attributes, so that event hashes created with old version of event hashing can still be checked. Simplified repository base classes (removed “event player” class).
Version 8.0.0 (released 7 December 2019)¶
The storage of event state has been changed from strings to bytes. This is definitely a backwards incompatible change. Previously state bytes were encoded with base64 before being saved as strings, which adds 33% to the size of each stored state. Compression of event state is now an option, independently of encryption, and compression is now configurable (defaults to zlib module, other compressors can be used). Attention will need to be paid to one of two alternatives. One alternative is to migrate your stored events (the state field), either from being stored as plaintext strings to being stored as plaintext bytes (you need to encode as utf-8), or from being stored as ciphertext bytes encoded with base64 decoded as utf-8 to being stored as ciphertext bytes (you need to encode as utf-8 and decode base64). The other alternative is to carry on using the same database schema, define custom stored event record classes in your project (copied from the previous version of the library), and extend the record manager to convert the bytes to strings and back. A later version of this library may bring support for one or both of these options, so if this change presents a challenge, please hold off from upgrading, and discuss your situation with the project developer(s). There is nothing wrong with the previous version, and you can continue to use it.
Other backwards incompatible changes involve renaming a number of methods, and moving classes and also modules (for example, the system modules have been moved from the applications package to a separate package). Please see the commit log for all the details.
This version also brings improved and expanded transcoding, additional type annotations, automatic subclassing of domain events on domain entities (not enabled by default), and an option to apply the policy of a process application to all events that are generated by its policy when an event notification is processed (this continues until all successively generated events have been processed, with all generated events stored in the same atomic process event, as if they were generated in a single policy function).
Please note, the transcoding now supports the encoding of tuples, and named tuples, as tuples. Previously tuples were encoded by the JSON transcoding as lists, and so tuples became lists, which is the default behaviour of the core json package. So if you have code that depends on the transcoder converting tuples to lists, then attention will have to be paid to the fact that tuples will now be encoded and returned as tuples. However, any existing stored events generated with an earlier version of this library will continue to be returned as lists, since they were encoded as lists, not tuples.
Please note, the system runner class was changed to keep references to constructed process application classes in the runner object, rather than the system object. If you have code that accesses the process applications as attributes on the system object, then attention will need to be paid to accessing the process applications by class on the runner object.
Version 7.x¶
Version 7.x series refined the “process and system” code.
Version 7.2.4 (released 9 Oct 2019)¶
Version 7.2.4 fixed an issue in running the test suite.
Version 7.2.3 (released 9 Oct 2019)¶
Version 7.2.3 fixed a bug in MultiThreadedRunner.
Version 7.2.2 (released 6 Oct 2019)¶
Version 7.2.2 has improved documentation for “reliable projections”.
Version 7.2.1 (released 6 Oct 2019)¶
Version 7.2.1 has improved support for “reliable projections”, which allows custom records to be deleted (previously only create and update were supported). The documentation for “reliable projections” was improved: the previous code snippet, which was merely suggestive, was replaced by a working example.
Version 7.2.0 (released 1 Oct 2019)¶
Version 7.2.0 has support for “reliable projections” into custom ORM objects that can be coded as process application policies.
Also a few issues were resolved: Django models are no longer imported from the library when custom models are being used to store events, which prevents model conflicts; the multiprocess runner was fixed to work when an application is not being followed by another; and process applications now reflect off the sequenced item tuple when reading notifications, so that custom field names are used.
Version 7.1.6 (released 2 Aug 2019)¶
Version 7.1.6 fixed an issue with the notification log reader. The notification log reader was sometimes using a “fast path” to get all the notifications without paging through the notification log using the linked sections. However, when there were too many notifications, this failed to work. A few adjustments were made to fix the performance, robustness, and configurability of the notification log reading functionality.
Version 7.1.5 (released 26 Jul 2019)¶
Version 7.1.5 improved the library documentation with better links to module reference pages. The versions of dependencies were also updated, so that all versions of dependencies are the current stable versions of the package distributions on PyPI. In particular, requests was updated to a version that fixes a security vulnerability.
Version 7.1.4 (released 10 Jul 2019)¶
Version 7.1.4 improved the library documentation.
Version 7.1.3 (released 4 Jul 2019)¶
Version 7.1.3 improved the domain model layer documentation.
Version 7.1.2 (released 26 Jun 2019)¶
Version 7.1.2 fixed method ‘construct_app()’ on class ‘System’ to set ‘setup_table’ on its process applications using the system’s value of ‘setup_tables’. Also updated version of dependency of SQLAlchemy-Utils.
Version 7.1.1 (released 21 Jun 2019)¶
Version 7.1.1 added ‘Support options’ and ‘Contributing’ sections to the documentation.
Version 7.1.0 (released 11 Jun 2019)¶
Version 7.1.0 improved structure to the documentation.
Version 7.0.0 (released 21 Feb 2019)¶
Version 7.0.0 brought many incremental improvements across the library, especially the ability to define an entire system of process applications independently of infrastructure. Please note, record fields have been renamed.
Version 6.x¶
Version 6.x series was the first release of the “process and system” code.
Version 6.2.0 (released 15 Jul 2018)¶
Version 6.2.0 (released 26 Jun 2018)¶
Version 6.1.0 (released 14 Jun 2018)¶
Version 6.0.0 (released 23 Apr 2018)¶
Version 5.x¶
Version 5.x added support for Django ORM. It was released as a new major version after quite a lot of refactoring made things backward-incompatible.
Version 5.1.1 (released 4 Apr 2018)¶
Version 5.1.0 (released 16 Feb 2018)¶
Version 5.0.0 (released 24 Jan 2018)¶
Support for Django ORM was added in version 5.0.0.
Version 4.x¶
Version 4.x series was released after quite a lot of refactoring made things backward-incompatible. Object namespaces for entity and event classes were cleaned up, by moving library names to double-underscore prefixed and postfixed names. Domain events can be hashed, and also hash-chained together, allowing entity state to be verified. Created events were changed to have an originator_topic, which allowed other things, such as mutators and repositories, to be greatly simplified. Mutators are now by default expected to be implemented on entity event classes. Event timestamps were changed from floats to decimal objects, an exact number type. The cipher was changed to use AES-GCM, to allow verification of encrypted data retrieved from a database.
Also, the record classes for SQLAlchemy were changed to have an auto-incrementing ID, to make it easy to follow the events of an application, for example when updating view models, without additional complication of a separate application log. This change makes the SQLAlchemy library classes ultimately less “scalable” than the Cassandra classes, because an auto-incrementing ID must operate from a single thread. Overall, it seems like a good trade-off for early-stage development. Later, when the auto-incrementing ID bottleneck would otherwise throttle performance, “scaling-up” could involve switching application infrastructure to use a separate application log.
Version 4.0.0 (released 11 Dec 2017)¶
Version 3.x¶
Version 3.x series was released after quite a lot of refactoring made things backwards-incompatible. Documentation was greatly improved, in particular with pages reflecting the architectural layers of the library (infrastructure, domain, application).
Version 3.1.0 (released 23 Nov 2017)¶
Version 3.0.0 (released 25 May 2017)¶
Version 2.x¶
Version 2.x series was a major rewrite that implemented two distinct kinds of sequences: events sequenced by integer version numbers and events sequenced in time, with an archetypal “sequenced item” persistence model for storing events.
Version 2.1.1 (released 30 Mar 2017)¶
Version 2.1.0 (released 27 Mar 2017)¶
Version 2.0.0 (released 27 Mar 2017)¶
Version 1.x¶
Version 1.x series was an extension of the version 0.x series, and attempted to bridge between sequencing events with timestamps and sequencing events with version numbers.
Version 1.2.1 (released 23 Oct 2016)¶
Version 1.2.0 (released 23 Oct 2016)¶
Version 1.1.0 (released 19 Oct 2016)¶
Version 1.0.10 (released 5 Oct 2016)¶
Version 1.0.9 (released 17 Aug 2016)¶
Version 1.0.8 (released 30 Jul 2016)¶
Version 1.0.7 (released 13 Jul 2016)¶
Version 1.0.6 (released 7 Jul 2016)¶
Version 1.0.5 (released 1 Jul 2016)¶
Version 1.0.4 (released 30 Jun 2016)¶
Version 1.0.3 (released 30 Jun 2016)¶
Version 1.0.2 (released 8 Jun 2016)¶
Version 1.0.1 (released 7 Jun 2016)¶
Version 0.x¶
Version 0.x series was the initial cut of the code. All events were sequenced by timestamps, or TimeUUIDs in Cassandra, because the project originally emerged whilst working with Cassandra.