In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
In [ ]:
import sys
import numpy as np
import matplotlib.pyplot as plt
In [ ]:
import freqopttest.kernel as ker
import scipy.stats as stats
def me_kernel(X, locs, gamma):
"""Compute a X.shape[0] x locs.shape[0] Gaussian kernel matrix where
the width gamma is only applied to the data X."""
n, d = X.shape
X = X/gamma
D2 = np.sum(X**2, 1)[:, np.newaxis] - 2*X.dot(locs.T) + np.sum(locs**2, 1)
K = np.exp(-D2)
return K
def draw_freqs(J, d, seed=3):
old = np.random.get_state()
np.random.seed(seed)
freqs = np.random.randn(J, d)
np.random.set_state(old)
return freqs
def t2_stat(X, Y, locs, gamma):
"""
locs: J x d
"""
n = X.shape[0]
g = me_kernel(X, locs, gamma)
h = me_kernel(Y, locs, gamma)
Z = g-h
Sig = np.cov(Z.T)
W = np.mean(Z, 0)
# test statistic
s = np.linalg.solve(Sig, W).dot(n*W)
return s
In [ ]:
# locations to test
# J = #locations
J = 3
#cen = (np.mean(X, 0) + np.mean(Y, 0))/2.0
locs = draw_freqs(J, 2)
# two Gaussian datasets
n = 70
d = 2
seed = 1
np.random.seed(seed)
X = np.random.randn(n, d)
Y = np.random.randn(n, d) + [0.5, 0]
#Y = np.random.randn(n, d)
#Y = np.random.randn(n, d)*1.2
In [ ]:
def me_test_plot(X, Y, locs, gamma):
J = locs.shape[0]
# plot the data
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(X[:, 0], X[:, 1], 'xb', label='X')
plt.plot(Y[:, 0], Y[:, 1], 'xr', label='Y')
plt.plot(locs[:, 0], locs[:, 1], '*k', markersize=13, label='Test locs')
plt.legend()
# compute the test statistic
#gamma = 1
s = t2_stat(X, Y, locs, gamma)
print('test stat s: %.4g' % s)
# compute the p-value under Chi2 with J degrees of freedom
p_value = stats.chi2.sf(s, J)
domain = np.linspace(stats.chi2.ppf(0.001, J), stats.chi2.ppf(0.9999, J), 200)
plt.subplot(1, 2, 2)
plt.plot(domain, stats.chi2.pdf(domain, J), label='$\chi^2$ (df=%d)'%J)
plt.stem([s], [stats.chi2.pdf(J, J)/2], 'or-', label='test stat')
plt.legend(loc='best', frameon=True)
plt.title('p-value: %.3g. test stat: %.3g'%(p_value, s))
plt.show()
In [ ]:
from mpl_toolkits.mplot3d import axes3d
from matplotlib import cm
def best_loc2_testpower(X, Y, gamma, loc1):
"""Show a heatmap of Lambda(T) on many locations of the test points.
J=2 (two locations). Assume loc1 is given. Vary loc2 (2d). """
# For simplicity, we will assume that J=2 (two frequencies)
# and that one (loc1) is fixed. We will optimize loc2 (2-dimensional).
XY = np.vstack((X,Y))
max1, max2 = np.max(XY, 0)
min1, min2 = np.min(XY, 0)
#sd1, sd2 = np.std(XY, 0)
sd1, sd2 = (0, 0)
# form a frequency grid to try
nd1 = 30
nd2 = 30
loc1_cands = np.linspace(min1-sd1/2, max1+sd1/2, nd1)
loc2_cands = np.linspace(min2-sd2/2, max2+sd2/2, nd2)
lloc1, lloc2 = np.meshgrid(loc1_cands, loc2_cands)
# nd2 x nd1 x 2
loc3d = np.dstack((lloc1, lloc2))
# #candidates x 2
all_loc2s = np.reshape(loc3d, (-1, 2) )
# all_locs = #candidates x J x 2
all_locs = np.array( [np.vstack((c, loc1)) for c in all_loc2s] )
# evaluate Lambda(T) on each candidate T on the grid. Size = (#candidates, )
stat_grid = np.array([t2_stat(X, Y, T, gamma) for T in all_locs])
stat_grid = np.reshape(stat_grid, (nd2, nd1) )
#ax = fig.gca(projection='3d')
#ax.plot_surface(lloc1, lloc2, stat_grid, rstride=8, cstride=8, alpha=0.3)
#cset = ax.contourf(lloc1, lloc2, stat_grid, zdir='z', offset=0, cmap=cm.coolwarm)
plt.figure(figsize=(6, 4))
plt.contourf(lloc1, lloc2, stat_grid)
plt.colorbar()
max_stat = np.max(stat_grid)
plt.xlabel('loc2 x')
plt.ylabel('loc2 y')
plt.title('fixed loc1=t1. $\lambda(t1, t2)$ for all t2.')
#ax.view_init(elev=max_stat*2, azim=90)
plt.show()
In [ ]:
from __future__ import print_function
from ipywidgets import interact, interactive, fixed
from IPython.display import display
import ipywidgets as widgets
# interactively select test locations
def me_test_plot_interact(X, Y, loc1x=0, loc1y=0, loc2x=1, loc2y=1, gamma=1):
locs = np.array([[loc1x, loc1y], [loc2x, loc2y]])
me_test_plot(X, Y, locs, gamma)
loc1 = np.array([loc1x, loc1y])
best_loc2_testpower(X, Y, gamma, loc1)
loc1_bnd = (-6, 6, 0.1)
loc2_bnd = (-6, 6, 0.1)
vs = interactive(me_test_plot_interact, X=fixed(X), Y=fixed(Y), loc1x=loc1_bnd,
loc1y=loc1_bnd, loc2x=loc2_bnd, loc2y=loc2_bnd,
gamma=(0.5, 20, 0.1));
display(vs)
In [ ]:
import theano
import theano.tensor as tensor
import theano.tensor.nlinalg as nlinalg
import theano.tensor.slinalg as slinalg
def akgauss_theano(X, T, gamma):
"""Asymmetric kernel for the two sample test. Theano version.
:param gamma: width of the Gaussian kernel.
:return kernel matrix X.shape[0] x T.shape[0]
"""
n, d = X.shape
X = X/gamma
D2 = (X**2).sum(1).reshape((-1, 1)) - 2*X.dot(T.T) + tensor.sum(T**2, 1).reshape((1, -1))
K = tensor.exp(-D2)
return K
seed = 4
J = 2
#def opt_me_lambda_kgauss(X, Y, J, seed):
"""
Optimize the empirical version of Lambda(T) i.e., the criterion used
to optimize the test locations, for the test based
on difference of mean embeddings with Gaussian kernel.
Also optimize the Gaussian width.
:param J: the number of test locations
:return a theano function T |-> Lambda(T)
"""
st0 = np.random.get_state()
np.random.seed(seed)
n, d = X.shape
# initialize Theano variables
T = theano.shared(np.random.randn(J, d), name='T')
#T = tensor.dmatrix(name='T')
Xth = tensor.dmatrix('X')
Yth = tensor.dmatrix('Y')
it = theano.shared(1, name='iter')
# Gaussian width
gamma_init = 0.5
gammath = theano.shared(gamma_init, name='gamma')
g = akgauss_theano(Xth, T, gammath)
h = akgauss_theano(Yth, T, gammath)
# Z: n x J
Z = g-h
W = Z.sum(axis=0)/n
# covariance
Z0 = Z - W
Sig = Z0.T.dot(Z0)/n
# gradient computation does not support solve()
#s = slinalg.solve(Sig, W).dot(n*W)
s = nlinalg.matrix_inverse(Sig).dot(W).dot(W)*n
gra_T, gra_gamma = tensor.grad(s, [T, gammath])
func = theano.function(inputs=[Xth, Yth], outputs=s,
updates=[(T, T+0.1*gra_T/it**0.5), (gammath, gammath+0.05*gra_gamma/it**0.5), (it, it+1) ] )
#updates=[(T, T+0.1*gra_T), (it, it+1) ] )
np.random.set_state(st0)
In [ ]:
# run gradient ascent
max_iter = 400
S = np.zeros(max_iter)
Ts = np.zeros((max_iter, J, d))
gams = np.zeros(max_iter)
for t in range(max_iter):
# stochastic gradient descent
ind = np.random.choice(n, min(n, n), replace=False)
S[t] = func(X[ind, :], Y[ind, :])
Ts[t] = T.get_value()
gams[t] = gammath.get_value()
In [ ]:
plt.plot(S)
plt.xlabel('gradient ascent iteration')
plt.ylabel('$\lambda(T)$')
besti = np.argmax(S)
print('highest objective: %.4g'%S[besti])
print('best Locations: ')
print(Ts[besti])
In [ ]:
# plot Gaussian widths
plt.plot(gams)
plt.xlabel('iteration')
plt.ylabel('Gaussian width')
In [ ]: