In [1]:
import dask.dataframe as dd
from distributed import Client
client = Client(scheduler_file='/scratch/tmorton/dask/scheduler.json')
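
Note: the scheduler file above is written by a separately launched dask-scheduler process, with dask-worker processes pointed at the same file. For a self-contained session, a roughly equivalent cluster could be started in-process; a sketch using distributed's LocalCluster with the single-threaded workers used in the benchmarks below:

from distributed import LocalCluster
cluster = LocalCluster(n_workers=24, threads_per_worker=1)  # single-threaded workers, as below
client = Client(cluster)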

In [3]:
from explorer.catalog import ParquetCatalog
import glob

files = glob.glob('/scratch/tmorton/qa_explorer_data/forced_big_fake*.parq')
catalog = ParquetCatalog(files[:32], client=client)

In [4]:
from explorer.functors import (Mag, MagDiff, CustomFunctor, DeconvolvedMoments, Column,
                               SdssTraceSize, PsfSdssTraceSizeDiff, HsmTraceSize,
                               PsfHsmTraceSizeDiff, CompositeFunctor)

funcs_to_test = [CustomFunctor('mag(modelfit_CModel) - mag(base_PsfFlux)'),
                 MagDiff('modelfit_CModel', 'base_PsfFlux'),
                 DeconvolvedMoments(),
                 Column('base_Footprint_nPix'),
                 SdssTraceSize(),
                 PsfSdssTraceSizeDiff(),
                 HsmTraceSize(),
                 PsfHsmTraceSizeDiff(),
                ]
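
Any of these functors can be benchmarked in the same way; a sketch sweeping the whole list with the same test helper used on f1 below:

for func in funcs_to_test:
    func.test(catalog)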

In [5]:
f1 = MagDiff('modelfit_CModel', 'base_PsfFlux')
f2 = DeconvolvedMoments()
# f.test(catalog)

In [6]:
%time y1 = f1(catalog)
%time y2 = f2(catalog)


CPU times: user 13.2 s, sys: 1.25 s, total: 14.5 s
Wall time: 31.5 s
CPU times: user 11.9 s, sys: 1.22 s, total: 13.1 s
Wall time: 24.4 s

In [7]:
%time y1 = f1(catalog)
%time y2 = f2(catalog)


CPU times: user 6.13 s, sys: 1.17 s, total: 7.3 s
Wall time: 10.9 s
CPU times: user 5.99 s, sys: 1.18 s, total: 7.17 s
Wall time: 10.8 s
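
The second evaluation is 2-3x faster than the first, presumably because the Parquet data are still cached after the initial read (by the workers and/or the OS page cache).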

In [7]:
%time y1.head()


CPU times: user 0 ns, sys: 965 µs, total: 965 µs
Wall time: 712 µs
Out[7]:
id
53158034708430851   -0.667639
53158034708430852   -0.584314
53158034708430853   -0.037183
53158034708430854   -0.575237
53158034708430855   -0.001186
dtype: float64

In [9]:
# 24 dask workers, 1 thread each
n_files = [1,2,4,8,16,32,64,128,256]
times = []
for n in n_files:
    catalog = ParquetCatalog(files[:n], client=client)
    times.append(f1.test(catalog))


Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 0.96s, length=1214434.  Type=<class 'pandas.core.series.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 2.06s, length=2428868.  Type=<class 'pandas.core.series.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 2.80s, length=4857736.  Type=<class 'pandas.core.series.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 4.92s, length=9715472.  Type=<class 'pandas.core.series.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 9.07s, length=19430944.  Type=<class 'pandas.core.series.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 18.54s, length=38861888.  Type=<class 'pandas.core.series.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 39.27s, length=77723776.  Type=<class 'pandas.core.series.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 114.67s, length=155447552.  Type=<class 'pandas.core.series.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 302.08s, length=310895104.  Type=<class 'pandas.core.series.Series'>
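
Scaling is close to linear up through 64 files (about 39 s for ~78M rows), but degrades at 128 and 256 files; memory pressure while gathering the concatenated result is a plausible culprit.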

In [9]:
# with 16 dask workers
n_files = [1,2,4,8,16,32,64,128,256]
times = []
for n in n_files:
    catalog = ParquetCatalog(files[:n])
    times.append(f1.test(catalog))


Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 1.44s, length=1214434.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 2.03s, length=2428868.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 2.51s, length=4857736.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 3.86s, length=9715472.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 6.64s, length=19430944.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 12.63s, length=38861888.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 25.08s, length=77723776.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 58.77s, length=155447552.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 130.74s, length=310895104.  Type=<class 'dask.dataframe.core.Series'>
distributed.client - WARNING - Client report stream closed to scheduler

In [14]:
# with 32 dask workers
n_files = [1,2,4,8,16,32,64,128,256]
times = []
for n in n_files:
    catalog = ParquetCatalog(files[:n])
    times.append(f1.test(catalog))


Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 10.30s, length=1214434.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 5.30s, length=2428868.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 6.12s, length=4857736.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 7.32s, length=9715472.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 9.46s, length=19430944.  Type=<class 'dask.dataframe.core.Series'>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://141.142.237.49:9624 remote=tcp://141.142.237.49:8786>
tornado.application - ERROR - Exception in callback None
Traceback (most recent call last):
  File "/home/tmorton/.conda/envs/my_py3/lib/python3.6/site-packages/tornado/ioloop.py", line 887, in start
    fd_obj, handler_func = self._handlers[fd]
KeyError: 65
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 11.97s, length=38861888.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 23.68s, length=77723776.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 54.01s, length=155447552.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 117.99s, length=310895104.  Type=<class 'dask.dataframe.core.Series'>

In [41]:
# 16 workers, without specifying nthreads=1
n_files = [1,2,4,8,16,32,64,128,256]
times = []
for n in n_files:
    catalog = ParquetCatalog(files[:n])
    times.append(f1.test(catalog))


distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://141.142.237.49:47212 remote=tcp://141.142.237.49:8786>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://141.142.237.49:47214 remote=tcp://141.142.237.49:8786>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://141.142.237.49:47216 remote=tcp://141.142.237.49:8786>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 12.27s, length=1214434.  Type=<class 'dask.dataframe.core.Series'>
tornado.application - ERROR - Exception in callback None
Traceback (most recent call last):
  File "/home/tmorton/.conda/envs/my_py3/lib/python3.6/site-packages/tornado/ioloop.py", line 887, in start
    fd_obj, handler_func = self._handlers[fd]
KeyError: 64
tornado.application - ERROR - Exception in callback None
Traceback (most recent call last):
  File "/home/tmorton/.conda/envs/my_py3/lib/python3.6/site-packages/tornado/ioloop.py", line 887, in start
    fd_obj, handler_func = self._handlers[fd]
KeyError: 70
tornado.application - ERROR - Exception in callback None
Traceback (most recent call last):
  File "/home/tmorton/.conda/envs/my_py3/lib/python3.6/site-packages/tornado/ioloop.py", line 887, in start
    fd_obj, handler_func = self._handlers[fd]
KeyError: 66
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 5.56s, length=2428868.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 6.14s, length=4857736.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 7.66s, length=9715472.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 7.56s, length=19430944.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 15.07s, length=38861888.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 29.04s, length=77723776.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 60.67s, length=155447552.  Type=<class 'dask.dataframe.core.Series'>
Test results for (mag_modelfit_CModel_flux - mag_base_PsfFlux_flux):
  Took 125.02s, length=310895104.  Type=<class 'dask.dataframe.core.Series'>
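
Caveat when comparing these three runs: the 16- and 32-worker loops construct ParquetCatalog without client=client, and their results come back as dask.dataframe.core.Series rather than pandas Series, so the final gather step may not be included in those timings.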

In [26]:
catalog = ParquetCatalog(files[:64], client=client)

In [33]:
f1 = MagDiff('modelfit_CModel', 'base_PsfFlux')
f2 = DeconvolvedMoments()
# y1 = f1(catalog)

In [28]:
%time df = catalog.get_columns(f1.columns)


CPU times: user 13 s, sys: 45.4 ms, total: 13 s
Wall time: 15.3 s

In [31]:
df.head()


Out[31]:
modelfit_CModel_flux base_PsfFlux_flux
id
53158034708430850 NaN 8.047405e-11
53158034708430851 4.617162e-10 2.496430e-10
53158034708430852 1.449521e-10 8.462506e-11
53158034708430853 6.169326e-11 5.961625e-11
53158034708430854 4.629501e-10 2.725458e-10

In [34]:
%time df2 = catalog.get_columns(f2.columns)


CPU times: user 14.3 s, sys: 319 ms, total: 14.6 s
Wall time: 15.3 s

In [35]:
df2.head()


Out[35]:
ext_shapeHSM_HsmSourceMoments_xx ext_shapeHSM_HsmSourceMoments_yy base_SdssShape_xx base_SdssShape_yy ext_shapeHSM_HsmPsfMoments_xx ext_shapeHSM_HsmPsfMoments_yy
id
53158034708430850 NaN NaN 1.088979 0.127574 1.100241 1.165169
53158034708430851 1.573842 2.554046 1.580069 2.569499 1.098099 1.164787
53158034708430852 2.688466 1.494553 2.504668 1.434881 1.092501 1.164131
53158034708430853 NaN NaN 0.083333 0.083333 1.235970 1.256991
53158034708430854 2.683044 1.672739 2.662576 1.666592 1.089752 1.163285

In [36]:
df3 = df.join(df2)

In [37]:
df3.head()


Out[37]:
modelfit_CModel_flux base_PsfFlux_flux ext_shapeHSM_HsmSourceMoments_xx ext_shapeHSM_HsmSourceMoments_yy base_SdssShape_xx base_SdssShape_yy ext_shapeHSM_HsmPsfMoments_xx ext_shapeHSM_HsmPsfMoments_yy
id
53158034708430850 NaN 8.047405e-11 NaN NaN 1.088979 0.127574 1.100241 1.165169
53158034708430851 4.617162e-10 2.496430e-10 1.573842 2.554046 1.580069 2.569499 1.098099 1.164787
53158034708430852 1.449521e-10 8.462506e-11 2.688466 1.494553 2.504668 1.434881 1.092501 1.164131
53158034708430853 6.169326e-11 5.961625e-11 NaN NaN 0.083333 0.083333 1.235970 1.256991
53158034708430854 4.629501e-10 2.725458e-10 2.683044 1.672739 2.662576 1.666592 1.089752 1.163285

In [46]:
%time df = client.persist(dd.read_parquet(files[:16]))


CPU times: user 3.34 s, sys: 37 ms, total: 3.37 s
Wall time: 3.47 s

In [47]:
%time df[f1.columns].head()


CPU times: user 11 ms, sys: 0 ns, total: 11 ms
Wall time: 27.5 ms
Out[47]:
modelfit_CModel_flux base_PsfFlux_flux
index
0 1.091596e-10 1.090838e-10
1 1.062041e-10 1.062667e-10
2 7.338646e-11 3.957935e-11
3 1.509706e-10 1.043840e-10
4 5.588762e-11 5.143294e-11

In [49]:
df.columns


Out[49]:
Index(['id', 'coord_ra', 'coord_dec', 'parent', 'deblend_nChild',
       'base_TransformedCentroid_x', 'slot_Centroid_x',
       'base_TransformedCentroid_y', 'slot_Centroid_y',
       'base_TransformedCentroid_flag',
       ...
       'ext_shapeHSM_HsmSourceMoments_yy', 'ext_shapeHSM_HsmSourceMoments_xy',
       'ext_shapeHSM_HsmPsfMoments_xx', 'ext_shapeHSM_HsmPsfMoments_yy',
       'ext_shapeHSM_HsmPsfMoments_xy', 'calib_psfUsed', 'calib_psfCandidate',
       'base_SdssCentroid_flag', 'base_Footprint_nPix',
       'base_Footprint_nPix_flag'],
      dtype='object', length=449)

In [50]:
all(c in df.columns for c in f2.columns)


Out[50]:
True

In [52]:
%time df[list(f2.columns)].head()


CPU times: user 10.9 ms, sys: 1 ms, total: 11.9 ms
Wall time: 57.7 ms
Out[52]:
ext_shapeHSM_HsmSourceMoments_xx ext_shapeHSM_HsmSourceMoments_yy base_SdssShape_xx base_SdssShape_yy ext_shapeHSM_HsmPsfMoments_xx ext_shapeHSM_HsmPsfMoments_yy
index
0 NaN NaN 0.083333 0.083333 1.147312 1.150524
1 1.732549 1.829507 1.724967 1.804357 1.183368 1.136575
2 NaN NaN 0.083333 0.083333 1.164925 1.142803
3 2.495500 2.906507 3.446043 2.699839 1.154973 1.146689
4 NaN NaN NaN NaN 1.162071 1.143402
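
For comparison, the quantity f1 computes can also be written directly against the persisted dataframe; a minimal sketch, assuming a magnitude is just -2.5*log10(flux) up to a zero point:

import numpy as np

def mag(flux):
    # instrumental magnitude, ignoring the calibration zero point
    return -2.5 * np.log10(flux)

magdiff = mag(df['modelfit_CModel_flux']) - mag(df['base_PsfFlux_flux'])
magdiff.head()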

In [5]:
%time df = dd.read_parquet(files[:64], columns=f1.columns)


CPU times: user 13.4 s, sys: 126 ms, total: 13.5 s
Wall time: 14 s

In [8]:
%prun dd.read_parquet(files[:64], columns=f1.columns)


(profiler output is displayed in a pager and was not captured)

In [9]:
%prun client.persist(dd.read_parquet(files[:64], columns=f1.columns))


(profiler output is displayed in a pager and was not captured)

In [16]:
%time df = client.persist(dd.read_parquet(files[:16], columns=f1.columns))


CPU times: user 2.95 s, sys: 9.3 ms, total: 2.96 s
Wall time: 3.07 s

In [17]:
import fastparquet
import pandas as pd

In [20]:
%%time
dfs = []
for f in files[:16]:
    pfile = fastparquet.ParquetFile(f)
    dfs.append(pfile.to_pandas(columns=f1.columns))
df = pd.concat(dfs)


CPU times: user 5.33 s, sys: 460 ms, total: 5.79 s
Wall time: 9.97 s

In [21]:
%%time
df = client.persist(dd.read_parquet(files[:16], columns=f1.columns))


CPU times: user 2.88 s, sys: 13.9 ms, total: 2.89 s
Wall time: 2.93 s
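
One timing caveat: client.persist returns as soon as the graph is submitted, not when the data are resident in memory, so the ~3 s wall time here may undercount the actual load relative to the serial fastparquet loop above. Blocking on distributed.wait makes the comparison fairer:

from distributed import wait

df = client.persist(dd.read_parquet(files[:16], columns=f1.columns))
wait(df)  # block until every partition is loaded into worker memory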

In [22]:
len(df)


Out[22]:
19614112

In [ ]:
res = f1(catalog)

In [23]:
import dask.dataframe as dd
import pandas as pd

In [25]:
df = dd.from_pandas(pd.DataFrame({'y1':y1.result(), 'y2':y2.result()}))


---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-25-e8ae5d07aee4> in <module>()
----> 1 df = dd.from_pandas(pd.DataFrame({'y1':y1.result(), 'y2':y2.result()}))

~/.conda/envs/my_py3/lib/python3.6/site-packages/dask/dataframe/io/io.py in from_pandas(data, npartitions, chunksize, sort, name)
    170 
    171     if ((npartitions is None) == (chunksize is None)):
--> 172         raise ValueError('Exactly one of npartitions and chunksize must be specified.')
    173 
    174     nrows = len(data)

ValueError: Exactly one of npartitions and chunksize must be specified.
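
The fix is what the traceback asks for: pass exactly one of npartitions or chunksize (16 partitions here is an arbitrary choice):

df = dd.from_pandas(pd.DataFrame({'y1': y1.result(), 'y2': y2.result()}),
                    npartitions=16)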

In [29]:
client.ncores()


Out[29]:
{'tcp://141.142.237.49:10225': 1,
 'tcp://141.142.237.49:10244': 1,
 'tcp://141.142.237.49:11228': 1,
 'tcp://141.142.237.49:11841': 1,
 'tcp://141.142.237.49:11859': 1,
 'tcp://141.142.237.49:12959': 1,
 'tcp://141.142.237.49:13371': 1,
 'tcp://141.142.237.49:13704': 1,
 'tcp://141.142.237.49:1578': 1,
 'tcp://141.142.237.49:17960': 1,
 'tcp://141.142.237.49:19503': 1,
 'tcp://141.142.237.49:20506': 1,
 'tcp://141.142.237.49:20994': 1,
 'tcp://141.142.237.49:21194': 1,
 'tcp://141.142.237.49:23175': 1,
 'tcp://141.142.237.49:23976': 1,
 'tcp://141.142.237.49:24639': 1,
 'tcp://141.142.237.49:24744': 1,
 'tcp://141.142.237.49:2550': 1,
 'tcp://141.142.237.49:25547': 1,
 'tcp://141.142.237.49:25619': 1,
 'tcp://141.142.237.49:31423': 1,
 'tcp://141.142.237.49:31917': 1,
 'tcp://141.142.237.49:32327': 1,
 'tcp://141.142.237.49:3397': 1,
 'tcp://141.142.237.49:3461': 1,
 'tcp://141.142.237.49:4020': 1,
 'tcp://141.142.237.49:5831': 1,
 'tcp://141.142.237.49:6884': 1,
 'tcp://141.142.237.49:8186': 1,
 'tcp://141.142.237.49:9088': 1}
distributed.client - WARNING - Client report stream closed to scheduler
distributed.client - WARNING - Client report stream closed to scheduler

In [ ]: