Source code for eventsourcing.application.snapshotting

from typing import Any, Optional

from eventsourcing.application.policies import SnapshottingPolicy
from eventsourcing.application.simple import SimpleApplication
from eventsourcing.domain.model.entity import TVersionedEntity, TVersionedEvent
from eventsourcing.infrastructure.base import (
    AbstractEventStore,
    AbstractRecordManager,
    AbstractSnapshop,
)
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.snapshotting import EventSourcedSnapshotStrategy


class SnapshottingApplication(SimpleApplication[TVersionedEntity, TVersionedEvent]):
    """
    Application that maintains a snapshot store alongside its event store.

    On top of what ``SimpleApplication`` constructs, this class builds:

    * ``snapshot_store`` -- an ``EventStore`` backed by the record manager
      returned by ``infrastructure_factory.construct_snapshot_record_manager()``;
    * ``snapshot_strategy`` -- an ``EventSourcedSnapshotStrategy`` over that
      store, handed to the repository;
    * ``snapshotting_policy`` -- a ``SnapshottingPolicy`` configured with the
      repository, the snapshot store and ``snapshot_period``.
    """

    # Todo: Change this to default to None?
    # Class-level default, used when no (or a falsy) period is given to __init__.
    snapshot_period = 2

    def __init__(self, snapshot_period: Optional[int] = None, **kwargs: Any):
        """
        Initialise the snapshotting attributes, then the base application.

        :param snapshot_period: value passed as ``period`` to the
            ``SnapshottingPolicy``. ``None`` (and also ``0``, because the
            fallback uses ``or``) selects the class attribute instead.
        :param kwargs: forwarded unchanged to the base class constructor.
        """
        self.snapshot_period = snapshot_period or self.snapshot_period
        # These are assigned before calling the base constructor.
        # NOTE(review): presumably the base __init__ invokes the construct_*
        # hooks below, which then overwrite these placeholders -- confirm.
        self.snapshot_store: Optional[
            AbstractEventStore[AbstractSnapshop, AbstractRecordManager]
        ] = None
        self.snapshot_strategy: Optional[EventSourcedSnapshotStrategy] = None
        self.snapshotting_policy: Optional[SnapshottingPolicy] = None
        super().__init__(**kwargs)

    def construct_event_store(self) -> None:
        """
        Construct the base event store, then a second store for snapshots.

        :raises AssertionError: if the infrastructure factory, the sequenced
            item mapper class or the sequenced item class is not set.
        """
        super().construct_event_store()

        # Setup event store for snapshots.
        # The asserts check invariants on (apparently optional) attributes
        # before they are dereferenced.
        assert self.infrastructure_factory
        record_manager = self.infrastructure_factory.construct_snapshot_record_manager()
        assert self.sequenced_item_mapper_class is not None
        assert self.sequenced_item_class is not None
        sequenced_item_mapper = self.sequenced_item_mapper_class(
            sequenced_item_class=self.sequenced_item_class
        )
        self.snapshot_store = EventStore(
            record_manager=record_manager, event_mapper=sequenced_item_mapper
        )

    def construct_repository(self, **kwargs: Any) -> None:
        """
        Construct the repository with a snapshot strategy.

        :param kwargs: forwarded to the base ``construct_repository()``
            together with ``snapshot_strategy``.
        :raises AssertionError: if the snapshot store has not been constructed.
        """
        # Setup repository with a snapshot strategy.
        assert self.snapshot_store
        self.snapshot_strategy = EventSourcedSnapshotStrategy(
            snapshot_store=self.snapshot_store
        )
        super().construct_repository(
            snapshot_strategy=self.snapshot_strategy, **kwargs
        )

    def construct_persistence_policy(self) -> None:
        """
        Construct the base persistence policy, then the snapshotting policy.

        :raises AssertionError: if the snapshot store has not been constructed.
        """
        super().construct_persistence_policy()
        assert self.snapshot_store
        self.snapshotting_policy = SnapshottingPolicy(
            repository=self.repository,
            snapshot_store=self.snapshot_store,
            persist_event_type=self.persist_event_type,
            period=self.snapshot_period,
        )

    def setup_table(self) -> None:
        """
        Set up the base tables and, if a datastore exists, the snapshot table.

        :raises AssertionError: if a datastore exists but the snapshot store
            has not been constructed.
        """
        super().setup_table()
        if self._datastore is not None:
            assert self.snapshot_store
            self._datastore.setup_table(self.snapshot_store.record_manager.record_class)

    def drop_table(self) -> None:
        """
        Drop the base tables and, if a datastore exists, the snapshot table.

        :raises AssertionError: if a datastore exists but the snapshot store
            has not been constructed.
        """
        super().drop_table()
        if self._datastore is not None:
            assert self.snapshot_store
            self._datastore.drop_table(self.snapshot_store.record_manager.record_class)

    def close(self) -> None:
        """
        Close the base application, then the snapshotting policy.

        The policy is closed in a ``finally`` clause so that it is still
        released when the base ``close()`` raises; the exception propagates
        afterwards. Calling this again is harmless for the policy, because
        the attribute is reset to ``None`` once it has been closed.
        """
        try:
            super().close()
        finally:
            if self.snapshotting_policy is not None:
                self.snapshotting_policy.close()
                self.snapshotting_policy = None