The main goal of this notebook is to provide some benchmarks for running FiPy in parallel on a workstation as well as on a larger cluster (the cluster work has now been moved to a separate notebook in this directory). The first part of the notebook demonstrates using FiPy in parallel with IPython's parallel architecture. Although we have tested FiPy in the past on up to 64 parallel processes, those results don't seem to be available anywhere on the web. I hope this notebook addresses the lack of published data on FiPy's parallel performance.
FiPy uses both mpi4py and PyTrilinos, and both require that the IPython parallel engines are launched with MPI support using
ipcluster start -n 4 --engines=MPIEngineSetLauncher
To test that FiPy is working correctly in IPython, run the following.
In [1]:
import pandas as pd
In [2]:
from IPython.parallel import Client
c = Client()
view = c[:]
view.block = True
view.activate() # enable magics
%px import fipy as fp
%px procID = fp.parallelComm.procID  # MPI rank seen by FiPy on each engine
%px Nproc = fp.parallelComm.Nproc    # total number of MPI processes
print view['procID']
print view['Nproc']
The procIDs should all be different.
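As a quick sanity check (a sketch, not part of the original benchmark; mpi_size is a name introduced only for this check), the engines can also confirm that mpi4py agrees with FiPy about the process count:

%px from mpi4py import MPI
%px mpi_size = MPI.COMM_WORLD.Get_size()
procIDs = view['procID']
assert len(set(procIDs)) == len(procIDs)             # every engine reports a distinct rank
assert set(view['Nproc']) == set([len(procIDs)])     # FiPy sees all the engines
assert set(view['mpi_size']) == set([len(procIDs)])  # and so does mpi4py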
Below we use a simple example to test FiPy in parallel: a diffusion problem on a 3D grid, with the domain initialized to a random field. The GmshGrid3D class is used to obtain an optimal partition using METIS. The regular Grid3D class only partitions with slices, which is highly sub-optimal.
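The effect of the partitioning can be inspected directly; the sketch below assumes that a mesh's numberOfCells attribute reports the local (per-process) cell count, including overlap cells, when queried on the engines (m10 and local_cells are names introduced just for this check):

%px m10 = fp.GmshGrid3D(nx=10, ny=10, nz=10, dx=0.1, dy=0.1, dz=0.1)
%px local_cells = m10.numberOfCells
print view['local_cells']  # roughly equal counts indicate a balanced partition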
In [3]:
def setup(N):
    import fipy as fp
    import numpy as np
    np.random.seed(1)
    L = 1.
    m = fp.GmshGrid3D(nx=N, ny=N, nz=N, dx=L / N, dy=L / N, dz=L / N)
    v0 = 1.
    ## build a reproducible random initial field as an outer product of 1D Fourier series
    for dim in range(3):
        x = np.linspace(0., L, N)
        fx = np.sum(np.array([np.sin(2 * x * np.pi * i * np.random.random()) / i for i in range(1, N)]), axis=0)
        v0 = np.outer(v0, fx).flatten()
    v = fp.CellVariable(mesh=m)
    v0 = np.resize(v0, len(v)) ## Gmsh doesn't always give us the correct sized grid!
    eqn = fp.TransientTerm(1e-3) == fp.DiffusionTerm()
    v[:] = v0.copy()
    return eqn, v, v0

view['setup'] = setup  ## push the function to the parallel engines
In [4]:
%%px
eqn, v, v0 = setup(60)
eqn.solve(v, dt=1.)
%timeit v[:] = v0.copy(); eqn.solve(v, dt=1.)
The above cell executes two time steps, but only the second step is timed. FiPy does a lot of memory allocation on the first step, so the first step is not representative of the run times for subsequent steps. For most "real" simulations, the setup time is inconsequential compared to running hundreds of time steps. On 4 parallel processes the time taken on my laptop is 3.07 s. Let's see how fast this is in serial (after the brief aside sketched below).
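As an aside, the first-step overhead can be seen directly with something like the following sketch (run in serial here for simplicity, using time.time() rather than the %timeit magic):

import time
eqn, v, v0 = setup(60)
t0 = time.time(); eqn.solve(v, dt=1.); t1 = time.time()  # first solve: includes allocation
v[:] = v0.copy()
t2 = time.time(); eqn.solve(v, dt=1.); t3 = time.time()  # second solve: representative cost
print 'first solve: %.2f s, second solve: %.2f s' % (t1 - t0, t3 - t2)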
In [5]:
eqn, v, v0 = setup(60)
eqn.solve(v, dt=1.)
%timeit v[:] = v0.copy(); eqn.solve(v, dt=1.)
It's not much faster in parallel than in serial. The seemingly bad parallel scaling is because FiPy is not consistent in its choice of default solver across solver suites. In serial, FiPy uses PySparse, whose default here is PCG, while in parallel it uses Trilinos, whose default is GMRES, which is much slower for this problem.
In [176]:
%px print eqn.getDefaultSolver().__class__
print eqn.getDefaultSolver().__class__
An added complication is that each suite's default solver uses a different preconditioner. Instead of letting FiPy choose its default solver, let's instantiate the solver we want.
In [7]:
import fipy.solvers.pysparse as solvers
solver = solvers.linearPCGSolver.LinearPCGSolver(precon=None, iterations=500, tolerance=1e-15)
eqn.solve(v, solver=solver, dt=1.)
%timeit v[:] = v0.copy(); eqn.solve(v, solver=solver, dt=1.)
In [8]:
import fipy.solvers.trilinos as solvers
solver = solvers.linearPCGSolver.LinearPCGSolver(precon=None, iterations=500, tolerance=1e-15)
eqn.solve(v, solver=solver, dt=1.)
%timeit v[:] = v0.copy(); eqn.solve(v, solver=solver, dt=1.)
The time taken with a Trilinos solver is now more in line with the PySparse solver, although the stopping criteria may still not be consistent between the two suites. Let's try this in parallel.
In [9]:
%%px
import fipy.solvers.trilinos as solvers
solver = solvers.linearPCGSolver.LinearPCGSolver(precon=None, iterations=500, tolerance=1e-15)
eqn.solve(v, solver=solver, dt=1.)
%timeit v[:] = v0.copy(); eqn.solve(v, dt=1., solver=solver)
We now see some benefit from using 4 processes: a speed up of 2.8. Not great, but at least it's in the right direction. One thing we can do is compare the speed up for different sized meshes. In order to capture the timing output, we will switch over to using the timeit module directly instead of the magic.
In [129]:
setup_str = '''
import fipy as fp
import numpy as np
np.random.seed(1)
L = 1.
N = {N:d}
m = fp.GmshGrid3D(nx=N, ny=N, nz=N, dx=L / N, dy=L / N, dz=L / N)
v0 = 1.
for dim in range(3):
    x = np.linspace(0., L, N)
    fx = np.sum(np.array([np.sin(2 * x * np.pi * i * np.random.random()) / i for i in range(1, N)]), axis=0)
    v0 = np.outer(v0, fx).flatten()
v = fp.CellVariable(mesh=m)
v0 = np.resize(v0, len(v)) ## Gmsh doesn't always give us the correct sized grid!
eqn = fp.TransientTerm(1e-3) == fp.DiffusionTerm()
v[:] = v0.copy()
import fipy.solvers.{suite} as solvers
solver = solvers.linearPCGSolver.LinearPCGSolver(precon=None, iterations={iterations}, tolerance=1e-100)
eqn.solve(v, dt=1., solver=solver)
v[:] = v0.copy()
'''
timeit_str = '''
eqn.solve(v, dt=1., solver=solver)
fp.parallelComm.Barrier()
'''
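As a minimal sketch of how these strings feed timeit.Timer (with illustrative parameter values; the full sweep follows in the next cell), a single serial measurement looks like:

import timeit
timer = timeit.Timer(timeit_str, setup=setup_str.format(N=30, suite='trilinos', iterations=100))
print min(timer.repeat(3, 1))  # best of 3 single-run repeats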
In [4]:
import timeit
import itertools
import numpy as np
%px import timeit
attempts = 3
view['timeit_str'] = timeit_str
view['attempts'] = attempts
view['setup_str'] = setup_str
runtimes = pd.DataFrame(columns=['iterations', 'N', 'mode', 'suite', 'run times'])
Ns = (10, 20, 30, 40, 50, 60, 70, 80)
iterations_ = (100, 200, 400, 800)
modes = ('serial', 'parallel')
suites = ('pysparse', 'trilinos')
for N, iterations, mode, suite in itertools.product(Ns, iterations_, modes, suites):
    if mode == 'serial':
        timer = timeit.Timer(timeit_str, setup=setup_str.format(N=N, suite=suite, iterations=iterations))
        times = timer.repeat(attempts, 1)
        runtime = min(times)
    elif suite == 'trilinos':
        view['N'] = N
        view['iterations'] = iterations
        view['suite'] = suite
        %px timer = timeit.Timer(timeit_str, setup=setup_str.format(N=N, suite=suite, iterations=iterations))
        %px times = timer.repeat(attempts, 1)
        ## each engine reports its best of `attempts` runs; the slowest engine
        ## determines the overall parallel run time
        runtime = max(np.array(view['times']).min(axis=1))
    else:
        ## PySparse only runs in serial
        runtime = None
    runtimes = runtimes.append({'N' : N, 'iterations' : iterations, 'mode' : mode, 'suite' : suite, 'run times' : runtime}, ignore_index=True)
In [29]:
runtimes.to_csv('laptop_runtimes.csv')
In [1]:
import pandas as pd
df = pd.read_csv('laptop_runtimes.csv', index_col=0)
iterations_ = (100, 200, 400, 800)
In [2]:
import numpy as np
import matplotlib.pyplot as plt

f = plt.figure(figsize=(15, 5))
ax1 = f.add_subplot(131)
ax2 = f.add_subplot(132)
ax3 = f.add_subplot(133)
for iters in iterations_:
    parallel_df = df[(df['iterations'] == iters) & (df['mode'] == 'parallel') & (df['suite'] == 'trilinos')].sort('N')
    serial_df = df[(df['iterations'] == iters) & (df['mode'] == 'serial') & (df['suite'] == 'trilinos')].sort('N')
    pysparse_df = df[(df['iterations'] == iters) & (df['mode'] == 'serial') & (df['suite'] == 'pysparse')].sort('N')
    Ns = parallel_df['N']
    parallelTimes = np.array(parallel_df['run times'])
    serialTimes = np.array(serial_df['run times'])
    pysparseTimes = np.array(pysparse_df['run times'])
    ax1.semilogx(Ns**3, serialTimes / parallelTimes, label=iters)
    ax2.semilogx(Ns**3, pysparseTimes / serialTimes, label=iters)
    if iters == 800:
        ax3.loglog(Ns**3, parallelTimes, label='parallel Trilinos')
        ax3.loglog(Ns**3, serialTimes, label='serial Trilinos')
        ax3.loglog(Ns**3, pysparseTimes, label='PySparse')
ax1.set_ylabel("Speed Up")
ax2.set_ylabel("Speed Up")
ax1.set_xlabel("$N^3$")
ax2.set_xlabel("$N^3$")
ax1.set_title("Speed up, serial / parallel")
ax2.set_title("Speed up, PySparse / Trilinos (serial)")
ax3.set_title("Raw Times, 800 iterations")
ax3.set_xlabel("$N^3$")
ax3.set_ylabel("Time (s)")
ax1.legend(loc='upper left')
ax2.legend(loc='upper left')
l = ax3.legend(loc='upper left')
The graph on the left is the speed up when switching from serial to parallel. The center graph is the speed up when switching from PySparse to Trilinos. The graph on the right shows the raw run times with an iteration count of 800. The x-axis is the number of cells in the domain. The best parallel speed up is about 2.7 and occurs for 800 iterations at roughly 30000 cells. The left graph shows that the parallel speed up drops off as the domain size increases, which is consistent with some aspect of the parallel communication becoming saturated. It is not a memory issue, since the raw run times do not increase dramatically for larger numbers of cells.
In this example the only communication occurs during the solver iterations. This is consistent with the observed decrease in speed up for larger system sizes as the iteration count increases; the communication requirement is probably a linear function of the number of iterations.
An important consideration is the proportion of time spent in the solver. I ran a few quick tests to check this (not shown here), using a system size of N=60 and checking the run times for different numbers of iterations. A fairly consistent time per iteration of 0.00265 s was observed for the parallel simulations and 0.0035 s for the serial simulations, independent of the number of iterations. Outside of the solver, the speed up was over 2$\times$.
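Put schematically (a rough model implied by the numbers above, not a measured fit), the total run time decomposes as

$$T(n_{\mathrm{it}}) \approx T_{\mathrm{outside}} + n_{\mathrm{it}}\, t_{\mathrm{it}}, \qquad t_{\mathrm{it}} \approx \begin{cases} 0.0035\ \mathrm{s} & \text{serial} \\ 0.00265\ \mathrm{s} & \text{parallel, 4 processes} \end{cases}$$

where $T_{\mathrm{outside}}$ is the time spent outside the solver and $t_{\mathrm{it}}$ is the per-iteration solver cost.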
The previous testing used a fixed process count of 4. With mpi4py and PyTrilinos there doesn't seem to be a way to decrease the number of parallel engines interactively without restarting the cluster. Even if we select a subview of the parallel engines (with view = Client()[:3] for example), FiPy still uses mpi4py's value of Nproc=4. This issue needs to be ironed out in FiPy. It should be possible to modify FiPy so that some parallel nodes have zero elements; this already occurs for small test cases in parallel. Due to this issue, the parallel runs are executed externally using !mpirun. The following cell creates a script called fipy_timing.py, which takes the control parameters as command line arguments.
In [10]:
%%writefile fipy_timing.py
"""
Usage: fipy_timing.py [-h] [--N N] [--iterations <iterations>] [--suite <suite>]
Options:
-h --help Show this screen.
--N=N Cell number is N^3 [default: 30]
--iterations=<iterations> Number of iterations [default: 100]
--suite=<suite> Solver suite [default: trilinos]
"""
from docopt import docopt
import timeit
import numpy as np
import fipy as fp
arguments = docopt(__doc__, version='Run FiPy timing')
N = int(arguments['--N'])
iterations = int(arguments['--iterations'])
suite = arguments['--suite']
attempts = 3
setup_str = '''
import fipy as fp
import numpy as np
np.random.seed(1)
L = 1.
N = {N:d}
m = fp.GmshGrid3D(nx=N, ny=N, nz=N, dx=L / N, dy=L / N, dz=L / N)
v0 = 1.
for dim in range(3):
    x = np.linspace(0., L, N)
    fx = np.sum(np.array([np.sin(2 * x * np.pi * i * np.random.random()) / i for i in range(1, N)]), axis=0)
    v0 = np.outer(v0, fx).flatten()
v = fp.CellVariable(mesh=m)
v0 = np.resize(v0, len(v)) ## Gmsh doesn't always give us the correct sized grid!
eqn = fp.TransientTerm(1e-3) == fp.DiffusionTerm()
v[:] = v0.copy()
import fipy.solvers.{suite} as solvers
solver = solvers.linearPCGSolver.LinearPCGSolver(precon=None, iterations={iterations}, tolerance=1e-100)
eqn.solve(v, dt=1., solver=solver)
v[:] = v0.copy()
'''
timeit_str = '''
eqn.solve(v, dt=1., solver=solver)
fp.parallelComm.Barrier()
'''
timer = timeit.Timer(timeit_str, setup=setup_str.format(N=N, suite=suite, iterations=iterations))
times = timer.repeat(attempts, 1)
if fp.parallelComm.procID == 0:
    print min(times)
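A single invocation of the script looks something like this (the parameter values here are just illustrative):

!mpirun -np 4 python fipy_timing.py --N=40 --iterations=200 --suite=trilinos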
The following cell runs fipy_timing.py for different control parameters and collects the results into a data frame.
In [263]:
import itertools
runtimes = pd.DataFrame(columns=['iterations', 'N', 'np', 'run time', 'suite'])
Ns = (10, 20, 30, 40, 50, 60, 70, 80)
iterations_ = (100, 200, 400, 800)
nps = (1, 2, 3, 4, 5, 6, 7, 8)
suites = ('trilinos', 'pysparse')
tmpfile = 'tmp.txt'
for N, iterations, np_, suite in itertools.product(Ns, iterations_, nps, suites):
    if suite == 'pysparse' and np_ > 1:
        runtime = None  ## PySparse only runs in serial
    else:
        !mpirun -np $np_ python fipy_timing.py --N=$N --suite=$suite --iterations=$iterations > $tmpfile
        runtime = float(open(tmpfile).read())
    runtimes = runtimes.append({'N' : N, 'iterations' : iterations, 'np' : np_, 'suite' : suite, 'run time' : runtime}, ignore_index=True)

runtimes.to_csv('laptop_runtimes_nproc.csv')
In [3]:
df = pd.read_csv('laptop_runtimes_nproc.csv')
In [4]:
f = plt.figure(figsize=(20, 5))
ax1 = f.add_subplot(141)
ax2 = f.add_subplot(142)
ax3 = f.add_subplot(143)
ax4 = f.add_subplot(144)
for iterations, ax in zip((800, 400, 200, 100), (ax1, ax2, ax3, ax4)):
    for N in (10, 20, 30, 40, 50, 60, 70, 80):
        sub_df = df[(df['iterations'] == iterations) & (df['N'] == N) & (df['suite'] == 'trilinos')].sort('np')
        nprocs = sub_df['np']
        speed_up = np.array(sub_df[sub_df['np'] == 1]['run time']) / np.array(sub_df['run time'])
        ax.semilogx(sub_df['np'], speed_up, label='$N={0}$'.format(N))
    ax.semilogx((1, 8), (1, 8), 'k--')  ## ideal linear scaling
    ax.set_ylabel("Speed Up")
    ax.set_xlabel("Parallel Processes")
    ax.legend(loc="upper left")
    ax.set_ylim(ymax=3)
    ax.set_title("iterations={0}".format(iterations))
A few things from the graphs:

- N=10 is slow because the overlap is such a high proportion of the number of cells.
- N=80 is slow because the global communication cost is becoming excessive.

The graphs above really just confirm that there is an issue with communication. The load average when the laptop is not running Python jobs is about 0.5.