Snapshots provide a fast path for obtaining the state of an entity or aggregate that skips replaying some or all of the entity’s events.

If the library repository class EventSourcedRepository is constructed with a snapshot strategy object, it will try to get the closest snapshot to the required version of a requested entity, and then replay only those events that will take the snapshot up to the state at that version.
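The lookup described above can be sketched in plain Python. This is only an illustration of the idea, not the library's implementation: the names `Snapshot`, `closest_snapshot` and `get_state` are hypothetical, and events are modelled as simple dictionaries of attribute changes.

```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    version: int  # entity version captured by this snapshot (counted from 0)
    state: dict   # copy of the entity's attributes at that version


def closest_snapshot(snapshots, lte=None):
    """Return the latest snapshot with version <= lte (or the latest overall)."""
    candidates = [s for s in snapshots if lte is None or s.version <= lte]
    return max(candidates, key=lambda s: s.version, default=None)


def get_state(events, snapshots, at=None):
    """Rebuild state at version `at`, replaying only events after the snapshot."""
    snapshot = closest_snapshot(snapshots, lte=at)
    if snapshot is None:
        state, start = {}, 0  # no usable snapshot: replay from the beginning
    else:
        state, start = dict(snapshot.state), snapshot.version + 1
    stop = len(events) if at is None else at + 1
    replayed = 0
    for change in events[start:stop]:
        state.update(change)
        replayed += 1
    return state, replayed


# Seven events (versions 0..6) and snapshots taken at versions 1, 3 and 5.
events = [{'foo': 'bar%d' % n} for n in range(1, 8)]
snapshots = [
    Snapshot(1, {'foo': 'bar2'}),
    Snapshot(3, {'foo': 'bar4'}),
    Snapshot(5, {'foo': 'bar6'}),
]

latest_state, latest_replayed = get_state(events, snapshots)
old_state, old_replayed = get_state(events, snapshots, at=2)
full_state, full_replayed = get_state(events, [], at=2)
```

With the snapshots available, the latest state is reached by replaying a single event on top of the version 5 snapshot. Without any snapshots, reaching version 2 requires replaying all three events from the start.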

Snapshots can be taken manually. They can also be generated automatically by a snapshotting policy, which takes a snapshot whenever a particular condition occurs, for example after every ten events.
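As a rough illustration of such a condition, the sketch below takes a snapshot whenever the number of events reaches a multiple of a period. The class `PeriodicSnapshottingPolicy` and its `on_event` method are hypothetical names used only for this sketch; they are not the library's policy class.

```python
class PeriodicSnapshottingPolicy:
    """Hypothetical policy: snapshot when the event count is a multiple of `period`."""

    def __init__(self, period):
        self.period = period
        self.snapshots = {}  # maps entity version -> copied state

    def on_event(self, version, state):
        # Versions are counted from 0, so version + 1 is the number of events so far.
        if (version + 1) % self.period == 0:
            self.snapshots[version] = dict(state)


policy = PeriodicSnapshottingPolicy(period=2)
state = {}
for version in range(7):
    # Each event sets 'foo' to a new value: bar1, bar2, ..., bar7.
    state['foo'] = 'bar%d' % (version + 1)
    policy.on_event(version, state)

snapshot_versions = sorted(policy.snapshots)
```

With a period of two and seven events, this sketch records snapshots at versions 1, 3 and 5, that is, after the second, fourth and sixth events.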


To avoid duplicating code from the previous sections, let’s use the example entity class Example and its factory function create_new_example() from the library.

from eventsourcing.example.domainmodel import Example, create_new_example


The library class SnapshottingApplication extends Application by setting up the application for snapshotting: it adds a snapshot store, a dedicated table for snapshot records, and a policy that takes a snapshot every so many events.

from eventsourcing.application.snapshotting import SnapshottingApplication
from eventsourcing.application.sqlalchemy import SQLAlchemyApplication

class MyApplication(SQLAlchemyApplication, SnapshottingApplication):
    pass

Run the code

In the example below, a snapshot of the entity is taken every snapshot_period events, which here means after every second event.

with MyApplication(snapshot_period=2, persist_event_type=Example.Event) as app:

    # Create an entity.
    entity = create_new_example(foo='bar1')

    # Check there's no snapshot, only one event so far.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot is None

    # Change an attribute, generates a second event.
    entity.foo = 'bar2'

    # Check the snapshot.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot is not None
    assert snapshot.state['_foo'] == 'bar2'

    # Check can recover entity using snapshot.
    assert entity.id in app.repository
    assert app.repository[entity.id].foo == 'bar2'

    # Check snapshot after five events.
    entity.foo = 'bar3'
    entity.foo = 'bar4'
    entity.foo = 'bar5'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar4'

    # Check snapshot after seven events.
    entity.foo = 'bar6'
    entity.foo = 'bar7'
    assert app.repository[entity.id].foo == 'bar7'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar6'

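    # Check snapshot state is None after discarding the entity on the eighth event.
    entity.__discard__()
    assert entity.id not in app.repository
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state is None

    # Check the repository raises KeyError for the discarded entity.
    try:
        app.repository[entity.id]
    except KeyError:
        pass
    else:
        raise Exception('KeyError was not raised')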

    # Get historical snapshots.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=2)
    assert snapshot.state['___version__'] == 1  # one behind
    assert snapshot.state['_foo'] == 'bar2'

    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=3)
    assert snapshot.state['___version__'] == 3
    assert snapshot.state['_foo'] == 'bar4'

    # Get historical entities.
    entity = app.repository.get_entity(entity.id, at=0)
    assert entity.__version__ == 0
    assert entity.foo == 'bar1', entity.foo

    entity = app.repository.get_entity(entity.id, at=1)
    assert entity.__version__ == 1
    assert entity.foo == 'bar2', entity.foo

    entity = app.repository.get_entity(entity.id, at=2)
    assert entity.__version__ == 2
    assert entity.foo == 'bar3', entity.foo

    entity = app.repository.get_entity(entity.id, at=3)
    assert entity.__version__ == 3
    assert entity.foo == 'bar4', entity.foo
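
The "one behind" comment in the historical snapshot checks follows from the snapshot period. Assuming versions are counted from zero and a snapshot is taken whenever the number of events is a multiple of the period (which is what the assertions above suggest), a period of two puts snapshots at versions 1, 3, 5 and 7. The helper below is a hypothetical sketch of that arithmetic and is not part of the library.

```python
def snapshot_version_at(lte, period):
    """Highest version <= lte that would have a periodic snapshot, or None."""
    events_so_far = lte + 1                       # versions start at 0
    covered = (events_so_far // period) * period  # events covered by a snapshot
    return covered - 1 if covered else None


one_behind = snapshot_version_at(lte=2, period=2)
exact = snapshot_version_at(lte=3, period=2)
too_early = snapshot_version_at(lte=0, period=2)
```

Under these assumptions, asking for a snapshot with lte=2 yields the snapshot at version 1, lte=3 yields the snapshot at version 3, and lte=0 yields nothing because no snapshot exists yet after the first event.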