RADICAL-Pilot Tutorial

Use the examples below to familiarize yourself with RADICAL-Pilot.

We will:

  • Modify settings (environment variables) if needed
  • Modify the example to print out the hostname of the machine that runs the Pilot

Please always close the session with session.close() before terminating the notebook.

1. RADICAL-Pilot Setup

Documentation: http://radicalpilot.readthedocs.org/en/latest/machconf.html#preconfigured-resources

First, we import the necessary dependencies, configure RADICAL-Pilot through environment variables, and define a helper function for displaying object details.


In [71]:
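%matplotlib inline
import os
import ast
import random

import pandas as pd
import seaborn as sns

# Configure RADICAL-Pilot before importing it: report only errors,
# and point it at the MongoDB instance it uses for coordination
os.environ["RADICAL_PILOT_VERBOSE"] = "ERROR"
os.environ["RADICAL_PILOT_DBURL"] = "mongodb://mongo.radical-cybertools.org:24242/sc15-test000"

import radical.pilot as rp

def print_details(detail_object):
    """Display the fields of an as_dict() result as a one-column DataFrame."""
    if isinstance(detail_object, str):
        # also accept the string representation of a dict
        detail_object = ast.literal_eval(detail_object)
    # stringify the values without modifying the caller's dict
    keys = list(detail_object.keys())
    values = [str(detail_object[key]) for key in keys]
    return pd.DataFrame(values, index=keys, columns=["Value"])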

!/opt/anaconda/bin/radicalpilot-version


0.37.11.2
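RADICAL_PILOT_DBURL holds a MongoDB connection URL of the form mongodb://host:port/database. To double-check a modified setting before creating a session, the small sketch below splits such a URL into host, port and database name using only the standard library. It does not contact the server, and treating the path as the database name is our assumption about how the URL is read; describe_db_url is a hypothetical helper, not part of RADICAL-Pilot.

```python
try:
    from urllib.parse import urlsplit  # Python 3
except ImportError:
    from urlparse import urlsplit      # Python 2, as used by this notebook


def describe_db_url(url):
    """Split a mongodb://host:port/database URL into its parts without connecting."""
    parts = urlsplit(url)
    if parts.scheme != "mongodb":
        raise ValueError("expected a mongodb:// URL, got %r" % url)
    return {
        "host": parts.hostname,
        "port": parts.port,
        # assumption: the path component names the database
        "database": parts.path.lstrip("/"),
    }


print(describe_db_url("mongodb://mongo.radical-cybertools.org:24242/sc15-test000"))
```

In the notebook, pass os.environ["RADICAL_PILOT_DBURL"] instead of the literal URL.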

2. Local Pilot Example

This example shows how to execute a task using a Pilot-Job running on the local machine. In this case, the Pilot-Job is started via SSH on the edge node of the Hadoop cluster, which also hosts JupyterHub, the multi-user server for these notebooks.

2.1 Create a Session, a Pilot Manager and a Unit Manager


In [72]:
session = rp.Session()
pmgr = rp.PilotManager(session=session)
umgr = rp.UnitManager(session=session,
                      scheduler=rp.SCHED_ROUND_ROBIN)
print("Session id: %s Pilot Manager: %s" % (session.uid, str(pmgr.as_dict())))


Session id: rp.session.ip-10-144-47-195.radical.016754.0005 Pilot Manager: {'uid': 'pmgr.0005'}

In [73]:
print_details(umgr.as_dict())


Out[73]:
Field              Value
uid                umgr.0005
scheduler          RoundRobinScheduler
scheduler_details  NO SCHEDULER DETAILS (Not Implemented)

2.2 Submit a Pilot and Add It to the Unit Manager


In [74]:
pdesc = rp.ComputePilotDescription()
pdesc.resource = "local.localhost_anaconda"  # NOTE: This is a "label", not a hostname
pdesc.runtime  = 10 # minutes
pdesc.cores    = 2
pdesc.cleanup  = False
pilot = pmgr.submit_pilots(pdesc)
umgr.add_pilots(pilot)

In [75]:
print_details(pilot.as_dict())


Out[75]:
Field            Value
uid              pilot.0005
stdout           None
start_time       None
resource_detail  {'cores_per_node': None, 'nodes': None}
submission_time  1447607422.85
logfile          None
resource         local.localhost_anaconda
log              []
sandbox          file://localhost/home/radical/radical.pilot.sa...
state            PendingLaunch
stop_time        None
stderr           None

2.3 Submit Compute Units

Create a description of the compute unit, which specifies the details of the task to be executed.


In [76]:
cudesc = rp.ComputeUnitDescription()
cudesc.environment = {'CU_NO': 1}
cudesc.executable  = "/bin/echo"
cudesc.arguments   = ['I am CU number $CU_NO']
cudesc.cores       = 1
print_details(cudesc.as_dict())


Out[76]:
Field           Value
kernel          None
executable      /bin/echo
name            None
restartable     False
output_staging  None
stdout          None
pre_exec        None
mpi             False
environment     {'CU_NO': 1}
cleanup         False
arguments       ['I am CU number $CU_NO']
stderr          None
cores           1
post_exec       None
input_staging   None
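The argument 'I am CU number $CU_NO' relies on a shell expanding $CU_NO from the unit's environment; the CU output later in this section ("I am CU number 1") suggests that the executable is launched through a shell. The sketch below reproduces that expansion locally with /bin/sh (a POSIX system and Python 3.7+ are assumed). It only illustrates the mechanism and is not RADICAL-Pilot's launch code; expand_and_echo is a hypothetical helper.

```python
import subprocess


def expand_and_echo(template, environment):
    """Echo `template` through /bin/sh so that $VARIABLES in it are expanded.

    Assumes the template contains no double quotes, backslashes or backticks.
    """
    # subprocess only accepts string environment values, so convert them
    env = {name: str(value) for name, value in environment.items()}
    # inside double quotes the shell still expands $CU_NO
    script = 'echo "%s"' % template
    result = subprocess.run(["/bin/sh", "-c", script], env=env,
                            capture_output=True, text=True, check=True)
    return result.stdout


print(expand_and_echo("I am CU number $CU_NO", {"CU_NO": 1}), end="")
```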

Submit the previously created ComputeUnit descriptions to the UnitManager. This triggers the selected scheduler (in this case the round-robin scheduler) to assign the ComputeUnits to the ComputePilots.
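Round-robin scheduling hands units to pilots in turn, starting again with the first pilot once every pilot has received one. The sketch below illustrates only that assignment order with plain Python lists; round_robin_assign is a hypothetical function, not RADICAL-Pilot's scheduler implementation.

```python
from itertools import cycle


def round_robin_assign(units, pilots):
    """Assign each unit to the next pilot in turn (illustration only)."""
    if not pilots:
        raise ValueError("at least one pilot is required")
    assignment = {pilot: [] for pilot in pilots}
    # zip stops when the units run out; cycle repeats the pilots indefinitely
    for unit, pilot in zip(units, cycle(pilots)):
        assignment[pilot].append(unit)
    return assignment


print(round_robin_assign(["unit.0", "unit.1", "unit.2"], ["pilot.A", "pilot.B"]))
```

With the single pilot used in this tutorial, every unit ends up on that pilot.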


In [78]:
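print("Submit Compute Units to Unit Manager ...")
cu_set = umgr.submit_units([cudesc])
print("Waiting for CUs to complete ...")
# wait_units() returns once every unit has reached a final state,
# which may also be Failed or Canceled, so check the states explicitly
states = umgr.wait_units()
if all(state == rp.DONE for state in states):
    print("All CUs completed successfully!")
else:
    print("Some CUs did not complete successfully: %s" % states)
cu_results = cu_set[0]
details = cu_results.as_dict()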


Submit Compute Units to Unit Manager ...
Waiting for CUs to complete ...
All CUs completed successfully!

The next command displays the state and other details of the ComputeUnit.


In [79]:
print_details(details)


Out[79]:
Field              Value
log                [<radical.pilot.logentry.Logentry object at 0x...
state              Done
working_directory  file://localhost/home/radical/radical.pilot.sa...
uid                unit.000046
submission_time    1447607526.08
execution_details  {u'control': u'umgr', u'stdout': u'I am CU num...
stop_time          None
start_time         None
exit_code          0
name               None

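The execution_details field holds further information, such as the unit's stdout, its state history and its sandbox location.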


In [80]:
print_details(details["execution_details"])


Out[80]:
Field                    Value
control                  umgr
stdout                   I am CU number 1\n
callbackhistory          [{u'timestamp': 1447607526.828442, u'state': u...
Agent_Output_Directives  []
Agent_Output_Status      None
Agent_Input_Status       None
exec_locs                None
FTW_Input_Directives     []
log                      [{u'timestamp': 1447607526.089792, u'message':...
exit_code                0
FTW_Input_Status         None
state                    Done
unitmanager              umgr.0005
statehistory             [{u'timestamp': 1447607526.078748, u'state': u...
FTW_Output_Directives    []
pilot_sandbox            file://localhost/home/radical/radical.pilot.sa...
description              {u'kernel': None, u'executable': u'/bin/echo',...
restartable              False
started                  None
FTW_Output_Status        None
finished                 None
Agent_Input_Directives   []
pilot                    pilot.0005
submitted                1447607526.08
sandbox                  file://localhost/home/radical/radical.pilot.sa...
stderr
_id                      unit.000046

Print the output (stdout) of the CU.


In [81]:
print(cu_results.stdout.strip())


I am CU number 1

2.4 Exercise

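Write a task (i.e., a ComputeUnit) that prints out the hostname of the machine it runs on.

Answer: In the example above, set cudesc.executable to hostname (for example /bin/hostname) instead of /bin/echo, and set cudesc.arguments to an empty list: when called with an argument, hostname tries to change the machine's hostname instead of printing it.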
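If hostname is not installed, or lives elsewhere on the target machine, a Python one-liner is one possible alternative executable, assuming a Python interpreter is available there. The sketch below runs that one-liner locally with the current interpreter so you can see what such a CU would print; HOSTNAME_ARGS and local_hostname_via_python are hypothetical names. In a ComputeUnitDescription, the executable would be the remote python and the arguments the list HOSTNAME_ARGS.

```python
import socket
import subprocess
import sys

# Arguments for a python executable that prints the hostname (hypothetical alternative)
HOSTNAME_ARGS = ["-c", "import socket; print(socket.gethostname())"]


def local_hostname_via_python():
    """Run the one-liner with the current interpreter and return its output."""
    output = subprocess.check_output([sys.executable] + HOSTNAME_ARGS)
    return output.decode().strip()


# The one-liner should agree with this interpreter's own view of the hostname
print("%s (socket.gethostname(): %s)" % (local_hostname_via_python(), socket.gethostname()))
```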

2.5 Performance Analysis

The following example shows how RADICAL-Pilot can be used for interactive analytics: we run a set of ComputeUnits and plot the distribution of their runtimes, measured from the moment a unit enters the Scheduling state until it is Done.


In [82]:
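def get_runtime(compute_unit):
    """Seconds from the unit entering the Scheduling state until it is Done."""
    details = compute_unit.as_dict()
    state_history = details['execution_details']["statehistory"]
    # map each state to the time at which the unit entered it
    timestamps = {}
    for entry in state_history:
        timestamps[entry["state"]] = entry["timestamp"]
    # this interval includes queueing and launch overhead,
    # not only the execution of the task itself
    return timestamps["Done"] - timestamps["Scheduling"]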
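Because get_runtime measures from Scheduling to Done, it includes time spent waiting and being launched. The sketch below works on a plain list with the same shape as statehistory (dicts with "state" and "timestamp" keys) and additionally reports the time after entering an "Executing" state. That state name and the timestamps are assumptions chosen for illustration, not values taken from the run above; state_durations is a hypothetical helper.

```python
def state_durations(state_history, start="Scheduling", end="Done", executing="Executing"):
    """Return (total, executing) durations in seconds from a state history.

    The executing duration is None if the history has no `executing` entry.
    """
    timestamps = {}
    for entry in state_history:
        # keep the first time each state was entered (get_runtime keeps the last)
        timestamps.setdefault(entry["state"], entry["timestamp"])
    total = timestamps[end] - timestamps[start]
    executing_time = None
    if executing in timestamps:
        executing_time = timestamps[end] - timestamps[executing]
    return total, executing_time


# Hypothetical state history of a single unit
history = [
    {"state": "Scheduling", "timestamp": 100.0},
    {"state": "Executing", "timestamp": 101.5},
    {"state": "Done", "timestamp": 104.0},
]
print(state_durations(history))
```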

In [83]:
import random
cudesc_list = []
for i in range(20):
    cudesc = rp.ComputeUnitDescription()
    cudesc.executable  = "/bin/sleep"
    cudesc.environment = {'CU_NO': i}
    cudesc.arguments   = ['%d'%(random.randrange(10))]
    cudesc.cores       = 1
    cudesc_list.append(cudesc)

In [84]:
cu_set = umgr.submit_units(cudesc_list)

In [85]:
states = umgr.wait_units()

In [86]:
runtimes = []
for compute_unit in cu_set:
    runtimes.append(get_runtime(compute_unit))

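Each of the 20 ComputeUnits above runs /bin/sleep for a random duration between 0 and 9 seconds, chosen by random.randrange(10) when the descriptions were created. We plot the distribution of their runtimes using Seaborn; see the distplot documentation.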


In [87]:
# sns.distplot is deprecated since seaborn 0.11; newer versions provide sns.histplot instead
plot = sns.distplot(runtimes, kde=False, axlabel="Runtime (s)")
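To complement the plot with numbers, or where seaborn is unavailable, the runtimes can be summarized with the standard library (Python 3.4+ for statistics). This sketch prints a text histogram with one-second bins, an arbitrary choice, plus the mean and median; text_histogram is a hypothetical helper and the sample values are made up, standing in for the runtimes list.

```python
import math
import statistics
from collections import Counter


def text_histogram(values, bin_width=1.0):
    """Return sorted (bin_start, count) pairs for fixed-width bins."""
    counts = Counter(math.floor(v / bin_width) * bin_width for v in values)
    return sorted(counts.items())


sample_runtimes = [0.4, 1.2, 1.9, 3.5]  # made-up values standing in for `runtimes`
for bin_start, count in text_histogram(sample_runtimes):
    print("%5.1f s | %s" % (bin_start, "#" * count))
print("mean %.2f s, median %.2f s" % (statistics.mean(sample_runtimes),
                                      statistics.median(sample_runtimes)))
```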


2.6 Close and Delete Session


In [88]:
session.close()
del session

3. YARN Pilot Example

Having run tasks locally with RADICAL-Pilot, we now submit a task to a YARN cluster. Note that the YARN cluster is a remote resource relative to the submission host (the edge node), which is why an SSH security context is added to the session.

3.1 Create a Session, a Pilot Manager and a Unit Manager


In [89]:
import getpass
yarn_session = rp.Session()
c = rp.Context('ssh')
c.user_id = getpass.getuser()
yarn_session.add_context(c)
pmgr = rp.PilotManager(session=yarn_session)
umgr = rp.UnitManager(session=yarn_session,
                      scheduler=rp.SCHED_ROUND_ROBIN)
print("Session id: %s Pilot Manager: %s" % (yarn_session.uid, str(pmgr.as_dict())))


Session id: rp.session.ip-10-144-47-195.radical.016754.0006 Pilot Manager: {'uid': 'pmgr.0006'}

3.2 Submit a Pilot and Add It to the Unit Manager

Note the change in the resource description:

    pdesc.resource = "yarn.aws-vm"  # NOTE: This is a "label", not a hostname

In [90]:
pdesc = rp.ComputePilotDescription()
pdesc.resource = "yarn.aws-vm"  # NOTE: This is a "label", not a hostname
pdesc.runtime  = 30 # minutes
pdesc.cores    = 1
pdesc.cleanup  = False
# submit the pilot.
print("Submitting Compute Pilot to Pilot Manager ...")
pilot = pmgr.submit_pilots(pdesc)
umgr.add_pilots(pilot)


Submitting Compute Pilot to Pilot Manager ...

In [91]:
print_details(pilot.as_dict())


Out[91]:
Field            Value
uid              pilot.0006
stdout           None
start_time       None
resource_detail  {'cores_per_node': None, 'nodes': None}
submission_time  1447608249.47
logfile          None
resource         yarn.aws-vm
log              []
sandbox          sftp://sc15.radical-cybertools.org/home/radica...
state            PendingLaunch
stop_time        None
stderr           None

3.3 Submit Compute Units

Create a description of the compute unit, which specifies the details of the task to be executed.


In [92]:
cudesc = rp.ComputeUnitDescription()
cudesc.environment = {'CU_NO': "1"}
cudesc.executable  = "/bin/echo"
cudesc.arguments   = ['I am CU number $CU_NO']
cudesc.cores       = 1
print_details(cudesc.as_dict())


Out[92]:
Field           Value
kernel          None
executable      /bin/echo
name            None
restartable     False
output_staging  None
stdout          None
pre_exec        None
mpi             False
environment     {'CU_NO': '1'}
cleanup         False
arguments       ['I am CU number $CU_NO']
stderr          None
cores           1
post_exec       None
input_staging   None

Submit the previously created ComputeUnit description to the UnitManager. This triggers the selected scheduler to assign the ComputeUnits to the ComputePilots.


In [93]:
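print("Submit Compute Units to Unit Manager ...")
cu_set = umgr.submit_units([cudesc])
print("Waiting for CUs to complete ...")
# wait_units() returns once every unit has reached a final state,
# which may also be Failed or Canceled, so check the states explicitly
states = umgr.wait_units()
if all(state == rp.DONE for state in states):
    print("All CUs completed successfully!")
else:
    print("Some CUs did not complete successfully: %s" % states)
cu_results = cu_set[0]
details = cu_results.as_dict()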


Submit Compute Units to Unit Manager ...
Waiting for CUs to complete ...
All CUs completed successfully!

In [94]:
print_details(details)


Out[94]:
Field              Value
log                [<radical.pilot.logentry.Logentry object at 0x...
state              Done
working_directory  sftp://sc15.radical-cybertools.org/home/radica...
uid                unit.000067
submission_time    1447608255.01
execution_details  {u'control': u'umgr', u'stdout': u'[... CONTEN...
stop_time          None
start_time         None
exit_code          0
name               None

In [63]:
print_details(details["execution_details"])


Out[63]:
Field                    Value
control                  umgr
stdout                   [... CONTENT SHORTENED ...]\notobufRpcEngine:2...
callbackhistory          [{u'timestamp': 1447558876.250819, u'state': u...
Agent_Output_Directives  []
Agent_Output_Status      None
Agent_Input_Status       None
exec_locs                None
FTW_Input_Directives     []
log                      [{u'timestamp': 1447558875.294721, u'message':...
exit_code                0
FTW_Input_Status         None
state                    Done
unitmanager              umgr.0004
statehistory             [{u'timestamp': 1447558875.276375, u'state': u...
FTW_Output_Directives    []
pilot_sandbox            sftp://sc15.radical-cybertools.org/home/radica...
description              {u'kernel': None, u'executable': u'/bin/echo',...
restartable              False
started                  None
FTW_Output_Status        None
finished                 None
Agent_Input_Directives   []
pilot                    pilot.0004
submitted                1447558875.28
sandbox                  sftp://sc15.radical-cybertools.org/home/radica...
stderr
_id                      unit.000044

In [64]:
print(cu_results.stdout.strip())


[... CONTENT SHORTENED ...]
otobufRpcEngine:250 - Call: getApplicationReport took 1ms
2015-11-15 03:41:44 DEBUG Client:1032 - IPC Client (1561421084) connection to /10.63.179.69:8050 from radical sending #530
2015-11-15 03:41:44 DEBUG Client:1089 - IPC Client (1561421084) connection to /10.63.179.69:8050 from radical got value #530
2015-11-15 03:41:44 DEBUG ProtobufRpcEngine:250 - Call: getApplicationReport took 0ms
2015-11-15 03:41:44 DEBUG Client:1032 - IPC Client (1561421084) connection to /10.63.179.69:8050 from radical sending #531
2015-11-15 03:41:44 DEBUG Client:1089 - IPC Client (1561421084) connection to /10.63.179.69:8050 from radical got value #531
2015-11-15 03:41:44 DEBUG ProtobufRpcEngine:250 - Call: getApplicationReport took 0ms
2015-11-15 03:41:44 INFO  Client:792 - Application has completed successfully. Breaking monitoring loop
2015-11-15 03:41:44 INFO  Client:240 - Application completed successfully
2015-11-15 03:41:44 DEBUG Client:97 - stopping client from cache: org.apache.hadoop.ipc.Client@8a051f4
I am CU number 1
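On the YARN resource, the CU's stdout also contains the YARN client's log output, and the CU's own output (I am CU number 1) appears only after the client's last log line. A minimal sketch for extracting it, assuming every client log line starts with a timestamp and a log level as in the output above; strip_yarn_log is a hypothetical helper, not part of RADICAL-Pilot.

```python
import re

# Assumed format of a YARN client log line: "YYYY-MM-DD HH:MM:SS LEVEL ..."
LOG_LINE = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} (TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\b")


def strip_yarn_log(stdout):
    """Return the lines that follow the last YARN client log line."""
    lines = stdout.strip().splitlines()
    last_log_line = -1
    for index, line in enumerate(lines):
        if LOG_LINE.match(line):
            last_log_line = index
    return "\n".join(lines[last_log_line + 1:])


sample = "\n".join([
    "2015-11-15 03:41:44 INFO  Client:240 - Application completed successfully",
    "2015-11-15 03:41:44 DEBUG Client:97 - stopping client from cache: org.apache.hadoop.ipc.Client@8a051f4",
    "I am CU number 1",
])
print(strip_yarn_log(sample))
```

In the notebook, this would be applied as strip_yarn_log(cu_results.stdout).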

3.4 Close and Delete Session


In [95]:
yarn_session.close()
del yarn_session