Snapshotting

Snapshots provide a fast path for obtaining the state of an entity or aggregate that skips replaying some or all of the entity’s events.

If a repository is constructed with a snapshot strategy object, it will try to get the closest snapshot to the required version of a requested entity, and then replay only those events that will take the snapshot up to the state at that version.

It is recommended not to co-mingle saved snapshots with the entity event sequence.

Snapshots can be taken manually. To automatically generate snapshots, a snapshotting policy can take snapshots whenever a particular condition occurs, for example after every ten events.

Domain

To avoid duplicating code from the previous sections, let’s use the example entity class Example and its factory function create_new_example() from the library.

from eventsourcing.example.domainmodel import Example, create_new_example

Application

The library class SnapshottingApplication, extends SimpleApplication by setting up infrastructure for snapshotting, such as a snapshot store, a dedicated table for snapshots, and a policy to take snapshots every so many events.

from eventsourcing.application.simple import SnapshottingApplication

Run the code

In the example below, snapshots of entities are taken every period number of events.

with SnapshottingApplication(period=2) as app:

    # Create an entity.
    entity = create_new_example(foo='bar1')

    # Check there's no snapshot, only one event so far.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot is None

    # Change an attribute, generates a second event.
    entity.foo = 'bar2'

    # Check the snapshot.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar2'

    # Check can recover entity using snapshot.
    assert entity.id in app.repository
    assert app.repository[entity.id].foo == 'bar2'

    # Check snapshot after five events.
    entity.foo = 'bar3'
    entity.foo = 'bar4'
    entity.foo = 'bar5'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar4'

    # Check snapshot after seven events.
    entity.foo = 'bar6'
    entity.foo = 'bar7'
    assert app.repository[entity.id].foo == 'bar7'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar6'

    # Check snapshot state is None after discarding the entity on the eighth event.
    entity.__discard__()
    assert entity.id not in app.repository
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state is None

    try:
        app.repository[entity.id]
    except KeyError:
        pass
    else:
        raise Exception('KeyError was not raised')

    # Get historical snapshots.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=2)
    assert snapshot.state['___version__'] == 1  # one behind
    assert snapshot.state['_foo'] == 'bar2'

    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=3)
    assert snapshot.state['___version__'] == 3
    assert snapshot.state['_foo'] == 'bar4'

    # Get historical entities.
    entity = app.repository.get_entity(entity.id, at=0)
    assert entity.__version__ == 0
    assert entity.foo == 'bar1', entity.foo

    entity = app.repository.get_entity(entity.id, at=1)
    assert entity.__version__ == 1
    assert entity.foo == 'bar2', entity.foo

    entity = app.repository.get_entity(entity.id, at=2)
    assert entity.__version__ == 2
    assert entity.foo == 'bar3', entity.foo

    entity = app.repository.get_entity(entity.id, at=3)
    assert entity.__version__ == 3
    assert entity.foo == 'bar4', entity.foo