Source code for mycelyso.pilyso.pipeline.pipeline


# encoding: utf-8

from collections import OrderedDict
from inspect import isclass
import inspect

# Prefer inspect.signature where available; fall back to the deprecated
# inspect.getargspec otherwise. hasattr (rather than a default-less getattr)
# is required so that a missing attribute selects the fallback instead of raising.
if hasattr(inspect, 'signature'):
    def get_argnames_and_defaults(call):
        """Return ``(argnames, defaults)`` for the positional parameters of *call*.

        Mirrors the ``args``/``defaults`` pair of the legacy ``inspect.getargspec``:
        only positional(-or-keyword) parameters are reported; ``*args``, ``**kwargs``
        and keyword-only parameters are skipped, so *defaults* lines up with the
        trailing names of *argnames*.

        :param call: any callable accepted by :func:`inspect.signature`
        :return: tuple of (list of parameter names, list of their default values)
        """
        positional_kinds = (inspect.Parameter.POSITIONAL_ONLY,
                            inspect.Parameter.POSITIONAL_OR_KEYWORD)
        params = [para for para in inspect.signature(call).parameters.values()
                  if para.kind in positional_kinds]
        args = [para.name for para in params]
        defaults = [para.default for para in params
                    if para.default is not inspect.Parameter.empty]
        return args, defaults

else:
    def get_argnames_and_defaults(call):
        """Return ``(argnames, defaults)`` of *call* using the legacy getargspec API."""
        # noinspection PyDeprecation
        argspec = inspect.getargspec(call)
        args = list(argspec.args)
        # getargspec reports defaults as None (not an empty tuple) when there are none
        defaults = list(argspec.defaults) if argspec.defaults else []
        return args, defaults
class NeatDict(dict):
    """A dict whose keys can also be read, written and deleted as attributes.

    Reading a missing key via attribute access returns None instead of raising.
    Dunder names (``__x__``) are never looked up in the mapping: they raise
    AttributeError, so protocol probes such as
    ``getattr(obj, '__getstate__', None)`` (used by pickle/copy) fall back to
    their defaults instead of receiving None.
    """

    def __getattr__(self, item):
        # Only consulted after normal attribute lookup has failed.
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)
        return self.get(item)

    def __setattr__(self, key, value):
        self[key] = value

    def __delattr__(self, item):
        # Propagates KeyError (not AttributeError) for missing keys, as before.
        del self[item]
class Pipeline(object):
    """An ordered list of steps, built with :meth:`add`, ``|`` or ``|=``.

    ``pipeline < parameter`` and ``pipeline <= parameter`` are shorthand for
    :meth:`call`, pairing the pipeline with its initial parameter.
    """

    def __init__(self):
        self.steps = []

    def add(self, what):
        """Append *what* as the next step."""
        self.steps.append(what)

    def call(self, parameter):
        """Return a PipelineInvocation binding this pipeline to *parameter*."""
        return PipelineInvocation(self, parameter)

    def __le__(self, other):
        return self.call(other)

    def __lt__(self, other):
        return self.__le__(other)

    def __or__(self, other):
        # NOTE: mutates and returns this pipeline rather than building a new one.
        self.add(other)
        return self

    def __ior__(self, other):
        self.add(other)
        return self
class PipelineInvocation(object):
    """Pairs a pipeline with the initial parameter it is to be run with."""

    def __init__(self, pipeline, parameter):
        self.pipeline, self.parameter = pipeline, parameter
class PipelineEnvironment(object):
    """Runs pipelines, injecting arguments into each step by parameter name.

    A step is a callable, or a class that is instantiated when it is wrapped.
    Each argument is taken from, in order of precedence:
    the running result itself (for a parameter named KEY_RESULT),
    the running result's entry of the same name,
    the step's own default value,
    and finally the ``external_di`` mapping supplied at construction.
    """

    debug = False

    KEY_COLLECTED = 'collected'
    KEY_RESULT = 'result'

    def __init__(self, **kwargs):
        # The environment itself is injectable under the name 'pipeline_environment'.
        self.external_di = {'pipeline_environment': self}
        self.external_di.update(kwargs)

    @staticmethod
    def new_pipeline():
        """Return a new, empty Pipeline."""
        return Pipeline()

    def run(self, pipeline_invocation, **kwargs):
        """Execute every step of a pipeline in order and return the final result.

        :param pipeline_invocation: a PipelineInvocation, or a bare Pipeline (including
            subclasses), which is then invoked with *kwargs* as its initial parameter
        :param kwargs: initial parameter used when a bare Pipeline is passed
        :return: the value produced by the last step (the initial parameter if there
            are no steps)
        """
        if isinstance(pipeline_invocation, Pipeline):
            pipeline_invocation = pipeline_invocation < kwargs
        result = pipeline_invocation.parameter
        for raw_step in pipeline_invocation.pipeline.steps:
            step = self.wrap(raw_step)
            result = step(result)
        return result

    def debug_message(self, *args, **kwargs):
        """Print the raw positional and keyword arguments when ``debug`` is set."""
        if self.debug:
            print(args, kwargs)

    def wrap_result(self, result):
        """Return a NeatDict copy of *result*, flagged with KEY_RESULT = True.

        If result[KEY_COLLECTED] is present and not None, it is replaced (in the
        copy) by an OrderedDict whose values are NeatDicts, with None entries
        dropped. A missing or None KEY_COLLECTED is left untouched.
        """
        result = NeatDict(result)
        result[self.KEY_RESULT] = True
        collected = result.get(self.KEY_COLLECTED)
        # Guard against None, mirroring the check in unwrap_result.
        if collected is not None:
            wrapped = OrderedDict()
            for k, v in collected.items():
                if v is not None:
                    wrapped[k] = NeatDict(v)
            result[self.KEY_COLLECTED] = wrapped
        return result

    def unwrap_result(self, result):
        """Convert a wrapped result back to plain dicts and drop the KEY_RESULT flag.

        Entries of result[KEY_COLLECTED] become plain dicts; None entries stay None.
        Note that result[KEY_COLLECTED] is replaced on the object passed in before
        the outer mapping is copied into a new plain dict.
        """
        if self.KEY_COLLECTED in result and (result[self.KEY_COLLECTED] is not None):
            unwrapped = OrderedDict()
            for k, v in result[self.KEY_COLLECTED].items():
                # a step may have stored None here; dict(None) would raise TypeError
                unwrapped[k] = None if v is None else dict(v)
            result[self.KEY_COLLECTED] = unwrapped
        result = dict(result)
        if self.KEY_RESULT in result:
            del result[self.KEY_RESULT]
        return result

    def prepare_call(self, call, di=None, result=None):
        """Resolve *call*'s positional arguments by name and return a thunk invoking it.

        :param call: the callable to invoke
        :param di: mapping of injectable dependencies (used last)
        :param result: the running result mapping (used first)
        :return: a zero-argument function that calls *call* and returns the tuple
            ``(argument_names, return_value)``
        :raises ValueError: (when the thunk runs) if a required argument is found
            neither in *result* nor in *di*
        """
        if di is None:
            di = {}
        if result is None:
            result = {}
        args, defaults = get_argnames_and_defaults(call)
        # 'self' is supplied by the instance, not injected; the emptiness check keeps
        # callables without positional parameters from raising IndexError.
        if args and args[0] == 'self':
            args = args[1:]
        # defaults belong to the trailing positional parameters
        non_default_parameters = len(args) - len(defaults)

        def _wrapped():
            parameters = []
            for n, arg in enumerate(args):
                if arg == self.KEY_RESULT:
                    parameters.append(result)
                elif arg in result:
                    parameters.append(result[arg])
                elif n >= non_default_parameters:
                    parameters.append(defaults[n - non_default_parameters])
                elif arg in di:
                    parameters.append(di[arg])
                else:
                    # problem: pipeline step asks for a parameter we do not have
                    raise ValueError('[At %s]: Argument %r not in %r' % (repr(call), arg, result,))
            _call_return = call(*parameters)
            return args, _call_return

        return _wrapped

    def reassemble_result(self, result, args, _call_return):
        """Merge a step's return value into the running result and return it.

        A plain dict containing KEY_RESULT is merged into *result*. A NeatDict
        replaces *result* entirely. Anything else is treated as a tuple (a single
        value is wrapped in one) whose items are matched right-to-left against the
        trailing names in *args*. An item in the KEY_RESULT position is merged if it
        is a dict/NeatDict and ignored otherwise. Any other item is stored under its
        argument name, unless that name is an external dependency.

        :raises ValueError: if the step returned more values than it has arguments
        """
        # Exact type checks on purpose: NeatDict subclasses dict but must take the
        # replacement branch, not the merge branch.
        if type(_call_return) == dict and self.KEY_RESULT in _call_return:
            # if we get a dict back, we merge it with the ongoing result object
            result.update(_call_return)
        elif type(_call_return) == NeatDict:
            # if we get a neatdict back, we assume its the proper result object
            # and the pipeline step knew what it did ...
            # we continue with it as-is
            result = _call_return
        else:
            if type(_call_return) != tuple:
                _call_return = (_call_return,)
            if len(_call_return) > len(args):
                raise ValueError('[At reassemble_result]: %d return value(s) but only %d argument name(s) %r' % (
                    len(_call_return), len(args), args,))
            for k, item in zip(reversed(args), reversed(_call_return)):
                if k == self.KEY_RESULT:
                    if type(item) == dict or type(item) == NeatDict:
                        result.update(item)
                elif k not in self.external_di:
                    # do nothing if the parameter came from di
                    result[k] = item
        return result

    def wrap(self, what):
        """Turn a pipeline step into a function mapping one result dict to the next.

        Classes are instantiated here: the instance is created without running the
        constructor, then ``__init__`` is called with arguments injected from
        external_di, and the instance becomes the step callable.
        """
        name = ("class_" if isclass(what) else "function_") +\
               ('__LAMBDA__' if getattr(what, '__name__', False) == '<lambda>' else getattr(what, '__name__', repr(what)))

        if isclass(what):
            # we create an instance without calling the constructor, so we can setup the environment first
            instance = what.__new__(what)
            # now we call the constructor, and it has everything neatly set up already!
            init_call = self.prepare_call(instance.__init__, di=self.external_di)
            init_call()
            call = instance
        else:
            call = what

        def _wrapped(result):
            self.debug_message("Entering %s", name)
            result = self.wrap_result(result)
            _pre_wrapped_call = self.prepare_call(call, self.external_di, result)
            args, _call_return = _pre_wrapped_call()
            result = self.reassemble_result(result, args, _call_return)
            result = self.unwrap_result(result)
            self.debug_message("Leaving %s", name)
            return result

        _wrapped.__name__ = name
        _wrapped.__qualname__ = _wrapped.__name__

        return _wrapped
class PipelineExecutionContext(object):
    """Holds named pipelines (stages) and runs them on a shared PipelineEnvironment."""

    def __new__(cls, *args, **kwargs):
        # State is initialised here rather than in __init__ — presumably so that
        # subclasses get it even if their __init__ does not chain up.
        obj = super(PipelineExecutionContext, cls).__new__(cls)
        obj.steps = {}
        obj.pipeline_environment = PipelineEnvironment()
        return obj

    def add_stage(self, step, pipeline=None):
        """Register *pipeline* (a fresh Pipeline when omitted) under *step*; return it."""
        chosen = pipeline if pipeline is not None else Pipeline()
        self.steps[step] = chosen
        return chosen

    def get_step_keys(self):
        """Return the registered stage names as a list, in insertion order."""
        return [key for key in self.steps]

    def dispatch(self, step, **kwargs):
        """Run the pipeline registered under *step* with *kwargs* as its parameter."""
        environment = self.pipeline_environment
        return environment.run(self.steps[step], **kwargs)