import json
from abc import ABC, abstractmethod
from base64 import b64decode, b64encode
from typing import Generic, List, Optional, Sequence
from uuid import UUID
from eventsourcing.application import NotificationLog, Section, TApplication
from eventsourcing.persistence import Notification
class NotificationLogInterface(ABC):
    """
    Abstract base class for obtaining serialised
    sections of a notification log.

    Both methods return ``str``. The serialisation format is not fixed
    here; it is chosen by the concrete subclass.
    """

    @abstractmethod
    def get_log_section(self, section_id: str) -> str:
        """
        Returns a serialised :class:`~eventsourcing.application.Section`
        from a notification log.

        :param section_id: identifier of the section to return.
        :return: the serialised section.
        """

    @abstractmethod
    def get_notifications(
        self, start: int, limit: int, topics: Sequence[str] = ()
    ) -> str:
        """
        Returns a serialised list of :class:`~eventsourcing.persistence.Notification`
        objects from a notification log.

        :param start: position in the log from which to select
            (presumably a notification id -- confirm against implementations).
        :param limit: maximum number of notifications to return
            (presumably -- confirm against implementations).
        :param topics: optional topics used to restrict the selection;
            the empty default is assumed to mean "no restriction" -- confirm.
        :return: the serialised list of notifications.
        """
class NotificationLogJSONService(NotificationLogInterface, Generic[TApplication]):
    """
    Presents serialised sections of a notification log.

    Sections and notifications are read from the ``notification_log``
    of the given application and returned as JSON text.
    """

    def __init__(self, app: TApplication):
        """
        Initialises service with given application.

        :param app: application whose ``notification_log`` is served.
        """
        self.app = app

    @staticmethod
    def _notification_as_dict(notification) -> dict:
        """
        Converts one notification into a JSON-compatible dict.

        Shared by both public methods so that sections and selections
        use exactly the same representation of a notification.
        """
        return {
            "id": notification.id,
            # The ``hex`` form is plain text, which JSON can carry.
            "originator_id": notification.originator_id.hex,
            "originator_version": notification.originator_version,
            "topic": notification.topic,
            # State is binary, so it is base64 encoded and then
            # decoded to text before being placed in the document.
            "state": b64encode(notification.state).decode("utf8"),
        }

    def get_log_section(self, section_id: str) -> str:
        """
        Returns JSON serialised :class:`~eventsourcing.application.Section`
        from a notification log.

        :param section_id: key used to look up the section in the
            application's notification log.
        :return: JSON object with keys ``id``, ``next_id`` and ``items``,
            where ``items`` is a list of serialised notifications.
        """
        section = self.app.notification_log[section_id]
        return json.dumps(
            {
                "id": section.id,
                "next_id": section.next_id,
                "items": [
                    self._notification_as_dict(item) for item in section.items
                ],
            }
        )

    def get_notifications(
        self, start: int, limit: int, topics: Sequence[str] = ()
    ) -> str:
        """
        Returns JSON serialised list of
        :class:`~eventsourcing.persistence.Notification` objects
        from a notification log.

        The arguments are passed unchanged to the ``select()`` method
        of the application's notification log.

        :param start: forwarded as ``start`` to ``select()``.
        :param limit: forwarded as ``limit`` to ``select()``.
        :param topics: forwarded as ``topics`` to ``select()``.
        :return: JSON array of serialised notifications.
        """
        notifications = self.app.notification_log.select(
            start=start, limit=limit, topics=topics
        )
        return json.dumps(
            [
                self._notification_as_dict(notification)
                for notification in notifications
            ]
        )
class NotificationLogJSONClient(NotificationLog):
    """
    Presents deserialized sections of a notification log.

    JSON text is obtained from the given interface and converted back
    into :class:`Section` and
    :class:`~eventsourcing.persistence.Notification` objects.
    """

    def __init__(self, interface: NotificationLogInterface):
        """
        Initialises log with a given interface.

        :param interface: source of the JSON serialised sections
            and notifications.
        """
        self.interface = interface

    @staticmethod
    def _notification_from_dict(item: dict) -> Notification:
        """
        Reconstructs one notification from its decoded JSON representation.

        Shared by :func:`__getitem__` and :func:`select` so that both
        decode notifications in exactly the same way.
        """
        return Notification(
            id=item["id"],
            originator_id=UUID(item["originator_id"]),
            originator_version=item["originator_version"],
            topic=item["topic"],
            # State arrives as base64 text; recover the original bytes.
            state=b64decode(item["state"].encode("utf8")),
        )

    def __getitem__(self, section_id: str) -> Section:
        """
        Returns a :class:`Section` of
        :class:`~eventsourcing.persistence.Notification` objects
        from the notification log.

        :param section_id: identifier passed to the interface's
            ``get_log_section()`` method.
        """
        body = self.interface.get_log_section(section_id)
        section = json.loads(body)
        return Section(
            id=section["id"],
            next_id=section["next_id"],
            items=[self._notification_from_dict(item) for item in section["items"]],
        )

    def select(
        self,
        start: int,
        limit: int,
        stop: Optional[int] = None,
        topics: Sequence[str] = (),
    ) -> List[Notification]:
        """
        Returns a selection of
        :class:`~eventsourcing.persistence.Notification` objects
        from the notification log.

        :param start: forwarded to the interface's ``get_notifications()``.
        :param limit: forwarded to the interface's ``get_notifications()``.
        :param stop: optional upper bound on notification id. The interface
            has no ``stop`` argument, so the bound is applied here, after
            the notifications have been received.
        :param topics: forwarded to the interface's ``get_notifications()``.
        """
        body = self.interface.get_notifications(
            start=start, limit=limit, topics=topics
        )
        # NOTE(review): ``stop`` is treated as an inclusive bound on the
        # notification id, which is how the library's notification logs are
        # understood to interpret it -- confirm. Filtering after the fetch
        # assumes notifications are returned in ascending id order.
        return [
            self._notification_from_dict(item)
            for item in json.loads(body)
            if stop is None or item["id"] <= stop
        ]