Source code for eventsourcing.dcb.api

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Generic, TypeVar

from eventsourcing.persistence import ProgrammingError

if TYPE_CHECKING:
    from collections.abc import Sequence

    from typing_extensions import Self


[docs] @dataclass class DCBQueryItem: types: list[str] = field(default_factory=list) tags: list[str] = field(default_factory=list)
[docs] @dataclass class DCBQuery: items: list[DCBQueryItem] = field(default_factory=list)
[docs] @dataclass class DCBAppendCondition: fail_if_events_match: DCBQuery = field(default_factory=DCBQuery) after: int | None = None
[docs] @dataclass class DCBEvent: type: str data: bytes tags: list[str] = field(default_factory=list)
[docs] @dataclass class DCBSequencedEvent: event: DCBEvent position: int
[docs] class DCBReadResponse(Iterator[DCBSequencedEvent], ABC): @property @abstractmethod def head(self) -> int | None: pass # pragma: no cover @abstractmethod def __next__(self) -> DCBSequencedEvent: pass # pragma: no cover
# @abstractmethod # def next_batch(self) -> list[DCBSequencedEvent]: # """ # Returns a batch of events as a list. # Updates the head position similar to __next__. # """
[docs] class DCBRecorder(ABC):
[docs] @abstractmethod def read( self, query: DCBQuery | None = None, *, after: int | None = None, limit: int | None = None, ) -> DCBReadResponse: """ Returns all events, unless 'after' is given then only those with position greater than 'after', and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags. """
[docs] @abstractmethod def append( self, events: Sequence[DCBEvent], condition: DCBAppendCondition | None = None ) -> int: """ Appends given events to the event store, unless the condition fails. """
[docs] @abstractmethod def subscribe( self, query: DCBQuery | None = None, *, after: int | None = None, ) -> DCBSubscription[Self]: """ Returns all events, unless 'after' is given then only those with position greater than 'after', and unless any query items are given, then only those that match at least one query item. An event matches a query item if its type is in the item types or there are no item types, and if all the item tags are in the event tags. The subscription will block when the last recorded event is received, and then continue when new events are recorded. """
TDCBRecorder_co = TypeVar("TDCBRecorder_co", bound=DCBRecorder, covariant=True)
[docs] class DCBSubscription(Iterator[DCBSequencedEvent], Generic[TDCBRecorder_co]):
[docs] def __init__( self, recorder: TDCBRecorder_co, query: DCBQuery | None = None, after: int | None = None, ) -> None: self._recorder = recorder self._query = query self._has_been_entered = False self._has_been_stopped = False self._last_position: int = after or 0
def __enter__(self) -> Self: if self._has_been_entered: msg = "Already entered subscription context manager" raise ProgrammingError(msg) self._has_been_entered = True return self def __exit__(self, *args: object, **kwargs: Any) -> None: if not self._has_been_entered: msg = "Not already entered subscription context manager" raise ProgrammingError(msg) self.stop()
[docs] def stop(self) -> None: """Stops the subscription.""" self._has_been_stopped = True
def __iter__(self) -> Self: return self @abstractmethod def __next__(self) -> DCBSequencedEvent: """Returns the next DCBEvent in the sequence."""