Parallel

The executed version of this tutorial is at https://elephant.readthedocs.io/en/latest/tutorials/parallel.html

elephant.parallel module provides a simple interface to parallelize multiple calls to any user-specified function. The typical use case is calling a function many times with different parameters.

Available executors

elephant.parallel has 3 interfaces to choose from, depending whether the user has a laptop/PC or the computation is being done on a cluster machine with many nodes and MPI installed.

  • ProcessPoolExecutor is a wrapper of python built-in concurrent.futures.ProcessPoolExecutor. This is recommended to run on laptops and personal computers;
  • MPIPoolExecutor is a wrapper of mpi4py.futures.MPIPoolExecutor. This is recommened to run on cluster nodes with MPI-2 installed;
  • MPICommExecutor is a wrapper of mpi4py.futures.MPICommExecutor. This is a legacy MPI-1 class for MPIPoolExecutor and is recommended to run only on cluster nodes which do not support MPI-2 protocol.

Besides these three, a SingleProcess executor is available as a fall-back option to test executions in a single process (no speedup).

All listed above classes have the same API and can be used interchangeably.

How to use

Let's say you want to call some function my_function() for each element in a list iterables_list like so:

(eq. 1) results = [my_function(arg) for arg in iterables_list].

If my_function's implementaion does not use parallelization, you can obtain the results by computing my_function() asynchronously for each element in arguments list. Then the result of eq. 1 is equivalent to

(eq. 2) results = Executor().execute(my_function, iterables_list),

where Executor can be any of the available executors listed above. For more information about parallel executors in Python refer to https://docs.python.org/3/library/concurrent.futures.html.

Examples

Example 1. Computing the mean firing rate

mean_firing_rate() function in elephant.statistics works with a single spiketrain as input. Let's parallelize it by computing firing rates of 8 spiketrains with random spike times.


In [1]:
import numpy as np
import quantities as pq

from elephant.spike_train_generation import homogeneous_poisson_process
from elephant.statistics import mean_firing_rate, time_histogram

from elephant.parallel import SingleProcess, ProcessPoolExecutor

try:
    import mpi4py
    from elephant.parallel.mpi import MPIPoolExecutor, MPICommExecutor
except ImportError:
    raise ImportError("To run this tutorial, please install mpi4py with 'pip install mpi4py' and restart the jupyter notebook kernel")

In [2]:
rate = 10 * pq.Hz
spiketrains = [homogeneous_poisson_process(rate, t_stop=10*pq.s) for _ in range(8)]

We start with a sanity check by computing the mean firing rate of the spiketrains with SingleProcess executor, which is run in the main process with no parallelization.


In [3]:
firing_rate0 = SingleProcess().execute(mean_firing_rate, spiketrains)
firing_rate0


Out[3]:
[array(8.9) * 1/s,
 array(9.6) * 1/s,
 array(9.2) * 1/s,
 array(8.7) * 1/s,
 array(10.) * 1/s,
 array(9.4) * 1/s,
 array(7.9) * 1/s,
 array(9.4) * 1/s]

Let's verify that all three other executors produce the same result, but now with parallelization turned on.


In [4]:
firing_rate1 = ProcessPoolExecutor().execute(mean_firing_rate, spiketrains)
firing_rate1


Out[4]:
[array(8.9) * 1/s,
 array(9.6) * 1/s,
 array(9.2) * 1/s,
 array(8.7) * 1/s,
 array(10.) * 1/s,
 array(9.4) * 1/s,
 array(7.9) * 1/s,
 array(9.4) * 1/s]

In [5]:
firing_rate2 = MPIPoolExecutor().execute(mean_firing_rate, spiketrains)
firing_rate2


Out[5]:
[array(8.9) * 1/s,
 array(9.6) * 1/s,
 array(9.2) * 1/s,
 array(8.7) * 1/s,
 array(10.) * 1/s,
 array(9.4) * 1/s,
 array(7.9) * 1/s,
 array(9.4) * 1/s]

In [6]:
firing_rate3 = MPICommExecutor().execute(mean_firing_rate, spiketrains)
firing_rate3


Out[6]:
[array(8.9) * 1/s,
 array(9.6) * 1/s,
 array(9.2) * 1/s,
 array(8.7) * 1/s,
 array(10.) * 1/s,
 array(9.4) * 1/s,
 array(7.9) * 1/s,
 array(9.4) * 1/s]

All executors produce identical output, as intended.

Note about MPI executors

If you print the detailed information about either of MPI executors, you will notice in this notebook that they use only one node (core) which is equivalent to SingleProcess executor.


In [7]:
print(MPIPoolExecutor())


MPIPoolExecutor(max_workers=None, {'maxprocs': '1', 'soft': '1', 'host': 'PC0Y2Q1V', 'arch': 'x86_64', 'thread_level': 'MPI_THREAD_MULTIPLE', 'ompi_np': '1'})

This is because both MPI executors require -m mpi4py.futures flag while running python scripts:

mpiexec -n numprocs python -m mpi4py.futures pyfile [arg] ...

If you run it without mpiexec command, as in this notebook, MPI features will still be available but only with one single core (maxprocs=1). For more information of how to launch MPI processes in Python refer to https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#command-line.

Example 2. Custom functions and positional argument

Sometimes you might want to iterate over the second (or third, etc.) argument of a function. To do this, you need to create a custom function that passes its first input argument into the right position of the original function. Below is an example of how to compute time histograms of spiketrains with different binsize values (the second argument).


In [8]:
# step 1: initialize the first argument - spiketrains
spiketrains = [homogeneous_poisson_process(rate, t_stop=10*pq.s) for _ in range(8)]

# step 2: define your custom function
def my_custom_function(binsize):
    # specify all custom key-word options here
    return time_histogram(spiketrains, binsize, output='counts')

In [9]:
binsize_list = np.linspace(0.1, 1, num=8) * pq.s

time_hist = ProcessPoolExecutor().execute(my_custom_function, binsize_list)

time_hist contains 8 AnalogSignals - one AnalogSignal per binsize from binsize_list.

Benchmark

Finally, let's see if ProcessPoolExecutor brings any speedup compared to sequential processing.


In [10]:
import warnings
warnings.filterwarnings("ignore")

# initialize the iteration list
binsize_list = np.linspace(0.1, 1, 100) * pq.s

In [11]:
# sequential processing
%timeit [time_histogram(spiketrains, binsize) for binsize in binsize_list]


722 ms ± 43.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [12]:
# with parallelization
%timeit ProcessPoolExecutor(max_workers=4).execute(my_custom_function, binsize_list)


374 ms ± 4.29 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)