Source code for eventsourcing.infrastructure.timebucketedlog_reader

from eventsourcing.domain.model.timebucketedlog import MessageLogged, Timebucketedlog, make_timebucket_id, \
    next_bucket_starts, previous_bucket_starts
from eventsourcing.infrastructure.eventstore import AbstractEventStore
from eventsourcing.utils.times import decimaltimestamp


[docs]def get_timebucketedlog_reader(log, event_store): """ :rtype: TimebucketedlogReader """ return TimebucketedlogReader(log=log, event_store=event_store)
[docs]class TimebucketedlogReader(object): def __init__(self, log, event_store, page_size=50): assert isinstance(log, Timebucketedlog) self.log = log assert isinstance(event_store, AbstractEventStore), event_store self.event_store = event_store assert isinstance(page_size, int) self.page_size = page_size self.position = None
[docs] def get_messages(self, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=False, page_size=None): events = self.get_events(gt=gt, gte=gte, lt=lt, lte=lte, limit=limit, is_ascending=is_ascending, page_size=page_size) for event in events: if isinstance(event, MessageLogged): self.position = event.timestamp yield event.message
[docs] def get_events(self, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=False, page_size=None): assert limit is None or limit > 0 # Identify the first time bucket. now = decimaltimestamp() started_on = self.log.started_on absolute_latest = min(now, lt or now, lte or now) absolute_earlyist = max(started_on, gt or 0, gte or 0) if is_ascending: position = absolute_earlyist else: position = absolute_latest # Start counting events. count_events = 0 while True: bucket_id = make_timebucket_id(self.log.name, position, self.log.bucket_size) for message_logged_event in self.event_store.get_domain_events( originator_id=bucket_id, gt=gt, gte=gte, lt=lt, lte=lte, limit=limit, is_ascending=is_ascending, page_size=page_size, ): yield message_logged_event if limit is not None: count_events += 1 if count_events >= limit: return # See if there's another bucket. if is_ascending: next_timestamp = next_bucket_starts(position, self.log.bucket_size) if next_timestamp > absolute_latest: return else: position = next_timestamp else: if position < absolute_earlyist: return else: position = previous_bucket_starts(position, self.log.bucket_size)