In [1]:
import dask.dataframe as dd
from distributed import Client
client = Client(scheduler_file='/scratch/tmorton/dask/scheduler.json')
In [3]:
from explorer.catalog import ParquetCatalog
import glob
files = glob.glob('/scratch/tmorton/qa_explorer_data/forced_big_fake*.parq')
catalog = ParquetCatalog(files[:32], client=client)
In [4]:
from explorer.functors import (Mag, MagDiff, CustomFunctor, DeconvolvedMoments, Column,
SdssTraceSize, PsfSdssTraceSizeDiff, HsmTraceSize,
PsfHsmTraceSizeDiff, CompositeFunctor)
funcs_to_test = [CustomFunctor('mag(modelfit_CModel) - mag(base_PsfFlux)'),
MagDiff('modelfit_CModel', 'base_PsfFlux'),
DeconvolvedMoments(),
Column('base_Footprint_nPix'),
SdssTraceSize(),
PsfSdssTraceSizeDiff(),
HsmTraceSize(),
PsfHsmTraceSizeDiff(),
]
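For reference, a minimal sketch of timing each functor in funcs_to_test against the catalog. This is not part of the original run; it assumes, as in the f1(catalog)/f2(catalog) cells below, that calling a functor on a ParquetCatalog kicks off the column reads and computation (if the call returns a deferred object, only the submission is timed).
In [ ]:
# Hedged sketch: wall-clock time for each functor in the list.
import time
for func in funcs_to_test:
    t0 = time.time()
    _ = func(catalog)
    print(f'{type(func).__name__}: {time.time() - t0:.2f} s')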
In [5]:
f1 = MagDiff('modelfit_CModel', 'base_PsfFlux')
f2 = DeconvolvedMoments()
# f.test(catalog)
In [6]:
%time y1 = f1(catalog)
%time y2 = f2(catalog)
In [7]:
%time y1 = f1(catalog)
%time y2 = f2(catalog)
In [7]:
%time y1.head()
Out[7]:
In [9]:
# 24 dask workers, 1 thread each
n_files = [1,2,4,8,16,32,64,128,256]
times = []
for n in n_files:
    catalog = ParquetCatalog(files[:n], client=client)
    times.append(f1.test(catalog))
In [9]:
# with 16 dask workers
n_files = [1,2,4,8,16,32,64,128,256]
times = []
for n in n_files:
    catalog = ParquetCatalog(files[:n])
    times.append(f1.test(catalog))
In [14]:
# with 32 dask workers
n_files = [1,2,4,8,16,32,64,128,256]
times = []
for n in n_files:
    catalog = ParquetCatalog(files[:n])
    times.append(f1.test(catalog))
In [41]:
# 16 workers, without specifying nthreads=1
n_files = [1,2,4,8,16,32,64,128,256]
times = []
for n in n_files:
    catalog = ParquetCatalog(files[:n])
    times.append(f1.test(catalog))
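A quick way to visualize the scaling runs above. Hedged sketch: it assumes f1.test() returns a wall-clock time in seconds, and note that each run above overwrites times, so each worker configuration would need its own list to be plotted together.
In [ ]:
# Hedged sketch: plot the most recent scaling run on log-log axes.
import matplotlib.pyplot as plt
plt.loglog(n_files, times, marker='o')
plt.xlabel('number of parquet files')
plt.ylabel('f1.test(catalog) time [s] (assumed units)')
plt.title('ParquetCatalog functor timing vs. catalog size')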
In [26]:
catalog = ParquetCatalog(files[:64], client=client)
In [33]:
f1 = MagDiff('modelfit_CModel', 'base_PsfFlux')
f2 = DeconvolvedMoments()
# y1 = f1(catalog)
In [28]:
%time df = catalog.get_columns(f1.columns)
In [31]:
df.head()
Out[31]:
In [34]:
%time df2 = catalog.get_columns(f2.columns)
In [35]:
df2.head()
Out[35]:
In [36]:
df3 = df.join(df2)
In [37]:
df3.head()
Out[37]:
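Since MagDiff is just a magnitude difference, it can be sanity-checked by hand from the joined flux columns. Hedged sketch: it assumes f1.columns holds exactly the two flux columns and nothing else, and the sorted() order below is arbitrary, so the sign is not guaranteed to match the functor's convention.
In [ ]:
# Hedged sketch: recompute a CModel - PSF magnitude difference directly from df3.
# Assumes f1.columns contains exactly the two flux columns; sign/order unchecked.
import numpy as np
col_a, col_b = sorted(f1.columns)
magdiff_by_hand = -2.5 * np.log10(df3[col_a] / df3[col_b])
magdiff_by_hand.head()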
In [46]:
%time df = client.persist(dd.read_parquet(files[:16]))
In [47]:
%time df[f1.columns].head()
Out[47]:
In [49]:
df.columns
Out[49]:
In [50]:
all(c in df.columns for c in f2.columns)
Out[50]:
In [52]:
%time df[list(f2.columns)].head()
Out[52]:
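One cost of persisting the full-width frame (rather than only f1.columns / f2.columns) is cluster memory. A hedged sketch of a rough estimate, extrapolated from a single partition:
In [ ]:
# Hedged sketch: rough memory cost of the persisted full-width frame,
# assuming partitions are similar in size.
part = df.get_partition(0).compute()
est_bytes = part.memory_usage(deep=True).sum() * df.npartitions
est_bytes / 1e9   # rough estimate in GB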
In [5]:
%time df = dd.read_parquet(files[:64], columns=f1.columns)
In [8]:
%prun dd.read_parquet(files[:64], columns=f1.columns)
In [9]:
%prun client.persist(dd.read_parquet(files[:64], columns=f1.columns))
In [16]:
%time df = client.persist(dd.read_parquet(files[:16], columns=f1.columns))
In [17]:
import fastparquet
import pandas as pd
In [20]:
%%time
dfs = []
for f in files[:16]:
    pfile = fastparquet.ParquetFile(f)
    dfs.append(pfile.to_pandas(columns=f1.columns))
df = pd.concat(dfs)
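For a fairer comparison with dd.read_parquet, the same per-file fastparquet reads can be fanned out over the dask workers. Hedged sketch: it assumes the workers see the same /scratch paths as the client.
In [ ]:
# Hedged sketch: distribute the per-file fastparquet reads with client.map.
cols = list(f1.columns)

def read_one(path, columns=cols):
    import fastparquet
    return fastparquet.ParquetFile(path).to_pandas(columns=columns)

%time dfs = client.gather(client.map(read_one, files[:16]))
%time df_mapped = pd.concat(dfs)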
In [21]:
%%time
df = client.persist(dd.read_parquet(files[:16], columns=f1.columns))
In [22]:
len(df)
Out[22]:
In [ ]:
res = f1(catalog)
In [23]:
import dask.dataframe as dd
import pandas as pd
In [25]:
df = dd.from_pandas(pd.DataFrame({'y1': y1.result(), 'y2': y2.result()}),
                    npartitions=16)  # from_pandas requires npartitions (or chunksize); 16 is arbitrary
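A quick sanity check on the combined frame with the standard dask API (hedged sketch; describe() triggers a full pass over the data):
In [ ]:
# Hedged sketch: summary statistics for the two functor outputs.
%time stats = df.describe().compute()
stats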
In [29]:
client.ncores()
Out[29]:
In [ ]: