Infrastructure¶
The library’s infrastructure layer provides a cohesive mechanism for storing events as sequences of items. The entire mechanism is encapsulated by the library’s EventStore class. The event store uses a sequenced item mapper and a record manager.
The sequenced item mapper converts objects such as domain events to sequenced items, and the record manager writes sequenced items to database records. The sequenced item mapper and the record manager operate by reflection off a common sequenced item type.
Sequenced item type¶
Sequenced item types are declared as named tuples (namedtuple from collections).
Below is an example of a sequenced item named tuple.
from collections import namedtuple
SequencedItem = namedtuple('SequencedItem', ['sequence_id', 'position', 'topic', 'data'])
The fields can be named differently; however, a suitable database table will need matching column names.
Whatever the names of the fields, the first field of a sequenced item will represent the identity of a sequence to which an item belongs. The second field will represent the position of the item in its sequence. The third field will represent a topic to which the item pertains. And the fourth field will represent the data associated with the item.
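The following sketch illustrates this positional convention using only the standard library. It uses a hypothetical LogEntry type whose field names differ from the example above. Whatever the names, each field's role comes from its position, and the first two field names can be discovered by reflection on the type.

```python
from collections import namedtuple

# A hypothetical sequenced item type with its own field names.
LogEntry = namedtuple('LogEntry', ['log_name', 'entry_number', 'kind', 'payload'])

entry = LogEntry('app-log', 7, 'example#Message', '{"text":"hello"}')

# Whatever the names, each field's role is given by its position.
entry_sequence, entry_position, entry_topic, entry_data = entry

# The names of the sequence and position fields, found by reflection.
sequence_field, position_field = LogEntry._fields[:2]
print(sequence_field, position_field)  # log_name entry_number
```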
SequencedItem namedtuple¶
The library provides a sequenced item named tuple called SequencedItem.
from eventsourcing.infrastructure.sequenceditem import SequencedItem
As in the example above, the library’s SequencedItem namedtuple has four fields. The sequence_id identifies the sequence to which the item belongs. The position identifies the position of the item in its sequence. The topic identifies a dimension of concern to which the item pertains. The data holds the data associated with the item.
A sequenced item is just a tuple, and can be used as such. In the example below, a sequenced item happens to be constructed with a UUID to identify a sequence. The item has also been given an integer position value and a topic that happens to correspond to a domain event class in the library. The item’s data is a JSON object in which foo is bar.
from uuid import uuid4
sequence1 = uuid4()
sequenced_item1 = SequencedItem(
sequence_id=sequence1,
position=0,
topic='eventsourcing.domain.model.events#DomainEvent',
data='{"foo":"bar"}',
)
As expected, the attributes of the sequenced item object are simply the values given when the object was constructed.
assert sequenced_item1.sequence_id == sequence1
assert sequenced_item1.position == 0
assert sequenced_item1.topic == 'eventsourcing.domain.model.events#DomainEvent'
assert sequenced_item1.data == '{"foo":"bar"}'
StoredEvent namedtuple¶
The library provides a sequenced item named tuple called StoredEvent. The attributes of the StoredEvent namedtuple are originator_id, originator_version, event_type, and state.
The originator_id is the ID of the aggregate that published the event, and is equivalent to sequence_id above.
The originator_version is the version of the aggregate that published the event, and is equivalent to position above.
The event_type identifies the class of the domain event that is stored, and is equivalent to topic above.
The state holds the state of the domain event, and is equivalent to data above.
from eventsourcing.infrastructure.sequenceditem import StoredEvent
aggregate1 = uuid4()
stored_event1 = StoredEvent(
originator_id=aggregate1,
originator_version=0,
event_type='eventsourcing.domain.model.events#DomainEvent',
state='{"foo":"bar"}',
)
assert stored_event1.originator_id == aggregate1
assert stored_event1.originator_version == 0
assert stored_event1.event_type == 'eventsourcing.domain.model.events#DomainEvent'
assert stored_event1.state == '{"foo":"bar"}'
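Both named tuples assign the same roles to the same positions. An item of one type can therefore be converted to the other by unpacking it. The sketch below uses SequencedItemLike and StoredEventLike, two hypothetical local stand-ins that mirror the documented field names, so it runs without the library installed.

```python
from collections import namedtuple
from uuid import uuid4

# Hypothetical stand-ins mirroring the documented field names.
SequencedItemLike = namedtuple('SequencedItemLike', ['sequence_id', 'position', 'topic', 'data'])
StoredEventLike = namedtuple('StoredEventLike', ['originator_id', 'originator_version', 'event_type', 'state'])

stored_like = StoredEventLike(
    originator_id=uuid4(),
    originator_version=0,
    event_type='eventsourcing.domain.model.events#DomainEvent',
    state='{"foo":"bar"}',
)

# Positional unpacking converts one type into the other.
item_like = SequencedItemLike(*stored_like)

# Show which field corresponds to which.
print(dict(zip(StoredEventLike._fields, SequencedItemLike._fields)))
# {'originator_id': 'sequence_id', 'originator_version': 'position', 'event_type': 'topic', 'state': 'data'}
```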
Sequenced item mapper¶
The event store uses a sequenced item mapper to map between sequenced items and application-level objects such as domain events.
The library provides a sequenced item mapper object class called SequencedItemMapper.
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
The SequencedItemMapper has a constructor arg sequenced_item_class, which defaults to the library’s sequenced item named tuple SequencedItem.
sequenced_item_mapper = SequencedItemMapper()
The method from_sequenced_item() can be used to convert sequenced item objects to application-level objects.
domain_event = sequenced_item_mapper.from_sequenced_item(sequenced_item1)
assert domain_event.foo == 'bar'
The method to_sequenced_item() can be used to convert application-level objects to sequenced item named tuples.
assert sequenced_item_mapper.to_sequenced_item(domain_event).data == sequenced_item1.data
If the names of the first two fields of the sequenced item named tuple (e.g. sequence_id and position) do not match the names of the attributes of the application-level object which identify a sequence and a position (e.g. originator_id and originator_version), then the attribute names can be given to the sequenced item mapper using the constructor args sequence_id_attr_name and position_attr_name.
from eventsourcing.domain.model.events import DomainEvent
domain_event1 = DomainEvent(
originator_id=aggregate1,
originator_version=1,
foo='baz',
)
sequenced_item_mapper = SequencedItemMapper(
sequence_id_attr_name='originator_id',
position_attr_name='originator_version'
)
assert domain_event1.foo == 'baz'
assert sequenced_item_mapper.to_sequenced_item(domain_event1).sequence_id == aggregate1
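The following example illustrates what mapping "by reflection" can mean. SketchMapper is a deliberately simplified, hypothetical mapper, not the library's SequencedItemMapper:

* It reads the sequence and position values from the attributes named by sequence_id_attr_name and position_attr_name.
* It derives a "module#qualname" topic from the object's class.
* It serializes the object's attributes as JSON.

The Event class is also hypothetical.

```python
import json
from collections import namedtuple

ItemTuple = namedtuple('ItemTuple', ['sequence_id', 'position', 'topic', 'data'])


class Event(object):
    """Hypothetical application-level object with arbitrary attributes."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


class SketchMapper(object):
    """Simplified illustration of mapping objects to items by reflection."""

    def __init__(self, sequenced_item_class=ItemTuple,
                 sequence_id_attr_name=None, position_attr_name=None):
        self.sequenced_item_class = sequenced_item_class
        field_names = sequenced_item_class._fields
        # By default, look for attributes named like the first two fields.
        self.sequence_id_attr_name = sequence_id_attr_name or field_names[0]
        self.position_attr_name = position_attr_name or field_names[1]

    def to_sequenced_item(self, obj):
        # Read the identifying attributes by name, via reflection.
        sequence_id = getattr(obj, self.sequence_id_attr_name)
        position = getattr(obj, self.position_attr_name)
        cls = type(obj)
        topic = '{}#{}'.format(cls.__module__, cls.__qualname__)
        data = json.dumps(obj.__dict__, sort_keys=True, separators=(',', ':'))
        # Construct positionally, so any four-field named tuple type will do.
        return self.sequenced_item_class(sequence_id, position, topic, data)


sketch_mapper = SketchMapper(sequence_id_attr_name='originator_id',
                             position_attr_name='originator_version')
sketch_event = Event(originator_id='a1', originator_version=1, foo='baz')
item = sketch_mapper.to_sequenced_item(sketch_event)
print(item.sequence_id, item.position)  # a1 1
print(item.data)  # {"foo":"baz","originator_id":"a1","originator_version":1}
```

Because the sketch builds the item positionally, passing a different four-field named tuple type as sequenced_item_class changes only the field names of the result.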
Alternatively, a sequenced item named tuple type that is different from the default SequencedItem namedtuple, for example the library’s StoredEvent namedtuple, can be passed with the constructor arg sequenced_item_class.
sequenced_item_mapper = SequencedItemMapper(
sequenced_item_class=StoredEvent
)
domain_event1 = sequenced_item_mapper.from_sequenced_item(stored_event1)
assert domain_event1.foo == 'bar', domain_event1
Just as the alternative StoredEvent namedtuple can be used instead of the default SequencedItem namedtuple, it is also possible to use a custom named tuple. Which alternative you use for your project depends on your preferences for the names in your domain events and your persistence model.
Please note, these application-level objects are required to have a class whose “topic”, as generated by get_topic(), is resolved by resolve_topic() back to the same class.
from eventsourcing.domain.model.events import Created
from eventsourcing.utils.topic import get_topic, resolve_topic
topic = get_topic(Created)
assert resolve_topic(topic) == Created
assert topic == 'eventsourcing.domain.model.events#Created'
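The round-trip requirement can be illustrated with a minimal "module#qualname" topic scheme, which matches the format of the topic shown above. The helpers topic_of() and class_from_topic() are hypothetical stand-ins, not the library's get_topic() and resolve_topic().

```python
import importlib
from fractions import Fraction


def topic_of(cls):
    """Return a 'module#qualname' string for a class (hypothetical helper)."""
    return '{}#{}'.format(cls.__module__, cls.__qualname__)


def class_from_topic(topic_string):
    """Import the module, then walk the qualified name back to the class."""
    module_name, _, qualname = topic_string.partition('#')
    obj = importlib.import_module(module_name)
    for attr in qualname.split('.'):
        obj = getattr(obj, attr)
    return obj


fraction_topic = topic_of(Fraction)
print(fraction_topic)  # fractions#Fraction
assert class_from_topic(fraction_topic) is Fraction
```

A scheme like this cannot resolve a class defined inside a function, because its qualified name contains "<locals>". Such a class would fail the round trip.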
Custom JSON transcoding¶
The SequencedItemMapper can be constructed with optional args json_encoder_class and json_decoder_class. The defaults are the library’s ObjectJSONEncoder and ObjectJSONDecoder, which can be extended to support types of value objects that are not currently supported by the library.
The code below extends the JSON transcoding to support sets.
from eventsourcing.utils.transcoding import ObjectJSONEncoder, ObjectJSONDecoder
class CustomObjectJSONEncoder(ObjectJSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return {'__set__': list(obj)}
        else:
            return super(CustomObjectJSONEncoder, self).default(obj)
class CustomObjectJSONDecoder(ObjectJSONDecoder):
    @classmethod
    def from_jsonable(cls, d):
        if '__set__' in d:
            return cls._decode_set(d)
        else:
            return ObjectJSONDecoder.from_jsonable(d)
    @staticmethod
    def _decode_set(d):
        return set(d['__set__'])
customized_sequenced_item_mapper = SequencedItemMapper(
json_encoder_class=CustomObjectJSONEncoder,
json_decoder_class=CustomObjectJSONDecoder
)
domain_event = customized_sequenced_item_mapper.from_sequenced_item(
SequencedItem(
sequence_id=sequence1,
position=0,
topic='eventsourcing.domain.model.events#DomainEvent',
data='{"foo":{"__set__":["bar","baz"]}}',
)
)
assert domain_event.foo == set(["bar", "baz"])
sequenced_item = customized_sequenced_item_mapper.to_sequenced_item(domain_event)
assert sequenced_item.data.startswith('{"foo":{"__set__":["ba')
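The same '__set__' convention can be sketched with only the standard json module, which shows what the two hooks do:

* default() is called for any object that json cannot serialize.
* An object_hook is called for every decoded JSON object.

SetEncoder and decode_sets() are hypothetical names. The library's ObjectJSONEncoder and ObjectJSONDecoder are not used here.

```python
import json


class SetEncoder(json.JSONEncoder):
    """Encode sets as {"__set__": [...]}; defer everything else to the base class."""

    def default(self, obj):
        if isinstance(obj, set):
            # Sort for a deterministic encoding (assumes comparable members).
            return {'__set__': sorted(obj)}
        return super(SetEncoder, self).default(obj)


def decode_sets(d):
    """object_hook: turn {"__set__": [...]} back into a set."""
    if '__set__' in d:
        return set(d['__set__'])
    return d


encoded = json.dumps({'foo': {'bar', 'baz'}}, cls=SetEncoder, separators=(',', ':'))
print(encoded)  # {"foo":{"__set__":["bar","baz"]}}
decoded = json.loads(encoded, object_hook=decode_sets)
assert decoded == {'foo': {'bar', 'baz'}}
```

Sorting the members makes the output deterministic, so this sketch can compare the exact string. The example above uses list(obj), whose order is arbitrary, so it checks only a prefix.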
Application-level encryption¶
The SequencedItemMapper can be constructed with a symmetric cipher. If a cipher is given, then the data field of every sequenced item (the state field, in the case of StoredEvent) will be encrypted before being sent to the database. The data retrieved from the database will be decrypted and verified, which protects against tampering.
The library provides an AES cipher object class called AESCipher. It uses the AES cipher from the Python Cryptography Toolkit, as forked by the actively maintained PyCryptodome project.
The AESCipher class uses AES in GCM mode, which is a padding-less, authenticated encryption mode. Other AES modes aren’t currently supported by this class.
The AESCipher constructor arg cipher_key is required. The key must be either 16, 24, or 32 random bytes (128, 192, or 256 bits). Longer keys take more time to encrypt plaintext, but produce more secure ciphertext.
Generating and storing a secure key requires functionality beyond the scope of this library. However, the utils package does contain a function encode_random_bytes() that may help to generate a unicode key string, representing random bytes encoded with Base64. A companion function decode_random_bytes() decodes the unicode key string into a sequence of bytes.
from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.utils.random import encode_random_bytes, decode_random_bytes
# Unicode string representing 256 random bits encoded with Base64.
cipher_key = encode_random_bytes(num_bytes=32)
# Construct AES-256 cipher.
cipher = AESCipher(cipher_key=decode_random_bytes(cipher_key))
# Encrypt some plaintext.
ciphertext = cipher.encrypt('plaintext')
assert ciphertext != 'plaintext'
# Decrypt some ciphertext.
plaintext = cipher.decrypt(ciphertext)
assert plaintext == 'plaintext'
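For comparison, a Base64-encoded random key can be produced with the standard library alone. The following is a hypothetical sketch of what functions like encode_random_bytes() and decode_random_bytes() might do, assuming standard Base64 encoding of bytes from os.urandom(). It is not the library's implementation.

```python
import base64
import os


def make_key_string(num_bytes=32):
    """Return num_bytes of OS randomness as a Base64 unicode string."""
    return base64.b64encode(os.urandom(num_bytes)).decode('utf8')


def key_bytes_from_string(key_string):
    """Decode a Base64 key string back into bytes."""
    return base64.b64decode(key_string.encode('utf8'))


key_string = make_key_string(32)
key = key_bytes_from_string(key_string)
print(len(key), len(key_string))  # 32 44
```

Thirty-two bytes give a 256-bit key, suitable for AES-256. Base64 encodes every 3 bytes as 4 characters, which is why the string has 44 characters.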
The SequencedItemMapper has a constructor arg cipher, which can be used to pass in a cipher object, and thereby enable encryption.
# Construct sequenced item mapper to always encrypt domain events.
ciphered_sequenced_item_mapper = SequencedItemMapper(
sequenced_item_class=StoredEvent,
cipher=cipher,
)
# Domain event attribute ``foo`` has value ``'bar'``.
assert domain_event1.foo == 'bar'
# Map the domain event to an encrypted stored event namedtuple.
stored_event = ciphered_sequenced_item_mapper.to_sequenced_item(domain_event1)
# Attribute names and values of the domain event are not visible in the encrypted ``state`` field.
assert 'foo' not in stored_event.state
assert 'bar' not in stored_event.state
# Recover the domain event from the encrypted state.
domain_event = ciphered_sequenced_item_mapper.from_sequenced_item(stored_event)
# Domain event has decrypted attributes.
assert domain_event.foo == 'bar'
Please note, the sequence ID and position values are, necessarily, not encrypted. However, by encrypting the state of the item within the application, potentially sensitive information (for example, personally identifiable information) will be encrypted in transit to the database, at rest in the database, and in all backups and other copies.
Record managers¶
The event store uses a record manager to write sequenced items to database records.
The library has an abstract base class AbstractActiveRecordManager with abstract methods append() and get_items(). Concrete implementations define these methods to write and read sequenced items in a database.
A record manager is constructed with a sequenced_item_class and a matching record_class. The field names of a suitable record class will match the field names of the sequenced item named tuple.
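The following hypothetical in-memory sketch makes a record manager's responsibilities concrete:

* append() stores an item under its (sequence ID, position) key and refuses to overwrite an existing one.
* list_items() returns a sequence's items in position order.

InMemoryRecordManager and ItemConflict are illustrative names only. The library's record managers persist to a database and raise the library's RecordConflictError.

```python
from collections import namedtuple

Item = namedtuple('Item', ['sequence_id', 'position', 'topic', 'data'])


class ItemConflict(Exception):
    """Hypothetical stand-in for a record conflict error."""


class InMemoryRecordManager(object):
    def __init__(self, sequenced_item_class=Item):
        self.sequenced_item_class = sequenced_item_class
        self._records = {}  # (sequence_id, position) -> item

    def append(self, item):
        # The position comes from the item itself; the store does not choose it.
        key = (item[0], item[1])
        if key in self._records:
            raise ItemConflict(key)
        self._records[key] = item

    def get_items(self, sequence_id):
        # Yield a sequence's items in ascending position order.
        positions = sorted(p for (s, p) in self._records if s == sequence_id)
        for position in positions:
            yield self._records[(sequence_id, position)]

    def list_items(self, sequence_id):
        return list(self.get_items(sequence_id))


manager = InMemoryRecordManager()
manager.append(Item('seq1', 1, 'example#Event', '{}'))
manager.append(Item('seq1', 0, 'example#Event', '{}'))
try:
    manager.append(Item('seq1', 0, 'example#Event', '{"dup":true}'))
except ItemConflict:
    print('conflict')  # conflict
print([i.position for i in manager.list_items('seq1')])  # [0, 1]
```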
SQLAlchemy¶
The library has a record manager for SQLAlchemy provided by the object class SQLAlchemyRecordManager.
To run the example below, please install the library with the ‘sqlalchemy’ option.
$ pip install eventsourcing[sqlalchemy]
The library provides record classes for SQLAlchemy, such as IntegerSequencedRecord and StoredEventRecord. The IntegerSequencedRecord class matches the default SequencedItem namedtuple. The StoredEventRecord class matches the alternative StoredEvent namedtuple.
There is also a TimestampSequencedRecord and a SnapshotRecord.
The code below uses the namedtuple StoredEvent and the record class StoredEventRecord.
from eventsourcing.infrastructure.sqlalchemy.records import StoredEventRecord
Database settings can be configured using SQLAlchemySettings, which is constructed with a uri connection string. The code below uses an in-memory SQLite database.
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemySettings
settings = SQLAlchemySettings(uri='sqlite:///:memory:')
To help set up a database connection and tables, the library has an object class SQLAlchemyDatastore.
The SQLAlchemyDatastore is constructed with the settings object and a tuple of record classes, passed using the tables arg.
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemyDatastore
datastore = SQLAlchemyDatastore(
settings=settings,
tables=(StoredEventRecord,)
)
Please note, if you have declared your own SQLAlchemy model Base class, you may wish to define your own record classes which inherit from your Base class. If so, it may help to refer to the library record classes to see how SQLAlchemy ORM columns and indexes can be used to persist sequenced items.
The methods setup_connection() and setup_tables() of the datastore object can be used to set up the database connection and the tables.
datastore.setup_connection()
datastore.setup_tables()
As well as sequenced_item_class and a matching record_class, the SQLAlchemyRecordManager requires a scoped session object, passed using the constructor arg session. For convenience, the SQLAlchemyDatastore has a thread-scoped session facade set as its session attribute. You may wish to use a different scoped session facade, such as a request-scoped session object provided by a Web framework.
With the database set up, the SQLAlchemyRecordManager can be constructed and used to store events using SQLAlchemy.
from eventsourcing.infrastructure.sqlalchemy.manager import SQLAlchemyRecordManager
record_manager = SQLAlchemyRecordManager(
sequenced_item_class=StoredEvent,
record_class=StoredEventRecord,
session=datastore.session,
contiguous_record_ids=True,
application_id=uuid4()
)
Sequenced items (or “stored events” in this example) can be appended to the database using the append() method of the record manager.
record_manager.append(stored_event1)
(Please note, since the position is given by the sequenced item itself, the word “append” means here “to add something extra” rather than the perhaps more common but stricter meaning “to add to the end of a document”. That is, the database is deliberately not responsible for positioning a new item at the end of a sequence. So perhaps “save” would be a better name for this operation.)
All the previously appended items of a sequence can be retrieved by using the list_items() method.
results = record_manager.list_items(aggregate1)
Since only one item has been stored so far, there is only one item in the results.
assert len(results) == 1
assert results[0] == stored_event1
SQLAlchemy dialects¶
The databases supported by core SQLAlchemy dialects are Firebird, Microsoft SQL Server, MySQL, Oracle, PostgreSQL, SQLite, and Sybase. This library’s infrastructure classes for SQLAlchemy have been tested with MySQL, PostgreSQL, and SQLite.
MySQL¶
For MySQL, the Python package mysql-connector-python-rf can be used (licensed under GPL v2). Please note, I had problems running this driver with Python 2.7 (a unicode error when it raises exceptions).
$ pip install mysql-connector-python-rf
The uri for MySQL used with this driver would look something like this.
mysql+mysqlconnector://username:password@localhost/eventsourcing
Alternatively, for MySQL, the Python package mysqlclient can be used (also licensed under GPL v2). I didn’t have problems using this driver with Python 2.7.
$ pip install mysqlclient
The uri for MySQL used with this driver would look something like this.
mysql+mysqldb://username:password@localhost/eventsourcing
Another alternative is PyMySQL. It has an MIT licence.
$ pip install PyMySQL
The uri for MySQL used with this driver would look something like this.
mysql+pymysql://username:password@localhost/eventsourcing
PostgreSQL¶
For PostgreSQL, the Python package psycopg2 can be used.
$ pip install psycopg2
The uri for PostgreSQL used with this driver would look something like this.
postgresql+psycopg2://username:password@localhost:5432/eventsourcing
SQLite¶
SQLite is shipped with core Python packages, so nothing extra needs to be installed.
The uri for a temporary SQLite database might look something like this.
sqlite:////tmp/eventsourcing.db
Please note, the library’s SQLAlchemy infrastructure defaults to using an in-memory SQLite database. This is the fastest way to run the library, and is recommended as a convenience for development.
Django ORM¶
The library has a record manager for the Django ORM provided by the DjangoRecordManager class.
To run the example below, please install the library with the ‘django’ option.
$ pip install eventsourcing[django]
For the DjangoRecordManager, the IntegerSequencedRecord from eventsourcing.infrastructure.django.models matches the SequencedItem namedtuple. The StoredEventRecord from the same module matches the StoredEvent namedtuple. There is also a TimestampSequencedRecord and a SnapshotRecord.
These are all Django models.
The package eventsourcing.infrastructure.django is a little Django app. To involve its models in your Django project, simply include the application in your project’s list of INSTALLED_APPS.
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'eventsourcing.infrastructure.django'
]
Alternatively, import or write the classes you want into one of your own Django app’s models.py.
The Django application at eventsourcing.infrastructure.django has database migrations that will add four tables, one for each of the record classes mentioned above. So if you use the application directly in INSTALLED_APPS, then the app’s migrations will be picked up by Django.
If, instead of using the app directly, you import some of its model classes into your own application’s models.py, you will need to run python manage.py makemigrations before tables for event sourcing can be created by Django. This way you can avoid creating tables you won’t use.
The library has a little Django project for testing the library’s Django app. This example uses it to help run the library’s Django app.
import os
os.environ['DJANGO_SETTINGS_MODULE'] = 'eventsourcing.tests.djangoproject.djangoproject.settings'
This Django project is simply the files that django-admin.py startproject generates, with the SQLite database set to be in memory, and with the library’s Django app added to the INSTALLED_APPS setting.
With the environment variable DJANGO_SETTINGS_MODULE referring to the Django project, Django can be started. If you aren’t running tests with the Django test runner, you may need to run django.setup().
import django
django.setup()
Before using the database, make sure the migrations have been applied, so the necessary database tables exist.
An alternative to python manage.py migrate is the call_command() function provided by Django. If you aren’t running tests with the Django test runner, this function can help, for example to set up an in-memory SQLite database before each test. To do this, call it in the setUp() method of a test case.
from django.core.management import call_command
call_command('migrate', verbosity=0, interactive=False)
So long as a table exists for its record class, the DjangoRecordManager can be used to store events using the Django ORM.
from eventsourcing.infrastructure.django.manager import DjangoRecordManager
from eventsourcing.infrastructure.django.models import StoredEventRecord
django_record_manager = DjangoRecordManager(
record_class=StoredEventRecord,
sequenced_item_class=StoredEvent,
contiguous_record_ids=True
)
results = django_record_manager.list_items(aggregate1)
assert len(results) == 0
django_record_manager.append(stored_event1)
results = django_record_manager.list_items(aggregate1)
assert results[0] == stored_event1
Django backends¶
The supported Django backends are PostgreSQL, MySQL, SQLite, and Oracle. This library’s Django infrastructure classes have been tested with PostgreSQL, MySQL, and SQLite.
Contiguous record IDs¶
The contiguous_record_ids argument, used in the examples above, is optional, and is False by default. If it is set to True, and if the record class has an ID field, then records will be inserted using an “insert select from” query, so that the IDs of the records in the table form a contiguous integer sequence.
Application events recorded in this way can be accurately followed as a single sequence without overbearing complexity to mitigate gaps and race conditions. This feature is only available on the relational record managers (Django and SQLAlchemy, not Cassandra).
If the record ID is merely auto-incrementing, then gaps could be generated. This is the case when the library’s integer sequenced record classes are used without this feature enabled. Whenever contention in an aggregate sequence causes a unique constraint to be violated, the transaction will be rolled back, and an ID that was issued could be discarded and lost. Other, greater IDs may already have been issued.

The complexity for followers is that a gap may be permanent or temporary: a gap may eventually be filled by a transaction that was somehow delayed. Some databases appear to have auto-incrementing functionality that does not lead to gaps even when transactions are rolled back. However, I don’t understand when this happens and when it doesn’t, and so feel unable to rely on it, at least at the moment. It appears to be an inherently unreliable situation. Followers that need to project the application events accurately could probably mitigate it satisfactorily, but only with increased complexity.
Each relational record manager has a raw SQL query with an “insert select from” statement. If possible, the raw query is compiled when the record manager object is constructed. When a record is inserted, the new field values are bound to the raw query, which is executed within a transaction. When executed, the query first selects the maximum ID from all records currently existing in the table (as visible in its transaction). It then attempts to insert a record with an ID value of the max existing ID plus one (the next unused ID).

The query depends on the following database properties:

* The record table must have a unique constraint for the ID, so that records aren’t overwritten by this query.
* The record ID must be indexed, so that the max value can be identified efficiently. The b-tree commonly used for database indexes supports this purpose well.
* The transaction isolation level must be at least “read committed”, which is true by default for MySQL and PostgreSQL.
Any resulting contention in the record ID will raise an exception, so that the query can be retried. The library exception class RecordConflictError will be raised.
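The “insert select from” idea can be tried out with the standard library's sqlite3 module. This sketch makes simplifying assumptions: a single illustrative table named records, with parameters bound using "?". It is not the raw SQL that the library's record managers actually use. Both the ID constraint and the separate (sequence, position) constraint are unique, so a conflicting insert fails and is rolled back.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE records ('
    ' id INTEGER PRIMARY KEY,'  # unique and indexed
    ' sequence_id TEXT NOT NULL,'
    ' position INTEGER NOT NULL,'
    ' data TEXT,'
    ' UNIQUE (sequence_id, position))'
)

# Select the max existing ID and insert the next one, in a single statement.
INSERT_NEXT = (
    'INSERT INTO records (id, sequence_id, position, data) '
    'SELECT COALESCE(MAX(id), 0) + 1, ?, ?, ? FROM records'
)


def insert(sequence_id, position, data):
    with conn:  # commit on success, roll back on error
        conn.execute(INSERT_NEXT, (sequence_id, position, data))


insert('a', 0, 'x')
insert('b', 0, 'y')
try:
    insert('a', 0, 'z')  # same sequence and position: rejected
except sqlite3.IntegrityError:
    print('conflict')  # conflict
insert('a', 1, 'w')

ids = [row[0] for row in conn.execute('SELECT id FROM records ORDER BY id')]
print(ids)  # [1, 2, 3]
```

SQLite serializes writes, so this snippet cannot show two transactions racing. In a server database, two racing inserts would compute the same next ID, and the unique constraint on the ID would reject one of them, which could then be retried.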
Cassandra¶
The library has a record manager for Apache Cassandra provided by the CassandraRecordManager class.
To run the example below, please install the library with the ‘cassandra’ option.
$ pip install eventsourcing[cassandra]
It takes a while to build the driver. If you want that last step to run quickly, set the environment variable CASS_DRIVER_NO_CYTHON.
$ CASS_DRIVER_NO_CYTHON=1 pip install eventsourcing[cassandra]
For the CassandraRecordManager, the IntegerSequencedRecord from eventsourcing.infrastructure.cassandra.records matches the SequencedItem namedtuple. The StoredEventRecord from the same module matches the StoredEvent namedtuple. There is also a TimestampSequencedRecord, a TimeuuidSequencedRecord, and a SnapshotRecord.
The CassandraDatastore and CassandraSettings can be used in the same way as SQLAlchemyDatastore and SQLAlchemySettings above. Please investigate the library class CassandraSettings for information about configuring away from the default settings.
from eventsourcing.infrastructure.cassandra.datastore import CassandraDatastore, CassandraSettings
from eventsourcing.infrastructure.cassandra.records import StoredEventRecord
cassandra_datastore = CassandraDatastore(
settings=CassandraSettings(),
tables=(StoredEventRecord,)
)
cassandra_datastore.setup_connection()
cassandra_datastore.setup_tables()
With the database set up, the CassandraRecordManager can be constructed and used to store events using Apache Cassandra.
from eventsourcing.infrastructure.cassandra.manager import CassandraRecordManager
cassandra_record_manager = CassandraRecordManager(
record_class=StoredEventRecord,
sequenced_item_class=StoredEvent,
)
results = cassandra_record_manager.list_items(aggregate1)
assert len(results) == 0
cassandra_record_manager.append(stored_event1)
results = cassandra_record_manager.list_items(aggregate1)
assert results[0] == stored_event1
cassandra_datastore.drop_tables()
cassandra_datastore.close_connection()
Sequenced item conflicts¶
It is a common feature of the record manager classes that two items cannot both be successfully appended at the same position in the same sequence. If such an attempt is made, a RecordConflictError will be raised.
from eventsourcing.exceptions import RecordConflictError
# Fail to append an item at the same position in the same sequence as a previous item.
try:
    record_manager.append(stored_event1)
except RecordConflictError:
    pass
else:
    raise Exception("RecordConflictError not raised")
This feature is implemented using optimistic concurrency control features of the underlying database. With SQLAlchemy, a unique constraint is used that involves both the sequence and the position columns. The Django ORM strategy works in the same way.
With Cassandra, the position is the primary key in the sequence partition, and the “IF NOT EXISTS” feature is applied. The Cassandra database management system implements the Paxos protocol. It can thereby accomplish linearly scalable distributed optimistic concurrency control, which guarantees sequential consistency of the events of an entity despite the database being distributed. It is also possible to serialize calls to the methods of an entity, but that is out of the scope of this package; if you wish to do that, perhaps something like an actor framework or Zookeeper might help.
Event store¶
The library’s EventStore provides an interface to the library’s cohesive mechanism for storing events as sequences of items. It can be used directly within an event sourced application to append and retrieve the application's domain events.
The EventStore is constructed with a sequenced item mapper and a record manager, both of which are discussed in detail in the sections above.
from eventsourcing.infrastructure.eventstore import EventStore
event_store = EventStore(
sequenced_item_mapper=sequenced_item_mapper,
record_manager=record_manager,
)
The event store’s append() method can append a domain event to its sequence. The event store uses the sequenced_item_mapper to obtain a sequenced item named tuple from a domain event, and it uses the record_manager to write the sequenced item to a database.
In the code below, a DomainEvent is appended to sequence aggregate1 at position 1.
event_store.append(
DomainEvent(
originator_id=aggregate1,
originator_version=1,
foo='baz',
)
)
The event store’s method get_domain_events() is used to retrieve events that have previously been appended. The event store uses the record_manager to read the sequenced items from a database, and it uses the sequenced_item_mapper to obtain domain events from the sequenced items.
results = event_store.get_domain_events(aggregate1)
Since two domain events have now been stored, there are two domain events in the results.
assert len(results) == 2
assert results[0].foo == 'bar'
assert results[1].foo == 'baz'
The optional arguments of get_domain_events() can be used to select some of the items in the sequence.
The lt arg is used to select items below the given position in the sequence.
The lte arg is used to select items below and at the given position in the sequence.
The gte arg is used to select items at and above the given position in the sequence.
The gt arg is used to select items above the given position in the sequence.
The limit arg is used to limit the number of items selected from the sequence.
The is_ascending arg is used when selecting items. It affects how any limit is applied, and determines the order of the results. Hence, it can affect both the content of the results and the performance of the method.
# Get events below and at position 0.
result = event_store.get_domain_events(aggregate1, lte=0)
assert len(result) == 1, result
assert result[0].foo == 'bar'
# Get events at and above position 1.
result = event_store.get_domain_events(aggregate1, gte=1)
assert len(result) == 1, result
assert result[0].foo == 'baz'
# Get the first event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1)
assert len(result) == 1, result
assert result[0].foo == 'bar'
# Get the last event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1, is_ascending=False)
assert len(result) == 1, result
assert result[0].foo == 'baz'
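The way these arguments combine can be modelled with a small, hypothetical function over a list of positions. It assumes that the function applies the bounds first, then the ordering, then the limit. This order is consistent with the examples above, where limit=1 with is_ascending=False yields the last item. It is not the event store's code.

```python
def select_positions(positions, lt=None, lte=None, gt=None, gte=None,
                     limit=None, is_ascending=True):
    """Filter, order, and limit positions as described for get_domain_events()."""
    selected = [
        p for p in positions
        if (lt is None or p < lt)
        and (lte is None or p <= lte)
        and (gt is None or p > gt)
        and (gte is None or p >= gte)
    ]
    # Ordering happens before the limit, so is_ascending decides which end is kept.
    selected.sort(reverse=not is_ascending)
    if limit is not None:
        selected = selected[:limit]
    return selected


positions = [0, 1, 2, 3]
print(select_positions(positions, lte=1))                        # [0, 1]
print(select_positions(positions, gte=2))                        # [2, 3]
print(select_positions(positions, limit=1, is_ascending=False))  # [3]
print(select_positions(positions, gt=0, limit=2))                # [1, 2]
```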
Optimistic concurrency control¶
It is a feature of the event store that two events cannot both be successfully appended at the same position in the same sequence. This condition is coded as a ConcurrencyError, since a correct program running in a single thread wouldn’t attempt to append an event that it had already successfully appended.
from eventsourcing.exceptions import ConcurrencyError
# Fail to append an event at the same position in the same sequence as a previous event.
try:
    event_store.append(
        DomainEvent(
            originator_id=aggregate1,
            originator_version=1,
            foo='baz',
        )
    )
except ConcurrencyError:
    pass
else:
    raise Exception("ConcurrencyError not raised")
This feature depends on the behaviour of the record manager’s append() method: the event store will raise a ConcurrencyError if a RecordConflictError is raised by its record manager.
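The translation from a record-level conflict to a concurrency error can be sketched as follows. Every name here is a hypothetical local stand-in, including both exception classes and the dict-backed storage, so the snippet runs on its own. It shows the shape of the idea, not the library's EventStore.

```python
class RecordConflict(Exception):
    """Stand-in for the record manager's conflict error."""


class ConcurrencyConflict(Exception):
    """Stand-in for the event store's concurrency error."""


class DictRecordStore(object):
    def __init__(self):
        self.records = {}

    def append(self, sequence_id, position, data):
        key = (sequence_id, position)
        if key in self.records:
            raise RecordConflict(key)
        self.records[key] = data


class SketchEventStore(object):
    def __init__(self, record_store):
        self.record_store = record_store

    def append(self, sequence_id, position, data):
        try:
            self.record_store.append(sequence_id, position, data)
        except RecordConflict as e:
            # Re-raise in application terms, keeping the original as the cause.
            raise ConcurrencyConflict(e) from e


store = SketchEventStore(DictRecordStore())
store.append('agg1', 0, 'created')
caught = None
try:
    store.append('agg1', 0, 'created again')
except ConcurrencyConflict as error:
    caught = error
print(type(caught).__name__)  # ConcurrencyConflict
```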
If a command fails due to a concurrency error, the command can be retried with the latest state. The @retry decorator can help code retries on commands.
from eventsourcing.domain.model.decorators import retry
errors = []
@retry(ConcurrencyError, max_attempts=5)
def set_password():
    exc = ConcurrencyError()
    errors.append(exc)
    raise exc
try:
    set_password()
except ConcurrencyError:
    pass
else:
    raise Exception("Shouldn't get here")
assert len(errors) == 5
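A retry decorator of this kind can be sketched in a few lines. retry_on() is a hypothetical helper, not the library's @retry, and it makes these assumptions:

* The decorated callable is simply called again, up to max_attempts calls in total.
* An optional fixed wait separates the attempts.
* After the last attempt, the final exception is re-raised.

```python
import functools
import time


def retry_on(exc_class, max_attempts=1, wait=0.0):
    """Hypothetical retry decorator: re-call on exc_class, up to max_attempts calls."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exc_class:
                    if attempt == max_attempts:
                        raise  # give up: re-raise the last error
                    time.sleep(wait)
        return wrapper
    return decorator


calls = []


@retry_on(ValueError, max_attempts=3)
def flaky():
    # Fails twice, then succeeds, like a command that eventually sees fresh state.
    calls.append(1)
    if len(calls) < 3:
        raise ValueError('conflict')
    return 'ok'


print(flaky(), len(calls))  # ok 3
```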
This feature avoids the sequence of records being corrupted due to concurrent threads operating on the same aggregate. However, the result is that success of appending an event in such circumstances is only probabilistic with respect to concurrency conflicts. Concurrency conflicts can be avoided if all commands for a single aggregate are executed in series, for example by treating each aggregate as an actor within an actor framework, or with locks provided by something like Zookeeper.
Event store factory¶
As a convenience, the library function construct_sqlalchemy_eventstore() can be used to construct an event store that uses the SQLAlchemy classes.
from eventsourcing.infrastructure.sqlalchemy import factory
event_store = factory.construct_sqlalchemy_eventstore(
session=datastore.session,
application_id=uuid4(),
contiguous_record_ids=True,
)
By default, the event store is constructed with the StoredEvent sequenced item named tuple and the record class StoredEventRecord. The optional args sequenced_item_class and record_class can be used to construct different kinds of event store.
Timestamped event store¶
The examples so far have used an integer sequenced event store, where the items are sequenced by integer version.
The example below constructs an event store for timestamp-sequenced domain events, using the library record class TimestampSequencedRecord.
from uuid import uuid4
from eventsourcing.infrastructure.sqlalchemy.records import TimestampSequencedRecord
from eventsourcing.utils.times import decimaltimestamp
# Setup database table for timestamped sequenced items.
datastore.setup_table(TimestampSequencedRecord)
# Construct event store for timestamp sequenced events.
timestamped_event_store = factory.construct_sqlalchemy_eventstore(
sequenced_item_class=SequencedItem,
record_class=TimestampSequencedRecord,
sequence_id_attr_name='originator_id',
position_attr_name='timestamp',
session=datastore.session,
)
# Construct an event.
aggregate_id = uuid4()
event = DomainEvent(
originator_id=aggregate_id,
timestamp=decimaltimestamp(),
)
# Store the event.
timestamped_event_store.append(event)
# Check the event was stored.
events = timestamped_event_store.get_domain_events(aggregate_id)
assert len(events) == 1
assert events[0].originator_id == aggregate_id
assert events[0].timestamp < decimaltimestamp()
Please note, optimistic concurrency control cannot maintain the consistency of a domain entity whose events are timestamp-sequenced items. Each event is likely to have a unique timestamp, and so branches can occur without restraint. Optimistic concurrency control will only prevent one timestamp-sequenced event from overwriting another.

For this reason, although domain events are usefully timestamped, it is not a very good idea to store the events of an entity or aggregate as timestamp-sequenced items. Timestamp-sequenced items are useful for storing events that are logically independent of others, such as messages in a log, which do not risk causing a consistency error due to concurrent operations. Nevertheless, two timestamp-sequenced items can happen to occur at the same timestamp. In that case, a concurrency error exception would be raised, and the event could be retried with a later timestamp.
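The difference can be seen in a toy model in which a dict keyed by (sequence ID, position) plays the role of the unique constraint. Two writers start from the same state of an entity. When positions are version numbers, their writes collide. When positions are timestamps, both writes succeed, so the sequence branches. The names are hypothetical, and the timestamps are fixed illustrative values.

```python
from decimal import Decimal

taken = {}  # (sequence_id, position) -> event; plays the role of a unique constraint


def try_append(sequence_id, position, event):
    """Store the event and return True, or return False if the key is taken."""
    key = (sequence_id, position)
    if key in taken:
        return False
    taken[key] = event
    return True


# Integer versions: both writers read version 0, so both choose version 1.
by_version = [try_append('entity', 1, 'A'), try_append('entity', 1, 'B')]

# Timestamps: each writer chooses its own timestamp (illustrative values).
by_timestamp = [
    try_append('entity-ts', Decimal('1500000000.000001'), 'A'),
    try_append('entity-ts', Decimal('1500000000.000002'), 'B'),
]

print(by_version)    # [True, False]
print(by_timestamp)  # [True, True]
```

In the second case nothing stops the second writer. The entity's history now contains two events that were each decided against the same earlier state, which is the branching described above.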
TimeUUIDs¶
If throughput is so high that such conflicts are too frequent, the library also supports sequencing items by TimeUUID, which includes a random component that makes it very unlikely that two events will conflict. This feature currently works with Apache Cassandra only. Tests exist in the library; other documentation is forthcoming.
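In Python, uuid.uuid1() generates a version 1 UUID, which is the kind of UUID that Cassandra's timeuuid type holds. A version 1 UUID combines a 60-bit timestamp, counted in 100-nanosecond intervals since 15 October 1582, with a clock sequence and a node identifier. The helper uuid1_to_datetime() is a hypothetical illustration and is not part of the library.

```python
import uuid
from datetime import datetime, timedelta

# Version 1 UUID timestamps count 100-nanosecond intervals since this date.
GREGORIAN_EPOCH = datetime(1582, 10, 15)


def uuid1_to_datetime(u):
    """Return the naive UTC datetime encoded in a version 1 UUID."""
    return GREGORIAN_EPOCH + timedelta(microseconds=u.time // 10)


first = uuid.uuid1()
second = uuid.uuid1()
print(first.version)    # 1
print(first != second)  # True
print(uuid1_to_datetime(first))
```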