Source code for eventsourcing.popo

from __future__ import annotations

from collections import defaultdict
from threading import Lock
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set
from uuid import UUID

from eventsourcing.persistence import (
    AggregateRecorder,
    ApplicationRecorder,
    InfrastructureFactory,
    IntegrityError,
    Notification,
    ProcessRecorder,
    StoredEvent,
    Tracking,
)
from eventsourcing.utils import reversed_keys


[docs]class POPOAggregateRecorder(AggregateRecorder):
[docs] def __init__(self) -> None: self._stored_events: List[StoredEvent] = [] self._stored_events_index: Dict[UUID, Dict[int, int]] = defaultdict(dict) self._database_lock = Lock()
[docs] def insert_events( self, stored_events: List[StoredEvent], **kwargs: Any ) -> Optional[Sequence[int]]: self._insert_events(stored_events, **kwargs) return None
def _insert_events( self, stored_events: List[StoredEvent], **kwargs: Any ) -> Optional[Sequence[int]]: with self._database_lock: self._assert_uniqueness(stored_events, **kwargs) return self._update_table(stored_events, **kwargs) def _assert_uniqueness( self, stored_events: List[StoredEvent], **kwargs: Any ) -> None: new = set() for s in stored_events: # Check events don't already exist. if s.originator_version in self._stored_events_index[s.originator_id]: raise IntegrityError(f"Stored event already recorded: {s}") new.add((s.originator_id, s.originator_version)) # Check new events are unique. if len(new) < len(stored_events): raise IntegrityError(f"Stored events are not unique: {stored_events}") def _update_table( self, stored_events: List[StoredEvent], **kwargs: Any ) -> Optional[Sequence[int]]: notification_ids = [] for s in stored_events: self._stored_events.append(s) self._stored_events_index[s.originator_id][s.originator_version] = ( len(self._stored_events) - 1 ) notification_ids.append(len(self._stored_events)) return notification_ids
[docs] def select_events( self, originator_id: UUID, gt: Optional[int] = None, lte: Optional[int] = None, desc: bool = False, limit: Optional[int] = None, ) -> List[StoredEvent]: with self._database_lock: results = [] index = self._stored_events_index[originator_id] positions: Iterable[int] if desc: positions = reversed_keys(index) else: positions = index.keys() for p in positions: if gt is not None: if not p > gt: continue if lte is not None: if not p <= lte: continue s = self._stored_events[index[p]] results.append(s) if len(results) == limit: break return results
[docs]class POPOApplicationRecorder(ApplicationRecorder, POPOAggregateRecorder):
[docs] def insert_events( self, stored_events: List[StoredEvent], **kwargs: Any ) -> Optional[Sequence[int]]: return self._insert_events(stored_events, **kwargs)
[docs] def select_notifications( self, start: int, limit: int, stop: Optional[int] = None, topics: Sequence[str] = (), ) -> List[Notification]: with self._database_lock: results = [] start = max(start, 1) # Don't use negative indexes! i = start - 1 while True: if stop is not None and i > stop - 1: break try: s = self._stored_events[i] except IndexError: break i += 1 if topics and s.topic not in topics: continue n = Notification( id=i, originator_id=s.originator_id, originator_version=s.originator_version, topic=s.topic, state=s.state, ) results.append(n) if len(results) == limit: break return results
[docs] def max_notification_id(self) -> int: with self._database_lock: return len(self._stored_events)
[docs]class POPOProcessRecorder(ProcessRecorder, POPOApplicationRecorder):
[docs] def __init__(self) -> None: super().__init__() self._tracking_table: Dict[str, Set[int]] = defaultdict(set) self._max_tracking_ids: Dict[str, int] = defaultdict(lambda: 0)
def _assert_uniqueness( self, stored_events: List[StoredEvent], **kwargs: Any ) -> None: super()._assert_uniqueness(stored_events, **kwargs) t: Optional[Tracking] = kwargs.get("tracking", None) if t and t.notification_id in self._tracking_table[t.application_name]: raise IntegrityError( f"Already recorded notification ID {t.notification_id} " f"for application {t.application_name}" ) def _update_table( self, stored_events: List[StoredEvent], **kwargs: Any ) -> Optional[Sequence[int]]: notification_ids = super()._update_table(stored_events, **kwargs) t: Optional[Tracking] = kwargs.get("tracking", None) if t: self._tracking_table[t.application_name].add(t.notification_id) if self._max_tracking_ids[t.application_name] < t.notification_id: self._max_tracking_ids[t.application_name] = t.notification_id return notification_ids
[docs] def max_tracking_id(self, application_name: str) -> int: with self._database_lock: return self._max_tracking_ids[application_name]
[docs] def has_tracking_id(self, application_name: str, notification_id: int) -> bool: with self._database_lock: return notification_id in self._tracking_table[application_name]
[docs]class Factory(InfrastructureFactory):
[docs] def aggregate_recorder(self, purpose: str = "events") -> AggregateRecorder: return POPOAggregateRecorder()
[docs] def application_recorder(self) -> ApplicationRecorder: return POPOApplicationRecorder()
[docs] def process_recorder(self) -> ProcessRecorder: return POPOProcessRecorder()