Python was not designed to be very good at parallel processing. There are two major problems at the core of the language that make it hard to implement parallel algorithms.
The first of these issues is the most famous obstacle to a convincing multi-threading approach, where a single instance of the Python interpreter runs in several threads: the Global Interpreter Lock, or GIL. The second point is more subtle, and makes it harder to do multi-processing, where several independent instances of the Python interpreter work together to achieve parallelism. We will first look at an elegant way to work around the GIL: use Cython.
The GIL means that the Python interpreter executes Python bytecode in only one thread at a time. Even when we think we run in a gazillion threads, Python itself uses only one. Multi-threading in Python is only useful for waiting on I/O and performing system calls. To do useful CPU-intensive work in multi-threaded mode, we need functions that are implemented in C, and we must tell Python not to worry about the GIL when we call them. The easiest way to achieve this is with Cython. We develop a number-crunching prime adder, and have it run in parallel threads.
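To make the GIL's effect concrete, here is a pure-Python version of the prime-summing workload we are about to build (a sketch that is not part of the original notebook; the names `py_is_prime` and `py_sum_primes` are stand-ins). Running this function from several threads takes about as long as running it in one, because only one thread can execute Python bytecode at a time.

```python
import math

def py_is_prime(n):
    """Trial division, mirroring the Cython version developed below."""
    if n == 2:
        return True
    max_i = math.ceil(math.sqrt(n))
    for i in range(2, max_i + 1):
        if n % i == 0:
            return False
    return True

def py_sum_primes(n):
    """Sum of all primes less than n."""
    return sum(x for x in range(2, n) if py_is_prime(x))

# CPU-bound work like this gains nothing from Python threads:
# the GIL lets only one thread run bytecode at any moment.
print(py_sum_primes(100))   # 1060
```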
We'll load the multiprocessing, threading and queue modules to do our plumbing, and the cython IPython extension so we can do the number crunching.
In [1]:
%load_ext cython
import multiprocessing
import threading
import queue
We define a function that computes the sum of all primes below a certain integer n, and we don't try to be smart about it; the point is that it needs a lot of computation. These functions are marked nogil, so that we can be certain no Python objects are accessed. Finally we create a single Python-exposed function that uses the

with nogil:
    ...

statement. This is a context manager that releases the GIL for the duration of its body.
In [2]:
%%cython
from libc.math cimport ceil, sqrt

cdef inline int _is_prime(int n) nogil:
    """return a boolean, is the input integer a prime?"""
    if n == 2:
        return True
    cdef int max_i = <int>ceil(sqrt(n))
    cdef int i = 2
    while i <= max_i:
        if n % i == 0:
            return False
        i += 1
    return True

cdef unsigned long _sum_primes(int n) nogil:
    """return the sum of all primes less than n"""
    cdef unsigned long i = 0
    cdef int x
    for x in range(2, n):
        if _is_prime(x):
            i += x
    return i

def sum_primes(int n):
    cdef unsigned long result
    with nogil:
        result = _sum_primes(n)
    return result
In fact, we only loaded the multiprocessing module to get the number of CPUs on this machine. We also define a decent amount of work to do in input_range.
In [3]:
input_range = range(int(1e6), int(2e6), int(5e4))
ncpus = multiprocessing.cpu_count()
print("We have {} cores to work on!".format(ncpus))
In [4]:
%%time
for i in input_range:
    print(sum_primes(i), end=' ', flush=True)
print()
In [5]:
%%time
### We need to define a worker function that fetches jobs from the queue.
def worker(q):
    while True:
        try:
            x = q.get(block=False)
            print(sum_primes(x), end=' ', flush=True)
        except queue.Empty:
            break

### Create the queue, and fill it with input values
work_queue = queue.Queue()
for i in input_range:
    work_queue.put(i)

### Start a number of threads
threads = [
    threading.Thread(target=worker, args=(work_queue,))
    for i in range(ncpus)]

for t in threads:
    t.start()

### Wait until all of them are done
for t in threads:
    t.join()
print()
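As an aside, the queue-and-worker plumbing above can also be written with the standard library's concurrent.futures. This is a sketch using a stand-in pure-Python sum_primes, since the Cython version only exists inside the notebook; with the real nogil function, the pool's threads would run truly in parallel.

```python
from concurrent.futures import ThreadPoolExecutor

def sum_primes(n):
    """Stand-in for the Cython sum_primes: sum of primes below n."""
    def is_prime(k):
        return k == 2 or all(k % i for i in range(2, int(k ** 0.5) + 1))
    return sum(x for x in range(2, n) if is_prime(x))

input_range = range(10, 60, 10)
ncpus = 4

# map() distributes the inputs over the pool and yields results in
# input order, replacing the hand-rolled queue, worker, and join loop.
with ThreadPoolExecutor(max_workers=ncpus) as pool:
    for result in pool.map(sum_primes, input_range):
        print(result, end=' ', flush=True)
print()
```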
On my laptop, a dual-core hyper-threaded Intel(R) Core(TM) i5-5300U CPU, this runs just over two times faster than the single-threaded code. However, setting up a queue and a pool of workers is quite cumbersome. Also, this approach doesn't scale up if the dependencies between our computations become more complex. Next we'll use Noodles to provide the multi-threaded environment to execute our work. We'll need three functions:
- schedule to decorate our work function
- run_parallel to run the work in parallel
- gather to collect our work into a workflow
In [6]:
from noodles import (schedule, run_parallel, gather)
In [7]:
%%time
@schedule
def s_sum_primes(n):
    result = sum_primes(n)
    print(result, end=' ', flush=True)
    return result

p_prime_sums = gather(*(s_sum_primes(i) for i in input_range))
prime_sums = run_parallel(p_prime_sums, n_threads=ncpus)
print()
That does look much nicer! We have much less code, the code we do have clearly separates function from form, and this approach extends easily to more complex situations.