# coding=utf-8
from abc import ABCMeta, abstractmethod
import six
from eventsourcing.exceptions import ConcurrencyError, SequencedItemConflict
from eventsourcing.infrastructure.activerecord import AbstractActiveRecordStrategy
from eventsourcing.infrastructure.iterators import SequencedItemIterator
from eventsourcing.infrastructure.sequenceditemmapper import AbstractSequencedItemMapper
[docs]class AbstractEventStore(six.with_metaclass(ABCMeta)):
"""
Abstract base class for event stores. Defines the methods
expected of an event store by other classes in the library.
"""
[docs] @abstractmethod
def append(self, domain_event_or_events):
"""
Put domain event in event store for later retrieval.
"""
[docs] @abstractmethod
def get_domain_events(self, originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True,
page_size=None):
"""
Returns domain events for given entity ID.
"""
[docs] @abstractmethod
def get_domain_event(self, originator_id, eq):
"""
Returns a single domain event.
"""
[docs] @abstractmethod
def get_most_recent_event(self, originator_id, lt=None, lte=None):
"""
Returns most recent domain event for given entity ID.
"""
[docs] @abstractmethod
def all_domain_events(self):
"""
Returns all domain events in the event store.
"""
[docs]class EventStore(AbstractEventStore):
"""
Event store appends domain events to stored sequences. It uses
an active record strategy to map named tuples to database
records, and it uses a sequenced item mapper to map named
tuples to application-level objects.
"""
iterator_class = SequencedItemIterator
[docs] def __init__(self, active_record_strategy, sequenced_item_mapper):
"""
Initialises event store object.
:param active_record_strategy: active record strategy
:param sequenced_item_mapper: sequenced item mapper
"""
assert isinstance(active_record_strategy, AbstractActiveRecordStrategy), active_record_strategy
assert isinstance(sequenced_item_mapper, AbstractSequencedItemMapper), sequenced_item_mapper
self.active_record_strategy = active_record_strategy
self.sequenced_item_mapper = sequenced_item_mapper
[docs] def append(self, domain_event_or_events):
"""
Appends given domain event, or list of domain events, to their sequence.
:param domain_event_or_events: domain event, or list of domain events
"""
# Convert the domain event(s) to sequenced item(s).
if isinstance(domain_event_or_events, (list, tuple)):
sequenced_item_or_items = [self.to_sequenced_item(e) for e in domain_event_or_events]
else:
sequenced_item_or_items = self.to_sequenced_item(domain_event_or_events)
# Append to the sequenced item(s) to the sequence.
try:
self.active_record_strategy.append(sequenced_item_or_items)
except SequencedItemConflict as e:
raise ConcurrencyError(e)
[docs] def to_sequenced_item(self, domain_event):
"""
Maps domain event to sequenced item namedtuple.
:param domain_event: application-level object
:return: namedtuple: sequence item namedtuple
"""
return self.sequenced_item_mapper.to_sequenced_item(domain_event)
[docs] def get_domain_events(self, originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True,
page_size=None):
"""
Gets domain events from the sequence identified by `originator_id`.
:param originator_id: ID of a sequence of events
:param gt: get items after this position
:param gte: get items at or after this position
:param lt: get items before this position
:param lte: get items before or at this position
:param limit: get limited number of items
:param is_ascending: get items from lowest position
:param page_size: restrict and repeat database query
:return: list of domain events
"""
if page_size:
sequenced_items = self.iterator_class(
active_record_strategy=self.active_record_strategy,
sequence_id=originator_id,
page_size=page_size,
gt=gt,
gte=gte,
lt=lt,
lte=lte,
limit=limit,
is_ascending=is_ascending,
)
else:
sequenced_items = self.active_record_strategy.get_items(
sequence_id=originator_id,
gt=gt,
gte=gte,
lt=lt,
lte=lte,
limit=limit,
query_ascending=is_ascending,
results_ascending=is_ascending,
)
# Deserialize to domain events.
domain_events = six.moves.map(self.sequenced_item_mapper.from_sequenced_item, sequenced_items)
domain_events = list(domain_events)
return domain_events
[docs] def get_domain_event(self, originator_id, eq):
"""
Gets a domain event from the sequence identified by `originator_id`
at position `eq`.
:param originator_id: ID of a sequence of events
:param eq: get item at this position
:return: domain event
"""
sequenced_item = self.active_record_strategy.get_item(
sequence_id=originator_id,
eq=eq,
)
domain_event = self.sequenced_item_mapper.from_sequenced_item(sequenced_item)
return domain_event
[docs] def get_most_recent_event(self, originator_id, lt=None, lte=None):
"""
Gets a domain event from the sequence identified by `originator_id`
at the highest position.
:param originator_id: ID of a sequence of events
:param lt: get highest before this position
:param lte: get highest at or before this position
:return: domain event
"""
events = self.get_domain_events(originator_id=originator_id, lt=lt, lte=lte, limit=1, is_ascending=False)
events = list(events)
try:
return events[0]
except IndexError:
pass
[docs] def all_domain_events(self):
"""
Gets all domain events in the event store.
:return: map object, yielding a sequence of domain events
"""
all_items = self.active_record_strategy.all_items()
return map(self.sequenced_item_mapper.from_sequenced_item, all_items)