In [1]:
    
%matplotlib inline
from matplotlib.colors import PowerNorm, LogNorm
import numpy as np
import os
import pandas as pd
%load_ext autoreload
%autoreload 2
import palm_diagnostics as pdiag
    
In [2]:
    
# Call register() on the `pb` object exposed by palm_diagnostics.
# NOTE(review): presumably this enables a global dask progress bar for the
# compute() calls below — confirm against palm_diagnostics.
pdiag.pb.register()
    
In [3]:
    
@pdiag.dask.delayed
def _gen_img_sub_thread(chunklen, chunk, yx_shape, df, mag, multipliers, diffraction_limit):
    """Render one contiguous chunk of rows as a lazy (delayed) task.

    Because of the ``dask.delayed`` decorator, calling this function only
    records the work; the body runs when the resulting graph is computed.

    Parameters
    ----------
    chunklen : int
        Number of rows assigned to each chunk.
    chunk : int
        Zero-based chunk index. Rows ``chunk * chunklen`` up to (but not
        including) ``(chunk + 1) * chunklen`` are rendered.
    yx_shape, mag, diffraction_limit
        Forwarded unchanged to ``pdiag._jit_gen_img_sub``.
    df : pandas.DataFrame
        Must provide the columns "y0", "x0", "sigma_y" and "sigma_x".
    multipliers : array-like
        Sliced with the same row range as ``df`` and forwarded as the
        per-row amplitudes. An empty array yields an empty slice.

    Returns
    -------
    The value returned by ``pdiag._jit_gen_img_sub`` for this chunk.
    """
    s = slice(chunk * chunklen, (chunk + 1) * chunklen)
    # Slice the rows positionally *before* selecting columns so that only this
    # chunk is copied, rather than materialising the whole column subset for
    # every task and then discarding most of it.
    df_chunk = df.iloc[s][["y0", "x0", "sigma_y", "sigma_x"]].values
    # Per-row amplitudes for exactly the same rows as df_chunk.
    amps = multipliers[s]
    return pdiag._jit_gen_img_sub(yx_shape, df_chunk, mag, amps, diffraction_limit)
def _gen_img_sub_threaded(yx_shape, df, mag, multipliers, diffraction_limit, numthreads=1):
    """Build a lazy dask array that renders ``df`` in ``numthreads`` chunks.

    The rows of ``df`` are split into ``numthreads`` contiguous chunks, each
    chunk is rendered by ``_gen_img_sub_thread`` as a delayed task, and the
    per-chunk images are stacked and summed along the stacking axis.

    Parameters
    ----------
    yx_shape : sequence of numbers
        Multiplied element-wise by ``mag`` to get the shape declared to dask
        for every per-chunk image.
    df : pandas.DataFrame
        Passed through to ``_gen_img_sub_thread``.
    mag : number
        Scale factor applied to ``yx_shape``; also forwarded to the renderer.
    multipliers, diffraction_limit
        Forwarded unchanged to ``_gen_img_sub_thread``.
    numthreads : int, optional
        Number of chunks / delayed tasks to create (default 1).

    Returns
    -------
    dask array
        Lazy sum of the per-chunk images; call ``.compute()`` to evaluate.

    Raises
    ------
    ValueError
        If ``numthreads`` is less than 1.
    """
    if numthreads < 1:
        raise ValueError("numthreads must be at least 1")
    length = len(df)
    # Ceiling division: numthreads chunks of this size cover every row.
    chunklen = (length + numthreads - 1) // numthreads
    new_shape = tuple(np.array(yx_shape) * mag)
    # np.float was only an alias of the builtin float and has been removed
    # from NumPy; np.float64 is the same dtype and works on every version.
    rendered_threads = [
        pdiag.dask.array.from_delayed(
            _gen_img_sub_thread(chunklen, chunk, yx_shape, df, mag, multipliers, diffraction_limit),
            new_shape,
            np.float64,
        )
        for chunk in range(numthreads)
    ]
    lazy_result = pdiag.dask.array.stack(rendered_threads)
    return lazy_result.sum(0)
    
In [4]:
    
def _gen_img_sub_threaded2(yx_shape, df, mag, multipliers, diffraction_limit, numthreads=1):
    """Build a lazy dask array that renders ``df`` in ``numthreads`` chunks.

    Unlike ``_gen_img_sub_threaded``, the render columns are extracted from
    ``df`` once, up front, and each delayed task receives only its own slice
    of that array and of ``multipliers``.

    Parameters
    ----------
    yx_shape : sequence of numbers
        Multiplied element-wise by ``mag`` to get the shape declared to dask
        for every per-chunk image; also forwarded to the renderer.
    df : pandas.DataFrame
        Must provide the columns "y0", "x0", "sigma_y" and "sigma_x".
    mag : number
        Scale factor applied to ``yx_shape``; also forwarded to the renderer.
    multipliers : array-like
        Sliced with the same row ranges as the data and forwarded per chunk.
    diffraction_limit
        Forwarded unchanged to ``pdiag._jit_gen_img_sub``.
    numthreads : int, optional
        Number of chunks / delayed tasks to create (default 1).

    Returns
    -------
    dask array
        Lazy sum of the per-chunk images; call ``.compute()`` to evaluate.

    Raises
    ------
    ValueError
        If ``numthreads`` is less than 1.
    """
    if numthreads < 1:
        raise ValueError("numthreads must be at least 1")
    # A tuple, matching what _gen_img_sub_threaded hands to from_delayed.
    new_shape = tuple(np.array(yx_shape) * mag)
    keys_for_render = ["y0", "x0", "sigma_y", "sigma_x"]
    # Extract the render columns once; the chunks below are row slices of it.
    data = df[keys_for_render].values
    length = len(data)
    # Ceiling division: numthreads chunks of this size cover every row.
    chunklen = (length + numthreads - 1) // numthreads
    chunk_slices = [slice(i * chunklen, (i + 1) * chunklen) for i in range(numthreads)]

    delayed_jit_gen_img_sub = pdiag.dask.delayed(pdiag._jit_gen_img_sub)
    lazy_chunks = [
        delayed_jit_gen_img_sub(yx_shape, data[s], mag, multipliers[s], diffraction_limit)
        for s in chunk_slices
    ]

    # np.float was only an alias of the builtin float and has been removed
    # from NumPy; np.float64 is the same dtype and works on every version.
    stacked = pdiag.dask.array.stack(
        [pdiag.dask.array.from_delayed(lazy_chunk, new_shape, np.float64)
         for lazy_chunk in lazy_chunks]
    )

    return stacked.sum(0)
    
In [5]:
    
# Synthetic benchmark input: 50,000,000 rows x 4 columns of uniform random
# values, named after the columns the render functions read.
# No random seed is set, so the values differ on every run.
test_data = pd.DataFrame((np.random.rand(50000000,4)), columns=["y0", "x0", "sigma_y", "sigma_x"])
# Scale both sigma columns down by a factor of 100, in place.
test_data[["sigma_y", "sigma_x"]] *= 0.01
# Print the frame's column / dtype / memory summary.
test_data.info()
    
    
In [6]:
    
from dask.dot import dot_graph
    
In [7]:
    
# Build the lazy 4-chunk render at mag=1 with an empty multipliers array.
# Nothing is computed here, so %time measures graph construction only.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1, np.array(()), True, 4)
# Draw the dask task graph behind the lazy result.
dot_graph(timg.dask)
    
    
    Out[7]:
In [8]:
    
# Benchmark: _gen_img_sub_threaded, mag=1000, a single chunk.
# The first %time covers building the lazy graph, the second the actual compute.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 1)
%time a = timg.compute()
    
    
In [9]:
    
# Benchmark: _gen_img_sub_threaded, mag=1000, two chunks.
# The first %time covers building the lazy graph, the second the actual compute.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 2)
%time a = timg.compute()
    
    
In [10]:
    
# Benchmark: _gen_img_sub_threaded, mag=1000, four chunks.
# The first %time covers building the lazy graph, the second the actual compute.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute()
    
    
In [11]:
    
# Benchmark: _gen_img_sub_threaded, mag=1000, four chunks, computed with the
# multiprocessing `get` function passed explicitly to compute().
# NOTE(review): the `get=` keyword was removed in newer dask releases in favour
# of `scheduler="processes"` — confirm against the installed dask version.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)
    
    
In [12]:
    
# Benchmark: _gen_img_sub_threaded2, mag=1000, a single chunk.
# The first %time covers building the lazy graph, the second the actual compute.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 1)
%time a = timg.compute()
    
    
In [13]:
    
# Benchmark: _gen_img_sub_threaded2, mag=1000, four chunks.
# The first %time covers building the lazy graph, the second the actual compute.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute()
    
    
In [14]:
    
# Benchmark: _gen_img_sub_threaded2, mag=1000, four chunks, computed with the
# multiprocessing `get` function passed explicitly to compute().
# NOTE(review): the `get=` keyword was removed in newer dask releases in favour
# of `scheduler="processes"` — confirm against the installed dask version.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)