from __future__ import annotations
import types
import typing
from abc import ABC, ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, ClassVar, Generic, ParamSpec, Self
from uuid import uuid4
from typing_extensions import TypeVar
from eventsourcing.domain import (
AbstractDecision,
CallableType,
CommandMethodDecorator,
ProgrammingError,
all_func_decorators,
decorated_func_callers,
filter_kwargs_for_method_params,
)
from eventsourcing.utils import construct_topic
if TYPE_CHECKING:
from collections.abc import Callable, Sequence
class Decision(AbstractDecision, ABC):
    """
    Abstract base class for decisions that can be applied to perspective objects.

    Subclasses must implement :meth:`as_dict`, and may override :meth:`apply`
    to change the state of the object a decision is applied to.
    """

    # String-to-string mapping of metadata carried by the decision.
    metadata: dict[str, str]

    @abstractmethod
    def as_dict(self) -> dict[str, Any]:
        """
        Returns the attributes of this decision as a dictionary.
        """
        pass  # pragma: no cover

    def mutate(self, obj: TPerspective | None) -> TPerspective | None:
        """
        Applies this decision to the given object and returns that object.

        If a function is registered in ``decorated_funcs`` under the key
        ``(type(obj), type(self))``, it is bound to ``obj`` and called with
        the decision attributes that are mentioned in its signature. The
        :meth:`apply` method is then called, whether or not a registered
        function was found.

        :param obj: The object to apply this decision to; must not be ``None``.
        :return: The same object that was passed in.
        :raises AssertionError: If ``obj`` is ``None``.
        :raises TypeError: If the registered function raises ``TypeError``
            when called with the selected attributes.
        """
        assert obj is not None

        # Find the function registered for this (object type, decision type) pair.
        handler = decorated_funcs.get((type(obj), type(self)))
        if handler is not None:
            # Keep only the decision attributes named in the function's signature.
            kwargs = filter_kwargs_for_method_params(self.as_dict(), handler)

            # Bind the plain function to the object, so it is called as a method.
            bound_handler = handler.__get__(obj, type(obj))
            try:
                bound_handler(**kwargs)
            except TypeError as e:  # pragma: no cover
                # TODO: Write a test that does this...
                msg = (
                    f"Failed to apply {type(self).__qualname__} to "
                    f"{type(obj).__qualname__} with kwargs {kwargs}: {e}"
                )
                raise TypeError(msg) from e

        self.apply(obj)
        return obj

    def apply(self, obj: Any) -> None:
        """
        Hook called by :meth:`mutate` with the object being mutated.

        Does nothing by default.
        """
        return None
# Type variable for the decision type carried by the generic classes in this
# module. It is bounded by Decision, and falls back to Decision when a generic
# class is used without an explicit type argument.
TDecision = TypeVar("TDecision", bound=Decision, default=Decision)
"""
A type variable representing any subclass of :class:`Decision`.
"""
def _new_uuid_str() -> str:
    """Returns the string form of a freshly generated version 4 UUID."""
    return str(uuid4())


@dataclass
class Tagged(Generic[TDecision]):
    """
    Pairs a decision with a list of tags and a generated identifier.
    """

    # Tags associated with the decision.
    tags: list[str]
    # The decision itself.
    decision: TDecision
    # Identifier of this tagged decision; a new UUID string unless one is given.
    uuid: str = field(default_factory=_new_uuid_str)
# Unbounded, general-purpose type variable.
T = TypeVar("T")
# Parameter specification used by the ``trigger_event()`` methods, so that the
# extra positional and keyword arguments are typed against the signature of
# the ``decision_cls`` callable they are passed to.
P = ParamSpec("P")
class Perspective(ABC, Generic[TDecision], metaclass=MetaPerspective):
    """
    Abstract base class for objects that trigger decisions and hold them
    until they are collected.

    Subclasses must implement :meth:`consistency_boundary`.
    """

    # Initialised to None in __new__.
    # NOTE(review): presumably updated when recorded decisions are read - confirm.
    last_known_position: int | None
    # Tagged decisions triggered since the last call to collect_events().
    new_decisions: list[Tagged[TDecision]]

    def __new__(cls, *_: Any, **__: Any) -> Self:
        """
        Creates the instance and initialises its bookkeeping attributes.

        Constructor arguments are accepted and ignored here. The attributes
        are set in ``__new__`` so they exist before any ``__init__`` runs.
        """
        instance = super().__new__(cls)
        instance.last_known_position = None
        instance.new_decisions = []
        return instance

    @abstractmethod
    def consistency_boundary(self) -> Selector | Sequence[Selector]:
        """
        Returns the selector, or selectors, for this perspective.
        """
        raise NotImplementedError  # pragma: no cover

    def trigger_event(
        self,
        decision_cls: Callable[P, TDecision],
        tags: Sequence[str] = (),
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> None:
        """
        Builds a decision, applies it to this object, and records it as pending.

        :param decision_cls: Callable that constructs the decision.
        :param tags: Tags to attach to the new decision.
        :param args: Positional arguments for ``decision_cls``.
        :param kwargs: Keyword arguments for ``decision_cls``.
        """
        tag_list = list(tags)
        decision = decision_cls(*args, **kwargs)
        pending = Tagged[TDecision](tags=tag_list, decision=decision)
        # Apply the decision first, so it is only recorded if applying succeeds.
        decision.mutate(self)
        self.new_decisions.append(pending)

    def collect_events(self) -> Sequence[Tagged[Any]]:
        """
        Returns the pending tagged decisions and resets the pending list.
        """
        drained = self.new_decisions
        self.new_decisions = []
        return drained
# Type variable for any subclass of Perspective; used in the signature of
# Decision.mutate() so the returned object has the same type as the argument.
TPerspective = TypeVar("TPerspective", bound=Perspective[Any])
# Registry of decorated functions, keyed by (perspective class, decision class).
# Decision.mutate() looks up (type(obj), type(self)) here and, when an entry
# exists, calls that function bound to the object being mutated.
# NOTE(review): the code that fills this registry is not visible here -
# presumably a decorator or metaclass elsewhere in the module; confirm.
decorated_funcs: dict[tuple[MetaPerspective, type[Decision]], CallableType] = {}
# Type variable for the type of an EnduringObject's ``id`` attribute. It is
# bounded by str, and defaults to str when no type argument is given.
TID = TypeVar("TID", bound=str, default=str)
class EnduringObject(
    Perspective[TDecision], Generic[TDecision, TID], metaclass=MetaEnduringObject
):
    """
    A perspective that has an ``id``, which is used as a tag on every
    decision it triggers and as the tag of its consistency boundary.
    """

    # Identifier of this object.
    id: TID

    @classmethod
    def _create(cls: type[Self], **kwargs: Any) -> Self:
        """
        Allocates an instance, sets its ``id``, and then runs ``__init__``.

        :param kwargs: Keyword arguments for ``__init__``; the value of the
            first one is taken to be the ID of the new object.
        :return: The new instance.
        """
        instance = cls.__new__(cls, **kwargs)

        # TODO: Maybe find a better way to do this, but it seems the `id`
        # attribute needs to be set before `trigger_event()` gets called.
        first_value = next(iter(kwargs.values()))  # assume ID is first arg
        instance.id = first_value

        # Calling __init__ should trigger an event that
        # calls the original decorated __init__ method.
        instance.__init__(**kwargs)  # type: ignore[misc]
        return instance

    def consistency_boundary(self) -> list[Selector]:
        """
        Returns a single selector, tagged with the ID of this object.
        """
        own_selector = Selector(tags=[self.id])
        return [own_selector]

    def trigger_event(
        self,
        decision_cls: Callable[P, TDecision],
        tags: Sequence[str] = (),
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> None:
        """
        Triggers a decision tagged with this object's ID followed by ``tags``.

        :param decision_cls: Callable that constructs the decision.
        :param tags: Additional tags, placed after the ID of this object.
        :param args: Positional arguments for ``decision_cls``.
        :param kwargs: Keyword arguments for ``decision_cls``.
        """
        all_tags = [self.id]
        all_tags.extend(tags)
        super().trigger_event(decision_cls, all_tags, *args, **kwargs)
class Group(Perspective[TDecision]):
    """
    A perspective over several enduring objects.

    Decisions triggered on a group are tagged with the IDs of all its
    enduring objects, and are applied to each of those objects.
    """

    # The enduring objects found among the positional constructor arguments.
    _enduring_objects: list[EnduringObject]
    # The enduring object classes named in the type hints of ``__init__``;
    # set for each subclass by __init_subclass__.
    classes: ClassVar[Sequence[type[EnduringObject[Any, Any]]]]

    def __init_subclass__(cls) -> None:
        """
        Derives ``cls.classes`` from the type hints of the ``__init__`` method.

        :raises AssertionError: If an extracted type is not a subclass of
            :class:`EnduringObject`.
        """
        super().__init_subclass__()

        found: list[type[EnduringObject[Any]]] = []
        for name, hint in typing.get_type_hints(cls.__init__).items():
            # The return annotation, and 'self' if annotated, are not parameters.
            if name == "return" or name == "self":
                continue

            # A union such as `Student | None` has the args (Student, NoneType).
            members = typing.get_args(hint)
            if not members:
                # A hint without type arguments is taken as the class itself.
                found.append(hint)
                continue

            # Keep the actual classes, and leave out NoneType.
            for member in members:
                if member is not types.NoneType:
                    found.append(member)

        assert all(issubclass(candidate, EnduringObject) for candidate in found)
        cls.classes = found

    def __new__(cls, *args: Any, **kwargs: Any) -> Self:
        """
        Creates the instance and remembers the enduring objects passed to it.

        Only positional arguments are inspected; those that are instances of
        :class:`EnduringObject` are kept, in the order they were given.
        """
        instance = super().__new__(cls, *args, **kwargs)
        instance._enduring_objects = [
            candidate for candidate in args if isinstance(candidate, EnduringObject)
        ]
        return instance

    def consistency_boundary(self) -> list[Selector]:
        """
        Returns one selector for each selector of each enduring object,
        carrying over only the tags.
        """
        # Get all the boundaries first, before constructing any new selectors.
        boundaries = [
            member.consistency_boundary() for member in self._enduring_objects
        ]
        selectors = []
        for boundary in boundaries:
            for selector in boundary:
                selectors.append(Selector(tags=selector.tags))
        return selectors

    def trigger_event(
        self,
        decision_cls: Callable[P, TDecision],
        tags: Sequence[str] = (),
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> None:
        """
        Builds a decision, applies it to every enduring object in the group,
        and records it as pending on the group.

        :param decision_cls: Callable that constructs the decision.
        :param tags: Additional tags, placed after the IDs of the objects.
        :param args: Positional arguments for ``decision_cls``.
        :param kwargs: Keyword arguments for ``decision_cls``.
        """
        members = self._enduring_objects
        all_tags = [member.id for member in members]
        all_tags.extend(tags)

        decision = decision_cls(*args, **kwargs)
        pending = Tagged[TDecision](tags=all_tags, decision=decision)

        # The decision is applied to the enduring objects, not to the group.
        for member in members:
            decision.mutate(member)
        self.new_decisions.append(pending)
@dataclass
class Selector:
    """
    A pair of decision types and tags; both are empty by default.
    """

    # Decision classes of this selector.
    types: Sequence[type[Decision]] = field(default=())
    # Tags of this selector.
    tags: Sequence[str] = field(default=())
class Slice(Perspective[TDecision], metaclass=MetaSlice):
    """
    A perspective that has an :meth:`execute` method.
    """

    def execute(self) -> None:
        """
        Does nothing by default; subclasses can override this method.
        """
        return None
# Type variable for any subclass of Slice.
TSlice = TypeVar("TSlice", bound=Slice[Any])
# Type variable for any subclass of Group.
TGroup = TypeVar("TGroup", bound=Group[Any])