FiPy in Parallel on the Cluster

Let's switch to running on a cluster to look at using FiPy with more parallel processes. First, let's define a script to do the FiPy run time testing. The script uses docopt to parse the parameters passed from the shell command. Again, a diffusion equation is being solved on a 3D grid.


In [1]:
%%writefile fipy_timing.py

"""
Usage: fipy_timing.py [-h] [--N N] [--iterations <iterations>] [--suite <suite>]

Options:
  -h --help                 Show this screen.
  --N=N                     Cell number is N^3 [default: 30]
  --iterations=<iterations> Number of iterations [default: 100]
  --suite=<suite>           Solver suite [default: trilinos]
  
"""

from docopt import docopt
import timeit
import numpy as np
import fipy as fp

arguments = docopt(__doc__, version='Run FiPy timing')
N = int(arguments['--N'])
iterations = int(arguments['--iterations'])
suite = arguments['--suite']

attempts = 3

setup_str = '''
import fipy as fp
import numpy as np
np.random.seed(1)
L = 1.
N = {N:d}
m = fp.GmshGrid3D(nx=N, ny=N, nz=N, dx=L / N, dy=L / N, dz=L / N)
v0 = 1.
for dim in range(3):
    x = np.linspace(0., L, N)
    fx = np.sum(np.array([np.sin(2 * x * np.pi * i * np.random.random()) / i for i in range(1, N)]), axis=0)
    v0 = np.outer(v0, fx).flatten()
v = fp.CellVariable(mesh=m)
v0 = np.resize(v0, len(v)) ## Gmsh doesn't always give us the correct sized grid!
eqn = fp.TransientTerm(1e-3) == fp.DiffusionTerm()
v[:] = v0.copy()

import fipy.solvers.{suite} as solvers
solver = solvers.linearPCGSolver.LinearPCGSolver(precon=None, iterations={iterations}, tolerance=1e-100)

eqn.solve(v, dt=1., solver=solver)
v[:] = v0.copy()
'''

timeit_str = '''
eqn.solve(v, dt=1., solver=solver)
fp.parallelComm.Barrier()
'''

timer = timeit.Timer(timeit_str, setup=setup_str.format(N=N, suite=suite, iterations=iterations))
times = timer.repeat(attempts, 1)

if fp.parallelComm.procID == 0:
    print min(times)


Overwriting fipy_timing.py

We also need a script to launch jobs on the cluster. The NSLOTS variable is generated by the queing system when the job is launched as a parallel job with

 $ qsub -pe nodal <NSLOTS> -cwd -o <output_file> launcher  <N>  <iterations> <suite>

for example.


In [2]:
%%writefile launcher

#!/bin/bash
source ~/.bashrc
mpirun -np $NSLOTS python fipy_timing.py --N=$1 --iterations=$2 --suite=$3


Writing launcher

In [3]:
!chmod +x ./launcher

The following cell launches a multidimensional set of simulations on the cluster. The time is written to a file name based on the parameter values.


In [ ]:
import itertools
 
Ns = (10, 20, 40, 80)
nps = (1, 2, 4, 8, 16, 24, 32, 48, 64)
suites = ('trilinos', 'pysparse')
iterations_ = (100, 200, 400, 800)

for N, iterations, np_, suite in itertools.product(Ns, iterations_, nps, suites):
    tmpfile = 'tmp-{0}-{1}-{2}-{3}.txt'.format(N, iterations, np_, suite)
    !rm $tmpfile
    if suite == 'pysparse' and np_ > 1:
        pass
    else:
        !qsub -pe nodal $np_ -cwd -o $tmpfile launcher $N $iterations $suite

In [ ]:
!qstat

The following cell post processes the data into a csv file, which is saved in version control.


In [ ]:
import os
import pandas as pd

def getdata():
    for f in os.listdir(os.getcwd()):
        if f.endswith('.txt') and f.startswith('tmp'): 
            try:
                run_time = float(open(f).read())
            except:
                run_time = 1000.0
            values = f.strip('.txt').strip('tmp').split('-')[1:]
            fs = (int, int, int, str)
            data = [x(y) for x, y in zip(fs, values)]
            data.append(run_time)
            yield data

df = pd.DataFrame(columns=['iterations', 'N', 'np', 'run time', 'suite'])
for N, iterations, np_, suite, run_time in getdata():
    df = df.append({'N' : N, 'iterations' : iterations, 'np' : np_, 'suite' : suite, 'run time' : run_time}, ignore_index=True)

df.to_csv('cluster.csv')

In [1]:
import pandas as pd
df = pd.read_csv('cluster.csv', index_col=0)

In [2]:
Ns = sorted(set(df['N']))
iterations_ = sorted(set(df['iterations']))

def ax_iter(Ns, iterations_):
    f = plt.figure(figsize=(6 * 3, 6 * 2))
    for i, N in enumerate(Ns):
        ax = f.add_subplot(2, 3, i + 1)
        for iterations in iterations_:
            yield ax, N, iterations   
        ax.set_title('$N={0}$'.format(N))
        ax.set_ylabel("Speed Up")
        ax.set_xlabel("Parallel Processes")
        ax.legend(loc="upper left")
        ax.set_ylim(ymax=64)
        ax.set_ylim(ymin=0.5)
        ax.set_xlim(xmin=0.9)
        ax.set_xlim(xmax=64)
        ax.loglog((1, 64), (1, 64), 'k--')
        ax.legend(loc="upper left")
    
for ax, N, iterations in ax_iter(Ns, iterations_):
    df_sub = df[(df['iterations'] == iterations) & (df['N'] == N)]
    df_tril = df_sub[df_sub['suite'] == 'trilinos'].sort('np')
    df_pysp = df_sub[df_sub['suite'] == 'pysparse'].sort('np')

    nprocs = np.array(df_tril['np'])
    run_times = np.array(df_tril['run time'])
    
    run_time_pysp = np.array(df_pysp['run time'])

    if len(run_times) > 0:
        line = ax.loglog(nprocs, run_times[0] / run_times, label='iterations={0}'.format(iterations))
    if len(run_time_pysp) > 0:
        ax.loglog((1,), (run_times[0] / run_time_pysp[0],), line[0].get_c() + 'o', label='pysparse')


The speed up graphs above correspond to different sized domains, each with $N^3$ cells. Each curve represents a different number of iterations in the solver. The dots represent the speed when switching to the PySparse solver suite (serial only). Currently results are only available up to 64 processes. The maximum number of slots that can be requested with the cluster is 64.

The results show quite good scaling for the larger problems. The two smallest grid sizes have a high proportion of overlap cells and thus too much communication overhead. PySparse is better than Trilinos for the smaller system, but is within a factor of 2 for the larger systems. In the the N=40 graph there is a considerable slow down at 24 and 32 processes. This kind of variation may be due to jobs running on machines with a shared load and communication overhead (any machine may have multiple separate parallel jobs). It is possible to request 64 slots or even a specific machine for each job, but would require long waits in the queue.

Further work should check the reproducibility of these results. Currently the best of three time steps is used for each job. A better approach may be to launch multiple instances of the same job so that jobs are launched on different machines with alternative shared loads. The data shown here represents the first set of runs that worked in terms of execution and post-processing.