Dask demo notebook

This notebook demonstrates how to use Dask and CMAC2.0 on a JupyterHub node.


In [1]:
import warnings
warnings.filterwarnings('ignore')
import dask.bag as db
import pyart
import importlib
import netCDF4
import os
import subprocess
import sys
import imageio
import numpy as np

from glob import glob
from cmac import cmac, quicklooks, get_sounding_times, get_sounding_file_name, config
from IPython.display import Image, display
from dask_jobqueue import PBSCluster
from datetime import datetime
from distributed import Client
%matplotlib inline


## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119


In [2]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client, metrics, wait
# wait for jobs to arrive, depending on the queue, this may take some time
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, progress

This subroutine does both the processing and plotting for one radar file. A more sophisticated version of this is contained within the cmac_dask script.


In [47]:
def run_cmac_and_plotting(radar_file_path, cmac_config, sonde_times,
                          sounding_files, clutter_file_path, 
                          out_path, img_directory, sweep=0, dd_lobes=False):
    """ For dask we need the radar plotting routines all in one subroutine. """
    try:
        radar = pyart.io.read(radar_file_path)
    except TypeError:
        print(radar_file_path + ' has encountered TypeError!')
        return

    
    radar_start_date = netCDF4.num2date(
        radar.time['data'][0], radar.time['units'],
        only_use_cftime_datetimes=False, only_use_python_datetimes=True)
    year_str = "%04d" % radar_start_date.year
    month_str = "%02d" % radar_start_date.month
    day_str = "%02d" % radar_start_date.day
    hour_str = "%02d" % radar_start_date.hour
    minute_str = "%02d" % radar_start_date.minute
    second_str = "%02d" % radar_start_date.second
    save_name = cmac_config['save_name']
    file_name = (out_path + year_str + month_str + '/' + save_name + '.'
                 + year_str + month_str + day_str + '.' + hour_str
                 + minute_str + second_str + '.nc')
    if os.path.exists(file_name):
        del radar
        return
    rad_time = datetime.strptime(radar.time["units"][0:33], "seconds since %Y-%m-%d %H:%M:%S")
    # Load clutter files.
    clutter = pyart.io.read(
        clutter_file_path+'clutter_corcsapr2cfrppiM1.a1'
        + '.' + year_str + month_str + day_str + '.' + hour_str
        + minute_str + second_str + '.nc')
    clutter_field_dict = clutter.fields['ground_clutter']
    radar.add_field(
        'ground_clutter', clutter_field_dict, replace_existing=True)
    del clutter
    # Retrieve the sonde closest in time to the time of the radar file.
    sonde_index = np.argmin(np.abs(sonde_times - rad_time))
    sounding_file = sounding_files[sonde_index]
    sonde = netCDF4.Dataset(sounding_file)
    # Running the cmac code to produce a cmac_radar object.
    try:
        cmac_radar = cmac(radar, sonde, 'cacti_csapr2_ppi')
    except ValueError:
        del radar
        sonde.close()
        return
    # Free up some memory.
    del radar
    sonde.close()

    # Produce the cmac_radar file from the cmac_radar object.
    pyart.io.write_cfradial(file_name, cmac_radar)
    print('## A CMAC radar object has been created at ' + file_name)

    if not os.path.exists(img_directory):
        os.makedirs(img_directory)
        subprocess.call('chmod -R g+rw ' + img_directory, shell=True)

    # Producing all the cmac_radar quicklooks. The sweep and dd_lobes
    # arguments are forwarded rather than hardcoded.
    quicklooks(cmac_radar, 'cacti_csapr2_ppi', sweep=sweep,
               dd_lobes=dd_lobes, image_directory=img_directory)

    # Delete the cmac_radar object and move on to the next radar file.
    del cmac_radar
    return


We have to start our dask cluster. Normally, the script to do this on stratus is in qsub_xsapr. The dask-scheduler has to be running on one compute node and the dask-workers on the other compute nodes; below, dask_jobqueue's PBSCluster submits those worker jobs to the queue for us. If only a single node is available, multiprocessing on that node works as well, as sketched next.
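
For the single-node case, here is a minimal sketch using distributed's LocalCluster (the worker counts are illustrative assumptions; the actual run below uses PBSCluster):


In [ ]:
# Single-node alternative (a sketch, not the path this notebook takes):
# LocalCluster runs the scheduler and the workers on this machine via
# multiprocessing. The worker counts are illustrative assumptions.
from dask.distributed import LocalCluster

local_cluster = LocalCluster(n_workers=8, threads_per_worker=1)
local_client = Client(local_cluster)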

CMAC2.0 has different dictionaries that specify the configuration for each radar it supports. Here we will load cacti_csapr2_ppi, since the demo data is from the CACTI CSAPR2.


In [5]:
meta_config = config.get_metadata('cacti_csapr2_ppi')
cmac_config = config.get_cmac_values('cacti_csapr2_ppi')
field_config = config.get_field_names('cacti_csapr2_ppi')
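
The loaded cmac_config is a plain dictionary; for example, it carries the save_name used later to build output file names. A quick, illustrative check:


In [ ]:
# Illustrative peek at the loaded configuration; 'save_name' is the key
# this notebook relies on directly when constructing output file names.
print(sorted(cmac_config.keys()))
print(cmac_config['save_name'])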

We can then use a dask bag to map the radar list into distributed memory and execute the processing code on each file by calling .map().compute() on the bag.
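
As a toy illustration of this pattern (with made-up inputs), .map() builds a lazy graph and .compute() triggers execution:


In [ ]:
# Toy illustration of the bag pattern used below with the radar files:
# .map() is lazy; .compute() triggers the actual work.
toy_bag = db.from_sequence([1, 2, 3, 4])
print(toy_bag.map(lambda x: x ** 2).compute())  # [1, 4, 9, 16]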


In [6]:
soundings_directory = '/lustre/or-hydra/cades-arm/proj-shared/corsondewnpnM1.b1/'
radar_directory = '/lustre/or-hydra/cades-arm/proj-shared/corcsapr2cfrppiM1.a1/'
clutter_directory = '/lustre/or-hydra/cades-arm/proj-shared/csapr2_clutter/'
radar_files = glob(radar_directory + '*/*', recursive=True)
sounding_files = glob(soundings_directory + '*')
clutter_files = glob(clutter_directory + '*')

img_directory = '/lustre/or-hydra/cades-arm/proj-shared/cacticsapr2cmacppi.c1.png'
out_path = '/lustre/or-hydra/cades-arm/proj-shared/cacticsapr2cmacppi.c1/'
log_path = '/lustre/or-hydra/cades-arm/proj-shared/csapr_log/'
#sonde_file = '/home/rjackson/i5_test_data/sgpsondewnpnC1.b1.20170926.113600.cdf'
#clutter_file_path = '/home/rjackson/clutter201709.nc'

In [7]:
radar_files.sort()
sounding_files.sort()
clutter_files.sort()

In [8]:
def parse_sonde_times(file_list):
    time_list = []
    for name in file_list:
        time_list.append(datetime.strptime(
            name.split('/')[-1],
            'corsondewnpnM1.b1.%Y%m%d.%H%M%S.cdf'))
    return np.array(time_list)
        
def parse_clutter_times(file_list):
    time_list = []
    for name in file_list:
        time_list.append(datetime.strptime(
            name.split('/')[-1],
            'clutter_corcsapr2cfrppiM1.a1.%Y%m%d.%H%M%S.nc'))
    return np.array(time_list)

sonde_times = parse_sonde_times(sounding_files)
clutter_times = parse_clutter_times(clutter_files)

In [9]:
clutter_times.sort()
sonde_times.sort()

In [10]:
len(radar_files)


Out[10]:
7732

In [ ]:
rad_file = radar_files[0]
radar = pyart.io.read(rad_file)
rad_time = datetime.strptime(radar.time["units"][0:33], "seconds since %Y-%m-%d %H:%M:%S")

In [ ]:
sonde_index = np.argmin(np.abs(sonde_times - rad_time))
sounding_file = sounding_files[sonde_index]

In [ ]:
clutter_index = np.argmin(np.abs(clutter_times - rad_time))
clutter_file = clutter_files[clutter_index]

In [ ]:
print(sounding_file)
print(clutter_file)

In [ ]:
def run_cmac_from_index(rad_file):
    radar = pyart.io.read(rad_file)
    rad_time = datetime.strptime(radar.time["units"][0:33],
                                 "seconds since %Y-%m-%d %H:%M:%S")
    del radar
    # Retrieve the sounding and clutter files closest in time to the radar file.
    sonde_index = np.argmin(np.abs(sonde_times - rad_time))
    sounding_file = sounding_files[sonde_index]
    clutter_index = np.argmin(np.abs(clutter_times - rad_time))
    clutter_file = clutter_files[clutter_index]

    #f= open(os.path.join(log_path,"{0}.txt".format(str(i).zfill(4))),"w+")
    #f.write('Radar file: {0}\n'.format(rad_file))
    #f.write('Sounding file: {0}\n'.format(sounding_file))
    #f.write('Clutter file: {0}\n'.format(clutter_file))
    #f.close()
    print(rad_file)
    print(clutter_file)
    # Call with the signature run_cmac_and_plotting actually expects.
    run_cmac_and_plotting(rad_file, cmac_config, sonde_times,
                          sounding_files, clutter_directory,
                          out_path, img_directory)

In [48]:
#cluster = PBSCluster(name='dask-worker', memory='270GB', cores=36, processes=6, interface='ib0', queue='high_mem', project='arm',
#                    walltime='00:30:00')#, job-extra=['-W group_list=cades-arm'])
cluster1 = PBSCluster(processes=9, cores=36, walltime='14:00:00', memory='160GB',
                      name='dask-worker', interface='ib0', queue='arm_high_mem', project='arm',
                      job_extra=['-W group_list=cades-arm'])
cluster1.scale(4 * 9)       # Ask for 36 worker processes (4 jobs x 9 processes each)
client1 = Client(cluster1)  # Connect this local process to remote workers
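
Depending on the queue, it can take some time for the PBS jobs, and thus the workers, to arrive. As a convenience (not part of the original run), the client can block until a minimum number of workers have joined; the threshold here is an illustrative assumption:


In [ ]:
# Optionally block until at least one PBS job's worth of workers has joined.
# The n_workers threshold is an illustrative assumption.
client1.wait_for_workers(n_workers=9)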

In [49]:
client1


Out[49]:

Client

Cluster

  • Workers: 36
  • Cores: 144
  • Memory: 640.08 GB

In [50]:
the_bag = db.from_sequence(radar_files)
the_function = lambda x: run_cmac_and_plotting(
    x, cmac_config, sonde_times, sounding_files, clutter_directory,
    out_path, img_directory, sweep=3, dd_lobes=False)

# Note: .map() is lazy; no processing happens until .compute() is called.
futures1 = the_bag.map(the_function)

In [51]:
progress(futures1.compute())


---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-51-cca63e63ed0f> in <module>
----> 1 progress(futures1.compute())

~/.conda/envs/cmac_env/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

~/.conda/envs/cmac_env/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439 

~/.conda/envs/cmac_env/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2593                     should_rejoin = False
   2594             try:
-> 2595                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2596             finally:
   2597                 for f in futures.values():

~/.conda/envs/cmac_env/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1892                 direct=direct,
   1893                 local_worker=local_worker,
-> 1894                 asynchronous=asynchronous,
   1895             )
   1896 

~/.conda/envs/cmac_env/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    776         else:
    777             return sync(
--> 778                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    779             )
    780 

~/.conda/envs/cmac_env/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

~/.conda/envs/cmac_env/lib/python3.7/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

~/.conda/envs/cmac_env/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/.conda/envs/cmac_env/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1751                             exc = CancelledError(key)
   1752                         else:
-> 1753                             raise exception.with_traceback(traceback)
   1754                         raise exc
   1755                     if errors == "skip":

KilledWorker: ("('lambda-285a1b9e40f787b01ec8852b357eabf1', 8)", <Worker 'tcp://10.23.217.94:43976', name: 0-4, memory: 0, processing: 4>)
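
The KilledWorker error most likely means the workers running these tasks died, commonly from exceeding their memory allocation; below, the remaining PBS jobs are cleaned up with qdel, and a single file is then run serially to isolate the problem.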

In [46]:
!qselect -u zsherman | xargs qdel

In [ ]:
run_cmac_and_plotting(radar_files[0], cmac_config, sonde_times,
                      sounding_files, clutter_directory, 
                      out_path, img_directory, sweep=3, dd_lobes=False)

In [ ]:
rad_file = radar_files[540]
radar = pyart.io.read(rad_file)

In [ ]:
rad_time = datetime.strptime(radar.time["units"][0:33], "seconds since %Y-%m-%d %H:%M:%S")
sonde_index = np.argmin(np.abs(sonde_times - rad_time))
sounding_file = sounding_files[sonde_index]
clutter_index = np.argmin(np.abs(clutter_times - rad_time))
clutter_file = clutter_files[clutter_index]

In [ ]:
clutter = pyart.io.read(clutter_file)
clutter_field_dict = clutter.fields['ground_clutter']

In [ ]:
clutter_field_dict['data'].shape

In [ ]:
for key in radar.fields.keys():
    print(radar.fields[key]['data'].shape)

Why am I getting an invalid shape error? Where is the shape (5402, 1110) coming from?
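
A hedged way to check: compare the clutter field's shape against this radar's ray and gate counts. A mismatch there would explain the error, and would suggest the clutter map was built from a volume with a different scan geometry:


In [ ]:
# Diagnostic sketch: if these shapes disagree, radar.add_field() rejects the
# clutter field with an invalid-shape error.
print('radar (nrays, ngates):', radar.nrays, radar.ngates)
print('clutter field shape:  ', clutter_field_dict['data'].shape)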


In [ ]:
# 'prof' is not defined above; it would come from running the work under a
# local-scheduler profiling context, for example:
#     with Profiler() as prof:
#         the_bag.map(the_function).compute(scheduler='threads')
prof.visualize()

In [ ]:
# im_files = glob('/home/rjackson/xsapr_imgs/reflectivity*.png')
# im_files.sort()
# images = []
# for filename in im_files:
#     images.append(imageio.imread(filename))
# imageio.mimsave('refl_animation.gif', images, duration=0.5)

# with open('refl_animation.gif','rb') as f:
#     display(Image(f.read()))

In [ ]:
im_files = glob('/lcrc/group/earthscience/icrisologo/csapr_imgs/masked_corrected_reflectivity*.png')
im_files.sort()
images = []
for filename in im_files:
    images.append(imageio.imread(filename))
imageio.mimsave('refl_animation.gif', images, duration=0.5)

with open('refl_animation.gif','rb') as f:
    display(Image(f.read()))
