from __future__ import annotations
import contextlib
from collections import defaultdict
from threading import Event, Lock
from typing import TYPE_CHECKING, Any
from eventsourcing.persistence import (
AggregateRecorder,
ApplicationRecorder,
InfrastructureFactory,
IntegrityError,
ListenNotifySubscription,
Notification,
ProcessRecorder,
StoredEvent,
Subscription,
Tracking,
TrackingRecorder,
)
from eventsourcing.utils import resolve_topic, reversed_keys
if TYPE_CHECKING:
from collections.abc import Iterable, Sequence
from uuid import UUID
class POPORecorder:
def __init__(self) -> None:
self._database_lock = Lock()
[docs]
class POPOAggregateRecorder(POPORecorder, AggregateRecorder):
[docs]
def __init__(self) -> None:
super().__init__()
self._stored_events: list[StoredEvent] = []
self._stored_events_index: dict[UUID, dict[int, int]] = defaultdict(dict)
[docs]
def insert_events(
self, stored_events: list[StoredEvent], **kwargs: Any
) -> Sequence[int] | None:
self._insert_events(stored_events, **kwargs)
return None
def _insert_events(
self, stored_events: list[StoredEvent], **kwargs: Any
) -> Sequence[int] | None:
with self._database_lock:
self._assert_uniqueness(stored_events, **kwargs)
return self._update_table(stored_events, **kwargs)
def _assert_uniqueness(self, stored_events: list[StoredEvent], **_: Any) -> None:
new = set()
for s in stored_events:
# Check events don't already exist.
if s.originator_version in self._stored_events_index[s.originator_id]:
msg = f"Stored event already recorded: {s}"
raise IntegrityError(msg)
new.add((s.originator_id, s.originator_version))
# Check new events are unique.
if len(new) < len(stored_events):
msg = f"Stored events are not unique: {stored_events}"
raise IntegrityError(msg)
def _update_table(
self, stored_events: list[StoredEvent], **_: Any
) -> Sequence[int] | None:
notification_ids = []
for s in stored_events:
self._stored_events.append(s)
self._stored_events_index[s.originator_id][s.originator_version] = (
len(self._stored_events) - 1
)
notification_ids.append(len(self._stored_events))
return notification_ids
[docs]
def select_events(
self,
originator_id: UUID,
*,
gt: int | None = None,
lte: int | None = None,
desc: bool = False,
limit: int | None = None,
) -> list[StoredEvent]:
with self._database_lock:
results = []
index = self._stored_events_index[originator_id]
positions: Iterable[int]
positions = reversed_keys(index) if desc else index.keys()
for p in positions:
if gt is not None and not p > gt:
continue
if lte is not None and not p <= lte:
continue
s = self._stored_events[index[p]]
results.append(s)
if len(results) == limit:
break
return results
[docs]
class POPOApplicationRecorder(POPOAggregateRecorder, ApplicationRecorder):
[docs]
def __init__(self) -> None:
super().__init__()
self._listeners: set[Event] = set()
[docs]
def insert_events(
self, stored_events: list[StoredEvent], **kwargs: Any
) -> Sequence[int] | None:
notification_ids = self._insert_events(stored_events, **kwargs)
self._notify_listeners()
return notification_ids
[docs]
def select_notifications(
self,
start: int | None,
limit: int,
stop: int | None = None,
topics: Sequence[str] = (),
*,
inclusive_of_start: bool = True,
) -> list[Notification]:
with self._database_lock:
results = []
if start is None:
start = 1
inclusive_of_start = True
if not inclusive_of_start:
start += 1
start = max(start, 1) # Don't use negative indexes!
i = start - 1 # Zero-based indexing.
while True:
if stop is not None and i > stop - 1:
break
try:
s = self._stored_events[i]
except IndexError:
break
i += 1
if topics and s.topic not in topics:
continue
n = Notification(
id=i,
originator_id=s.originator_id,
originator_version=s.originator_version,
topic=s.topic,
state=s.state,
)
results.append(n)
if len(results) == limit:
break
return results
[docs]
def max_notification_id(self) -> int | None:
with self._database_lock:
return len(self._stored_events) or None
[docs]
def subscribe(
self, gt: int | None = None, topics: Sequence[str] = ()
) -> Subscription[ApplicationRecorder]:
return POPOSubscription(recorder=self, gt=gt, topics=topics)
def listen(self, event: Event) -> None:
self._listeners.add(event)
def unlisten(self, event: Event) -> None:
with contextlib.suppress(KeyError):
self._listeners.remove(event)
def _notify_listeners(self) -> None:
for listener in self._listeners:
listener.set()
[docs]
class POPOSubscription(ListenNotifySubscription[POPOApplicationRecorder]):
[docs]
def __init__(
self,
recorder: POPOApplicationRecorder,
gt: int | None = None,
topics: Sequence[str] = (),
) -> None:
assert isinstance(recorder, POPOApplicationRecorder)
super().__init__(recorder=recorder, gt=gt, topics=topics)
self._recorder.listen(self._has_been_notified)
[docs]
def stop(self) -> None:
super().stop()
self._recorder.unlisten(self._has_been_notified)
[docs]
class POPOTrackingRecorder(POPORecorder, TrackingRecorder):
[docs]
def __init__(self) -> None:
super().__init__()
self._tracking_table: dict[str, set[int]] = defaultdict(set)
self._max_tracking_ids: dict[str, int | None] = defaultdict(lambda: None)
def _assert_tracking_uniqueness(self, tracking: Tracking) -> None:
if tracking.notification_id in self._tracking_table[tracking.application_name]:
msg = (
f"Already recorded notification ID {tracking.notification_id} "
f"for application {tracking.application_name}"
)
raise IntegrityError(msg)
[docs]
def insert_tracking(self, tracking: Tracking) -> None:
with self._database_lock:
self._assert_tracking_uniqueness(tracking)
self._insert_tracking(tracking)
def _insert_tracking(self, tracking: Tracking) -> None:
self._tracking_table[tracking.application_name].add(tracking.notification_id)
max_tracking_id = self._max_tracking_ids[tracking.application_name]
if max_tracking_id is None or max_tracking_id < tracking.notification_id:
self._max_tracking_ids[tracking.application_name] = tracking.notification_id
[docs]
def max_tracking_id(self, application_name: str) -> int | None:
with self._database_lock:
return self._max_tracking_ids[application_name]
[docs]
def has_tracking_id(
self, application_name: str, notification_id: int | None
) -> bool:
if notification_id is None:
return True
with self._database_lock:
return notification_id in self._tracking_table[application_name]
[docs]
class POPOProcessRecorder(
POPOTrackingRecorder, POPOApplicationRecorder, ProcessRecorder
):
def _assert_uniqueness(
self, stored_events: list[StoredEvent], **kwargs: Any
) -> None:
super()._assert_uniqueness(stored_events, **kwargs)
t: Tracking | None = kwargs.get("tracking")
if t:
self._assert_tracking_uniqueness(t)
def _update_table(
self, stored_events: list[StoredEvent], **kwargs: Any
) -> Sequence[int] | None:
notification_ids = super()._update_table(stored_events, **kwargs)
t: Tracking | None = kwargs.get("tracking")
if t:
self._insert_tracking(t)
return notification_ids
[docs]
class POPOFactory(InfrastructureFactory[POPOTrackingRecorder]):
[docs]
def aggregate_recorder(self, purpose: str = "events") -> AggregateRecorder:
return POPOAggregateRecorder()
[docs]
def application_recorder(self) -> ApplicationRecorder:
return POPOApplicationRecorder()
[docs]
def tracking_recorder(
self, tracking_recorder_class: type[POPOTrackingRecorder] | None = None
) -> POPOTrackingRecorder:
if tracking_recorder_class is None:
tracking_recorder_topic = self.env.get(self.TRACKING_RECORDER_TOPIC)
if tracking_recorder_topic:
tracking_recorder_class = resolve_topic(tracking_recorder_topic)
else:
tracking_recorder_class = POPOTrackingRecorder
assert tracking_recorder_class is not None
assert issubclass(tracking_recorder_class, POPOTrackingRecorder)
return tracking_recorder_class()
[docs]
def process_recorder(self) -> ProcessRecorder:
return POPOProcessRecorder()
Factory = POPOFactory