In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

In [ ]:
import sys
import numpy as np
import matplotlib.pyplot as plt

Two-sample test with mean embeddings
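
The statistic implemented below is the mean-embedding (ME) two-sample statistic: for $J$ test locations $t_1, \ldots, t_J$, each pair $(x_i, y_i)$ is mapped to a vector of kernel differences, and a Hotelling $T^2$-type quadratic form is computed,

$$z_i = \big(k(x_i, t_j) - k(y_i, t_j)\big)_{j=1}^{J} \in \mathbb{R}^J, \qquad \hat{\lambda}_n = n\, \bar{z}^\top \widehat{\Sigma}^{-1} \bar{z},$$

where $\bar{z}$ and $\widehat{\Sigma}$ are the sample mean and covariance of $z_1, \ldots, z_n$. Under $H_0: P = Q$, $\hat{\lambda}_n$ is asymptotically $\chi^2$ with $J$ degrees of freedom, which is the reference distribution used for the p-value below.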


In [ ]:
import freqopttest.kernel as ker
import scipy.stats as stats

def me_kernel(X, locs, gamma):
    """Compute an X.shape[0] x locs.shape[0] Gaussian kernel matrix where
    the width gamma is applied only to the data X (an asymmetric kernel)."""
    n, d = X.shape
    X = X/gamma
    # squared distances between the scaled data and the (unscaled) locations
    D2 = np.sum(X**2, 1)[:, np.newaxis] - 2*X.dot(locs.T) + np.sum(locs**2, 1)
    K = np.exp(-D2)
    return K

def draw_freqs(J, d, seed=3):
    old = np.random.get_state()
    np.random.seed(seed)
    freqs = np.random.randn(J, d)
    np.random.set_state(old)
    return freqs

def t2_stat(X, Y, locs, gamma):
    """ME test statistic: n * W' Sig^{-1} W, a Hotelling T^2-type quadratic form.
    locs: J x d matrix of test locations
    """
    n = X.shape[0]
    g = me_kernel(X, locs, gamma)
    h = me_kernel(Y, locs, gamma)
    # Z: n x J matrix of differences of kernel features
    Z = g-h
    # Sig: J x J sample covariance; W: mean feature difference
    Sig = np.cov(Z.T)
    W = np.mean(Z, 0)
    # test statistic
    s = np.linalg.solve(Sig, W).dot(n*W)
    return s
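
A quick sanity check (a sketch added for illustration, not part of the original analysis): under H0, when X and Y come from the same distribution, the statistic should be approximately chi-squared with J degrees of freedom, so the rejection rate at level alpha should be close to alpha.

In [ ]:
# sanity check (sketch): empirical size of the test under H0
alpha = 0.05
J0, n0, d0 = 3, 200, 2
locs0 = draw_freqs(J0, d0)
trials = 300
rejs = 0
for i in range(trials):
    Xn = np.random.randn(n0, d0)
    Yn = np.random.randn(n0, d0)
    rejs += t2_stat(Xn, Yn, locs0, gamma=1.0) > stats.chi2.isf(alpha, J0)
print('empirical rejection rate under H0: %.3f (nominal: %.2f)' % (rejs/float(trials), alpha))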

In [ ]:
# locations to test
# J = #locations
J = 3
#cen = (np.mean(X, 0) + np.mean(Y, 0))/2.0
locs = draw_freqs(J, 2)


# two Gaussian datasets
n = 70
d = 2
seed = 1
np.random.seed(seed)
X = np.random.randn(n, d) 
Y = np.random.randn(n, d) + [0.5, 0]
#Y = np.random.randn(n, d) 
#Y = np.random.randn(n, d)*1.2

In [ ]:
def me_test_plot(X, Y, locs, gamma):
    J = locs.shape[0]
    # plot the data
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(X[:, 0], X[:, 1], 'xb', label='X')
    plt.plot(Y[:, 0], Y[:, 1], 'xr', label='Y')
    plt.plot(locs[:, 0], locs[:, 1], '*k', markersize=13, label='Test locs')
    plt.legend()
    
    # compute the test statistic
    #gamma = 1
    s = t2_stat(X, Y, locs, gamma)
    print('test stat s: %.4g' % s)

    # compute the p-value under Chi2 with J degrees of freedom
    p_value = stats.chi2.sf(s, J)
    domain = np.linspace(stats.chi2.ppf(0.001, J), stats.chi2.ppf(0.9999, J), 200)
    plt.subplot(1, 2, 2)
    plt.plot(domain, stats.chi2.pdf(domain, J), label=r'$\chi^2$ (df=%d)' % J)
    # draw the statistic as a stem; the height is arbitrary, chosen for visibility
    plt.stem([s], [stats.chi2.pdf(J, J)/2], linefmt='r-', markerfmt='ro', label='test stat')
    plt.legend(loc='best', frameon=True)
    plt.title('p-value: %.3g. test stat: %.3g'%(p_value, s))
    plt.show()
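
Before making this interactive, it may help to run the test once with the random locations drawn above and a fixed width (gamma=1 is an arbitrary choice here):

In [ ]:
# static example: test with the random locations and a fixed Gaussian width
me_test_plot(X, Y, locs, gamma=1.0)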

In [ ]:
from mpl_toolkits.mplot3d import axes3d
from matplotlib import cm

def best_loc2_testpower(X, Y, gamma, loc1):
    """Show a heatmap of Lambda(T) over a grid of candidate test locations.
    J=2 (two locations): loc1 is held fixed and loc2 (2d) is varied."""
    
    # For simplicity, assume J=2 (two locations), with loc1 fixed.
    # We optimize over the 2-dimensional loc2.
    XY = np.vstack((X,Y))
    max1, max2 = np.max(XY, 0)
    min1, min2 = np.min(XY, 0)
    # grid margin around the data range (0 here; np.std(XY, 0) would widen it)
    sd1, sd2 = (0, 0)
    # form a grid of candidate locations to try
    nd1 = 30
    nd2 = 30
    loc1_cands = np.linspace(min1-sd1/2, max1+sd1/2, nd1)
    loc2_cands = np.linspace(min2-sd2/2, max2+sd2/2, nd2)
    lloc1, lloc2 = np.meshgrid(loc1_cands, loc2_cands)
    # nd2 x nd1 x 2
    loc3d = np.dstack((lloc1, lloc2))
    # #candidates x 2
    all_loc2s = np.reshape(loc3d, (-1, 2) )
    
    # all_locs: #candidates x J x 2 (the row order within each T is irrelevant)
    all_locs = np.array( [np.vstack((c, loc1)) for c in all_loc2s] )

    # evaluate Lambda(T) on each candidate T on the grid. Size = (#candidates, )
    stat_grid = np.array([t2_stat(X, Y, T, gamma) for T in all_locs])
    stat_grid = np.reshape(stat_grid, (nd2, nd1) )
    
    #ax = fig.gca(projection='3d')
    #ax.plot_surface(lloc1, lloc2, stat_grid, rstride=8, cstride=8, alpha=0.3)
    #cset = ax.contourf(lloc1, lloc2, stat_grid, zdir='z', offset=0, cmap=cm.coolwarm)
    plt.figure(figsize=(6, 4))
    plt.contourf(lloc1, lloc2, stat_grid)
    plt.colorbar()

    max_stat = np.max(stat_grid)
    plt.xlabel('loc2 x')
    plt.ylabel('loc2 y')
    plt.title(r'fixed loc1 = $t_1$. $\lambda(t_1, t_2)$ for all $t_2$.')
    #ax.view_init(elev=max_stat*2, azim=90)

    plt.show()
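
A static call for reference (the first location is pinned at the origin here, an arbitrary choice):

In [ ]:
# heatmap of the statistic as loc2 sweeps over the data range
best_loc2_testpower(X, Y, gamma=1.0, loc1=np.array([0.0, 0.0]))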

In [ ]:
from __future__ import print_function
from ipywidgets import interact, interactive, fixed
from IPython.display import display
import ipywidgets as widgets

# interactively select test locations
def me_test_plot_interact(X, Y, loc1x=0, loc1y=0, loc2x=1, loc2y=1, gamma=1):
    locs = np.array([[loc1x, loc1y], [loc2x, loc2y]])
    me_test_plot(X, Y, locs, gamma)
    loc1 = np.array([loc1x, loc1y])
    best_loc2_testpower(X, Y, gamma, loc1)

loc1_bnd = (-6, 6, 0.1)
loc2_bnd = (-6, 6, 0.1)
vs = interactive(me_test_plot_interact, X=fixed(X), Y=fixed(Y), loc1x=loc1_bnd, 
        loc1y=loc1_bnd, loc2x=loc2_bnd, loc2y=loc2_bnd, 
        gamma=(0.5, 20, 0.1));
display(vs)

Optimizing test locations
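
The statistic $\hat{\lambda}_n(T, \gamma)$ doubles as the criterion for choosing the test locations $T$ and the Gaussian width $\gamma$: larger values of the statistic correspond to higher test power. The cell below builds the statistic as a Theano expression and ascends its gradient,

$$(T, \gamma) \;\leftarrow\; (T, \gamma) + \frac{\eta}{\sqrt{t}}\, \nabla_{T, \gamma}\, \hat{\lambda}_n(T, \gamma),$$

with base step sizes $\eta$ of 0.1 for $T$ and 0.05 for $\gamma$, both decaying as $1/\sqrt{t}$ in the iteration count $t$.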


In [ ]:
import theano
import theano.tensor as tensor
import theano.tensor.nlinalg as nlinalg
import theano.tensor.slinalg as slinalg

def akgauss_theano(X, T, gamma):
    """Asymmetric Gaussian kernel for the two-sample test (Theano version).
    :param gamma: width of the Gaussian kernel, applied only to X
    :return: X.shape[0] x T.shape[0] kernel matrix
    """
    n, d = X.shape
    X = X/gamma

    D2 = (X**2).sum(1).reshape((-1, 1)) - 2*X.dot(T.T) + tensor.sum(T**2, 1).reshape((1, -1))
    K = tensor.exp(-D2)
    return K

seed = 4
J = 2
#def opt_me_lambda_kgauss(X, Y, J, seed):
"""
Optimize the empirical version of Lambda(T) i.e., the criterion used 
to optimize the test locations, for the test based 
on difference of mean embeddings with Gaussian kernel. 
Also optimize the Gaussian width.

:param J: the number of test locations
:return a theano function T |-> Lambda(T)
"""
st0 = np.random.get_state()
np.random.seed(seed)

n, d = X.shape
# initialize Theano variables
T = theano.shared(np.random.randn(J, d), name='T')
#T = tensor.dmatrix(name='T')
Xth = tensor.dmatrix('X')
Yth = tensor.dmatrix('Y')
it = theano.shared(1, name='iter')
# Gaussian width
gamma_init = 0.5
gammath = theano.shared(gamma_init, name='gamma')

g = akgauss_theano(Xth, T, gammath)
h = akgauss_theano(Yth, T, gammath)
# Z: n x J
Z = g-h
W = Z.sum(axis=0)/n
# covariance 
Z0 = Z - W
Sig = Z0.T.dot(Z0)/n

# Theano cannot differentiate through slinalg.solve(); invert Sig explicitly instead
#s = slinalg.solve(Sig, W).dot(n*W)
s = nlinalg.matrix_inverse(Sig).dot(W).dot(W)*n
gra_T, gra_gamma = tensor.grad(s, [T, gammath])
# one gradient-ascent step per call; step sizes decay as 1/sqrt(iteration)
func = theano.function(inputs=[Xth, Yth], outputs=s,
                       updates=[(T, T+0.1*gra_T/it**0.5), (gammath, gammath+0.05*gra_gamma/it**0.5), (it, it+1)])

np.random.set_state(st0)
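
A small consistency check (a sketch; it assumes the cell above compiled): compile the objective without the updates and compare it to the numpy statistic at the current (T, gamma). The two should agree up to the n vs. n-1 normalization of the covariance.

In [ ]:
# evaluate the Theano objective without applying any updates
lambda_fn = theano.function(inputs=[Xth, Yth], outputs=s)
print('theano objective: %.4g' % lambda_fn(X, Y))
print('numpy statistic : %.4g' % t2_stat(X, Y, T.get_value(), gammath.get_value()))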

In [ ]:
# run gradient ascent
max_iter = 400
S = np.zeros(max_iter)
Ts = np.zeros((max_iter, J, d))
gams = np.zeros(max_iter)
for t in range(max_iter):
    # full-batch gradient ascent; subsample fewer than n points for stochastic updates
    ind = np.random.choice(n, n, replace=False)
    S[t] = func(X[ind, :], Y[ind, :])
    Ts[t] = T.get_value()
    gams[t] = gammath.get_value()

In [ ]:
plt.plot(S)
plt.xlabel('gradient ascent iteration')
plt.ylabel(r'$\lambda(T)$')
besti = np.argmax(S)
print('highest objective: %.4g'%S[besti])
print('best locations:')
print(Ts[besti])
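
With the optimized locations and width, the test can be re-run through the plotting function from earlier. One caveat: evaluating the test on the same sample that was used for the optimization inflates the type-I error; in practice, split the data, optimize on one half, and test on the other.

In [ ]:
# re-run the test at the optimized locations and width
# (for a calibrated test, replace X, Y with held-out data)
me_test_plot(X, Y, Ts[besti], gams[besti])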

In [ ]:
# plot Gaussian widths
plt.plot(gams)
plt.xlabel('iteration')
plt.ylabel('Gaussian width')
