from __future__ import annotations
import contextlib
import os
import threading
import weakref
from abc import ABC, abstractmethod
from collections.abc import Iterator, Sequence
from threading import Event, Thread
from traceback import format_exc
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, get_origin
from warnings import warn
from eventsourcing.application import Application, ProcessingEvent
from eventsourcing.dcb.api import DCBQuery, DCBQueryItem
from eventsourcing.dcb.application import DCBApplication
from eventsourcing.dcb.domain import Decision, Tagged
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import DomainEventProtocol, TAggregateID
from eventsourcing.persistence import (
InfrastructureFactory,
Mapper,
ProcessRecorder,
Tracking,
TrackingRecorder,
TTrackingRecorder,
WaitInterruptedError,
)
from eventsourcing.utils import Environment, EnvType
if TYPE_CHECKING:
from types import TracebackType
from typing_extensions import Self
[docs]
class ApplicationSubscription(
Iterator[tuple[DomainEventProtocol[TAggregateID], Tracking]]
):
"""An iterator that yields all domain events recorded in an application
sequence that have notification IDs greater than a given value. The iterator
will block when all recorded domain events have been yielded, and then
continue when new events are recorded. Domain events are returned along
with tracking objects that identify the position in the application sequence.
"""
[docs]
def __init__(
self,
app: Application[TAggregateID],
gt: int | None = None,
topics: Sequence[str] = (),
):
"""
Starts a subscription to application's recorder.
"""
self.name = app.name
self.recorder = app.recorder
self.mapper: Mapper[TAggregateID] = app.mapper
self.subscription = self.recorder.subscribe(gt=gt, topics=topics)
[docs]
def stop(self) -> None:
"""Stops the subscription to the application's recorder."""
self.subscription.stop()
[docs]
def __enter__(self) -> Self:
"""Calls __enter__ on the stored event subscription."""
self.subscription.__enter__()
return self
[docs]
def __exit__(self, *args: object, **kwargs: Any) -> None:
"""Calls __exit__ on the stored event subscription."""
self.subscription.__exit__(*args, **kwargs)
[docs]
def __iter__(self) -> Self:
return self
[docs]
def __next__(self) -> tuple[DomainEventProtocol[TAggregateID], Tracking]:
"""Returns the next stored event from subscription to the application's
recorder. Constructs a tracking object that identifies the position of
the event in the application sequence. Constructs a domain event object
from the stored event object using the application's mapper. Returns a
tuple of the domain event object and the tracking object.
"""
notification = next(self.subscription)
tracking = Tracking(self.name, notification.id)
domain_event = self.mapper.to_domain_event(notification)
return domain_event, tracking
def __del__(self) -> None:
"""Stops the stored event subscription."""
with contextlib.suppress(AttributeError):
self.stop()
[docs]
class DCBApplicationSubscription(Iterator[tuple[Tagged[Decision], Tracking]]):
"""An iterator that yields all tagged decisions recorded in an application
sequence that have sequence numbers greater than a given value. The iterator
will block when all tagged decisions have been yielded, and then
continue when new ones are recorded. Tagged decisions are returned along
with tracking objects that identify the position in the application sequence.
"""
[docs]
def __init__(
self,
app: DCBApplication,
gt: int | None = None,
topics: Sequence[str] = (),
):
"""
Starts a subscription to application's recorder.
"""
self.name = app.name
self.recorder = app.recorder
self.mapper = app.mapper
self.subscription = self.recorder.subscribe(
query=DCBQuery(items=[DCBQueryItem(types=list(topics))]),
after=gt,
)
[docs]
def stop(self) -> None:
"""Stops the subscription to the application's recorder."""
self.subscription.stop()
[docs]
def __enter__(self) -> Self:
"""Calls __enter__ on the stored event subscription."""
self.subscription.__enter__()
return self
[docs]
def __exit__(self, *args: object, **kwargs: Any) -> None:
"""Calls __exit__ on the stored event subscription."""
self.subscription.__exit__(*args, **kwargs)
[docs]
def __iter__(self) -> Self:
return self
[docs]
def __next__(self) -> tuple[Tagged[Decision], Tracking]:
"""Returns the next stored event from subscription to the application's
recorder. Constructs a tracking object that identifies the position of
the event in the application sequence. Constructs a domain event object
from the stored event object using the application's mapper. Returns a
tuple of the domain event object and the tracking object.
"""
sequenced_event = next(self.subscription)
tracking = Tracking(self.name, sequenced_event.position)
tagged_decision = self.mapper.to_domain_event(sequenced_event.event)
return tagged_decision, tracking
def __del__(self) -> None:
"""Stops the stored event subscription."""
with contextlib.suppress(AttributeError):
self.stop()
[docs]
class Projection(ABC, Generic[TTrackingRecorder]):
name: str = ""
"""
Name of projection, used to pick prefixed environment
variables and define database table names.
"""
topics: tuple[str, ...] = ()
"""
Event topics, used to filter events in database when subscribing to an application.
"""
def __init_subclass__(cls, **kwargs: Any) -> None:
if "name" not in cls.__dict__:
cls.name = cls.__name__
[docs]
def __init__(
self,
view: TTrackingRecorder,
):
"""Initialises the view property with the given view argument."""
self._view = view
@property
def view(self) -> TTrackingRecorder:
"""Materialised view of an event-sourced application."""
return self._view
[docs]
@singledispatchmethod
@abstractmethod
def process_event(self, domain_event: Any, tracking: Tracking) -> None:
"""Process a domain event and track it."""
[docs]
class EventSourcedProjection(Application[TAggregateID], ABC):
"""Extends the :py:class:`~eventsourcing.application.Application` class
by using a process recorder as its application recorder, and by
processing domain events through its :py:func:`policy` method.
"""
topics: Sequence[str] = ()
[docs]
def __init__(self, env: EnvType | None = None) -> None:
super().__init__(env)
self.recorder: ProcessRecorder
self.processing_lock = threading.Lock()
[docs]
def construct_recorder(self) -> ProcessRecorder:
"""Constructs and returns a :class:`~eventsourcing.persistence.ProcessRecorder`
for the application to use as its application recorder.
"""
return self.factory.process_recorder()
[docs]
def process_event(
self, domain_event: DomainEventProtocol[TAggregateID], tracking: Tracking
) -> None:
"""Calls :func:`~eventsourcing.system.Follower.policy` method with the given
domain event and a new :class:`~eventsourcing.application.ProcessingEvent`
constructed with the given tracking object.
The policy method should collect any new aggregate events on the process
event object.
After the policy method returns, the processing event object will be recorded
by calling :py:func:`~eventsourcing.application.Application._record`,
which then returns list of :py:class:`~eventsourcing.persistence.Recording`.
After calling :func:`~eventsourcing.application.Application._take_snapshots`,
the recordings are passed in a call to
:py:func:`~eventsourcing.application.Application._notify`.
"""
processing_event = ProcessingEvent[TAggregateID](tracking=tracking)
self.policy(domain_event, processing_event)
recordings = self._record(processing_event)
self._take_snapshots(processing_event)
self.notify(processing_event.events)
self._notify(recordings)
[docs]
@singledispatchmethod
def policy(
self,
domain_event: DomainEventProtocol[TAggregateID],
processing_event: ProcessingEvent[TAggregateID],
) -> None:
"""Abstract domain event processing policy method. Must be
implemented by event processing applications. When
processing the given domain event, event processing
applications must use the :func:`~ProcessingEvent.collect_events`
method of the given :py:class:`~ProcessingEvent` object (not
the application's :func:`~eventsourcing.application.Application.save`
method) so that the new domain events will be recorded atomically
and uniquely with tracking information about the position of the processed
event in its application sequence.
"""
TApplication = TypeVar("TApplication")
TEventSourcedProjection = TypeVar(
"TEventSourcedProjection", bound=EventSourcedProjection[Any]
)
[docs]
class BaseProjectionRunner(Generic[TApplication]):
[docs]
def __init__(
self,
*,
projection: EventSourcedProjection[Any] | Projection[Any],
application_class: type[TApplication],
tracking_recorder: TrackingRecorder,
topics: Sequence[str],
env: EnvType | None = None,
) -> None:
self._projection = projection
self._is_interrupted = Event()
self._has_called_stop = False
self._tracking_recorder = tracking_recorder
# Construct and subscribe to the application.
self._subscription: ApplicationSubscription[Any] | DCBApplicationSubscription
# Do this for pyright (with the cast to TApplication below).
app: Any
# get_origin() because issubclass doesn't work with generic alias, and
# then 'or' with the class in case get_origin() returns None.
if issubclass(get_origin(application_class) or application_class, Application):
# cast() because that call to issubclass() doesn't narrow the type.
app = cast(type[Application[Any]], application_class)(env)
self.app_name = app.name
self._subscription = ApplicationSubscription(
app=app,
gt=self._tracking_recorder.max_tracking_id(app.name),
topics=topics,
)
elif issubclass(application_class, DCBApplication):
app = application_class(env)
self.app = app
self.app_name = app.name
self._subscription = DCBApplicationSubscription(
app=app,
gt=self._tracking_recorder.max_tracking_id(app.name),
topics=topics,
)
else: # pragma: no cover
msg = f"Unsupported application type: {application_class}"
raise TypeError(msg)
self.app = cast(TApplication, app)
# Start a thread to stop the subscription when the runner is interrupted.
self._thread_error: BaseException | None = None
self._stop_thread = Thread(
target=self._stop_subscription_when_stopping,
kwargs={
"subscription": self._subscription,
"is_stopping": self._is_interrupted,
},
)
self._stop_thread.start()
# Start a thread to iterate over the subscription.
self._processing_thread = Thread(
target=self._process_events_loop,
kwargs={
"subscription": self._subscription,
"projection": self._projection,
"is_stopping": self._is_interrupted,
"runner": weakref.ref(self),
},
)
self._processing_thread.start()
@property
def is_interrupted(self) -> Event:
return self._is_interrupted
@staticmethod
def _construct_env(name: str, env: EnvType | None = None) -> Environment:
"""Constructs environment from which projection will be configured."""
_env: dict[str, str] = {}
_env.update(os.environ)
if env is not None:
_env.update(env)
return Environment(name, _env)
[docs]
def stop(self) -> None:
"""Sets the "interrupted" event."""
self._has_called_stop = True
self._is_interrupted.set()
@staticmethod
def _stop_subscription_when_stopping(
subscription: ApplicationSubscription[TAggregateID],
is_stopping: Event,
) -> None:
"""Stops the application subscription, which
will stop the event-processing thread.
"""
try:
is_stopping.wait()
finally:
is_stopping.set()
subscription.stop()
@staticmethod
def _process_events_loop(
subscription: ApplicationSubscription[TAggregateID],
projection: EventSourcedProjection[Any] | Projection[Any],
is_stopping: Event,
runner: weakref.ReferenceType[
ProjectionRunner[Application[Any], TrackingRecorder]
],
) -> None:
"""Iterates over the subscription and calls process_event()."""
try:
for domain_event, tracking in subscription:
projection.process_event(domain_event, tracking)
except BaseException as e:
_runner = runner() # get reference from weakref
if _runner is not None:
_runner._thread_error = e # noqa: SLF001
else:
msg = "ProjectionRunner was deleted before error could be assigned:\n"
msg += format_exc()
warn(
msg,
RuntimeWarning,
stacklevel=2,
)
finally:
is_stopping.set()
[docs]
def run_forever(self, timeout: float | None = None) -> None:
"""Blocks until timeout, or until the runner is stopped or errors. Re-raises
any error otherwise exits normally
"""
if (
self._is_interrupted.wait(timeout=timeout)
and self._thread_error is not None
):
error = self._thread_error
self._thread_error = None
raise error from None
[docs]
def wait(self, notification_id: int | None, timeout: float = 1.0) -> None:
"""Blocks until timeout, or until the materialised view has recorded a tracking
object that is greater than or equal to the given notification ID.
"""
try:
self._tracking_recorder.wait(
application_name=self.app_name,
notification_id=notification_id,
timeout=timeout,
interrupt=self._is_interrupted,
)
except WaitInterruptedError as e:
if self._thread_error:
error = self._thread_error
self._thread_error = None
raise error from None
if self._has_called_stop:
return
raise e from None
[docs]
def __enter__(self) -> Self:
self._subscription.__enter__()
return self
[docs]
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Calls stop() and waits for the event-processing thread to exit."""
self.stop()
self._stop_thread.join()
self._subscription.__exit__(exc_type, exc_val, exc_tb)
self._processing_thread.join()
# TODO: Improve typing of application classes and type annotation for self.app
self.app.close() # pyright: ignore [reportAttributeAccessIssue]
if self._thread_error:
error = self._thread_error
self._thread_error = None
raise error
def __del__(self) -> None:
"""Calls stop()."""
with contextlib.suppress(AttributeError):
self.stop()
[docs]
class ProjectionRunner(
BaseProjectionRunner[TApplication], Generic[TApplication, TTrackingRecorder]
):
[docs]
def __init__(
self,
*,
application_class: type[TApplication],
projection_class: type[Projection[TTrackingRecorder]],
view_class: type[TTrackingRecorder],
env: EnvType | None = None,
):
"""Constructs application from given application class with given environment.
Also constructs a materialised view from given class using an infrastructure
factory constructed with an environment named after the projection. Also
constructs a projection with the constructed materialised view object.
Starts a subscription to application and, in a separate event-processing
thread, calls projection's process_event() method for each event and tracking
object pair received from the subscription.
"""
# Construct the materialised view using an infrastructure factory.
self.view = (
InfrastructureFactory[TTrackingRecorder]
.construct(env=self._construct_env(name=projection_class.name, env=env))
.tracking_recorder(view_class)
)
# Construct the projection using the materialised view.
self.projection = projection_class(view=self.view)
super().__init__(
projection=self.projection,
application_class=application_class,
tracking_recorder=self.view,
topics=self.projection.topics,
env=env,
)
[docs]
class EventSourcedProjectionRunner(
BaseProjectionRunner[TApplication], Generic[TApplication, TEventSourcedProjection]
):
[docs]
def __init__(
self,
*,
application_class: type[TApplication],
projection_class: type[TEventSourcedProjection],
env: EnvType | None = None,
):
self.projection: TEventSourcedProjection = projection_class(
env=self._construct_env(name=projection_class.name, env=env)
)
super().__init__(
projection=self.projection,
application_class=application_class,
tracking_recorder=self.projection.recorder,
topics=self.projection.topics,
env=env,
)
[docs]
def __enter__(self) -> Self:
cm = super().__enter__()
self.projection.__enter__()
return cm
[docs]
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.projection.__exit__(exc_type, exc_val, exc_tb)
return super().__exit__(exc_type, exc_val, exc_tb)