Parsl Tutorial

Parsl is a native Python library that allows you to write functions that execute in parallel and tie them together with dependencies to create workflows. Parsl wraps Python functions as "Apps" using the @App decorator. Decorated functions can run in parallel when all their inputs are ready.
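
The decorator idea can be sketched with nothing but the standard library. This is a minimal illustration under stated assumptions, not Parsl's implementation. `as_app` is a hypothetical name, and a `concurrent.futures.ThreadPoolExecutor` stands in for Parsl's execution machinery. Calling the decorated function submits it to the pool and returns a future right away, so several calls can be in flight at once.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def as_app(executor):
    """Hypothetical decorator factory (not part of Parsl).

    Calling the wrapped function submits it to `executor` and returns a
    concurrent.futures.Future instead of the function's return value.
    """
    def decorator(fn):
        @functools.wraps(fn)  # keep the original name and docstring
        def wrapper(*args, **kwargs):
            return executor.submit(fn, *args, **kwargs)
        return wrapper
    return decorator


pool = ThreadPoolExecutor(max_workers=4)


@as_app(pool)
def square(x):
    return x * x


futures = [square(i) for i in range(5)]  # each call returns a Future immediately
results = [f.result() for f in futures]  # result() waits for each task to finish
print(results)  # [0, 1, 4, 9, 16]
pool.shutdown()
```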

For more comprehensive documentation and examples, please refer to our documentation.


In [3]:
# Import Parsl
import parsl
from parsl import *
print(parsl.__version__)


0.2.2-alpha

DataFlowKernel

Parsl's DataFlowKernel acts as an abstraction layer over any pool of execution resources (e.g., clusters, clouds, threads).

In this example we use a pool of threads to facilitate local parallel execution.
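
The toy sketch below makes the "abstraction layer" idea concrete. It is not how Parsl's DataFlowKernel is built. A hypothetical `MiniKernel` class accepts a list of standard-library executors and hides which one runs each task. Calling code only sees `submit()` and a future, so replacing the thread pool with another `concurrent.futures.Executor` would leave that code unchanged.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import cycle


class MiniKernel:
    """Hypothetical toy dispatcher (not Parsl's DataFlowKernel).

    Hands each submitted task to the next executor in round-robin order, so
    callers never need to know which pool actually runs the work.
    """

    def __init__(self, executors):
        if not executors:
            raise ValueError("MiniKernel needs at least one executor")
        self._executors = list(executors)
        self._next = cycle(self._executors)

    def submit(self, fn, *args, **kwargs):
        return next(self._next).submit(fn, *args, **kwargs)

    def shutdown(self):
        for ex in self._executors:
            ex.shutdown()


workers = ThreadPoolExecutor(max_workers=4)
kernel = MiniKernel([workers])
fut = kernel.submit(pow, 2, 10)
print(fut.result())  # 1024
kernel.shutdown()
```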


In [2]:
# Let's create a pool of threads to execute our functions
workers = ThreadPoolExecutor(max_workers=4)

# We pass the workers to the DataFlowKernel which will execute our Apps over the workers.
dfk = DataFlowKernel(executors=[workers])



Hello World App

As a first example, let's define a simple Python function that returns the string 'Hello World!'. We make this function into a Parsl App with the @App decorator. The decorator takes two arguments: the type of App ('python' or 'bash') and the DataFlowKernel object that will execute it.


In [ ]:
# Here we define our first App function, a simple python app that returns a string
@App('python', dfk)
def hello():
    return 'Hello World!'

app_future = hello()

Futures

Unlike a regular Python function, calling an App returns an AppFuture. A future acts as a proxy for the result (or exception) that the App will produce once its execution completes. You can check a future's status with future.done(), or wait for its result with future.result(). Note the difference between the two: done() returns immediately with the current status (True or False), whereas result() blocks until the App has completed and its result is available.


In [ ]:
# Check status 
print("Status: ", app_future.done())

# Get result
print("Result: ", app_future.result())
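
The same done()/result() behavior can be observed on a plain `concurrent.futures.Future`. For illustration, this sketch uses one in place of an AppFuture and does not use Parsl. A `threading.Event` holds the task back, which guarantees that the first status check sees an unfinished future.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()


def hello():
    release.wait()  # keep the task running until the main thread releases it
    return 'Hello World!'


with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(hello)
    status_before = fut.done()  # non-blocking check; the task is still waiting
    print("Status:", status_before)  # Status: False
    release.set()
    print("Result:", fut.result())  # blocks until hello() returns
    print("Status:", fut.done())  # Status: True
```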

Data Dependencies

Futures can be passed between Apps. When a future returned by one App is passed as an input to another, an implicit data dependency is created. Parsl manages the execution of these Apps, launching each one only after its dependencies have been resolved.
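
One way to implement such implicit dependencies is sketched below with the standard library. It is an illustration under assumptions, not Parsl's actual scheduler. The helper `submit_when_ready` is hypothetical, and it creates a `concurrent.futures.Future` directly, which the Python documentation reserves for executor implementations and tests. The helper does not tie up a worker thread while it waits. Instead, it registers a callback on every input future and submits the task only after the last one resolves. If an input fails, its exception propagates to the dependent future.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def _forward(src, dst):
    """Copy the outcome (result or exception) of future `src` into `dst`."""
    exc = src.exception()
    if exc is not None:
        dst.set_exception(exc)
    else:
        dst.set_result(src.result())


def submit_when_ready(executor, fn, *args):
    """Hypothetical helper: run fn(*args) once every Future in args is done.

    Future arguments are replaced by their results before fn is called.
    """
    deps = [a for a in args if isinstance(a, Future)]
    outer = Future()
    remaining = len(deps)
    lock = threading.Lock()

    def launch():
        try:
            values = [a.result() if isinstance(a, Future) else a for a in args]
        except Exception as exc:  # an input failed, so fail the dependent task too
            outer.set_exception(exc)
            return
        executor.submit(fn, *values).add_done_callback(
            lambda inner: _forward(inner, outer))

    def on_dep_done(_):
        nonlocal remaining
        with lock:
            remaining -= 1
            ready = remaining == 0
        if ready:  # the last input has resolved
            launch()

    if not deps:
        launch()
    for d in deps:
        d.add_done_callback(on_dep_done)
    return outer


with ThreadPoolExecutor(max_workers=2) as pool:
    a = pool.submit(lambda: 2)
    b = submit_when_ready(pool, lambda x, y: x + y, a, 3)  # waits for a
    c = submit_when_ready(pool, lambda x: x * 10, b)  # waits for b
    print(c.result())  # 50
```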

Let's see an example of this that uses the Monte Carlo method to estimate pi. We call this slow function three times and take the average of the results. The dependency chain looks like this:

App Calls    pi()  pi()   pi()
              \     |     /
Futures        a    b    c
                \   |   /
App Call        avg_three()
                    |
Future            avg_pi
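
The estimate returned by pi() follows from comparing areas. Let L be the edge length of the box, and let the circle of radius r = L/2 sit at its center. A uniformly random point in the box lands inside the circle with probability equal to the ratio of the two areas:

```latex
P(\text{inside}) = \frac{\pi r^2}{L^2} = \frac{\pi (L/2)^2}{L^2} = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx 4 \cdot \frac{\text{count}}{\text{total}}
```

This is the value count*4/total that pi() returns. The statistical error of the estimate shrinks roughly in proportion to 1/sqrt(total), which is why each call draws many points.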

In [ ]:
@App('python', dfk)
def pi(total):
    # App functions have to import modules they will use.
    import random
    # Set the size of the box (edge length) in which we drop random points
    edge_length = 10000
    center = edge_length / 2
    c2 = center ** 2
    count = 0

    for i in range(total):
        # Drop a random point in the box.
        x, y = random.randint(1, edge_length), random.randint(1, edge_length)
        # Count points within the circle
        if (x - center)**2 + (y - center)**2 < c2:
            count += 1

    return count * 4 / total

@App('python', dfk)
def avg_three(a, b, c):
    return (a+b+c)/3

Parallelism

Here we call the App pi() three times, and each call runs independently, in parallel with the others. We then call another App, avg_three(), with the three futures returned by the pi() calls. Because avg_three() is also a Parsl App, the call itself does not block and returns a future immediately. Parsl defers its execution until all the futures passed to it as inputs have been resolved.


In [ ]:
a, b, c = pi(10**6), pi(10**6), pi(10**6)
avg_pi = avg_three(a, b, c)

In [ ]:
# Print the results
print("A: {0:.5f} B: {1:.5f} C: {2:.5f}".format(a.result(), b.result(), c.result()))
print("Average: {0:.5f}".format(avg_pi.result()))
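
For comparison, the fan-out/fan-in pattern above can be reproduced with only the standard library. This is a sketch under assumptions, not Parsl code. `estimate_pi` mirrors the pi() App but takes a seed, so each task owns its own random generator. `avg_three` receives the futures themselves and calls `.result()` on them, so the worker running it waits until all three inputs are ready. On standard CPython builds, the global interpreter lock lets only one thread execute Python bytecode at a time. CPU-bound pure-Python tasks like these therefore interleave on a thread pool instead of computing simultaneously, and the sketch demonstrates the dependency structure rather than a speedup.

```python
import random
from concurrent.futures import ThreadPoolExecutor


def estimate_pi(total, seed):
    """Same box-and-circle Monte Carlo estimate as the pi() App above."""
    rng = random.Random(seed)  # per-task generator: no shared RNG state
    edge_length = 10000
    center = edge_length / 2
    c2 = center ** 2
    count = 0
    for _ in range(total):
        x, y = rng.randint(1, edge_length), rng.randint(1, edge_length)
        if (x - center) ** 2 + (y - center) ** 2 < c2:
            count += 1
    return count * 4 / total


def avg_three(a, b, c):
    # a, b and c are futures; each result() call blocks until that input is ready.
    return (a.result() + b.result() + c.result()) / 3


with ThreadPoolExecutor(max_workers=4) as pool:
    # Fan out: three independent estimates are queued at once.
    a, b, c = (pool.submit(estimate_pi, 10**5, seed) for seed in (1, 2, 3))
    # Fan in: this submit() also returns a future immediately.
    avg_pi = pool.submit(avg_three, a, b, c)
    print("Average: {0:.5f}".format(avg_pi.result()))
```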