Source code for eventsourcing.application.pipeline

from abc import ABCMeta


[docs]class PipelineExpression(object): def __init__(self, left, right): self.left = left self.right = right def __or__(self, other): return PipelineExpression(self, other) def __iter__(self): for term in (self.left, self.right): if isinstance(term, PipelineExpression): for item in term: yield item else: assert issubclass(term, Pipeable), term yield term
[docs]class PipeableMetaclass(ABCMeta): def __or__(self, other): return PipelineExpression(self, other)
[docs]class Pipeable(metaclass=PipeableMetaclass): pass