In [1]:
%matplotlib inline
from matplotlib.colors import PowerNorm, LogNorm
import numpy as np
import os
import pandas as pd
%load_ext autoreload
%autoreload 2

import palm_diagnostics as pdiag

In [2]:
pdiag.pb.register()

In [3]:
@pdiag.dask.delayed
def _gen_img_sub_thread(chunklen, chunk, yx_shape, df, mag, multipliers, diffraction_limit):
    """"""
    s = slice(chunk * chunklen, (chunk + 1) * chunklen)
    df_chunk = df[["y0", "x0", "sigma_y", "sigma_x"]].values[s]
    # calculate the amplitude of the z gaussian.
    amps = multipliers[s]
    # generate a 2D image weighted by the z gaussian.
    print(amps)
    return pdiag._jit_gen_img_sub(yx_shape, df_chunk, mag, amps, diffraction_limit)


def _gen_img_sub_threaded(yx_shape, df, mag, multipliers, diffraction_limit, numthreads=1):
    """"""
    length = len(df)
    chunklen = (length + numthreads - 1) // numthreads
    new_shape = tuple(np.array(yx_shape) * mag)
    # print(dask.array.from_delayed(_gen_zplane(df, yx_shape, zplanes[0], mag), new_shape, np.float))
    rendered_threads = [pdiag.dask.array.from_delayed(
        _gen_img_sub_thread(chunklen, chunk, yx_shape, df, mag, multipliers, diffraction_limit), new_shape, np.float)
                        for chunk in range(numthreads)]
    lazy_result = pdiag.dask.array.stack(rendered_threads)
    return lazy_result.sum(0)

In [4]:
def _gen_img_sub_threaded2(yx_shape, df, mag, multipliers, diffraction_limit, numthreads=1):
    new_shape = np.array(yx_shape) * mag
    keys_for_render = ["y0", "x0", "sigma_y", "sigma_x"]
    df = df[keys_for_render].values
    length = len(df)
    chunklen = (length + numthreads - 1) // numthreads
    
    delayed_jit_gen_img_sub = pdiag.dask.delayed(pdiag._jit_gen_img_sub)
    print([multipliers[i * chunklen:(i + 1) * chunklen]
                               for i in range(numthreads)])
    lazy_result = [delayed_jit_gen_img_sub(yx_shape, df[i * chunklen:(i + 1) * chunklen], mag,
                                           multipliers[i * chunklen:(i + 1) * chunklen], diffraction_limit)
                               for i in range(numthreads)]
    
    lazy_result = pdiag.dask.array.stack([pdiag.dask.array.from_delayed(l, new_shape, np.float)
        for l in lazy_result])
    
    return lazy_result.sum(0)

In [5]:
test_data = pd.DataFrame((np.random.rand(100000000,4)), columns=["y0", "x0", "sigma_y", "sigma_x"])
test_data[["sigma_y", "sigma_x"]] *= 0.01
test_data.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 4 columns):
y0         float64
x0         float64
sigma_y    float64
sigma_x    float64
dtypes: float64(4)
memory usage: 3.0 GB

In [6]:
from dask.dot import dot_graph

In [7]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1, np.array(()), True, 4)
dot_graph(timg.dask)


[array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64)]
Wall time: 2.83 s
Out[7]:

In [8]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 1)
%time a = timg.compute()


Wall time: 2 ms
[                                        ] | 0% Completed |  2.1s[]
[########################################] | 100% Completed | 38min  5.6s
Wall time: 38min 5s

In [9]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 2)
%time a = timg.compute()


Wall time: 3 ms
[                                        ] | 0% Completed |  2.6s[][]

[########################################] | 100% Completed | 22min  5.8s
Wall time: 22min 5s

In [10]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute()


Wall time: 3 ms
[                                        ] | 0% Completed |  4.5s[][][][]



[########################################] | 100% Completed | 14min 42.5s
Wall time: 14min 42s

In [11]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 8)
%time a = timg.compute()


Wall time: 6 ms
[                                        ] | 0% Completed |  0.0s[]
[                                        ] | 0% Completed |  3.9s[][]
[]

[                                        ] | 0% Completed |  9.4s[]
[]
[                                        ] | 0% Completed | 14.2s[]
[]
[########################################] | 100% Completed | 10min 24.8s
Wall time: 10min 24s

In [ ]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 16)
%time a = timg.compute()


Wall time: 8.01 ms
[                                        ] | 0% Completed |  2.3s[]
[][]
[                                        ] | 0% Completed | 10.2s[]

[]

[]
[                                        ] | 0% Completed | 18.0s[]
[]
[                                        ] | 0% Completed | 23.8s[][]

[                                        ] | 0% Completed | 32.7s
[][]
[]

[                                        ] | 0% Completed | 41.9s[]
[########################################] | 100% Completed | 11min 58.7s
Wall time: 11min 58s

In [ ]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)


Wall time: 3 ms
[                                        ] | 0% Completed |  6min 12.5s

In [ ]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 8)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)

In [ ]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 16)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)

Original way


In [ ]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 1)
%time a = timg.compute()

In [ ]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute()

In [ ]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 8)
%time a = timg.compute()

In [ ]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 16)
%time a = timg.compute()

In [ ]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)

In [ ]:
%reset -f

In [ ]: