#!/usr/bin/env python
"""extract.py: Extract pumping-like traces from kymographs etc.."""
import numpy as np
from scipy.signal import find_peaks
[docs]def alignKymos(ar):
"""Align a kymograph by largest correlation.
Args:
ar (numpy,array): 2d array with each row a kymograph line
Returns:
numpy.array: array with each row shifted for optimal correlation with the first row.
"""
sample = ar[0]
sample =sample - np.mean(sample)
ar2 = np.zeros(ar.shape)
#shifts = np.zeros(len(ar))
for ri, row in enumerate(ar):
row =row - np.mean(row)
row =row/np.std(row)
corr = np.correlate(sample,row,mode='full')
shift = int(np.argmax(corr))
ar2[ri] = np.roll(ar[ri], shift)
return ar2
[docs]def hampel(vals_orig, k=7, t0=3):
"""Implements a Hampel filter (code from E. Osorio, Stackoverflow).
Args:
vals_orig (list, numpy.array): series of values to filter
k (int, optional): window size to each side of the sample eg. 7 is 3 left and 3 right of the sample value. Defaults to 7.
t0 (int or float, optional): how many sigma away is an outlier. Defaults to 3.
"""
#Make copy so original not edited
vals = vals_orig.copy()
#Hampel Filter
L = 1.4826
rolling_median = vals.rolling(window=k, center=True, min_periods = 1).median()
MAD = lambda x: np.median(np.abs(x - np.median(x)))
rolling_MAD = vals.rolling(window=k, center=True, min_periods=1).apply(MAD)
threshold = t0 * L * rolling_MAD
difference = np.abs(vals - rolling_median)
outlier_idx = difference > threshold
vals[outlier_idx] = rolling_median[outlier_idx]
return vals
[docs]def preprocess(p, w_bg, w_sm, win_type_bg = 'hamming', win_type_sm = 'boxcar'):
"""preprocess a trace with rolling window brackground subtraction.
Args:
p (numpy.array): input signal or time series
w_bg (int): background window size
w_sm (int): smoothing window size
win_type_bg (str, optional): background window type. Defaults to 'hamming'.
win_type_sm (str, optional): smoothing window type. Defaults to 'boxcar'.
Returns:
numpy.array: background subtracted and smoothed signal
"""
bg = p.rolling(w_bg, min_periods=1, center=True, win_type=win_type_bg).median()
return (p - bg).rolling(w_sm, min_periods=1, center=True, win_type=win_type_sm).mean(), bg
[docs]def find_pumps(metric, heights = np.arange(0.01, 5, 0.1), min_distance = 5, sensitivity = 0.99, **kwargs):
""" peak detection method finding the maximum number of peaks while allowing fewer than
a sensitivity fraction of peaks that are closer than a min_distance.
Args:
metric (numpy.array): input signal with peaks
heights (list, optional): peak prominence values to test. Defaults to np.arange(0.01, 5, 0.1).
min_distance (int, optional): distance peaks should be apart. Defaults to 5.
sensitivity (float, optional): how many peaks can violate min_distance. Defaults to 0.99.
Returns:
list: peak indices
numpy.array: peak number and fraction of valid peaks for all heights
numpy.array: mean and standard deviation fraction of valid peaks for all heights in a random occurance
"""
tmp = []
all_peaks = []
# find peaks at different heights
for h in heights:
peaks = find_peaks(metric, prominence = h, **kwargs)[0]
tmp.append([len(peaks), np.mean(np.diff(peaks)>=min_distance)])
all_peaks.append(peaks)
tmp = np.array(tmp)
# set the valid peaks score to zero if no peaks are present
tmp[:,1][~np.isfinite(tmp[:,1])]= 0
# calculate random distribution of peaks in a series of length l (actually we know the intervals will be exponential)
null = []
l = len(metric)
for npeaks in tmp[:,0]:
locs = np.random.randint(0,l,(100, int(npeaks)))
# calculate the random error rate - and its stdev
null.append([np.mean(np.diff(np.sort(locs), axis =1)>=min_distance), \
np.std(np.mean(np.diff(np.sort(locs), axis =1)>=min_distance, axis =1))])
null = np.array(null)
# now find the best peak level - larger than random, with high accuracy
# subtract random level plus 1 std:
metric_random = tmp[:,1] - (null[:,0]+null[:,1])
# check where this is still positive and where the valid intervals are 1 or some large value
valid = np.where((metric_random>0)*(tmp[:,1]>=sensitivity))[0]
if len(valid)>0:
#peaks = all_peaks[valid[np.argmax(tmp[:,0][valid])]]
h = heights[valid[np.argmax(tmp[:,0][valid])]]
peaks = find_peaks(metric, prominence = h, distance = min_distance, **kwargs)[0]
else:
return [], tmp, null
return peaks, tmp, null
[docs]def pumps(data, key = 'Straightened'):
"""Pumping metric based on the dorsal-vetral signal variation.
Args:
data (pandas.DataFrame): dataframe with a column containing straightened images of pharyxes
Returns:
numpy.array: signal for all images
"""
straightIms = np.array([im for im in data[key].values])
k = np.max(np.std(straightIms, axis =2), axis =1)#-np.mean(straightIms, axis =2)
return np.ravel(k)