In [1]:
# Import Parsl
import parsl
from parsl import *
print(parsl.__version__) # The version should be v0.2.1+
In [2]:
workers = ThreadPoolExecutor(max_workers=4)
# We pass the workers to the DataFlowKernel which will execute our Apps over the workers.
dfk = DataFlowKernel(executors=[workers])
To demonstrate how to run apps written as Bash scripts, we use two mock science applications: simulate.sh and stats.sh. The simulate.sh script serves as a trivial proxy for a more complex scientific simulation application. It generates and prints a set of one or more random integers in the range [0, 2^62), as controlled by its command-line arguments. The stats.sh script serves as a trivial model of an "analysis" program: it reads N files, each containing M integers, and prints the average of all those numbers to stdout. Like simulate.sh, it logs environmental information to stderr.
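Before wrapping these scripts in Parsl apps, it can help to run one directly and look at its output. A minimal sketch, assuming the tutorial's bin/ directory sits in the current working directory and using the same flags passed to simulate.sh later in this notebook:

import subprocess
# Ask for 5 random integers in [0, 100); the flags mirror those used by the simulate() app below
out = subprocess.check_output(
    ['./bin/simulate.sh', '--timesteps', '1', '--range', '100', '--nvalues', '5'])
print(out.decode())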
The following cell shows how apps can be composed from arbitrary Bash scripts. The simulate signature shows how variables can be passed to the Bash script (e.g., "sim_steps") as well as how standard Parsl parameters are managed (e.g., "stdout").
In [3]:
@App('bash', dfk)
def simulate(sim_steps=1, sim_range=100, sim_values=5, outputs=[], stdout=None, stderr=None):
    # A bash app function must return the bash script to execute as a string.
    # Positional and keyword args to the function can be referenced in that string using
    # Python format-string syntax; Parsl formats the string with all of the app's arguments
    # before the script is executed.
    # Here we compose the command-line call to simulate.sh with keyword arguments to simulate()
    # and redirect stdout to the first file listed in the outputs list.
    return '''echo "sim_steps: {sim_steps}\nsim_range: {sim_range}\nsim_values: {sim_values}"
echo "Starting run at $(date)"
$PWD/bin/simulate.sh --timesteps {sim_steps} --range {sim_range} --nvalues {sim_values} > {outputs[0]}
echo "Done at $(date)"
ls
'''
Now that we've defined an app, let's run 10 parallel instances of it using a for loop. Each run will write 100 random numbers, each between 0 and 99, to the output file.
In order to track files created by Bash apps, a list of DataFutures (one for each file in the outputs[] list) is made available as an attribute of the AppFuture returned when the decorated app function is called.
<App_Future> = App_Function(... , outputs=['x.txt', 'y.txt'...])
[<DataFuture> ... ] = <App_Future>.outputs
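For example, with the simulate app defined above, a sketch of how these futures relate (the output filename here is purely illustrative):

sim_fut = simulate(sim_steps=1, sim_range=100, sim_values=5, outputs=['example.txt'])
data_fut = sim_fut.outputs[0]   # DataFuture tracking example.txt
print(data_fut.done())          # True once the bash app has finished writing the file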
In [4]:
simulated_results = []
# Launch 10 parallel runs of simulate() and put the futures in a list
for sim_index in range(10):
    sim_fut = simulate(sim_steps=1,
                       sim_range=100,
                       sim_values=100,
                       outputs=['stdout.{0}.txt'.format(sim_index)],
                       stderr='stderr.{0}.txt'.format(sim_index))
    simulated_results.append(sim_fut)
The variable "simulated_results" contains a list of AppFutures, each corresponding to a running bash app.
Now let's print the status of the 10 jobs by checking if the app futures are done.
Note: you can re-run this step until all the jobs complete (all statuses are True), or go on, as a later step will block until all the jobs are complete.
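If you would rather block right here, calling result() on each future waits for the corresponding app to finish; a minimal sketch:

# result() blocks until the app completes (and raises if the app failed)
for fut in simulated_results:
    fut.result()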
In [5]:
print([i.done() for i in simulated_results])
In [6]:
# Grab just the data futures for the output files from each simulation.
# Passing these DataFutures as inputs to the next app lets Parsl resolve the dependency:
# the analysis will not start until all of the simulation outputs are ready.
simulation_outputs = [i.outputs[0] for i in simulated_results]
In [7]:
@App('bash', dfk)
def analyze(inputs=[], stdout=None, stderr=None):
    # Here we compose the command line for stats.sh, which takes a list of filenames as arguments.
    # Since we want a space-separated string rather than a Python list (e.g., ['x.txt', 'y.txt']),
    # we join the filenames in the inputs list and format that string into the command line explicitly.
    input_files = ' '.join([i for i in inputs])
    return '$PWD/bin/stats.sh {0}'.format(input_files)
In [8]:
results = analyze(inputs=simulation_outputs,
                  stdout='analyze.out',
                  stderr='analyze.err')
results.result()
with open('analyze.out', 'r') as f:
    print(f.read())
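As a quick sanity check: if stats.sh simply averages all of the values and the integers are drawn uniformly from [0, 99], the printed average should land near (0 + 99) / 2 = 49.5.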