Source code for mycelyso.highlevel.steps

# -*- coding: utf-8 -*-
"""
The steps module contains most of the individual, mycelyso-specific processing steps.
"""


import warnings
from itertools import product, chain

import numpy as np
from scipy.stats import linregress

from ..tunables import CleanUpGaussianSigma, CleanUpGaussianThreshold, CleanUpHoleFillSize, \
    RemoveSmallStructuresSize, BorderArtifactRemovalBorderSize, TrackingMaximumRelativeShrinkage, \
    TrackingMinimumTipElongationRate, TrackingMaximumTipElongationRate, TrackingMaximumCoverage, \
    TrackingMinimumTrackedPointCount, TrackingMinimalMaximumLength, TrackingMinimalGrownLength, \
    ThresholdingTechnique, ThresholdingParameters

from skimage.morphology import remove_small_holes, remove_small_objects, skeletonize as sk_skeletonize
from skimage.measure import label, regionprops

from scipy import ndimage as ndi

import networkx as nx

from ..misc.graphml import to_graphml_string
from ..misc.util import pairwise
from ..misc.regression import prepare_optimized_regression
from ..pilyso.application import Collected, Meta, Skip
from ..pilyso.pipeline.pipeline import get_argnames_and_defaults

from ..processing import binarization as binarization_module

from .pixelframe import PixelFrame
from .nodeframe import NodeFrame

try:
    import matplotlib.pyplot as pyplot
except ImportError:
    pyplot = None


def qimshow(image, cmap='gray'):
    """
    Debug function, quickly shows the passed image via matplotlib's imshow facilities.

    :param image: image to show
    :param cmap: colormap to use
    :return:
    """
    if not pyplot:
        raise RuntimeError('matplotlib not installed.')

    fig = pyplot.figure()
    ax = fig.add_subplot(111)
    range_ = ax.imshow(image, cmap=cmap, interpolation='none')

    def _format_coords(x, y):
        try:
            y, x = int(y + 0.5), int(x + 0.5)
            if y < 0 or x < 0:
                raise IndexError
            value = image[y, x]
        except IndexError:
            value = float('nan')
        return 'x=%d y=%d value=%1.4f' % (x, y, value,)

    ax.format_coord = _format_coords

    pyplot.colorbar(range_)
    pyplot.show()

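A minimal usage sketch (illustrative, not part of the pipeline): the call blocks until the window is closed, and the custom coordinate formatter reports the pixel value under the cursor:

    import numpy as np
    from mycelyso.highlevel.steps import qimshow

    qimshow(np.random.random((64, 64)), cmap='viridis')
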
# noinspection PyUnusedLocal
def set_empty_crops(image, crop_t=None, crop_b=None, crop_l=None, crop_r=None):
    """
    Defines crop parameters based upon image size, effectively not cropping at all.

    :param image:
    :param crop_t:
    :param crop_b:
    :param crop_l:
    :param crop_r:
    :return:
    """
    return 0, image.shape[0], 0, image.shape[1]

def skip_if_image_is_below_size(min_height=4, min_width=4):
    """
    Raises a Skip exception if the image dimensions fall below the given minimum size.

    :param min_height:
    :param min_width:
    :return:

    >>> skip_if_image_is_below_size(32, 32)(np.zeros((16,16)), Meta(0, 0))
    Traceback (most recent call last):
    ...
    mycelyso.pilyso.pipeline.executor.Skip: Meta(pos=0, t=<class 'mycelyso.pilyso.pipeline.executor.Collected'>)
    """
    def _inner(image, meta):
        if image.shape[0] < min_height or image.shape[1] < min_width:
            # noinspection PyCompatibility
            raise Skip(Meta(pos=meta.pos, t=Collected)) from None

        return image, meta

    return _inner

# noinspection PyUnusedLocal
def binarize(image, binary=None):
    """
    Binarizes the input image using the configured thresholding technique.

    :param image:
    :param binary:
    :return:
    """
    technique = getattr(binarization_module, ThresholdingTechnique.value)

    args, defaults = get_argnames_and_defaults(technique)

    for skip in 'kwargs', 'args':
        if skip in args:
            args.remove(skip)

    args = args[-len(defaults):]

    default_parameters = dict(zip(args, defaults))

    override_parameters = {}

    parameter_str = ThresholdingParameters.value

    if parameter_str:
        for pair in parameter_str.split(','):
            key, value = pair.split(':')
            if key not in default_parameters:
                raise RuntimeError("Unsupported parameter \"%s\" passed to thresholding function." % (key,))
            desired_type = type(default_parameters[key])
            override_parameters[key] = desired_type(value)

    return technique(image, **override_parameters)

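The parameter string from ThresholdingParameters is parsed as comma-separated key:value pairs, and each value is coerced to the type of the corresponding default argument of the selected technique. A minimal standalone sketch of that coercion logic, using the stdlib inspect module and a hypothetical thresholding function (not one of the module's techniques):

    import inspect

    def example_threshold(image, block_size=51, offset=0.5):  # hypothetical technique
        return image > offset  # placeholder body

    parameters = inspect.signature(example_threshold).parameters
    defaults = {name: p.default for name, p in parameters.items()
                if p.default is not inspect.Parameter.empty}

    overrides = {}
    for pair in 'block_size:75,offset:0.25'.split(','):
        key, value = pair.split(':')
        overrides[key] = type(defaults[key])(value)  # coerce to the default's type

    assert overrides == {'block_size': 75, 'offset': 0.25}
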
# noinspection PyUnusedLocal
def skeletonize(binary, skeleton=None):
    """
    Skeletonizes the image using scikit-image's skeletonize function.

    :param binary:
    :param skeleton:
    :return:

    >>> skeletonize(np.array([[0, 0, 1, 1],
    ...                       [0, 0, 1, 1],
    ...                       [0, 0, 1, 1],
    ...                       [0, 0, 1, 1]]))
    array([[False, False, False, False],
           [False, False,  True, False],
           [False, False,  True, False],
           [False, False, False, False]])
    """
    return sk_skeletonize(binary)

# noinspection PyUnusedLocal
def image_statistics(image, calibration, result=None):
    """
    Adds some numeric image parameters (e.g. size) to the results.

    :param image:
    :param calibration:
    :param result:
    :return:

    >>> sorted(image_statistics(np.array([[0, 0, 0],
    ...                                   [0, 0, 0],
    ...                                   [0, 0, 0]]), calibration=15.0).items())
    [('area', 2025.0), ('area_pixel', 9), ('input_height', 45.0), ('input_height_pixel', 3), \
('input_width', 45.0), ('input_width_pixel', 3)]
    """
    return {
        'input_width': image.shape[1] * calibration,
        'input_height': image.shape[0] * calibration,
        'input_width_pixel': image.shape[1],
        'input_height_pixel': image.shape[0],
        'area': image.shape[1] * calibration * image.shape[0] * calibration,
        'area_pixel': image.shape[1] * image.shape[0]
    }

# noinspection PyUnusedLocal
def quantify_binary(binary, calibration, result=None):
    """
    Adds some information about the binary image (e.g. covered ratio, area ...) to the results.

    :param binary:
    :param calibration:
    :param result:
    :return:

    >>> sorted(quantify_binary(np.array([[0, 0, 0],
    ...                                  [1, 1, 1],
    ...                                  [0, 0, 0]]), calibration=15.0).items())
    [('covered_area', 675.0), ('covered_area_pixel', 3), ('covered_ratio', 0.3333333333333333)]
    """
    ones = np.sum(binary)
    total = binary.shape[0] * binary.shape[1]
    return {
        'covered_ratio': ones / total,
        'covered_area': ones * calibration ** 2,
        'covered_area_pixel': ones
    }

# noinspection PyUnusedLocal
def graph_statistics(node_frame, result=None):
    """
    Adds some information about the graph to the results.

    :param node_frame:
    :param result:
    :return:

    >>> pf = PixelFrame(np.array([[0, 0, 0],
    ...                           [1, 1, 1],
    ...                           [0, 0, 0]]), calibration=15.0)
    >>> sorted(graph_statistics(NodeFrame(pf)).items())
    [('graph_edge_count', 1.0), ('graph_edge_length', 30.0), ('graph_endpoint_count', 2), \
('graph_junction_count', 0), ('graph_node_count', 2)]
    """
    # everything / 2, because the undirected graph is stored digraph-like,
    # with each edge present in both directions of the adjacency matrix
    graph_edge_length = node_frame.adjacency.sum() / 2
    graph_edge_count = node_frame.adjacency.nnz / 2
    graph_node_count = node_frame.adjacency.shape[0]

    graph_junction_count = len(node_frame.every_junction)
    graph_endpoint_count = len(node_frame.every_endpoint)

    return {
        'graph_edge_length': graph_edge_length * node_frame.calibration,
        'graph_edge_count': graph_edge_count,
        'graph_node_count': graph_node_count,
        'graph_junction_count': graph_junction_count,
        'graph_endpoint_count': graph_endpoint_count
    }

def clean_up(calibration, binary):
    """
    Cleans up the image by removing holes smaller than the configured size.

    :param calibration:
    :param binary:
    :return:

    >>> clean_up(0.1, np.array([[ True,  True,  True],
    ...                         [ True, False,  True],
    ...                         [ True,  True,  True]]))
    array([[ True,  True,  True],
           [ True,  True,  True],
           [ True,  True,  True]])
    """
    binary = (
        ndi.gaussian_filter(binary * 1.0, CleanUpGaussianSigma.value / calibration) >
        CleanUpGaussianThreshold.value
    )

    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        binary = remove_small_holes(binary, min_size=int(CleanUpHoleFillSize.value / calibration ** 2),
                                    connectivity=2)

    return binary

def remove_small_structures(calibration, binary):
    """
    Cleans up the image by removing structures smaller than the configured size.

    :param calibration:
    :param binary:
    :return:

    >>> remove_small_structures(0.1, np.array([[False, False, False],
    ...                                        [False, False,  True],
    ...                                        [False, False,  True]]))
    array([[False, False, False],
           [False, False, False],
           [False, False, False]])
    """
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        return remove_small_objects(binary, min_size=int(RemoveSmallStructuresSize.value / calibration ** 2),
                                    connectivity=2)  # TODO

def remove_border_artifacts(calibration, binary):
    """
    Removes structures whose centroid lies near the border, as these are most likely artifacts.

    :param calibration:
    :param binary:
    :return:

    >>> remove_border_artifacts(0.1, np.array([[False, False, False],
    ...                                        [False, False,  True],
    ...                                        [False, False,  True]]))
    array([[False, False, False],
           [False, False, False],
           [False, False, False]])
    """
    border = BorderArtifactRemovalBorderSize.value / calibration

    labeled = label(binary)

    corner_pixels = np.r_[
        labeled[0, :].ravel(),
        labeled[-1, :].ravel(),
        labeled[:, 0].ravel(),
        labeled[:, -1].ravel()
    ]

    corner_pixels = set(np.unique(corner_pixels)) - {0}

    for region in regionprops(labeled):
        if region.label in corner_pixels:
            if ((region.centroid[0] < border) or (region.centroid[0] > (binary.shape[0] - border)) or
                    (region.centroid[1] < border) or (region.centroid[1] > (binary.shape[1] - border))):
                binary[labeled == region.label] = False

    return binary

# noinspection PyUnusedLocal
def convert_to_nodes(skeleton, timepoint, calibration, pixel_frame=None, node_frame=None):
    """
    Passes the input skeleton into a PixelFrame and instantiates a NodeFrame based upon that.

    :param skeleton:
    :param timepoint:
    :param calibration:
    :param pixel_frame:
    :param node_frame:
    :return:
    """
    pixel_frame = PixelFrame(skeleton, timepoint, calibration=calibration)
    node_frame = NodeFrame(pixel_frame)
    return pixel_frame, node_frame

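A usage sketch with a synthetic input (illustrative; the filament and calibration value are made up): a skeletonized binary image becomes a PixelFrame, and the NodeFrame derived from it exposes the graph view (endpoints, junctions, adjacency) used by the subsequent steps:

    import numpy as np
    from mycelyso.highlevel.steps import skeletonize, convert_to_nodes

    binary = np.zeros((16, 16), dtype=bool)
    binary[8, 2:14] = True  # a single horizontal filament

    pixel_frame, node_frame = convert_to_nodes(
        skeletonize(binary), timepoint=0.0, calibration=0.065)
    print(len(node_frame.every_endpoint))  # a simple filament should yield 2 endpoints
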
def track_multipoint(collected):
    """
    Initiates tracking between consecutive NodeFrames.

    :param collected:
    :return:
    """
    for result1, result2 in pairwise(collected.values()):
        result1.node_frame.track(result2.node_frame)
    return collected

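Since pairwise() yields overlapping neighbors, every frame is tracked against its direct successor and each inner frame participates twice, once as source and once as target. A minimal sketch of the iteration pattern, assuming pairwise follows the usual itertools recipe:

    from itertools import tee

    def pairwise_recipe(iterable):
        a, b = tee(iterable)
        next(b, None)
        return zip(a, b)

    frames = ['t0', 't1', 't2', 't3']
    assert list(pairwise_recipe(frames)) == [('t0', 't1'), ('t1', 't2'), ('t2', 't3')]
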
# noinspection PyUnusedLocal
def individual_tracking(collected, tracked_fragments=None, tracked_fragments_fates=None):
    """
    After correspondence has been established by NodeFrame#track, reconstructs growing paths over time.

    :param collected:
    :param tracked_fragments:
    :param tracked_fragments_fates:
    :return:
    """
    def any_in(needles, haystack):
        for needle in needles:
            if needle in haystack:
                return True
        return False

    tracks = {}
    last_valid = []
    fates = {}

    next_track_id = 0

    for i, (current_result, next_result) in enumerate(pairwise(collected.values())):
        if current_result.covered_ratio > TrackingMaximumCoverage.value:
            break

        frame, next_frame = current_result.node_frame, next_result.node_frame

        calibration = frame.calibration

        timepoint, next_timepoint = frame.timepoint, next_frame.timepoint

        delta_t_in_hours = (next_timepoint - timepoint) / (60.0 * 60.0)

        # either: look for pairs of endpoints and junctions,
        # or endpoints and endpoints, if there are no junctions yet

        def valid_pairs():
            for _e in frame.every_endpoint:
                edges = frame.adjacency[_e, :]
                edge_indices = edges.nonzero()[1]
                for _other in edge_indices:
                    no_junctions = not any_in(frame.get_connected_nodes(_other), frame.every_junction)
                    if frame.is_junction(_other) or (no_junctions and frame.is_endpoint(_other)):
                        if _e != _other:
                            yield None, _e, _other

        valid = []

        endpoints_used = set()

        for track_id, e, other in chain(last_valid, valid_pairs()):
            e_on_next, other_on_next = frame.self_to_successor[e], frame.self_to_successor[other]

            if e_on_next == other_on_next:
                continue

            if e_on_next == -1 or other_on_next == -1:
                # tracking error
                fates[track_id] = "track aborted due to missing future node"
                continue

            distance = frame.shortest_paths[e, other]

            def distance_condition(current, new):
                if new == 0.0 or current == 0.0:
                    # print(new, current)
                    return False
                return (new > current) or (abs(1.0 - (current / new)) < TrackingMaximumRelativeShrinkage.value)

            if next_frame.connected_components[e_on_next] != next_frame.connected_components[other_on_next]:
                # careful now: either the track broke, or we need to pick an alternative, fitting variant
                for _e_on_next, _other_on_next in product(frame.self_to_successor_alternatives[e],
                                                          frame.self_to_successor_alternatives[other]):
                    if (_e_on_next != _other_on_next and
                            (next_frame.connected_components[_e_on_next] ==
                             next_frame.connected_components[_other_on_next]) and
                            (next_frame.shortest_paths[_e_on_next, _other_on_next] < float('inf')) and
                            distance_condition(distance, next_frame.shortest_paths[_e_on_next, _other_on_next])):
                        e_on_next, other_on_next = _e_on_next, _other_on_next
                        break

            next_distance = next_frame.shortest_paths[e_on_next, other_on_next]

            distance_num = frame.shortest_paths_num[e, other]

            # print(track_id, i, e, other, e_on_next, other_on_next, distance, distance_num)

            if e in endpoints_used or other in endpoints_used:
                fates[track_id] = "endpoints used otherwise"
                continue

            if distance == float('inf') or next_distance == float('inf'):
                # the first one should never happen,
                # the nodes are no longer connected on the next frame?
                fates[track_id] = "formerly connected components became unconnected? (dist/next dist %.4f %.4f)" % (
                    distance, next_distance
                )
                continue

            if ((next_distance < distance) and
                    abs(1.0 - (distance / next_distance)) > TrackingMaximumRelativeShrinkage.value):
                # a later distance was SHORTER than the current, which means a tracking error or a cycle in the graph
                fates[track_id] = "track aborted due to shortcut (cycle or tracking error) [last %f > next %f]" % (
                    distance, next_distance
                )
                continue

            mu_per_h = ((next_distance - distance) * calibration) / delta_t_in_hours

            if not (TrackingMinimumTipElongationRate.value < mu_per_h < TrackingMaximumTipElongationRate.value):
                # a later distance changed too much, which might be a tracking error
                fates[track_id] = "track aborted due to too large change in length " \
                                  "[last %f, next %f, change %f mu per h]" % (
                                      distance, next_distance, mu_per_h
                                  )
                continue

            # split up tracks which contain just endpoints ... otherwise the elongation rate is twice as high
            if frame.is_endpoint(e) and frame.is_endpoint(other):
                path = frame.get_path(e, other)
                if next_frame.is_endpoint(e_on_next) and next_frame.is_endpoint(other_on_next):
                    if len(path) > 2:
                        other = path[-2]
                        # noinspection PyUnusedLocal
                        distance = frame.shortest_paths[e, other]
                        # noinspection PyUnusedLocal
                        distance_num = frame.shortest_paths_num[e, other]

                    continue

            if track_id is None:
                track_id = next_track_id
                next_track_id += 1

            if track_id not in tracks:
                tracks[track_id] = {}

            tracks[track_id][i] = (e, other, e_on_next, other_on_next, distance, distance_num)

            valid.append((track_id, e_on_next, other_on_next))
            endpoints_used.add(e)

        last_valid = valid

    return tracks, fates

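The elongation rate filter above converts the change in shortest-path length between consecutive frames to calibrated units per hour (timepoints are in seconds). A worked example with made-up numbers:

    calibration = 0.065                      # µm per pixel (hypothetical)
    distance, next_distance = 100.0, 140.0   # shortest-path lengths in pixels
    delta_t_in_hours = (1200.0 - 600.0) / (60.0 * 60.0)  # 10 min between frames

    mu_per_h = ((next_distance - distance) * calibration) / delta_t_in_hours
    assert abs(mu_per_h - 15.6) < 1e-9  # 40 px * 0.065 µm/px / (1/6 h) = 15.6 µm/h

A pair is kept only if mu_per_h lies strictly between TrackingMinimumTipElongationRate and TrackingMaximumTipElongationRate.
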
# noinspection PyProtectedMember,PyUnusedLocal
def prepare_tracked_fragments(collected, tracked_fragments, tracked_fragments_fates,
                              track_table=None, track_table_aux_tables=None):
    """
    Filters and converts tracked growing segments to result datasets.

    :param collected:
    :param tracked_fragments:
    :param tracked_fragments_fates:
    :param track_table:
    :param track_table_aux_tables:
    :return:
    """
    key_list = list(collected.keys())
    calibration = next(iter(collected.values()))['calibration']

    track_table = []
    track_table_aux_tables = []

    for track_id, track in tracked_fragments.items():
        if len(track) < TrackingMinimumTrackedPointCount.value:
            continue

        track_list = [[i, track[i]] for i in sorted(track.keys())]

        times_lengths = np.array(
            [[collected[key_list[i]]['node_frame'].timepoint, calibration * distance]
             for i, (e, other, e_on_next, other_on_next, distance, distance_num) in track_list])

        # minimum length of the maximally tracked segment
        if times_lengths[:, 1].max() < TrackingMinimalMaximumLength.value:
            continue

        # minimum length grown additionally
        if (times_lengths[:, 1].max() - times_lengths[:, 1].min()) < TrackingMinimalGrownLength.value:
            continue

        # whatever the settings are, there must be growth
        if (times_lengths[:, 1].max() - times_lengths[:, 1].min()) == 0.0:
            continue

        aux_table_num = len(track_table_aux_tables)

        track_table_aux_tables.append(
            [{
                'track_table_number': aux_table_num,
                'timepoint': collected[key_list[i]]['node_frame'].timepoint,
                'node_id_a': e,
                'node_id_b': other,
                'node_next_id_a': e_on_next,
                'node_next_id_b': other_on_next,
                'distance': calibration * distance,
                'distance_num': distance_num
            } for i, (e, other, e_on_next, other_on_next, distance, distance_num) in track_list]
        )

        distance_num_helper = np.array(
            [distance_num for i, (e, other, e_on_next, other_on_next, distance, distance_num) in track_list])

        times = times_lengths[:, 0]
        lengths = times_lengths[:, 1]

        relative_lengths = times_lengths[:, 1] / times_lengths[:, 1].min()

        t_min, t_max = times.min(), times.max()

        row = {
            'timepoint_begin': t_min,
            'timepoint_end': t_max,
            'timepoint_center': t_min + (t_max - t_min) / 2,
            'minimum_distance': lengths.min(),
            'maximum_distance': lengths.max(),
            'minimum_distance_num': distance_num_helper.min(),
            'maximum_distance_num': distance_num_helper.max(),
            'duration': t_max - t_min,
            'count': len(times),
            'aux_table': aux_table_num
        }

        try:
            regression = linregress(times, lengths)._asdict()
            row.update({'plain_regression_' + k: v for k, v in regression.items()})

            regression = linregress(times, relative_lengths)._asdict()
            row.update({'normalized_regression_' + k: v for k, v in regression.items()})

            regression = linregress(times, np.log(lengths))._asdict()
            row.update({'logarithmic_plain_regression_' + k: v for k, v in regression.items()})

            regression = linregress(times, np.log(relative_lengths))._asdict()
            row.update({'logarithmic_normalized_regression_' + k: v for k, v in regression.items()})

            regression = prepare_optimized_regression(times, lengths)
            row.update({'optimized_regression_' + k: v for k, v in regression.items()})

            regression = prepare_optimized_regression(times, relative_lengths)
            row.update({'optimized_normalized_regression_' + k: v for k, v in regression.items()})

            regression = prepare_optimized_regression(times, np.log(lengths))
            row.update({'optimized_logarithmic_regression_' + k: v for k, v in regression.items()})

            regression = prepare_optimized_regression(times, np.log(relative_lengths))
            row.update({'optimized_logarithmic_normalized_regression_' + k: v for k, v in regression.items()})
        except IndexError:
            pass

        track_table.append(row)

    return track_table, track_table_aux_tables

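Each regression variant is flattened into prefixed columns of the track table; since scipy's linregress returns a named tuple, _asdict() yields slope, intercept, rvalue, pvalue and stderr. A small sketch of the flattening under those scipy semantics (toy data):

    import numpy as np
    from scipy.stats import linregress

    times = np.array([0.0, 3600.0, 7200.0])  # seconds
    lengths = np.array([10.0, 12.0, 14.0])   # calibrated lengths

    # noinspection PyProtectedMember
    row = {'plain_regression_' + k: v
           for k, v in linregress(times, lengths)._asdict().items()}

    assert abs(row['plain_regression_slope'] - 2.0 / 3600.0) < 1e-12
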
def prepare_position_regressions(collected, result):
    """
    Prepares some regressions over parameters collected per position over time.

    :param collected:
    :param result:
    :return:
    """
    fields = ['covered_ratio', 'covered_area', 'graph_edge_length', 'graph_edge_count',
              'graph_node_count', 'graph_junction_count', 'graph_endpoint_count']

    row = {}

    # noinspection PyProtectedMember
    def prepare_for_field(field_name):
        data = np.array([[f.timepoint, f[field_name]] for f in collected.values()])

        regression = linregress(data[:, 0], data[:, 1])._asdict()
        row.update({field_name + '_linear_regression_' + k: v for k, v in regression.items()})

        regression = linregress(data[:, 0], np.log(data[:, 1]))._asdict()
        row.update({field_name + '_logarithmic_regression_' + k: v for k, v in regression.items()})

        regression = prepare_optimized_regression(data[:, 0], data[:, 1])
        row.update({field_name + '_optimized_linear_regression_' + k: v for k, v in regression.items()})

        regression = prepare_optimized_regression(data[:, 0], np.log(data[:, 1]))
        row.update({field_name + '_optimized_logarithmic_regression_' + k: v for k, v in regression.items()})

    for field in fields:
        try:
            with np.errstate(divide='ignore', invalid='ignore'):
                prepare_for_field(field)
        except IndexError:
            pass

    result.update(row)

    return result

# noinspection PyUnusedLocal
def generate_graphml(node_frame, result):
    """
    Generates a GraphML representation of a particular frame.

    :param node_frame:
    :param result:
    :return:
    """
    return {
        'graphml': to_graphml_string(node_frame.get_networkx_graph())
    }

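A usage sketch, continuing the convert_to_nodes example above (hypothetical file name; this assumes to_graphml_string returns text rather than bytes): the result can be written straight to disk and opened in tools such as Gephi or Cytoscape:

    result = generate_graphml(node_frame, result={})
    with open('frame.graphml', 'w') as fh:
        fh.write(result['graphml'])
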
# noinspection PyTypeChecker,PyUnusedLocal
def generate_overall_graphml(collected, result):
    """
    Generates a GraphML representation of the whole graph of one image stack.

    :param collected:
    :param result:
    :return:
    """
    time_to_z_scale = 1.0

    graphs_list = {}
    successors = {}
    node_counts = {}
    node_count_accumulator = 0

    graph = nx.Graph()

    for i, (meta, frame) in enumerate(list(collected.items())[:-1]):
        node_frame = frame.node_frame
        successors[i] = node_frame.self_to_successor

        g = node_frame.get_networkx_graph(with_z=time_to_z_scale)
        g = nx.relabel_nodes(g, lambda node_id: node_id + node_count_accumulator, copy=True)

        graphs_list[i] = g
        node_counts[i] = node_count_accumulator
        node_count_accumulator += len(g.nodes())

        graph.add_nodes_from(g.nodes(data=True))
        graph.add_edges_from(g.edges(data=True))

    for i, mapping in successors.items():
        if i + 1 == len(successors):
            break

        for relative_from_index, relative_to_index in enumerate(mapping):
            if relative_to_index == -1:
                continue

            from_index = node_counts[i] + relative_from_index
            to_index = node_counts[i + 1] + relative_to_index

            graph.add_edge(from_index, to_index)

    return {
        'overall_graphml': to_graphml_string(graph)
    }

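Node ids restart at zero in every frame, so the function shifts each frame's ids by the running node count before merging, and applies the same offsets when wiring successor edges between frame i and frame i+1. A minimal sketch of that offset bookkeeping with toy numbers:

    import networkx as nx

    frame_sizes = [3, 4]           # nodes per frame (toy numbers)
    offsets = [0, frame_sizes[0]]  # analogous to node_counts above

    graph = nx.Graph()
    graph.add_nodes_from(range(sum(frame_sizes)))

    successors = {0: [2, -1, 0]}   # frame 0 node -> frame 1 node, -1 = no successor
    for i, mapping in successors.items():
        for rel_from, rel_to in enumerate(mapping):
            if rel_to == -1:
                continue  # node has no tracked successor
            graph.add_edge(offsets[i] + rel_from, offsets[i + 1] + rel_to)

    assert (0, 5) in graph.edges and (2, 3) in graph.edges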