In [1]:
import numba as nb
import numpy as np
# A plain for loop
def add1(x, c):
    rs = [0.] * len(x)
    for i, xx in enumerate(x):
        rs[i] = xx + c
    return rs

# A list comprehension
def add2(x, c):
    return [xx + c for xx in x]

# The same for loop, jit-accelerated
@nb.jit(nopython=True)
def add_with_jit(x, c):
    rs = [0.] * len(x)
    for i, xx in enumerate(x):
        rs[i] = xx + c
    return rs

# How NOT to use jit: the list is typed as int,
# so the float results get truncated
@nb.jit(nopython=True)
def wrong_add(x, c):
    rs = [0] * len(x)
    for i, xx in enumerate(x):
        rs[i] = xx + c
    return rs

y = np.random.random(10**5).astype(np.float32)
x = y.tolist()
assert np.allclose(add1(x, 1), add2(x, 1))
assert np.allclose(add1(x, 1), add_with_jit(x, 1))
%timeit add1(x, 1)
%timeit add2(x, 1)
%timeit add_with_jit(x, 1)
print(np.allclose(wrong_add(x, 1), 1))  # True: every element was truncated to 1
numba does not support list comprehensions (see here for details). jit "pre-compiles" your code, which means it more or less fixes the data type of each variable. So when defining an array under jit, if what you want is a float array, you cannot write [0] * len(x) as in wrong_add above; you have to put a decimal point after the 0: [0.] * len(x).

What jit can accelerate is not limited to for loops, but for loops are the most common target and usually show the most dramatic gains. After applying jit to my numpy implementation of a convolutional neural network (CNN), the code ran more than 60 times faster. The full code is available here; if you would rather not read the source, see CNN(zh-cn).ipynb, where I ran some simple experiments along these lines.
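One way to sidestep the list-typing pitfall entirely is to preallocate a NumPy array with an explicit dtype instead of building a Python list. A minimal sketch (add_with_buffer is my name, not from the original code):

@nb.jit(nopython=True)
def add_with_buffer(x, c):
    # The dtype is pinned up front, so the [0] vs [0.] distinction
    # that bit wrong_add can no longer arise
    rs = np.empty(len(x), dtype=np.float32)
    for i in range(len(x)):
        rs[i] = x[i] + c
    return rs

With this version it does not matter whether c is an int or a float; the result buffer is float32 either way.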
In [2]:
assert np.allclose(y + 1, add_with_jit(x, 1))
%timeit add_with_jit(x, 1)
%timeit y + 1
In [3]:
@nb.vectorize(nopython=True)
def add_with_vec(yy, c):
    return yy + c

assert np.allclose(y + 1, add_with_vec(y, 1))
assert np.allclose(y + 1, add_with_vec(y, 1.))
%timeit add_with_vec(y, 1)
%timeit add_with_vec(y, 1.)
%timeit y + 1
%timeit y + 1.
In [4]:
@nb.vectorize("float32(float32, float32)", target="parallel", nopython=True)
def add_with_vec(y, c):
    return y + c

assert np.allclose(y + 1, add_with_vec(y, 1.))
%timeit add_with_vec(y, 1.)
%timeit y + 1
With parallel it is actually slower here. If you run the same code under the Intel Distribution for Python, however, the parallel version can even come out slightly faster than native numpy.
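If parallel underperforms on your machine, the thread count is one knob worth experimenting with. A sketch, assuming a numba version that ships numba.set_num_threads (0.46+; older versions only expose the NUMBA_NUM_THREADS environment variable):

print(nb.get_num_threads())  # defaults to the number of CPU cores
nb.set_num_threads(2)        # fewer threads can mean less scheduling overhead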
In [5]:
@nb.vectorize("float32(float32, float32, float32)", target="parallel", nopython=True)
def clip_with_parallel(y, a, b):
    if y < a:
        return a
    if y > b:
        return b
    return y

@nb.vectorize("float32(float32, float32, float32)", nopython=True)
def clip(y, a, b):
    if y < a:
        return a
    if y > b:
        return b
    return y
assert np.allclose(np.clip(y, 0.1, 0.9), clip(y, 0.1, 0.9))
assert np.allclose(np.clip(y, 0.1, 0.9), clip_with_parallel(y, 0.1, 0.9))
%timeit clip_with_parallel(y, 0.1, 0.9)
%timeit clip(y, 0.1, 0.9)
%timeit np.clip(y, 0.1, 0.9)
In short, parallel is not a universal win; you still have to experiment. Note that the target parameter of vectorize takes one of three values: "cpu" (the default), "parallel", and "cuda". The official documentation gives good guidance on which to choose:
A general guideline is to choose different targets for different data sizes and algorithms. The “cpu” target works well for small data sizes (approx. less than 1KB) and low compute intensity algorithms. It has the least amount of overhead. The “parallel” target works well for medium data sizes (approx. less than 1MB). Threading adds a small delay. The “cuda” target works well for big data sizes (approx. greater than 1MB) and high compute intensity algorithms. Transferring memory to and from the GPU adds significant overhead.
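Following that guideline, the cuda target uses the same vectorize API. A sketch, assuming a CUDA-capable GPU with the CUDA toolkit installed (not benchmarked here):

@nb.vectorize("float32(float32, float32)", target="cuda")
def add_with_cuda(y, c):
    # Each element is handled by a GPU thread; host-to-device copies
    # dominate unless the arrays are large
    return y + c

For an array of only 10**5 float32 elements like y, the transfer overhead would likely outweigh any compute gain, which is exactly what the quoted guideline predicts.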
In [6]:
import math
from concurrent.futures import ThreadPoolExecutor
def np_func(a, b):
    return 1 / (a + np.exp(-b))

# Two identical kernels; the only difference is whether the GIL is released
@nb.jit(nopython=True, nogil=False)
def kernel1(result, a, b):
    for i in range(len(result)):
        result[i] = 1 / (a[i] + math.exp(-b[i]))

@nb.jit(nopython=True, nogil=True)
def kernel2(result, a, b):
    for i in range(len(result)):
        result[i] = 1 / (a[i] + math.exp(-b[i]))

def make_single_task(kernel):
    def func(length, *args):
        result = np.empty(length, dtype=np.float32)
        kernel(result, *args)
        return result
    return func

def make_multi_task(kernel, n_thread):
    def func(length, *args):
        result = np.empty(length, dtype=np.float32)
        args = (result,) + args
        # Split every array into n_thread contiguous chunks; each thread
        # writes into its own slice of result, so no locking is needed
        chunk_size = (length + n_thread - 1) // n_thread
        chunks = [[arg[i*chunk_size:(i+1)*chunk_size] for i in range(n_thread)]
                  for arg in args]
        with ThreadPoolExecutor(max_workers=n_thread) as e:
            for _ in e.map(kernel, *chunks):
                pass
        return result
    return func
length = 10 ** 6
a = np.random.rand(length).astype(np.float32)
b = np.random.rand(length).astype(np.float32)
nb_func1 = make_single_task(kernel1)
nb_func2 = make_multi_task(kernel1, 4)  # 4 threads, but the GIL is held
nb_func3 = make_single_task(kernel2)
nb_func4 = make_multi_task(kernel2, 4)  # 4 threads with the GIL released
rs_np = np_func(a, b)
rs_nb1 = nb_func1(length, a, b)
rs_nb2 = nb_func2(length, a, b)
rs_nb3 = nb_func3(length, a, b)
rs_nb4 = nb_func4(length, a, b)
for rs_nb in (rs_nb1, rs_nb2, rs_nb3, rs_nb4):
    assert np.allclose(rs_np, rs_nb)
%timeit np_func(a, b)
%timeit nb_func1(length, a, b)
%timeit nb_func2(length, a, b)
%timeit nb_func3(length, a, b)
%timeit nb_func4(length, a, b)