from __future__ import annotations
import contextlib
from copy import deepcopy
from typing import TYPE_CHECKING
from eventsourcing.dcb.api import (
DCBAppendCondition,
DCBEvent,
DCBQuery,
DCBReadResponse,
DCBRecorder,
DCBSequencedEvent,
)
from eventsourcing.dcb.persistence import (
DCBInfrastructureFactory,
DCBListenNotifySubscription,
)
from eventsourcing.persistence import IntegrityError, ProgrammingError
from eventsourcing.popo import POPOFactory, POPORecorder, POPOTrackingRecorder
if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
from threading import Event
[docs]
class InMemoryDCBRecorder(DCBRecorder, POPORecorder):
    """DCB recorder that keeps its sequenced events in a plain Python list.

    Events are deep-copied both when they are stored and when they are
    read back. Positions are handed out by a counter that starts at 1.
    Objects registered through :meth:`listen` have ``set()`` called on
    them after every successful append.
    """

    def __init__(self) -> None:
        """Create an empty recorder with no events and no listeners."""
        super().__init__()
        self._listeners: set[Event] = set()
        self.position_sequence = self._position_sequence_generator()
        self.events: list[DCBSequencedEvent] = []

    def read(
        self,
        query: DCBQuery | None = None,
        *,
        after: int | None = None,
        limit: int | None = None,
    ) -> DCBReadResponse:
        """Return copies of the recorded events selected by the arguments.

        :param query: Selection criteria. An event is selected when the
            query has no items, or when at least one item matches it. An
            item matches when its ``types`` is empty or contains the
            event's type, and every tag of the item is among the event's
            tags. A missing (or falsy) query selects every event.
        :param after: When given, only events whose position is greater
            than this value are selected.
        :param limit: When given, at most this many events are returned.
        :return: A :class:`SimpleDCBReadResponse` over the selected
            copies. Without a limit its head is the position of the last
            recorded event (whether or not that event was selected);
            with a limit it is the position of the last returned event.
            In both cases the head is ``None`` when there is no such
            event.
        """
        criteria = (query or DCBQuery()).items
        with self._database_lock:
            selected: list[DCBSequencedEvent] = []
            for sequenced in self.events:
                if after is not None and sequenced.position <= after:
                    continue
                if criteria:
                    payload = sequenced.event
                    for item in criteria:
                        if item.types and payload.type not in item.types:
                            continue
                        if set(item.tags) <= set(payload.tags):
                            # This item matches; the event is selected.
                            break
                    else:
                        # No query item matched this event.
                        continue
                # The limit is tested only once another match is found.
                if limit is not None and len(selected) >= limit:
                    break
                selected.append(deepcopy(sequenced))

            head_source = self.events if limit is None else selected
            head = head_source[-1].position if head_source else None
            # TODO: Change the selection above to actually be an iterator.
            return SimpleDCBReadResponse(iter(selected), head)

    def append(
        self, events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None
    ) -> int:
        """Record the given events, optionally guarded by a condition.

        :param events: The events to record; must not be empty.
        :param condition: When given, the append is rejected if any
            recorded event positioned after ``condition.after`` matches
            ``condition.fail_if_events_match``.
        :return: The position of the last event that was recorded.
        :raises ProgrammingError: If ``events`` is empty.
        :raises IntegrityError: If the condition is violated.
        """
        if not events:
            msg = "Should be at least one event. Avoid this elsewhere"
            raise ProgrammingError(msg)

        # NOTE(review): read() acquires the same lock while it is held
        # here, so the lock is presumably reentrant - confirm in the base
        # class that provides _database_lock.
        with self._database_lock:
            if condition is not None:
                nothing = object()
                conflicts = self.read(
                    query=condition.fail_if_events_match,
                    after=condition.after,
                    limit=1,
                )
                if next(conflicts, nothing) is not nothing:
                    raise IntegrityError(condition)

            for new_event in events:
                position = next(self.position_sequence)
                self.events.append(
                    DCBSequencedEvent(position=position, event=deepcopy(new_event))
                )
            self._notify_listeners()
            return self.events[-1].position

    def _position_sequence_generator(self) -> Iterator[int]:
        """Yield 1, 2, 3, ... without end."""
        latest = 0
        while True:
            latest += 1
            yield latest

    def subscribe(
        self,
        query: DCBQuery | None = None,
        *,
        after: int | None = None,
    ) -> InMemorySubscription:
        """Return a new :class:`InMemorySubscription` on this recorder.

        :param query: Passed through to the subscription unchanged.
        :param after: Passed through to the subscription unchanged.
        """
        subscription = InMemorySubscription(self, query=query, after=after)
        return subscription

    def listen(self, event: Event) -> None:
        """Register ``event`` to be set whenever events are appended."""
        self._listeners.add(event)

    def unlisten(self, event: Event) -> None:
        """Unregister ``event``; does nothing if it was not registered."""
        self._listeners.discard(event)

    def _notify_listeners(self) -> None:
        """Call ``set()`` on every registered listener."""
        for registered in self._listeners:
            registered.set()
[docs]
class InMemorySubscription(DCBListenNotifySubscription[InMemoryDCBRecorder]):
    """Subscription that is notified by an :class:`InMemoryDCBRecorder`.

    On construction it registers ``self._has_been_notified`` with the
    recorder's ``listen()``; on :meth:`stop` it unregisters it again.
    """

    def __init__(
        self,
        recorder: InMemoryDCBRecorder,
        query: DCBQuery | None = None,
        after: int | None = None,
    ) -> None:
        """Initialise the base subscription, then start listening.

        :param recorder: The recorder to subscribe to.
        :param query: Forwarded to the base class unchanged.
        :param after: Forwarded to the base class unchanged.
        """
        super().__init__(recorder=recorder, query=query, after=after)
        # NOTE(review): _recorder and _has_been_notified are not defined
        # in this class; presumably the base class sets them - confirm.
        notification_flag = self._has_been_notified
        self._recorder.listen(notification_flag)

    def stop(self) -> None:
        """Stop the base subscription, then stop listening to the recorder."""
        super().stop()
        notification_flag = self._has_been_notified
        self._recorder.unlisten(notification_flag)
[docs]
class SimpleDCBReadResponse(DCBReadResponse):
    """Read response backed by an iterator of sequenced events.

    If a head position is supplied at construction it is reported
    unchanged. Otherwise :attr:`head` starts as ``None`` and follows the
    position of the most recent event returned by ``__next__``.
    """

    def __init__(self, events: Iterator[DCBSequencedEvent], head: int | None = None):
        """Wrap ``events`` and remember whether ``head`` was supplied.

        :param events: Iterator that ``__next__`` draws events from.
        :param head: Optional fixed head position.
        """
        self._head = head
        self._head_was_given = head is not None
        self.events = events

    @property
    def head(self) -> int | None:
        """The current head position, or ``None`` if there is none yet."""
        return self._head

    def __next__(self) -> DCBSequencedEvent:
        """Return the next event from the wrapped iterator.

        Propagates whatever ``next()`` on the wrapped iterator raises,
        including ``StopIteration`` when it is exhausted.
        """
        upcoming = next(self.events)
        if not self._head_was_given:  # pragma: no cover
            # No fixed head: track the position of the last event served.
            self._head = upcoming.position
        return upcoming

    # Disabled draft of a batch-reading method, kept for reference.
    #
    # def next_batch(self) -> list[DCBSequencedEvent]:
    #     """
    #     Returns a batch of events as a list.
    #     Updates the head position similar to __next__.
    #     """
    #     result = []
    #     max_batch_size = 100
    #
    #     # Get up to max_batch_size events from the iterator
    #     try:
    #         for _ in range(max_batch_size):
    #             event = next(self.events)
    #             if not self._head_was_given:
    #                 self._head = event.position
    #             result.append(event)
    #     except StopIteration:
    #         pass
    #     return result
[docs]
class InMemoryDCBFactory(POPOFactory, DCBInfrastructureFactory[POPOTrackingRecorder]):
    """Infrastructure factory whose DCB recorder is the in-memory one."""

    def dcb_recorder(self) -> DCBRecorder:
        """Return a newly constructed :class:`InMemoryDCBRecorder`."""
        recorder = InMemoryDCBRecorder()
        return recorder