# --- Install CMAC 2.0 and create its conda environment ---
git clone https://github.com/EVS-ATMOS/cmac2.0.git
cd cmac2.0
conda env create -f environment-3.6.yml
conda activate cmac_env
# gcc is needed to compile CyLP below
module load gcc/6.3.0
# Point the CyLP build at the COIN libraries inside the conda environment
export COIN_INSTALL_DIR=/path/to/anaconda3/envs/cmac_env
pip install git+https://github.com/jjhelmus/CyLP@py3
# --- Fetch the ADI glue packages (ARM-internal repositories) ---
git clone https://code.arm.gov/adi_cmac2.git
git clone https://code.arm.gov/adi_py.git
git clone https://code.arm.gov/adi_pyart_glue.git
module load adi
# Install each package into the active environment
cd adi_py
python setup.py install
cd ..
cd adi_pyart_glue
python setup.py install
cd ..
cd adi_cmac2
python setup.py install
# --- Create (initially empty) conda activation/deactivation hooks ---
touch /path/to/anaconda3/envs/cmac_env/etc/conda/activate.d/env_var.sh
touch /path/to/anaconda3/envs/cmac_env/etc/conda/deactivate.d/env_var.sh
# The following lines are the intended CONTENTS of activate.d/env_var.sh
# (runs automatically on `conda activate cmac_env`):
#!/bin/bash
module load postgresql
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/software/user_tools/current/cades-arm/apps/lib64
export C_INCLUDE_PATH=$C_INCLUDE_PATH:/software/user_tools/current/cades-arm/apps/include:/software/dev_tools/swtree/cs400_centos7.5_pe2018/anaconda3/5.1.0/centos7.5_intel18.0.0/anaconda/pkgs/libnetcdf-4.6.1-he6cff42_8/include/
export PKG_CONFIG_PATH=$PKG_CONFIG_PATH::/software/user_tools/current/cades-arm/apps/lib64/pkgconfig:/software/dev_tools/swtree/cs400_centos7.5_pe2018/anaconda3/5.1.0/centos7.5_intel18.0.0/anaconda/pkgs/libnetcdf-4.6.1-he6cff42_8/lib/pkgconfig/
# The following lines are the intended CONTENTS of deactivate.d/env_var.sh
# (runs automatically on `conda deactivate`):
#!/bin/bash
module unload postgresql
Import all of the needed libraries
In [1]:
import subprocess
import os
import sys
from dask_jobqueue import PBSCluster
from distributed import Client, progress
from datetime import datetime, timedelta
from pkg_resources import load_entry_point
from distributed import progress
In [2]:
def exec_adi(info_dict):
    """
    Call adi_cmac2 from within Python using the inputs stored in a
    dictionary.

    Parameters
    ----------
    info_dict: dict
        A dictionary with the following keywords:
        'facility' = The facility marker (i.e. 'sgp', 'nsa', etc.)
        'site' = The site marker (i.e. i4, i5, i6)
        'start_date' = The start date as a string formatted YYYYMMDD
        'end_date' = The end date as a string formatted YYYYMMDD
    """
    facility = info_dict['facility']
    site = info_dict['site']
    start_date = info_dict['start_date']
    end_date = info_dict['end_date']
    # Change this directory to where you want your adi logs stored
    logs_dir = "/home/rjackson/adi_logs"
    # Set the path to your datastream here!
    os.environ["DATASTREAM_DATA"] = "/lustre/or-hydra/cades-arm/rjackson/"
    # BUG FIX: the original `logs_dir += logs_dir + ...` prepended the base
    # path twice, producing "/home/rjackson/adi_logs/home/rjackson/adi_logs/..."
    logs_dir = logs_dir + "/" + site + start_date + "_" + end_date
    if not os.path.isdir(logs_dir):
        os.makedirs(logs_dir)
    os.environ["LOGS_DATA"] = logs_dir
    os.environ["PROJ_LIB"] = "/home/rjackson/anaconda3/envs/adi_env3/share/proj/"
    # Set the path to the clutter file here!
    os.environ["CMAC_CLUTTER_FILE"] = "/home/rjackson/cmac2.0/scripts/clutter201901.nc"
    # NOTE(review): shell=True with string concatenation is shell-injection
    # prone if the dict values ever come from untrusted input; left as-is
    # since inputs here come from make_date_list_dict_list, but consider the
    # list-argument form of subprocess.call.
    subprocess.call(("/home/rjackson/anaconda3/envs/adi_env3/bin/adi_cmac2 -D 1 -f " +
                     facility + " -s " + site + " -b " + start_date + " -e " + end_date),
                    shell=True)
This will start a distributed cluster on the arm_high_mem queue. I have set it to have 6 adi_cmac2 processes per node, with 36 total processes being run. Feel free to change these values as you see fit. You will need to change the environment name and paths to what you named your adi_cmac2 environment on your machine. You will also need to change the path to your conda.sh.
In [3]:
# PBS batch-job options for the arm_high_mem queue: 6 adi_cmac2 worker
# processes on each 36-core node. Adjust for your own cluster/account.
cluster_options = dict(
    processes=6,
    cores=36,
    queue="arm_high_mem",
    walltime="3:00:00",
    resource_spec="qos=std",
    job_extra=["-A arm", "-W group_list=cades-arm"],
    # Activate the adi_cmac2 conda environment inside every batch job.
    env_extra=[". /home/rjackson/anaconda3/etc/profile.d/conda.sh",
               "conda activate adi_env3"],
)
the_cluster = PBSCluster(**cluster_options)
# Request 36 worker processes in total.
the_cluster.scale(36)
In [4]:
# Attach a distributed Client to the PBS cluster; leaving `client` as the
# cell's last expression shows its rich repr (dashboard link + resources).
client = Client(the_cluster)
client
Out[4]:
Run the above code to start the distributed client, and then use the output of this cell to determine whether your client got started. You should have nonzero resources available if the cluster has started.
In [5]:
# Re-display the client to check that workers have started
# (cores/memory should now be nonzero).
client
Out[5]:
This creates the list of dictionaries mapped onto exec_adi when adi_cmac2 is run on the cluster.
In [6]:
def make_date_list_dict_list(start_day, end_day, facility="I5", site="sgp"):
    """
    Automatically generate a list of one-day inputs for the exec_adi
    function, one dictionary per day in [start_day, end_day).

    Parameters
    ----------
    start_day: datetime
        The start date (inclusive).
    end_day: datetime
        The end date (exclusive).
    facility: str, optional
        Facility marker stored in each dictionary (default "I5").
        NOTE(review): exec_adi's docstring describes 'facility' as
        'sgp'/'nsa'-style and 'site' as i4/i5-style, which looks swapped
        relative to these defaults -- confirm against the adi_cmac2 CLI.
    site: str, optional
        Site marker stored in each dictionary (default "sgp").

    Returns
    -------
    the_list: list of dict
        A list of dictionary inputs for exec_adi.
    """
    the_list = []
    cur_day = start_day
    while cur_day < end_day:
        next_day = cur_day + timedelta(days=1)
        the_list.append({
            'facility': facility,
            'site': site,
            'start_date': cur_day.strftime("%Y%m%d"),
            'end_date': next_day.strftime("%Y%m%d"),
        })
        cur_day = next_day
    return the_list
# Here we specify the dates that we want to process
# (one job per day from 2019-01-01 through 2019-02-05; end date is exclusive)
date_list = make_date_list_dict_list(datetime(2019, 1, 1), datetime(2019,2,6))
In [7]:
# Run the cluster
# client.map submits one exec_adi call per day-dictionary to the workers
# and returns the resulting futures immediately (execution is asynchronous).
futures = client.map(exec_adi, date_list)
In [8]:
# Put up a little progress bar!
# (progress tracks the futures submitted by client.map above)
progress(futures)
In [94]:
# This will make the tasks quit: releasing the futures lets the scheduler
# cancel any work that has not finished, then the batch jobs are stopped.
del futures
# BUG FIX: the cluster variable defined above is `the_cluster`, not
# `cluster`; the original line raised NameError.
# NOTE(review): execution count In[94] suggests this cell was run out of
# order -- re-check under Restart & Run All. Newer dask_jobqueue releases
# use .close() instead of .stop_all_jobs(); confirm against your version.
the_cluster.stop_all_jobs()