Snapshotting
Snapshots provide a fast path for obtaining the state of an entity or aggregate that skips replaying some or all of the entity’s events.
If a repository is constructed with a snapshot strategy object, it will try to get the closest snapshot to the required version of a requested entity, and then replay only those events that will take the snapshot up to the state at that version.
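The lookup-and-replay behaviour described above can be sketched in plain Python. This is an illustration under stated assumptions, not the library's implementation: the names AttributeChanged, Snapshot, closest_snapshot() and get_state() are hypothetical, and entity state is reduced to a dictionary of attribute values.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class AttributeChanged:
    """Hypothetical event: attribute `name` was set to `value` at `version`."""
    version: int
    name: str
    value: str


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical snapshot: a copy of the entity state as of `version`."""
    version: int
    state: Dict[str, str]


def closest_snapshot(snapshots: List[Snapshot], at: int) -> Optional[Snapshot]:
    """Return the latest snapshot whose version is not after `at`, if any."""
    candidates = [s for s in snapshots if s.version <= at]
    return max(candidates, key=lambda s: s.version) if candidates else None


def get_state(
    events: List[AttributeChanged], snapshots: List[Snapshot], at: int
) -> Tuple[Dict[str, str], int]:
    """Rebuild state at version `at`; also return how many events were replayed."""
    snapshot = closest_snapshot(snapshots, at)
    if snapshot is None:
        # No usable snapshot: start from empty state and replay from the beginning.
        state, after = {}, -1
    else:
        # Start from a copy of the snapshot and replay only the later events.
        state, after = dict(snapshot.state), snapshot.version
    replayed = 0
    for event in events:
        if after < event.version <= at:
            state[event.name] = event.value
            replayed += 1
    return state, replayed


# Seven events at versions 0..6, setting foo to 'bar1'..'bar7'.
events = [AttributeChanged(v, "foo", f"bar{v + 1}") for v in range(7)]
# One snapshot taken at version 3, when foo was 'bar4'.
snapshots = [Snapshot(3, {"foo": "bar4"})]

state, replayed = get_state(events, snapshots, at=5)
```

With the snapshot available, only the events at versions 4 and 5 are replayed to reach version 5. Without it, all six events from version 0 to version 5 would be replayed to reach the same state.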
It is recommended not to co-mingle saved snapshots with the entity's event sequence; store them separately, for example in a dedicated table.
Snapshots can be taken manually. To automatically generate snapshots, a snapshotting policy can take snapshots whenever a particular condition occurs, for example after every ten events.
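As a rough sketch of such a policy, the class below takes a snapshot whenever the number of stored events is a multiple of a given period. The class PeriodicSnapshotPolicy and its on_event() method are hypothetical names chosen for illustration, and state is again modelled as a plain dictionary; the library's own policy classes may be structured differently.

```python
import copy
from typing import Any, Dict, List, Tuple


class PeriodicSnapshotPolicy:
    """Hypothetical policy: snapshot when the event count is a multiple of `period`."""

    def __init__(self, period: int) -> None:
        if period < 1:
            raise ValueError("period must be a positive integer")
        self.period = period
        # Snapshots are kept apart from the events, as (event_count, state) pairs.
        self.snapshots: List[Tuple[int, Dict[str, Any]]] = []

    def on_event(self, event_count: int, state: Dict[str, Any]) -> None:
        """Call after each event is stored; snapshots if the condition holds."""
        if event_count % self.period == 0:
            # Copy the state so later changes do not alter the snapshot.
            self.snapshots.append((event_count, copy.deepcopy(state)))


policy = PeriodicSnapshotPolicy(period=10)
state: Dict[str, Any] = {}

# Simulate 25 events, each changing the attribute 'foo'.
for count in range(1, 26):
    state["foo"] = f"bar{count}"
    policy.on_event(count, state)

snapshot_counts = [count for count, _ in policy.snapshots]
```

Here snapshots are taken after the tenth and twentieth events only. Copying the state matters: without the deep copy, every stored snapshot would reflect the latest value of the mutable dictionary.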
Domain
To avoid duplicating code from the previous sections, let's use the example entity class Example and its factory function create_new_example() from the library.
from eventsourcing.example.domainmodel import Example, create_new_example
Application
The library class SnapshottingApplication extends SimpleApplication by setting up infrastructure for snapshotting, such as a snapshot store, a dedicated table for snapshots, and a policy to take snapshots every so many events.
from eventsourcing.application.simple import SnapshottingApplication
Run the code
In the example below, a snapshot of an entity is taken every period events. With period=2, a snapshot is taken after every second event.
with SnapshottingApplication(period=2) as app:

    # Create an entity.
    entity = create_new_example(foo='bar1')

    # Check there's no snapshot, only one event so far.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot is None

    # Change an attribute, generates a second event.
    entity.foo = 'bar2'

    # Check the snapshot.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar2'

    # Check can recover entity using snapshot.
    assert entity.id in app.repository
    assert app.repository[entity.id].foo == 'bar2'

    # Check snapshot after five events.
    entity.foo = 'bar3'
    entity.foo = 'bar4'
    entity.foo = 'bar5'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar4'

    # Check snapshot after seven events.
    entity.foo = 'bar6'
    entity.foo = 'bar7'
    assert app.repository[entity.id].foo == 'bar7'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar6'

    # Check snapshot state is None after discarding the entity on the eighth event.
    entity.__discard__()
    assert entity.id not in app.repository
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state is None

    try:
        app.repository[entity.id]
    except KeyError:
        pass
    else:
        raise Exception('KeyError was not raised')

    # Get historical snapshots.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=2)
    assert snapshot.state['___version__'] == 1  # one behind
    assert snapshot.state['_foo'] == 'bar2'

    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=3)
    assert snapshot.state['___version__'] == 3
    assert snapshot.state['_foo'] == 'bar4'

    # Get historical entities.
    entity = app.repository.get_entity(entity.id, at=0)
    assert entity.__version__ == 0
    assert entity.foo == 'bar1', entity.foo

    entity = app.repository.get_entity(entity.id, at=1)
    assert entity.__version__ == 1
    assert entity.foo == 'bar2', entity.foo

    entity = app.repository.get_entity(entity.id, at=2)
    assert entity.__version__ == 2
    assert entity.foo == 'bar3', entity.foo

    entity = app.repository.get_entity(entity.id, at=3)
    assert entity.__version__ == 3
    assert entity.foo == 'bar4', entity.foo