A notebook to demonstrate our new goodness-of-fit test on various models defined on $\mathbb{R}$ or $\mathbb{R}^2$.
In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
import kgof
import kgof.data as data
import kgof.density as density
import kgof.goftest as gof
import kgof.kernel as kernel
import kgof.plot as plot
import kgof.util as util
import matplotlib
import matplotlib.pyplot as plt
import autograd.numpy as np
# import scipy.stats as stats
In [ ]:
import kgof.plot
kgof.plot.set_default_matplotlib_options()
In [ ]:
%%html
<style>
/* highlight */
.hl {
background-color: #eeee00;
padding: 1px 5px 1px 5px;
margin: 4px;
}
</style>
Define some convenience functions that we will use repeatedly later.
In [ ]:
def prob2d_pqgauss(qmean=np.array([0, 0]), QVar=np.eye(2), seed=2):
"""
    Construct a problem where p = N(0, I) and q = N(qmean, QVar).
Return p, a DataSource (for q)
"""
p = density.Normal(np.array([0, 0]), np.eye(2))
ds = data.DSNormal(qmean, QVar)
return p, ds
def plot2d_pq(p, X, n_max=300, n_xdengrid=50, n_ydengrid=50,
xlim=None, ylim=None, margin=0, V=None, V0=None,
figw=8, figh=5):
"""
A function to plot the model p, and the sample from q, along with other
information such as the learned test locations V.
p: UnnormalizedDensity (model)
X: nx2 data matrix
    n_max: plot at most this many data points
n_xdengrid, n_ydengrid: number of points to use to plot the density contour
V: J x 2 matrix of J features (test locations) to show
V0: J x 2 matrix of J features. Initial values before optimization.
"""
plt.figure(figsize=(figw, figh))
# plot the data
# plt.plot(X[:, 0], X[:, 1], 'o', alpha=0.7, color='#8282FF', markersize=8);
n = X.shape[0]
n_sub = min(n, n_max)
plt.plot(X[:n_sub, 0], X[:n_sub, 1], 'bo', alpha=0.8, markeredgecolor='white', markersize=6);
xmin, ymin = np.min(X, 0)
xmax, ymax = np.max(X, 0)
if xlim is None:
xlim = [xmin-margin, xmax+margin]
if ylim is None:
ylim = [ymin-margin, ymax+margin]
try:
flogden = lambda Xt: p.log_normalized_den(Xt)
# call it to see if it is implemented.
flogden(np.array([[1,2]]))
except NotImplementedError as e:
        # Fall back to the log of the unnormalized density.
flogden = lambda Xt: p.log_den(Xt)
# get a mesh to plot the contour of the density p
XX, YY, Logden = plot.box_meshgrid(flogden, xlim, ylim, nx=n_xdengrid, ny=n_ydengrid)
# plot the unnormalized density
Den = np.exp(Logden)
# list_colors = plt.cm.datad['Reds']
# list_colors = list(list_colors)
# list_colors[-1] = (1, 1, 1)
# lscm = matplotlib.colors.LinearSegmentedColormap("my_Reds", list_colors)
plt.contourf(XX, YY, Den, 7,
cmap=plot.get_density_cmap(),
alpha = 0.7
);
if V is not None:
# feature
plt.plot(V[:, 0], V[:, 1], 'm*', markeredgecolor='white',
label='Optimized', markersize=34)
if V0 is not None:
# plot the initial test locations (before the optimization)
plt.plot(V0[:, 0], V0[:, 1], 'k*', markeredgecolor='white',
label='Initial', markersize=34)
plt.xlim(xlim)
plt.ylim(ylim)
In [ ]:
def func_fssd_power_criterion(p, X, k, V):
"""
Return the value of the power criterion of FSSD.
p: model density
X: n x d data matrix
    k: a kernel object (e.g., kernel.KGauss)
V: J x d numpy array. J test locations
"""
dat = data.Data(X)
return gof.FSSD.power_criterion(p, dat, k, V, reg=1e-5, use_unbiased=False)
def plot1d_pq(p, X, func_obj=None, rescale_obj=False, xlim=None, n_dom=200, n_bins=20,
margin=0, V=None, V0=None,
figw=8, figh=5):
"""
A function to plot the model p, and the sample from q, along with other
information such as the learned test locations V.
Create a 1D plot.
p: UnnormalizedDensity (model)
func_obj: a function: m x 1 -> m-array for computing the optimization
objective at m locations.
rescale_obj: if true, rescale the objective function to fit the height
of the histogram.
n_dom: number of points to use to plot the density
n_bins: number of bins to use for the histogram
    X: n x 1 data matrix
V: J x 1 matrix of J features (test locations) to show
V0: J x 1 matrix of J features. Initial values before optimization.
"""
n, d = X.shape
assert d==1
assert V is None or V.shape[1] == 1
assert V0 is None or V0.shape[1] == 1
plt.figure(figsize=(figw, figh))
    xmin = np.min(X)
    xmax = np.max(X)
if xlim is None:
xlim = [xmin-margin, xmax+margin]
# plot the data as a histogram
    hist_result = plt.hist(X, density=True, bins=n_bins, alpha=0.8, label='Data')
    hist_heights = hist_result[0]
    hist_parts = hist_result[1]
    # height of the tallest histogram bar (used below for rescaling)
    hist_height = np.max(hist_heights)
try:
is_normalized = True
flogden = lambda Xt: p.log_normalized_den(Xt)
# call it to see if it is implemented.
flogden(np.array([[1]]))
except NotImplementedError as e:
        # Fall back to the log of the unnormalized density.
flogden = lambda Xt: p.log_den(Xt)
is_normalized = False
    dom = np.linspace(xlim[0], xlim[1], n_dom)
den = np.exp(flogden(dom[:, np.newaxis]))
if not is_normalized:
# If p is an unnormalized density, then
# normalize it to fit the height of the histogram
# (this is not technically correct. This is just a
# simple way to plot.)
hist_height = np.max(hist_heights)
max_den = np.max(den)
# renormalize
den = den/max_den*hist_height
# plot the density (may be unnormalized)
plt.plot(dom, den, 'r-', label=r'$p$')
if func_obj is not None:
# plot the optimization objective
objs = func_obj(dom[:, np.newaxis])
if rescale_obj:
# rescale to match the height of the histogram
max_obj = np.max(objs)
objs = objs/max_obj*hist_height*0.8
plt.plot(dom, objs, 'k-', label=r'Score function')
if V is not None:
# feature
plt.plot(V[:, 0], V[:, 1], 'm*', markeredgecolor='white',
label='Optimized', markersize=34)
if V0 is not None:
# plot the initial test locations (before the optimization)
plt.plot(V0[:, 0], V0[:, 1], 'k*', markeredgecolor='white',
label='Initial', markersize=34)
plt.xlim(xlim)
In [ ]:
def prob_1dmixture(pm=0, pv=1, qm1=0, qv1=1, qm2=1, qv2=1, seed=3):
"""
A 1D problem where both p and q are 2-component Gaussian mixture models.
p(x) = 0.5*N(0, 1) + 0.5*N(pm, pv)
q(x) = 0.5*N(qm1, qv1) + 0.5*N(qm2, qv2)
Return p and q (both are UnnormalizedDensity)
"""
assert pv > 0
assert qv1 > 0
assert qv2 > 0
p = density.IsoGaussianMixture(means=np.array([[0], [pm]]),
variances=np.array([1, pv]))
q = density.IsoGaussianMixture(means=np.array([[qm1], [qm2]]),
variances=np.array([qv1, qv2]))
return p, q
def func_interactive_1dmixture(pm=0, pv=1, qm1=0, qv1=1, qm2=1, qv2=1):
seed = 84
p, q = prob_1dmixture(pm=pm, pv=pv, qm1=qm1, qv1=qv1, qm2=qm2,
qv2=qv2, seed=seed)
# n = sample size to draw from q
n = 600
gwidth2 = 1.5**2
# generate data from q
ds = q.get_datasource()
dat = ds.sample(n, seed=seed+3)
Xs = dat.data()
# kernel
k = kernel.KGauss(sigma2=gwidth2)
def score_function(Vs):
"""
Vs: m x d test locations.
Evaluate the score at m locations
"""
m = Vs.shape[0]
objs = np.zeros(m)
for i in range(m):
v = Vs[i, :]
obj = func_fssd_power_criterion(p, Xs, k, v[np.newaxis, :])
objs[i] = obj
return objs
# plot the problem
plot1d_pq(p, Xs, func_obj=score_function, rescale_obj=False,
margin=3, n_dom=100, n_bins=12, figw=8, figh=5)
print('p = 0.5*N(0, 1) + 0.5*N({}, {})'.format(pm, pv))
print('q = 0.5*N({}, {}) + 0.5*N({}, {})'.format(qm1, qv1, qm2, qv2))
plt.legend(loc='upper right', fontsize=20)
In [ ]:
# from __future__ import print_function
from ipywidgets import interact, interactive, fixed
from IPython.display import display
import ipywidgets as widgets
vs = interactive(func_interactive_1dmixture, pm=(0, 10, 1), pv=(1e-3, 5, 1),
qm1=(-5, 10, 1), qv1=(1e-3, 5, 1), qm2=(-5, 10, 1),
qv2=(1e-3, 5, 1)
)
display(vs)
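If `ipywidgets` is not available, the same exploration can be done non-interactively by calling the function above with fixed parameter values. A minimal sketch (the particular parameter values below are only an illustration):
In [ ]:
# Non-interactive usage: shift the second component of q away from p.
# The plotted score function should then peak in the region where p and q differ.
func_interactive_1dmixture(pm=0, pv=1, qm1=0, qv1=1, qm2=4, qv2=1)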
Given a known probability density $p$ (model) and a sample $\{ \mathbf{x}_i \}_{i=1}^n \sim q$ where $q$ is an unknown density, a goodness-of-fit test proposes the null hypothesis
$H_0: p = q$
against the alternative hypothesis
$H_1: p \neq q$.
In other words, it tests whether or not the sample $\{ \mathbf{x}_i \}_{i=1}^n $ is distributed according to a known $p$.
Let us assume that the model is
$$p(\mathbf{x})=\mathcal{N}\left(\left[\begin{array}{c} 0\\ 0 \end{array}\right],\left[\begin{array}{cc} 1 & 0\\ 0 & 1 \end{array}\right]\right)$$
in $\mathbb{R}^2$ (two-dimensional space).
The model $p$ is specified by its density function. Specifically, our test requires $\nabla_{\mathbf{x}} \log p(\mathbf{x})$ (i.e., the derivative of the log density), which does not depend on the normalizer. So, when specifying the density $p$, it is sufficient to specify it up to the normalizer. The gradient $\nabla_{\mathbf{x}} \log p(\mathbf{x})$ will be computed automatically by `autograd`. Alternatively, we can also directly specify $\nabla_{\mathbf{x}} \log p(\mathbf{x})$. Here, we will construct our model by specifying $p(\mathbf{x})$.
Assume that we do the following import: `import kgof.density as density`.
In the `kgof` package, a model $p$ can be specified by implementing the class `density.UnnormalizedDensity`. Implementing this directly is a bit tedious, however. An easier way is to use the function `density.from_log_den(d, f)`, which takes two arguments:
- `d`: the dimension of the input space
- `f`: a function taking a 2D numpy array of size `n x d` and producing a one-dimensional array of size `n` containing the values of the log unnormalized density at the `n` points.
Let us construct an `UnnormalizedDensity`, which is the object representing a model. All the implemented goodness-of-fit tests take this object as an input.
In [ ]:
# Assume two dimensions.
d = 2
def isogauss_log_den(X):
"""
Evaluate the log density of the standard isotropic Gaussian
at the points (rows) in X.
Note that the density is NOT normalized.
X: n x d nd-array
return a length-n array
"""
# d = dimension of the input space
unden = -np.sum(X**2, 1)/2.0
return unden
# p is an UnnormalizedDensity object
p = density.from_log_den(d, isogauss_log_den)
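As a quick sanity check of the claim above that `autograd` differentiates the log density for us, we can compare autograd's gradient of `isogauss_log_den` with the analytic gradient $\nabla_{\mathbf{x}} \log p(\mathbf{x}) = -\mathbf{x}$ of the standard Gaussian. This is a minimal optional sketch; it is not required by the test itself.
In [ ]:
# Optional sanity check: autograd's gradient of the log density should match -x.
from autograd import elementwise_grad

grad_log_p = elementwise_grad(isogauss_log_den)
Xtest = np.random.randn(5, d)
# Maximum absolute difference should be (numerically) zero.
print(np.max(np.abs(grad_log_p(Xtest) - (-Xtest))))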
In practice, the data $\{ \mathbf{x}_i \}_{i=1}^n$ are given to us. However, here, as an illustrative example, we will generate them from $q$, which is
$$q(\mathbf{x})=\mathcal{N}\left(\left[\begin{array}{c} m\\ 0 \end{array}\right],\left[\begin{array}{cc} 1 & 0\\ 0 & 1 \end{array}\right]\right),$$
where $m$ specifies the mean of the first coordinate of $q$. Under this setting, if $m\neq 0$, then $H_1$ is true and the test should reject $H_0$. If $m=0$, then $p=q$, and the test should not reject $H_0$.
In [ ]:
# Let's assume that m = 1.
m = 1
# Draw n points from q
seed = 5
np.random.seed(seed)
n = 300
X = np.random.randn(n, 2) + np.array([m, 0])
Plot the data and model $p$.
In [ ]:
plot2d_pq(p, X)
plt.axis('equal');
plt.grid()
All the implemented tests take the data in the form of a `data.Data` object. This is just an encapsulation of the sample `X`. To construct a `data.Data` object, we do the following:
In [ ]:
# dat will be fed to the test.
dat = data.Data(X)
In [ ]:
# We will use some portion of the data for parameter tuning, and the rest for testing.
tr, te = dat.split_tr_te(tr_proportion=0.5, seed=2)
Let us optimize the parameters of the test on `tr`. The optimization relies on `autograd` to compute the gradient. We will use a Gaussian kernel for the test.
In [ ]:
# J is the number of test locations (or features). Typically not larger than 10.
J = 1
# There are many options for the optimization.
# Almost all of them have default values.
# Here, we will list a few to give you a sense of what you can control.
# Full options can be found in gof.GaussFSSD.optimize_locs_widths(..)
opts = {
'reg': 1e-2, # regularization parameter in the optimization objective
'max_iter': 50, # maximum number of gradient ascent iterations
'tol_fun':1e-4, # termination tolerance of the objective
}
# make sure to give tr (NOT te).
# do the optimization with the options in opts.
V_opt, gw_opt, opt_info = gof.GaussFSSD.optimize_auto_init(p, tr, J, **opts)
The optimization procedure returns:
- `V_opt`: optimized test locations (features). A $J \times d$ numpy array.
- `gw_opt`: optimized Gaussian width (for the Gaussian kernel). A floating-point number.
- `opt_info`: a dictionary containing information gathered during the optimization.
In [ ]:
opt_info
In [ ]:
# alpha = significance level of the test
alpha = 0.01
fssd_opt = gof.GaussFSSD(p, gw_opt, V_opt, alpha)
In [ ]:
# return a dictionary of testing results
test_result = fssd_opt.perform_test(te)
test_result
Recall that the null hypothesis was $H_0: p=q$. It can be seen that the test correctly rejects $H_0$ with a very small p-value.
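The result dictionary can also be inspected programmatically. A small sketch, assuming the dictionary contains `h0_rejected` and `pvalue` entries (key names taken from the version of `kgof` used here):
In [ ]:
# Inspect the outcome programmatically (key names assumed from this kgof version).
print('Reject H0?', test_result['h0_rejected'])
print('p-value:', test_result['pvalue'])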
In [ ]:
Xtr = tr.data()
plot2d_pq(p, Xtr, V=V_opt, margin=0.5,
# , xlim=[-4, 4]
)
plt.axis('equal');
# plt.legend()
Go back to where we sample the data from $q$ and change `m` (the mean of the first coordinate of $q$) to 0. This makes $p=q$, so $H_0$ is now true. Run the whole procedure again and verify that the test does not reject $H_0$. (Technically, the probability of rejecting is about $\alpha$.)
Note that when the test fails to reject, the learned features are not interpretable. They will be arbitrary.
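For convenience, the following cell sketches the whole pipeline again with `m = 0` (so that $H_0$ holds), reusing only objects and functions already defined above (`p`, `n`, `J`, `opts`, `alpha`). With high probability the test should fail to reject $H_0$.
In [ ]:
# Rerun the full pipeline under H0 (m = 0), i.e., p = q.
m0 = 0
np.random.seed(seed + 1)
X0 = np.random.randn(n, 2) + np.array([m0, 0])
dat0 = data.Data(X0)
tr0, te0 = dat0.split_tr_te(tr_proportion=0.5, seed=2)

V0_opt, gw0_opt, _ = gof.GaussFSSD.optimize_auto_init(p, tr0, J, **opts)
fssd0 = gof.GaussFSSD(p, gw0_opt, V0_opt, alpha)
# With p = q, H0 should not be rejected (except with probability about alpha).
fssd0.perform_test(te0)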
In [ ]:
def init_params(Xtr, J, seed=40):
"""
Return V0, gwidth0 where V0 is a J x d numpy array containing
J initial test locations before optimization. These are initialized
by drawing from a Gaussian fitted to the training set.
gwidth0 is the initial Gaussian width initialized by
the median heuristic.
Xtr: n x d numpy array of n data points
J: number of test locations required
"""
# random test locations
V0 = util.fit_gaussian_draw(Xtr, J, seed=seed+11)
gwidth0 = util.meddistance(Xtr, subsample=1000)**2
return V0, gwidth0
def optimize_params(p, tr, V0, gwidth0):
"""
    Optimize the test locations and the Gaussian width on the training data.
p: the model. UnnormalizedDensity.
tr: data.Data object representing the training set
V0: J x d numpy array of J initial test locations
gwidth0: initial Gaussian width
opts: a dictionary containing options to the optimizer
Return V_opt, gw_opt, opt_info
"""
# There are many options for the optimization.
# Almost all of them have default values.
# Here, we will list a few to give you a sense of what you can control.
# Full options can be found in gof.GaussFSSD.optimize_locs_widths(..)
opts = {
'reg': 1e-2, # regularization parameter in the optimization objective
'max_iter': 50, # maximum number of gradient ascent iterations
'tol_fun':1e-5, # termination tolerance of the objective
'gwidth_lb': 1**2, #absolute lower bound on the Gaussian width^2
'gwidth_ub': 10**2,
}
# make sure to give tr (NOT te).
# do the optimization with the options in opts.
# V_opt, gw_opt, opt_info = gof.GaussFSSD.optimize_auto_init(p, tr, J, **opts)
V_opt, gw_opt, opt_info = gof.GaussFSSD.optimize_locs_widths(p, tr, gwidth0, V0, **opts)
return V_opt, gw_opt, opt_info
In [ ]:
# p is an UnnormalizedDensity object
seed = 30
n = 1500
p, ds = prob2d_pqgauss(qmean=np.array([0, 0]), QVar=np.diag([2, 0.5]), seed=seed)
dat = ds.sample(n, seed=seed+2)
X = dat.data()
# plot
plot2d_pq(p, X)
plt.axis('equal');
In [ ]:
# We will use a portion (tr_proportion) of the data for parameter tuning
tr, te = dat.split_tr_te(tr_proportion=0.3, seed=seed+8)
Xtr = tr.data()
# J is the number of test locations (or features). Typically not larger than 10.
J = 2
V0, gwidth0 = init_params(Xtr, J, seed=seed+3)
V_opt, gw_opt, opt_info = optimize_params(p, tr, V0, gwidth0)
opt_info
In [ ]:
# alpha = significance level of the test
alpha = 0.01
fssd_opt = gof.GaussFSSD(p, gw_opt, V_opt, alpha)
# Goodness-of-fit test
# return a dictionary of testing results
test_result = fssd_opt.perform_test(te)
test_result
In [ ]:
plot2d_pq(p, Xtr, V0=V0, V=V_opt, margin=0,
# , xlim=[-4, 4]
)
plt.axis('equal');
plt.legend(fontsize=20)
In [ ]:
# k x d where k = number of mixture components
pmeans = np.array([[0, 0], [3, 3], [3, 0]])
pvariances = np.ones(3)*1
p = density.IsoGaussianMixture(pmeans, pvariances)
In [ ]:
# q is a Gaussian mixture
qmeans = np.array([[0, 0], [3, 3], [0, 3]])
qvariances = pvariances
q = density.IsoGaussianMixture(qmeans, qvariances)
ds = q.get_datasource()
In [ ]:
# generate some data from q
n = 800
dat = ds.sample(n, seed=seed+23)
X = dat.data()
In [ ]:
plot2d_pq(p, X, margin=1)
In [ ]:
# We will use a portion (tr_proportion) of the data for parameter tuning
tr, te = dat.split_tr_te(tr_proportion=0.5, seed=seed+8)
Xtr = tr.data()
# J is the number of test locations (or features). Typically not larger than 10.
J = 2
V0, gwidth0 = init_params(Xtr, J, seed=seed+3)
V_opt, gw_opt, opt_info = optimize_params(p, tr, V0, gwidth0)
opt_info
In [ ]:
# alpha = significance level of the test
alpha = 0.01
fssd_opt = gof.GaussFSSD(p, gw_opt, V_opt, alpha)
# Goodness-of-fit test
# return a dictionary of testing results
test_result = fssd_opt.perform_test(te)
test_result
In [ ]:
plot2d_pq(p, Xtr, V0=V0, V=V_opt, margin=0,
# , xlim=[-4, 4]
)
plt.axis('equal');
plt.legend(fontsize=20)
In [ ]:
V_opt
In [ ]:
gw_opt
In [ ]:
def prob_rbm_perturb(B_scale=1, b_scale=1, c_scale=1,
B_perturb=0, b_perturb=0, c_perturb=0,
d=1, dh=8, seed=38):
"""
B_scale, b_scale, c_scale: control scaling of the parameters of the RBMs
B_perturb, b_perturb, c_perturb: how much q differs from p
d: dimension of the input
dh: dimension of the hidden variables
Return p and q (both are UnnormalizedDensity)
"""
assert B_scale >= 0
assert b_scale >= 0
assert c_scale >= 0
assert B_perturb >= 0
assert b_perturb >= 0
assert c_perturb >= 0
with util.NumpySeedContext(seed=seed):
# B = np.array([-1, 1])[np.random.randint(0, 2, (d, dh))]
B = np.random.randn(d, dh)*B_scale
b = np.random.rand(d)*b_scale - b_scale/2.0
c = np.random.rand(dh)*c_scale - c_scale/2.0
# the model p
p = density.GaussBernRBM(B, b, c)
# perturb parameters of p to construct q
Bq = B + np.random.randn(d, dh)*B_perturb
bq = b + np.random.randn(d)*b_perturb
cq = c + np.random.randn(dh)*c_perturb
# construct the density q
q = density.GaussBernRBM(Bq, bq, cq)
return p, q
def func_interactive_rbm_problem(B_scale=1, b_scale=1, c_scale=1,
B_perturb=0, b_perturb=0, c_perturb=0):
dh = 7
d = 1
seed = 84
# n = sample size to draw from q
n = 500
gwidth2 = 3**2
p, q = prob_rbm_perturb(B_scale=B_scale, b_scale=b_scale, c_scale=c_scale,
B_perturb=B_perturb, b_perturb=b_perturb, c_perturb=c_perturb,
d=d, dh=dh, seed=seed)
# generate data from q
ds = q.get_datasource()
    # Sampling from the RBM relies on Gibbs sampling. To keep sampling cheap,
    # use a moderately small number of burn-in iterations.
ds.burnin = 1000
dat = ds.sample(n, seed=seed+37)
Xs = dat.data()
# kernel
k = kernel.KGauss(sigma2=gwidth2)
def score_function(Vs):
"""
Vs: m x d test locations.
Evaluate the score at m locations
"""
m = Vs.shape[0]
objs = np.zeros(m)
for i in range(m):
v = Vs[i, :]
obj = func_fssd_power_criterion(p, Xs, k, v[np.newaxis, :])
objs[i] = obj
return objs
# plot the problem
plot1d_pq(p, Xs, func_obj=score_function, rescale_obj=False,
margin=1, n_dom=100, n_bins=12, figw=10, figh=6)
plt.legend()
In [ ]:
vs = interactive(func_interactive_rbm_problem, B_scale=(0, 2, 0.1),
b_scale=(0, 3, 0.2), c_scale=(0, 3, 0.2),
B_perturb=(0, 1, 0.05),
b_perturb=fixed(0), c_perturb=fixed(0)
)
display(vs)
In the widget above, `B_scale`, `b_scale`, and `c_scale` control the parameters of the model $p$. $q$ is the same as $p$ unless `B_perturb` is nonzero, in which case the matrix parameter of $p$ is perturbed.
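Beyond the interactive visualization, we can also run the goodness-of-fit test itself on an RBM problem by reusing the helper functions defined above. A minimal sketch; the perturbation size, sample size, and seeds are only illustrative, and whether the test rejects depends on them. Sampling from the RBM is relatively slow because it uses Gibbs sampling.
In [ ]:
# Non-interactive sketch: test p (an RBM) against data drawn from a perturbed RBM q.
p_rbm, q_rbm = prob_rbm_perturb(B_scale=1, b_scale=1, c_scale=1,
                                B_perturb=0.3, b_perturb=0, c_perturb=0,
                                d=1, dh=7, seed=84)
ds_rbm = q_rbm.get_datasource()
ds_rbm.burnin = 1000
dat_rbm = ds_rbm.sample(400, seed=91)
tr_rbm, te_rbm = dat_rbm.split_tr_te(tr_proportion=0.5, seed=92)

# Optimize one test location and the Gaussian width, then test at significance level 0.01.
V_rbm, gw_rbm, _ = gof.GaussFSSD.optimize_auto_init(p_rbm, tr_rbm, 1, reg=1e-2, max_iter=50)
fssd_rbm = gof.GaussFSSD(p_rbm, gw_rbm, V_rbm, 0.01)
fssd_rbm.perform_test(te_rbm)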