In [5]:
import warnings
warnings.filterwarnings('ignore')
import dask.bag as db
import pyart
import importlib
import netCDF4
import os
import subprocess
import sys
import imageio
from glob import glob
from cmac import cmac, quicklooks, get_sounding_times, get_sounding_file_name
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from IPython.display import Image, display
%matplotlib inline
This subroutine does both the processing and the plotting for one radar file. A more sophisticated version of it is contained within the cmac_dask script.
In [2]:
def run_cmac_and_plotting(radar_file_path, radar_config,
                          sounde_file, clutter_file_path, sonde_file,
                          out_path, img_directory, sweep=0, dd_lobes=False):
    """Run CMAC on one radar file, write the result and plot its quicklooks.

    For dask the processing and the plotting have to live in a single
    subroutine, so one call handles one radar file from start to finish.

    Parameters
    ----------
    radar_file_path : str
        Path of the radar file, read with ``pyart.io.read``.
    radar_config : mapping
        Radar configuration. The keys 'save_name', 'town', 'site_alt',
        'max_lat', 'min_lat', 'max_lon', 'min_lon' and 'facility' are
        always read; the 'site_i*_dms_lon' / 'site_i*_dms_lat' keys are
        read only when ``dd_lobes`` is True.
    sounde_file : object
        Unused. Kept only so that existing positional callers keep working.
    clutter_file_path : str
        Path of a file read with ``pyart.io.read`` whose 'xsapr_clutter'
        field is copied onto the radar object.
    sonde_file : str
        Path of the sounding file, opened with ``netCDF4.Dataset`` and
        handed to ``cmac``.
    out_path : str
        Directory in which the output file is written.
    img_directory : str
        Directory handed to ``quicklooks``; created when it is missing.
    sweep : int, optional
        Sweep forwarded to ``quicklooks``.
    dd_lobes : bool, optional
        When True, the site coordinates of this radar and of its partner
        radar are looked up in ``radar_config`` and forwarded to
        ``quicklooks``.

    Returns
    -------
    None
        Also returned early, after printing a message, when reading the
        radar file raises TypeError.
    """
    try:
        radar = pyart.io.read(radar_file_path)
    except TypeError:
        print(radar_file_path + ' has encountered TypeError!')
        return

    # Pull the settings this routine needs out of the radar configuration.
    save_name = radar_config['save_name']
    town = radar_config['town']
    alt = radar_config['site_alt']
    max_lat = radar_config['max_lat']
    min_lat = radar_config['min_lat']
    max_lon = radar_config['max_lon']
    min_lon = radar_config['min_lon']
    facility = radar_config['facility']

    # Always bind the coordinates: previously an unknown facility combined
    # with dd_lobes=True left them undefined and the quicklooks call below
    # crashed only after all of the processing had been done.
    dms_radar1_coords = None
    dms_radar2_coords = None
    plot_lobes = dd_lobes
    if dd_lobes is True:
        # Site-key prefixes of (this radar, partner radar) per facility.
        lobe_sites = {'I4': ('i4', 'i5'),
                      'I5': ('i5', 'i4'),
                      'I6': ('i6', 'i4')}
        if facility in lobe_sites:
            site1, site2 = lobe_sites[facility]
            dms_radar1_coords = [radar_config['site_%s_dms_lon' % site1],
                                 radar_config['site_%s_dms_lat' % site1]]
            dms_radar2_coords = [radar_config['site_%s_dms_lon' % site2],
                                 radar_config['site_%s_dms_lat' % site2]]
        else:
            print('No dual-Doppler lobe sites are known for facility '
                  + str(facility) + '; plotting without lobes.')
            plot_lobes = False

    # Build the output file name from the time of the first ray.
    radar_start_date = netCDF4.num2date(radar.time['data'][0],
                                        radar.time['units'])
    date_str = '%04d%02d%02d' % (radar_start_date.year,
                                 radar_start_date.month,
                                 radar_start_date.day)
    time_str = '%02d%02d%02d' % (radar_start_date.hour,
                                 radar_start_date.minute,
                                 radar_start_date.second)
    file_name = os.path.join(
        out_path, save_name + '.' + date_str + '.' + time_str + '.nc')

    # Copy the clutter field from the clutter file onto the radar object.
    clutter = pyart.io.read(clutter_file_path)
    clutter_field_dict = clutter.fields['xsapr_clutter']
    radar.add_field(
        'xsapr_clutter', clutter_field_dict, replace_existing=True)
    del clutter

    # Run the cmac code to produce a cmac_radar object. The sounding is
    # closed even when cmac raises, so the file handle is never leaked.
    sonde = netCDF4.Dataset(sonde_file)
    try:
        cmac_radar = cmac(radar, sonde, facility, town, alt, verbose=False)
    finally:
        sonde.close()
    # Free up some memory.
    del radar

    # Produce the cmac_radar file from the cmac_radar object.
    pyart.io.write_cfradial(file_name, cmac_radar)
    print('## A CMAC radar object has been created at ' + file_name)

    if not os.path.exists(img_directory):
        # exist_ok: another worker may create the directory between the
        # check above and this call.
        os.makedirs(img_directory, exist_ok=True)
        # Argument list instead of a shell string, so that a path with
        # spaces or shell metacharacters is passed through untouched.
        subprocess.call(['chmod', '-R', 'g+rw', img_directory])

    # Produce all of the cmac_radar quicklooks.
    quicklooks(cmac_radar, save_name,
               image_directory=img_directory,
               sweep=sweep, max_lat=max_lat,
               min_lat=min_lat, max_lon=max_lon,
               min_lon=min_lon, dd_lobes=plot_lobes,
               dms_radar1_coords=dms_radar1_coords,
               dms_radar2_coords=dms_radar2_coords)

    # Delete the cmac_radar object and move on to the next radar file.
    del cmac_radar
    return
We have to start our dask cluster. Normally, the script to do this on stratus is in qsub_xsapr. The dask-scheduler has to be running on one compute node and the dask-workers on the other compute nodes. However, since we are only on one node, we can just use multiprocessing to do our work for us.
CMAC2.0 has different dictionaries that specify various configurations for the 3 XSAPRs in SGP. Here we will load i5 since the demo data is from XSAPR i5.
In [4]:
# Pull the XSAPR I5 configuration out of the cmac.config module.
config_module = importlib.import_module('cmac.config')
radar_config = config_module.config_xsapr_i5
We can then use a dask bag to map the radar list into distributed memory and then execute the processing code on each file by calling `.map().compute()` on the bag.
In [5]:
# Root directory holding the demo inputs and outputs; change this one value
# to run the notebook on another machine.
data_root = '/home/rjackson'

# glob() returns matches in arbitrary order; sorting makes the processing
# order reproducible from run to run.
radar_files = sorted(glob(os.path.join(data_root, 'i5_test_data', '*.maint')))
# The empty last component keeps the trailing slash on the directory paths.
img_directory = os.path.join(data_root, 'xsapr_imgs', '')
out_path = os.path.join(data_root, 'xsapr_files', '')
sonde_file = os.path.join(
    data_root, 'i5_test_data', 'sgpsondewnpnC1.b1.20170926.113600.cdf')
clutter_file_path = os.path.join(data_root, 'clutter201709.nc')
In [ ]:
# Spread the list of radar files over a dask bag.
the_bag = db.from_sequence(radar_files)
# .map() supplies one file path per call, so wrap the routine in a lambda
# that pins every other argument. The same sounding file fills both the
# sounde_file and the sonde_file parameter.
the_function = lambda x: run_cmac_and_plotting(
    x, radar_config,
    sounde_file=sonde_file,
    clutter_file_path=clutter_file_path,
    sonde_file=sonde_file,
    out_path=out_path,
    img_directory=img_directory)
# Record task, resource and cache profiles while the bag is computed.
with Profiler() as prof, \
        ResourceProfiler(dt=0.25) as rprof, \
        CacheProfiler() as cprof:
    result = the_bag.map(the_function).compute()
In [7]:
# Render what `prof`, the dask Profiler opened around the bag computation,
# recorded during that run.
prof.visualize()
In [14]:
# Collect the reflectivity quicklooks in name order, one frame per image.
im_files = sorted(glob('/home/rjackson/xsapr_imgs/reflectivity*.png'))
images = [imageio.imread(filename) for filename in im_files]
# Stitch the frames into a GIF, then read it back to show it inline.
imageio.mimsave('refl_animation.gif', images, duration=0.5)
with open('refl_animation.gif', 'rb') as f:
    display(Image(f.read()))
In [13]:
# Collect the masked, corrected reflectivity quicklooks in name order.
im_files = sorted(
    glob('/home/rjackson/xsapr_imgs/masked_corrected_reflectivity*.png'))
images = [imageio.imread(filename) for filename in im_files]
# Write to a file of its own: this cell used to reuse 'refl_animation.gif'
# and silently overwrote the raw reflectivity animation.
masked_gif = 'masked_corrected_refl_animation.gif'
imageio.mimsave(masked_gif, images, duration=0.5)
with open(masked_gif, 'rb') as f:
    display(Image(f.read()))
In [ ]: