How do you time step L parallel two state Markov processes most efficiently? Here we compare three approaches:
In [1]:
import numpy as np
%load_ext cython
In [2]:
# simple looping timestepping of the dynamics
def step_alternative(env, a, b, rand):
L = len(env)
for i in range(L):
if env[i] and (rand[i] > 1-b[i]):
env[i] = False
# crucially need else if here
# this before caused buggy behavior
# environment could otherwise be set to false before and then rechanged
elif (not env[i]) and (rand[i] < a[i]):
env[i] = True
return env
In [3]:
# simple looping timestepping of the dynamics
def step(x, a, b, rand):
out = np.empty(x.shape, dtype=bool)
for i in range(x.shape[0]):
if x[i]:
out[i] = rand[i] < 1-b[i]
else:
out[i] = rand[i] < a[i]
return out
In [4]:
%%cython
# cython accelerated simple looping timestepping of the dynamics
import cython
import numpy as np
cimport numpy as np
from libcpp cimport bool
# numpy boolean array can not be used in cython, therefore need to cast to uint8
@cython.boundscheck(False)
@cython.wraparound(False)
def step_cython(np.ndarray[np.uint8_t, cast=True] x,
np.ndarray[np.double_t] a,
np.ndarray[np.double_t] b,
np.ndarray[np.double_t] rand):
cdef np.ndarray[np.uint8_t, cast=True] out = np.empty(x.shape[0], dtype=np.bool)
cdef Py_ssize_t i
for i in range(x.shape[0]):
if x[i]:
out[i] = rand[i] < 1.0-b[i]
else:
out[i] = rand[i] < a[i]
return out
In [5]:
# vectorized timestepping of the dynamics using bitwise operators
def step_vectorized(env, a, b, rand):
return (env & (rand < 1-b)) | (~env & (rand < a))
In [6]:
x = np.ones(3, dtype=bool)
rand = np.asarray([0.1, 0.5, 0.9])
a = 1.0*np.ones(3)
b = 0.2*np.ones(3)
print step_cython(x, a, b, rand)
print step_vectorized(x, a, b, rand)
print step(x, a, b, rand)
In [7]:
L = 3
# parameters: switching rates a, b
b = 0.2 * np.ones(L)
a = 1.0 * np.ones(L)
rand = np.random.rand(L)
# ensure that all give the same result
np.testing.assert_array_equal(step(np.ones(L, dtype = bool), a, b, rand),
step_vectorized(np.ones(L, dtype = bool), a, b, rand))
np.testing.assert_array_equal(step(np.ones(L, dtype = bool), a, b, rand),
step_cython(np.ones(L, dtype = bool), a, b, rand))
np.testing.assert_array_equal(step(np.zeros(L, dtype = bool), a, b, rand),
step_vectorized(np.zeros(L, dtype = bool), a, b, rand))
np.testing.assert_array_equal(step(np.zeros(L, dtype = bool), a, b, rand),
step_cython(np.zeros(L, dtype = bool), a, b, rand))
All three function produce equivalent results.
In [8]:
for L in [1, 10, 100, 1000, 10000]:
print L
# parameters: switching rates a, b
b = 0.05 * np.ones(L)
a = 0.05 * np.ones(L)
rand = np.random.rand(L)
%timeit step(np.zeros(L, dtype = bool), a, b, rand)
%timeit step_cython(np.zeros(L, dtype = bool), a, b, rand)
%timeit step_vectorized(np.zeros(L, dtype = bool), a, b, rand)
The cython accelerated code is fastest outperforming the vectorized code especially for small arrays.
In [9]:
%%cython
# cython accelerated simple looping timestepping of the dynamics
import cython
import numpy as np
cimport numpy as np
from libcpp cimport bool
# numpy boolean array can not be used in cython, therefore need to cast to uint8
@cython.boundscheck(False)
@cython.wraparound(False)
def step_cython_single(np.ndarray[np.uint8_t, cast=True] x,
double a,
double b,
np.ndarray[np.double_t] rand):
cdef np.ndarray[np.uint8_t, cast=True] out = np.empty(x.shape[0], dtype=np.bool)
cdef Py_ssize_t i
for i in range(x.shape[0]):
if x[i]:
out[i] = rand[i] < 1.0-b
else:
out[i] = rand[i] < a
return out
In [10]:
for L in [1, 10, 100, 1000, 10000]:
print L
# parameters: switching rates a, b
b = 0.05
a = 0.05
rand = np.random.rand(L)
%timeit step_cython_single(np.zeros(L, dtype = bool), a, b, rand)
%timeit step_vectorized(np.zeros(L, dtype = bool), a, b, rand)