# Source code for eventsourcing.dcb.persistence

from __future__ import annotations

import threading
from abc import ABC, abstractmethod
from collections.abc import Iterator
from queue import Full, Queue
from typing import TYPE_CHECKING, Any, Generic

from eventsourcing.dcb.api import (
    DCBAppendCondition,
    DCBEvent,
    DCBQuery,
    DCBQueryItem,
    DCBReadResponse,
    DCBRecorder,
    DCBSequencedEvent,
    DCBSubscription,
    TDCBRecorder_co,
)
from eventsourcing.dcb.domain import (
    Selector,
    Tagged,
    TDecision,
)
from eventsourcing.persistence import BaseInfrastructureFactory, TTrackingRecorder
from eventsourcing.utils import get_topic

if TYPE_CHECKING:
    from collections.abc import Sequence


class DCBMapper(ABC, Generic[TDecision]):
    """Abstract two-way translation between tagged domain events and DCB events.

    Concrete mappers must implement both directions; the abstract methods
    here only raise ``NotImplementedError``.
    """

    @abstractmethod
    def to_dcb_event(self, event: Tagged[TDecision]) -> DCBEvent:
        """Return the ``DCBEvent`` form of the given tagged domain event."""
        raise NotImplementedError  # pragma: no cover

    @abstractmethod
    def to_domain_event(self, event: DCBEvent) -> Tagged[TDecision]:
        """Return the tagged domain event form of the given ``DCBEvent``."""
        raise NotImplementedError  # pragma: no cover
class DCBEventStore(Generic[TDecision]):
    """Appends and reads tagged domain events through a mapper and a DCB recorder."""

    def __init__(self, mapper: DCBMapper[TDecision], recorder: DCBRecorder):
        """Remember the mapper and recorder used by ``append`` and ``read``.

        :param mapper: converts between tagged domain events and DCB events.
        :param recorder: the DCB recorder that events are appended to and read from.
        """
        self.mapper = mapper
        self.recorder = recorder

    def append(
        self,
        events: Sequence[Tagged[TDecision]],
        cb: Selector | Sequence[Selector] | None = None,
        after: int | None = None,
    ) -> int:
        """Map the given events to DCB events and append them with the recorder.

        :param events: tagged domain events to append.
        :param cb: a selector, or sequence of selectors, used to build the
            ``fail_if_events_match`` query of the append condition.
        :param after: passed through as the ``after`` value of the append condition.
        :return: ``0`` when ``events`` is empty (the recorder is not called),
            otherwise whatever the recorder's ``append`` returns.
        """
        if len(events) == 0:
            return 0

        # An append condition is only built when the caller supplied
        # at least one of ``cb`` and ``after``.
        if cb is None and after is None:
            condition = None
        else:
            condition = DCBAppendCondition(
                fail_if_events_match=self._cb_to_dcb_query(cb),
                after=after,
            )

        return self.recorder.append(
            events=[self.mapper.to_dcb_event(tagged) for tagged in events],
            condition=condition,
        )

    def read(
        self,
        cb: Selector | Sequence[Selector] | None = None,
        *,
        after: int | None = None,
    ) -> DCBEventStoreReadResponse[TDecision]:
        """Read from the recorder and wrap the result so it yields domain events.

        :param cb: a selector, or sequence of selectors, converted into the query.
        :param after: passed through to the recorder's ``read``.
        :return: a ``DCBEventStoreReadResponse`` over the recorder's read response.
        """
        response = self.recorder.read(
            query=self._cb_to_dcb_query(cb),
            after=after,
        )
        return DCBEventStoreReadResponse(response, self.mapper)

    @staticmethod
    def _cb_to_dcb_query(
        cb: Selector | Sequence[Selector] | None = None,
    ) -> DCBQuery:
        """Build a ``DCBQuery`` with one query item per selector.

        A single ``Selector`` is treated as a one-element list; ``None`` or an
        empty sequence gives a query with no items.
        """
        if isinstance(cb, Selector):
            selectors = [cb]
        elif cb:
            selectors = cb
        else:
            selectors = []

        query_items = []
        for selector in selectors:
            # Types are recorded by topic; tags are copied into a list.
            topics = [get_topic(t) for t in selector.types]
            query_items.append(DCBQueryItem(types=topics, tags=list(selector.tags)))
        return DCBQuery(items=query_items)
class DCBEventStoreReadResponse(Iterator[Tagged[TDecision]]):
    """Iterator that turns a DCB read response into tagged domain events."""

    def __init__(
        self,
        dcb_read_response: DCBReadResponse,
        mapper: DCBMapper[TDecision],
    ):
        """Wrap a recorder read response.

        :param dcb_read_response: the underlying response being iterated.
        :param mapper: used to convert each DCB event into a domain event.
        """
        self._dcb_read_response = dcb_read_response
        self._mapper = mapper

    @property
    def head(self) -> int | None:
        """The ``head`` value reported by the underlying read response."""
        return self._dcb_read_response.head

    def __next__(self) -> Tagged[TDecision]:
        """Advance the underlying response and return its event as a domain event.

        Any exception raised by the underlying ``__next__`` (including
        ``StopIteration``) propagates unchanged.
        """
        sequenced = self._dcb_read_response.__next__()
        return self._mapper.to_domain_event(sequenced.event)
class NotFoundError(Exception):
    """Exception type for "not found" conditions.

    NOTE(review): nothing in this module raises it; confirm the intended
    usage against the callers that import it.
    """
class DCBInfrastructureFactory(BaseInfrastructureFactory[TTrackingRecorder], ABC):
    """Abstract infrastructure factory that can also supply a DCB recorder."""

    @abstractmethod
    def dcb_recorder(self) -> DCBRecorder:
        """Return a DCB recorder; concrete factories must implement this."""
        ...  # pragma: no cover
class DCBListenNotifySubscription(DCBSubscription[TDCBRecorder_co]):
    """Subscription that pulls events on a background thread.

    A pull thread reads batches of events from the recorder and hands them to
    the iterating thread through a bounded queue. The pull thread pulls once
    at start-up and then again each time ``_has_been_notified`` is set.

    NOTE(review): within this class only ``stop()`` sets the notification
    event; presumably subclasses set it when the database signals that new
    events were recorded — confirm against the concrete recorders.
    """

    # Seconds the pull thread waits on a full queue before re-checking
    # whether the subscription has been stopped.
    _queue_poll_interval: float = 0.1

    def __init__(
        self,
        recorder: TDCBRecorder_co,
        query: DCBQuery | None = None,
        after: int | None = None,
    ) -> None:
        """Initialise state and start the pull thread.

        :param recorder: recorder that events are read from.
        :param query: passed to the base class and used for each read.
        :param after: passed to the base class as the starting position.
        """
        super().__init__(recorder=recorder, query=query, after=after)
        # Maximum number of events requested per read.
        self.select_limit = 500
        # Batch currently being iterated, and the index of the next item in it.
        self._events: Sequence[DCBSequencedEvent] = []
        self._events_index: int = 0
        # Bounded hand-off between the pull thread and the iterating thread.
        self._events_queue: Queue[Sequence[DCBSequencedEvent]] = Queue(maxsize=10)
        self._has_been_notified = threading.Event()
        # First exception raised in the pull thread; re-raised from __next__.
        self._thread_error: BaseException | None = None
        self._pull_thread = threading.Thread(target=self._loop_on_pull)
        self._pull_thread.start()

    def __exit__(self, *args: object, **kwargs: Any) -> None:
        """Run the base class exit handling, then wait for the pull thread."""
        super().__exit__(*args, **kwargs)
        self._pull_thread.join()

    def stop(self) -> None:
        """Stops the subscription."""
        super().stop()
        # Wake a consumer blocked in ``__next__`` with an empty batch. This
        # must not block: when the queue is full the consumer is not waiting
        # on an empty queue, and a blocking put here could deadlock with a
        # pull thread that is itself waiting for space.
        try:
            self._events_queue.put_nowait([])
        except Full:
            pass
        # Wake the pull thread if it is waiting for a notification.
        self._has_been_notified.set()

    def __next__(self) -> DCBSequencedEvent:
        """Return the next event, blocking until one is available.

        :raises StopIteration: when the subscription has been stopped.
        :raises BaseException: the error captured from the pull thread, if any.
        """
        # If necessary, get a new list of events from the recorder.
        if self._events_index == len(self._events) and not self._has_been_stopped:
            self._events = self._events_queue.get()
            self._events_index = 0

        # Stop the iteration if necessary, maybe raise thread error.
        if self._has_been_stopped or not self._events:
            if self._thread_error is not None:
                raise self._thread_error
            raise StopIteration

        # Return a notification from previously obtained list.
        notification = self._events[self._events_index]
        self._events_index += 1
        return notification

    def _loop_on_pull(self) -> None:
        """Pull thread target: pull once, then again after each notification."""
        try:
            self._pull()  # Already recorded events.
            while not self._has_been_stopped:
                self._has_been_notified.wait()
                self._pull()  # Newly recorded events.
        except BaseException as e:  # pragma: no cover
            # Keep only the first error, then stop so the consumer wakes up.
            if self._thread_error is None:
                self._thread_error = e
            self.stop()

    def _pull(self) -> None:
        """Read batches from the recorder until a short batch or a stop."""
        while not self._has_been_stopped:
            # Clear before reading so a notification that arrives during
            # the read triggers another pull rather than being lost.
            self._has_been_notified.clear()
            events = list(
                self._recorder.read(
                    query=self._query,
                    after=self._last_position,
                    limit=self.select_limit,
                )
            )
            if len(events) > 0:
                if not self._enqueue(events):
                    # Stopped while waiting for queue space.
                    break
                self._last_position = events[-1].position
            # A batch shorter than the limit means there is nothing more to read.
            if len(events) < self.select_limit:
                break

    def _enqueue(self, events: Sequence[DCBSequencedEvent]) -> bool:
        """Put a batch on the queue, giving up if the subscription is stopped.

        Waits with a timeout rather than indefinitely so that a stopped
        subscription whose consumer no longer drains the queue cannot leave
        the pull thread blocked forever (which would hang ``__exit__``).

        :return: ``True`` if the batch was queued, ``False`` if stopped first.
        """
        while not self._has_been_stopped:
            try:
                self._events_queue.put(events, timeout=self._queue_poll_interval)
            except Full:
                continue
            return True
        return False