Optimistic concurrency control

Because of the unique constraint on the sequenced item table, it isn’t possible to branch the evolution of an entity and store two events at the same version. Hence, if the entity you are working on has been updated elsewhere, an attempt to update your object will raise a concurrency exception.

Application and infrastructure

Set up infrastructure using library classes.

from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemySettings, SQLAlchemyDatastore
from eventsourcing.infrastructure.sqlalchemy.activerecords import IntegerSequencedItemRecord

datastore = SQLAlchemyDatastore(
    settings=SQLAlchemySettings(uri='sqlite:///:memory:'),
    tables=(IntegerSequencedItemRecord,),
)

datastore.setup_connection()
datastore.setup_tables()

Define a factory that uses library classes to construct an application object.

from eventsourcing.example.application import ExampleApplication
from eventsourcing.infrastructure.sqlalchemy.activerecords import SQLAlchemyActiveRecordStrategy
from eventsourcing.infrastructure.sequenceditem import SequencedItem

def construct_example_application(session):
    active_record_strategy = SQLAlchemyActiveRecordStrategy(
        active_record_class=IntegerSequencedItemRecord,
        sequenced_item_class=SequencedItem,
        session=session
    )
    app = ExampleApplication(
        entity_active_record_strategy=active_record_strategy
    )
    return app

Run the code

Use the application to get two instances of the same entity, and try to change them independently.

from eventsourcing.exceptions import ConcurrencyError

with construct_example_application(datastore.session) as app:

    entity = app.create_new_example(foo='bar1')

    a = app.example_repository[entity.id]
    b = app.example_repository[entity.id]

    # Change the entity using instance 'a'.
    a.foo = 'bar2'

    # Because 'a' has been changed since 'b' was obtained,
    # 'b' cannot be updated unless it is firstly refreshed.
    try:
        b.foo = 'bar3'
    except ConcurrencyError:
        pass
    else:
        raise Exception("Failed to control concurrency of 'b'.")

    # Refresh object 'b', so that 'b' has the current state of the entity.
    b = app.example_repository[entity.id]
    assert b.foo == 'bar2'

    # Changing the entity using instance 'b' now works because 'b' is up to date.
    b.foo = 'bar3'
    assert app.example_repository[entity.id].foo == 'bar3'

    # Now 'a' does not have the current state of the entity, and cannot be changed.
    try:
        a.foo = 'bar4'
    except ConcurrencyError:
        pass
    else:
        raise Exception("Failed to control concurrency of 'a'.")

Cassandra

The Cassandra database management system, which implements the Paxos protocol, can (allegedly) accomplish linearly-scalable distributed optimistic concurrency control, guaranteeing sequential consistency of the events of an entity. It is also possible to serialize calls to the methods of an entity, but that is out of the scope of this package — if you wish to do that, perhaps something like Zookeeper might help.