from __future__ import absolute_import, division, print_function, unicode_literals
import json
from abc import ABC, abstractmethod
import requests
from eventsourcing.domain.model.array import BigArray
from eventsourcing.infrastructure.base import ACIDRecordManager
from eventsourcing.utils.transcoding import ObjectJSONDecoder, ObjectJSONEncoder, json_dumps
DEFAULT_SECTION_SIZE = 20
[docs]class Section(object):
"""
Section of a notification log.
Contains items, and has an ID.
May also have either IDs of previous and next sections of the notification log.
"""
def __init__(self, section_id, items, previous_id=None, next_id=None):
self.section_id = section_id
self.items = items
self.previous_id = previous_id
self.next_id = next_id
[docs]class AbstractNotificationLog(ABC):
"""
Presents a sequence of sections from a sequence of notifications.
"""
[docs] @abstractmethod
def __getitem__(self, section_id):
"""
Get section of notification log.
:rtype: Section
"""
[docs]class LocalNotificationLog(AbstractNotificationLog):
"""
Presents a sequence of sections from a sequence of notifications.
"""
def __init__(self, section_size=None):
self.section_size = section_size or DEFAULT_SECTION_SIZE
# self.last_start = None
def __getitem__(self, section_id):
# Get section of notification log.
next_position = None
if section_id == 'current':
# Get start of the last section.
next_position = self.get_next_position()
start = next_position // self.section_size * self.section_size
else:
try:
first_item_number, last_item_number = section_id.split(',')
except ValueError as e:
raise ValueError("Couldn't split '{}': {}".format(section_id, e))
# Convert from 1-based to 0-based.
start = int(first_item_number) - 1
# Go back to nearest section start.
start = start // self.section_size * self.section_size
# Get stop index (section doesn't include this one).
stop = start + self.section_size
# Get the items.
items = list(self.get_items(start, stop, next_position))
# Get previous and next section IDs.
if start:
first_item_number = start + 1 - self.section_size
last_item_number = first_item_number - 1 + self.section_size
previous_id = self.format_section_id(first_item_number, last_item_number)
else:
previous_id = None
if len(items) == self.section_size:
first_item_number = start + 1 + self.section_size
last_item_number = first_item_number - 1 + self.section_size
next_id = self.format_section_id(first_item_number, last_item_number)
else:
next_id = None
# Return a section of the notification log.
section_id = self.format_section_id(start + 1, start + self.section_size)
return Section(
section_id=section_id,
items=items,
previous_id=previous_id,
next_id=next_id,
)
[docs] @abstractmethod
def get_next_position(self):
"""
Returns items for section.
:rtype: int
"""
[docs] @abstractmethod
def get_items(self, start, stop, next_position=None):
"""
Returns items for section.
:rtype: list
"""
[docs]class RecordManagerNotificationLog(LocalNotificationLog):
def __init__(self, record_manager, section_size):
super(RecordManagerNotificationLog, self).__init__(section_size)
assert isinstance(record_manager, ACIDRecordManager), record_manager
assert record_manager.contiguous_record_ids
self.record_manager = record_manager
[docs] def get_items(self, start, stop, next_position=None):
notifications = []
for record in self.record_manager.get_notifications(start, stop):
notification = {
'id': getattr(record, self.record_manager.notification_id_name)
}
for field_name in self.record_manager.field_names:
notification[field_name] = getattr(record, field_name)
if hasattr(record, 'causal_dependencies'):
notification['causal_dependencies'] = record.causal_dependencies
notifications.append(notification)
return notifications
[docs] def get_next_position(self):
"""Next unoccupied position in zero-based sequence.
Since the notification IDs are one-based, the next position is
the current max notification ID. If there are no records,
the max notification ID will be None, and the next position is zero.
"""
return self.record_manager.get_max_record_id() or 0
[docs]class BigArrayNotificationLog(LocalNotificationLog):
def __init__(self, big_array, section_size):
super(BigArrayNotificationLog, self).__init__(section_size)
assert isinstance(big_array, BigArray)
if big_array.repo.array_size % section_size:
raise ValueError("Section size {} doesn't divide array size {}".format(
section_size, big_array.repo.array_size
))
self.big_array = big_array
[docs] def get_items(self, start, stop, next_position=None):
next_position = self.get_next_position() if next_position is None else next_position
stop = min(stop, next_position)
return self.big_array[start:stop]
[docs] def get_next_position(self):
return self.big_array.get_next_position()
[docs]class RemoteNotificationLog(AbstractNotificationLog):
def __init__(self, base_url, json_decoder_class=None):
self.base_url = base_url
self.json_decoder_class = json_decoder_class
def __getitem__(self, section_id):
section_json = self.get_json(section_id)
return self.deserialize_section(section_json)
[docs] def deserialize_section(self, section_json):
try:
decoder_class = self.json_decoder_class or ObjectJSONDecoder
section = Section(**json.loads(section_json, cls=decoder_class))
except ValueError as e:
raise ValueError("Couldn't deserialize notification log section: "
"{}: {}".format(e, section_json))
return section
[docs] def get_json(self, section_id):
notification_log_url = self.make_notification_log_url(section_id)
return self.get_resource(notification_log_url)
[docs] def get_resource(self, doc_url):
representation = requests.get(doc_url).content
if isinstance(representation, type(b'')):
representation = representation.decode('utf8')
return representation
[docs] def make_notification_log_url(self, section_id):
return '{}/{}/'.format(self.base_url.strip('/'), section_id)
[docs]class NotificationLogReader(ABC):
def __init__(self, notification_log, use_direct_query_if_available=False):
assert isinstance(notification_log, AbstractNotificationLog)
self.notification_log = notification_log
self.section_count = 0
self.position = 0
self.use_direct_query_if_available = use_direct_query_if_available
def __getitem__(self, item=None):
if isinstance(item, slice):
assert item.start >= 0, item.start
self.seek(item.start)
return self.read_items(item.stop)
else:
assert isinstance(item, int), type(item)
assert item >= 0, item
self.seek(item)
return self.read_list(advance_by=1)[0]
def __iter__(self):
return self.read_items()
def __next__(self):
try:
return self.read_list(advance_by=1)[0]
except IndexError:
raise StopIteration
[docs] def seek(self, position):
if position < 0:
raise ValueError("Position less than zero: {}".format(position))
self.position = position
[docs] def read(self, advance_by=None):
return self.read_items(advance_by=advance_by)
[docs] def read_items(self, stop_index=None, advance_by=None):
self.section_count = 0
start_item_num = self.position + 1
# Validate the position.
if self.position < 0:
raise ValueError("Position less than zero: {}".format(self.position))
if self.use_direct_query_if_available and isinstance(
self.notification_log, RecordManagerNotificationLog):
if advance_by is not None:
stop_item_num = start_item_num + advance_by
else:
stop_item_num = None
# Directly query for notifications.
for item in self.notification_log.get_items(start_item_num - 1, stop_item_num):
yield item
self.position += 1
else:
# Otherwise, use sections (Vaughn Vernon's linked section design).
section = self.notification_log['current']
# Follow previous links.
while section.previous_id:
# Break if we can go forward from here.
if start_item_num is not None:
if int(section.section_id.split(',')[0]) <= start_item_num:
break
# Get the previous document.
section_id = section.previous_id
section = self.notification_log[section_id]
# Yield items in first section, optionally after last item number.
self.section_count += 1
items = section.items
if start_item_num is not None:
section_start_num = int(section.section_id.split(',')[0])
from_index = start_item_num - section_start_num
items = items[from_index:]
if advance_by is None:
advance_by = -1
# Yield all items in all subsequent sections.
while True:
items_iter = iter(items)
while True:
if stop_index is not None and self.position >= stop_index:
return
if advance_by == 0:
return
try:
item = next(items_iter)
except StopIteration:
break
self.position += 1
advance_by -= 1
yield item
if section.next_id:
# Follow link to get next section.
section = self.notification_log[section.next_id]
items = section.items
self.section_count += 1
else:
break
[docs] def read_list(self, advance_by=None):
return list(self.read_items(advance_by=advance_by))
[docs]class NotificationLogView(object):
def __init__(self, notification_log, json_encoder_class=None):
assert isinstance(notification_log, LocalNotificationLog), type(notification_log)
self.notification_log = notification_log
self.json_encoder_class = json_encoder_class or ObjectJSONEncoder
[docs] def present_section(self, section_id):
section = self.notification_log[section_id]
is_archived = bool(section.next_id)
section_json = json_dumps(section.__dict__, self.json_encoder_class)
return section_json, is_archived