The following code takes the CLI commands produced in the 01-JJA-L2V-Configuration-Files
notebook and submits them to an EMR cluster as steps.
aws config
Let's import the function defined earlier for loading parameters.
In [1]:
from load_config import params_to_cli
In [3]:
llr, emb, pred, evaluation = params_to_cli(
    "CONFIGS/ex1-ml-1m-config.yml",
    "CONFIGS/ex4-du04d100w10l80n10d30p1q1-1000-081417-params.yml",
)
In [4]:
llr
Out[4]:
In [5]:
evaluation
Out[5]:
This function wraps each AWS CLI command in the step dictionary that EMR expects, so we can pass them to the cluster using boto3.
In [7]:
def create_steps(llr=None, emb=None, pred=None, evaluation=None, name=''):
    """Build the list of EMR step definitions for one pipeline run.

    Each command string is split on whitespace into the argument list that
    command-runner.jar executes. The LLR step is included only when `llr`
    is given; `emb`, `pred` and `evaluation` are always required.
    """
    if llr is not None:
        # Full pipeline: LLR, embedding, prediction, evaluation
        Steps = [
            {
                'Name': name + '-LLR',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': llr.split(),
                }
            },
            {
                'Name': name + '-EMB',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': emb.split(),
                }
            },
            {
                'Name': name + '-PRED',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': pred.split(),
                }
            },
            {
                'Name': name + '-EVAL',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': evaluation.split(),
                }
            }
        ]
    else:
        # No LLR command: embedding, prediction, evaluation only
        Steps = [
            {
                'Name': name + '-EMB',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': emb.split(),
                }
            },
            {
                'Name': name + '-PRED',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': pred.split(),
                }
            },
            {
                'Name': name + '-EVAL',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': evaluation.split(),
                }
            }
        ]
    return Steps
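The two branches above differ only in whether the LLR step is present, so the same list can be built from (suffix, command) pairs. The following is a minimal sketch of that idea, not the notebook's original function. `build_steps` and `make_step` are hypothetical names. Unlike `create_steps`, this variant skips any stage whose command is `None`, and it uses `shlex.split` rather than `str.split` so that quoted arguments stay together; both are assumptions about what is wanted, and the commands shown are placeholders.

```python
import shlex


def make_step(step_name, command):
    """Wrap one CLI command string in an EMR step definition (hypothetical helper)."""
    return {
        'Name': step_name,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            # shlex.split keeps quoted arguments together, unlike str.split
            'Args': shlex.split(command),
        },
    }


def build_steps(llr=None, emb=None, pred=None, evaluation=None, name=''):
    """Build step definitions in pipeline order, skipping stages that are None."""
    stages = [('LLR', llr), ('EMB', emb), ('PRED', pred), ('EVAL', evaluation)]
    return [
        make_step(name + '-' + suffix, command)
        for suffix, command in stages
        if command is not None
    ]


# Placeholder commands for illustration only; real ones come from params_to_cli.
example = build_steps(emb='echo "embedding stage"', pred='echo prediction', name='DEMO')
print([step['Name'] for step in example])
```

Skipping `None` stages means a call that passes only `emb` and `pred` produces two steps instead of failing on `None.split()`.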
To load the commands into EMR
Here we create the steps for one run of the pipeline: LLR (optional), EMB, PRED and EVAL.
In [8]:
# ex2 = create_steps(llr=llr, emb=emb, pred=pred, evaluation=evaluation, name='EXP3')
ex3 = create_steps(llr=llr, emb=emb, pred=pred, evaluation=evaluation, name='EXP3')
# ex4 = create_steps(emb=emb348, pred=pred348, name='EXP4')
# ex5 = create_steps(emb=emb349, pred=pred349, name='EXP5')
If we are adding multiple runs of the pipeline, we concatenate their step lists:
In [13]:
# steps = ex2 + ex3 + ex4 + ex5
steps = ex3
In [14]:
steps
Out[14]:
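Since each run is a plain Python list of step dictionaries, several runs can be combined by concatenation. A small sketch of that, assuming we want step names to stay unique so the runs can be told apart later; `combine_runs` is a hypothetical helper, and the two runs below are placeholder data standing in for the output of `create_steps`.

```python
from collections import Counter
from itertools import chain


def combine_runs(*runs):
    """Concatenate several step lists and reject duplicate step names (hypothetical helper)."""
    combined = list(chain.from_iterable(runs))
    counts = Counter(step['Name'] for step in combined)
    duplicates = sorted(n for n, c in counts.items() if c > 1)
    if duplicates:
        raise ValueError('duplicate step names: ' + ', '.join(duplicates))
    return combined


# Placeholder step lists; real ones would carry the full HadoopJarStep entries.
run_a = [{'Name': 'EXP3-EMB'}, {'Name': 'EXP3-PRED'}]
run_b = [{'Name': 'EXP4-EMB'}, {'Name': 'EXP4-PRED'}]
all_steps = combine_runs(run_a, run_b)
print(len(all_steps))
```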
To submit the steps to EMR using boto3
In [10]:
import boto3
In [11]:
client = boto3.client('emr')
In [12]:
cluster_id = 'j-2JGJ9RIFQ4VRK'
In [15]:
response = client.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=steps
)
In [16]:
response
Out[16]:
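The response from `add_job_flow_steps` includes a `StepIds` list, which can be used to check on each submitted step. A minimal sketch, assuming the `client`, `cluster_id` and `response` objects from the cells above; `step_states` is a hypothetical helper, and the stub client exists only so the example runs without AWS credentials.

```python
def step_states(client, cluster_id, step_ids):
    """Return {step_id: state} by calling the EMR describe_step API for each step."""
    states = {}
    for step_id in step_ids:
        detail = client.describe_step(ClusterId=cluster_id, StepId=step_id)
        states[step_id] = detail['Step']['Status']['State']
    return states


class StubEmrClient:
    """Stand-in for boto3.client('emr'); returns a fixed state for every step."""

    def describe_step(self, ClusterId, StepId):
        return {'Step': {'Id': StepId, 'Status': {'State': 'PENDING'}}}


# With a real cluster this would be:
#   step_states(client, cluster_id, response['StepIds'])
demo = step_states(StubEmrClient(), 'j-EXAMPLE', ['s-1', 's-2'])
print(demo)
```

To block until a step finishes rather than polling by hand, boto3's EMR client also offers a `step_complete` waiter via `client.get_waiter('step_complete')`.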