In [1]:
%load_ext cython
import numpy as np

(We'll be using openmp a bit further down, so we have to switch to the GCC compiler. On Mac OS X, clang doesn't work with openmp)


In [2]:
import os
os.environ['CC'] = 'gcc-6'

Using concurrent.futures

"Normal" Python code, nothing to do with Cython


In [4]:
# Needs Python 3, or "pip install futures" for Python 2
from concurrent.futures import ThreadPoolExecutor

def run(f, x, num_threads=4):
    """ 
        f: a function with an array parameter
        x: an array
        
        x will be split into pieces and each piece passed to f
        in a separate thread.
    """
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # These are views not copies!
        sections = np.array_split(x, num_threads)
        # Submit a job for each section of array
        jobs = [pool.submit(f, s) for s in sections]
    # Wait for each job to finish, grab its result
    return sum(j.result() for j in jobs)

Our work functions


In [5]:
%%cython -f
# cython: boundscheck = False
from libc.math cimport log

def f1(double[:] x):
    cdef int i, n = x.shape[0]
    cdef double result = 0
    for i in range(n):
        if x[i] > 0.5:
            result += log(x[i])
        else:
            result += 1.0
    return result

def f2(double[:] x):
    cdef int i, n = x.shape[0]
    cdef double result = 0
    with nogil:                    # <-- This is the important bit
        for i in range(n):
            if x[i] > 0.5:
                result += log(x[i])
            else:
                result += 1.0
    return result

In [6]:
data = np.random.rand(50000000)
print('First three elements:',data[:3])


First three elements: [ 0.18013086  0.78173535  0.2938238 ]

In [7]:
print('f1 direct', f1(data))
print('f2 direct', f2(data))
print()
print('f1 via run()', run(f1, data))
print('f2 via run()', run(f2, data))


f1 direct 17329423.428916324
f2 direct 17329423.428916324

f1 via run() 17329423.42891491
f2 via run() 17329423.42891491

Timings


In [8]:
print('Direct calls')
%timeit f1(data)
%timeit f2(data)
print()
print('Using threads')
%timeit run(f1, data)
%timeit run(f2, data)


Direct calls
1 loop, best of 3: 483 ms per loop
1 loop, best of 3: 478 ms per loop

Using threads
1 loop, best of 3: 487 ms per loop
10 loops, best of 3: 118 ms per loop

Compare prange() and ThreadPoolExecutor?

Remember the prange() version from a previous video?


In [9]:
%%cython -f
# distutils: extra_compile_args = -fopenmp
# distutils: extra_link_args = -fopenmp
# cython: boundscheck = False
from libc.math cimport log
from cython.parallel cimport prange
        
def f_openmp(double[:] x):
    cdef int i, n = x.shape[0]
    cdef double tmp, result = 0
    for i in prange(n, nogil=True):
        if x[i] > 0.5:
            tmp = log(x[i])
        else:
            tmp = 1.0
        result += tmp
    return result

Safety first! (check they return the same result)


In [10]:
print(f2(data))
print(run(f2, data))
print(f_openmp(data))


17329423.428916324
17329423.42891491
17329423.428915005

Showdown!


In [11]:
def pr(s):
    print()
    print(s)
    print('='*len(s))
    
pr('Single threaded')
%timeit f1(data)
pr('Threaded: using concurrent.futures')
%timeit run(f2, data)
pr('Threaded: using openmp')
%timeit f_openmp(data)


Single threaded
===============
1 loop, best of 3: 491 ms per loop

Threaded: using concurrent.futures
==================================
10 loops, best of 3: 118 ms per loop

Threaded: using openmp
======================
10 loops, best of 3: 99.6 ms per loop

OpenMP wins! But...


In [12]:
pr('Threaded: using concurrent.futures with 8 threads')
%timeit run(f2, data, num_threads=8)


Threaded: using concurrent.futures with 8 threads
=================================================
10 loops, best of 3: 86.5 ms per loop

Increase threads for OpenMP?


In [13]:
%%cython -f
# distutils: extra_compile_args = -fopenmp
# distutils: extra_link_args = -fopenmp
# cython: boundscheck = False
from libc.math cimport log
from cython.parallel cimport prange
        
def f_openmp(double[:] x):
    cdef int i, n = x.shape[0]
    cdef double tmp, result = 0
    for i in prange(n, nogil=True, num_threads=8):
        if x[i] > 0.5:
            tmp = log(x[i])
        else:
            tmp = 1.0
        result += tmp
    return result

In [14]:
pr('Threaded: using concurrent.futures with 8 threads')
%timeit run(f2, data, num_threads=8)
pr('Threaded: using openmp with 8 threads')
%timeit f_openmp(data)


Threaded: using concurrent.futures with 8 threads
=================================================
10 loops, best of 3: 86.2 ms per loop

Threaded: using openmp with 8 threads
=====================================
10 loops, best of 3: 101 ms per loop

In [ ]: