A notebook to test and demonstrate KernelSteinTest. This implements the kernelized Stein discrepancy (KSD) goodness-of-fit test of Chwialkowski et al. (2016) and Liu et al. (2016), both published at ICML 2016.
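For reference, with score function $s_p(x) = \nabla_x \log p(x)$ and a positive-definite kernel $k$, the test is built on the Stein kernel

$$h_p(x, y) = s_p(x)^\top s_p(y)\, k(x, y) + s_p(x)^\top \nabla_y k(x, y) + s_p(y)^\top \nabla_x k(x, y) + \mathrm{tr}\big(\nabla_x \nabla_y k(x, y)\big),$$

and the squared kernel Stein discrepancy between the model $p$ and the data distribution $q$ can be estimated from a sample $\{x_i\}_{i=1}^n \sim q$ by the U-statistic

$$\widehat{\mathrm{KSD}}^2 = \frac{1}{n(n-1)} \sum_{i \neq j} h_p(x_i, x_j).$$

Under $H_0\colon q = p$ the statistic concentrates around zero; its null distribution is approximated by a bootstrap (e.g. with Rademacher weights), which is how KernelSteinTest computes the p-value below.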
In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
#%config InlineBackend.figure_format = 'svg'
#%config InlineBackend.figure_format = 'pdf'
import kgof
import kgof.data as data
import kgof.density as density
import kgof.goftest as gof
import kgof.kernel as ker
import kgof.util as util
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as stats
In [2]:
# font options
font = {
#'family' : 'normal',
#'weight' : 'bold',
'size' : 18
}
plt.rc('font', **font)
plt.rc('lines', linewidth=2)
matplotlib.rcParams['pdf.fonttype'] = 42
matplotlib.rcParams['ps.fonttype'] = 42
In [ ]:
# true p
seed = 13
# dimension of the data; d = 1 is used here purely for illustration
d = 1
# sample size
n = 800
mean = np.zeros(d)
variance = 1.0
# q differs from p only in the first mean component.
# With qmean[0] = 0, q coincides with p, so H0 holds.
qmean = mean.copy()
qmean[0] = 0
qvariance = variance
p = density.IsotropicNormal(mean, variance)
ds = data.DSIsotropicNormal(qmean, qvariance)
# ds = data.DSLaplace(d=d, loc=0, scale=1.0/np.sqrt(2))
dat = ds.sample(n, seed=seed+1)
X = dat.data()
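To see the test reject, the sample can instead be drawn from a q that differs from p, e.g. by shifting the first mean component. A minimal sketch (the names qmean_shift, ds_alt, dat_alt are illustrative and not part of the original cell):
In [ ]:
# illustrative alternative: q is an isotropic normal with a shifted mean, so H0 is false
qmean_shift = mean.copy()
qmean_shift[0] = 1.0
ds_alt = data.DSIsotropicNormal(qmean_shift, qvariance)
dat_alt = ds_alt.sample(n, seed=seed+1)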
In [ ]:
# Test
alpha = 0.01
# Gaussian kernel with median heuristic
sig2 = util.meddistance(X, subsample=1000)**2
k = ker.KGauss(sig2)
# inverse multiquadric kernel
# From Gorham & Mackey 2017 (https://arxiv.org/abs/1703.01717)
# k = ker.KIMQ(b=-0.5, c=1.0)
bootstrapper = gof.bootstrapper_rademacher
kstein = gof.KernelSteinTest(p, k, bootstrapper=bootstrapper,
alpha=alpha, n_simulate=500, seed=seed+1)
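The Gaussian kernel bandwidth above is set by the median heuristic: sig2 is the squared median of pairwise distances, computed on a subsample. The commented-out inverse multiquadric (IMQ) kernel is the alternative studied by Gorham & Mackey (2017), whose IMQ-based Stein discrepancies can detect non-convergence in settings where Gaussian kernels may fail.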
In [ ]:
kstein_result = kstein.perform_test(dat, return_simulated_stats=True,
return_ustat_gram=True)
kstein_result
#kstein.compute_stat(dat)
In [ ]:
print('p-value: ', kstein_result['pvalue'])
print('reject H0: ', kstein_result['h0_rejected'])
In [ ]:
sim_stats = kstein_result['sim_stats']
plt.figure(figsize=(10, 4))
# plot the bootstrapped null distribution (normed= is deprecated; use density=)
plt.hist(sim_stats, bins=20, density=True);
plt.stem([kstein_result['test_stat']], [0.03], linefmt='r-', markerfmt='ro', label='Stat')
plt.legend()
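The histogram approximates the null distribution of the test statistic obtained from the bootstrap; the stem marks the observed statistic. H0 is rejected when the observed statistic falls in the upper tail, i.e. when the estimated p-value is below alpha.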
In [ ]:
from scipy.spatial.distance import squareform, pdist
def simulatepm(N, p_change):
    '''
    Simulate a length-N sequence of +1/-1 values whose sign flips with
    probability p_change at each step (used for the wild bootstrap).
    :param N: length of the sequence
    :param p_change: probability of a sign change at each step
    :return: array of N values in {-1, +1}
    '''
    X = np.zeros(N) - 1
    change_sign = np.random.rand(N) < p_change
    for i in range(N):
        if change_sign[i]:
            X[i] = -X[i - 1]
        else:
            X[i] = X[i - 1]
    return X
class _GoodnessOfFitTest:
    def __init__(self, grad_log_prob, scaling=1):
        # scaling is the sigma^2 in the Gaussian kernel exp(-|x - y|^2 / (2*sigma^2))
        self.scaling = scaling*2
        self.grad = grad_log_prob

    # construct a (slow) multiple-gradient handle if an efficient one is not given
    def grad_multiple(self, X):
        return np.array([(self.grad)(x) for x in X])

    def kernel_matrix(self, X):
        # sanity check: expect more samples than dimensions
        assert X.shape[0] > X.shape[1]
        sq_dists = squareform(pdist(X, 'sqeuclidean'))
        K = np.exp(-sq_dists / self.scaling)
        return K

    def gradient_k_wrt_x(self, X, K, dim):
        X_dim = X[:, dim]
        assert X_dim.ndim == 1
        differences = X_dim.reshape(len(X_dim), 1) - X_dim.reshape(1, len(X_dim))
        return -2.0 / self.scaling * K * differences

    def gradient_k_wrt_y(self, X, K, dim):
        return -self.gradient_k_wrt_x(X, K, dim)

    def second_derivative_k(self, X, K, dim):
        X_dim = X[:, dim]
        assert X_dim.ndim == 1
        differences = X_dim.reshape(len(X_dim), 1) - X_dim.reshape(1, len(X_dim))
        sq_differences = differences ** 2
        return 2.0 * K * (self.scaling - 2 * sq_differences) / self.scaling ** 2

    def get_statistic_multiple_dim(self, samples, dim):
        num_samples = len(samples)
        log_pdf_gradients = self.grad_multiple(samples)
        # n x 1
        log_pdf_gradients = log_pdf_gradients[:, dim]
        # n x n
        K = self.kernel_matrix(samples)
        assert K.shape[0] == K.shape[1]
        # n x n
        gradient_k_x = self.gradient_k_wrt_x(samples, K, dim)
        assert gradient_k_x.shape[0] == gradient_k_x.shape[1]
        # n x n
        gradient_k_y = self.gradient_k_wrt_y(samples, K, dim)
        # n x n
        second_derivative = self.second_derivative_k(samples, K, dim)
        assert second_derivative.shape[0] == second_derivative.shape[1]
        # use broadcasting to mimic the element-wise looped call
        pairwise_log_gradients = log_pdf_gradients.reshape(num_samples, 1) \
            * log_pdf_gradients.reshape(1, num_samples)
        A = pairwise_log_gradients * K
        B = gradient_k_x * log_pdf_gradients
        C = (gradient_k_y.T * log_pdf_gradients).T
        D = second_derivative
        V_statistic = A + B + C + D
        stat = num_samples * np.mean(V_statistic)
        return V_statistic, stat

    def compute_pvalues_for_processes(self, U_matrix, change_prob, num_bootstrapped_stats=300):
        N = U_matrix.shape[0]
        bootstrapped_stats = np.zeros(num_bootstrapped_stats)
        with util.NumpySeedContext(seed=10):
            for proc in range(num_bootstrapped_stats):
                W = simulatepm(N, change_prob)
                WW = np.outer(W, W)
                st = np.mean(U_matrix * WW)
                bootstrapped_stats[proc] = N * st
        stat = N * np.mean(U_matrix)
        return float(np.sum(bootstrapped_stats > stat)) / num_bootstrapped_stats

    def is_from_null(self, alpha, samples, change_prob):
        dims = samples.shape[1]
        boots = 10 * int(dims / alpha)
        num_samples = samples.shape[0]
        U = np.zeros((num_samples, num_samples))
        for dim in range(dims):
            U2, _ = self.get_statistic_multiple_dim(samples, dim)
            U += U2
        p = self.compute_pvalues_for_processes(U, change_prob, boots)
        return p, U
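simulatepm above generates the {-1, +1} sign process used by the wild bootstrap of Chwialkowski et al. (2016). As a quick illustrative sanity check (not part of the original notebook), the empirical flip rate of the simulated process should be close to p_change:
In [ ]:
# illustrative check: the fraction of consecutive sign changes should be close to p_change
w = simulatepm(10000, 0.1)
print(np.mean(w[1:] != w[:-1]))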
In [ ]:
#sigma = np.array([[1, 0.2, 0.1], [0.2, 1, 0.4], [0.1, 0.4, 1]])
def grad_log_correlated(x):
    # gradient of log p for the isotropic normal p defined above
    #sigmaInv = np.linalg.inv(sigma)
    #return - np.dot(sigmaInv.T + sigmaInv, x) / 2.0
    return -(x-mean)/variance

#me = _GoodnessOfFitTest(grad_log_correlated)
qm = _GoodnessOfFitTest(grad_log_correlated, scaling=sig2)

#X = np.random.multivariate_normal([0, 0, 0], sigma, 200)
p_val, U = qm.is_from_null(0.05, X, 0.1)
print(p_val)
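If the two implementations are consistent, this p-value should be broadly in line with kstein_result['pvalue'] obtained earlier, though exact agreement is not expected since the bootstrap schemes differ (a wild bootstrap here vs. the Rademacher bootstrap in KernelSteinTest).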
In [ ]:
plt.imshow(U, interpolation='none')
plt.colorbar()
In [ ]:
# U-statistic gram matrix from the kgof implementation (KernelSteinTest)
H = kstein_result['H']
plt.imshow(H, interpolation='none')
plt.colorbar()
In [ ]:
plt.imshow(U-H, interpolation='none')
plt.colorbar()
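If both implementations compute the same Stein kernel matrix, the entries of U and H should agree up to numerical precision. A quick illustrative numeric check:
In [ ]:
# the difference should be tiny if the two implementations match
print('max abs difference:', np.max(np.abs(U - H)))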
In [ ]:
x = np.random.randint(1, 5, 5)
y = np.random.randint(1, 3, 3)
In [ ]:
x
In [ ]:
y
In [ ]:
x[:, np.newaxis] - y[np.newaxis, :]
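The last few cells are only a small demonstration of NumPy broadcasting: x[:, np.newaxis] - y[np.newaxis, :] produces the matrix of all pairwise differences, the same pattern used in gradient_k_wrt_x above.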