Infrastructure

The library’s infrastructure layer provides a cohesive mechanism for storing events as sequences of items. The entire mechanism is encapsulated by the library’s EventStore class.

The event store uses a “sequenced item mapper” and an “active record strategy”. The sequenced item mapper and the active record strategy share a common “sequenced item” type. The sequenced item mapper can convert objects such as domain events to sequenced items, and the active record strategy can write sequenced items to a database.

Sequenced items

A sequenced item type provides a common persistence model across the components of the mechanism. The sequenced item type is normally declared as a namedtuple.

from collections import namedtuple

SequencedItem = namedtuple('SequencedItem', ['sequence_id', 'position', 'topic', 'data'])

The names of the fields are arbitrary. However, the first field of a sequenced item namedtuple represents the identity of a sequence to which an item belongs, the second field represents the position of the item in its sequence, the third field represents a topic to which the item pertains (dimension of concern), and the fourth field represents the data associated with the item.

SequencedItem namedtuple

The library provides a sequenced item namedtuple called SequencedItem.

from eventsourcing.infrastructure.sequenceditem import SequencedItem

The attributes of SequencedItem are sequence_id, position, topic, and data.

The sequence_id identifies the sequence in which the item belongs.

The position identifies the position of the item in its sequence.

The topic identifies the dimension of concern to which the item pertains.

The data holds the values of the item, perhaps serialized to JSON, and optionally encrypted.

from uuid import uuid4

sequence1 = uuid4()

sequenced_item1 = SequencedItem(
    sequence_id=sequence1,
    position=0,
    topic='eventsourcing.domain.model.events#DomainEvent',
    data='{"foo":"bar"}'
)
assert sequenced_item1.sequence_id == sequence1
assert sequenced_item1.position == 0
assert sequenced_item1.topic == 'eventsourcing.domain.model.events#DomainEvent'
assert sequenced_item1.data == '{"foo":"bar"}'

StoredEvent namedtuple

As an alternative, the library also provides a sequenced item namedtuple called StoredEvent. The attributes of the StoredEvent namedtuple are originator_id, originator_version, event_type, and state.

The originator_id is the ID of the aggregate that published the event, and is equivalent to sequence_id above.

The originator_version is the version of the aggregate that published the event, and is equivalent to position above.

The event_type identifies the class of the domain event that is stored, and is equivalent to topic above.

The state holds the state of the domain event, and is equivalent to data above.

from eventsourcing.infrastructure.sequenceditem import StoredEvent

aggregate1 = uuid4()

stored_event1 = StoredEvent(
    originator_id=aggregate1,
    originator_version=0,
    event_type='eventsourcing.domain.model.events#DomainEvent',
    state='{"foo":"bar"}'
)
assert stored_event1.originator_id == aggregate1
assert stored_event1.originator_version == 0
assert stored_event1.event_type == 'eventsourcing.domain.model.events#DomainEvent'
assert stored_event1.state == '{"foo":"bar"}'

Active record strategies

An active record strategy writes sequenced items to database records.

The library has an abstract base class AbstractActiveRecordStrategy with abstract methods append() and get_items(), which can be used on concrete implementations to read and write sequenced items in a database.

An active record strategy is constructed with a sequenced_item_class and a matching active_record_class. The field names of a suitable active record class will match the field names of the sequenced item namedtuple.

SQLAlchemy

The library has a concrete active record strategy for SQLAlchemy provided by the object class SQLAlchemyActiveRecordStrategy.

from eventsourcing.infrastructure.sqlalchemy.activerecords import SQLAlchemyActiveRecordStrategy

The library also provides active record classes for SQLAlchemy, such as IntegerSequencedItemRecord and StoredEventRecord. The IntegerSequencedItemRecord class matches the default SequencedItem namedtuple. The StoredEventRecord class matches the alternative StoredEvent namedtuple.

The code below uses the namedtuple StoredEvent and the active record StoredEventRecord.

from eventsourcing.infrastructure.sqlalchemy.activerecords import StoredEventRecord

Database settings can be configured using SQLAlchemySettings, which is constructed with a uri connection string. The code below uses an in-memory SQLite database.

from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemySettings

settings = SQLAlchemySettings(uri='sqlite:///:memory:')

To help setup a database connection and tables, the library has object class SQLAlchemyDatastore.

The SQLAlchemyDatastore is constructed with the settings object, and a tuple of active record classes passed using the tables arg.

from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemyDatastore

datastore = SQLAlchemyDatastore(
    settings=settings,
    tables=(StoredEventRecord,)
)

Please note, if you have declared your own SQLAlchemy model Base class, you may wish to define your own active record classes which inherit from your Base class. If so, if may help to refer to the library active record classes to see how SQLALchemy ORM columns and indexes can be used to persist sequenced items.

The methods setup_connection() and setup_tables() of the datastore object can be used to setup the database connection and the tables.

datastore.setup_connection()
datastore.setup_tables()

As well as sequenced_item_class and a matching active_record_class, the SQLAlchemyActiveRecordStrategy requires a scoped session object, passed using the constructor arg session. For convenience, the SQLAlchemyDatabase has a thread-scoped session facade set as its a session attribute. You may wish to use a different scoped session facade, such as a request-scoped session object provided by a Web framework.

active_record_strategy = SQLAlchemyActiveRecordStrategy(
    sequenced_item_class=StoredEvent,
    active_record_class=StoredEventRecord,
    session=datastore.session,
)

Sequenced items (or “stored events” in this example) can be appended to the database using the append() method of the active record strategy.

active_record_strategy.append(stored_event1)

(Please note, since the position is given by the sequenced item itself, the word “append” means here “to add something extra” rather than the perhaps more common but stricter meaning “to add to the end of a document”. That is, the database is deliberately not responsible for positioning a new item at the end of a sequence. So perhaps “save” would be a better name for this operation.)

All the previously appended items of a sequence can be retrieved by using the get_items() method.

results = active_record_strategy.get_items(aggregate1)

Since by now only one item was stored, so there is only one item in the results.

assert len(results) == 1
assert results[0] == stored_event1

Apache Cassandra

The library also has a concrete active record strategy for Apache Cassandra provided by CassandraActiveRecordStrategy class.

Similarly, for the CassandraActiveRecordStrategy, the IntegerSequencedItemRecord from eventsourcing.infrastructure.cassandra.activerecords matches the SequencedItem namedtuple. The StoredEventRecord from the same module matches the StoredEvent namedtuple.

The CassandraDatastore class uses the CassandraSettings class to setup a Cassandra database.

from eventsourcing.infrastructure.cassandra.datastore import CassandraDatastore, CassandraSettings
from eventsourcing.infrastructure.cassandra.activerecords import CassandraActiveRecordStrategy, StoredEventRecord

cassandra_datastore = CassandraDatastore(
    settings=CassandraSettings(),
    tables=(StoredEventRecord,)
)
cassandra_datastore.setup_connection()
cassandra_datastore.setup_tables()

cassandra_active_record_strategy = CassandraActiveRecordStrategy(
    active_record_class=StoredEventRecord,
    sequenced_item_class=StoredEvent,
)

results = cassandra_active_record_strategy.get_items(aggregate1)
assert len(results) == 0

cassandra_active_record_strategy.append(stored_event1)

results = cassandra_active_record_strategy.get_items(aggregate1)
assert results[0] == stored_event1

cassandra_datastore.drop_tables()
cassandra_datastore.drop_connection()

Please refer to CassandraSettings class for information about configuring away from default settings.

Sequenced item conflicts

It is a feature of the active record strategy that it isn’t possible successfully to append two items at the same position in the same sequence. If such an attempt is made, a SequencedItemConflict will be raised by the active record strategy.

from eventsourcing.exceptions import SequencedItemConflict

# Fail to append an item at the same position in the same sequence as a previous item.
try:
    active_record_strategy.append(stored_event1)
except SequencedItemConflict:
    pass
else:
    raise Exception("SequencedItemConflict not raised")

This feature is implemented using optimistic concurrency control features of the underlying database. With SQLAlchemy, the primary key constraint involves both the sequence and the position columns. With Cassandra the position is the primary key in the sequence partition, and the “IF NOT EXISTS” feature is applied.

Sequenced item mapper

A sequenced item mapper is used by the event store to map between sequenced item namedtuple objects and application-level objects such as domain events.

The library provides a sequenced item mapper object class called SequencedItemMapper.

from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper

The SequencedItemMapper has a constructor arg sequenced_item_class, which defaults to the library’s sequenced item namedtuple SequencedItem.

sequenced_item_mapper = SequencedItemMapper()

The method from_sequenced_item() can be used to convert sequenced item objects to application-level objects.

domain_event = sequenced_item_mapper.from_sequenced_item(sequenced_item1)

assert domain_event.sequence_id == sequence1
assert domain_event.position == 0
assert domain_event.foo == 'bar'

The method to_sequenced_item() can be used to convert application-level objects to sequenced item namedtuples.

assert sequenced_item_mapper.to_sequenced_item(domain_event) == sequenced_item1

If the names of the first two fields of the sequenced item namedtuple (e.g. sequence_id and position) do not match the names of the attributes of the application-level object which identify a sequence and a position (e.g. originator_id and originator_version) then the attribute names can be given to the sequenced item mapper using constructor args sequence_id_attr_name and position_attr_name.

sequenced_item_mapper = SequencedItemMapper(
    sequence_id_attr_name='originator_id',
    position_attr_name='originator_version'
)

domain_event1 = sequenced_item_mapper.from_sequenced_item(sequenced_item1)

assert domain_event1.foo == 'bar', domain_event1
assert domain_event1.originator_id == sequence1
assert domain_event1.originator_version == 0
assert sequenced_item_mapper.to_sequenced_item(domain_event1) == sequenced_item1

Alternatively, the constructor arg sequenced_item_class can be set with a sequenced item namedtuple type that is different from the default SequencedItem namedtuple, such as the library’s StoredEvent namedtuple.

sequenced_item_mapper = SequencedItemMapper(
    sequenced_item_class=StoredEvent,
)

domain_event1 = sequenced_item_mapper.from_sequenced_item(stored_event1)

assert domain_event1.foo == 'bar', domain_event1
assert domain_event1.originator_id == aggregate1
assert sequenced_item_mapper.to_sequenced_item(domain_event1) == stored_event1

Since the alternative StoredEvent namedtuple can be used instead of the default SequencedItem namedtuple, so it is possible to use a custom namedtuple. Which alternative you use for your project depends on your preferences for the names in the your domain events and your persistence model.

Please note, it is required of these application-level objects that the “topic” generated by get_topic() from the object class is resolved by resolve_topic() back to the same object class.

from eventsourcing.domain.model.events import Created
from eventsourcing.infrastructure.topic import get_topic, resolve_topic

topic = get_topic(Created)
assert resolve_topic(topic) == Created
assert topic == 'eventsourcing.domain.model.events#Created'

Custom JSON transcoding

The SequencedItemMapper can be constructed with optional args json_encoder_class and json_decoder_class. The defaults are the library’s ObjectJSONEncoder and ObjectJSONDecoder which can be extended to support types of value objects that are not currently supported by the library.

The code below extends the JSON transcoding to support sets.

from eventsourcing.infrastructure.transcoding import ObjectJSONEncoder, ObjectJSONDecoder


class CustomObjectJSONEncoder(ObjectJSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return {'__set__': list(obj)}
        else:
            return super(CustomObjectJSONEncoder, self).default(obj)


class CustomObjectJSONDecoder(ObjectJSONDecoder):
    @classmethod
    def from_jsonable(cls, d):
        if '__set__' in d:
            return cls._decode_set(d)
        else:
            return ObjectJSONDecoder.from_jsonable(d)

    @staticmethod
    def _decode_set(d):
        return set(d['__set__'])


customized_sequenced_item_mapper = SequencedItemMapper(
    json_encoder_class=CustomObjectJSONEncoder,
    json_decoder_class=CustomObjectJSONDecoder,
)

domain_event = customized_sequenced_item_mapper.from_sequenced_item(
    SequencedItem(
        sequence_id=sequence1,
        position=0,
        topic='eventsourcing.domain.model.events#DomainEvent',
        data='{"foo":{"__set__":["bar","baz"]}}'
    )
)
assert domain_event.foo == set(["bar", "baz"])

sequenced_item = customized_sequenced_item_mapper.to_sequenced_item(domain_event)
assert sequenced_item.data.startswith('{"foo":{"__set__":["ba')

Application-level encryption

The SequencedItemMapper can be constructed with a symmetric cipher object. The library provides an AES cipher object class called AESCipher.

The AESCipher is given an encryption key, using constructor arg aes_key, which must be either 16, 24, or 32 random bytes (128, 192, or 256 bits). Longer keys take more time to encrypt plaintext, but produce more secure ciphertext. Generating and storing a secure key requires functionality beyond the scope of this library.

from eventsourcing.infrastructure.cipher.aes import AESCipher

cipher = AESCipher(aes_key=b'01234567890123456789012345678901')  # Key with 256 bits.

ciphertext = cipher.encrypt('plaintext')
plaintext = cipher.decrypt(ciphertext)

assert ciphertext != 'plaintext'
assert plaintext == 'plaintext'

If the SequencedItemMapper has an optional constructor arg cipher. If always_encrypt is True, then the state field of every stored event will be encrypted with the cipher.

# Construct sequenced item mapper to always encrypt domain events.
ciphered_sequenced_item_mapper = SequencedItemMapper(
    sequenced_item_class=StoredEvent,
    cipher=cipher,
    always_encrypt=True,
)

# Domain event attribute ``foo`` has value ``'bar'``.
assert domain_event1.foo == 'bar'

# Map the domain event to an encrypted stored event namedtuple.
stored_event = ciphered_sequenced_item_mapper.to_sequenced_item(domain_event1)

# Attribute names and values of the domain event are not visible in the encrypted ``state`` field.
assert 'foo' not in stored_event.state
assert 'bar' not in stored_event.state

# Recover the domain event from the encrypted state.
domain_event = ciphered_sequenced_item_mapper.from_sequenced_item(stored_event)

# Domain event has decrypted attributes.
assert domain_event.foo == 'bar'

Please note, the sequence ID and position values are necessarily not encrypted. However, by encrypting the state of the event, sensitive information, such as personally identifiable information, will be encrypted at the level of the application, before being sent to the database, and so it will be encrypted in the database (and in all backups of the database).

Event store

The library’s EventStore provides an interface to the library’s cohesive mechanism for storing events as sequences of items, and can be used directly within an event sourced application to append and retrieve its domain events.

The EventStore is constructed with a sequenced item mapper and an active record strategy, both are discussed in detail in the sections above.

from eventsourcing.infrastructure.eventstore import EventStore

event_store = EventStore(
    sequenced_item_mapper=sequenced_item_mapper,
    active_record_strategy=active_record_strategy,
)

The event store’s append() method can append a domain event to its sequence. The event store uses the sequenced_item_mapper to obtain a sequenced item namedtuple from a domain events, and it uses the active_record_strategy to write a sequenced item to a database.

In the code below, a DomainEvent is appended to sequence aggregate1 at position 1.

from eventsourcing.domain.model.events import DomainEvent

event_store.append(
    DomainEvent(
        originator_id=aggregate1,
        originator_version=1,
        foo='baz',
    )
)

The event store’s method get_domain_events() is used to retrieve events that have previously been appended. The event store uses the active_record_strategy to read the sequenced items from a database, and it uses the sequenced_item_mapper to obtain domain events from the sequenced items.

results = event_store.get_domain_events(aggregate1)

Since by now two domain events have been stored, so there are two domain events in the results.

assert len(results) == 2

assert results[0].originator_id == aggregate1
assert results[0].foo == 'bar'

assert results[1].originator_id == aggregate1
assert results[1].foo == 'baz'

The optional arguments of get_domain_events() can be used to select some of the items in the sequence.

The lt arg is used to select items below the given position in the sequence.

The lte arg is used to select items below and at the given position in the sequence.

The gte arg is used to select items at and above the given position in the sequence.

The gt arg is used to select items above the given position in the sequence.

The limit arg is used to limit the number of items selected from the sequence.

The is_ascending arg is used when selecting items. It affects how any limit is applied, and determines the order of the results. Hence, it can affect both the content of the results and the performance of the method.

# Get events below and at position 0.
result = event_store.get_domain_events(aggregate1, lte=0)
assert len(result) == 1, result
assert result[0].originator_id == aggregate1
assert result[0].originator_version == 0
assert result[0].foo == 'bar'

# Get events at and above position 1.
result = event_store.get_domain_events(aggregate1, gte=1)
assert len(result) == 1, result
assert result[0].originator_id == aggregate1
assert result[0].originator_version == 1
assert result[0].foo == 'baz'

# Get the first event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1)
assert len(result) == 1, result
assert result[0].originator_id == aggregate1
assert result[0].originator_version == 0
assert result[0].foo == 'bar'

# Get the last event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1, is_ascending=False)
assert len(result) == 1, result
assert result[0].originator_id == aggregate1
assert result[0].originator_version == 1
assert result[0].foo == 'baz'

Optimistic concurrency control

It is a feature of the event store that it isn’t possible successfully to append two events at the same position in the same sequence. This condition is coded as a ConcurrencyError since a correct program running in a single thread wouldn’t attempt to append an event that it had already successfully appended.

from eventsourcing.exceptions import ConcurrencyError

# Fail to append an event at the same position in the same sequence as a previous event.
try:
    event_store.append(
        DomainEvent(
            originator_id=aggregate1,
            originator_version=1,
            foo='baz',
        )
    )
except ConcurrencyError:
    pass
else:
    raise Exception("ConcurrencyError not raised")

This feature depends on the behaviour of the active record strategy’s append() method: the event store will raise a ConcurrencyError if a SequencedItemConflict is raised by its active record strategy.

If a command fails due to a concurrency error, the command can be retried with the lastest state. The @retry decorator can help code retries on commands.

from eventsourcing.domain.model.decorators import retry

errors = []

@retry(ConcurrencyError, max_retries=5)
def set_password():
    exc = ConcurrencyError()
    errors.append(exc)
    raise exc

try:
    set_password()
except ConcurrencyError:
    pass
else:
    raise Exception("Shouldn't get here")

assert len(errors) == 5

Event store factory

As a convenience, the library function construct_sqlalchemy_eventstore() can be used to construct an event store that uses the SQLAlchemy classes.

from eventsourcing.infrastructure.sqlalchemy import factory

event_store = factory.construct_sqlalchemy_eventstore(session=datastore.session)

By default, the event store is constructed with the StoredEvent sequenced item namedtuple, and the active record class StoredEventRecord. The optional args sequenced_item_class and active_record_class can be used to construct different kinds of event store.

Timestamped event store

The examples so far have used an integer sequenced event store, where the items are sequenced by integer version.

The example below constructs an event store for timestamp-sequenced domain events, using the library active record class TimestampedSequencedItemRecord.

import time
from uuid import uuid4

from eventsourcing.infrastructure.sqlalchemy.activerecords import TimestampSequencedItemRecord

# Setup database table for timestamped sequenced items.
datastore.setup_table(TimestampSequencedItemRecord)

# Construct event store for timestamp sequenced events.
timestamped_event_store = factory.construct_sqlalchemy_eventstore(
    sequenced_item_class=SequencedItem,
    active_record_class=TimestampSequencedItemRecord,
    sequence_id_attr_name='originator_id',
    position_attr_name='timestamp',
    session=datastore.session,
)

# Construct an event.
aggregate_id = uuid4()
event = DomainEvent(
    originator_id=aggregate_id,
    timestamp=time.time(),
)

# Store the event.
timestamped_event_store.append(event)

# Check the event was stored.
events = timestamped_event_store.get_domain_events(aggregate_id)
assert len(events) == 1
assert events[0].originator_id == aggregate_id
assert events[0].timestamp < time.time()

Please note, optimistic concurrent control doesn’t work to maintain entity consistency, because each event is likely to have a unique timestamp, and so conflicts are very unlikely to arise when concurrent operations appending to the same sequence. For this reason, although domain events can be timestamped, it is not a very good idea to store the events of an entity or aggregate as timestamp-sequenced items. Timestamp-sequenced items are useful for storing events that are logically independent of others, such as messages in a log, things that do not risk causing a consistency error due to concurrent operations.

construct an event store that uses the Apache Cassandra classes.