Infrastructure layer

The library’s infrastructure layer provides a cohesive mechanism for storing events as sequences of items.

The entire mechanism is encapsulated by the library’s EventStore class. The event store uses a sequenced item mapper and a record manager.

The sequenced item mapper converts objects such as domain events to sequenced items, and the record manager writes sequenced items to database records. The sequenced item mapper and the record manager operate by reflection off a common sequenced item type.

Sequenced items

Sequenced item types are declared as Python named tuples. The example below is a sequenced item type with four fields.

from collections import namedtuple

SequencedItem = namedtuple('SequencedItem', ['sequence_id', 'position', 'topic', 'data'])

The field names are arbitrary; however, a suitable database table will have matching column names.

Whatever the names of the fields, the first field of a sequenced item will represent the identity of a sequence to which an item belongs. The second field will represent the position of the item in its sequence. The third field will represent a topic to which the item pertains. And the fourth field will represent the state of the item.
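Because the roles are fixed by position rather than by name, the field names can be discovered by reflecting on the named tuple type. The sketch below only illustrates that idea with the standard library. The helper field_roles() and the two example types are hypothetical and are not part of the library.

```python
from collections import namedtuple

# Two hypothetical item types: different field names, same positional layout.
ItemA = namedtuple('ItemA', ['sequence_id', 'position', 'topic', 'state'])
ItemB = namedtuple('ItemB', ['originator_id', 'originator_version', 'topic', 'state'])


def field_roles(item_class):
    """Map each positional role to the field name declared on the type."""
    roles = ('sequence', 'position', 'topic', 'state')
    return dict(zip(roles, item_class._fields))


assert field_roles(ItemA)['sequence'] == 'sequence_id'
assert field_roles(ItemB)['sequence'] == 'originator_id'
assert field_roles(ItemB)['position'] == 'originator_version'
```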


The library provides a sequenced item type called SequencedItem.

from eventsourcing.infrastructure.sequenceditem import SequencedItem

As in the example above, the library’s SequencedItem has four fields. The sequence_id identifies the sequence to which the item belongs. The position identifies the position of the item in its sequence. The topic identifies a dimension of concern to which the item pertains. The state holds the state of the item.

A sequenced item is just a tuple, and can be used as such. In the example below, a sequenced item happens to be constructed with a UUID to identify a sequence. The item has also been given an integer position value, and it has a topic that happens to correspond to a domain event class in the library. The item’s state is a JSON string in which foo is bar.

from uuid import uuid4

sequence1 = uuid4()

state = '{"foo":"bar","position":0,"sequence_id":{"UUID":"%s"}}' % sequence1.hex

sequenced_item1 = SequencedItem(
    sequence_id=sequence1,
    position=0,
    topic='eventsourcing.domain.model.events#DomainEvent',
    state=state,
)

As expected, the attributes of the sequenced item object are simply the values given when the object was constructed.

assert sequenced_item1.sequence_id == sequence1
assert sequenced_item1.position == 0
assert sequenced_item1.topic == 'eventsourcing.domain.model.events#DomainEvent'
assert sequenced_item1.state == state, sequenced_item1.state


The library also provides a sequenced item type called StoredEvent. Its attributes are originator_id, originator_version, topic, and state.

The originator_id is perhaps the ID of a domain entity that triggered the event, and is equivalent to sequence_id above. The originator_version could be the version of a domain entity that triggered the event, and is equivalent to position above. The topic identifies the class of the domain event that is stored, and is equivalent to topic above. The state holds the state of the domain event, and is equivalent to state above.

from eventsourcing.infrastructure.sequenceditem import StoredEvent

aggregate1 = uuid4()

stored_event1 = StoredEvent(
    originator_id=aggregate1,
    originator_version=0,
    topic='eventsourcing.domain.model.events#DomainEvent',
    state='{"foo":"bar","originator_version":0,"originator_id":{"UUID":"%s"}}' % aggregate1.hex,
)
assert stored_event1.originator_id == aggregate1
assert stored_event1.originator_version == 0
assert stored_event1.topic == 'eventsourcing.domain.model.events#DomainEvent'
assert stored_event1.state == '{"foo":"bar","originator_version":0,"originator_id":{"UUID":"%s"}}' % aggregate1.hex

Sequenced item mapper

The event store uses a sequenced item mapper to map between sequenced items and application-level objects such as domain events.

The library provides a sequenced item mapper object class called SequencedItemMapper.

from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper

The SequencedItemMapper has a constructor arg sequenced_item_class, which defaults to the library’s sequenced item named tuple SequencedItem.

sequenced_item_mapper = SequencedItemMapper()

The method event_from_item() can be used to convert sequenced item objects to application-level objects.

domain_event = sequenced_item_mapper.event_from_item(sequenced_item1)

assert domain_event.foo == 'bar'

The method item_from_event() can be used to convert application-level objects to sequenced item named tuples.

recovered_state = sequenced_item_mapper.item_from_event(domain_event).state
assert recovered_state == sequenced_item1.state, (recovered_state, sequenced_item1.state)

If the names of the first two fields of the sequenced item named tuple (e.g. sequence_id and position) do not match the names of the attributes of the application-level object which identify a sequence and a position (e.g. originator_id and originator_version) then the attribute names can be given to the sequenced item mapper using constructor args sequence_id_attr_name and position_attr_name.

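from eventsourcing.domain.model.events import DomainEvent

domain_event1 = DomainEvent(
    originator_id=aggregate1,
    originator_version=1,
    foo='baz',
)

sequenced_item_mapper = SequencedItemMapper(
    sequence_id_attr_name='originator_id',
    position_attr_name='originator_version',
)

assert domain_event1.foo == 'baz'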

assert sequenced_item_mapper.item_from_event(domain_event1).sequence_id == aggregate1

Alternatively, a sequenced item named tuple type that is different from the default SequencedItem namedtuple, for example the library’s StoredEvent namedtuple, can be passed with the constructor arg sequenced_item_class.

sequenced_item_mapper = SequencedItemMapper(
    sequenced_item_class=StoredEvent,
)

domain_event1 = sequenced_item_mapper.event_from_item(stored_event1)

assert domain_event1.foo == 'bar', domain_event1

Just as the alternative StoredEvent namedtuple can be used instead of the default SequencedItem namedtuple, it is also possible to use a custom named tuple. Which alternative you use for your project depends on your preferences for the names in your domain events and your persistence model.

Please note, it is required of these application-level objects that the “topic” generated by get_topic() from the object class is resolved by resolve_topic() back to the same object class.

from eventsourcing.domain.model.events import Created
from eventsourcing.utils.topic import get_topic, resolve_topic

topic = get_topic(Created)
assert resolve_topic(topic) == Created
assert topic == 'eventsourcing.domain.model.events#Created'
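The round trip required above can be pictured with a small standard-library sketch. This is not the library’s implementation: the functions get_topic_sketch() and resolve_topic_sketch() are hypothetical, and the "module#ClassName" layout of the topic string is an assumption made for the illustration.

```python
import importlib
from fractions import Fraction


def get_topic_sketch(cls):
    """Return 'module#QualifiedName' for a class (assumed topic layout)."""
    return cls.__module__ + '#' + cls.__qualname__


def resolve_topic_sketch(topic):
    """Import the module named in the topic, then look up the class in it."""
    module_name, _, class_path = topic.partition('#')
    obj = importlib.import_module(module_name)
    for name in class_path.split('.'):
        obj = getattr(obj, name)
    return obj


# A class from the standard library stands in for a domain event class.
topic = get_topic_sketch(Fraction)
assert topic == 'fractions#Fraction'
assert resolve_topic_sketch(topic) is Fraction
```

A class that cannot be imported by its module path, such as one defined inside a function, would not survive this round trip, which is why the requirement matters.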

Custom JSON transcoding

The SequencedItemMapper can be constructed with optional args json_encoder_class and json_decoder_class. The defaults are the library’s ObjectJSONEncoder and ObjectJSONDecoder which can be extended to support types of value objects that are not currently supported by the library.

The code below extends the JSON transcoding to support sets.

from eventsourcing.utils.transcoding import ObjectJSONEncoder, ObjectJSONDecoder

class CustomObjectJSONEncoder(ObjectJSONEncoder):
    def default(self, obj):
        # Encode sets as a tagged list; defer everything else to the base class.
        if isinstance(obj, set):
            return {'__set__': list(obj)}
        else:
            return super(CustomObjectJSONEncoder, self).default(obj)

class CustomObjectJSONDecoder(ObjectJSONDecoder):
    @classmethod
    def from_jsonable(cls, d):
        # Decode tagged lists back to sets; defer everything else to the base class.
        if '__set__' in d:
            return cls._decode_set(d)
        else:
            return ObjectJSONDecoder.from_jsonable(d)

    @staticmethod
    def _decode_set(d):
        return set(d['__set__'])

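customized_sequenced_item_mapper = SequencedItemMapper(
    json_encoder_class=CustomObjectJSONEncoder,
    json_decoder_class=CustomObjectJSONDecoder,
)

domain_event = customized_sequenced_item_mapper.event_from_item(
    SequencedItem(
        sequence_id=sequence1,
        position=0,
        topic='eventsourcing.domain.model.events#DomainEvent',
        state='{"foo":{"__set__":["bar","baz"]},"originator_version":0,"originator_id":{"UUID":"%s"}}' % sequence1
    )
)
assert domain_event.foo == set(["bar", "baz"])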

sequenced_item = customized_sequenced_item_mapper.item_from_event(domain_event)
assert sequenced_item.state.startswith('{"foo":{"__set__":["ba')
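The same tagging technique can be tried out with only the standard library’s json module. The sketch below is independent of the library’s ObjectJSONEncoder and ObjectJSONDecoder; the names SetEncoder and decode_set are hypothetical, and sorting the set members is a choice made here so that the encoded string is deterministic.

```python
import json


class SetEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            # Tag the value so the decoder can recognise it later.
            return {'__set__': sorted(obj)}
        return super().default(obj)


def decode_set(d):
    """object_hook: turn {'__set__': [...]} back into a set."""
    if '__set__' in d:
        return set(d['__set__'])
    return d


encoded = json.dumps(
    {'foo': {'bar', 'baz'}},
    cls=SetEncoder,
    sort_keys=True,
    separators=(',', ':'),
)
assert encoded == '{"foo":{"__set__":["bar","baz"]}}'
assert json.loads(encoded, object_hook=decode_set) == {'foo': {'bar', 'baz'}}
```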

Application-level encryption

The SequencedItemMapper can be constructed with a symmetric cipher. If a cipher is given, then the state field of every sequenced item will be encrypted before being sent to the database. The state retrieved from the database will be decrypted and verified, which protects against tampering.

The library provides an AES cipher object class called AESCipher. It uses the AES cipher from the Python Cryptography Toolkit, as forked by the actively maintained PyCryptodome project.

The AESCipher class uses AES in GCM mode, which is a padding-less, authenticated encryption mode. Other AES modes aren’t supported by this class, at the moment.

The AESCipher constructor arg cipher_key is required. The key must be either 16, 24, or 32 random bytes (128, 192, or 256 bits). Longer keys take more time to encrypt plaintext, but produce more secure ciphertext.

Generating and storing a secure key requires functionality beyond the scope of this library. However, the library contains a function encode_random_bytes() that may help to generate a unicode key string, representing random bytes encoded with Base64. A companion function decode_bytes() decodes the unicode key string into a sequence of bytes.

from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.utils.random import encode_random_bytes, decode_bytes

# Unicode string representing 256 random bits encoded with Base64.
cipher_key = encode_random_bytes(num_bytes=32)

# Construct AES-256 cipher.
cipher = AESCipher(cipher_key=decode_bytes(cipher_key))

# Encrypt some plaintext (using nonce arguments).
ciphertext = cipher.encrypt('plaintext')
assert ciphertext != 'plaintext'

# Decrypt some ciphertext.
plaintext = cipher.decrypt(ciphertext)
assert plaintext == 'plaintext'
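For orientation, the key handling described above (random bytes, carried around as a Base64 unicode string, decoded to bytes before use) can be sketched with the standard library alone. The helpers make_key_string() and key_bytes_from_string() are hypothetical names; they are not the library’s encode_random_bytes() and decode_bytes(), whose implementation may differ.

```python
import base64
import os


def make_key_string(num_bytes):
    """Return random bytes encoded as a Base64 unicode string."""
    return base64.b64encode(os.urandom(num_bytes)).decode('utf8')


def key_bytes_from_string(key_string):
    """Decode a Base64 unicode string back into a sequence of bytes."""
    return base64.b64decode(key_string.encode('utf8'))


key_string = make_key_string(num_bytes=32)
key_bytes = key_bytes_from_string(key_string)

assert isinstance(key_string, str)
assert len(key_bytes) == 32  # 256 bits, the size of an AES-256 key
```

Keeping the key as a unicode string makes it easy to pass through environment variables or configuration files, which generally cannot hold raw bytes.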

The SequencedItemMapper has constructor arg cipher, which can be used to pass in a cipher object, and thereby enable encryption.

# Construct sequenced item mapper to always encrypt domain events.
ciphered_sequenced_item_mapper = SequencedItemMapper(
    sequenced_item_class=StoredEvent,
    cipher=cipher,
)

# Domain event attribute ``foo`` has value ``'bar'``.
assert domain_event1.foo == 'bar'

# Map the domain event to an encrypted stored event namedtuple.
stored_event = ciphered_sequenced_item_mapper.item_from_event(domain_event1)

# Attribute names and values of the domain event are not visible in the encrypted ``state`` field.
assert 'foo' not in stored_event.state
assert 'bar' not in stored_event.state

# Recover the domain event from the encrypted state.
domain_event = ciphered_sequenced_item_mapper.event_from_item(stored_event)

# Domain event has decrypted attributes.
assert domain_event.foo == 'bar'

Please note, the sequence ID and position values are necessarily not encrypted. However, by encrypting the state of the item within the application, potentially sensitive information, for example personally identifiable information, will be encrypted in transit to the database, at rest in the database, and in all backups and other copies.

Record managers

The event store uses a record manager to write sequenced items to database records.

The library has an abstract base class AbstractSequencedItemRecordManager with abstract methods record_sequenced_items() and get_items(), which can be used on concrete implementations to read and write sequenced items in a database.

A record manager is constructed with a sequenced_item_class and a matching record_class. The field names of a suitable record class will match the field names of the sequenced item named tuple.


SQLAlchemy

The library class SQLAlchemyRecordManager is a record manager for SQLAlchemy.

To run the example below, please install the library with the ‘sqlalchemy’ option.

$ pip install eventsourcing[sqlalchemy]

The library provides record classes for SQLAlchemy, such as IntegerSequencedRecord and StoredEventRecord. The class IntegerSequencedRecord matches the default SequencedItem namedtuple. The StoredEventRecord class matches the alternative StoredEvent namedtuple. There is also a TimestampSequencedRecord and a SnapshotRecord.

The code below uses the namedtuple StoredEvent and the record class StoredEventRecord.

from eventsourcing.infrastructure.sqlalchemy.records import StoredEventRecord

Database settings can be configured using SQLAlchemySettings, which is constructed with a uri connection string. The code below uses an in-memory SQLite database.

from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemySettings

settings = SQLAlchemySettings(uri='sqlite:///:memory:')

To help set up a database connection and tables, the library has object class SQLAlchemyDatastore.

The SQLAlchemyDatastore is constructed with the settings object, and a tuple of record classes passed using the tables arg.

from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemyDatastore

datastore = SQLAlchemyDatastore(
    settings=settings,
    tables=(StoredEventRecord,),
)

Please note, if you have declared your own SQLAlchemy model Base class, you may wish to define your own record classes which inherit from your Base class. If so, it may help to refer to the library record classes to see how SQLAlchemy ORM columns and indexes can be used to persist sequenced items.

The methods setup_connection() and setup_tables() of a datastore object can be used to set up the database connection and the tables.

datastore.setup_connection()
datastore.setup_tables()


As well as sequenced_item_class and a matching record_class, the SQLAlchemyRecordManager requires a scoped session object, passed using the constructor arg session. For convenience, the SQLAlchemyDatastore has a thread-scoped session facade set as its session attribute. You may wish to use a different scoped session facade, such as a request-scoped session object provided by a Web framework.

With the database set up, an SQLAlchemyRecordManager can be constructed, and used to store events using SQLAlchemy.

from eventsourcing.infrastructure.sqlalchemy.manager import SQLAlchemyRecordManager

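record_manager = SQLAlchemyRecordManager(
    session=datastore.session,
    record_class=StoredEventRecord,
    sequenced_item_class=StoredEvent,
    contiguous_record_ids=True,
)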

Sequenced items (or “stored events” in this example) can be appended to the database using the record_sequenced_items() method of the record manager.

record_manager.record_sequenced_items(stored_event1)


All the previously appended items of a sequence can be retrieved by using the list_items() method.

results = record_manager.list_items(aggregate1)

Since only one item has been stored so far, there is only one item in the results.

assert len(results) == 1
assert results[0] == stored_event1

SQLAlchemy dialects

The databases supported by core SQLAlchemy dialects are Firebird, Microsoft SQL Server, MySQL, Oracle, PostgreSQL, SQLite, and Sybase. This library’s infrastructure classes for SQLAlchemy have been tested with MySQL, PostgreSQL, and SQLite.


For MySQL, the Python package mysql-connector-python-rf can be used (licenced GPL v2). Please note, I had problems running this driver with Python 2.7 (unicode error when it raises exceptions).

$ pip install mysql-connector-python-rf

The uri for MySQL used with this driver would look something like this.

mysql+mysqlconnector://username:password@localhost/eventsourcing


Alternatively for MySQL, the Python package mysqlclient can be used (also licenced GPL v2). I didn’t have problems using this driver with Python 2.7.

$ pip install mysqlclient

The uri for MySQL used with this driver would look something like this.

mysql+mysqldb://username:password@localhost/eventsourcing


Another alternative is PyMySQL. It has a BSD licence.

$ pip install PyMySQL

The uri for MySQL used with this driver would look something like this.

mysql+pymysql://username:password@localhost/eventsourcing



For PostgreSQL, the Python package psycopg2 can be used.

$ pip install psycopg2

The uri for PostgreSQL used with this driver would look something like this.

postgresql+psycopg2://username:password@localhost:5432/eventsourcing



SQLite is shipped with core Python packages, so nothing extra needs to be installed.

The uri for a temporary SQLite database might look something like this.

sqlite:////tmp/eventsourcing.db


Please note, the library’s SQLAlchemy infrastructure defaults to using an in-memory SQLite database, which is the fastest way to run the library, and is recommended as a convenience for development.

Django ORM

The library has a record manager for the Django ORM, provided by the DjangoRecordManager class.

To run the example below, please install the library with the ‘django’ option.

$ pip install eventsourcing[django]

For the DjangoRecordManager, the IntegerSequencedRecord matches the SequencedItem namedtuple. The StoredEventRecord from the same module matches the StoredEvent namedtuple. There is also a TimestampSequencedRecord and a SnapshotRecord. These are all Django models.

The package eventsourcing.infrastructure.django is a little Django app. To involve its models in your Django project, simply include the application in your project’s list of INSTALLED_APPS.


Alternatively, import or write the classes you want into one of your own Django app’s models.py.

The Django application at eventsourcing.infrastructure.django has database migrations that will add four tables, one for each of the record classes mentioned above. So if you use the application directly in INSTALLED_APPS then the app’s migrations will be picked up by Django.

If, instead of using the app directly, you import some of its model classes into your own application’s models.py, you will need to run python manage.py makemigrations before tables for event sourcing can be created by Django. This way you can avoid creating tables you won’t use.

The library has a little Django project for testing the library’s Django app. It is used in this example to help run the library’s Django app.

import os

os.environ['DJANGO_SETTINGS_MODULE'] = 'eventsourcing.tests.djangoproject.djangoproject.settings'

This Django project is simply the files that startproject generates, with the SQLite database set to be in memory, and with the library’s Django app added to the INSTALLED_APPS setting.

With the environment variable DJANGO_SETTINGS_MODULE referring to the Django project, Django can be started. If you aren’t running tests with the Django test runner, you may need to run django.setup().

import django

django.setup()


Before using the database, make sure the migrations have been applied, so the necessary database tables exist.

An alternative to python manage.py migrate is the call_command() function, provided by Django. If you aren’t running tests with the Django test runner, this can help, e.g. to set up an SQLite database in memory before each test by calling it in the setUp() method of a test case.

from django.core.management import call_command

call_command('migrate', verbosity=0, interactive=False)

So long as a table exists for its record class, the DjangoRecordManager can be used to store events using the Django ORM.

from eventsourcing.infrastructure.django.manager import DjangoRecordManager
from eventsourcing.infrastructure.django.models import StoredEventRecord

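django_record_manager = DjangoRecordManager(
    record_class=StoredEventRecord,
    sequenced_item_class=StoredEvent,
    contiguous_record_ids=True,
)

results = django_record_manager.list_items(aggregate1)
assert len(results) == 0

django_record_manager.record_sequenced_items(stored_event1)

results = django_record_manager.list_items(aggregate1)
assert results[0] == stored_event1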

Django backends

The supported Django backends are PostgreSQL, MySQL, SQLite, and Oracle. This library’s Django infrastructure classes have been tested with PostgreSQL, MySQL, and SQLite.

Contiguous record IDs

The contiguous_record_ids argument, used in the examples above, is optional, and is by default False. If set to a True value, and if the record class has an ID field, then records will be inserted using an “insert select from” query, which generates a table of records whose IDs form a contiguous integer sequence.

Application events recorded in this way can be accurately followed as a single sequence without overbearing complexity to mitigate gaps and race conditions. This feature is only available on the relational record managers (Django and SQLAlchemy, not Cassandra).

If the record ID is merely auto-incrementing, as it is when the library’s integer sequenced record classes are used without this feature being enabled, then gaps could be generated. Whenever there is contention in the aggregate sequence (record ID) that causes the unique record ID constraint to be violated, the transaction will be rolled back, and an ID that was issued could be discarded and lost. Other greater IDs may already have been issued. The complexity for followers is that a gap may be permanent or temporary. It may be that a gap is eventually filled by a transaction that was somehow delayed. Although some databases appear to have auto-incrementing functionality that does not lead to gaps even with transactions being rolled back, I don’t understand when this happens and when it doesn’t, and so feel unable to rely on it, at least at the moment. It appears to be an inherently unreliable situation that could probably be mitigated satisfactorily by followers if they need to project the application events accurately, but only with increased complexity.

Each relational record manager has a raw SQL query with an “insert select from” statement. If possible, the raw query is compiled when the record manager object is constructed. When a record is inserted, the new field values are bound to the raw query and executed within a transaction. When executed, the query first selects the maximum ID from all records currently existing in the table (as visible in its transaction), and then attempts to insert a record with an ID value of the max existing ID plus one (the next unused ID). The record table must have a unique constraint for the ID, so that records aren’t overwritten by this query. The record ID must also be indexed, so that the max value can be identified efficiently. The b-tree commonly used for database indexes supports this purpose well. The transaction isolation level must be at least “read committed”, which is true by default for MySQL and PostgreSQL.

Any resulting contention in the record ID will raise an exception so that the query can be retried. The library exception class RecordConflictError will be raised.
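The shape of such a query can be seen in a small sketch that uses the sqlite3 module from the standard library. It is not the library’s query: the table name, the column names, and the SQL text are assumptions made for the illustration, and it runs in a single thread, so no contention arises.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE records ('
    ' id INTEGER NOT NULL UNIQUE,'
    ' sequence_id TEXT NOT NULL,'
    ' position INTEGER NOT NULL,'
    ' state TEXT NOT NULL,'
    ' UNIQUE (sequence_id, position))'
)

# One statement selects the current maximum ID and inserts the new
# record with that value plus one (or 1 if the table is empty).
INSERT_SELECT_MAX = (
    'INSERT INTO records (id, sequence_id, position, state) '
    'SELECT COALESCE(MAX(id), 0) + 1, ?, ?, ? FROM records'
)

for position in range(3):
    with conn:  # commits on success, rolls back on error
        conn.execute(INSERT_SELECT_MAX, ('sequence-a', position, '{}'))

ids = [row[0] for row in conn.execute('SELECT id FROM records ORDER BY id')]
assert ids == [1, 2, 3]
```

With concurrent writers, two transactions could select the same maximum. The unique constraint on the ID then rejects one of them, which is the contention that the exception mentioned above reports.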


Apache Cassandra

The library has a record manager for Apache Cassandra provided by the library class CassandraRecordManager.

from eventsourcing.infrastructure.cassandra.manager import CassandraRecordManager

To run the example below, please install the library with the ‘cassandra’ option.

$ pip install eventsourcing[cassandra]

It takes a while to build the driver. If you want to do that last step quickly, set the environment variable CASS_DRIVER_NO_CYTHON.

$ CASS_DRIVER_NO_CYTHON=1 pip install eventsourcing[cassandra]

For the CassandraRecordManager, the IntegerSequencedRecord from eventsourcing.infrastructure.cassandra.records matches the SequencedItem namedtuple. The StoredEventRecord from the same module matches the StoredEvent namedtuple. There is also a TimestampSequencedRecord, a TimeuuidSequencedRecord, and a SnapshotRecord.

The CassandraDatastore and CassandraSettings can be used in the same way as SQLAlchemyDatastore and SQLAlchemySettings above. Please investigate library class CassandraSettings for information about configuring away from default settings.

from eventsourcing.infrastructure.cassandra.datastore import CassandraDatastore, CassandraSettings
from eventsourcing.infrastructure.cassandra.records import StoredEventRecord

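cassandra_datastore = CassandraDatastore(
    settings=CassandraSettings(),
    tables=(StoredEventRecord,),
)
cassandra_datastore.setup_connection()
cassandra_datastore.setup_tables()

With the database set up, the CassandraRecordManager can be constructed, and used to store events using Apache Cassandra.

from eventsourcing.infrastructure.cassandra.manager import CassandraRecordManager

cassandra_record_manager = CassandraRecordManager(
    record_class=StoredEventRecord,
    sequenced_item_class=StoredEvent,
)

results = cassandra_record_manager.list_items(aggregate1)
assert len(results) == 0

cassandra_record_manager.record_sequenced_items(stored_event1)

results = cassandra_record_manager.list_items(aggregate1)
assert results[0] == stored_event1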


Sequenced item conflicts

It is a common feature of the record manager classes that it isn’t possible to successfully append two items at the same position in the same sequence. If such an attempt is made, a RecordConflictError will be raised.

from eventsourcing.exceptions import RecordConflictError

# Fail to append an item at the same position in the same sequence as a previous item.
try:
    record_manager.record_sequenced_items(stored_event1)
except RecordConflictError:
    pass
else:
    raise Exception("RecordConflictError not raised")

This feature is implemented using optimistic concurrency control features of the underlying database. With SQLAlchemy, a unique constraint is used that involves both the sequence and the position columns. The Django ORM strategy works in the same way.
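The effect of such a unique constraint can be reproduced with the sqlite3 module from the standard library. The sketch below is only an illustration: the table, the append() function, and the RecordConflict exception are hypothetical stand-ins, not the library’s record classes or its RecordConflictError.

```python
import sqlite3


class RecordConflict(Exception):
    """Hypothetical stand-in for the library's RecordConflictError."""


conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE stored_events ('
    ' originator_id TEXT NOT NULL,'
    ' originator_version INTEGER NOT NULL,'
    ' state TEXT NOT NULL,'
    ' UNIQUE (originator_id, originator_version))'
)


def append(originator_id, originator_version, state):
    """Insert a record, translating a constraint violation into a conflict."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                'INSERT INTO stored_events VALUES (?, ?, ?)',
                (originator_id, originator_version, state),
            )
    except sqlite3.IntegrityError as e:
        raise RecordConflict(str(e))


append('a1', 0, '{"foo":"bar"}')

# A second item at the same position in the same sequence is rejected.
try:
    append('a1', 0, '{"foo":"baz"}')
except RecordConflict:
    conflicted = True
else:
    conflicted = False

assert conflicted
```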

With Cassandra the position is the primary key in the sequence partition, and the “IF NOT EXISTS” feature is applied. The Cassandra database management system implements the Paxos protocol, and can thereby accomplish linearly-scalable distributed optimistic concurrency control, guaranteeing sequential consistency of the events of an entity despite the database being distributed. It is also possible to serialize calls to the methods of an entity, but that is out of the scope of this package — if you wish to do that, perhaps something like an actor framework or Zookeeper might help.

Event store

The library’s EventStore provides an interface to the library’s cohesive mechanism for storing events as sequences of items, and can be used directly within an event sourced application to append and retrieve its domain events.

The EventStore is constructed with a sequenced item mapper and a record manager, both of which are discussed in detail in the sections above.

from eventsourcing.infrastructure.eventstore import EventStore

event_store = EventStore(
    sequenced_item_mapper=sequenced_item_mapper,
    record_manager=record_manager,
)

The method store() can store a domain event in its sequence. The event store uses its sequenced_item_mapper to obtain a sequenced item named tuple from a domain event, and it uses its record_manager to record the sequenced item in the database.

In the code below, a DomainEvent is appended to sequence aggregate1 at position 1.

event_store.store(
    DomainEvent(
        originator_id=aggregate1,
        originator_version=1,
        foo='baz',
    )
)

The method get_domain_events() can be used to get events that have previously been stored. The event store uses its record_manager to get the sequenced items from database records, and it uses its sequenced_item_mapper to obtain domain events from the sequenced items.

results = event_store.get_domain_events(aggregate1)

Since two domain events have now been stored, there are two domain events in the results.

assert len(results) == 2

assert results[0].foo == 'bar'
assert results[1].foo == 'baz'
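The division of labour between the event store, the mapper, and the record manager can be pictured with a toy in-memory version. Everything in the sketch below is hypothetical (the Tiny classes, plain dicts standing in for domain events, a dict standing in for the database); it only mirrors the flow described above and is not how the library is implemented.

```python
import json
from collections import namedtuple

Item = namedtuple('Item', ['sequence_id', 'position', 'state'])


class TinyMapper(object):
    """Converts plain dict events to items, and items back to events."""

    def item_from_event(self, event):
        state = json.dumps(event, sort_keys=True)
        return Item(event['sequence_id'], event['position'], state)

    def event_from_item(self, item):
        return json.loads(item.state)


class TinyRecordManager(object):
    """Keeps items in a dict keyed by (sequence_id, position)."""

    def __init__(self):
        self.records = {}

    def record(self, item):
        key = (item.sequence_id, item.position)
        if key in self.records:
            raise KeyError('position already taken: %r' % (key,))
        self.records[key] = item

    def list_items(self, sequence_id):
        keys = sorted(k for k in self.records if k[0] == sequence_id)
        return [self.records[k] for k in keys]


class TinyEventStore(object):
    """Delegates conversion to the mapper and storage to the record manager."""

    def __init__(self, mapper, record_manager):
        self.mapper = mapper
        self.record_manager = record_manager

    def store(self, event):
        self.record_manager.record(self.mapper.item_from_event(event))

    def get_domain_events(self, sequence_id):
        items = self.record_manager.list_items(sequence_id)
        return [self.mapper.event_from_item(item) for item in items]


store = TinyEventStore(TinyMapper(), TinyRecordManager())
store.store({'sequence_id': 's1', 'position': 0, 'foo': 'bar'})
store.store({'sequence_id': 's1', 'position': 1, 'foo': 'baz'})
assert [e['foo'] for e in store.get_domain_events('s1')] == ['bar', 'baz']
```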

The optional arguments of get_domain_events() can be used to select some of the items in the sequence.

The lt arg is used to select items below the given position in the sequence.

The lte arg is used to select items below and at the given position in the sequence.

The gte arg is used to select items at and above the given position in the sequence.

The gt arg is used to select items above the given position in the sequence.

The limit arg is used to limit the number of items selected from the sequence.

The is_ascending arg is used when selecting items. It affects how any limit is applied, and determines the order of the results. Hence, it can affect both the content of the results and the performance of the method.

# Get events below and at position 0.
result = event_store.get_domain_events(aggregate1, lte=0)
assert len(result) == 1, result
assert result[0].foo == 'bar'

# Get events at and above position 1.
result = event_store.get_domain_events(aggregate1, gte=1)
assert len(result) == 1, result
assert result[0].foo == 'baz'

# Get the first event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1)
assert len(result) == 1, result
assert result[0].foo == 'bar'

# Get the last event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1, is_ascending=False)
assert len(result) == 1, result
assert result[0].foo == 'baz'
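How these arguments interact can be modelled over a plain list of positions. The function select_positions() below is a hypothetical sketch, not part of the library; it assumes that the limit is applied after the results have been ordered, which is why is_ascending changes which items a limited query returns.

```python
def select_positions(positions, gt=None, gte=None, lt=None, lte=None,
                     limit=None, is_ascending=True):
    """Filter, then order, then limit a collection of integer positions."""
    selected = [
        p for p in positions
        if (gt is None or p > gt)
        and (gte is None or p >= gte)
        and (lt is None or p < lt)
        and (lte is None or p <= lte)
    ]
    selected.sort(reverse=not is_ascending)
    if limit is not None:
        selected = selected[:limit]
    return selected


positions = [0, 1, 2, 3, 4]
assert select_positions(positions, lte=1) == [0, 1]
assert select_positions(positions, gt=1, lt=4) == [2, 3]
assert select_positions(positions, limit=2) == [0, 1]
assert select_positions(positions, limit=2, is_ascending=False) == [4, 3]
```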

Optimistic concurrency control

It is a feature of the event store that it isn’t possible to successfully append two events at the same position in the same sequence. This condition is coded as a ConcurrencyError since a correct program running in a single thread wouldn’t attempt to append an event that it had already successfully appended. The exception class ConcurrencyError is a subclass of the exception class RecordConflictError.

from eventsourcing.exceptions import ConcurrencyError

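# Fail to append an event at the same position in the same sequence as a previous event.
try:
    event_store.store(DomainEvent(originator_id=aggregate1, originator_version=1, foo='baz'))
except ConcurrencyError:
    pass
else:
    raise Exception("ConcurrencyError not raised")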

This feature depends on the behaviour of the record manager method record_sequenced_items. The event store will raise a ConcurrencyError if a RecordConflictError is raised by its record manager.

If a command fails due to a concurrency error, the command can be retried with the latest state. The decorator retry() can help code retries on commands.

from eventsourcing.domain.model.decorators import retry

errors = []

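@retry(ConcurrencyError, max_attempts=5)
def set_password():
    exc = ConcurrencyError()
    # Record each attempt so the number of retries can be checked below.
    errors.append(exc)
    raise exc

try:
    set_password()
except ConcurrencyError:
    pass
else:
    raise Exception("Shouldn't get here")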

assert len(errors) == 5
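To show what such a decorator does, here is a minimal standard-library sketch. It is not the library’s retry(): the names retry_sketch and Conflict are hypothetical, and it omits features a real implementation might have, such as waiting between attempts.

```python
import functools


class Conflict(Exception):
    """Hypothetical stand-in for ConcurrencyError."""


def retry_sketch(exc_class, max_attempts):
    """Call the function again when exc_class is raised, up to max_attempts."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exc_class:
                    if attempt == max_attempts:
                        raise  # give up and let the caller see the error
        return wrapper
    return decorator


calls = []


@retry_sketch(Conflict, max_attempts=3)
def flaky():
    # Fails on the first two calls and succeeds on the third.
    calls.append(1)
    if len(calls) < 3:
        raise Conflict()
    return 'ok'


assert flaky() == 'ok'
assert len(calls) == 3
```

In a real command, the function body would reload the aggregate before each attempt, so that the retry operates on the latest state.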

This feature avoids the sequence of records being corrupted due to concurrent threads operating on the same aggregate. However, the result is that success of appending an event in such circumstances is only probabilistic with respect to concurrency conflicts. Concurrency conflicts can be avoided if all commands for a single aggregate are executed in series, for example by treating each aggregate as an actor within an actor framework, or with locks provided by something like Zookeeper.

Infrastructure factory

To help with construction of infrastructure objects, the library has various infrastructure factory classes. The abstract base class InfrastructureFactory defines the common method signatures. The concrete subclass SQLAlchemyInfrastructureFactory helps with construction of SQLAlchemy infrastructure. Similarly DjangoInfrastructureFactory helps with Django infrastructure and CassandraInfrastructureFactory helps with Cassandra.