In [1]:
%matplotlib inline
from matplotlib.colors import PowerNorm, LogNorm
import numpy as np
import os
import pandas as pd
%load_ext autoreload
%autoreload 2
import palm_diagnostics as pdiag
In [2]:
# Call register() on the `pb` object exposed by palm_diagnostics.
# NOTE(review): presumably `pb` is a dask diagnostics progress bar, in which case this
# turns on progress reporting for every compute() below — confirm in palm_diagnostics.
pdiag.pb.register()
In [3]:
@pdiag.dask.delayed
def _gen_img_sub_thread(chunklen, chunk, yx_shape, df, mag, multipliers, diffraction_limit):
    """Render one contiguous chunk of rows of ``df`` (dask delayed task).

    Because of the ``dask.delayed`` decorator, calling this function only
    records the task; the body runs when the resulting graph is computed.

    Parameters
    ----------
    chunklen : int
        Number of rows per chunk.
    chunk : int
        Zero-based chunk index; rows ``chunk * chunklen`` up to (not
        including) ``(chunk + 1) * chunklen`` are rendered.
    yx_shape : tuple
        Forwarded unchanged to ``pdiag._jit_gen_img_sub``.
    df : pandas.DataFrame
        Must contain the columns ``y0``, ``x0``, ``sigma_y`` and ``sigma_x``.
    mag : number
        Forwarded unchanged to ``pdiag._jit_gen_img_sub``.
    multipliers : sequence
        Sliced with the same row range as ``df`` and forwarded as the
        per-row amplitudes.
    diffraction_limit
        Forwarded unchanged to ``pdiag._jit_gen_img_sub``.

    Returns
    -------
    The result of ``pdiag._jit_gen_img_sub`` for this chunk.
    """
    rows = slice(chunk * chunklen, (chunk + 1) * chunklen)
    # Select the rows before converting to an array so each task copies only
    # its own chunk instead of the whole four-column table.
    df_chunk = df.iloc[rows][["y0", "x0", "sigma_y", "sigma_x"]].values
    # Per-row weights for this chunk.
    amps = multipliers[rows]
    return pdiag._jit_gen_img_sub(yx_shape, df_chunk, mag, amps, diffraction_limit)
def _gen_img_sub_threaded(yx_shape, df, mag, multipliers, diffraction_limit, numthreads=1):
    """Lazily render ``df`` by splitting its rows across ``numthreads`` delayed tasks.

    Each task is created with ``_gen_img_sub_thread``, wrapped as a dask
    array, and the per-chunk images are stacked and summed along the stacking
    axis. Nothing is rendered until the returned array is computed.

    Parameters
    ----------
    yx_shape : tuple
        Base image shape; the declared output shape is ``yx_shape * mag``.
    df : pandas.DataFrame
        Table handed to every ``_gen_img_sub_thread`` task.
    mag : number
        Magnification factor applied to ``yx_shape``.
    multipliers : sequence
        Per-row amplitudes, sliced per chunk inside the tasks.
    diffraction_limit
        Forwarded unchanged to the tasks.
    numthreads : int, optional
        Number of chunks/tasks to create (default 1).

    Returns
    -------
    dask.array.Array
        Lazy sum of the per-chunk images.

    Raises
    ------
    ValueError
        If ``numthreads`` is less than 1.
    """
    if numthreads < 1:
        raise ValueError("numthreads must be a positive integer, got {!r}".format(numthreads))
    # Ceiling division so that the chunks together cover every row.
    chunklen = (len(df) + numthreads - 1) // numthreads
    new_shape = tuple(np.array(yx_shape) * mag)
    # np.float64 instead of the np.float alias, which no longer exists in
    # current NumPy; both denote the same dtype.
    rendered_threads = [
        pdiag.dask.array.from_delayed(
            _gen_img_sub_thread(chunklen, chunk, yx_shape, df, mag, multipliers, diffraction_limit),
            new_shape,
            np.float64,
        )
        for chunk in range(numthreads)
    ]
    return pdiag.dask.array.stack(rendered_threads).sum(0)
In [4]:
def _gen_img_sub_threaded2(yx_shape, df, mag, multipliers, diffraction_limit, numthreads=1):
    """Lazily render ``df`` in ``numthreads`` chunks, slicing the data up front.

    The four render columns are extracted to a plain array once, and each
    delayed call to ``pdiag._jit_gen_img_sub`` receives only its own slice of
    that array and of ``multipliers``. The per-chunk images are stacked and
    summed along the stacking axis. Nothing is rendered until the returned
    array is computed.

    Parameters
    ----------
    yx_shape : tuple
        Base image shape; the declared output shape is ``yx_shape * mag``.
    df : pandas.DataFrame
        Must contain the columns ``y0``, ``x0``, ``sigma_y`` and ``sigma_x``.
    mag : number
        Magnification factor applied to ``yx_shape``.
    multipliers : sequence
        Per-row amplitudes, sliced with the same row ranges as the data.
    diffraction_limit
        Forwarded unchanged to ``pdiag._jit_gen_img_sub``.
    numthreads : int, optional
        Number of chunks/tasks to create (default 1).

    Returns
    -------
    dask.array.Array
        Lazy sum of the per-chunk images.

    Raises
    ------
    ValueError
        If ``numthreads`` is less than 1.
    """
    if numthreads < 1:
        raise ValueError("numthreads must be a positive integer, got {!r}".format(numthreads))
    # from_delayed is given a tuple shape, matching _gen_img_sub_threaded.
    new_shape = tuple(np.array(yx_shape) * mag)
    keys_for_render = ["y0", "x0", "sigma_y", "sigma_x"]
    data = df[keys_for_render].values
    # Ceiling division so that the chunks together cover every row.
    chunklen = (len(data) + numthreads - 1) // numthreads
    row_ranges = [slice(i * chunklen, (i + 1) * chunklen) for i in range(numthreads)]
    delayed_jit_gen_img_sub = pdiag.dask.delayed(pdiag._jit_gen_img_sub)
    lazy_chunks = [
        delayed_jit_gen_img_sub(yx_shape, data[rows], mag, multipliers[rows], diffraction_limit)
        for rows in row_ranges
    ]
    # np.float64 instead of the np.float alias, which no longer exists in
    # current NumPy; both denote the same dtype.
    rendered = [
        pdiag.dask.array.from_delayed(lazy_chunk, new_shape, np.float64)
        for lazy_chunk in lazy_chunks
    ]
    return pdiag.dask.array.stack(rendered).sum(0)
In [5]:
# Synthetic benchmark input: 100,000,000 rows of uniform [0, 1) values in the four
# columns the renderers read (4 float64 columns, roughly 3.2 GB).
# NOTE(review): no RNG seed is set, so the data differs on every run.
test_data = pd.DataFrame((np.random.rand(100000000,4)), columns=["y0", "x0", "sigma_y", "sigma_x"])
# Scale both sigma columns by 0.01 in place, so they lie in [0, 0.01).
test_data[["sigma_y", "sigma_x"]] *= 0.01
# Print the column dtypes and memory footprint of the frame.
test_data.info()
In [6]:
from dask.dot import dot_graph
In [7]:
# Build (but do not compute) a 4-chunk render at mag=1 and draw its dask task graph.
# The multipliers argument is an empty array here, so every chunk's slice of it is empty.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1, np.array(()), True, 4)
dot_graph(timg.dask)
Out[7]:
In [8]:
# Chunk-count scaling of _gen_img_sub_threaded at mag=1000 with compute()'s default
# scheduler. In every cell the first %time covers graph construction only and the
# second covers the actual render. This cell: 1 chunk.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 1)
%time a = timg.compute()
In [9]:
# 2 chunks.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 2)
%time a = timg.compute()
In [10]:
# 4 chunks.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute()
In [11]:
# 8 chunks.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 8)
%time a = timg.compute()
In [ ]:
# 16 chunks.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 16)
%time a = timg.compute()
In [ ]:
# Same render with _gen_img_sub_threaded, but executed through dask's multiprocessing
# `get` function instead of the default scheduler. This cell: 4 chunks.
# NOTE(review): newer dask releases replaced the `get=` keyword with `scheduler=...` —
# confirm against the installed dask version before re-running.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)
In [ ]:
# 8 chunks.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 8)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)
In [ ]:
# 16 chunks.
%time timg = _gen_img_sub_threaded((1,1), test_data, 1000, np.array(()), True, 16)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)
In [ ]:
# Chunk-count scaling of _gen_img_sub_threaded2 (data sliced before the tasks are
# created) at mag=1000 with compute()'s default scheduler. This cell: 1 chunk.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 1)
%time a = timg.compute()
In [ ]:
# 4 chunks.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute()
In [ ]:
# 8 chunks.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 8)
%time a = timg.compute()
In [ ]:
# 16 chunks.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 16)
%time a = timg.compute()
In [ ]:
# _gen_img_sub_threaded2 with 4 chunks, executed through dask's multiprocessing `get`.
# NOTE(review): newer dask releases replaced the `get=` keyword with `scheduler=...` —
# confirm against the installed dask version before re-running.
%time timg = _gen_img_sub_threaded2((1,1), test_data, 1000, np.array(()), True, 4)
%time a = timg.compute(get=pdiag.dask.multiprocessing.get)
In [ ]:
# Clear the interactive namespace without asking for confirmation (-f), dropping
# test_data and the benchmark results defined above.
%reset -f
In [ ]: