Source code for eventsourcing.interface

import json
from abc import ABC, abstractmethod
from base64 import b64decode, b64encode
from typing import Generic
from uuid import UUID

from eventsourcing.application import NotificationLog, Section, TApplication
from eventsourcing.persistence import Notification


class NotificationLogInterface(ABC):
    """
    Abstract interface through which sections of a notification
    log are obtained in serialised (string) form.

    Concrete subclasses must implement :func:`get_log_section`.
    """

    @abstractmethod
    def get_log_section(self, section_id: str) -> str:
        """
        Returns, as a string, the serialised
        :class:`~eventsourcing.application.Section` of a notification
        log that is identified by the given ``section_id``.
        """
class NotificationLogJSONService(NotificationLogInterface, Generic[TApplication]):
    """
    Serves sections of an application's notification log as
    JSON strings.
    """

    def __init__(self, app: TApplication):
        """
        Initialises the service with the application whose
        notification log will be presented.
        """
        self.app = app

    def get_log_section(self, section_id: str) -> str:
        """
        Returns the :class:`~eventsourcing.application.Section` selected
        by ``section_id`` from the application's notification log,
        serialised as a JSON object with keys ``id``, ``next_id``
        and ``items``.

        Each item carries the notification's ``id``, ``originator_id``
        (as a hex string), ``originator_version``, ``topic`` and
        ``state`` (base64 encoded, so that bytes can be carried in JSON).
        """
        log_section = self.app.log[section_id]

        # Section-level fields first, then items appended in log order.
        document = {
            "id": log_section.id,
            "next_id": log_section.next_id,
            "items": [],
        }
        for notification in log_section.items:
            encoded_state = b64encode(notification.state).decode("utf8")
            document["items"].append(
                {
                    "id": notification.id,
                    "originator_id": notification.originator_id.hex,
                    "originator_version": notification.originator_version,
                    "topic": notification.topic,
                    "state": encoded_state,
                }
            )
        return json.dumps(document)
class NotificationLogJSONClient(NotificationLog):
    """
    Notification log whose sections are obtained in JSON form from a
    :class:`NotificationLogInterface` and deserialised on access.
    """

    def __init__(self, interface: NotificationLogInterface):
        """
        Initialises the log with the interface from which serialised
        sections will be requested.
        """
        self.interface = interface

    def __getitem__(self, section_id: str) -> Section:
        """
        Requests the section identified by ``section_id`` from the
        interface, parses the JSON body, and returns it as a
        :class:`~eventsourcing.application.Section` of
        :class:`~eventsourcing.persistence.Notification` objects.

        The ``originator_id`` of each item is converted to a
        :class:`~uuid.UUID` and its ``state`` is base64 decoded
        to bytes.
        """
        payload = json.loads(self.interface.get_log_section(section_id))

        # Read the section-level fields before the items.
        this_id = payload["id"]
        following_id = payload["next_id"]

        notifications = []
        for entry in payload["items"]:
            raw_state = b64decode(entry["state"].encode("utf8"))
            notifications.append(
                Notification(
                    id=entry["id"],
                    originator_id=UUID(entry["originator_id"]),
                    originator_version=entry["originator_version"],
                    topic=entry["topic"],
                    state=raw_state,
                )
            )

        return Section(id=this_id, next_id=following_id, items=notifications)