We have two files that specify the configuration (locations of files) and the parameters of the pipeline. For this we will be using '.yml' files.
In [1]:
import yaml
import datetime
In [ ]:
# %load CONFIGS/ex1-ml-1m-config.yml
JARS:
  LLR_JAR: "llr-assembly-1.2.jar"
  EMBEDDINGS_JAR: "n2v-assembly-3.7.jar"
  PREDICTIONS_JAR: "prediction-assembly-2.2.jar"
  EVALUATION_JAR: "evaluation-assembly-1.5.jar"
PATHS:
  JARS: "s3://sandbox-l2v/JARs/"
  OUTPUT: "s3://sandbox-l2v/datasets/ml-1m/"
DATA:
  TRAIN: "s3://sandbox-l2v/datasets/ml-1m/split/split-cleaned-formatted-4and5/ml1m-train-clean4and5"
  VALIDATION: "s3://sandbox-l2v/datasets/ml-1m/split/split-cleaned-formatted/ml1m-validation-clean"
  TRAIN-VALIDATION: ""
  TEST: ""
This file contains the locations of the input data and JARs in S3.
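As a quick sanity check, the config can be loaded with PyYAML and inspected (a minimal sketch; yaml.safe_load is used since the file contains no custom tags):
In [ ]:
# Load the S3 config and print one of its paths (illustrative only).
with open("CONFIGS/ex1-ml-1m-config.yml") as config_ymlfile:
    l2v_cfg = yaml.safe_load(config_ymlfile)
print(l2v_cfg['PATHS']['OUTPUT'])  # s3://sandbox-l2v/datasets/ml-1m/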
In [ ]:
# %load CONFIGS/ex3420-du05d100w10l80n10d30p5q1-900-072717-params.yml
EMBEDDINGS:
  degree: 30
  dim: 100
  numWalks: 10
  p: 5
  q: 1
  walkLength: 80
  window: 10
EVALUATION:
  options: allMetrics
LLR:
  options: default
  threshold: 0.5
  useroritem: user
PREDICTIONS:
  neighbors: 900
  ntype: KNN
This file contains the hyperparameters of the L2V pipeline. Note that the params file name itself encodes these values; the sketch below rebuilds that tag from the loaded parameters.
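A minimal sketch (reusing the yaml import above) that reproduces the du05d100w10l80n10d30p5q1 tag embedded in the file name; the pipeline functions below perform the same serialization when naming output folders:
In [ ]:
# Rebuild the parameter tag from the loaded values (illustrative only).
with open("CONFIGS/ex3420-du05d100w10l80n10d30p5q1-900-072717-params.yml") as f:
    l2v_params = yaml.safe_load(f)
llr_tag = (l2v_params['LLR']['options'][0]
           + l2v_params['LLR']['useroritem'][0]
           + str(l2v_params['LLR']['threshold']).replace(".", ""))
emb_tag = "d{dim}w{window}l{walkLength}n{numWalks}d{degree}p{p}q{q}".format(
    **l2v_params['EMBEDDINGS'])
print(llr_tag + emb_tag)  # du05d100w10l80n10d30p5q1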
These functions load the parameters from the two configuration files, CONFIGS/ex1-ml-1m-config.yml
and CONFIGS/ex3420-du05d100w10l80n10d30p5q1-900-072717-params.yml,
and output the spark-submit commands that run the different steps of the pipeline on EMR.
In [2]:
def load_configs_params(config_yml_path, params_yml_path):
    # Parse both YAML files; safe_load avoids executing arbitrary tags
    # (yaml.load without an explicit Loader is deprecated).
    with open(config_yml_path, 'r') as config_ymlfile:
        l2v_cfg = yaml.safe_load(config_ymlfile)
    with open(params_yml_path, 'r') as params_ymlfile:
        l2v_params = yaml.safe_load(params_ymlfile)
    return l2v_cfg, l2v_params
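A minimal usage sketch with the two files above:
In [ ]:
l2v_cfg, l2v_params = load_configs_params(
    "CONFIGS/ex1-ml-1m-config.yml",
    "CONFIGS/ex3420-du05d100w10l80n10d30p5q1-900-072717-params.yml")
print(l2v_cfg['PATHS']['JARS'])   # s3://sandbox-l2v/JARs/
print(l2v_params['PREDICTIONS'])  # {'neighbors': 900, 'ntype': 'KNN'}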
In [3]:
def write_LLR_cli(l2v_params, l2v_cfg):
    today = datetime.datetime.now().strftime("%m%d%y")
    llr_JAR = l2v_cfg['PATHS']["JARS"] + l2v_cfg['JARS']["LLR_JAR"]
    # Serialize the JAR version, date, and LLR params into the output folder
    # name, e.g. llr-assembly-1.2.jar -> llr12 and (default, user, 0.5) -> du05.
    jar_serialization = l2v_cfg['JARS']['LLR_JAR'].replace("-assembly-", "").replace(".jar", "").replace(".", "")
    llr_params_serialization = l2v_params['LLR']['options'][0] + l2v_params['LLR']['useroritem'][0] + str(l2v_params['LLR']['threshold']).replace(".", "")
    output_folder = jar_serialization + "-" + today + "-" + llr_params_serialization
    output_for_llr = l2v_cfg['PATHS']["OUTPUT"] + "llr_output/" + output_folder
    # spark-submit options such as --master must precede the application JAR;
    # everything after the JAR is passed to the application itself.
    LLR_EMR = """spark-submit --deploy-mode cluster --master yarn --class llr.LLR {} --options {} --useroritem {} --threshold {} --interactionsFile {} --outputFile {} --separator , --maxInteractionsPerUserOrItem 500 --seed 12345""".format(llr_JAR, l2v_params['LLR']['options'], l2v_params['LLR']['useroritem'], l2v_params['LLR']['threshold'], l2v_cfg['DATA']['TRAIN'], output_for_llr)
    return LLR_EMR, output_for_llr, llr_params_serialization
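For the configs above, the generated output path has the form below (the date component is produced at run time, shown here as <MMDDYY>):
In [ ]:
llr_cmd, output_for_llr, llr_tag = write_LLR_cli(l2v_params, l2v_cfg)
print(output_for_llr)
# s3://sandbox-l2v/datasets/ml-1m/llr_output/llr12-<MMDDYY>-du05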
In [4]:
def write_EMB_cli(l2v_params, l2v_cfg, output_for_llr, llr_params_serialization):
    today = datetime.datetime.now().strftime("%m%d%y")
    # The LLR step writes a single part file, which feeds node2vec.
    input_embeddings = output_for_llr + "/part-00000"
    # Serialize the JAR version (n2v-assembly-3.7.jar -> 37) and the
    # embedding hyperparameters into the output folder name.
    ne_jar = l2v_cfg['JARS']["EMBEDDINGS_JAR"].replace("n2v-assembly-", "").replace(".", "").replace("jar", "")
    embs = "d{}w{}l{}n{}d{}-".format(l2v_params['EMBEDDINGS']['dim'], l2v_params['EMBEDDINGS']['window'], l2v_params['EMBEDDINGS']['walkLength'], l2v_params['EMBEDDINGS']['numWalks'], l2v_params['EMBEDDINGS']['degree'])
    n2v = "p{}q{}".format(l2v_params['EMBEDDINGS']['p'], l2v_params['EMBEDDINGS']['q'])
    ne_output_folder = "embeddings" + ne_jar + "-" + llr_params_serialization + "-" + today + "-" + embs + n2v
    output_for_embeddings = l2v_cfg['PATHS']["OUTPUT"] + "network-embeddings/" + ne_output_folder
    embedding_JAR = l2v_cfg['PATHS']["JARS"] + l2v_cfg['JARS']["EMBEDDINGS_JAR"]
    embeddings_d = l2v_params['EMBEDDINGS']["dim"]
    w = l2v_params['EMBEDDINGS']["window"]
    l = l2v_params['EMBEDDINGS']["walkLength"]
    n = l2v_params['EMBEDDINGS']["numWalks"]
    de = l2v_params['EMBEDDINGS']["degree"]
    p = l2v_params['EMBEDDINGS']["p"]
    q = l2v_params['EMBEDDINGS']["q"]
    network_embeddings_EMR = """spark-submit --deploy-mode cluster --class Main {} --dim {} --window {} --walkLength {} --numWalks {} --degree {} --p {} --q {} --weighted true --directed false --indexed true --input {} --output {} --cmd node2vec""".format(embedding_JAR, embeddings_d, w, l, n, de, p, q, input_embeddings, output_for_embeddings)
    return network_embeddings_EMR, output_for_embeddings, embs, n2v, embeddings_d
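Continuing the sketch, the embeddings output folder chains the JAR version, the LLR tag, and the node2vec parameters:
In [ ]:
emb_cmd, output_for_embeddings, embs, n2v, dim = write_EMB_cli(
    l2v_params, l2v_cfg, output_for_llr, llr_tag)
print(output_for_embeddings)
# s3://sandbox-l2v/datasets/ml-1m/network-embeddings/embeddings37-du05-<MMDDYY>-d100w10l80n10d30-p5q1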
In [5]:
def write_PRED_cli(l2v_params, l2v_cfg, output_for_embeddings, llr_params_serialization, embs, n2v, embeddings_d):
    today = datetime.datetime.now().strftime("%m%d%y")
    prediction_JAR = l2v_cfg["PATHS"]["JARS"] + l2v_cfg["JARS"]["PREDICTIONS_JAR"]
    p_ntype = l2v_params["PREDICTIONS"]["ntype"]
    p_neighbors = l2v_params["PREDICTIONS"]["neighbors"]
    # node2vec writes the embeddings to an .emb folder as a single part file.
    emb_path = output_for_embeddings + ".emb" + "/part-00000"
    # p_output_folder = llr_params_serialization + "-" + embs[:4] + n2v + "-" + str(p_neighbors) + "-" + today
    p_output_folder = llr_params_serialization + "-" + embs + n2v + "-" + str(p_neighbors) + "-" + today
    prediction_path = l2v_cfg["PATHS"]["OUTPUT"] + "predictions/" + p_output_folder
    rmse_path = l2v_cfg["PATHS"]["OUTPUT"] + "rmse/" + p_output_folder
    prediction_EMR = """spark-submit --deploy-mode cluster --class Prediction --master yarn-cluster {} --dim {} --ntype {} --train {} --test {} --embedding {} --neighbors {} --predictions {}""".format(prediction_JAR, embeddings_d, p_ntype, l2v_cfg["DATA"]["TRAIN"], l2v_cfg["DATA"]["VALIDATION"], emb_path, p_neighbors, prediction_path)
    return prediction_EMR, p_output_folder, prediction_path
In [6]:
def write_EVAL_cli(l2v_cfg, l2v_params, p_output_folder, prediction_path):
    evaluation_JAR = l2v_cfg["PATHS"]["JARS"] + l2v_cfg["JARS"]["EVALUATION_JAR"]
    options = l2v_params["EVALUATION"]["options"]
    # The prediction step writes a single part file under prediction_path.
    inputFile = prediction_path + "/part-00000"
    outputFile = l2v_cfg["PATHS"]["OUTPUT"] + "eval/" + p_output_folder
    evaluation_EMR = """spark-submit --deploy-mode cluster --class eval --master yarn {} --options {} --inputFile {} --outputFile {}""".format(evaluation_JAR, options, inputFile, outputFile)
    return evaluation_EMR
In [7]:
def params_to_cli(path_to_l2v_config, path_to_l2v_params):
    # load params
    l2v_cfg, l2v_params = load_configs_params(path_to_l2v_config, path_to_l2v_params)
    # llr command
    llr, output_folder_LLR, llr_params_serialization = write_LLR_cli(l2v_params, l2v_cfg)
    # embeddings command
    emb, output_for_embeddings, embs, n2v, embeddings_d = write_EMB_cli(l2v_params, l2v_cfg, output_folder_LLR, llr_params_serialization)
    # prediction command
    pred, p_output_folder, prediction_path = write_PRED_cli(l2v_params, l2v_cfg, output_for_embeddings, llr_params_serialization, embs, n2v, embeddings_d)
    # evaluation command
    evaluation = write_EVAL_cli(l2v_cfg, l2v_params, p_output_folder, prediction_path)
    return llr, emb, pred, evaluation
In [8]:
llr, emb, pred, evaluation = params_to_cli("CONFIGS/ex1-ml-1m-config.yml", "CONFIGS/ex3420-du05d100w10l80n10d30p5q1-900-072717-params.yml")
These four variables now contain the spark-submit commands for each of the modules:
In [9]:
llr
Out[9]:
In [10]:
emb
Out[10]:
In [11]:
pred
Out[11]:
In [12]:
evaluation
Out[12]:
The next step is to submit these steps to the EMR cluster using boto3.
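A minimal sketch of that step, assuming an already-running cluster (the job flow ID and region below are placeholders). Each command is wrapped in an EMR step that runs it through command-runner.jar:
In [ ]:
import boto3

CLUSTER_ID = "j-XXXXXXXXXXXXX"  # placeholder: ID of a running EMR cluster

emr = boto3.client("emr", region_name="us-east-1")  # region is an assumption

steps = []
for name, cmd in [("llr", llr), ("embeddings", emb),
                  ("predictions", pred), ("evaluation", evaluation)]:
    steps.append({
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        # command-runner.jar executes the spark-submit command, tokenized
        # into a list of arguments (none of our paths contain spaces).
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": cmd.split()},
    })

response = emr.add_job_flow_steps(JobFlowId=CLUSTER_ID, Steps=steps)
print(response["StepIds"])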