In [1]:
from __future__ import division, print_function
import os
import sys
from numbers import Number
import numpy as np
import pandas as pd
import healpy as hp
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import seaborn as sns
from dask import delayed, compute
from dask.diagnostics import ProgressBar
import dask.array as da
import pyunfold
from icecube import astro
import comptools as comp
import sky_anisotropy as sa
color_dict = comp.color_dict
%matplotlib inline
In [2]:
%load_ext line_profiler
In [3]:
from dask.distributed import Client, LocalCluster
In [4]:
cluster = LocalCluster(n_workers=5, threads_per_worker=5, diagnostics_port=8787)
client = Client(cluster)
In [5]:
client
Out[5]:
In [7]:
config = 'IC86.2012'
num_groups = 2
comp_list = comp.get_comp_list(num_groups=num_groups)
energybins = comp.get_energybins(config)
num_ebins = len(energybins.log_energy_midpoints)
In [8]:
feature_list, feature_labels = comp.get_training_features()
In [ ]:
print('Loading data into memory...')
df_data = comp.load_data(config=config,
energy_reco=True,
log_energy_min=6.1,
log_energy_max=8.0,
# columns=['start_time_mjd', 'lap_ra', 'lap_dec'],
columns=feature_list + ['start_time_mjd', 'lap_ra', 'lap_dec'],
n_jobs=None,
verbose=True)
In [103]:
stacked = np.hstack((zenith[:10, None], azimuth[:10, None]))
stacked
Out[103]:
In [113]:
for i in np.array_split(stacked, 4):
print(i[:, 0], i[:, 1])
In [133]:
def make_reference_skymap(zenith, azimuth, time, num_resamples=20, nside=64, random_state=2, verbose=False):
np.random.seed(random_state)
# assert len(set([zenith.shape, azimuth.shape, time.shape])) == 1
stacked = np.hstack((zenith[:, None], azimuth[:, None]))
print(stacked.shape)
batches = min(1000, len(stacked) / 10)
pix_indices = [delayed(get_pixel_values)(i[:, 0], i[:, 1], time, num_resamples=num_resamples, nside=nside, random_state=random_state, verbose=verbose)
for i in np.array_split(stacked, 50)]
with ProgressBar():
pix_indices = compute(*pix_indices)
pix_indices = np.asarray(pix_indices).flatten()
npix = hp.nside2npix(nside)
reference_map = np.zeros(npix, dtype=float)
print(pix_indices.shape)
reference_map[pix_indices] += 1
return reference_map
def get_pixel_values(zenith, azimuth, time, num_resamples=20, nside=64, random_state=2, verbose=False):
npix = hp.nside2npix(nside)
reference_map = np.zeros(npix, dtype=float)
pix_indices = []
for local_zenith, local_azimuth in zip(zenith, azimuth):
# Reference skymap
rand_times = np.random.choice(time, size=num_resamples)
ra, dec = astro.dir_to_equa(local_zenith, local_azimuth, rand_times)
theta, phi = sa.equatorial_to_healpy(ra, dec)
pix = hp.ang2pix(nside, theta, phi)
pix_indices.append(pix)
return np.array(pix_indices).flatten()
In [134]:
zenith = df_data['lap_ra'].values
azimuth = df_data['lap_dec'].values
time = df_data['start_time_mjd'].values
In [142]:
time_future = client.scatter(time)
reference_map = make_reference_skymap(zenith[:1000], azimuth[:1000], time_future,
num_resamples=1,
random_state=4)
In [143]:
reference_map.sum()
Out[143]:
In [121]:
stacked = np.hstack((zenith[:1000000, None], azimuth[:1000000, None]))
stacked
Out[121]:
In [75]:
time_future = client.scatter(time)
futures = []
for i in np.array_split(stacked, 1000):
z = i[:, 0]
a = i[:, 1]
f = client.submit(get_pixel_values, z, a, time_future,
num_resamples=1,
random_state=idx)
futures.append(f)
In [44]:
time_future = client.scatter(time)
pixels_future = client.submit(get_pixel_values, zenith[:1000], azimuth[:1000], time_future,
num_resamples=1,
random_state=2000)
In [73]:
np.array(client.gather(futures)).flatten()
Out[73]:
In [37]:
del p
In [52]:
%lprun -f make_reference_skymap -f get_pixel_values make_reference_skymap(zenith[:100], azimuth[:100], time, num_resamples=1, random_state=2)
In [53]:
reference_map.sum()
Out[53]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [79]:
from time import sleep
import random
In [80]:
@delayed
def inc(x):
return x + 1
@delayed
def double(x):
return x + 2
@delayed
def add(x, y):
return x + y
In [81]:
data = [1, 2, 3, 4, 5]
In [83]:
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = delayed(sum)(output)
In [84]:
total
Out[84]:
In [85]:
total.compute()
Out[85]:
In [ ]: