In [1]:
%matplotlib inline
from matplotlib.colors import PowerNorm, LogNorm
import numpy as np
import os
import pandas as pd
%load_ext autoreload
%autoreload 2

import palm_diagnostics as pdiag

In [2]:
pdiag.pb.register()

In [3]:
@pdiag.dask.delayed
def _gen_img_sub_thread(chunklen, chunk, yx_shape, df, mag, multipliers, diffraction_limit):
    """"""
    s = slice(chunk * chunklen, (chunk + 1) * chunklen)
    df_chunk = df[["y0", "x0", "sigma_y", "sigma_x"]].values[s]
    # calculate the amplitude of the z gaussian.
    amps = multipliers[s]
    # generate a 2D image weighted by the z gaussian.
    print(amps)
    return pdiag._jit_gen_img_sub(yx_shape, df_chunk, mag, amps, diffraction_limit)


def _gen_img_sub_threaded(yx_shape, df, mag, multipliers, diffraction_limit, numthreads=1):
    """"""
    length = len(df)
    chunklen = (length + numthreads - 1) // numthreads
    new_shape = tuple(np.array(yx_shape) * mag)
    # print(dask.array.from_delayed(_gen_zplane(df, yx_shape, zplanes[0], mag), new_shape, np.float))
    rendered_threads = [pdiag.dask.array.from_delayed(
        _gen_img_sub_thread(chunklen, chunk, yx_shape, df, mag, multipliers, diffraction_limit), new_shape, np.float)
                        for chunk in range(numthreads)]
    lazy_result = pdiag.dask.array.stack(rendered_threads)
    return lazy_result.sum(0)

In [4]:
def _gen_img_sub_threaded2(yx_shape, df, mag, multipliers, diffraction_limit, numthreads=1):
    new_shape = np.array(yx_shape) * mag
    keys_for_render = ["y0", "x0", "sigma_y", "sigma_x"]
    df = df[keys_for_render].values
    length = len(df)
    chunklen = (length + numthreads - 1) // numthreads
    
    delayed_jit_gen_img_sub = pdiag.dask.delayed(pdiag._jit_gen_img_sub)
    print([multipliers[i * chunklen:(i + 1) * chunklen]
                               for i in range(numthreads)])
    lazy_result = [delayed_jit_gen_img_sub(yx_shape, df[i * chunklen:(i + 1) * chunklen], mag,
                                           multipliers[i * chunklen:(i + 1) * chunklen], diffraction_limit)
                               for i in range(numthreads)]
    
    lazy_result = pdiag.dask.array.stack([pdiag.dask.array.from_delayed(l, new_shape, np.float)
        for l in lazy_result])
    
    return lazy_result.sum(0)

In [5]:
test_data = pd.DataFrame((np.random.rand(50000000,4)), columns=["y0", "x0", "sigma_y", "sigma_x"])
test_data[["sigma_y", "sigma_x"]] *= 0.01
test_data.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000000 entries, 0 to 49999999
Data columns (total 4 columns):
y0         float64
x0         float64
sigma_y    float64
sigma_x    float64
dtypes: float64(4)
memory usage: 1.5 GB

In [6]:
from dask.dot import dot_graph

In [7]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1, np.array(()), True, 4)
dot_graph(timg.dask)


[array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64)]
Wall time: 1.19 s
Out[7]:

In [8]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 1)
%time a = timg.compute()


Wall time: 2 ms
[                                        ] | 0% Completed |  0.0s[]
[########################################] | 100% Completed | 21min 57.2s
Wall time: 21min 57s

In [9]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 2)
%time a = timg.compute()


Wall time: 2 ms
[                                        ] | 0% Completed |  2.4s[][]

[########################################] | 100% Completed | 14min 37.7s
Wall time: 14min 37s

In [10]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute()


Wall time: 4 ms
[                                        ] | 0% Completed |  1.1s[][]
[]
[                                        ] | 0% Completed |  4.6s[]

[########################################] | 100% Completed |  8min 57.5s
Wall time: 8min 57s

In [15]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 8)
%time a = timg.compute()


Wall time: 122 ms
[                                        ] | 0% Completed |  2.7s[][]
[]


[                                        ] | 0% Completed |  7.0s[]
[]
[]

[########################################] | 100% Completed |  6min 22.7s
Wall time: 6min 22s

In [16]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 16)
%time a = timg.compute()


Wall time: 9.01 ms
[                                        ] | 0% Completed |  1.2s[][]
[]
[                                        ] | 0% Completed |  4.7s[]

[][]
[                                        ] | 0% Completed |  8.9s[]
[]
[                                        ] | 0% Completed | 11.5s[]
[]
[]

[                                        ] | 0% Completed | 16.2s[]
[                                        ] | 0% Completed | 19.4s[]

[][]

[########################################] | 100% Completed |  5min 50.4s
Wall time: 5min 50s

In [11]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)


Wall time: 4 ms
[########################################] | 100% Completed |  7min 14.2s
Wall time: 7min 15s

In [19]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 8)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)


Wall time: 235 ms
[########################################] | 100% Completed |  5min 12.6s
Wall time: 5min 14s

In [20]:
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 16)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)


Wall time: 7 ms
[########################################] | 100% Completed |  5min 25.7s
Wall time: 5min 27s

Original way


In [12]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 1)
%time a = timg.compute()


[array([], dtype=float64)]
Wall time: 1.65 s
[########################################] | 100% Completed | 22min 31.5s
Wall time: 22min 31s

In [13]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute()


[array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64)]
Wall time: 1.7 s
[########################################] | 100% Completed |  9min 21.9s
Wall time: 9min 21s

In [17]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 8)
%time a = timg.compute()


[array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64)]
Wall time: 1.66 s
[########################################] | 100% Completed |  6min 26.3s
Wall time: 6min 26s

In [18]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 16)
%time a = timg.compute()


[array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64)]
Wall time: 1.9 s
[########################################] | 100% Completed |  5min 46.3s
Wall time: 5min 46s

In [14]:
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)


[array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64)]
Wall time: 1.75 s
[########################################] | 100% Completed |  7min 12.3s
Wall time: 7min 14s

In [ ]: