In [1]:
%load_ext cython
In [32]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing import Pool, Value, Array
import os
import time
import numpy as np
from numba import njit
In [3]:
def mc_pi(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
In [4]:
%%time
res = [mc_pi(int(1e5)) for i in range(10)]
In [5]:
@njit()
def mc_pi_numba(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
In [6]:
%%time
res = [mc_pi_numba(int(1e7)) for i in range(10)]
In [7]:
np.array(res)
Out[7]:
Install cythongsl if necessary and restart kernel.
! pip install cythongsl
In [8]:
%%cython -lgsl
import cython
from cython_gsl cimport *
@cython.cdivision(True)
def mc_pi_cython(int n):
    cdef gsl_rng_type * T
    cdef gsl_rng * r
    cdef double s = 0.0
    cdef double x, y
    cdef int i

    gsl_rng_env_setup()
    T = gsl_rng_default
    r = gsl_rng_alloc(T)

    for i in range(n):
        x = 2*gsl_rng_uniform(r) - 1
        y = 2*gsl_rng_uniform(r) - 1
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
In [9]:
%%time
res = [mc_pi_cython(int(1e7)) for i in range(10)]
In [10]:
np.array(res)
Out[10]:
The concurrent.futures module

Concurrent processes are processes that return the same results regardless of the order in which they are executed. A "future" is an object that will deliver a result at some point in the future. The concurrent.futures module provides an event handler that can be fed functions to be scheduled for future execution, giving us a simple model for parallel execution on a multi-core machine.

While concurrent.futures provides a simpler interface, it is slower and less flexible than using multiprocessing directly for parallel execution.
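The cells below only use the executor's map method, but the "future" objects mentioned above can also be handled directly through submit, which schedules a call and immediately returns a Future whose result() blocks until the value is ready. A minimal sketch using the pure-Python mc_pi defined earlier (not part of the timed comparisons):

with ProcessPoolExecutor(max_workers=4) as pool:
    # submit returns a Future right away; the work runs in a pool process
    futures = [pool.submit(mc_pi, int(1e5)) for i in range(4)]
    # result() blocks until the corresponding computation has finished
    estimates = [f.result() for f in futures]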
In [11]:
%%time
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])
In [12]:
np.array(list(res))
Out[12]:
In [13]:
%%time
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))])
In [14]:
%%time
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))], chunksize=100)
In [15]:
def f(a, b):
    return a + b
In [16]:
def f_(args):
    return f(*args)
In [17]:
xs = np.arange(24)
chunks = np.array_split(xs, xs.shape[0]//2)
In [18]:
chunks
Out[18]:
In [19]:
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(f_, chunks)
list(res)
Out[19]:
In [20]:
%%time
with ThreadPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])
In [21]:
np.array(list(res))
Out[21]:
In [22]:
%%cython -lgsl
import cython
from cython_gsl cimport *
@cython.cdivision(True)
def mc_pi_cython_nogil(int n):
    cdef gsl_rng_type * T
    cdef gsl_rng * r
    cdef double s = 0.0
    cdef double x, y
    cdef int i

    gsl_rng_env_setup()
    T = gsl_rng_default
    r = gsl_rng_alloc(T)

    with cython.nogil:
        for i in range(n):
            x = 2*gsl_rng_uniform(r) - 1
            y = 2*gsl_rng_uniform(r) - 1
            if (x**2 + y**2) < 1:
                s += 1
    return 4*s/n
In [23]:
%%time
with ThreadPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython_nogil, [int(1e7) for i in range(10)])
In [24]:
np.array(list(res))
Out[24]:
In [25]:
%%time
with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])
In [26]:
%%time
with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))])
In [27]:
def f(i):
    time.sleep(np.random.random())
    print(os.getpid(), i)
In [28]:
for i in range(10):
    p = mp.Process(target=f, args=(i,))
    p.start()
    p.join()
In [29]:
def f(a, b):
    return a + b
In [30]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.starmap(f, np.array_split(xs, xs.shape[0]//2))
list(res)
Out[30]:
In [31]:
def f(a, b):
    return a * b
In [32]:
from functools import partial
fp = partial(f, b=2)
In [33]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.map(fp, xs)
np.array(list(res))
Out[33]:
In [34]:
def f1(q, i):
    time.sleep(np.random.random())
    q.put((os.getpid(), i))
In [35]:
q = mp.Queue()
res = []
for i in range(10):
    p = mp.Process(target=f1, args=(q, i))
    p.start()
    res.append(q.get())
    p.join()
res
Out[35]:
In [36]:
def f2(i):
    global counter
    counter = counter + 1
    print(os.getpid(), i)
In [37]:
counter = 0
f2(10)
print(counter)
In [38]:
counter = 0
for i in range(10):
    p = mp.Process(target=f2, args=(i,))
    p.start()
    p.join()
In [39]:
counter
Out[39]:
We can use shared memory to do this, but it is slow because multiprocessing has to ensure that only one process gets to use counter at any one time. The multiprocessing module provides Value and Array shared memory variables, but you can also convert arbitrary Python variables into shared memory objects, although this is less efficient.
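One common way to share an arbitrary Python object, assuming that is what the remark above refers to, is a multiprocessing.Manager, which proxies ordinary Python containers through a separate server process; this is flexible but slower than Value and Array. A minimal sketch, not one of the timed examples:

def f_manager(d, i):
    # each worker writes into a dict proxy served by the manager process
    d[os.getpid()] = i

with mp.Manager() as manager:
    d = manager.dict()
    ps = [mp.Process(target=f_manager, args=(d, i)) for i in range(4)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    print(dict(d))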
In [40]:
def f3(i, counter, store):
    counter.value += 1
    store[os.getpid() % 10] += i
In [41]:
%%time
counter = mp.Value('i', 0)
store = mp.Array('i', [0]*10)
for i in range(int(1e2)):
    p = mp.Process(target=f3, args=(i, counter, store))
    p.start()
    p.join()
print(counter.value)
print(store[:])
We should try to avoid using shared memory as much as possible in parallel jobs, since it drastically reduces efficiency. One useful approach is the map-reduce pattern. We should also use a Pool to reuse processes rather than spawning too many of them. We will see much more of the map-reduce approach when we work with Spark.
In [42]:
def f4(i):
    return (os.getpid(), 1, i)
In [43]:
%%time
# map step
with mp.Pool(processes=10) as pool:
    res = pool.map(f4, range(int(1e2)))

# reduce step
res = np.array(res)
counter = res[:, 1].sum()
print(counter)
store = np.zeros(10)
idx = res[:, 0] % 10
for i in range(10):
    store[i] = res[idx==i, 2].sum()
print(store)
In [3]:
@njit()
def mc_pi_numba(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

def get_pis(n1, n2):
    results = [mc_pi_numba(int(n1)) for i in range(n2)]
    return results
In [4]:
%%time
get_pis(1e7, 10)
Out[4]:
In [34]:
from joblib import Parallel, delayed
@njit
def mc_pi(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

def get_pis(n1, n2, k):
    n1, n2 = int(n1), int(n2)
    results = Parallel(n_jobs=k)(delayed(mc_pi)(n1) for i in range(n2))
    return results
In [37]:
%%time
get_pis(1e7, 10, 1)
Out[37]:
In [38]:
%%time
get_pis(1e7, 10, 8)
Out[38]:
Writing to shared memory requires careful coordination of processes, and many control and communication concepts are implemented in the multiprocessing library for this purpose, including semaphores, locks, barriers, etc. We will not cover these concepts in detail due to their complexity, choosing instead to decouple processes (leading to embarrassingly parallel problems) by making redundant copies of resources if necessary and reducing at a later stage. Most problems in statistical data analysis can be solved with this simple approach.
In the example below, up to 4 processes may try to increment and assign a new value to val at the same time. Because this takes two steps (evaluate the incremented value on the right-hand side, then assign it to the left-hand side), two or more processes can read the same value and increment it simultaneously, so that only one of those increments is actually counted.
In [44]:
def count1(i):
    val.value += 1

for run in range(3):
    val = Value('i', 0)
    with Pool(processes=4) as pool:
        pool.map(count1, range(1000))
    print(val.value)
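For comparison, wrapping the update in the lock that every shared Value carries (obtained with get_lock()) makes the read-increment-write sequence atomic, so no updates are lost. This is a minimal sketch, not part of the original notebook:

def count1_locked(i):
    # holding the Value's own lock makes the two-step update atomic across processes
    with val.get_lock():
        val.value += 1

val = Value('i', 0)
with Pool(processes=4) as pool:
    pool.map(count1_locked, range(1000))
print(val.value)  # 1000, since no increments are lost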
It is usually easier and faster to make copies of resources for each process so that no sharing is required.
In [45]:
def count2(i):
    ix = os.getpid() % 4
    arr[ix] += 1

for run in range(3):
    arr = Array('i', [0]*4)
    with Pool(processes=4) as pool:
        pool.map(count2, range(1000))
    print(arr[:], np.sum(arr))
Suppose there are two processes P1 and P2 and two resources A and B. Suppose P1 has a lock on A and will only release A after it gains B, while P2 has a lock on B and will only release B after it gains A. The two processes are then doomed to wait forever; this is known as a deadlock, and it can occur when concurrent processes compete for exclusive access to the same shared resources. A classic model of deadlock is the Dining Philosophers Problem.

We will not run a deadlocking example here, since it would simply freeze the notebook.
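As a reference only, here is a minimal sketch (hypothetical names, not part of the original material) of the two-lock structure described above, rewritten so that both processes acquire the locks in the same fixed order; the consistent ordering removes the circular wait, so this version completes instead of deadlocking.

def worker(name, lock_a, lock_b):
    # always take lock_a before lock_b; a fixed acquisition order avoids circular waits
    with lock_a:
        time.sleep(0.1)
        with lock_b:
            print(name, 'acquired both locks')

lock_a, lock_b = mp.Lock(), mp.Lock()
ps = [mp.Process(target=worker, args=(name, lock_a, lock_b)) for name in ('P1', 'P2')]
for p in ps:
    p.start()
for p in ps:
    p.join()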