In [1]:
from __future__ import division
import numpy as np
import quantecon as qe
In [2]:
import warnings
numba_installed = True
try:
from numba import jit
except ImportError:
numba_installed = False
numba_warning_message = "Numba import failed. Falling back to non-optimized routine."
warnings.warn(numba_warning_message, UserWarning)
In [3]:
def mc_sample_path(P, init, sample_size):
# CDFs, one for each row of P
cdfs = np.cumsum(P, axis=-1)
# Random values, uniformly sampled from [0, 1)
u = np.random.random(size=sample_size)
# === set up array to store output === #
X = np.empty(sample_size, dtype=int)
if isinstance(init, int):
X[0] = init
else:
cdf0 = np.cumsum(init)
X[0] = cdf0.searchsorted(u[0], side='right')
# === generate the sample path === #
n = len(cdfs)
for t in range(sample_size-1):
lo = -1
hi = n - 1
while(lo < hi-1):
m = (lo + hi) // 2
if u[t+1] < cdfs[X[t], m]:
hi = m
else:
lo = m
X[t+1] = hi
return X
def mc_sample_path_numpy(P, init, sample_size):
# CDFs, one for each row of P
cdfs = np.cumsum(P, axis=-1)
# Random values, uniformly sampled from [0, 1)
u = np.random.random(size=sample_size)
# === set up array to store output === #
X = np.empty(sample_size, dtype=int)
if isinstance(init, int):
X[0] = init
else:
cdf0 = np.cumsum(init)
X[0] = cdf0.searchsorted(u[0], side='right')
# === generate the sample path === #
for t in range(sample_size-1):
X[t+1] = cdfs[X[t]].searchsorted(u[t+1], side='right')
return X
if numba_installed:
mc_sample_path = jit(mc_sample_path)
else:
mc_sample_path = mc_sample_path_numpy
In [4]:
def mc_sample_path_3(P, init, sample_size):
# CDFs, one for each row of P
cdfs = np.cumsum(P, axis=-1)
# Random values, uniformly sampled from [0, 1)
u = np.random.random(size=sample_size)
# === set up array to store output === #
X = np.empty(sample_size, dtype=int)
if isinstance(init, int):
X[0] = init
else:
cdf0 = np.cumsum(init)
X[0] = cdf0.searchsorted(u[0], side='right')
# === generate the sample path === #
if numba_installed:
@jit(nopython=True)
def mc_sample_path_jit(cdfs, u, out):
n = len(cdfs)
sample_size = len(u)
for t in range(sample_size-1):
lo = -1
hi = n - 1
while(lo < hi-1):
m = (lo + hi) // 2
if u[t+1] < cdfs[out[t], m]:
hi = m
else:
lo = m
out[t+1] = hi
mc_sample_path_jit(cdfs, u, X)
return X
# if not numba_installed
for t in range(sample_size-1):
X[t+1] = cdfs[X[t]].searchsorted(u[t+1], side='right')
return X
In [5]:
P = np.array([[0.4, 0.6], [0.2, 0.8]])
init = (0.25, 0.75)
sample_size = 10**4
for func in [qe.mc_sample_path, mc_sample_path_numpy, mc_sample_path, mc_sample_path_3]:
X = func(P, init, sample_size)
print X.sum() / sample_size # Should be close to 0.75
In [6]:
P = np.array([[0.4, 0.6], [0.2, 0.8]])
init = (0.25, 0.75)
sample_size = 10**5 * 2
for func in [qe.mc_sample_path, mc_sample_path_numpy, mc_sample_path, mc_sample_path_3]:
%timeit func(P, init, sample_size)
In [7]:
def random_probvec(k, m):
"""
Create probability vectors.
Parameters
----------
k : scalar(int)
Dimension of each probability vectors.
m : scalar(int)
Number of probability vectors.
Returns
-------
ndarray(float, ndim=2)
Array of shape (m, k) containing probability vectors as rows.
"""
x = np.empty((m, k+1))
r = np.random.rand(m, k-1)
r.sort(axis=-1)
x[:, 0], x[:, 1:k], x[:, k] = 0, r, 1
return np.diff(x, axis=-1)
def random_stochmat(k):
return random_probvec(k, k)
In [8]:
sizes = [10, 100, 1000, 3000]
rand_matrices = []
for n in sizes:
Q = random_stochmat(n)
rand_matrices.append(Q)
In [9]:
sample_size = 10**5 * 2
init = 0
for i, Q in enumerate(rand_matrices):
print 'rand_matrices[{0}] ({1} x {2})'.format(i, Q.shape[0], Q.shape[1])
for func in [qe.mc_sample_path, mc_sample_path_numpy, mc_sample_path, mc_sample_path_3]:
%timeit func(Q, init, sample_size)
In [10]:
import platform
print platform.platform()
In [11]:
import sys
print sys.version
In [12]:
print np.__version__
In [13]:
import numba
print numba.__version__
In [13]: