# coding=utf-8
from abc import ABC, abstractmethod
from eventsourcing.exceptions import ConcurrencyError, RecordConflictError
from eventsourcing.infrastructure.base import AbstractSequencedItemRecordManager
from eventsourcing.infrastructure.iterators import SequencedItemIterator
from eventsourcing.infrastructure.sequenceditemmapper import AbstractSequencedItemMapper
class AbstractEventStore(ABC):
    """
    Abstract base class for event stores. Defines the methods
    expected of an event store by other classes in the library.

    Every method is abstract: a concrete subclass must override all of
    them before it can be instantiated.
    """

    @abstractmethod
    def store(self, domain_event_or_events):
        """
        Put domain event(s) in event store for later retrieval.

        :param domain_event_or_events: domain event, or list of domain events
        """

    @abstractmethod
    def get_domain_events(self, originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True,
                          page_size=None):
        """
        Returns domain events for given entity ID.

        :param originator_id: ID of a sequence of events
        :param gt: get items after this position
        :param gte: get items at or after this position
        :param lt: get items before this position
        :param lte: get items before or at this position
        :param limit: get limited number of items
        :param is_ascending: get items from lowest position
        :param page_size: restrict and repeat database query
        """

    @abstractmethod
    def get_domain_event(self, originator_id, position):
        """
        Returns a single domain event.

        :param originator_id: ID of a sequence of events
        :param position: position of the wanted event in its sequence
        """

    @abstractmethod
    def get_most_recent_event(self, originator_id, lt=None, lte=None):
        """
        Returns most recent domain event for given entity ID.

        :param originator_id: ID of a sequence of events
        :param lt: get most recent before this position
        :param lte: get most recent at or before this position
        """

    @abstractmethod
    def all_domain_events(self):
        """
        Returns all domain events in the event store.
        """
# Todo: Unify iterators in EventStore and in NotificationLog, by pushing behaviour down to record manager?
class EventStore(AbstractEventStore):
    """
    Event store appends domain events to stored sequences. It uses
    a record manager to map named tuples to database
    records, and it uses a sequenced item mapper to map named
    tuples to application-level objects.
    """

    # Iterator used by get_domain_events() when a page size is given.
    # Subclasses can substitute another iterator class here.
    iterator_class = SequencedItemIterator

    def __init__(self, record_manager, sequenced_item_mapper):
        """
        Initialises event store object.

        :param record_manager: record manager
        :param sequenced_item_mapper: sequenced item mapper
        """
        # NOTE: these type checks are assert statements, so they are not
        # enforced when Python runs with assertions disabled (-O).
        assert isinstance(record_manager, AbstractSequencedItemRecordManager), record_manager
        assert isinstance(sequenced_item_mapper, AbstractSequencedItemMapper), sequenced_item_mapper
        self.record_manager = record_manager
        self.mapper = sequenced_item_mapper

    def store(self, domain_event_or_events):
        """
        Appends given domain event, or list of domain events, to their sequence.

        :param domain_event_or_events: domain event, or list of domain events
        :raises ConcurrencyError: if the record manager raises RecordConflictError
        """
        # Convert to sequenced item(s).
        sequenced_item_or_items = self.item_from_event(domain_event_or_events)

        # Append the sequenced item(s) to the sequence.
        try:
            self.record_manager.record_sequenced_items(sequenced_item_or_items)
        except RecordConflictError as e:
            # Chain explicitly so the record conflict is kept as the cause.
            raise ConcurrencyError(e) from e

    def item_from_event(self, domain_event_or_events):
        """
        Maps domain event to sequenced item namedtuple.

        A list or tuple is mapped element by element (recursively) and
        always comes back as a list.

        :param domain_event_or_events: application-level object (or list)
        :return: namedtuple: sequence item namedtuple (or list)
        """
        # Convert the domain event(s) to sequenced item(s).
        if isinstance(domain_event_or_events, (list, tuple)):
            return [self.item_from_event(e) for e in domain_event_or_events]
        return self.mapper.item_from_event(domain_event_or_events)

    def get_domain_events(self, originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True,
                          page_size=None):
        """
        Gets domain events from the sequence identified by `originator_id`.

        :param originator_id: ID of a sequence of events
        :param gt: get items after this position
        :param gte: get items at or after this position
        :param lt: get items before this position
        :param lte: get items before or at this position
        :param limit: get limited number of items
        :param is_ascending: get items from lowest position
        :param page_size: restrict and repeat database query
        :return: list of domain events
        """
        if page_size:
            # Read the sequence through the iterator class, page by page.
            sequenced_items = self.iterator_class(
                record_manager=self.record_manager,
                sequence_id=originator_id,
                page_size=page_size,
                gt=gt,
                gte=gte,
                lt=lt,
                lte=lte,
                limit=limit,
                is_ascending=is_ascending,
            )
        else:
            # No page size: ask the record manager directly, with the same
            # direction for the query and for the returned results.
            sequenced_items = self.record_manager.get_items(
                sequence_id=originator_id,
                gt=gt,
                gte=gte,
                lt=lt,
                lte=lte,
                limit=limit,
                query_ascending=is_ascending,
                results_ascending=is_ascending,
            )

        # Deserialize to domain events.
        return list(map(self.mapper.event_from_item, sequenced_items))

    def get_domain_event(self, originator_id, position):
        """
        Gets a domain event from the sequence identified by `originator_id`
        at the given `position`.

        :param originator_id: ID of a sequence of events
        :param position: get item at this position
        :return: domain event
        """
        sequenced_item = self.record_manager.get_item(
            sequence_id=originator_id,
            position=position,
        )
        return self.mapper.event_from_item(sequenced_item)

    def get_most_recent_event(self, originator_id, lt=None, lte=None):
        """
        Gets a domain event from the sequence identified by `originator_id`
        at the highest position.

        :param originator_id: ID of a sequence of events
        :param lt: get highest before this position
        :param lte: get highest at or before this position
        :return: domain event, or None if no event matches
        """
        # Descending order with a limit of one puts the highest position first.
        events = self.get_domain_events(originator_id=originator_id, lt=lt, lte=lte, limit=1, is_ascending=False)
        # next() over an iterator also works if an override returns a
        # non-list iterable; the default makes the empty case explicit.
        return next(iter(events), None)

    def all_domain_events(self):
        """
        Yields all domain events in the event store.

        Sequences are visited in the order the record manager's
        `all_sequence_ids()` provides them; each is read with a page size of 100.
        """
        for originator_id in self.record_manager.all_sequence_ids():
            yield from self.get_domain_events(originator_id=originator_id, page_size=100)