Snapshotting
Snapshots provide a fast path for obtaining the state of an entity or aggregate that skips replaying some or all of the entity’s events.
If the library repository class EventSourcedRepository is constructed with a snapshot strategy object, it will try to get the snapshot closest to the required version of a requested entity, and then replay only those events needed to bring the snapshot up to the state at that version.
Snapshots can be taken manually. They can also be generated automatically by a snapshotting policy, which takes a snapshot whenever a particular condition occurs, for example after every ten events.
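The idea can be illustrated without the library. The following is a minimal sketch, not the library's implementation: ToyStore, its attributes, and the dict-based state are all hypothetical names chosen for this illustration. It takes a snapshot after every snapshot_period events, and when state is requested it starts from the closest snapshot at or before the requested version and replays only the later events.

```python
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, str]   # (attribute name, new value); hypothetical event shape
State = Dict[str, str]


class ToyStore:
    """In-memory illustration only; this is not the library's event store."""

    def __init__(self, snapshot_period: int) -> None:
        self.snapshot_period = snapshot_period
        self.events: List[Event] = []
        self.snapshots: Dict[int, State] = {}  # zero-based version -> state copy
        self.replay_count = 0                  # number of events replayed so far

    def append(self, event: Event) -> None:
        self.events.append(event)
        # Periodic policy: snapshot after every snapshot_period events.
        if len(self.events) % self.snapshot_period == 0:
            version = len(self.events) - 1
            self.snapshots[version] = self.get_state(version)

    def get_state(self, at: Optional[int] = None) -> State:
        if at is None:
            at = len(self.events) - 1
        # Start from the closest snapshot at or before the requested version.
        earlier = [v for v in self.snapshots if v <= at]
        if earlier:
            start = max(earlier)
            state = dict(self.snapshots[start])
            first = start + 1
        else:
            state, first = {}, 0
        # Replay only the events recorded after that snapshot.
        for name, value in self.events[first:at + 1]:
            state[name] = value
            self.replay_count += 1
        return state


store = ToyStore(snapshot_period=2)
for i in range(1, 8):
    store.append(("foo", f"bar{i}"))

# Seven events were stored, with snapshots taken at versions 1, 3 and 5.
store.replay_count = 0
state = store.get_state()
assert state == {"foo": "bar7"}
assert store.replay_count == 1  # only the seventh event was replayed
```

Without the snapshots, the same call would have replayed all seven events. The saving grows with the length of the entity's history.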
Domain
To avoid duplicating code from the previous sections, let’s use the example entity class Example and its factory function create_new_example() from the library.
from eventsourcing.example.domainmodel import Example, create_new_example
Application
The library class SnapshottingApplication extends Application by setting up the application for snapshotting with a snapshot store, a policy to take snapshots every so many events, and a dedicated table for snapshot records, kept separate from the table used for events.
from eventsourcing.application.snapshotting import SnapshottingApplication
from eventsourcing.application.sqlalchemy import SQLAlchemyApplication
class MyApplication(SQLAlchemyApplication, SnapshottingApplication):
    pass
Run the code
In the example below, the application is constructed with snapshot_period=2, so a snapshot of an entity is taken after every second event.
with MyApplication(snapshot_period=2, persist_event_type=Example.Event) as app:

    # Create an entity.
    entity = create_new_example(foo='bar1')

    # Check there's no snapshot, only one event so far.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot is None

    # Change an attribute, generates a second event.
    entity.foo = 'bar2'

    # Check the snapshot.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot is not None
    assert snapshot.state['_foo'] == 'bar2'

    # Check can recover entity using snapshot.
    assert entity.id in app.repository
    assert app.repository[entity.id].foo == 'bar2'

    # Check snapshot after five events.
    entity.foo = 'bar3'
    entity.foo = 'bar4'
    entity.foo = 'bar5'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar4'

    # Check snapshot after seven events.
    entity.foo = 'bar6'
    entity.foo = 'bar7'
    assert app.repository[entity.id].foo == 'bar7'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar6'

    # Check snapshot state is None after discarding the entity on the eighth event.
    entity.__discard__()
    assert entity.id not in app.repository
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state is None

    try:
        app.repository[entity.id]
    except KeyError:
        pass
    else:
        raise Exception('KeyError was not raised')

    # Get historical snapshots.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=2)
    assert snapshot.state['___version__'] == 1  # one behind
    assert snapshot.state['_foo'] == 'bar2'

    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=3)
    assert snapshot.state['___version__'] == 3
    assert snapshot.state['_foo'] == 'bar4'

    # Get historical entities.
    entity = app.repository.get_entity(entity.id, at=0)
    assert entity.__version__ == 0
    assert entity.foo == 'bar1', entity.foo

    entity = app.repository.get_entity(entity.id, at=1)
    assert entity.__version__ == 1
    assert entity.foo == 'bar2', entity.foo

    entity = app.repository.get_entity(entity.id, at=2)
    assert entity.__version__ == 2
    assert entity.foo == 'bar3', entity.foo

    entity = app.repository.get_entity(entity.id, at=3)
    assert entity.__version__ == 3
    assert entity.foo == 'bar4', entity.foo
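The version numbers in the assertions above follow from simple arithmetic, sketched below. This assumes, as the assertions suggest, that versions are zero-based and that the policy snapshots on every snapshot_period-th event. The helper functions snapshot_versions() and closest_snapshot_version() are hypothetical names for this illustration and are not part of the library.

```python
from typing import List, Optional


def snapshot_versions(event_count: int, period: int) -> List[int]:
    """Zero-based versions at which a periodic policy would have snapshotted."""
    # The n-th event (counting from 1) produces version n - 1.
    return [n - 1 for n in range(period, event_count + 1, period)]


def closest_snapshot_version(
    event_count: int, period: int, lte: Optional[int] = None
) -> Optional[int]:
    """Latest snapshot version, optionally limited to versions <= lte."""
    versions = snapshot_versions(event_count, period)
    if lte is not None:
        versions = [v for v in versions if v <= lte]
    return versions[-1] if versions else None


# Eight events with a period of two give snapshots at versions 1, 3, 5 and 7.
assert snapshot_versions(8, 2) == [1, 3, 5, 7]

# lte=2 finds the snapshot at version 1 ("one behind"); lte=3 finds version 3.
assert closest_snapshot_version(8, 2, lte=2) == 1
assert closest_snapshot_version(8, 2, lte=3) == 3

# With a single event there is no snapshot yet.
assert closest_snapshot_version(1, 2) is None
```

This is why the snapshot requested with lte=2 holds 'bar2' rather than 'bar3': the closest snapshot was taken at version 1, and the repository replays the event at version 2 on top of it when the entity is requested at that version.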