Source code for pharaglow.util

#!/usr/bin/env python

"""util.py: useful general functions like filters and peak detection."""
import warnings
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
from functools import partial
import math
import numpy as np
import pandas as pd


# def parallelize_dataframe(df, func, params, n_cores):

#     """ split a dataframe to easily use multiprocessing."""
#     df_split = np.array_split(df, n_cores)
#     df_split = [d for d in df_split  if len(d)>0]
#     if len(df_split) <1:
#         return
#     # filter zero-size jobs
#     print([len(d) for d in df_split])
#     pool = Pool(n_cores)
#     df = pd.concat(pool.starmap(func, zip(df_split, np.repeat(params, len(df_split)))))
#     pool.close()
#     pool.join()
#     return df


def parallel_analysis(args, param, parallelWorker, framenumbers = None, nWorkers = 5, output= None, depth = 'uint8', **kwargs):
    """Run an image-analysis worker over a sequence of frames, optionally in parallel.

    The iterables in ``args`` are zipped together with ``framenumbers`` and
    every resulting tuple is handed to ``parallelWorker`` as its single
    positional argument, with ``param`` passed as the keyword ``params``.
    This is inspired by the trackpy.batch function.

    Args:
        args (tuple): iterables that are stepped through in lockstep,
            e.g. (frames, masks). ``args[0]`` must support ``len()``.
        param (dict): image analysis parameters. ``param['length']`` is used
            as the pad size when images are returned.
        parallelWorker (func): callable invoked as
            ``parallelWorker(item_tuple, params=param)``. Its result is indexed:
            element 0 is the per-frame feature table, an optional element 1
            holds the images belonging to it.
        framenumbers (list, optional): frame numbers matching the frames in
            args. Defaults to None, meaning ``0..len(args[0])-1``.
        nWorkers (int, optional): processes to use; 1 runs serially in the
            current process. Defaults to 5.
        output (optional): storage object with a ``put`` method, e.g.
            trackpy.PandasHDFStore. Defaults to None.
        depth (str, optional): bit depth of frames. Defaults to 'uint8'.
        **kwargs: accepted but not used by this function.

    Returns:
        ``output`` if a store was given, otherwise a tuple
        ``(pandas.DataFrame, images)``. ``images`` is a numpy array of padded
        images when the worker returned any, else an empty list.
    """
    if framenumbers is None:
        framenumbers = np.arange(len(args[0]))

    # Bind the analysis parameters once so the mapper only sees one argument.
    worker = partial(parallelWorker, params = param)

    # Serial mapping by default; swap in a process pool when requested.
    executor = None
    mapper = map
    if nWorkers != 1:
        executor = ProcessPoolExecutor(max_workers=nWorkers, mp_context=mp.get_context("spawn"))
        mapper = executor.map

    tables = []
    raw_images = []
    try:
        jobs = zip(*args, framenumbers)
        for i, result in enumerate(mapper(worker, jobs)):
            if i%100 ==0:
                print(f'Analyzing image {i} of {len(args[0])}')
            features = result[0]
            if len(features) == 0:
                # nothing detected in this frame
                continue
            has_images = len(result) > 1
            if output is not None:
                # with a store, the images travel inside the table itself
                if has_images:
                    features['images'] = result[1]
                output.put(features)
                continue
            tables.append(features)
            if has_images:
                raw_images += result[1]
    finally:
        # Make sure the worker processes are released even on failure.
        if executor:
            executor.shutdown()

    if output is not None:
        return output

    if len(tables) == 0:
        warnings.warn("No objects found in any frame.")
        return pd.DataFrame([]), raw_images

    table = pd.concat(tables).reset_index(drop=True)
    if len(raw_images) > 0:
        # Bring every linearized image to a common (length, length) shape.
        padded = [
            pad_images(im, shape, param['length'], reshape = True, depth=depth)
            for im, shape in zip(raw_images, table['shapeX'])
        ]
        raw_images = np.array(padded).astype(depth)
    return table, raw_images
def smooth(x,window_len=11,window='hanning'):
    """Smooth a 1D signal by convolving it with a normalized window.

    The signal is extended at both ends with reflected copies of itself
    (``window_len - 1`` samples each) so that edge transients are reduced,
    convolved with the window scaled to unit sum, and then trimmed back to
    the length of the input.

    For odd ``window_len`` the output is aligned with the input, i.e.
    ``output[i]`` is the window average centred on ``x[i]``. For even
    ``window_len`` a window cannot be centred on a sample; ``output[i]`` is
    then centred half a sample before ``x[i]``.

    Args:
        x (numpy.array): the 1D input signal (any 1D sequence is accepted).
        window_len (int, optional): size of the smoothing window; should be
            an odd integer. Values below 3 return the input unchanged.
            Defaults to 11.
        window (str, optional): one of 'flat', 'hanning', 'hamming',
            'bartlett', 'blackman'. 'flat' gives a moving average.
            Defaults to 'hanning'.

    Raises:
        ValueError: if x is not one-dimensional, if x is shorter than
            window_len, or if window is not a supported name.

    Returns:
        numpy.array: the smoothed signal, same length as x.

    Example:
        t = np.linspace(-2, 2, 50)
        x = np.sin(t) + np.random.randn(len(t))*0.1
        y = smooth(x)

    See also:
        numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman,
        numpy.convolve
    """
    x = np.asarray(x)
    if x.ndim != 1:
        raise ValueError("smooth only accepts 1 dimension arrays.")
    if x.size < window_len:
        raise ValueError("Input vector needs to be bigger than window size.")
    if window_len < 3:
        return x
    if window not in ('flat', 'hanning', 'hamming', 'bartlett', 'blackman'):
        raise ValueError("Window is one of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")

    # Mirror window_len-1 samples onto each end of the signal.
    padded = np.r_[x[window_len-1:0:-1], x, x[-2:-window_len-1:-1]]

    if window == 'flat':
        # moving average
        w = np.ones(window_len, 'd')
    else:
        # window name was validated above, so a plain attribute lookup is safe
        w = getattr(np, window)(window_len)

    y = np.convolve(w/w.sum(), padded, mode='valid')

    # y has len(x) + window_len - 1 samples; y[k] is centred on
    # x[k - (window_len-1)/2], so the aligned part starts at (window_len-1)//2.
    start = (window_len - 1)//2
    return y[start:start + x.size]
def unravelImages(im, lengthX):
    """Reshape a linearized image (or stack of pixels) into rows of fixed width.

    Args:
        im (list or numpy.array): the flattened image data. Objects that
            provide their own ``reshape`` method are reshaped through it;
            plain sequences are converted to a numpy array first.
        lengthX (int): length of the second axis of the reshaped image.

    Returns:
        numpy.array: image reshaped as (N, lengthX)
    """
    # np.reshape delegates to im.reshape when it exists and otherwise wraps
    # the input in an array, so lists work as the documentation promises.
    return np.reshape(im, (-1, lengthX))
def get_im(df, colnames, lengthX):
    """Deprecated: extract pixel columns from a dataframe as a 2D image array.

    Args:
        df (pandas.DataFrame): table holding the pixel values.
        colnames (list): labels of the columns to select from df.
        lengthX (int): length of the second axis of the returned array.

    Returns:
        numpy.array: the selected values reshaped as (N, lengthX)
    """
    pixel_values = df[colnames].to_numpy()
    new_shape = (-1, lengthX)
    return pixel_values.reshape(*new_shape)
[docs]def pad_images(im, shape, size, reshape = True, depth = 'uint8'): """pad image to desired size. Args: im (list): a linearized version of an image shape (int): shape of second axis of image. size (int): pad image to size (size, size) reshape (bool, optional): reshape image to (-1, shape) before padding. Defaults to True. depth (str, optional): bit depth of image. Defaults to 'uint8'. Returns: numpy.array: padded image of size (size, size) """ # make image from list im = np.array(im, dtype = depth) if reshape: im = im.reshape(-1, shape) # pad an image to square sy, sx = im.shape if sy > size or sx > size: warnings.warn(f'Image {sy, sx} larger than pad size {size}. Cropping') cropy, cropx = math.ceil(max([0, sy-size])/2), math.ceil(max([0, sx-size])/2) im = im[cropy:sy-cropy, cropx:sx-cropx] return pad_images(im, shape, size, reshape = False) # how much to add around each side py, px = (size-sy)//2, (size-sx)//2 # add back the possible rounding error oy, ox = (size-sy)%2, (size-sx)%2 new_im = np.pad(im, [(py, py+oy), (px, px+ox)], mode='constant', constant_values= 0) if new_im.shape !=(size, size): warnings.warn(f'Rerunning to correct size {size}') return pad_images(im, shape, size, reshape = False) return new_im