Source code for mycelyso.pilyso.application.application

# -*- coding: utf-8 -*-
"""
The application submodule contains the base class for applications, as well auxiliary functions.
"""

import argparse
import logging
import multiprocessing
# noinspection PyUnresolvedReferences
import sys
import numpy as np

from tunable import TunableManager
from ..misc.hacks.maintenance_interrupt import install_maintenance_interrupt
from ..misc.hacks.recursionlimit_raise import *
from ..misc.hacks.multiprocessing_patch import *

from collections import namedtuple

from ..imagestack.imagestack import ImageStack, Dimensions
# noinspection PyUnresolvedReferences
from ..imagestack.readers import *
from ..pipeline import PipelineExecutor


[docs]def parse_range(s, maximum=0): """ Parses a range string. :param s: :param maximum: :return: """ maximum -= 1 splits = s.replace(' ', '').replace(';', ',').split(',') ranges = [] remove = [] not_values = False for frag in splits: if frag[0] == '~': not_values = not not_values frag = frag[1:] if '-' in frag: f, t = frag.split('-') interval = 1 if '%' in t: t, _interval = t.split('%') interval = int(_interval) if t == '': t = maximum f, t = int(f), int(t) t = min(t, maximum) parsed_fragment = range(f, t + 1, interval) else: parsed_fragment = [int(frag)] if not_values: remove += parsed_fragment else: ranges += parsed_fragment return list(sorted(set(ranges) - set(remove)))
[docs]def prettify_numpy_array(arr, space_or_prefix): """ Returns a properly indented string representation of a numpy array. :param arr: :param space_or_prefix: :return: """ six_spaces = ' ' * 6 prepared = repr(np.array(arr)).replace(')', '').replace('array(', six_spaces) if isinstance(space_or_prefix, int): return prepared.replace(six_spaces, ' ' * space_or_prefix) else: return space_or_prefix + prepared.replace(six_spaces, ' ' * len(space_or_prefix)).lstrip()
[docs]class AppInterface(object): """ Interface to be implemented by Apps utilizing pilyso's App infrastructure. """
[docs] def options(self): return {}
[docs] def arguments(self, argparser): return
[docs] def setup(self, pipeline_executor): return
Meta = namedtuple('Meta', ['pos', 't'])
[docs]class App(AppInterface): """ Base class implementing most of the App "in the back" works. """ @staticmethod def _internal_option_defaults(): return { 'name': "processor", 'description': "processor", 'banner': "", 'pipeline': None } _internal_options = None @property def internal_options(self): if self._internal_options is None: self._internal_options = self._internal_option_defaults() self._internal_options.update(self.options()) return self._internal_options _log = None @property def log(self): if self._log is None: self._log = logging.getLogger(self.internal_options['name']) return self._log def _create_argparser(self): argparser = argparse.ArgumentParser(description=self.internal_options['description']) def _error(message=''): argparser.print_help() self.log.error("command line argument error: %s", message) sys.exit(1) argparser.error = _error return argparser @staticmethod def _arguments(argparser): argparser.add_argument('input', metavar='input', type=str, help="input file") argparser.add_argument('-m', '--module', dest='modules', type=str, default=None, action='append') argparser.add_argument('-n', '--processes', dest='processes', default=-1, type=int) argparser.add_argument('--prompt', '--prompt', dest='wait_on_start', default=False, action='store_true') argparser.add_argument('-tp', '--timepoints', dest='timepoints', default='0-', type=str) argparser.add_argument('-mp', '--positions', dest='positions', default='0-', type=str) TunableManager.register_argparser(argparser)
[docs] def handle_args(self): pass
args = None
[docs] def main(self): """ Main entry point for App apps. :return: """ logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(name)s %(levelname)s %(message)s") install_maintenance_interrupt() argparser = self._create_argparser() self._arguments(argparser) self.arguments(argparser) self.log.info(self.internal_options['banner']) self.args = argparser.parse_args() self.handle_args() if self.args.wait_on_start: _ = input("Press enter to continue.") self.log.info("Started %s.", self.internal_options['name']) ims = ImageStack(self.args.input).view(Dimensions.PositionXY, Dimensions.Time) # self._setup_modules() self.positions = parse_range(self.args.positions, maximum=ims.size[Dimensions.PositionXY]) self.timepoints = parse_range(self.args.timepoints, maximum=ims.size[Dimensions.Time]) self.log.info("Tunable Hash: %s" % (TunableManager.get_hash())) calibration = ims.meta[0, 0].calibration self.log.info( "Beginning Processing:\n%s\n%s\n%s", prettify_numpy_array(self.positions, "Positions : "), prettify_numpy_array(self.timepoints, "Timepoints : "), "Calibration: %.3f µm·pixel⁻¹" % (calibration,) ) if self.args.processes < 0: self.args.processes = multiprocessing.cpu_count() elif self.args.processes == 0: self.args.processes = False self.pe = PipelineExecutor() self.pe.multiprocessing = self.args.processes self.pe.set_workload(Meta, Meta(pos=self.positions, t=self.timepoints), Meta(t=1, pos=2)) self.setup(self.pe) self.run() self.pe.close() self.log.info("Finished %s.", self.internal_options['name'])
[docs] def run(self): self.pe.run()
# default implementation
[docs] def setup(self, pipeline_executor): pipeline_executor.initialize(self.internal_options['pipeline'], self.args)