from eventsourcing.domain.model.entity import AbstractEntityRepository, mutate_entity
from eventsourcing.exceptions import RepositoryKeyError
from eventsourcing.infrastructure.eventplayer import EventPlayer
from eventsourcing.infrastructure.eventstore import AbstractEventStore
from eventsourcing.infrastructure.snapshotting import entity_from_snapshot
[docs]class EventSourcedRepository(AbstractEntityRepository):
# If the entity won't have very many events, marking the entity as
# "short" by setting __is_short__ value equal to True will mean
# the fastest path for getting all the events is used. If you set
# a value for page size (see below), this option will have no effect.
__is_short__ = False
# The page size by which events are retrieved. If this
# value is set to a positive integer, the events of
# the entity will be retrieved in pages, using a series
# of queries, rather than with one potentially large query.
__page_size__ = None
# The mutator function used by this repository. Can either
# be set as a class attribute, or passed as a constructor arg.
mutator = mutate_entity
def __init__(self, event_store, mutator=None, snapshot_strategy=None, use_cache=False, *args, **kwargs):
super(EventSourcedRepository, self).__init__(*args, **kwargs)
self._cache = {}
self._snapshot_strategy = snapshot_strategy
# self._use_cache = use_cache
# Check we got an event store.
assert isinstance(event_store, AbstractEventStore), type(event_store)
self._event_store = event_store
# Instantiate an event player for this repo.
mutator = mutator or type(self).mutator
self.event_player = EventPlayer(
event_store=self.event_store,
mutator=mutator,
page_size=self.__page_size__,
is_short=self.__is_short__,
snapshot_strategy=self._snapshot_strategy,
)
@property
def event_store(self):
return self._event_store
def __contains__(self, entity_id):
"""
Returns a boolean value according to whether entity with given ID exists.
"""
return self.get_entity(entity_id) is not None
def __getitem__(self, entity_id):
"""
Returns entity with given ID.
"""
# # Get entity from the cache.
# if self._use_cache:
# try:
# return self._cache[entity_id]
# except KeyError:
# pass
# Reconstitute the entity.
entity = self.get_entity(entity_id)
# Never created or already discarded?
if entity is None:
raise RepositoryKeyError(entity_id)
# # Put entity in the cache.
# if self._use_cache:
# self.add_cache(entity_id, entity)
# Return entity.
return entity
# def add_cache(self, entity_id, entity):
# self._cache[entity_id] = entity
[docs] def get_entity(self, entity_id, lt=None, lte=None):
"""
Returns entity with given ID, optionally until position.
"""
# Get a snapshot (None if none exist).
if self._snapshot_strategy is not None:
snapshot = self._snapshot_strategy.get_snapshot(entity_id, lt=lt, lte=lte)
else:
snapshot = None
# Decide the initial state of the entity, and the
# version of the last item applied to the entity.
if snapshot is None:
initial_state = None
gt = None
else:
initial_state = entity_from_snapshot(snapshot)
gt = snapshot.originator_version
# Replay domain events.
return self.event_player.replay_entity(entity_id, gt=gt, lt=lt, lte=lte, initial_state=initial_state)
[docs] def take_snapshot(self, entity_id, lt=None, lte=None):
return self.event_player.take_snapshot(entity_id, lt=lt, lte=lte)