Source code for eventsourcing.infrastructure.eventstore

from typing import Iterable, NamedTuple, Optional
from uuid import UUID

from eventsourcing.exceptions import ConcurrencyError, RecordConflictError
from eventsourcing.infrastructure.base import AbstractEventStore, TRecordManager
from eventsourcing.infrastructure.iterators import SequencedItemIterator
from eventsourcing.whitehead import TEvent

# Todo: Unify iterators in EventStore and in NotificationLog,
#  by pushing behaviour down to record manager?


[docs]class EventStore(AbstractEventStore[TEvent, TRecordManager]): """ Event store appends domain events to stored sequences. It uses a record manager to map named tuples to database records, and it uses a sequenced item mapper to map named tuples to application-level objects. """ iterator_class = SequencedItemIterator
[docs] def store_events(self, events: Iterable[TEvent]) -> None: """ Appends given domain event, or list of domain events, to their sequence. :param events: domain event, or list of domain events """ # Convert to sequenced item. sequenced_items = self.items_from_events(events) # Append to the sequenced item(s) to the sequence. try: self.record_manager.record_items(sequenced_items) except RecordConflictError as e: raise ConcurrencyError(e)
[docs] def items_from_events(self, events: Iterable[TEvent]) -> Iterable[NamedTuple]: """ Maps domain event to sequenced item namedtuple. An iterable of events. """ # Convert the domain event(s) to sequenced item(s). return map(self.event_mapper.item_from_event, events)
[docs] def iter_events( self, originator_id: UUID, gt: Optional[int] = None, gte: Optional[int] = None, lt: Optional[int] = None, lte: Optional[int] = None, limit: Optional[int] = None, is_ascending: bool = True, page_size: Optional[int] = None, ) -> Iterable[TEvent]: """ Gets domain events from the sequence identified by `originator_id`. :param originator_id: ID of a sequence of events :param gt: get items after this position :param gte: get items at or after this position :param lt: get items before this position :param lte: get items before or at this position :param limit: get limited number of items :param is_ascending: get items from lowest position :param page_size: restrict and repeat database query :return: list of domain events """ if self.record_manager.can_limit_get_records and page_size: sequenced_items: Iterable = self.iterator_class( record_manager=self.record_manager, sequence_id=originator_id, page_size=page_size, gt=gt, gte=gte, lt=lt, lte=lte, limit=limit, is_ascending=is_ascending, ) else: sequenced_items = self.record_manager.get_items( sequence_id=originator_id, gt=gt, gte=gte, lt=lt, lte=lte, limit=limit, query_ascending=is_ascending, results_ascending=is_ascending, ) # Deserialize to domain events. return map(self.event_mapper.event_from_item, sequenced_items)
[docs] def get_event(self, originator_id: UUID, position: int) -> TEvent: """ Gets a domain event from the sequence identified by `originator_id` at position `eq`. :param originator_id: ID of a sequence of events :param position: get item at this position :return: domain event """ sequenced_item = self.record_manager.get_item( sequence_id=originator_id, position=position ) return self.event_mapper.event_from_item(sequenced_item)
[docs] def get_most_recent_event( self, originator_id: UUID, lt: Optional[int] = None, lte: Optional[int] = None ) -> Optional[TEvent]: """ Gets a domain event from the sequence identified by `originator_id` at the highest position. :param originator_id: ID of a sequence of events :param lt: get highest before this position :param lte: get highest at or before this position :return: domain event """ events = self.list_events( originator_id=originator_id, lt=lt, lte=lte, limit=1, is_ascending=False ) len_events = len(events) if len_events == 1: return events[0] elif len_events == 0: return None else: raise AssertionError("Too many events: %s" % len_events)
[docs] def all_events(self) -> Iterable[TEvent]: """ Yields all domain events in the event store. This method iterates over the sequence IDs, and returns all the events for each sequence effectively concatenated together. Use a notification log to propagate events from an application as a stable append-only sequence. """ for originator_id in self.record_manager.all_sequence_ids(): for domain_event in self.iter_events( originator_id=originator_id, page_size=100 ): yield domain_event