The module work_queue_futures allows you to execute your computations as a distributed system. That is, you can use not only the resources available locally on your computer, but also any remote resources you have access to. It is most effective when you have more work to compute than resources available, and when that work can be split into independent tasks that each run for more than 30 seconds but less than a day.
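As a rough planning aid, the 30-second-to-one-day guideline can be turned into arithmetic: if each independent piece of work takes about s seconds, bundling n pieces into one task gives tasks of roughly n·s seconds. The helpers below are a hypothetical sketch (items_per_task, bundle, and the 10-minute default target are our own assumptions, not part of work_queue_futures) that pick n for a desired task length, clamped into that window.

```python
# Hypothetical planning helpers; not part of work_queue_futures.
MIN_TASK_SECONDS = 30            # lower bound suggested in the text above
MAX_TASK_SECONDS = 24 * 60 * 60  # upper bound: one day


def items_per_task(seconds_per_item, target_seconds=600):
    """Number of work items to bundle into one task so that the task runs
    for about target_seconds, with the target clamped to the 30 s .. 1 day window.

    If a single item already takes longer than the target, the answer is 1;
    bundling cannot shorten an item that is itself too long.
    """
    if seconds_per_item <= 0:
        raise ValueError('seconds_per_item must be positive')
    target = min(max(target_seconds, MIN_TASK_SECONDS), MAX_TASK_SECONDS)
    # Floor division keeps the bundle at or under the target; always at least one item.
    return max(1, int(target // seconds_per_item))


def bundle(items, n):
    """Split a list of work items into consecutive chunks of at most n items."""
    return [items[i:i + n] for i in range(0, len(items), n)]


print(items_per_task(0.5))  # 1200
print(bundle(list(range(7)), items_per_task(10, target_seconds=30)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Each chunk returned by bundle would then become the input of one task.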
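A work_queue_futures application consists of two parts:
- a master, which defines the tasks, submits them, and collects their results;
- workers, which run on local or remote machines, connect to the master, and execute its tasks.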
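To make the division of labor concrete, here is a toy, single-machine model of the master/worker pattern built only from Python's standard library. It is an analogy, not how work_queue_futures is implemented: here the "workers" are threads sharing an in-memory queue.Queue, whereas real workers are separate processes, possibly on other machines, that connect to the master over the network. The names run_master and worker are illustrative.

```python
import queue
import subprocess
import threading


def worker(tasks, results):
    """Toy worker: take shell commands from the queue until a None sentinel arrives."""
    while True:
        command = tasks.get()
        if command is None:  # sentinel: no more work for this worker
            return
        proc = subprocess.run(command, shell=True, capture_output=True, text=True)
        results.put((command, proc.stdout))


def run_master(commands, n_workers=2):
    """Toy master: hand commands to worker threads and collect (command, output) pairs."""
    tasks, results = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for command in commands:
        tasks.put(command)
    for _ in threads:
        tasks.put(None)  # one sentinel per worker, queued after all real work
    for t in threads:
        t.join()
    return [results.get() for _ in commands]


# Completion order depends on scheduling, so results may come back in any order.
for command, output in run_master(['echo one', 'echo two', 'echo three']):
    print(command, '->', output.strip())
```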
In this notebook we will construct our first work_queue_futures master. The first thing we need to do is import the Python objects we will use:
In [ ]:
from work_queue_futures import WorkQueueFutures, FutureTask, FutureTaskError
In [ ]:
q = WorkQueueFutures(name = 'my-wq-app', port = 0, local_worker = {'cores':1})
print('queue is accepting connections on port {}'.format(q.port))
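We created the queue specifying three arguments:
- name: the name of the application ('my-wq-app'), which workers can use to find the master without knowing its host and port;
- port: the port on which the master listens for workers; 0 lets the queue choose an available port, and the port actually chosen is stored in q.port, which is why we print it;
- local_worker: start a worker on this machine with the given resources (here, one core), so that tasks can run without any remote workers.
Next, we create a task that runs /bin/date and submit it to the queue: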
In [ ]:
task = FutureTask('/bin/date')
q.submit(task)
Once submitted, the task is assigned to an available worker. We can ask for its result with:
In [ ]:
if task.result():
    print(task.output)
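The method names result() and add_done_callback() are the same as those of concurrent.futures.Future in Python's standard library, and this notebook uses them the same way. As a purely local analogy (a thread pool on this machine stands in for the queue and its workers, and the helper run_command is our own), submitting work and blocking on its result looks like this:

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor


def run_command(command):
    """Run a shell command locally and return what it printed (local stand-in for a task)."""
    return subprocess.run(command, shell=True, capture_output=True,
                          text=True, check=True).stdout


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(run_command, 'echo hello')
    output = future.result()  # blocks until the command has finished
print(output, end='')  # hello
```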
Note that if the task is not finished, task.result() blocks until the task has completed. This is inconvenient, because we have no way of knowing which task will finish next. Rather than waiting for the results of particular tasks, we can define functions to be executed when a task completes. These functions (known as callbacks) receive the completed task as an argument. We can add as many callbacks as we want, and they are executed in the order in which they were added.
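Python's concurrent.futures documents the same callback behavior: callables added with add_done_callback() receive the finished future and are called in the order they were added. A small local check of that ordering with the standard-library thread pool (first and second are illustrative names, and this is an analogy rather than work_queue_futures itself):

```python
from concurrent.futures import ThreadPoolExecutor

calls = []


def first(fut):
    calls.append(('first', fut.result()))


def second(fut):
    calls.append(('second', fut.result()))


with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(pow, 2, 10)
    fut.add_done_callback(first)
    fut.add_done_callback(second)
# Leaving the with block waits for the pool, so both callbacks have run by now.
print(calls)  # [('first', 1024), ('second', 1024)]
```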
Once we have added the callback, we submit the task as usual and call q.join(), which waits for all submitted tasks to complete.
In [ ]:
def my_print_when_done(task):
    print('the output of task {} is {}'.format(task.id, task.output))
# we now submit several tasks at the same time:
for i in range(5):
    task = FutureTask('/bin/date')
    task.add_done_callback(my_print_when_done)
    q.submit(task)
q.join()
So far, our tasks have run an executable that already exists on the machine running the worker, and their result has been whatever output that command prints to the screen. For more complex tasks, we need to specify the input and output files they use.
For example, we can redirect the output of the command to a file:
In [ ]:
task = FutureTask('/bin/date > my.date')
task.specify_output_file('my.date')
q.submit(task)
# We use task.result() to wait for the task to complete.
task.result()
In [ ]:
# Once the task has finished, we can read the file to see its contents:
if task.result():
    with open('my.date') as f:
        print(f.read())
Similarly, we can specify input files. Let's use the output of the previous task as the input of the next example:
In [ ]:
task = FutureTask('/bin/hexdump my.date > my.hexdump')
task.specify_input_file('my.date')
task.specify_output_file('my.hexdump')
q.submit(task)
In [ ]:
# We wait for the task's result, then inspect the output file:
if task.result():
    with open('my.hexdump') as f:
        print(f.read())
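Conceptually, a worker runs each task in a private sandbox directory: the declared input files are transferred in, the command runs there, and the declared output files are transferred back. The function below is a hypothetical, single-machine model of that flow (run_in_sandbox is our own name; real workers move files over the network, which this sketch ignores). In this model the command string is run by a shell, which is what makes redirections such as '/bin/date > my.date' work, and a file the command creates only comes back if it was declared as an output.

```python
import os
import shutil
import subprocess
import tempfile


def run_in_sandbox(command, inputs=(), outputs=()):
    """Hypothetical local model of a worker running one task.

    Copies each declared input into a fresh temporary directory, runs the
    command there through the shell, copies each declared output back to the
    current directory, and returns (exit_code, stdout). A declared output that
    the command did not create raises FileNotFoundError.
    """
    with tempfile.TemporaryDirectory() as sandbox:
        for name in inputs:
            shutil.copy(name, os.path.join(sandbox, os.path.basename(name)))
        proc = subprocess.run(command, shell=True, cwd=sandbox,
                              capture_output=True, text=True)
        for name in outputs:
            shutil.copy(os.path.join(sandbox, name), name)
        return proc.returncode, proc.stdout


# Usage: an input file goes in, a transformed output file comes back.
with open('greeting.txt', 'w') as f:
    f.write('hello\n')
code, _ = run_in_sandbox('tr a-z A-Z < greeting.txt > greeting.upper',
                         inputs=['greeting.txt'], outputs=['greeting.upper'])
with open('greeting.upper') as f:
    print(f.read(), end='')  # HELLO
```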
In [ ]:
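# This task cannot succeed because its executable does not exist.
# The error surfaces as a FutureTaskError when we ask for the task's result:
task = FutureTask('./my-non-existent-executable > output')
task.specify_input_file('my-non-existent-executable')
q.submit(task)
try:
    task.result()
except FutureTaskError as e:
    print('task {} had an error: {}'.format(task.id, e))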
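Python's concurrent.futures follows a similar convention: if the submitted work raised an exception, future.result() re-raises it in the caller. In this local sketch, CommandError is a hypothetical stand-in for FutureTaskError, and run_checked is our own helper that turns a non-zero exit status into an exception:

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor


class CommandError(Exception):
    """Hypothetical error type, playing the role FutureTaskError plays above."""


def run_checked(command):
    """Run a shell command locally; raise CommandError on a non-zero exit status."""
    proc = subprocess.run(command, shell=True, capture_output=True, text=True)
    if proc.returncode != 0:
        raise CommandError('exit code {}: {}'.format(proc.returncode, proc.stderr.strip()))
    return proc.stdout


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(run_checked, './my-non-existent-executable')
    try:
        future.result()  # re-raises the exception raised inside the worker thread
    except CommandError as e:
        error = e
        print('the command had an error: {}'.format(e))
```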
In [ ]:
import os
# Create a script that will be executed remotely
with open('my-python-script', 'w') as f:
    f.write("""#! /usr/bin/env python
import sys
print('hello from version:\\n{}'.format(sys.version))
""")
# Make the script executable
os.chmod('my-python-script', 0o755)
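Before shipping a script to remote workers, it can save a round trip to run it once locally. The helper check_script below is a hypothetical sketch: it checks that the file is executable and starts with a shebang line, then runs it with the current interpreter (sys.executable) rather than through the shebang, so the check still works on systems that only provide a python3 command. It verifies the script itself, not the environment it will later run in.

```python
import os
import subprocess
import sys


def check_script(path):
    """Hypothetical pre-flight check: return what the script prints when run locally."""
    if not os.access(path, os.X_OK):
        raise PermissionError('{} is not executable'.format(path))
    with open(path) as f:
        first_line = f.readline()
    if not first_line.startswith('#!'):
        raise ValueError('{} has no shebang line'.format(path))
    # check=True raises CalledProcessError if the script exits with an error.
    proc = subprocess.run([sys.executable, path], capture_output=True,
                          text=True, check=True)
    return proc.stdout
```

With the file created above, check_script('my-python-script') returns the text the script prints, starting with "hello from version:".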
In [ ]:
# Run the script inside a conda environment packed into my-conda-env.tar.gz
task = FutureTask('./my-python-script')
task.specify_input_file('my-python-script')
task.specify_runtime_env('conda', 'my-conda-env.tar.gz')
q.submit(task)
q.join()
print(task.output)
print("It took {} seconds".format((task.execute_cmd_finish - task.execute_cmd_start)/1e6))
In [ ]:
# Run the script inside a Singularity image
task = FutureTask('./my-python-script')
task.specify_input_file('my-python-script')
task.specify_runtime_env('singularity', 'my-singularity.img')
q.submit(task)
q.join()
print(task.output)
print("It took {} seconds".format((task.execute_cmd_finish - task.execute_cmd_start)/1e6))
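Both cells above divide by 1e6 because the task timestamps execute_cmd_start and execute_cmd_finish are expressed in microseconds. When comparing runs, a small helper (elapsed_seconds is our own, hypothetical name) keeps that conversion in one place:

```python
MICROSECONDS_PER_SECOND = 1_000_000


def elapsed_seconds(start_us, finish_us):
    """Elapsed time in seconds between two timestamps given in microseconds."""
    if finish_us < start_us:
        raise ValueError('finish timestamp precedes start timestamp')
    return (finish_us - start_us) / MICROSECONDS_PER_SECOND


print(elapsed_seconds(1_000_000, 3_500_000))  # 2.5
```

For a finished task, elapsed_seconds(task.execute_cmd_start, task.execute_cmd_finish) gives the same value the cells above print.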