MCMC simulations are good candidates for parallelisation, especially when trying to
produce enough samples to get smooth approximations of the posterior. The emcee program comes with a threads
option, but it doesn't really speed up many typical use cases.
To explain this I refer to to emcee docs:
It is also important to note that the multiprocessing module works by spawning a large number of new python processes and running the code in isolation within those processes. This means that there is a significant amount of overhead involved at each step of the parallelization process. With this in mind, it is not surprising that running a simple problem like the quickstart example in parallel will run much slower than the equivalent serial code. If your log-probability function takes a significant amount of time (> 1 second or so) to compute then using the parallel sampler actually provides significant speed gains.
In addition there are issues around pickling functions which would require editing the emcee code
to use dill
: something I don't really want to do. Instead, we can turn to the IPython parallel computing tools. A typical use case would be to get each core on your personal computer to run the production chain, then combine the results from all the parallel results.
In this notebook I will use the quickstart example from emcee and show how to do this.
Firstly I have taken the quickstart example and stripped out all the plot creation to leave just the minimum working example for testing. If you want to know more about the example go read the quickstart.
In [1]:
%matplotlib inline
from __future__ import print_function
import emcee
import triangle
import numpy as np
import scipy.optimize as op
import matplotlib.pyplot as pl
from matplotlib.ticker import MaxNLocator
# Reproducible results!
np.random.seed(123)
# Choose the "true" parameters.
m_true = -0.9594
b_true = 4.294
f_true = 0.534
# Generate some synthetic data from the model.
N = 50
x = np.sort(10*np.random.rand(N))
yerr = 0.1+0.5*np.random.rand(N)
y = m_true*x+b_true
y += np.abs(f_true*y) * np.random.randn(N)
y += yerr * np.random.randn(N)
# Define the probability function as likelihood * prior.
def lnprior(theta):
m, b, lnf = theta
if -5.0 < m < 0.5 and 0.0 < b < 10.0 and -10.0 < lnf < 1.0:
return 0.0
return -np.inf
def lnlike(theta, x, y, yerr):
m, b, lnf = theta
model = m * x + b
inv_sigma2 = 1.0/(yerr**2 + model**2*np.exp(2*lnf))
return -0.5*(np.sum((y-model)**2*inv_sigma2 - np.log(inv_sigma2)))
def lnprob(theta, x, y, yerr):
lp = lnprior(theta)
if not np.isfinite(lp):
return -np.inf
return lp + lnlike(theta, x, y, yerr)
# Find the maximum likelihood value.
chi2 = lambda *args: -2 * lnlike(*args)
result = op.minimize(chi2, [m_true, b_true, np.log(f_true)], args=(x, y, yerr))
# Set up the sampler.
ndim, nwalkers = 3, 100
pos = [result["x"] + 1e-4*np.random.randn(ndim) for i in range(nwalkers)]
sampler = emcee.EnsembleSampler(nwalkers, ndim, lnprob, args=(x, y, yerr))
The sampler
objects contains the run_mcmc
method which will run the simulations. First let's define the number of points and a suitable burnin period:
In [2]:
npoints = 1000
burnin = 100
In [3]:
sampler.reset()
out = sampler.run_mcmc(pos, npoints, rstate0=np.random.get_state())
samples = sampler.chain[:, burnin:, :].reshape((-1, ndim))
print(samples.shape)
fig = triangle.corner(samples)
plt.show()
Now we know what the results should look like, we will show first how to parallelise the simulations and then check that the results are consistent with the normal method.
Here we create a function run
which takes a view, and some arguments which should be fairly readable. The only important point here is that we take the total number of poinst we want npoints
and divide it among the number of
cores available. If the total number of points doesn't factor nicely among the threads then we may loose or gain a point here and there.
In [4]:
import os,sys,time
import numpy as np
from IPython import parallel
rc = parallel.Client()
dview = rc[:]
dview.block = True
dview.use_dill() # Standard pickle fails with functions
local_dict = {'lnprior': lnprior,
'lnlike': lnlike,
'lnpron': lnprob,
'ndim': ndim}
def run(dview, pos, sampler, local_dict, npoints, burnin):
for key, val in local_dict.items():
dview[key] = val
dview['burnin'] = burnin
dview["npernode"] = int(npoints/len(dview))
dview['sampler'] = sampler
dview['pos'] = pos
dview['lnprior'] = lnprior
dview.execute("sampler.run_mcmc(pos, npernode, rstate0=np.random.get_state())\n"+
"samples = sampler.chain[:, burnin:, :].reshape((-1, ndim))")
return dview.gather("samples")
In [5]:
sampler.reset()
samples = run(dview, pos, sampler, local_dict, npoints, burnin)
print(samples.shape)
fig = triangle.corner(samples)
plt.show()
In [6]:
%%timeit
sampler.reset()
out = sampler.run_mcmc(pos, npoints, rstate0=np.random.get_state())
samples = sampler.chain[:, burnin:, :].reshape((-1, ndim))
In [7]:
%%timeit
sampler.reset()
samples = run(dview, pos, sampler, local_dict, npoints, burnin)
In [8]:
sampler_threads = emcee.EnsembleSampler(nwalkers, ndim, lnprob, args=(x, y, yerr), threads=4)
%timeit out = sampler_threads.run_mcmc(pos, npoints, rstate0=np.random.get_state())
This gives us a significant speed up, with very little effort. This solution was produced when attending the Next-Generation Computer modelling course hosted by the University of Southampton and in particular the IPython notebook sessions given by Min RK a core developer. Thanks to him for some hints.
Since first writing this I have found that the author of emcee
was well aware of this trick and had already
written an example using a slightly different method. In this method we initialise the sampler and give the pool
object the view of the parallel engines. However, it isn't clear to me at the moment how to split the runs between the engines and recover the output. Running with something like:
In [ ]:
from IPython.parallel import Client
c = Client()
view = c[:]
view.push({"lnprob": lnprob, "lnprior": lnprior, "lnlike": lnlike})
view.execute("import numpy as np")
view.push({"lnprob": lnprob})
view.execute("import numpy as np")
sampler_poolview = emcee.EnsembleSampler(nwalkers, ndim, lnprob, args=(x, y, yerr), pool=view)
out = sampler_poolview.run_mcmc(pos, npoints)
samples = sampler.chain[:, burnin:, :].reshape((-1, ndim))
print(samples.shape)
fig = triangle.corner(samples)
Just runs npoints
on all the engines, but I don't see how to recover the result from the other engines..If you know then please do let me know!
In [ ]: