Work Queue application basics

The module work_queue_futures allows you to execute your computations as a distributed system, that is, not only using the resources locally available in a computer, but also using remote resources you may have access to. It is most effective when you have more work to be computed than resources available, and when that work can be split into independent tasks that each run for more than 30 seconds, but less than a day.

A work_queue_futures application consists of two parts:

  • Master. The master is the part we will write in this notebook. The master is responsible for creating the work to be done.
  • Workers. Workers execute the work created by the master, and they run on remote resources. We don't need to write a worker program, since workers are the same for all work_queue_futures applications.

In this notebook we will construct our first work_queue_futures master. The first thing we need to do is to import the Python objects we'll need:


In [ ]:
from work_queue_futures import WorkQueueFutures, FutureTask, FutureTaskError

Creating a queue

Next, we need to create a queue to which we will add tasks to be completed:


In [ ]:
q = WorkQueueFutures(name='my-wq-app', port=0, local_worker={'cores': 1})

print('queue is accepting connections on port {}'.format(q.port))

We created the queue specifying three arguments:

  • name: A project name that workers use to find where the master is located so that they can connect to it.
  • port: Set here to 0 to use any available port. The master listens on this port for worker connections.
  • local_worker: Since we are testing our first master, we ask the queue to create a local worker for us. In real applications workers run on remote resources. (See the last part of this notebook, and the sketch just below.)
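
In a real application we would omit local_worker and let remote workers connect using the project name. A minimal sketch (q_remote is our own illustrative name; the rest of this notebook keeps using the queue created above):


In [ ]:
# Illustrative only: a queue with no local worker, waiting for
# remote workers that look it up by the project name 'my-wq-app'.
q_remote = WorkQueueFutures(name='my-wq-app', port=0)
print('waiting for remote workers on port {}'.format(q_remote.port))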

Running simple tasks

Now we create our first task. We simply want to run the Unix date command:


In [ ]:
task = FutureTask('/bin/date')
q.submit(task)

Once submitted, the task is assigned to an available worker. We can ask for its result with:


In [ ]:
if task.result():
    print(task.output)

Automatically run code when tasks finish

Note that if the task is not finished, then

task.result()

will block until the task completes. This is inconvenient, as we do not have a way of knowing which task will finish next. Rather than waiting for results on particular tasks, we can define functions to be executed when a task completes. These functions (known as callbacks) receive the completed task as an argument. We can add as many callbacks as we want, and they are executed in the order they were added (a sketch with two callbacks follows the example below).

Once we have added the callbacks, we submit the task as usual and call q.join(), which waits for all submitted tasks to complete.


In [ ]:
def my_print_when_done(task):
    print('the output of task {} is {}'.format(task.id, task.output))
    
# we now submit several tasks at the same time:
for i in range(5):
    task = FutureTask('/bin/date')
    task.add_done_callback(my_print_when_done)
    q.submit(task)

q.join()
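
Since callbacks run in the order they were added, we can attach more than one to the same task. A minimal sketch (the callback names are our own; everything else uses the calls already shown):


In [ ]:
def announce(task):
    # runs first: report that the task finished
    print('task {} finished'.format(task.id))

def show_output(task):
    # runs second: show what the command printed
    print('task {} output: {}'.format(task.id, task.output))

task = FutureTask('/bin/date')
task.add_done_callback(announce)
task.add_done_callback(show_output)
q.submit(task)

q.join()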

Input and output files

Up to now our tasks have run an executable that already exists on the machine running the worker, and their result is whatever output the command prints to the screen. For more complex tasks, we will need to specify the input and output files involved.

For example, we can redirect the output of the command to a file:


In [ ]:
task = FutureTask('/bin/date > my.date')
task.specify_output_file('my.date')
q.submit(task)

# We use task.result() to wait for the task to complete.
task.result()

In [ ]:
# Once the task has finished, we can read the file to see its contents:
if task.result():
    with open('my.date') as f:
        print(f.read())

Similarly, we can specify input files. Let's use the output of the previous task as the input of the next example:


In [ ]:
task = FutureTask('/bin/hexdump my.date > my.hexdump')
task.specify_input_file('my.date')
task.specify_output_file('my.hexdump')
q.submit(task)

In [ ]:
# We wait for the task results, and explore the output file:
if task.result():
    with open('my.hexdump') as f:
        print(f.read())

Dealing with errors

Sometimes tasks do not generate the expected output files, or input files are not available, or the task fails for some other reason. In such cases, task.result() raises an exception:


In [ ]:
task = FutureTask('./my-non-existent-executable > output')
task.specify_input_file('my-non-existent-executable')
q.submit(task)

try:
    task.result()
except FutureTaskError as e:
    print('task {} had an error: {}'.format(task.id, e))

Specifying environments
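
Often the machine where a worker runs does not have the same software as the machine where we develop. With specify_runtime_env we can ask for a task's command to be executed inside a software environment, such as a conda environment packed as a tarball, or a Singularity image. The next cells create a small Python script and run it inside both kinds of environment.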


In [ ]:
# Create a script that will be executed remotely

with open('my-python-script', 'w') as f:
    f.write("""#! /usr/bin/env python
import sys
print('hello from version:\\n{}'.format(sys.version))

""")
    
import os
os.chmod('my-python-script', 0o755)

In [ ]:
# Run the script inside a conda environment
task = FutureTask('./my-python-script')
task.specify_input_file('my-python-script')
task.specify_runtime_env('conda', 'my-conda-env.tar.gz')
q.submit(task)

q.join()
print(task.output)
print("It took {} seconds".format((task.execute_cmd_finish - task.execute_cmd_start)/1e6))

In [ ]:
# Run the script inside a Singularity image
task = FutureTask('./my-python-script')
task.specify_input_file('my-python-script')
task.specify_runtime_env('singularity', 'my-singularity.img')
q.submit(task)

q.join()
print(task.output)
print("It took {} seconds".format((task.execute_cmd_finish - task.execute_cmd_start)/1e6))
