The following code takes the CLI commands produced in the 01-JJA-L2V-Configuration-Files notebook and submits them to an EMR cluster as job steps

  • You need to run aws configure first so that boto3 can find your AWS credentials
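
As a quick sanity check (a minimal sketch, not part of the original pipeline), you can confirm that boto3 picks up those credentials:


In [ ]:
import boto3

# STS returns the account and caller ARN of the configured identity;
# this call fails fast if no credentials are set up.
boto3.client('sts').get_caller_identity()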

Let's import the function defined earlier for loading parameters


In [1]:
from load_config import params_to_cli

In [3]:
llr, emb, pred, evaluation = params_to_cli("CONFIGS/ex1-ml-1m-config.yml", "CONFIGS/ex4-du04d100w10l80n10d30p1q1-1000-081417-params.yml")

In [4]:
llr


Out[4]:
'spark-submit --deploy-mode cluster --class llr.LLR s3://sandbox-l2v/JARs/llr-assembly-1.2.jar --master yarn --options default --useroritem user --threshold 0.4 --interactionsFile s3://sandbox-l2v/datasets/ml-1m/split/split-cleaned-formatted-4and5/ml1m-train-clean4and5 --outputFile s3://sandbox-l2v/datasets/ml-1m/llr_output/llr12-081417-du04 --separator "," --maxInteractionsPerUserOrItem 500 --seed 12345'

In [5]:
evaluation


Out[5]:
'spark-submit --deploy-mode cluster --class eval --master yarn s3://sandbox-l2v/JARs/evaluation-assembly-1.5.jar --options allMetrics --inputFile s3://sandbox-l2v/datasets/ml-1m/predictions/du04-d100w10l80n10d30-p1q1-1000-081417/part-00000 --outputFile s3://sandbox-l2v/datasets/ml-1m/eval/du04-d100w10l80n10d30-p1q1-1000-081417'

This function formats the spark-submit commands into the step definitions we can pass to the cluster using boto3


In [7]:
def create_steps(llr=None, emb=None, pred=None, evaluation=None, name=''):
    """Wrap each spark-submit command in an EMR step definition."""
    def make_step(suffix, command):
        return {
            'Name': name + suffix,
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                # command-runner.jar takes the command as a list of tokens;
                # split() breaks on whitespace, so arguments containing
                # spaces would be tokenized incorrectly.
                'Args': command.split(),
            }
        }

    # The LLR step is optional; the other three stages always run.
    steps = []
    if llr is not None:
        steps.append(make_step('-LLR', llr))
    steps.append(make_step('-EMB', emb))
    steps.append(make_step('-PRED', pred))
    steps.append(make_step('-EVAL', evaluation))
    return steps

To load the commands into EMR

Here we create a step for each stage of the pipeline (LLR, embedding, prediction, evaluation)


In [8]:
# ex2 = create_steps(llr=llr, emb=emb, pred=pred, evaluation=evaluation, name='EXP2')
ex3 = create_steps(llr=llr, emb=emb, pred=pred, evaluation=evaluation, name='EXP3')
# ex4 = create_steps(emb=emb348, pred=pred348, name='EXP4')
# ex5 = create_steps(emb=emb349, pred=pred349, name='EXP5')

If we were adding multiple runs of the pipeline, we would concatenate their step lists


In [13]:
# steps = ex2 + ex3 + ex4 + ex5
steps = ex3

In [14]:
steps


Out[14]:
[{'ActionOnFailure': 'CONTINUE',
  'HadoopJarStep': {'Args': ['spark-submit',
    '--deploy-mode',
    'cluster',
    '--class',
    'llr.LLR',
    's3://sandbox-l2v/JARs/llr-assembly-1.2.jar',
    '--master',
    'yarn',
    '--options',
    'default',
    '--useroritem',
    'user',
    '--threshold',
    '0.4',
    '--interactionsFile',
    's3://sandbox-l2v/datasets/ml-1m/split/split-cleaned-formatted-4and5/ml1m-train-clean4and5',
    '--outputFile',
    's3://sandbox-l2v/datasets/ml-1m/llr_output/llr12-081417-du04',
    '--separator',
    '","',
    '--maxInteractionsPerUserOrItem',
    '500',
    '--seed',
    '12345'],
   'Jar': 'command-runner.jar'},
  'Name': 'EXP3-LLR'},
 {'ActionOnFailure': 'CONTINUE',
  'HadoopJarStep': {'Args': ['spark-submit',
    '--deploy-mode',
    'cluster',
    '--class',
    'Main',
    's3://sandbox-l2v/JARs/n2v-assembly-3.7.jar',
    '--dim',
    '100',
    '--window',
    '10',
    '--walkLength',
    '80',
    '--numWalks',
    '10',
    '--degree',
    '30',
    '--p',
    '1',
    '--q',
    '1',
    '--weighted',
    'true',
    '--directed',
    'false',
    '--indexed',
    'true',
    '--input',
    's3://sandbox-l2v/datasets/ml-1m/llr_output/llr12-081417-du04/part-00000',
    '--output',
    's3://sandbox-l2v/datasets/ml-1m/network-embeddings/embeddings37-du04-081417-d100w10l80n10d30-p1q1',
    '--cmd',
    'node2vec'],
   'Jar': 'command-runner.jar'},
  'Name': 'EXP3-EMB'},
 {'ActionOnFailure': 'CONTINUE',
  'HadoopJarStep': {'Args': ['spark-submit',
    '--deploy-mode',
    'cluster',
    '--class',
    'Prediction',
    '--master',
    'yarn-cluster',
    's3://sandbox-l2v/JARs/prediction-assembly-2.2.jar',
    '--dim',
    '100',
    '--ntype',
    'KNN',
    '--train',
    's3://sandbox-l2v/datasets/ml-1m/split/split-cleaned-formatted-4and5/ml1m-train-clean4and5',
    '--test',
    's3://sandbox-l2v/datasets/ml-1m/split/split-cleaned-formatted/ml1m-validation-clean',
    '--embedding',
    's3://sandbox-l2v/datasets/ml-1m/network-embeddings/embeddings37-du04-081417-d100w10l80n10d30-p1q1.emb/part-00000',
    '--neighbors',
    '1000',
    '--rmse',
    's3://sandbox-l2v/datasets/ml-1m/rmse/du04-d100w10l80n10d30-p1q1-1000-081417',
    '--predictions',
    's3://sandbox-l2v/datasets/ml-1m/predictions/du04-d100w10l80n10d30-p1q1-1000-081417'],
   'Jar': 'command-runner.jar'},
  'Name': 'EXP3-PRED'},
 {'ActionOnFailure': 'CONTINUE',
  'HadoopJarStep': {'Args': ['spark-submit',
    '--deploy-mode',
    'cluster',
    '--class',
    'eval',
    '--master',
    'yarn',
    's3://sandbox-l2v/JARs/evaluation-assembly-1.5.jar',
    '--options',
    'allMetrics',
    '--inputFile',
    's3://sandbox-l2v/datasets/ml-1m/predictions/du04-d100w10l80n10d30-p1q1-1000-081417/part-00000',
    '--outputFile',
    's3://sandbox-l2v/datasets/ml-1m/eval/du04-d100w10l80n10d30-p1q1-1000-081417'],
   'Jar': 'command-runner.jar'},
  'Name': 'EXP3-EVAL'}]

To submit the steps to the EMR cluster using boto3


In [10]:
import boto3

In [11]:
client = boto3.client('emr')

In [12]:
cluster_id = 'j-2JGJ9RIFQ4VRK'
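
The cluster ID is hard-coded above; as an alternative (a sketch, assuming a single active cluster), you could look it up with list_clusters:


In [ ]:
# List clusters that can still accept steps and take the first one.
# Assumes exactly one active cluster; adjust the filter as needed.
active = client.list_clusters(ClusterStates=['WAITING', 'RUNNING'])
cluster_id = active['Clusters'][0]['Id']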

In [15]:
response = client.add_job_flow_steps(
    JobFlowId = cluster_id,
    Steps= steps
)

In [16]:
response


Out[16]:
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '85',
   'content-type': 'application/x-amz-json-1.1',
   'date': 'Mon, 14 Aug 2017 23:05:18 GMT',
   'x-amzn-requestid': '0a786bd8-8145-11e7-8dd1-d31bf82860c5'},
  'HTTPStatusCode': 200,
  'RequestId': '0a786bd8-8145-11e7-8dd1-d31bf82860c5',
  'RetryAttempts': 0},
 'StepIds': ['s-1AN2B0U2UFIX9',
  's-2C0ZF4PO9CFPP',
  's-380HRBF5JCHJI',
  's-22VTB7A3XMKDZ']}
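
The response contains one StepId per submitted step. To monitor their progress (a minimal sketch), you can poll describe_step:


In [ ]:
# Each step moves through PENDING -> RUNNING -> COMPLETED
# (or FAILED / CANCELLED on error).
for step_id in response['StepIds']:
    state = client.describe_step(ClusterId=cluster_id, StepId=step_id)
    print(step_id, state['Step']['Status']['State'])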
