Source code for eventsourcing.application.snapshotting

from typing import Any, Optional

from eventsourcing.application.policies import SnapshottingPolicy
from eventsourcing.application.simple import SimpleApplication
from eventsourcing.domain.model.entity import TVersionedEntity, TVersionedEvent
from eventsourcing.domain.model.events import AbstractSnapshot
from eventsourcing.infrastructure.base import AbstractEventStore, AbstractRecordManager
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.snapshotting import EventSourcedSnapshotStrategy


[docs]class SnapshottingApplication(SimpleApplication[TVersionedEntity, TVersionedEvent]): snapshot_period = 0
[docs] def __init__(self, snapshot_period: int = 0, **kwargs: Any): self.snapshot_period = snapshot_period or self.snapshot_period self.snapshot_store: Optional[ AbstractEventStore[AbstractSnapshot, AbstractRecordManager] ] = None self.snapshot_strategy: Optional[EventSourcedSnapshotStrategy] = None self.snapshotting_policy: Optional[SnapshottingPolicy] = None super(SnapshottingApplication, self).__init__(**kwargs)
[docs] def construct_event_store(self) -> None: super(SnapshottingApplication, self).construct_event_store() # Setup event store for snapshots. assert self.infrastructure_factory record_manager = self.infrastructure_factory.construct_snapshot_record_manager() assert self.sequenced_item_mapper_class is not None assert self.sequenced_item_class is not None self.snapshot_store = EventStore( record_manager=record_manager, event_mapper=self.event_store.event_mapper )
[docs] def construct_repository(self, **kwargs: Any) -> None: # Setup repository with a snapshot strategy. assert self.snapshot_store self.snapshot_strategy = EventSourcedSnapshotStrategy( snapshot_store=self.snapshot_store ) super(SnapshottingApplication, self).construct_repository( snapshot_strategy=self.snapshot_strategy, **kwargs )
[docs] def construct_persistence_policy(self) -> None: super(SnapshottingApplication, self).construct_persistence_policy() assert self.snapshot_store self.snapshotting_policy = SnapshottingPolicy( repository=self.repository, snapshot_store=self.snapshot_store, persist_event_type=self.persist_event_type, period=self.snapshot_period, )
[docs] def setup_table(self) -> None: super(SnapshottingApplication, self).setup_table() if self._datastore is not None: assert self.snapshot_store self._datastore.setup_table(self.snapshot_store.record_manager.record_class)
[docs] def drop_table(self) -> None: super(SnapshottingApplication, self).drop_table() if self._datastore is not None: assert self.snapshot_store self._datastore.drop_table(self.snapshot_store.record_manager.record_class)
[docs] def close(self) -> None: super(SnapshottingApplication, self).close() if self.snapshotting_policy is not None: self.snapshotting_policy.close() self.snapshotting_policy = None