from __future__ import annotations
import json
from abc import ABC, abstractmethod
from base64 import b64decode, b64encode
from typing import TYPE_CHECKING, Generic
from uuid import UUID
from eventsourcing.application import NotificationLog, Section, TApplication
from eventsourcing.persistence import Notification
if TYPE_CHECKING:
from collections.abc import Sequence
[docs]
class NotificationLogInterface(ABC):
"""Abstract base class for obtaining serialised
sections of a notification log.
"""
[docs]
@abstractmethod
def get_log_section(self, section_id: str) -> str:
"""Returns a serialised :class:`~eventsourcing.application.Section`
from a notification log.
"""
[docs]
@abstractmethod
def get_notifications(
self,
start: int | None,
limit: int,
topics: Sequence[str] = (),
*,
inclusive_of_start: bool = True,
) -> str:
"""Returns a serialised list of :class:`~eventsourcing.persistence.Notification`
objects from a notification log.
"""
[docs]
class NotificationLogJSONService(NotificationLogInterface, Generic[TApplication]):
"""Presents serialised sections of a notification log."""
[docs]
def __init__(self, app: TApplication):
"""Initialises service with given application."""
self.app = app
[docs]
def get_log_section(self, section_id: str) -> str:
"""Returns JSON serialised :class:`~eventsourcing.application.Section`
from a notification log.
"""
section = self.app.notification_log[section_id]
return json.dumps(
{
"id": section.id,
"next_id": section.next_id,
"items": [
{
"id": item.id,
"originator_id": item.originator_id.hex,
"originator_version": item.originator_version,
"topic": item.topic,
"state": b64encode(item.state).decode("utf8"),
}
for item in section.items
],
}
)
[docs]
def get_notifications(
self,
start: int | None,
limit: int,
topics: Sequence[str] = (),
*,
inclusive_of_start: bool = True,
) -> str:
notifications = self.app.notification_log.select(
start=start,
limit=limit,
topics=topics,
inclusive_of_start=inclusive_of_start,
)
return json.dumps(
[
{
"id": notification.id,
"originator_id": notification.originator_id.hex,
"originator_version": notification.originator_version,
"topic": notification.topic,
"state": b64encode(notification.state).decode("utf8"),
}
for notification in notifications
]
)
[docs]
class NotificationLogJSONClient(NotificationLog):
"""Presents deserialized sections of a notification log."""
[docs]
def __init__(self, interface: NotificationLogInterface):
"""Initialises log with a given interface."""
self.interface = interface
[docs]
def __getitem__(self, section_id: str) -> Section:
"""Returns a :class:`Section` of
:class:`~eventsourcing.persistence.Notification` objects
from the notification log.
"""
body = self.interface.get_log_section(section_id)
section = json.loads(body)
return Section(
id=section["id"],
next_id=section["next_id"],
items=[
Notification(
id=item["id"],
originator_id=UUID(item["originator_id"]),
originator_version=item["originator_version"],
topic=item["topic"],
state=b64decode(item["state"].encode("utf8")),
)
for item in section["items"]
],
)
[docs]
def select(
self,
start: int | None,
limit: int,
stop: int | None = None,
topics: Sequence[str] = (),
*,
inclusive_of_start: bool = True,
) -> list[Notification]:
"""Returns a selection of
:class:`~eventsourcing.persistence.Notification` objects
from the notification log.
"""
return [
Notification(
id=item["id"],
originator_id=UUID(item["originator_id"]),
originator_version=item["originator_version"],
topic=item["topic"],
state=b64decode(item["state"].encode("utf8")),
)
for item in json.loads(
self.interface.get_notifications(
start=start,
limit=limit,
topics=topics,
inclusive_of_start=inclusive_of_start,
)
)
]