# Source code for eventsourcing.interface

from __future__ import annotations

import json
from abc import ABC, abstractmethod
from base64 import b64decode, b64encode
from typing import Generic, List, Optional, Sequence
from uuid import UUID

from eventsourcing.application import NotificationLog, Section, TApplication
from eventsourcing.persistence import Notification


class NotificationLogInterface(ABC):
    """
    Abstract interface for reading a notification log in serialised form.

    Both methods return a string; the serialisation format is left to
    the concrete subclass.
    """

    @abstractmethod
    def get_log_section(self, section_id: str) -> str:
        """
        Returns one :class:`~eventsourcing.application.Section` of a
        notification log, serialised as a string.

        :param section_id: identifier of the requested section
        :return: the serialised section
        """

    @abstractmethod
    def get_notifications(
        self, start: int, limit: int, topics: Sequence[str] = ()
    ) -> str:
        """
        Returns a list of :class:`~eventsourcing.persistence.Notification`
        objects from a notification log, serialised as a string.

        :param start: selection argument given by the caller
        :param limit: selection argument given by the caller
        :param topics: optional sequence of topic strings (defaults to empty)
        :return: the serialised list of notifications
        """
class NotificationLogJSONService(NotificationLogInterface, Generic[TApplication]):
    """
    Presents serialised sections of a notification log.

    Sections and notifications are read from the ``notification_log``
    of the given application and returned as JSON strings.
    """

    def __init__(self, app: TApplication):
        """
        Initialises service with given application.

        :param app: application whose ``notification_log`` is served
        """
        self.app = app

    def get_log_section(self, section_id: str) -> str:
        """
        Returns JSON serialised :class:`~eventsourcing.application.Section`
        from a notification log.

        :param section_id: key used to look up the section in the log
        :return: JSON object with ``id``, ``next_id`` and ``items`` keys
        """
        section = self.app.notification_log[section_id]
        return json.dumps(
            {
                "id": section.id,
                "next_id": section.next_id,
                "items": [
                    self._notification_to_dict(item) for item in section.items
                ],
            }
        )

    def get_notifications(
        self, start: int, limit: int, topics: Sequence[str] = ()
    ) -> str:
        """
        Returns JSON serialised list of
        :class:`~eventsourcing.persistence.Notification` objects
        from a notification log.

        The arguments are forwarded unchanged to the ``select()`` method
        of the application's notification log.

        :return: JSON array with one object per selected notification
        """
        notifications = self.app.notification_log.select(
            start=start, limit=limit, topics=topics
        )
        return json.dumps(
            [
                self._notification_to_dict(notification)
                for notification in notifications
            ]
        )

    @staticmethod
    def _notification_to_dict(notification: Notification) -> dict:
        """
        Converts one notification into a JSON-compatible dict.

        Shared by both public methods so that sections and selections
        always use the same representation of a notification.
        """
        return {
            "id": notification.id,
            # UUID is not JSON serialisable: send its hex string form.
            "originator_id": notification.originator_id.hex,
            "originator_version": notification.originator_version,
            "topic": notification.topic,
            # State is bytes: base64 encode it, then decode to text for JSON.
            "state": b64encode(notification.state).decode("utf8"),
        }
class NotificationLogJSONClient(NotificationLog):
    """
    Presents deserialized sections of a notification log.

    JSON strings obtained from a :class:`NotificationLogInterface` are
    decoded back into :class:`Section` and
    :class:`~eventsourcing.persistence.Notification` objects.
    """

    def __init__(self, interface: NotificationLogInterface):
        """
        Initialises log with a given interface.

        :param interface: source of the serialised sections and notifications
        """
        self.interface = interface

    def __getitem__(self, section_id: str) -> Section:
        """
        Returns a :class:`Section` of
        :class:`~eventsourcing.persistence.Notification` objects
        from the notification log.

        :param section_id: identifier passed to the interface's
            ``get_log_section()`` method
        """
        body = self.interface.get_log_section(section_id)
        section = json.loads(body)
        return Section(
            id=section["id"],
            next_id=section["next_id"],
            items=[
                self._notification_from_dict(item) for item in section["items"]
            ],
        )

    def select(
        self,
        start: int,
        limit: int,
        stop: Optional[int] = None,
        topics: Sequence[str] = (),
    ) -> List[Notification]:
        """
        Returns a selection of
        :class:`~eventsourcing.persistence.Notification` objects
        from the notification log.

        Only ``start``, ``limit`` and ``topics`` are forwarded to the
        interface. ``stop`` is accepted but currently has no effect,
        because ``NotificationLogInterface.get_notifications()`` does
        not take such an argument.
        """
        body = self.interface.get_notifications(
            start=start, limit=limit, topics=topics
        )
        return [self._notification_from_dict(item) for item in json.loads(body)]

    @staticmethod
    def _notification_from_dict(item: dict) -> Notification:
        """
        Rebuilds one notification from its decoded JSON representation.

        Shared by ``__getitem__()`` and ``select()`` so that both decode
        notifications in exactly the same way.
        """
        return Notification(
            id=item["id"],
            # The originator ID arrives as a hex string.
            originator_id=UUID(item["originator_id"]),
            originator_version=item["originator_version"],
            topic=item["topic"],
            # The state arrives as base64 text: recover the original bytes.
            state=b64decode(item["state"].encode("utf8")),
        )