Source code for eventsourcing.infrastructure.eventplayer

from functools import reduce

from eventsourcing.domain.model.entity import AbstractEventPlayer
from eventsourcing.infrastructure.eventstore import AbstractEventStore


[docs]class EventPlayer(AbstractEventPlayer): # The page size by which events are retrieved. If this # value is set to a positive integer, the events of # the entity will be retrieved in pages, using a series # of queries, rather than with one potentially large query. __page_size__ = None def __init__(self, event_store, snapshot_strategy=None, use_cache=False, mutator_func=None): super(EventPlayer, self).__init__() # Check we got an event store. assert isinstance(event_store, AbstractEventStore), type(event_store) self._event_store = event_store self._snapshot_strategy = snapshot_strategy self._cache = {} # NB If you use the cache, make sure to del entities # when records fail to write otherwise the cache will # give an entity that is ahead of the event records, # and writing more records will give a broken sequence. self._use_cache = use_cache self._mutator_func = mutator_func @property def event_store(self): return self._event_store
[docs] def project_events(self, initial_state, domain_events): """ Evolves initial state using the sequence of domain events and a mutator function. """ return reduce(self._mutator_func or self.mutate, domain_events, initial_state)
[docs] @staticmethod def mutate(initial, event): if initial is not None: event.__check_obj__(initial) return event.__mutate__(initial)
# def clone_object(initial_state): # initial_state_copy = object.__new__(type(initial_state)) # initial_state_copy.__dict__.update(deepcopy(initial_state.__dict__)) # return initial_state_copy