In [1]:
from __future__ import division, print_function
import os
import sys
from numbers import Number
import numpy as np
import pandas as pd
import healpy as hp
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import seaborn as sns
from dask import delayed, compute
from dask.diagnostics import ProgressBar
import dask.array as da
import pyunfold

from icecube import astro

import comptools as comp
import sky_anisotropy as sa

color_dict = comp.color_dict

%matplotlib inline


/data/user/jbourbeau/virtualenv/cr-composition_el7/lib/python2.7/site-packages/seaborn/apionly.py:6: UserWarning: As seaborn no longer sets a default style on import, the seaborn.apionly module is deprecated. It will be removed in a future version.
  warnings.warn(msg, UserWarning)

In [2]:
%load_ext line_profiler

In [3]:
from dask.distributed import Client, LocalCluster

In [4]:
cluster = LocalCluster(n_workers=5, threads_per_worker=5, diagnostics_port=8787)
client = Client(cluster)

In [5]:
client


Out[5]:

Client

Cluster

  • Workers: 5
  • Cores: 25
  • Memory: 26.14 GB

In [7]:
config = 'IC86.2012'
num_groups = 2

comp_list = comp.get_comp_list(num_groups=num_groups)
energybins = comp.get_energybins(config)
num_ebins = len(energybins.log_energy_midpoints)

In [8]:
feature_list, feature_labels = comp.get_training_features()

In [ ]:
print('Loading data into memory...')
df_data = comp.load_data(config=config,
                         energy_reco=True,
                         log_energy_min=6.1,
                         log_energy_max=8.0,
#                          columns=['start_time_mjd', 'lap_ra', 'lap_dec'],
                         columns=feature_list + ['start_time_mjd', 'lap_ra', 'lap_dec'],
                         n_jobs=None,
                         verbose=True)


Loading data into memory...

In [103]:
stacked = np.hstack((zenith[:10, None], azimuth[:10, None]))
stacked


Out[103]:
array([[ 1.99483155, -1.32342388],
       [ 2.36813548, -1.08170764],
       [ 3.78556748, -1.17676562],
       [ 5.90600101, -1.27144041],
       [ 4.71149746, -1.23404052],
       [ 5.16640096, -1.22737469],
       [ 2.37528744, -1.42057519],
       [ 5.36860618, -1.29641703],
       [ 0.53741855, -1.54191018],
       [ 5.01993018, -1.40907366]])

In [113]:
for i in np.array_split(stacked, 4):
    print(i[:, 0], i[:, 1])


[1.99483155 2.36813548 3.78556748] [-1.32342388 -1.08170764 -1.17676562]
[5.90600101 4.71149746 5.16640096] [-1.27144041 -1.23404052 -1.22737469]
[2.37528744 5.36860618] [-1.42057519 -1.29641703]
[0.53741855 5.01993018] [-1.54191018 -1.40907366]

In [133]:
def make_reference_skymap(zenith, azimuth, time, num_resamples=20, nside=64, random_state=2, verbose=False):
    
    np.random.seed(random_state)
    
#     assert len(set([zenith.shape, azimuth.shape, time.shape])) == 1
    
    stacked = np.hstack((zenith[:, None], azimuth[:, None]))
    print(stacked.shape)
    
    batches = min(1000, len(stacked) / 10)
    pix_indices = [delayed(get_pixel_values)(i[:, 0], i[:, 1], time, num_resamples=num_resamples, nside=nside, random_state=random_state, verbose=verbose)
                   for i in np.array_split(stacked, 50)]
    with ProgressBar():
        pix_indices = compute(*pix_indices)
    pix_indices = np.asarray(pix_indices).flatten()

    npix = hp.nside2npix(nside)
    reference_map = np.zeros(npix, dtype=float)
    print(pix_indices.shape)
    reference_map[pix_indices] += 1

    return reference_map


def get_pixel_values(zenith, azimuth, time, num_resamples=20, nside=64, random_state=2, verbose=False):
    npix = hp.nside2npix(nside)
    reference_map = np.zeros(npix, dtype=float)
    pix_indices = []
    for local_zenith, local_azimuth in zip(zenith, azimuth):
        # Reference skymap
        rand_times = np.random.choice(time, size=num_resamples)

        ra, dec = astro.dir_to_equa(local_zenith, local_azimuth, rand_times)
        theta, phi = sa.equatorial_to_healpy(ra, dec)
        pix = hp.ang2pix(nside, theta, phi)
        pix_indices.append(pix)
        
    return np.array(pix_indices).flatten()

In [134]:
zenith = df_data['lap_ra'].values
azimuth = df_data['lap_dec'].values
time = df_data['start_time_mjd'].values

In [142]:
time_future = client.scatter(time)
reference_map = make_reference_skymap(zenith[:1000], azimuth[:1000], time_future,
                                      num_resamples=1,
                                      random_state=4)


(1000, 2)
(1000,)

In [143]:
reference_map.sum()


Out[143]:
975.0

In [121]:
stacked = np.hstack((zenith[:1000000, None], azimuth[:1000000, None]))
stacked


Out[121]:
array([[ 1.99483155, -1.32342388],
       [ 2.36813548, -1.08170764],
       [ 3.78556748, -1.17676562],
       ...,
       [ 6.01759998, -1.36413852],
       [ 1.35561904, -1.18228258],
       [ 5.32648   , -1.30117913]])

In [75]:
time_future = client.scatter(time)
futures = []
for i in np.array_split(stacked, 1000):
    z = i[:, 0]
    a = i[:, 1]
    f = client.submit(get_pixel_values, z, a, time_future,
                      num_resamples=1,
                      random_state=idx)
    futures.append(f)

In [44]:
time_future = client.scatter(time)
pixels_future = client.submit(get_pixel_values, zenith[:1000], azimuth[:1000], time_future,
                              num_resamples=1,
                              random_state=2000)

In [73]:
np.array(client.gather(futures)).flatten()


Out[73]:
array([14218,  7299,  5011, ...,  7077, 39430,  2512])

In [37]:
del p

In [52]:
%lprun -f make_reference_skymap -f get_pixel_values make_reference_skymap(zenith[:100], azimuth[:100], time, num_resamples=1, random_state=2)


Timer unit: 1e-06 s

Total time: 0.114654 s
File: <ipython-input-48-7ad2cd282a81>
Function: make_reference_skymap at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     1                                           def make_reference_skymap(zenith, azimuth, time, num_resamples=20, nside=64, random_state=2, verbose=False):
     2                                               
     3         1         10.0     10.0      0.0      np.random.seed(random_state)
     4                                               
     5                                           #     assert len(set([zenith.shape, azimuth.shape, time.shape])) == 1
     6                                               
     7                                               
     8         1     114519.0 114519.0     99.9      pix_indices = get_pixel_values(zenith, azimuth, time, num_resamples=num_resamples, nside=nside, random_state=random_state, verbose=verbose)
     9         1          3.0      3.0      0.0      npix = hp.nside2npix(nside)
    10         1         36.0     36.0      0.0      reference_map = np.zeros(npix, dtype=float)
    11         1         85.0     85.0      0.1      reference_map[pix_indices] += 1
    12                                           
    13         1          1.0      1.0      0.0      return reference_map

Total time: 0.113824 s
File: <ipython-input-48-7ad2cd282a81>
Function: get_pixel_values at line 16

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    16                                           def get_pixel_values(zenith, azimuth, time, num_resamples=20, nside=64, random_state=2, verbose=False):
    17         1          4.0      4.0      0.0      npix = hp.nside2npix(nside)
    18         1         46.0     46.0      0.0      reference_map = np.zeros(npix, dtype=float)
    19         1          1.0      1.0      0.0      pix_indices = []
    20       101        127.0      1.3      0.1      for local_zenith, local_azimuth in zip(zenith, azimuth):
    21                                                   # Reference skymap
    22       100       4104.0     41.0      3.6          rand_times = np.random.choice(time, size=num_resamples)
    23                                           
    24       100     102705.0   1027.0     90.2          ra, dec = astro.dir_to_equa(local_zenith, local_azimuth, rand_times)
    25       100        982.0      9.8      0.9          theta, phi = sa.equatorial_to_healpy(ra, dec)
    26       100       5667.0     56.7      5.0          pix = hp.ang2pix(nside, theta, phi)
    27       100        188.0      1.9      0.2          pix_indices.append(pix)
    28         1          0.0      0.0      0.0      return pix_indices

In [53]:
reference_map.sum()


Out[53]:
1850.0

In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [79]:
from time import sleep
import random

In [80]:
@delayed
def inc(x):
    return x + 1

@delayed
def double(x):
    return x + 2

@delayed
def add(x, y):
    return x + y

In [81]:
data = [1, 2, 3, 4, 5]

In [83]:
output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = delayed(sum)(output)

In [84]:
total


Out[84]:
Delayed('sum-0cb8f1a4-41be-49ac-9689-998e609ce4b2')

In [85]:
total.compute()


Out[85]:
45

In [ ]: