from typing import Any, Optional
from eventsourcing.application.policies import SnapshottingPolicy
from eventsourcing.application.simple import SimpleApplication
from eventsourcing.domain.model.entity import TVersionedEntity, TVersionedEvent
from eventsourcing.infrastructure.base import (
AbstractEventStore,
AbstractRecordManager,
AbstractSnapshop,
)
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.snapshotting import EventSourcedSnapshotStrategy
[docs]class SnapshottingApplication(SimpleApplication[TVersionedEntity, TVersionedEvent]):
# Todo: Change this to default to None?
snapshot_period = 2
[docs] def __init__(self, snapshot_period: Optional[int] = None, **kwargs: Any):
self.snapshot_period = snapshot_period or self.snapshot_period
self.snapshot_store: Optional[
AbstractEventStore[AbstractSnapshop, AbstractRecordManager]
] = None
self.snapshot_strategy: Optional[EventSourcedSnapshotStrategy] = None
self.snapshotting_policy: Optional[SnapshottingPolicy] = None
super(SnapshottingApplication, self).__init__(**kwargs)
[docs] def construct_event_store(self) -> None:
super(SnapshottingApplication, self).construct_event_store()
# Setup event store for snapshots.
assert self.infrastructure_factory
record_manager = self.infrastructure_factory.construct_snapshot_record_manager()
assert self.sequenced_item_mapper_class is not None
assert self.sequenced_item_class is not None
sequenced_item_mapper = self.sequenced_item_mapper_class(
sequenced_item_class=self.sequenced_item_class
)
self.snapshot_store = EventStore(
record_manager=record_manager, event_mapper=sequenced_item_mapper
)
[docs] def construct_repository(self, **kwargs: Any) -> None:
# Setup repository with a snapshot strategy.
assert self.snapshot_store
self.snapshot_strategy = EventSourcedSnapshotStrategy(
snapshot_store=self.snapshot_store
)
super(SnapshottingApplication, self).construct_repository(
snapshot_strategy=self.snapshot_strategy, **kwargs
)
[docs] def construct_persistence_policy(self) -> None:
super(SnapshottingApplication, self).construct_persistence_policy()
assert self.snapshot_store
self.snapshotting_policy = SnapshottingPolicy(
repository=self.repository,
snapshot_store=self.snapshot_store,
persist_event_type=self.persist_event_type,
period=self.snapshot_period,
)
[docs] def setup_table(self) -> None:
super(SnapshottingApplication, self).setup_table()
if self._datastore is not None:
assert self.snapshot_store
self._datastore.setup_table(self.snapshot_store.record_manager.record_class)
[docs] def drop_table(self) -> None:
super(SnapshottingApplication, self).drop_table()
if self._datastore is not None:
assert self.snapshot_store
self._datastore.drop_table(self.snapshot_store.record_manager.record_class)
def close(self) -> None:
super(SnapshottingApplication, self).close()
if self.snapshotting_policy is not None:
self.snapshotting_policy.close()
self.snapshotting_policy = None