Source code for eventsourcing.infrastructure.iterators

from abc import abstractmethod
from threading import Thread
from typing import Any, Iterable, Iterator, List, NamedTuple, Optional
from uuid import UUID

from eventsourcing.infrastructure.base import AbstractRecordManager, BaseRecordManager


class AbstractSequencedItemIterator(Iterable):
    """
    Base class for iterators over the items of one sequence.

    Holds the query bounds and paging parameters, the position of the last
    item read, and counters of queries, non-empty pages and items. Concrete
    subclasses implement :meth:`__iter__`.
    """

    DEFAULT_PAGE_SIZE = 1000

    def __init__(
        self,
        record_manager: AbstractRecordManager,
        sequence_id: UUID,
        page_size: Optional[int] = None,
        gt: Optional[int] = None,
        gte: Optional[int] = None,
        lt: Optional[int] = None,
        lte: Optional[int] = None,
        limit: Optional[int] = None,
        is_ascending: bool = True,
    ):
        """
        Initialises sequenced item iterator.

        :param record_manager: The record manager used to get sequenced items.
        :param sequence_id: The id of the sequence being iterated over.
        :param page_size: The number of items requested from the record
            manager per query. ``None`` (or ``0``) falls back to
            ``DEFAULT_PAGE_SIZE``.
        :param gt: Exclusive lower bound on position of items returned.
        :param gte: Inclusive lower bound on position of items returned.
        :param lt: Exclusive upper bound on position of items returned.
        :param lte: Inclusive upper bound on position of items returned.
        :param limit: Limit to the number of items returned.
        :param is_ascending: Whether or not to iterate in ascending order.
        """
        assert isinstance(record_manager, BaseRecordManager), type(record_manager)
        assert isinstance(page_size, (int, type(None)))
        assert isinstance(limit, (int, type(None)))
        self.record_manager = record_manager
        self.sequence_id = sequence_id
        self.page_size = page_size or self.DEFAULT_PAGE_SIZE
        self.gt = gt
        self.gte = gte
        self.lte = lte
        self.lt = lt
        self.limit = limit
        # Work counters, updated by the concrete __iter__ implementations.
        self.query_counter = 0
        self.page_counter = 0
        self.all_item_counter = 0
        self.is_ascending = is_ascending
        # Position of the last item read; None until the first item is read.
        self._position: Optional[int] = None

    def _inc_page_counter(self) -> None:
        """
        Increments the page counter.

        The iterators in this module call this only for queries that
        returned at least one item, so empty result pages are not counted;
        queries are counted separately by :meth:`_inc_query_counter`.
        Because there is no way to know in advance that a full page is also
        the last page, a sequence whose length is an exact multiple of the
        page size can cost one more query than it has pages.
        """
        self.page_counter += 1

    def _inc_query_counter(self) -> None:
        """
        Increments the query counter.
        """
        self.query_counter += 1

    def _inc_all_event_counter(self) -> None:
        """
        Increments the count of items handed to the caller across all pages.
        """
        self.all_item_counter += 1

    def _update_position(self, sequenced_item: NamedTuple) -> None:
        """
        Records the position of the given item as the last position read.

        The position is read from the attribute named by the record
        manager's ``field_names.position``.
        """
        assert isinstance(
            sequenced_item, self.record_manager.sequenced_item_class
        ), type(sequenced_item)
        self._position = getattr(
            sequenced_item, self.record_manager.field_names.position
        )

    @abstractmethod
    def __iter__(self) -> Iterator[NamedTuple]:
        """
        Yields a continuous sequence of items.
        """
class SequencedItemIterator(AbstractSequencedItemIterator):
    """
    Iterates over a sequence by querying the record manager one page at a
    time, in the calling thread.
    """

    def __iter__(self) -> Iterator[NamedTuple]:
        """
        Yields a continuous sequence of items from "pages" of sequenced
        items retrieved using the record manager.

        Stops when a page comes back with fewer than ``page_size`` items,
        or once ``limit`` items have been yielded in total. A ``limit`` of
        zero or less yields nothing and makes no query.
        """
        gt = self.gt
        gte = self.gte
        lt = self.lt
        lte = self.lte
        while True:
            # Size of the next page: never ask for more than the remaining limit.
            if self.limit is not None:
                limit = min(self.page_size, self.limit - self.all_item_counter)
            else:
                limit = self.page_size

            # "<= 0" rather than "== 0": a negative limit must not be passed
            # through to the record manager.
            if limit <= 0:
                return

            # After the first page, resume just beyond the last item read,
            # replacing the inclusive bound on that side.
            if self._position is not None:
                if self.is_ascending:
                    gt = self._position
                    gte = None
                else:
                    lt = self._position
                    lte = None

            sequenced_items = self.record_manager.get_items(
                sequence_id=self.sequence_id,
                gt=gt,
                gte=gte,
                lt=lt,
                lte=lte,
                limit=limit,
                query_ascending=self.is_ascending,
                results_ascending=self.is_ascending,
            )
            self._inc_query_counter()

            # Count items in this page while yielding them.
            page_item_counter = 0
            for sequenced_item in sequenced_items:
                self._inc_all_event_counter()
                page_item_counter += 1
                yield sequenced_item
                # Remember the position of the last item for the next query.
                self._update_position(sequenced_item)

            # Only non-empty pages are counted as pages.
            if page_item_counter:
                self._inc_page_counter()

            # A page shorter than page_size means there can be no more items.
            if page_item_counter != self.page_size:
                return
class ThreadedSequencedItemIterator(AbstractSequencedItemIterator):
    """
    Iterates over a sequence while fetching the next page in a background
    thread, so the next query runs while the current page is being yielded.
    """

    # Seconds to wait for a page-fetching thread before giving up.
    THREAD_JOIN_TIMEOUT = 30

    def __iter__(self) -> Iterator[NamedTuple]:
        """
        Yields a continuous sequence of items, prefetching each next page in
        a GetEntityEventsThread while the current page is yielded.

        Stops after a page with fewer than ``page_size`` items, or once
        ``limit`` items have been yielded. A ``limit`` of zero or less
        yields nothing and makes no query.

        :raises TimeoutError: if a page is not retrieved within
            ``THREAD_JOIN_TIMEOUT`` seconds.
        """
        # Nothing to fetch if the limit is non-positive or already reached.
        if self.limit is not None and self.all_item_counter >= self.limit:
            return

        # Start a thread to get the first page of events.
        thread = self.start_thread()

        # Get pages of stored events, until the page isn't full.
        while True:
            # Wait for the next page of events.
            thread.join(timeout=self.THREAD_JOIN_TIMEOUT)
            if thread.is_alive():
                # stored_events would still be the initial empty list;
                # treating it as a final empty page would silently truncate
                # the sequence.
                raise TimeoutError(
                    "Timed out after {} seconds waiting for items of sequence "
                    "{}".format(self.THREAD_JOIN_TIMEOUT, self.sequence_id)
                )

            self._inc_query_counter()

            stored_events = thread.stored_events
            num_stored_events = len(stored_events)
            if num_stored_events:
                self._inc_page_counter()

            # A short page means there is nothing more to fetch; reaching (or
            # passing) the limit within this page means nothing more is
            # wanted, so no further query should be started.
            is_last_page = num_stored_events != self.page_size or (
                self.limit is not None
                and self.all_item_counter + num_stored_events >= self.limit
            )

            if not is_last_page:
                # Prefetch the next page, starting after this page's last item.
                self._update_position(stored_events[-1])
                thread = self.start_thread()

            for stored_event in stored_events:
                # "is not None" so that limit=0 is honoured rather than
                # treated as "no limit".
                if self.limit is not None and self.all_item_counter >= self.limit:
                    return
                self._inc_all_event_counter()
                yield stored_event

            if is_last_page:
                return

    def start_thread(self) -> "GetEntityEventsThread":
        """
        Starts and returns a thread that fetches one page of items,
        resuming just beyond the last position read (if any).
        """
        gt = self.gt
        gte = self.gte
        lt = self.lt
        lte = self.lte
        if self._position is not None:
            if self.is_ascending:
                gt = self._position
                gte = None
            else:
                lt = self._position
                lte = None
        thread = GetEntityEventsThread(
            record_manager=self.record_manager,
            sequence_id=self.sequence_id,
            gt=gt,
            gte=gte,
            lt=lt,
            lte=lte,
            page_size=self.page_size,
            is_ascending=self.is_ascending,
        )
        thread.start()
        return thread
class GetEntityEventsThread(Thread):
    """
    Thread that retrieves one page of sequenced items from a record manager.

    The retrieved items are available as ``stored_events`` once the thread
    has finished. If the query raises, the exception is kept in ``error``
    and re-raised by :meth:`join`, so a failed query is not mistaken for an
    empty page.
    """

    def __init__(
        self,
        record_manager: AbstractRecordManager,
        sequence_id: UUID,
        gt: Optional[int] = None,
        gte: Optional[int] = None,
        lt: Optional[int] = None,
        lte: Optional[int] = None,
        page_size: Optional[int] = None,
        is_ascending: bool = True,
        *args: Any,
        **kwargs: Any
    ):
        super(GetEntityEventsThread, self).__init__(*args, **kwargs)
        assert isinstance(record_manager, BaseRecordManager), type(record_manager)
        self.record_manager = record_manager
        self.stored_entity_id = sequence_id
        self.gt = gt
        self.gte = gte
        self.lt = lt
        self.lte = lte
        self.page_size = page_size
        self.is_ascending = is_ascending
        self.stored_events: List[NamedTuple] = []
        # Exception raised by the query in run(), if any.
        self.error: Optional[Exception] = None

    def run(self) -> None:
        """
        Queries the record manager for one page and stores the results.
        """
        try:
            self.stored_events = list(
                self.record_manager.get_items(
                    sequence_id=self.stored_entity_id,
                    gt=self.gt,
                    gte=self.gte,
                    lt=self.lt,
                    lte=self.lte,
                    limit=self.page_size,
                    query_ascending=self.is_ascending,
                    results_ascending=self.is_ascending,
                )
            )
        except Exception as error:
            # Keep the error for join() to re-raise in the waiting thread;
            # otherwise stored_events stays empty and reads like a final page.
            self.error = error

    def join(self, timeout: Optional[float] = None) -> None:
        """
        Waits for the thread like Thread.join, then re-raises any exception
        raised by the query in run().
        """
        super(GetEntityEventsThread, self).join(timeout)
        if self.error is not None:
            raise self.error