from __future__ import annotations
import contextlib
import os
import threading
import weakref
from abc import ABC, abstractmethod
from collections.abc import Iterator, Sequence
from threading import Event, Thread
from traceback import format_exc
from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypeVar
from warnings import warn
from eventsourcing.application import Application, ProcessingEvent
from eventsourcing.dispatch import singledispatchmethod
from eventsourcing.domain import DomainEventProtocol
from eventsourcing.persistence import (
InfrastructureFactory,
IntegrityError,
ProcessRecorder,
Tracking,
TrackingRecorder,
TTrackingRecorder,
WaitInterruptedError,
)
from eventsourcing.utils import Environment, EnvType
if TYPE_CHECKING:
from types import TracebackType
from typing_extensions import Self
[docs]
class ApplicationSubscription(Iterator[tuple[DomainEventProtocol, Tracking]]):
    """Iterator over the domain events recorded by an application.

    Wraps the subscription object returned by the ``subscribe()`` method of
    the application's recorder. Each item produced is a pair made of a domain
    event, reconstructed by the application's mapper, and a tracking object
    built from the application name and the ID of the notification that
    carried the event.
    """

    def __init__(
        self,
        app: Application,
        gt: int | None = None,
        topics: Sequence[str] = (),
    ):
        """Subscribes to the recorder of the given application.

        :param app: application whose name, recorder and mapper are used.
        :param gt: forwarded unchanged to the recorder's ``subscribe()``.
        :param topics: forwarded unchanged to the recorder's ``subscribe()``.
        """
        self.name = app.name
        self.recorder = app.recorder
        self.mapper = app.mapper
        self.subscription = self.recorder.subscribe(gt=gt, topics=topics)

    def stop(self) -> None:
        """Stops the underlying recorder subscription."""
        self.subscription.stop()

    def __enter__(self) -> Self:
        """Enters the underlying recorder subscription and returns self."""
        self.subscription.__enter__()
        return self

    def __exit__(self, *args: object, **kwargs: Any) -> None:
        """Exits the underlying recorder subscription with the same arguments."""
        self.subscription.__exit__(*args, **kwargs)

    def __iter__(self) -> Self:
        """Returns self, because this object is its own iterator."""
        return self

    def __next__(self) -> tuple[DomainEventProtocol, Tracking]:
        """Returns the next (domain event, tracking) pair.

        Takes the next item from the recorder subscription, builds a tracking
        object from this subscription's application name and the item's ID,
        and then converts the item to a domain event with the mapper.
        """
        stored = next(self.subscription)
        position = Tracking(self.name, stored.id)
        return self.mapper.to_domain_event(stored), position

    def __del__(self) -> None:
        """Stops the recorder subscription when this object is finalised."""
        try:
            self.stop()
        except AttributeError:
            # The 'subscription' attribute is missing if __init__ did not finish.
            pass
[docs]
class Projection(ABC, Generic[TTrackingRecorder]):
    """Abstract base class for projections that write to a materialised view.

    A projection is constructed with a view object and must implement
    :func:`process_event`, which receives each domain event together with
    its tracking object.
    """

    name: str = ""
    """
    Name of projection, used to pick prefixed environment
    variables and define database table names.
    """
    topics: tuple[str, ...] = ()
    """
    Event topics, used to filter events in database when subscribing to an application.
    """

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Gives each subclass a default ``name`` equal to its class name.

        The default is applied only when the subclass body does not define
        ``name`` itself. The call is forwarded up the MRO first, so that the
        subclass hooks of the base classes (including ``typing.Generic``)
        still run and class keyword arguments are not silently discarded.
        """
        super().__init_subclass__(**kwargs)
        if "name" not in cls.__dict__:
            cls.name = cls.__name__

    def __init__(
        self,
        view: TTrackingRecorder,
    ):
        """Initialises the view property with the given view argument."""
        self._view = view

    @property
    def view(self) -> TTrackingRecorder:
        """Materialised view of an event-sourced application."""
        return self._view

    @singledispatchmethod
    @abstractmethod
    def process_event(
        self, domain_event: DomainEventProtocol, tracking: Tracking
    ) -> None:
        """Process a domain event and track it."""
[docs]
class EventSourcedProjection(Application, ABC):
    """An :py:class:`~eventsourcing.application.Application` whose recorder
    is a process recorder, and which handles incoming domain events by
    passing them to its :py:func:`policy` method.
    """

    topics: ClassVar[Sequence[str]] = ()

    def __init__(self, env: EnvType | None = None) -> None:
        """Initialises the application and creates a processing lock.

        :param env: optional environment passed to the base class initialiser.
        """
        super().__init__(env)
        self.recorder: ProcessRecorder
        self.processing_lock = threading.Lock()

    def construct_recorder(self) -> ProcessRecorder:
        """Returns the :class:`~eventsourcing.persistence.ProcessRecorder`
        obtained from the application's infrastructure factory.
        """
        return self.factory.process_recorder()

    def process_event(
        self, domain_event: DomainEventProtocol, tracking: Tracking
    ) -> None:
        """Runs the policy for one domain event and records the outcome.

        A new :class:`~eventsourcing.application.ProcessingEvent` is created
        from the given tracking object and handed, with the domain event, to
        :func:`policy`. The processing event is then recorded.

        If recording raises :class:`~eventsourcing.persistence.IntegrityError`
        and the recorder already has the tracking ID, the error is swallowed
        and nothing further is done; otherwise the error is re-raised.

        After a successful recording, snapshots are taken, and both
        ``notify()`` and ``_notify()`` are called.
        """
        processing_event = ProcessingEvent(tracking=tracking)
        self.policy(domain_event, processing_event)

        try:
            recordings = self._record(processing_event)
        except IntegrityError:
            already_tracked = self.recorder.has_tracking_id(
                tracking.application_name,
                tracking.notification_id,
            )
            if not already_tracked:
                raise
            # The tracking ID is already recorded, so there is nothing to do.
            return

        self._take_snapshots(processing_event)
        self.notify(processing_event.events)
        self._notify(recordings)

    @singledispatchmethod
    def policy(
        self,
        domain_event: DomainEventProtocol,
        processing_event: ProcessingEvent,
    ) -> None:
        """Domain event processing policy, to be implemented by subclasses.

        The base implementation does nothing. Implementations must pass new
        domain events to the :func:`~ProcessingEvent.collect_events` method
        of the given :py:class:`~ProcessingEvent` object, rather than calling
        the application's
        :func:`~eventsourcing.application.Application.save` method, so that
        the new events are recorded atomically and uniquely with the tracking
        information for the processed event.
        """
# Type variable for the application class that a runner constructs and
# subscribes to.
TApplication = TypeVar(
    "TApplication",
    bound=Application,
)

# Type variable for the event-sourced projection class used by a runner.
TEventSourcedProjection = TypeVar(
    "TEventSourcedProjection",
    bound=EventSourcedProjection,
)
[docs]
class BaseProjectionRunner(Generic[TApplication]):
    """Runs a projection against the events of an application.

    The runner constructs the application, subscribes to it from the position
    given by the tracking recorder, and starts two threads: one that iterates
    over the subscription and calls the projection's ``process_event()``, and
    one that stops the subscription once the "interrupted" event is set.
    """

    def __init__(
        self,
        *,
        projection: EventSourcedProjection | Projection[Any],
        application_class: type[TApplication],
        tracking_recorder: TrackingRecorder,
        topics: Sequence[str],
        env: EnvType | None = None,
    ) -> None:
        """Constructs the application, subscribes to it, and starts the threads.

        :param projection: object whose ``process_event()`` is called for
            each (domain event, tracking) pair from the subscription.
        :param application_class: application class, constructed with ``env``.
        :param tracking_recorder: recorder whose ``max_tracking_id()`` gives
            the position to subscribe from, and which ``wait()`` delegates to.
        :param topics: topics passed to the application subscription.
        :param env: optional environment for the application.
        """
        self._projection = projection
        self._is_interrupted = Event()
        self._has_called_stop = False

        # Construct the application.
        self.app: TApplication = application_class(env)

        self._tracking_recorder = tracking_recorder

        # Subscribe to the application.
        self._subscription = ApplicationSubscription(
            app=self.app,
            gt=self._tracking_recorder.max_tracking_id(self.app.name),
            topics=topics,
        )

        # Start a thread to stop the subscription when the runner is interrupted.
        self._thread_error: BaseException | None = None
        self._stop_thread = Thread(
            target=self._stop_subscription_when_stopping,
            kwargs={
                "subscription": self._subscription,
                "is_stopping": self._is_interrupted,
            },
        )
        self._stop_thread.start()

        # Start a thread to iterate over the subscription. The thread gets
        # only a weak reference to the runner, so the thread itself does not
        # keep the runner alive.
        self._processing_thread = Thread(
            target=self._process_events_loop,
            kwargs={
                "subscription": self._subscription,
                "projection": self._projection,
                "is_stopping": self._is_interrupted,
                "runner": weakref.ref(self),
            },
        )
        self._processing_thread.start()

    @property
    def is_interrupted(self) -> Event:
        """Event that is set when the runner is stopped or has errored."""
        return self._is_interrupted

    @staticmethod
    def _construct_env(name: str, env: EnvType | None = None) -> Environment:
        """Constructs environment from which projection will be configured.

        The result contains a copy of ``os.environ`` with the given ``env``
        values, if any, overlaid on top.
        """
        _env: dict[str, str] = {}
        _env.update(os.environ)
        if env is not None:
            _env.update(env)
        return Environment(name, _env)

    def stop(self) -> None:
        """Sets the "interrupted" event."""
        self._has_called_stop = True
        self._is_interrupted.set()

    def _take_thread_error(self) -> BaseException | None:
        """Returns the stored thread error, if any, and clears it.

        Clearing the error means it is raised to callers at most once.
        """
        error = self._thread_error
        self._thread_error = None
        return error

    @staticmethod
    def _stop_subscription_when_stopping(
        subscription: ApplicationSubscription,
        is_stopping: Event,
    ) -> None:
        """Stops the application subscription, which
        will stop the event-processing thread.
        """
        try:
            is_stopping.wait()
        finally:
            is_stopping.set()
            subscription.stop()

    @staticmethod
    def _process_events_loop(
        subscription: ApplicationSubscription,
        projection: EventSourcedProjection | Projection[Any],
        is_stopping: Event,
        runner: weakref.ReferenceType[BaseProjectionRunner[Any]],
    ) -> None:
        """Iterates over the subscription and calls process_event().

        Any exception is stored on the runner, if the runner still exists,
        so it can be re-raised later; otherwise a ``RuntimeWarning`` carrying
        the formatted traceback is issued. The "stopping" event is always set
        when the loop ends.
        """
        try:
            with subscription:
                for domain_event, tracking in subscription:
                    projection.process_event(domain_event, tracking)
        except BaseException as e:
            _runner = runner()  # get reference from weakref
            if _runner is not None:
                _runner._thread_error = e  # noqa: SLF001
            else:
                msg = "ProjectionRunner was deleted before error could be assigned:\n"
                msg += format_exc()
                warn(
                    msg,
                    RuntimeWarning,
                    stacklevel=2,
                )
        finally:
            is_stopping.set()

    def run_forever(self, timeout: float | None = None) -> None:
        """Blocks until timeout, or until the runner is stopped or errors.

        Re-raises any error stored by the event-processing thread; otherwise
        returns normally.
        """
        if self._is_interrupted.wait(timeout=timeout):
            error = self._take_thread_error()
            if error is not None:
                raise error

    def wait(self, notification_id: int | None, timeout: float = 1.0) -> None:
        """Blocks until timeout, or until the materialised view has recorded a tracking
        object that is greater than or equal to the given notification ID.

        The wait is delegated to the tracking recorder and is interrupted by
        the "interrupted" event. When interrupted, a stored thread error is
        raised if there is one; otherwise the method returns quietly if
        ``stop()`` has been called, and re-raises ``WaitInterruptedError`` if
        it has not.
        """
        try:
            self._tracking_recorder.wait(
                application_name=self.app.name,
                notification_id=notification_id,
                timeout=timeout,
                interrupt=self._is_interrupted,
            )
        except WaitInterruptedError:
            error = self._take_thread_error()
            if error is not None:
                raise error from None
            if self._has_called_stop:
                return
            raise

    def __enter__(self) -> Self:
        """Returns the runner itself."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Calls stop() and waits for the event-processing thread to exit.

        Re-raises any error stored by the event-processing thread.
        """
        self.stop()
        self._stop_thread.join()
        self._processing_thread.join()
        error = self._take_thread_error()
        if error is not None:
            raise error

    def __del__(self) -> None:
        """Calls stop()."""
        # Attributes may be missing if __init__ did not run to completion.
        with contextlib.suppress(AttributeError):
            self.stop()
[docs]
class ProjectionRunner(
    BaseProjectionRunner[TApplication], Generic[TApplication, TTrackingRecorder]
):
    """Runner for a projection that writes to a materialised view."""

    def __init__(
        self,
        *,
        application_class: type[TApplication],
        projection_class: type[Projection[TTrackingRecorder]],
        view_class: type[TTrackingRecorder],
        env: EnvType | None = None,
    ):
        """Builds the view and the projection, then starts the runner.

        The materialised view is obtained from an infrastructure factory that
        is constructed with an environment named after the projection class.
        The projection is constructed with that view. The base class
        initialiser is then called with the projection, the application
        class, the view as tracking recorder, and the projection's topics.

        :param application_class: application class to construct and follow.
        :param projection_class: projection class, constructed with the view.
        :param view_class: tracking recorder class for the materialised view.
        :param env: optional environment values.
        """
        # Construct the materialised view using an infrastructure factory.
        view_env = self._construct_env(name=projection_class.name, env=env)
        factory = InfrastructureFactory[TTrackingRecorder].construct(env=view_env)
        self.view = factory.tracking_recorder(view_class)

        # Construct the projection using the materialised view.
        self.projection = projection_class(view=self.view)

        super().__init__(
            projection=self.projection,
            application_class=application_class,
            tracking_recorder=self.view,
            topics=self.projection.topics,
            env=env,
        )
[docs]
class EventSourcedProjectionRunner(
    BaseProjectionRunner[TApplication], Generic[TApplication, TEventSourcedProjection]
):
    """Runner for a projection that is itself an event-sourced application."""

    def __init__(
        self,
        *,
        application_class: type[TApplication],
        projection_class: type[TEventSourcedProjection],
        env: EnvType | None = None,
    ):
        """Builds the event-sourced projection, then starts the runner.

        The projection is constructed with an environment named after the
        projection class. The base class initialiser is then called with the
        projection, the application class, the projection's recorder as
        tracking recorder, and the projection's topics.

        :param application_class: application class to construct and follow.
        :param projection_class: event-sourced projection class to construct.
        :param env: optional environment values.
        """
        projection_env = self._construct_env(name=projection_class.name, env=env)
        projection = projection_class(env=projection_env)
        self.projection: TEventSourcedProjection = projection

        super().__init__(
            projection=projection,
            application_class=application_class,
            tracking_recorder=projection.recorder,
            topics=projection.topics,
            env=env,
        )