In [1]:
import warnings
warnings.filterwarnings('ignore')
import dask.bag as db
import pyart
import importlib
import netCDF4
import os
import subprocess
import sys
import imageio
import numpy as np
from glob import glob
from cmac import cmac, quicklooks, get_sounding_times, get_sounding_file_name, config
from IPython.display import Image, display
from dask_jobqueue import PBSCluster
from datetime import datetime
from distributed import Client
%matplotlib inline
In [2]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client, metrics, wait
# wait for jobs to arrive, depending on the queue, this may take some time
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, progress
This subroutine, will do both the processing and plotting for one radar file. A more sophisicated version of this is contained within the cmac_dask script.
In [47]:
def run_cmac_and_plotting(radar_file_path, cmac_config, sonde_times,
sounding_files, clutter_file_path,
out_path, img_directory, sweep=0, dd_lobes=False):
""" For dask we need the radar plotting routines all in one subroutine. """
try:
radar = pyart.io.read(radar_file_path)
except TypeError:
print(radar_file_path + ' has encountered TypeError!')
return
radar_start_date = netCDF4.num2date(radar.time['data'][0],
radar.time['units'],
only_use_cftime_datetimes=False, only_use_python_datetimes=True)
year_str = "%04d" % radar_start_date.year
month_str = "%02d" % radar_start_date.month
day_str = "%02d" % radar_start_date.day
hour_str = "%02d" % radar_start_date.hour
minute_str = "%02d" % radar_start_date.minute
second_str = "%02d" % radar_start_date.second
save_name = cmac_config['save_name']
file_name = (out_path + year_str + month_str + '/' + save_name + '.'
+ year_str + month_str + day_str + '.' + hour_str
+ minute_str + second_str + '.nc')
if os.path.exists(file_name):
del radar
return
rad_time = datetime.strptime(radar.time["units"][0:33], "seconds since %Y-%m-%d %H:%M:%S")
# Load clutter files.
clutter = pyart.io.read(
clutter_file_path+'clutter_corcsapr2cfrppiM1.a1'
+ '.' + year_str + month_str + day_str + '.' + hour_str
+ minute_str + second_str + '.nc')
clutter_field_dict = clutter.fields['ground_clutter']
radar.add_field(
'ground_clutter', clutter_field_dict, replace_existing=True)
del clutter
sonde_index = np.argmin(np.abs(sonde_times - rad_time))
sounding_file = sounding_files[sonde_index]
# Retrieve closest sonde in time to the time of the radar file.
sonde = netCDF4.Dataset(sounding_file)
# Running the cmac code to produce a cmac_radar object.
try:
cmac_radar = cmac(radar, sonde, 'cacti_csapr2_ppi')
except ValueError:
del radar
sonde.close()
return
# Free up some memory.
del radar
sonde.close()
# Produce the cmac_radar file from the cmac_radar object.
pyart.io.write_cfradial(file_name, cmac_radar)
print('## A CMAC radar object has been created at ' + file_name)
if not os.path.exists(img_directory):
os.makedirs(img_directory)
subprocess.call('chmod -R g+rw ' + img_directory, shell=True)
# Producing all the cmac_radar quicklooks.
quicklooks(cmac_radar, 'cacti_csapr2_ppi',
dd_lobes=False, image_directory=img_directory)
# Delete the cmac_radar object and move on to the next radar file.
del cmac_radar
return
In [35]:
import os
In [37]:
os.path.exists
Out[37]:
We have to start our dask cluster. Normally, the script to do this on stratus is in qsub_xsapr. The dask-scheduler has to be running on one compute node and the dask-workers on the other computer nodes. However, since we are only on one node, we can just use multiprocessing to do our work for us.
CMAC2.0 has different dictionaries that specify various configurations for the 3 XSAPRs in SGP. Here we will load i5 since the demo data is from XSAPR i5.
In [5]:
meta_config = config.get_metadata('cacti_csapr2_ppi')
cmac_config = config.get_cmac_values('cacti_csapr2_ppi')
field_config = config.get_field_names('cacti_csapr2_ppi')
We can then use a dask bag to map the radar list into distributed memory and then execute the processing code on each file using .map().compute() on the bag
In [6]:
soundings_directory = '/lustre/or-hydra/cades-arm/proj-shared/corsondewnpnM1.b1/'
radar_directory = '/lustre/or-hydra/cades-arm/proj-shared/corcsapr2cfrppiM1.a1/'
clutter_directory = '/lustre/or-hydra/cades-arm/proj-shared/csapr2_clutter/'
radar_files = glob(radar_directory + '*/*', recursive=True)
sounding_files = glob(soundings_directory + '*')
#clutter_files = glob(clutter_directory)
img_directory = '/lustre/or-hydra/cades-arm/proj-shared/cacticsapr2cmacppi.c1.png'
out_path = '/lustre/or-hydra/cades-arm/proj-shared/cacticsapr2cmacppi.c1/'
log_path = '/lustre/or-hydra/cades-arm/proj-shared/csapr_log/'
#sonde_file = '/home/rjackson/i5_test_data/sgpsondewnpnC1.b1.20170926.113600.cdf'
#clutter_file_path = '/home/rjackson/clutter201709.nc'
In [7]:
radar_files.sort()
sounding_files.sort()
#clutter_files.sort()
In [8]:
def parse_sonde_times(file_list):
time_list = []
for name in file_list:
time_list.append(datetime.strptime(name.split('/')[-1], 'corsondewnpnM1.b1.%Y%m%d.%H%M%S.cdf'))
return np.array(time_list)
#def parse_clutter_times(file_list):
# time_list = []
# for name in file_list:
# time_list.append(datetime.strptime(name.split('/')[-1], 'clutter_corcsapr2cfrppiM1.a1.%Y%m%d.%H%M%S.nc'))
#return np.array(time_list)
sonde_times = parse_sonde_times(sounding_files)
#clutter_times = parse_clutter_times(clutter_files)
In [9]:
#clutter_times.sort()
sonde_times.sort()
In [10]:
len(radar_files)
Out[10]:
In [ ]:
rad_file = radar_files[0]
radar = pyart.io.read(rad_file)
rad_time = datetime.strptime(radar.time["units"][0:33], "seconds since %Y-%m-%d %H:%M:%S")
In [ ]:
sonde_index = np.argmin(np.abs(sonde_times - rad_time))
sounding_file = sounding_files[sonde_index]
In [ ]:
clutter_index = np.argmin(np.abs(clutter_times - rad_time))
clutter_file = clutter_files[clutter_index]
In [ ]:
print(sounding_file)
print(clutter_file)
In [ ]:
def run_cmac_from_index(rad_file):
radar = pyart.io.read(rad_file)
rad_time = datetime.strptime(radar.time["units"][0:33], "seconds since %Y-%m-%d %H:%M:%S")
del radar
sonde_index = np.argmin(np.abs(sonde_times - rad_time))
sounding_file = sounding_files[sonde_index]
clutter_file = clutter_files[i]
#f= open(os.path.join(log_path,"{0}.txt".format(str(i).zfill(4))),"w+")
#f.write('Radar file: {0}\n'.format(rad_file))
#f.write('Sounding file: {0}\n'.format(sounding_file))
#f.write('Clutter file: {0}\n'.format(clutter_file))
#f.close()
print(rad_file)
print(clutter_file)
run_cmac_and_plotting(rad_file, 'cacti_csapr2_ppi',
sounding_file, clutter_file,
out_path, img_directory)
In [48]:
#cluster = PBSCluster(name='dask-worker', memory='270GB', cores=36, processes=6, interface='ib0', queue='high_mem', project='arm',
# walltime='00:30:00')#, job-extra=['-W group_list=cades-arm'])
cluster1 = PBSCluster(processes=9, cores=36, walltime='14:00:00', memory='160GB',
name='dask-worker', interface='ib0', queue='arm_high_mem', project='arm',
job_extra=['-W group_list=cades-arm'])
cluster1.scale(4*9) # Ask for ten workers
client1 = Client(cluster1) # Connect this local process to remote workers
In [49]:
client1
Out[49]:
In [50]:
the_bag = db.from_sequence(radar_files)
the_function = lambda x: run_cmac_and_plotting(x, cmac_config, sonde_times,
sounding_files, clutter_directory,
out_path, img_directory, sweep=3, dd_lobes=False)
futures1 = the_bag.map(the_function)
In [51]:
progress(futures1.compute())
In [46]:
!qselect -u zsherman | xargs qdel
In [ ]:
run_cmac_and_plotting(radar_files[0], cmac_config, sonde_times,
sounding_files, clutter_directory,
out_path, img_directory, sweep=3, dd_lobes=False)
In [ ]:
rad_file = radar_files[540]
radar = pyart.io.read(rad_file)
In [ ]:
rad_time = datetime.strptime(radar.time["units"][0:33], "seconds since %Y-%m-%d %H:%M:%S")
sonde_index = np.argmin(np.abs(sonde_times - rad_time))
sounding_file = sounding_files[sonde_index]
clutter_index = np.argmin(np.abs(clutter_times - rad_time))
clutter_file = clutter_files[clutter_index]
In [ ]:
clutter = pyart.io.read(clutter_file)
clutter_field_dict = clutter.fields['ground_clutter']
In [ ]:
clutter_field_dict['data'].shape
In [ ]:
for key in radar.fields.keys():
print(radar.fields[key]['data'].shape)
In [ ]:
prof.visualize()
In [ ]:
# im_files = glob('/home/rjackson/xsapr_imgs/reflectivity*.png')
# im_files.sort()
# images = []
# for filename in im_files:
# images.append(imageio.imread(filename))
# imageio.mimsave('refl_animation.gif', images, duration=0.5)
# with open('refl_animation.gif','rb') as f:
# display(Image(f.read()))
In [ ]:
im_files = glob('/lcrc/group/earthscience/icrisologo/csapr_imgs/masked_corrected_reflectivity*.png')
im_files.sort()
images = []
for filename in im_files:
images.append(imageio.imread(filename))
imageio.mimsave('refl_animation.gif', images, duration=0.5)
with open('refl_animation.gif','rb') as f:
display(Image(f.read()))
In [ ]: