from eventsourcing.domain.model.timebucketedlog import (
MessageLogged,
Timebucketedlog,
make_timebucket_id,
next_bucket_starts,
previous_bucket_starts,
)
from eventsourcing.infrastructure.eventstore import AbstractEventStore
from eventsourcing.utils.times import decimaltimestamp
[docs]def get_timebucketedlog_reader(log, event_store):
"""
:rtype: TimebucketedlogReader
"""
return TimebucketedlogReader(log=log, event_store=event_store)
class TimebucketedlogReader(object):
def __init__(self, log, event_store, page_size=50):
assert isinstance(log, Timebucketedlog)
self.log = log
assert isinstance(event_store, AbstractEventStore), event_store
self.event_store = event_store
assert isinstance(page_size, int)
self.page_size = page_size
self.position = None
def get_messages(
self,
gt=None,
gte=None,
lt=None,
lte=None,
limit=None,
is_ascending=False,
page_size=None,
):
events = self.get_events(
gt=gt,
gte=gte,
lt=lt,
lte=lte,
limit=limit,
is_ascending=is_ascending,
page_size=page_size,
)
for event in events:
if isinstance(event, MessageLogged):
self.position = event.timestamp
yield event.message
def get_events(
self,
gt=None,
gte=None,
lt=None,
lte=None,
limit=None,
is_ascending=False,
page_size=None,
):
assert limit is None or limit > 0
# Identify the first time bucket.
now = decimaltimestamp()
started_on = self.log.started_on
absolute_latest = min(now, lt or now, lte or now)
absolute_earlyist = max(started_on, gt or 0, gte or 0)
if is_ascending:
position = absolute_earlyist
else:
position = absolute_latest
# Start counting events.
count_events = 0
while True:
bucket_id = make_timebucket_id(
self.log.name, position, self.log.bucket_size
)
for message_logged_event in self.event_store.get_domain_events(
originator_id=bucket_id,
gt=gt,
gte=gte,
lt=lt,
lte=lte,
limit=limit,
is_ascending=is_ascending,
page_size=page_size,
):
yield message_logged_event
if limit is not None:
count_events += 1
if count_events >= limit:
return
# See if there's another bucket.
if is_ascending:
next_timestamp = next_bucket_starts(position, self.log.bucket_size)
if next_timestamp > absolute_latest:
return
else:
position = next_timestamp
else:
if position < absolute_earlyist:
return
else:
position = previous_bucket_starts(position, self.log.bucket_size)