In [1]:
from __future__ import print_function

import numpy as np
from scipy.integrate import odeint
import multiprocessing
import sys
import functools

Embarrassingly Parallel Programming in Python

What is parallel programming?

Most computers these days can do multiple things at once, using multiple cores. This is great, as it allows your computer to do multiple things simultaneously, without tying up your computer.

But sometimes, those extra cores in our computer are going to waste. You're waiting for your stellar model to converge, and it's taking forever, but only 1 of 4 cores is actually doing any work.

It'd be great to get some of those other cores to help out when it's needed. That won't be all the time, but for slow running code, there are ways you can easily cut runtime by half, with only minor modifications

What is "Embarrassingly Parallel" Programming?

Basically, when you have lots of independent processes. This is the simplest version of parallel programming, but it also offers the most speedup.

Good example: Given the astrometry of a nearby star, I want to integrate its motion backwards 104 times, slightly varying its initial conditions each time. This will let me estimate how the uncertainty in its current position effects the uncertainty of how close it passed by the Sun.

Bad example: Given a comoving box, filled with Nbodies, where Nbodies=109, and cosmological initial conditions, what would you expect the local cosmic web to look like at z=0.

The good example is good because it's independent. You know the 104 initial conditions you want to test. The bad example is bad because it's so dependent. Each body depends on the location of every other body, and each time step depends on the previous timestep.

There are times when you'll need something more complicated, rather than "embarassingly easy," but even those tasks will often have steps that are "embarrasingly parallel," so this is a good place to start.

This talk is meant to:

  • introduce to parallel programming:
  • show when it can make your research go faster
  • show that it can be easy in python

This talk isn't meant to:

  • be an indepth discussion of the inner workings
  • be a guide on parallel computing on distributed clusters

Those topics are interesting, and could be important for your work, but I'm going to keep it simple for today. At the end I'll add some other readings I've found helpful.

Also, it should be noted that Python is an interpretted language, and behaves quite different than Fortan, C/C++, etc. Python has its own set of challenges, and the tools you use in compiled languages might not apply in Python.

Basic Concepts

Before we jump into multiprocessing, it'll be good to start with a bit of background.

The simplest time to incorporate paralle programming is within the map - filter - reduce model. It's neither perfect nor comprehensive, but it's a starting point.

Mapping is when you start with a set of inputs, apply a function to them to return a different set of outputs. In linear algebra, a matrix is something that maps an input vector to an output vector. (E.g. a rotation matrix can rotate [map] a vector along $\hat{x}$ to a vector along $\hat{y}$). For scalars, a function is what maps a scalar to a scalar. (E.g. cosmology calculators can map values of redshift, $z$ to luminosity distance, $D_L$.)

In Python a map is a function which can be applied to a large number of inputs element-wise. In this way it's not too different from a simple for loop, or a list comprehension.

A Python filter is then a function that allows you to filter out unwanted values. Maybe you want to remove observing data when there was bad data. Maybe you want to remove data for a certain type of object. A filter will take a list, and only return the values which pass your filtering function.

Finally, a reduce function allows you to combine all your data into a single number, rather than a list of elements. Maybe now that have a collection of good pixels, you want the total photometric flux and uncertainty. Maybe you want to integrate some value over your IMF. NOTE: reduce has been removed by Python 3. If you really need it, check out: https://docs.python.org/3.0/library/functools.html#functools.reduce

The archetypal program flow is this: you have some initial inputs listing what you want to process. Map does the work for each of those elements. Filter lets you mask / remove unwanted results. Reduce helps you combine it into a reduced value. As always, the process is much more fluid in reality.

Map - Filter - Reduce Example

Given a temperatures in Celcius, convert it to Fahrenheit, and get the average of all the readings below freezing.

NOTE: the serialized Map-Reduce framework behaves differently between Python2 and Python3. (E.g. reduce has been eliminated, map now uses "lazy" evaluation, etc.) There are probably better ways to do this in Python3; these work-arounds aren't ideal. But our focus is on parallel code, so this won't affect us much.


In [2]:
def getF(C):
    return C*9./5 + 32

def isFreezingF(F):
    if F < 32:
        return True
    else:
        return False # return 0 or False to be filtered out


Cs = [-9.2, 6.5, -17.3, 37.8]

Fs = list(map(getF, Cs))

Freezing = list(filter(isFreezingF, Fs))
num_freezing = len(list(Freezing))
if sys.version_info[0] is 3:
    # Python 3
    Fs = list(map(getF, Cs)) #force non-lazy evaluation
    Freezing = list(filter(isFreezingF, Fs)) #force non-lazy evaluation
    num_freezing = len(Freezing)
    FreezingAverage = functools.reduce(np.add, Freezing) / num_freezing
else:
    # Python != 3
    Fs = map(getF, Cs)
    Freezing = filter(isFreezingF, Fs)
    num_freezing = len(Freezing)
    FreezingAverage = functools.reduce(np.add, Freezing) / num_freezing




print("Temperatures in C: ", Cs)
print("Temperatures in F: ", Fs)
print("Temperatures which are below freezing: ", Freezing)
print("Average Temperature when it is freezing: ", FreezingAverage, "F")


Temperatures in C:  [-9.2, 6.5, -17.3, 37.8]
Temperatures in F:  [15.440000000000001, 43.7, 0.8599999999999959, 100.03999999999999]
Temperatures which are below freezing:  [15.440000000000001, 0.8599999999999959]
Average Temperature when it is freezing:  8.15 F

Let's try a "real" problem.

Given a potential: $$ \Phi = \ln r + V(\phi) = \ln r + \ln (1 - e \cos \phi) $$ with an eccentricity $ e = .3 $, and a dimensionless energy $ E = -1 $, try integrating a number of test particles for a range of initial radii $ x_0 < x_\mathrm{max} = \frac{\exp^{E}}{1 - e} \approx 0.52$.

I.e. integrate particles in the given potential with the initial conditions: $$ x_0 \in (0, x_\mathrm{max})$$ $$ y_0 = 0 $$ $$ \dot{x}_0 = 0 $$ $$ \dot{y}_0 = \sqrt{2 ( E - \Phi) - \dot{x_0}^2} $$

This sounds like an excellent time to use this map framework. We have a range of initial conditions, which have to get processed by the same potential with the same physics, and each integration is independent of all the rest.


In [3]:
eccentricity = .3
Energy       = -1
x_max        = np.exp(Energy) / (1 - eccentricity)

#padding to be added to avoid scientifically interesting,
#  but numerically problematic initial conditions
padding      = .1
x_0s         = np.linspace(padding, x_max - padding) 


def derivs(w, t, eccentricity):
    x,y, v_x, v_y = w
    
    r = np.sqrt(x**2 + y**2)
    denominator = r**2 - (eccentricity * x * r)

    # Calculate accelerations    
    a_x = -1*(x - eccentricity * r) / denominator
    a_y = -1*y / denominator
    
    return [v_x, v_y, a_x, a_y]

def integrate(x_0, eccentricity=eccentricity, hmax=.1, num_steps=100):
    x_0 = x_0
    y_0 = 0
    v_x_0 = 0
    v_y_0 = np.sqrt(2 *(Energy - (np.log(x_0 * (1-eccentricity)))))
    
    w_0 = [x_0, y_0, v_x_0, v_y_0]
    
    ts = np.linspace(0,100, num=num_steps)
    ws = odeint(derivs, w_0, ts, args = (eccentricity,), hmax=hmax)
    ## if this was real work, you'd want to save your results now
    ##   or return ws, or do something before you lose your results

Running these integrations -- For loop vs. Map

How do the methods compare?


In [4]:
%%time
## Serial example

for x_0 in x_0s:
    integrate(x_0)


CPU times: user 11 s, sys: 38.5 ms, total: 11 s
Wall time: 11 s

In [5]:
%%time
## Serial example

if sys.version_info[0] is 3:
    # Python 3
    result = list(map(integrate, x_0s)) # Force non-lazy evaluation
else:
    # Python != 3
    result = map(integrate, x_0s)


CPU times: user 11.1 s, sys: 33.4 ms, total: 11.1 s
Wall time: 11.2 s

A little better, but not by much... (Map becomes more useful for large datasets, which return a large set of results)

Let's try in parallel

First, check how many cores are available


In [6]:
print(multiprocessing.cpu_count())


4

Now use the multiprocessing.Pool version of map


In [7]:
%%time
## Parallel example

pool = multiprocessing.Pool(2) # initialize thread pool N threads
pool.map(integrate, x_0s)


CPU times: user 10.4 ms, sys: 7.85 ms, total: 18.2 ms
Wall time: 6.03 s

In [8]:
%%time
## Parallel example

pool = multiprocessing.Pool(4) # initialize thread pool N threads
pool.map(integrate, x_0s)


CPU times: user 11 ms, sys: 11.5 ms, total: 22.5 ms
Wall time: 5.13 s

In [9]:
%%time
## Parallel example

pool = multiprocessing.Pool(8) # initialize thread pool N threads
pool.map(integrate, x_0s)


CPU times: user 17.8 ms, sys: 19.6 ms, total: 37.4 ms
Wall time: 5.26 s

In [10]:
%%time
## Parallel example

pool = multiprocessing.Pool(32) # initialize thread pool N threads
pool.map(integrate, x_0s)


CPU times: user 41.8 ms, sys: 64 ms, total: 106 ms
Wall time: 5.09 s

What does this show?

First, we had to make very few changes to our code. It can be easy to improve existing projects.

Also, we see that pool.map is much faster than the normal map, and that as we use more threads, things speed up. But there's a limit. If our computer only has 4 cores available, creating more than 4 threads isn't very useful. In some cases it can actually add administrative overhead, while not adding any computing power.

Beyond simple maps

Using pool.map requires that you use exactly the same function on each input, without any keywords. Often times we want a bit more flexibility.

apply_async allows us to explicitly pass arbitrary inputs to arbitrary functions. This can be cumbersome, but what you lose in elegance, you gain in flexibility.

Let's try an example where we change the potential, while keeping the initial condition fixed:


In [11]:
%%time
## Serial example

eccentricities = [.3, .4, .5, .6]
result_list = []

for e in eccentricities:
    # Will start an integration, and wait for it to finish before moving on to the next one.
    result = integrate(.2, eccentricity=e, hmax = .0005, num_steps = 100000)
    result_list.append(result)


CPU times: user 9 s, sys: 79.6 ms, total: 9.08 s
Wall time: 9.04 s

In [12]:
%%time
## Parallel example

eccentricities = [.3, .4, .5, .6]
result_list = []
integration_list = []

pool = multiprocessing.Pool(4)

for e in eccentricities:
    # will start an integration, but continue through the for loop, *without waiting for integrations to finish*
    integration = pool.apply_async(integrate, [.2], {"eccentricity" : e, "hmax" : .0005, "num_steps":100000} )
    integration_list.append(integration)

for integration in integration_list:
    # .get() forces python to wait until the process completes
    #   Otherwise Python will continue going through your file, without waiting for your results
    #   If done incorrectly, this can cause a "race condition" where you incorrectly assume the process is complete
    result = integration.get()
    result_list.append(result)
    
pool.close()


CPU times: user 15.7 ms, sys: 14 ms, total: 29.7 ms
Wall time: 3.95 s

Why aren't things speeding up as fast as I'd expect?

Amdahl's Law

At first, going from 1 core to 4 cores seems like it should give a 4x improvement, but in practice we never fully achieve that efficiency.

Amdahl's Law basically states that parallelizing a certain aspect of your project will only save you as much time as that part would have taken. If your project takes 1 hour to run, and you parallelize a portion that only took 30 seconds of that hour, your code is still going to take ~1 hour to run.

Parallelization also tends to add overhead. If done right, this overhead will be worth the overall speedup. But things will work best if you make sure parallel processing is actually appropriate for what you're doing.

Examples of when to stay serial

  • when I have lots of small steps which all depend on each other
  • when I'm doing something small + simple which runs ~instantly (will the speed up be worth the extra time it takes to program? Is the main source of slowness simply due to the overhead of python trying to read in my file, rather than actually running it?)
  • when I'm doing something memory heavy (APLpy/Montage eats up all my memory; if I try to process 4 images all at once, I'll need 4x the memory)
  • complicated Input/Output (read/write) operations

Further Reading

Hat tip to http://chriskiehl.com/article/parallelism-in-one-line/, for a blog post that helped me get started with this.

In C/C++/Fortran you can apply a similar parallel programming model called OpenMP. Things work a little differently, but it's a good starting place for injecting easy parallelizations. It requires shared memory though; things get more complicated when you want multiple computers to work together, each with their own memory.

For applying similar concepts in C, I've liked this overview: http://gribblelab.org/CBootcamp/A2_Parallel_Programming_in_C.html. It's reasonably concise, while covering a lot of big topics, and giving a nubmer of simple working examples.

(Most up-to-date version can be found at: https://github.com/egentry/AstroComputingSeminar_Parallel)