Source code for eventsourcing.domain.model.decorators

import random
from functools import singledispatch, wraps
from inspect import isfunction
from time import sleep
from typing import Callable, Dict, Optional, Sequence, Type, Union, no_type_check

from eventsourcing.domain.model.events import DomainEvent, subscribe
from eventsourcing.exceptions import ProgrammingError


def subscribe_to(*args: Union[Callable, Type[DomainEvent]]) -> Callable:
    """
    Decorator that subscribes a plain function as a handler for published events.

    The decorated function is called with a single argument, the event, once
    for every matching event. When a list or tuple of events is published
    (the original documentation gives the AggregateRoot's ``__save__()``
    method as an example), the function is called once for each matching
    element.

    The decorator can be used bare (``@subscribe_to``), in which case every
    event matches, or with one or more event classes
    (``@subscribe_to(Todo.Created)``), in which case an event matches if it
    is an instance of any of the given classes.

    This decorator is not suitable for normal object methods: it only ever
    sees the undecorated function, never an instance, so it has no value to
    pass as ``self``.

    :param args: either the decorated function itself (bare usage), or the
        event classes used to match published events.
    :return: the decorated function unchanged (bare usage), or a decorator
        that subscribes the function and returns it unchanged.

    Example, a handler that reacts to a ``Todo.Created`` event and saves a
    projection of a Todo model object.

    .. code::

        @subscribe_to(Todo.Created)
        def new_todo_projection(event):
            todo = TodoProjection(id=event.originator_id, title=event.title)
            todo.save()
    """
    # Bare usage passes the decorated function as the only positional argument.
    used_bare = len(args) == 1 and isfunction(args[0])
    # With no classes to match against, every event is passed to the function.
    event_classes = () if used_bare else args

    @no_type_check
    def wrap(func):
        def handler(event):
            if isinstance(event, (list, tuple)):
                # Unpack published sequences and handle each element in turn.
                for item in event:
                    handler(item)
                return
            if not event_classes or isinstance(event, event_classes):
                func(event)

        # Filtering is done inside the handler, so the predicate accepts all.
        subscribe(handler=handler, predicate=lambda _: True)
        return func

    return wrap(args[0]) if used_bare else wrap
def mutator(arg: Optional[Callable] = None) -> Callable:
    """
    Structures mutator functions by allowing handlers to be registered for
    different types of event.

    When the decorated function is called with an initial value and an event,
    it calls the handler that has been registered for that type of event. It
    is built on ``functools.singledispatch``, but dispatches on the type of
    the *last* call argument (the event) rather than the first. That fits
    ``reduce()``, which calls the mutator with the accumulated state first
    and the event last.

    The decorator can optionally be given the entity class. When an entity is
    replayed from scratch the initial state is ``None``; if the decorator was
    invoked with a class, the handler receives that class in place of
    ``None`` and can use it to construct the entity, which keeps entity
    classes easy to subclass. Only ``None`` is replaced: an existing entity
    that happens to be falsy (for example one defining ``__len__`` or
    ``__bool__``) is passed through untouched.

    :param arg: the function to decorate (bare usage), or the entity class to
        substitute for a ``None`` initial state, or ``None``.
    :return: the dispatching wrapper (bare usage), or a decorator producing
        one. The wrapper exposes ``register`` from the underlying
        singledispatch function.

    .. code::

        class Entity(object):
            class Created(object):
                pass

        @mutator(Entity)
        def mutate(initial, event):
            raise NotImplementedError(type(event))

        @mutate.register(Entity.Created)
        def _(initial, event):
            return initial(**event.__dict__)

        entity = mutate(None, Entity.Created())
    """
    domain_class = None

    @no_type_check
    def _mutator(func):
        wrapped = singledispatch(func)

        @wraps(wrapped)
        def wrapper(initial, event):
            # Substitute the domain class only for a missing initial state.
            # Testing truthiness here would wrongly discard falsy entities.
            if initial is None:
                initial = domain_class
            # singledispatch switches on the first argument, so look up the
            # handler explicitly by the type of the event instead.
            return wrapped.dispatch(type(event))(initial, event)

        wrapper.register = wrapped.register
        return wrapper

    if not isfunction(arg):
        # Decorator used with an entity class (or with no argument at all).
        domain_class = arg
        return _mutator
    else:
        # Decorator invoked directly as a function decorator.
        return _mutator(arg)
def attribute(getter: Callable) -> property:
    """
    Method decorator that turns a method into an event-publishing property.

    The returned property reads the value from the instance attribute named
    after the method with a leading underscore (``None`` if it is not set),
    and assigns by calling the instance method ``__change_attribute__()``
    with that underscore-prefixed name and the new value. According to the
    original documentation, that method publishes an AttributeChanged event.
    The body of the decorated method is never called; only its name and
    docstring are used.

    :param getter: the method being decorated.
    :return: a property object carrying the method's docstring.
    :raises ProgrammingError: if ``getter`` is not a function.
    """
    if not isfunction(getter):
        raise ProgrammingError("Expected a function, got: {}".format(repr(getter)))

    def _storage_name():
        # The value lives on the instance under "_<method name>".
        return "_" + getter.__name__

    @no_type_check
    def _read(self):
        return getattr(self, _storage_name(), None)

    @no_type_check
    def _write(self, value):
        self.__change_attribute__(name=_storage_name(), value=value)

    return property(fget=_read, fset=_write, doc=getter.__doc__)
def retry(
    exc: Union[Type[Exception], Sequence[Type[Exception]]] = Exception,
    max_attempts: int = 1,
    wait: float = 0,
    stall: float = 0,
    verbose: bool = False,
) -> Callable:
    """
    Retry decorator.

    Can be used bare (``@retry``), in which case the defaults apply and any
    ``Exception`` is caught, or with arguments (``@retry(exc=..., ...)``).

    :param exc: Exception class, or list/tuple of exception classes, that
        will cause the call to be retried if raised.
    :param max_attempts: Maximum number of attempts to try.
    :param wait: Amount of time to wait before retrying after an exception.
        The actual delay is jittered by up to +/-5%.
    :param stall: Amount of time to wait before the first attempt.
    :param verbose: If True, prints a message to STDOUT when retries occur.
    :return: Returns the value returned by decorated function.
    :raises TypeError: when used with arguments and ``exc`` is not an
        exception class (or list/tuple of them), ``max_attempts`` is not an
        int, or ``wait`` is not a float or int.
    """

    @no_type_check
    def _retry(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if stall:
                sleep(stall)
            attempts = 0
            while True:
                try:
                    if verbose and attempts > 0:
                        print("Retrying ", func, args, kwargs)
                    return func(*args, **kwargs)
                except exc as e:
                    if verbose:
                        print("Error trying ", func, args, kwargs, e)
                    attempts += 1
                    # NOTE(review): None cannot reach here through the
                    # validated decorator paths below; the check is kept
                    # from the original and treats None as "no limit".
                    if max_attempts is None or attempts < max_attempts:
                        # Jitter the delay so concurrent retries spread out.
                        sleep(wait * (1 + 0.1 * (random.random() - 0.5)))
                        if verbose:
                            print("Retrying {}".format(func))
                    else:
                        # Max retries exceeded: re-raise the caught exception
                        # with its original traceback.
                        raise

        return wrapper

    # If using decorator in bare form, the decorated
    # function is the first arg, so check 'exc'.
    if isfunction(exc):
        # Remember the given function.
        _func = exc
        # Set 'exc' to a sensible exception class for _retry().
        exc = Exception
        # Wrap and return.
        return _retry(func=_func)

    # Check decorator args, and return _retry,
    # to be called with the decorated function.
    if isinstance(exc, (list, tuple)):
        for _exc in exc:
            if not (isinstance(_exc, type) and issubclass(_exc, Exception)):
                raise TypeError("not an exception class: {}".format(_exc))
        # An 'except' clause accepts a class or a tuple of classes, but not
        # a list, so normalise before the wrapper uses it.
        exc = tuple(exc)
    else:
        if not (isinstance(exc, type) and issubclass(exc, Exception)):
            raise TypeError("not an exception class: {}".format(exc))
    if not isinstance(max_attempts, int):
        raise TypeError("'max_attempts' must be an int: {}".format(max_attempts))
    if not isinstance(wait, (float, int)):
        raise TypeError("'wait' must be a float: {}".format(wait))
    return _retry
def subclassevents(cls: type) -> type:
    """
    Class decorator that avoids "boilerplate" subclassing of domain events.

    With this decorator you can write:

    .. code::

        @subclassevents
        class Example(AggregateRoot):
            class SomethingHappened(DomainEvent): pass

    rather than:

    .. code::

        class Example(AggregateRoot):
            class Event(AggregateRoot.Event): pass
            class Created(Event, AggregateRoot.Created): pass
            class Discarded(Event, AggregateRoot.Discarded): pass
            class AttributeChanged(Event, AggregateRoot.AttributeChanged): pass
            class SomethingHappened(Event): pass

    The decorator works in three steps:

    1. Ensure ``cls`` has its own ``Event`` class, created from the ``Event``
       classes of its direct bases (or from ``DomainEvent`` if none has one).
    2. For every other domain event class found on a direct base and not
       already defined on ``cls``, define a subclass on ``cls`` that derives
       from ``cls.Event`` and the inherited classes of the same name.
    3. Redefine any domain event class defined directly on ``cls`` that does
       not already derive from ``cls.Event`` so that it does.

    The original documentation also states that this can be applied to a tree
    of domain event classes by defining the base class with the attribute
    ``subclassevents = True``; that mechanism is not part of this function.

    :param cls: the class to decorate; it is modified in place.
    :return: the same class object.
    """
    # For each direct base class, map attribute name -> domain event class.
    events_by_base = []
    inherited_event_names = set()
    for parent in cls.__bases__:
        parent_events: Dict[str, Type[DomainEvent]] = {}
        events_by_base.append(parent_events)
        for attr_name in dir(parent):
            attr_value = getattr(parent, attr_name)
            if not isinstance(attr_value, type):
                continue
            if not issubclass(attr_value, DomainEvent):
                continue
            parent_events[attr_name] = attr_value
            # "Event" is handled separately as the common base class.
            if attr_name != "Event":
                inherited_event_names.add(attr_name)

    # Step 1: use the class's own Event if defined, otherwise derive one
    # from the Event classes of the bases.
    if "Event" in cls.__dict__:
        event_base = cls.__dict__["Event"]
    else:
        parent_event_bases = [
            found["Event"] for found in events_by_base if "Event" in found
        ]
        event_base = type(
            "Event",
            tuple(parent_event_bases or [DomainEvent]),
            {"__qualname__": cls.__name__ + ".Event"},
        )
        event_base.__module__ = cls.__module__
        setattr(cls, "Event", event_base)

    # Step 2: subclass inherited event classes, with the Event class first
    # in the bases.
    for event_name in inherited_event_names:
        if event_name in cls.__dict__:
            # Defined directly on cls; dealt with in step 3 if necessary.
            continue
        derived_bases = [event_base]
        derived_bases.extend(
            found[event_name] for found in events_by_base if event_name in found
        )
        derived = type(
            event_name,
            tuple(derived_bases),
            {"__qualname__": cls.__name__ + "." + event_name},
        )
        derived.__module__ = cls.__module__
        setattr(cls, event_name, derived)

    # Step 3: redefine event classes in cls.__dict__ that are not subclasses
    # of Event. Only existing names are reassigned, so the set of keys being
    # iterated does not change.
    for own_name in cls.__dict__.keys():
        own_value = getattr(cls, own_name)
        is_event_class = isinstance(own_value, type) and issubclass(
            own_value, DomainEvent
        )
        if not is_event_class or issubclass(own_value, event_base):
            continue
        try:
            rebased = type(
                own_name,
                (own_value, event_base),
                {
                    "__qualname__": own_value.__qualname__,
                    "__module__": own_value.__module__,
                    "__doc__": own_value.__doc__,
                },
            )
        except TypeError:
            raise Exception(own_name, own_value, event_base)
        setattr(cls, own_name, rebased)

    return cls