from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Optional
from uuid import UUID
from eventsourcing.domain.model.entity import TVersionedEntity
from eventsourcing.domain.model.snapshot import Snapshot
from eventsourcing.infrastructure.base import (
AbstractEventStore,
AbstractRecordManager,
AbstractSnapshop,
)
from eventsourcing.infrastructure.sequenceditemmapper import reconstruct_object
from eventsourcing.utils.topic import get_topic, resolve_topic
class AbstractSnapshotStrategy(ABC):
    """
    Abstract interface for snapshot strategies.

    Concrete subclasses must implement both :meth:`get_snapshot` and
    :meth:`take_snapshot`.
    """

    @abstractmethod
    def get_snapshot(
        self,
        entity_id: UUID,
        lt: Optional[int] = None,
        lte: Optional[int] = None,
    ) -> Optional[AbstractSnapshop]:
        """
        Gets the last snapshot for the entity, optionally until a
        particular version number.

        :param entity_id: ID of the entity whose snapshot is wanted.
        :param lt: optional version bound (presumably exclusive,
            "less than" -- confirm against the event store).
        :param lte: optional version bound (presumably inclusive,
            "less than or equal" -- confirm against the event store).
        :return: a snapshot object, or ``None``.
        """

    @abstractmethod
    def take_snapshot(
        self,
        entity_id: UUID,
        entity: object,
        last_event_version: int,
    ) -> Snapshot:
        """
        Takes a snapshot of the entity, using the given ID, state and
        version number.

        :param entity_id: ID of the entity being snapshotted.
        :param entity: the object whose state is captured.
        :param last_event_version: version number to record on the snapshot.
        :return: the snapshot that was taken.
        """
class EventSourcedSnapshotStrategy(AbstractSnapshotStrategy):
    """
    Snapshot strategy that uses an event sourced snapshot.

    Snapshots are read from, and written to, the event store that is
    passed to the constructor.
    """

    def __init__(
        self,
        snapshot_store: AbstractEventStore[AbstractSnapshop, AbstractRecordManager],
    ):
        """
        Keeps a reference to the store used for reading and writing snapshots.

        :param snapshot_store: event store holding snapshot objects.
        """
        self.snapshot_store = snapshot_store

    def get_snapshot(
        self,
        entity_id: UUID,
        lt: Optional[int] = None,
        lte: Optional[int] = None,
    ) -> Optional[AbstractSnapshop]:
        """
        Gets the last snapshot for the entity, optionally until a
        particular version number.

        The store is asked for at most one item in descending order, with
        ``lt`` and ``lte`` passed through unchanged.

        :param entity_id: ID of the entity whose snapshot is wanted.
        :param lt: optional bound forwarded to the store's ``list_events``.
        :param lte: optional bound forwarded to the store's ``list_events``.
        :return: the single item returned by the store, or ``None`` when
            the store did not return exactly one item.
        """
        found = self.snapshot_store.list_events(
            entity_id, lt=lt, lte=lte, limit=1, is_ascending=False
        )
        # Anything other than exactly one result is treated as "no snapshot".
        return found[0] if len(found) == 1 else None

    # TODO: Rename as create_snapshot?
    def take_snapshot(
        self,
        entity_id: UUID,
        entity: object,
        last_event_version: int,
    ) -> Snapshot:
        """
        Creates a Snapshot from the given state, and appends it to the
        snapshot store.

        :param entity_id: recorded as the snapshot's ``originator_id``.
        :param entity: object whose ``__dict__`` is deep-copied as the
            snapshot state; when it is ``None`` the state is ``None``.
        :param last_event_version: recorded as the snapshot's
            ``originator_version``.
        :return: the snapshot that was stored.
        """
        # The topic is derived from the entity's class, also when the
        # entity itself is None.
        entity_topic = get_topic(entity.__class__)

        # Deep copy so the snapshot state is detached from the live entity.
        if entity is None:
            entity_state = None
        else:
            entity_state = deepcopy(entity.__dict__)

        new_snapshot = Snapshot(
            originator_id=entity_id,
            originator_version=last_event_version,
            topic=entity_topic,
            state=entity_state,
        )

        # Append the snapshot to the store, then hand it back to the caller.
        self.snapshot_store.store_events([new_snapshot])
        return new_snapshot
def entity_from_snapshot(snapshot: AbstractSnapshop) -> Optional[TVersionedEntity]:
    """
    Reconstructs a domain entity from the given snapshot.

    :param snapshot: snapshot object; asserted to be an instance of
        ``AbstractSnapshop``.
    :return: ``None`` when the snapshot's state is ``None``; otherwise the
        object built by ``reconstruct_object`` from the class resolved
        from the snapshot's topic and the snapshot's state.
    :raises AssertionError: if ``snapshot`` is not an ``AbstractSnapshop``
        (only while assertions are enabled).
    """
    assert isinstance(snapshot, AbstractSnapshop), type(snapshot)

    # A snapshot without state yields no entity; the topic is not resolved.
    if snapshot.state is None:
        return None

    entity_class = resolve_topic(snapshot.topic)
    return reconstruct_object(entity_class, snapshot.state)