In [1]:
from __future__ import print_function
import numpy as np
from scipy.integrate import odeint
import multiprocessing
import sys
import functools
Most computers these days have multiple cores, which lets them work on several things at once without tying up the whole machine.
But sometimes those extra cores go to waste. You're waiting for your stellar model to converge, it's taking forever, and only 1 of your 4 cores is actually doing any work.
It'd be great to get some of those other cores to help out when needed. That won't be all the time, but for slow-running code there are ways you can easily cut runtime in half, with only minor modifications.
Basically, this works best when you have lots of independent tasks, a situation often called "embarrassingly parallel." It's the simplest version of parallel programming, but it also offers the most speedup.
Good example: Given the astrometry of a nearby star, I want to integrate its motion backwards $10^4$ times, slightly varying its initial conditions each time. This will let me estimate how the uncertainty in its current position affects the uncertainty in how close it passed by the Sun.
Bad example: Given a comoving box filled with $N_\mathrm{bodies} = 10^9$ bodies and cosmological initial conditions, what would you expect the local cosmic web to look like at $z=0$?
The good example is good because it's independent. You know the $10^4$ initial conditions you want to test. The bad example is bad because it's so interdependent. Each body depends on the location of every other body, and each time step depends on the previous timestep.
There are times when you'll need something more complicated than "embarrassingly parallel," but even those tasks will often have steps that are embarrassingly parallel, so this is a good place to start.
Those topics are interesting, and could be important for your work, but I'm going to keep it simple for today. At the end I'll add some other readings I've found helpful.
Also, it should be noted that Python is an interpreted language, and behaves quite differently from Fortran, C/C++, etc. Python has its own set of challenges, and the tools you use in compiled languages might not apply in Python.
Before we jump into multiprocessing, it'll be good to start with a bit of background.
The simplest place to incorporate parallel programming is within the map - filter - reduce model. It's neither perfect nor comprehensive, but it's a starting point.
Mapping is when you start with a set of inputs, apply a function to them to return a different set of outputs. In linear algebra, a matrix is something that maps an input vector to an output vector. (E.g. a rotation matrix can rotate [map] a vector along $\hat{x}$ to a vector along $\hat{y}$). For scalars, a function is what maps a scalar to a scalar. (E.g. cosmology calculators can map values of redshift, $z$ to luminosity distance, $D_L$.)
In Python, map applies a function to a collection of inputs element-wise. In this way it's not too different from a simple for loop or a list comprehension.
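For example, here's a tiny sketch of my own (not one of the notebook's cells) showing that all three approaches give the same result:
def square(x):
    return x * x

xs = [1, 2, 3, 4]

# plain for loop
squares_loop = []
for x in xs:
    squares_loop.append(square(x))

# list comprehension
squares_comp = [square(x) for x in xs]

# map (wrapped in list() so it also works with Python 3's lazy map)
squares_map = list(map(square, xs))

print(squares_loop == squares_comp == squares_map)  # True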
A Python filter is then a function that allows you to filter out unwanted values. Maybe you want to remove observations taken in bad conditions. Maybe you want to remove data for a certain type of object. A filter will take a list, and only return the values which pass your filtering function.
Finally, a reduce function allows you to combine all your data into a single number, rather than a list of elements. Maybe now that you have a collection of good pixels, you want the total photometric flux and uncertainty. Maybe you want to integrate some value over your IMF.
NOTE: reduce is no longer a built-in in Python 3; it has been moved into functools. If you really need it, check out: https://docs.python.org/3.0/library/functools.html#functools.reduce
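As a quick sketch of my own (functools is imported in the first cell), functools.reduce repeatedly combines elements pairwise until a single value remains:
values = [1.0, 2.0, 3.0, 4.0]
total = functools.reduce(lambda a, b: a + b, values)  # ((1 + 2) + 3) + 4
print(total)  # 10.0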
The archetypal program flow is this: you have some initial inputs listing what you want to process. Map does the work for each of those elements. Filter lets you mask / remove unwanted results. Reduce helps you combine them into a single reduced value. As always, the process is much more fluid in reality.
Given a set of temperatures in Celsius, convert them to Fahrenheit, and get the average of all the readings below freezing.
NOTE: the serialized Map-Reduce framework behaves differently between Python 2 and Python 3. (E.g. reduce is no longer a built-in, map now uses "lazy" evaluation, etc.) There are probably better ways to do this in Python 3; these work-arounds aren't ideal. But our focus is on parallel code, so this won't affect us much.
In [2]:
def getF(C):
    return C*9./5 + 32

def isFreezingF(F):
    if F < 32:
        return True
    else:
        return False  # return 0 or False to be filtered out

Cs = [-9.2, 6.5, -17.3, 37.8]

if sys.version_info[0] == 3:
    # Python 3
    Fs = list(map(getF, Cs))                  # force non-lazy evaluation
    Freezing = list(filter(isFreezingF, Fs))  # force non-lazy evaluation
    num_freezing = len(Freezing)
    FreezingAverage = functools.reduce(np.add, Freezing) / num_freezing
else:
    # Python 2
    Fs = map(getF, Cs)
    Freezing = filter(isFreezingF, Fs)
    num_freezing = len(Freezing)
    FreezingAverage = functools.reduce(np.add, Freezing) / num_freezing
print("Temperatures in C: ", Cs)
print("Temperatures in F: ", Fs)
print("Temperatures which are below freezing: ", Freezing)
print("Average Temperature when it is freezing: ", FreezingAverage, "F")
Given a potential: $$ \Phi = \ln r + V(\phi) = \ln r + \ln (1 - e \cos \phi) $$ with an eccentricity $ e = .3 $, and a dimensionless energy $ E = -1 $, try integrating a number of test particles for a range of initial radii $ x_0 < x_\mathrm{max} = \frac{\exp(E)}{1 - e} \approx 0.53$.
I.e. integrate particles in the given potential with the initial conditions: $$ x_0 \in (0, x_\mathrm{max})$$ $$ y_0 = 0 $$ $$ \dot{x}_0 = 0 $$ $$ \dot{y}_0 = \sqrt{2 ( E - \Phi) - \dot{x}_0^2} $$
This sounds like an excellent time to use this map framework. We have a range of initial conditions, which have to get processed by the same potential with the same physics, and each integration is independent of all the rest.
In [3]:
eccentricity = .3
Energy = -1
x_max = np.exp(Energy) / (1 - eccentricity)
#padding to be added to avoid scientifically interesting,
# but numerically problematic initial conditions
padding = .1
x_0s = np.linspace(padding, x_max - padding)
def derivs(w, t, eccentricity):
    x, y, v_x, v_y = w
    r = np.sqrt(x**2 + y**2)
    denominator = r**2 - (eccentricity * x * r)
    # Calculate accelerations
    a_x = -1*(x - eccentricity * r) / denominator
    a_y = -1*y / denominator
    return [v_x, v_y, a_x, a_y]

def integrate(x_0, eccentricity=eccentricity, hmax=.1, num_steps=100):
    y_0 = 0
    v_x_0 = 0
    v_y_0 = np.sqrt(2 * (Energy - np.log(x_0 * (1 - eccentricity))))
    w_0 = [x_0, y_0, v_x_0, v_y_0]

    ts = np.linspace(0, 100, num=num_steps)
    ws = odeint(derivs, w_0, ts, args=(eccentricity,), hmax=hmax)
    ## if this was real work, you'd want to save your results now,
    ## or return ws, or do something before you lose your results
In [4]:
%%time
## Serial example
for x_0 in x_0s:
    integrate(x_0)
In [5]:
%%time
## Serial example
if sys.version_info[0] == 3:
    # Python 3
    result = list(map(integrate, x_0s))  # force non-lazy evaluation
else:
    # Python 2
    result = map(integrate, x_0s)
In [6]:
print(multiprocessing.cpu_count())
Now use the multiprocessing.Pool version of map
In [7]:
%%time
## Parallel example
pool = multiprocessing.Pool(2)  # initialize a pool of 2 worker processes
pool.map(integrate, x_0s)
In [8]:
%%time
## Parallel example
pool = multiprocessing.Pool(4)  # initialize a pool of 4 worker processes
pool.map(integrate, x_0s)
In [9]:
%%time
## Parallel example
pool = multiprocessing.Pool(8)  # initialize a pool of 8 worker processes
pool.map(integrate, x_0s)
In [10]:
%%time
## Parallel example
pool = multiprocessing.Pool(32)  # initialize a pool of 32 worker processes
pool.map(integrate, x_0s)
First, we had to make very few changes to our code. It can be easy to improve existing projects.
Also, we see that pool.map is much faster than the normal map, and that as we use more worker processes, things speed up. But there's a limit. If our computer only has 4 cores available, creating more than 4 workers isn't very useful. In some cases it can actually add administrative overhead, while not adding any computing power.
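If you'd rather not hard-code the pool size, one option (a small sketch of my own, reusing integrate and x_0s from above) is to size the pool from the core count we printed a few cells ago:
n_workers = multiprocessing.cpu_count()  # number of cores the OS reports (may include hyperthreads)
pool = multiprocessing.Pool(n_workers)   # one worker process per reported core
pool.map(integrate, x_0s)
pool.close()
pool.join()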
Using pool.map requires that you apply exactly the same single-argument function to every input; you can't pass extra arguments or keywords. Oftentimes we want a bit more flexibility.
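As an aside (my own sketch, not part of the original notebook): if all you need is to freeze a few keyword arguments while still using pool.map, functools.partial (imported in the first cell) can wrap the function first. The eccentricity and hmax values below are just illustrative, and in Python 3 the resulting partial object can be pickled and sent to the workers:
frozen_integrate = functools.partial(integrate, eccentricity=.5, hmax=.05)  # example values

pool = multiprocessing.Pool(4)
results = pool.map(frozen_integrate, x_0s)  # each worker runs integrate(x_0, eccentricity=.5, hmax=.05)
pool.close()
pool.join()
If each task needs genuinely different arguments, though, you'll want something more general.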
apply_async allows us to explicitly pass arbitrary inputs to arbitrary functions. This can be cumbersome, but what you lose in elegance, you gain in flexibility.
In [11]:
%%time
## Serial example
eccentricities = [.3, .4, .5, .6]
result_list = []
for e in eccentricities:
    # Will start an integration, and wait for it to finish before moving on to the next one.
    result = integrate(.2, eccentricity=e, hmax=.0005, num_steps=100000)
    result_list.append(result)
In [12]:
%%time
## Parallel example
eccentricities = [.3, .4, .5, .6]
result_list = []
integration_list = []
pool = multiprocessing.Pool(4)
for e in eccentricities:
    # will start an integration, but continue through the for loop, *without waiting for integrations to finish*
    integration = pool.apply_async(integrate, [.2],
                                   {"eccentricity": e, "hmax": .0005, "num_steps": 100000})
    integration_list.append(integration)

for integration in integration_list:
    # .get() forces Python to wait until the process completes.
    # Otherwise Python will continue going through your file, without waiting for your results.
    # If done incorrectly, this can cause a "race condition", where you incorrectly assume the process is complete.
    result = integration.get()
    result_list.append(result)

pool.close()
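In Python 3 (3.3+), a slightly tidier version of the same pattern (a sketch, not one of the timed cells) uses the pool as a context manager, so the pool is shut down for you even if an integration raises an error:
with multiprocessing.Pool(4) as pool:
    # submit all the integrations first, then wait for each result
    integration_list = [pool.apply_async(integrate, [.2],
                                         {"eccentricity": e, "hmax": .0005, "num_steps": 100000})
                        for e in eccentricities]
    result_list = [integration.get() for integration in integration_list]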
At first, going from 1 core to 4 cores seems like it should give a 4x improvement, but in practice we never fully achieve that efficiency.
Amdahl's Law basically states that parallelizing a certain aspect of your project will only save you as much time as that part would have taken. If your project takes 1 hour to run, and you parallelize a portion that only took 30 seconds of that hour, your code is still going to take ~1 hour to run.
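To put a rough number on it: if a fraction $p$ of your runtime can be parallelized across $N$ workers, Amdahl's Law gives a best-case speedup of $$ S(N) = \frac{1}{(1-p) + p/N}. $$ For the example above, $p = 30/3600 \approx 0.008$, so even with infinitely many cores the total speedup tops out around $S \approx 1.008$.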
Parallelization also tends to add overhead. If done right, this overhead will be worth the overall speedup. But things will work best if you make sure parallel processing is actually appropriate for what you're doing.
One common gotcha is memory: APLpy/Montage eats up all my memory, so if I try to process 4 images all at once, I'll need 4x the memory.
Hat tip to http://chriskiehl.com/article/parallelism-in-one-line/ for a blog post that helped me get started with this.
In C/C++/Fortran you can apply a similar parallel programming model called OpenMP. Things work a little differently, but it's a good starting place for injecting easy parallelizations. It requires shared memory though; things get more complicated when you want multiple computers to work together, each with their own memory.
For applying similar concepts in C, I've liked this overview: http://gribblelab.org/CBootcamp/A2_Parallel_Programming_in_C.html. It's reasonably concise, while covering a lot of big topics, and giving a number of simple working examples.
(Most up-to-date version can be found at: https://github.com/egentry/AstroComputingSeminar_Parallel)