Source code for eventsourcing.infrastructure.snapshotting

from abc import ABCMeta, abstractmethod
from copy import deepcopy

import six

from eventsourcing.domain.model.events import publish
from eventsourcing.domain.model.snapshot import AbstractSnapshop, Snapshot
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.sequenceditemmapper import reconstruct_object
from eventsourcing.utils.topic import get_topic, resolve_topic


[docs]class AbstractSnapshotStrategy(six.with_metaclass(ABCMeta)):
[docs] @abstractmethod def get_snapshot(self, entity_id, lt=None, lte=None): """ Gets the last snapshot for entity, optionally until a particular version number. :rtype: Snapshot """
[docs] @abstractmethod def take_snapshot(self, entity_id, entity, last_event_version): """ Takes a snapshot of entity, using given ID, state and version number. :rtype: AbstractSnapshop """
[docs]class EventSourcedSnapshotStrategy(AbstractSnapshotStrategy): """Snapshot strategy that uses an event sourced snapshot. """ def __init__(self, event_store): assert isinstance(event_store, EventStore) self.event_store = event_store
[docs] def get_snapshot(self, entity_id, lt=None, lte=None): """ Gets the last snapshot for entity, optionally until a particular version number. :rtype: Snapshot """ snapshots = self.event_store.get_domain_events(entity_id, lt=lt, lte=lte, limit=1, is_ascending=False) if len(snapshots) == 1: return snapshots[0]
[docs] def take_snapshot(self, entity_id, entity, last_event_version): """ Takes a snapshot by instantiating and publishing a Snapshot domain event. :rtype: Snapshot """ # Create the snapshot event. snapshot = Snapshot( originator_id=entity_id, originator_version=last_event_version, topic=get_topic(entity.__class__), state=None if entity is None else deepcopy(entity.__dict__) ) # Publish the snapshot event. publish(snapshot) # Return the snapshot event. return snapshot
[docs]def entity_from_snapshot(snapshot): """ Reconstructs domain entity from given snapshot. """ assert isinstance(snapshot, AbstractSnapshop), type(snapshot) if snapshot.state is not None: entity_class = resolve_topic(snapshot.topic) return reconstruct_object(entity_class, snapshot.state)