In [1]:
%load_ext autoreload
In [2]:
%autoreload 2
In [3]:
import itertools
from collections import OrderedDict
import sky_anisotropy as sa
import numpy as np
import pandas as pd
import xarray as xr
import healpy as hp
import matplotlib.pyplot as plt
from dask.distributed import LocalCluster, Client, progress
from dask.diagnostics import ProgressBar
import dask.array as da
import dask.dataframe as dd
from dask import delayed, compute
from bokeh.io import output_notebook
import comptools as comp
%matplotlib inline
In [4]:
# output_notebook()
In [5]:
# cluster = LocalCluster(n_workers=10,
#                        threads_per_worker=5,
#                        diagnostics_port=8787)
# client = Client(cluster)
In [6]:
# client
In [7]:
nside = 64
npix = hp.nside2npix(nside)
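# A HEALPix map always has npix = 12 * nside**2 pixels, so nside = 64 corresponds
# to 49152 pixels; a quick sanity check on the value computed above:
assert npix == 12 * nside ** 2  # 49152 for nside = 64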
In [8]:
# ddf = dd.read_hdf('/data/user/vwiedemann/hawcdata/hawc_2016-2/datasample_10percent.hdf',
#                   'dataframe',
#                   mode='r',
#                   chunksize=1000000)
ddf = dd.read_hdf('data_dataframe.hdf', 'dataframe', mode='r', chunksize=100000)
In [9]:
ddf
Out[9]:
In [11]:
max_energy = ddf['protonllhEnergy'].max()  # lazy definition (parallels the min_energy cell below)
max_energy = max_energy.persist()  # start computation in the background
progress(max_energy)  # watch progress
In [13]:
max_energy.compute()
Out[13]:
In [14]:
min_energy = ddf['protonllhEnergy'].min()
min_energy = min_energy.persist() # start computation in the background
progress(min_energy) # watch progress
In [16]:
min_energy.compute()
Out[16]:
In [10]:
# plt.hist(ddf['protonllhEnergy'].values, bins=20)
plt.hist(ddf['reco_log_energy'].values, bins=20)
plt.show()
In [10]:
energy_bins = np.arange(6.1, 8, 0.1)
energy_bins
Out[10]:
In [111]:
plt.hist(ddf['lap_cos_zenith'].values, bins=20)
plt.show()
In [160]:
# zenith_bins = np.linspace(0.85, 1.0, 10)
zenith_bins = np.linspace(0.85, 1.0, 4)
zenith_bins
Out[160]:
In [161]:
col_to_bins = OrderedDict()
# lap_cos_zenith must come before reco_log_energy here; the reason for the ordering isn't clear
col_to_bins['lap_cos_zenith'] = zenith_bins
col_to_bins['reco_log_energy'] = energy_bins
# energy_bins = np.linspace(12, 15, 20)
# col_to_bins = OrderedDict()
# col_to_bins['protonllhEnergy'] = energy_bins
col_to_bins
Out[161]:
In [162]:
data = sa.binned_skymaps(ddf,
                         col_to_bins=col_to_bins,
                         ra_col='lap_ra',
                         dec_col='lap_dec',
                         nside=nside,
                         num_workers=25)
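# Based on how `data` is indexed in the cells below, binned_skymaps appears to return
# one HEALPix map per (lap_cos_zenith, reco_log_energy) bin, with pixels on the last
# axis. The trailing axis should therefore match the map size (an assumption worth
# checking rather than a documented guarantee):
assert data.shape[-1] == hp.nside2npix(nside)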
In [163]:
data
Out[163]:
In [164]:
data.shape
Out[164]:
In [165]:
data.nbytes / 1e6
Out[165]:
In [167]:
for i in range(len(zenith_bins) - 1):
    m = data[i, 5].copy()
    m[m == 0] = hp.UNSEEN
    comp.plot_skymap(m, polar=True)
    plt.show()
In [168]:
size = np.deg2rad(10)
pix = np.arange(npix)
In [169]:
ra_test = np.deg2rad(0)
dec_test = np.deg2rad(-80)
theta_test, phi_test = sa.equatorial_to_healpy(ra_test, dec_test)
vec_test = hp.ang2vec(theta_test, phi_test)
pix_center = hp.ang2pix(nside, theta_test, phi_test)
pix_center
Out[169]:
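# Assuming equatorial_to_healpy follows the standard HEALPix convention
# (colatitude theta = pi/2 - dec, azimuth phi = ra), the test direction can be checked
# directly. The convention is an assumption here, not taken from the package docs:
np.testing.assert_allclose(theta_test, np.pi / 2 - dec_test)
np.testing.assert_allclose(phi_test, ra_test)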
In [170]:
on_mask = sa.disc_on_region(pix=pix, pix_center=pix_center, size=size, nside=nside)
on_mask
Out[170]:
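# As an independent cross-check of the on-region, healpy's query_disc selects the
# pixels within `size` radians of the test direction. Agreement with disc_on_region
# is assumed rather than guaranteed (edge-inclusion conventions may differ slightly):
disc_pix = hp.query_disc(nside, vec_test, size)
on_mask_check = np.zeros(npix, dtype=bool)
on_mask_check[disc_pix] = True
print(on_mask_check.sum(), on_mask.sum())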
In [171]:
off_mask = sa.theta_band_off_region(pix=pix, pix_center=pix_center, on_region_mask=on_mask, nside=nside)
off_mask
Out[171]:
In [172]:
on_mask.sum(), off_mask.sum()
Out[172]:
In [173]:
data[..., on_mask].sum(axis=-1).values.shape
Out[173]:
In [174]:
data.shape[-1]
Out[174]:
In [175]:
def region_hist(data, mask):
    return data[..., mask].sum(axis=-1)
In [176]:
data.shape, on_mask.shape
Out[176]:
In [177]:
on_mask.ndim
Out[177]:
In [178]:
r = region_hist(data, on_mask)
r
Out[178]:
In [179]:
r.sum(axis=0)
Out[179]:
In [180]:
from sky_anisotropy.core import on_off_chi_squared_single
In [181]:
pix_center
Out[181]:
In [188]:
data.reco_log_energy > 6.4
Out[188]:
In [192]:
on_off_chi_squared_single(data, pix_center, on_region='disc',
                          size=size, off_region='theta_band',
                          nside=nside,
                          hist_func=lambda x: x.sum(axis=0)[(x.reco_log_energy > 6.4) & (x.reco_log_energy < 7.8)])
# nside=nside, hist_func=None)
Out[192]:
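# For intuition only: an on/off comparison typically scales the off-region histogram
# to the on-region exposure and forms a chi-squared over bins. The sketch below is a
# generic version of that idea, not the statistic implemented by on_off_chi_squared_single:
def chi_squared_on_off(counts_on, counts_off):
    counts_on = np.asarray(counts_on, dtype=float)
    counts_off = np.asarray(counts_off, dtype=float)
    alpha = counts_on.sum() / counts_off.sum()    # scale off region to on-region exposure
    expected = alpha * counts_off
    valid = expected > 0                          # skip empty bins
    return np.sum((counts_on[valid] - expected[valid]) ** 2 / expected[valid])

chi_squared_on_off(region_hist(data, on_mask).sum(axis=0),
                   region_hist(data, off_mask).sum(axis=0))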
In [193]:
for col in col_to_bins.keys():
    print('col = {}'.format(col))
In [49]:
def digitize_columns(partition, col_to_bins):
    for col in col_to_bins.keys():
        bins = col_to_bins[col]
        partition[col + '_digitized'] = np.digitize(partition[col], bins=bins) - 1
    return partition
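# A quick look at what digitize_columns produces, on a small throwaway pandas frame
# (the values below are arbitrary; the column names match col_to_bins):
toy = pd.DataFrame({'lap_cos_zenith': [0.86, 0.92, 0.99],
                    'reco_log_energy': [6.15, 6.85, 7.55]})
digitize_columns(toy, col_to_bins)
# adds lap_cos_zenith_digitized and reco_log_energy_digitized columns holding
# zero-based bin indices; values outside the bin edges land at -1 or len(bins) - 1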
In [137]:
# meta = ddf.dtypes.to_dict()
# meta['reco_log_energy_digitized'] = np.int64
# meta['lap_cos_zenith_digitized'] = np.int64
# meta
In [54]:
ddf_digitized = ddf.map_partitions(digitize_columns, col_to_bins)
# ddf_digitized = ddf.map_partitions(digitize_columns, col_to_bins, meta=meta)
In [55]:
ddf_digitized
Out[55]:
In [57]:
# def unique_counts(block):
#     idx, counts = np.unique(block, return_counts=True)
#     return idx, counts
In [97]:
shape = [len(bins) - 1 for bins in col_to_bins.values()]
shape
Out[97]:
In [59]:
bin_idxs = [np.arange(l) for l in shape]
bin_idxs
Out[59]:
In [60]:
print(len(list(itertools.product(*bin_idxs))))
In [61]:
def ang2pix(theta, phi, nside=64):
    return hp.ang2pix(nside=nside, theta=theta, phi=phi)
In [88]:
cols = list(col_to_bins.keys())
maps = []
for idx in itertools.product(*bin_idxs):
    bool_masks = [ddf_digitized['{}_digitized'.format(col)] == i for col, i in zip(cols, idx)]
    if len(bool_masks) == 1:
        mask = bool_masks[0]
    else:
        mask = da.logical_and(*bool_masks)
    theta, phi = sa.equatorial_to_healpy(ddf.loc[mask, 'lap_ra'],
                                         ddf.loc[mask, 'lap_dec'])
    ipix = da.map_blocks(ang2pix, theta.values, phi.values)
    m, _ = da.histogram(ipix, bins=np.arange(npix + 1))
    maps.append(m)
In [89]:
maps
Out[89]:
In [90]:
with ProgressBar():
    maps = compute(*maps, num_workers=25, scheduler='threading')
In [91]:
for m in maps[:3]:
    hp.mollview(m, title='Example skymap')
    hp.graticule(verbose=False)
    plt.show()
In [126]:
data = np.zeros(shape + [npix])
In [127]:
for idx, m in zip(itertools.product(*bin_idxs), maps):
    data[idx] = m
In [128]:
data[0, 3].shape
Out[128]:
In [133]:
dims = list(col_to_bins.keys()) + ['ipix']
dims
Out[133]:
In [141]:
coords = {}
for col in col_to_bins.keys():
    bins = col_to_bins[col]
    coords[col] = (bins[1:] + bins[:-1]) / 2  # bin centers
coords
Out[141]:
In [142]:
data = xr.DataArray(data, dims=dims, coords=coords)
print(data)
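# With named dimensions and bin-center coordinates, individual maps can be pulled out
# by physical value instead of positional index (the specific values here are just
# illustrative; .sel with method='nearest' is standard xarray):
m_sel = data.sel(lap_cos_zenith=0.95, reco_log_energy=6.85, method='nearest')
hp.mollview(m_sel.values, title='Skymap for nearest (cos(zenith), log10(E)) bin')
hp.graticule(verbose=False)
plt.show()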
In [144]:
hp.mollview(data[2, 7], title='Example skymap')
hp.graticule(verbose=False)
plt.show()
In [ ]: