Snapshotting¶
To enable snapshots to be used when recovering an entity from a repository, construct an entity repository that has a snapshot strategy object (see below). It is recommended to store snapshots in a dedicated table.
To automatically generate snapshots, you could perhaps define a snapshotting policy, to take snapshots whenever a particular condition occurs.
Domain¶
To avoid duplicating code from the previous section, let’s
use the example entity class Example
and its factory function create_new_example()
from the library.
from eventsourcing.example.domainmodel import Example, create_new_example
Infrastructure¶
It is recommended not to store snapshots within the entity’s sequence of events,
but in a dedicated table for snapshots. So let’s setup a dedicated table
for snapshots using the library class
SnapshotRecord
,
as well as a table for the events of the entity.
from eventsourcing.infrastructure.sqlalchemy.activerecords import IntegerSequencedItemRecord, SnapshotRecord
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemyDatastore, SQLAlchemySettings
datastore = SQLAlchemyDatastore(
settings=SQLAlchemySettings(uri='sqlite:///:memory:'),
tables=(IntegerSequencedItemRecord, SnapshotRecord,),
)
datastore.setup_connection()
datastore.setup_tables()
Application¶
Policy¶
Now let’s define a snapshotting policy object, so that snapshots of example entities are taken every so many events.
The class ExampleSnapshottingPolicy
below will take a snapshot of
the example entities every period
number of events, so that there will
never be more than period
number of events to replay when recovering the
entity. The default value of 2
is effective in the example below.
from eventsourcing.domain.model.events import subscribe, unsubscribe
class ExampleSnapshottingPolicy(object):
def __init__(self, example_repository, period=2):
self.example_repository = example_repository
self.period = period
subscribe(predicate=self.trigger, handler=self.take_snapshot)
def close(self):
unsubscribe(predicate=self.trigger, handler=self.take_snapshot)
def trigger(self, event):
return isinstance(event, Example.Event) and not (event.originator_version + 1) % self.period
def take_snapshot(self, event):
self.example_repository.take_snapshot(event.originator_id, lte=event.originator_version)
Because the event’s originator_version
is passed to the method take_snapshot()
,
with the argument lte
, the snapshot will reflect the entity as it existed just after
the event was applied. Even if a different thread operates on the same entity before the
snapshot is taken, the resulting snapshot is the same as it would have been otherwise.
Application object¶
The application class below extends the library class
ApplicationWithPersistencePolicies
,
which constructs the event stores and persistence policies we need. The supertype
has a policy to persist snapshots whenever they are taken. It also has as a policy
to persist the events of entities whenever they are published.
The example entity repository is constructed from library class
EventSourcedRepository
with a snapshot strategy, the integer sequenced event store, and a mutator function.
The snapshot strategy is constructed from library class
EventSourcedSnapshotStrategy
with an event store for snapshots that is provided by the supertype.
The application’s snapshotting policy is constructed with the example repository, which it needs in order to take snapshots.
from eventsourcing.application.base import ApplicationWithPersistencePolicies
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
from eventsourcing.infrastructure.snapshotting import EventSourcedSnapshotStrategy
from eventsourcing.infrastructure.sqlalchemy.activerecords import SQLAlchemyActiveRecordStrategy
class SnapshottedApplication(ApplicationWithPersistencePolicies):
def __init__(self, session):
# Construct event stores and persistence policies.
entity_active_record_strategy = SQLAlchemyActiveRecordStrategy(
active_record_class=IntegerSequencedItemRecord,
session=session,
)
snapshot_active_record_strategy = SQLAlchemyActiveRecordStrategy(
active_record_class=SnapshotRecord,
session=session,
)
super(SnapshottedApplication, self).__init__(
entity_active_record_strategy=entity_active_record_strategy,
snapshot_active_record_strategy=snapshot_active_record_strategy,
)
# Construct snapshot strategy.
self.snapshot_strategy = EventSourcedSnapshotStrategy(
event_store=self.snapshot_event_store
)
# Construct the entity repository, this time with the snapshot strategy.
self.example_repository = EventSourcedRepository(
event_store=self.entity_event_store,
mutator=Example._mutate,
snapshot_strategy=self.snapshot_strategy
)
# Construct the snapshotting policy.
self.snapshotting_policy = ExampleSnapshottingPolicy(
example_repository=self.example_repository,
)
def create_new_example(self, foo):
return create_new_example(foo=foo)
def close(self):
super(SnapshottedApplication, self).close()
self.snapshotting_policy.close()
Run the code¶
The application object can be used in the same way as before. Now snapshots of an example entity will be taken every second event.
with SnapshottedApplication(datastore.session) as app:
# Create an entity.
entity = app.create_new_example(foo='bar1')
# Check there's no snapshot, only one event so far.
snapshot = app.snapshot_strategy.get_snapshot(entity.id)
assert snapshot is None
# Change an attribute, generates a second event.
entity.foo = 'bar2'
# Check the snapshot.
snapshot = app.snapshot_strategy.get_snapshot(entity.id)
assert snapshot.state['_foo'] == 'bar2'
# Check can recover entity using snapshot.
assert entity.id in app.example_repository
assert app.example_repository[entity.id].foo == 'bar2'
# Check snapshot after five events.
entity.foo = 'bar3'
entity.foo = 'bar4'
entity.foo = 'bar5'
snapshot = app.snapshot_strategy.get_snapshot(entity.id)
assert snapshot.state['_foo'] == 'bar4'
# Check snapshot after seven events.
entity.foo = 'bar6'
entity.foo = 'bar7'
assert app.example_repository[entity.id].foo == 'bar7'
snapshot = app.snapshot_strategy.get_snapshot(entity.id)
assert snapshot.state['_foo'] == 'bar6'
# Check snapshot state is None after discarding the entity on the eighth event.
entity.discard()
assert entity.id not in app.example_repository
snapshot = app.snapshot_strategy.get_snapshot(entity.id)
assert snapshot.state is None
try:
app.example_repository[entity.id]
except KeyError:
pass
else:
raise Exception('KeyError was not raised')
# Get historical snapshots.
snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=2)
assert snapshot.state['_version'] == 2 # one behind
assert snapshot.state['_foo'] == 'bar2'
snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=3)
assert snapshot.state['_version'] == 4
assert snapshot.state['_foo'] == 'bar4'
# Get historical entities.
entity = app.example_repository.get_entity(entity.id, lte=0)
assert entity.version == 1
assert entity.foo == 'bar1', entity.foo
entity = app.example_repository.get_entity(entity.id, lte=1)
assert entity.version == 2
assert entity.foo == 'bar2', entity.foo
entity = app.example_repository.get_entity(entity.id, lte=2)
assert entity.version == 3
assert entity.foo == 'bar3', entity.foo
entity = app.example_repository.get_entity(entity.id, lte=3)
assert entity.version == 4
assert entity.foo == 'bar4', entity.foo