Source code for mycelyso.misc.regression

# -*- coding: utf-8 -*-
"""
The regression modules contains some helpers to perform linear fits on data with non-linear begins or ends.
"""

import numpy as np
from scipy.stats import linregress


[docs]def find_linear_window( x, y, begin=float('nan'), end=float('nan'), window=0.1, condition=('rvalue', 'gt', 0.95), return_begin_end=False, return_nan_if_impossible=True ): """ Tries to find a continuous window in x/y which (mostly) follows a linear relation subject to condition. If window is a float, it is seen as relative length of the input lists. Linear regressions will be performed on each window, then the windows will be filtered by the condition (eg that they have a rvalue better than 0.95). Then the range between the first and the last window to follow these conditions will be used to perform the overall regression. See also :py:func:`scipy.stats.linregress` :param x: Input data, independent value :param y: Input data, dependent value :param begin: :param end: :param window: Window, either :param condition: Condition to check, a tuple of three. The first must be a key of a linear regression result object, the second either 'gt' or 'lt', and the third the value to compare. :param return_begin_end: If true, return the found range as well :param return_nan_if_impossible: If True, return NaN if no suitable region was found, otherwise throws RuntimeError :return: """ data = np.c_[x, y] data[:, 1][~np.isfinite(data[:, 1])] = np.log(np.finfo(data.dtype).eps) if type(window) == float: window = int(window * data.shape[0]) if window < 3: window = 3 pairs = [[i, min(i+window, data.shape[0])] for i in range(0, data.shape[0], window)] with np.errstate(all='ignore'): results = [[b, e, linregress(data[b:e, 0], data[b:e, 1])] for b, e in pairs] condition_check = (lambda r: getattr(r, condition[0]) > condition[2]) if condition[1] == 'gt'\ else (lambda r: getattr(r, condition[0]) < condition[2]) filtered_results = [[b, e] for b, e, r in results if condition_check(r)] if not filtered_results: if return_nan_if_impossible: regression = results[0][2] # noinspection PyProtectedMember,PyProtectedMember regression = regression._replace(**{k: float('nan') for k in regression._asdict().keys()}) if return_begin_end: return float('nan'), float('nan'), regression else: return regression else: raise RuntimeError('No optimal range for regression found.') if np.isnan(begin): begin = filtered_results[0][0] if np.isnan(end): end = filtered_results[-1][1] filtered_data = data[begin:end, :] with np.errstate(all='ignore'): regression = linregress(filtered_data[:, 0], filtered_data[:, 1]) if return_begin_end: return begin, end, regression else: return regression
[docs]def prepare_optimized_regression(x, y): """ First finds an optimal window using :py:func:`find_linear_window`, than performs a linear regression. :param x: independent variable :param y: dependent variable :return: >>> x = np.linspace(1, 100, 100) >>> y = x * 5 + 10 >>> y[0:10] = 0 # break our nice linear curve >>> prepare_optimized_regression(x, y) OrderedDict([('slope', 5.0), ('intercept', 10.0), ('rvalue', 0.9999999999999999), ('pvalue', 0.0), \ ('stderr', 7.942345602646859e-09), ('begin_index', 10), ('end_index', 100), ('begin', 11.0), ('end', 100.0)]) """ condition = ('rvalue', 'gt', 0.9) begin, end, regression = find_linear_window(x, y, condition=condition, return_begin_end=True) # noinspection PyProtectedMember regression = regression._asdict() regression['begin_index'] = begin regression['end_index'] = end if end == len(x): end -= 1 if (begin != begin or end != end) or begin < 0 or end >= len(x): regression['begin'] = regression['end'] = -1 else: regression['begin'] = x[begin] regression['end'] = x[end] return regression