Dask demo notebook

This notebook demonstrates how to use Dask to run CMAC2.0 in parallel on a single JupyterHub node.


In [5]:
import warnings
warnings.filterwarnings('ignore')
import dask.bag as db
import pyart
import importlib
import netCDF4
import os
import subprocess
import sys
import imageio

from glob import glob
from cmac import cmac, quicklooks, get_sounding_times, get_sounding_file_name
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from IPython.display import Image, display
%matplotlib inline

This subroutine does both the processing and the plotting for one radar file. A more sophisticated version of it is contained in the cmac_dask script.


In [2]:
# Dual-Doppler lobe pairing: facility -> (site key of this radar, site key of
# its partner radar), used to look up site_<key>_dms_lon/lat in radar_config.
_DD_LOBE_SITE_PAIRS = {
    'I4': ('i4', 'i5'),
    'I5': ('i5', 'i4'),
    'I6': ('i6', 'i4'),
}


def _site_dms_coords(radar_config, site):
    """Return ``[lon, lat]`` from radar_config's ``site_<site>_dms_*`` keys."""
    return [radar_config['site_' + site + '_dms_lon'],
            radar_config['site_' + site + '_dms_lat']]


def _dd_lobe_coords(radar_config, facility, dd_lobes):
    """Return ``(radar1_coords, radar2_coords)`` for the dual-Doppler overlay.

    Both are None when ``dd_lobes`` is false. Raises ValueError when
    ``dd_lobes`` is set for a facility that has no known partner radar.
    """
    if not dd_lobes:
        return None, None
    try:
        this_site, partner_site = _DD_LOBE_SITE_PAIRS[facility]
    except KeyError:
        raise ValueError(
            'dd_lobes=True is only supported for facilities %s, got %r'
            % (sorted(_DD_LOBE_SITE_PAIRS), facility)) from None
    return (_site_dms_coords(radar_config, this_site),
            _site_dms_coords(radar_config, partner_site))


def _cmac_file_name(radar, out_path, save_name):
    """Build ``<out_path>/<save_name>.YYYYMMDD.HHMMSS.nc`` from the radar's first time stamp."""
    start = netCDF4.num2date(radar.time['data'][0], radar.time['units'])
    stamp = '%04d%02d%02d.%02d%02d%02d' % (
        start.year, start.month, start.day,
        start.hour, start.minute, start.second)
    # os.path.join avoids the doubled slash when out_path already ends in '/'.
    return os.path.join(out_path, save_name + '.' + stamp + '.nc')


def _ensure_image_directory(img_directory):
    """Create img_directory and make it group read/write if it does not exist.

    Several dask workers run this concurrently, so losing the creation race
    (FileExistsError) is not an error; only the creator runs chmod.
    """
    if os.path.exists(img_directory):
        return
    try:
        os.makedirs(img_directory)
    except FileExistsError:
        return
    # Argument list (no shell) so the path is passed through literally.
    subprocess.call(['chmod', '-R', 'g+rw', img_directory])


def run_cmac_and_plotting(radar_file_path, radar_config,
                          sounde_file, clutter_file_path, sonde_file,
                          out_path, img_directory, sweep=0, dd_lobes=False):
    """Run CMAC on one radar file, write the CF/Radial output and quicklooks.

    For dask the whole per-file workflow (read, process, write, plot) has to
    live in one callable so it can be mapped over a bag of file names.

    Parameters
    ----------
    radar_file_path : str
        Raw radar file, read with ``pyart.io.read``.
    radar_config : dict
        Radar configuration (e.g. ``cmac.config.config_xsapr_i5``). Must
        provide 'save_name', 'town', 'site_alt', 'facility', 'max_lat',
        'min_lat', 'max_lon', 'min_lon' and, when ``dd_lobes`` is set, the
        ``site_<i4|i5|i6>_dms_lon/lat`` entries of the radar pair.
    sounde_file : str
        Unused; kept only so existing positional calls keep working. The
        sounding is read from ``sonde_file``.
    clutter_file_path : str
        File whose 'xsapr_clutter' field is copied onto the radar.
    sonde_file : str
        netCDF sounding file handed to ``cmac``.
    out_path : str
        Directory in which the CMAC CF/Radial file is written.
    img_directory : str
        Directory for the quicklook images; created if missing.
    sweep : int, optional
        Sweep index passed to ``quicklooks``.
    dd_lobes : bool, optional
        If true, dual-Doppler lobe coordinates are passed to ``quicklooks``.

    Returns
    -------
    None
        Returns early, after printing a message, if reading the radar file
        raises TypeError.

    Raises
    ------
    ValueError
        If ``dd_lobes`` is set and the facility is not I4, I5 or I6.
    """
    try:
        radar = pyart.io.read(radar_file_path)
    except TypeError:
        print(radar_file_path + ' has encountered TypeError!')
        return

    # Settings taken from the radar configuration dictionary.
    save_name = radar_config['save_name']
    town = radar_config['town']
    alt = radar_config['site_alt']
    max_lat = radar_config['max_lat']
    min_lat = radar_config['min_lat']
    max_lon = radar_config['max_lon']
    min_lon = radar_config['min_lon']
    facility = radar_config['facility']
    dms_radar1_coords, dms_radar2_coords = _dd_lobe_coords(
        radar_config, facility, dd_lobes)

    file_name = _cmac_file_name(radar, out_path, save_name)

    # Copy the clutter field onto the radar, then drop the clutter object.
    clutter = pyart.io.read(clutter_file_path)
    radar.add_field('xsapr_clutter', clutter.fields['xsapr_clutter'],
                    replace_existing=True)
    del clutter

    # Close the sounding file even if cmac() raises.
    sonde = netCDF4.Dataset(sonde_file)
    try:
        cmac_radar = cmac(radar, sonde, facility, town, alt, verbose=False)
    finally:
        sonde.close()
    del radar  # drop our reference to the raw radar before writing/plotting

    pyart.io.write_cfradial(file_name, cmac_radar)
    print('## A CMAC radar object has been created at ' + file_name)

    _ensure_image_directory(img_directory)
    quicklooks(cmac_radar, save_name,
               image_directory=img_directory,
               sweep=sweep, max_lat=max_lat,
               min_lat=min_lat, max_lon=max_lon,
               min_lon=min_lon, dd_lobes=dd_lobes,
               dms_radar1_coords=dms_radar1_coords,
               dms_radar2_coords=dms_radar2_coords)

Next we need a Dask cluster. On Stratus, the script that starts one is normally in qsub_xsapr: the dask-scheduler runs on one compute node and the dask-workers run on the other compute nodes. Since we are only using one node here, we can simply let Dask use multiprocessing on this node to do the work for us.

CMAC2.0 has a separate configuration dictionary for each of the three XSAPRs at SGP. Here we load the I5 configuration, since the demo data come from XSAPR I5.


In [4]:
# cmac.config holds one configuration dictionary per XSAPR; the demo data
# are from XSAPR I5, so use config_xsapr_i5.
config_module = importlib.import_module('cmac.config')
radar_config = config_module.config_xsapr_i5

We can then use a Dask bag to distribute the list of radar files and run the processing code on each file by calling `.map(...).compute()` on the bag.


In [5]:
# All demo inputs and outputs live under one base directory. It defaults to
# the author's home directory on the JupyterHub; set CMAC_DEMO_DIR to run the
# notebook elsewhere.
data_root = os.environ.get('CMAC_DEMO_DIR', '/home/rjackson')

# glob() returns files in arbitrary order; sort so runs are deterministic.
radar_files = sorted(glob(os.path.join(data_root, 'i5_test_data', '*.maint')))
# The trailing '' keeps the trailing slash on the directory paths.
img_directory = os.path.join(data_root, 'xsapr_imgs', '')
out_path = os.path.join(data_root, 'xsapr_files', '')
sonde_file = os.path.join(data_root, 'i5_test_data',
                          'sgpsondewnpnC1.b1.20170926.113600.cdf')
clutter_file_path = os.path.join(data_root, 'clutter201709.nc')

In [ ]:
# One bag element per radar file; dask maps the per-file workflow over them.
the_bag = db.from_sequence(radar_files)


def the_function(radar_file):
    """Process a single radar file using the settings from the cells above.

    The sounding path is passed both to the unused ``sounde_file`` slot and
    to ``sonde_file``.
    """
    return run_cmac_and_plotting(
        radar_file, radar_config,
        sounde_file=sonde_file, clutter_file_path=clutter_file_path,
        sonde_file=sonde_file, out_path=out_path,
        img_directory=img_directory)


# Record task timings, resource use and cache activity during the compute.
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof:
    result = the_bag.map(the_function).compute()


## 2017-09-26 21:42:14.272000
## Adding radar fields...
2
4104.0
Unfolding
Exec time:  14.689172744750977
Doing  0
Doing  1
Doing  2
Doing  3
Doing  4
Doing  5
Doing  6
Doing  7
Doing  8
Doing  9
Doing  10
Doing  11
Doing  12
Doing  13
Doing  14
Doing  15
Doing  16
Doing  17
Doing  18
Doing  19
Doing  20
Doing  21
Doing  22
Doing  23
(8640, 500)
(8640, 501)
##
## All CMAC fields have been added to the radar object.
##
## Appending metadata
## A CMAC radar object has been created at /home/rjackson/xsapr_files//sgpxsaprcmacsurI5.c1.20170926.214214.nc
##
## Keys for each gate id are as follows:
##    0:multi_trip
##    1:rain
##    2:snow
##    3:no_scatter
##    4:melting
##    5:clutter

In [7]:
# Plot the task timeline together with the resource and cache profiles that
# were recorded alongside it (rprof and cprof are otherwise never shown).
from dask.diagnostics import visualize
visualize([prof, rprof, cprof])

In [14]:
# Stitch the reflectivity quicklooks, in file-name order, into an animated GIF.
refl_pngs = sorted(glob(os.path.join(img_directory, 'reflectivity*.png')))
if refl_pngs:
    refl_frames = [imageio.imread(png) for png in refl_pngs]
    imageio.mimsave('refl_animation.gif', refl_frames, duration=0.5)
    with open('refl_animation.gif', 'rb') as gif:
        display(Image(gif.read()))
else:
    # mimsave cannot write an animation with no frames.
    print('No reflectivity*.png quicklooks found in ' + img_directory)



In [13]:
# Same animation for the masked, corrected reflectivity quicklooks. It is
# written to its own GIF so it does not overwrite refl_animation.gif.
masked_pngs = sorted(
    glob(os.path.join(img_directory, 'masked_corrected_reflectivity*.png')))
if masked_pngs:
    masked_frames = [imageio.imread(png) for png in masked_pngs]
    imageio.mimsave('masked_refl_animation.gif', masked_frames, duration=0.5)
    with open('masked_refl_animation.gif', 'rb') as gif:
        display(Image(gif.read()))
else:
    # mimsave cannot write an animation with no frames.
    print('No masked_corrected_reflectivity*.png quicklooks found in '
          + img_directory)



In [ ]: