A notebook to demonstrate our new goodness-of-fit test on various models defined on $\mathbb{R}$ or $\mathbb{R}^2$.


In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

import kgof
import kgof.data as data
import kgof.density as density
import kgof.goftest as gof
import kgof.kernel as kernel
import kgof.plot as plot
import kgof.util as util

import matplotlib
import matplotlib.pyplot as plt
import autograd.numpy as np
# import scipy.stats as stats

In [ ]:
import kgof.plot
kgof.plot.set_default_matplotlib_options()

In [ ]:
%%html
<style>
/* highlight */
.hl {
    background-color: #eeee00;
    padding: 1px 5px 1px 5px;
    margin: 4px;
}
</style>

Define some convenience functions that we will use repeatedly below.


In [ ]:
def prob2d_pqgauss(qmean=np.array([0, 0]), QVar=np.eye(2), seed=2):
    """
    Construct a problem where p = N([0, 0], I) and q = N(qmean, QVar).

    Return p (a Normal density) and a DataSource for q.
    """
    p = density.Normal(np.array([0, 0]), np.eye(2))
    ds = data.DSNormal(qmean, QVar)
    return p, ds

def plot2d_pq(p, X, n_max=300, n_xdengrid=50, n_ydengrid=50, 
            xlim=None, ylim=None, margin=0, V=None, V0=None, 
              figw=8, figh=5):
    """
    A function to plot the model p, and the sample from q, along with other
    information such as the learned test locations V.
    
    p: UnnormalizedDensity (model)
    X: nx2 data matrix
    n_max: plot at most this many data points
    n_xdengrid, n_ydengrid: number of points to use to plot the density contour 
    V: J x 2 matrix of J features (test locations) to show
    V0: J x 2 matrix of J features. Initial values before optimization.
    """
    plt.figure(figsize=(figw, figh))
    # plot the data
#     plt.plot(X[:, 0], X[:, 1], 'o', alpha=0.7, color='#8282FF', markersize=8);
    n = X.shape[0]
    n_sub = min(n, n_max)
    plt.plot(X[:n_sub, 0], X[:n_sub, 1], 'bo', alpha=0.8, markeredgecolor='white', markersize=6);
    xmin, ymin = np.min(X, 0)
    xmax, ymax = np.max(X, 0)
    if xlim is None:
        xlim = [xmin-margin, xmax+margin]
    if ylim is None:
        ylim = [ymin-margin, ymax+margin]
    
    
    try: 
        flogden = lambda Xt: p.log_normalized_den(Xt)
        # call it to see if it is implemented.
        flogden(np.array([[1,2]]))
    except NotImplementedError:
        # log_normalized_den() is not implemented; fall back to the unnormalized density.
        flogden = lambda Xt: p.log_den(Xt)
    # get a mesh to plot the contour of the density p
    XX, YY, Logden = plot.box_meshgrid(flogden, xlim, ylim, nx=n_xdengrid, ny=n_ydengrid)
    
    # plot the unnormalized density
    Den = np.exp(Logden)

    plt.contourf(XX, YY, Den, 7, 
                cmap=plot.get_density_cmap(),
                 alpha = 0.7
               );
    if V is not None:
        # feature
        plt.plot(V[:, 0], V[:, 1], 'm*', markeredgecolor='white', 
                 label='Optimized', markersize=34)
    if V0 is not None:
        # plot the initial test locations (before the optimization)
        plt.plot(V0[:, 0], V0[:, 1], 'k*', markeredgecolor='white', 
                 label='Initial', markersize=34)
    plt.xlim(xlim)
    plt.ylim(ylim)

Interactive 1D mixture model problem
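
The black curve labeled "Score function" in the plots below is the FSSD power criterion evaluated at each candidate test location $v$ (computed by gof.FSSD.power_criterion in the helper below). Up to regularization, it is approximately

$$\frac{\widehat{\mathrm{FSSD}^2}(v)}{\hat{\sigma}_{H_1}(v)},$$

the ratio of the estimated statistic to its estimated standard deviation under $H_1$. Locations where this ratio is large are the most informative for distinguishing $q$ from $p$.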


In [ ]:
def func_fssd_power_criterion(p, X, k, V):
    """
    Return the value of the power criterion of FSSD.
    p: model density
    X: n x d data matrix
    k: a Kernel object
    V: J x d numpy array. J test locations
    """
    dat = data.Data(X)
    return gof.FSSD.power_criterion(p, dat, k, V, reg=1e-5, use_unbiased=False)

def plot1d_pq(p, X, func_obj=None, rescale_obj=False, xlim=None, n_dom=200, n_bins=20,
              margin=0, V=None, V0=None, 
              figw=8, figh=5):
    """
    A function to plot the model p, and the sample from q, along with other
    information such as the learned test locations V. 
    Create a 1D plot.
    
    p: UnnormalizedDensity (model)
    func_obj: a function: m x 1 -> m-array for computing the optimization 
        objective at m locations.
    rescale_obj: if true, rescale the objective function to fit the height
        of the histogram.
    n_dom: number of points to use to plot the density
    n_bins: number of bins to use for the histogram
    X: n x 1 data matrix
    V: J x 1 matrix of J features (test locations) to show
    V0: J x 1 matrix of J features. Initial values before optimization.
    """
    n, d = X.shape
    assert d==1
    assert V is None or V.shape[1] == 1
    assert V0 is None or V0.shape[1] == 1
    
    plt.figure(figsize=(figw, figh))
    xmin = np.min(X)
    xmax = np.max(X)
    if xlim is None:
        xlim = [xmin-margin, xmax+margin]
    
    # plot the data as a histogram
    hist_result = plt.hist(X, density=True, bins=n_bins, alpha=0.8, label='Data')
    hist_heights = hist_result[0]
    hist_height = np.max(hist_heights)
    
    try: 
        is_normalized = True
        flogden = lambda Xt: p.log_normalized_den(Xt)
        # call it to see if it is implemented.
        flogden(np.array([[1]]))
    except NotImplementedError:
        # log_normalized_den() is not implemented; fall back to the unnormalized density.
        flogden = lambda Xt: p.log_den(Xt)
        is_normalized = False
        
    dom = np.linspace(xlim[0], xlim[1], n_dom)
    den = np.exp(flogden(dom[:, np.newaxis]))
    if not is_normalized:
        # If p is an unnormalized density, rescale it to fit the height of
        # the histogram. (This is not technically correct; it is just a
        # simple way to plot.)
        max_den = np.max(den)
        den = den/max_den*hist_height
     
    # plot the density (may be unnormalized)
    plt.plot(dom, den, 'r-', label=r'$p$')
    
    if func_obj is not None:
        # plot the optimization objective
        objs = func_obj(dom[:, np.newaxis])
        if rescale_obj:
            # rescale to match the height of the histogram
            max_obj = np.max(objs)
            objs = objs/max_obj*hist_height*0.8
        plt.plot(dom, objs, 'k-', label=r'Score function')
    
    if V is not None:
        # plot the learned test location(s) on the x-axis
        plt.plot(V[:, 0], np.zeros(V.shape[0]), 'm*', markeredgecolor='white', 
                 label='Optimized', markersize=34)
    if V0 is not None:
        # plot the initial test locations (before the optimization) on the x-axis
        plt.plot(V0[:, 0], np.zeros(V0.shape[0]), 'k*', markeredgecolor='white', 
                 label='Initial', markersize=34)
    plt.xlim(xlim)

In [ ]:
def prob_1dmixture(pm=0, pv=1, qm1=0, qv1=1, qm2=1, qv2=1, seed=3):
    """
    A 1D problem where both p and q are 2-component Gaussian mixture models.
    p(x) = 0.5*N(0, 1) + 0.5*N(pm, pv)
    q(x) = 0.5*N(qm1, qv1) + 0.5*N(qm2, qv2)
    
    Return p and q (both are UnnormalizedDensity)
    """
    assert pv > 0
    assert qv1 > 0
    assert qv2 > 0
    p = density.IsoGaussianMixture(means=np.array([[0], [pm]]), 
                                  variances=np.array([1, pv]))
    q = density.IsoGaussianMixture(means=np.array([[qm1], [qm2]]),
                                  variances=np.array([qv1, qv2]))
    return p, q

def func_interactive_1dmixture(pm=0, pv=1, qm1=0, qv1=1, qm2=1, qv2=1):
    
    seed = 84
    p, q = prob_1dmixture(pm=pm, pv=pv, qm1=qm1, qv1=qv1, qm2=qm2,
                          qv2=qv2, seed=seed)
    
    # n = sample size to draw from q
    n = 600
    gwidth2 = 1.5**2
    # generate data from q
    ds = q.get_datasource()
    dat = ds.sample(n, seed=seed+3)
    Xs = dat.data()
    # kernel
    k = kernel.KGauss(sigma2=gwidth2)
    def score_function(Vs):
        """
        Vs: m x d test locations. 
        Evaluate the score at m locations
        """
        m = Vs.shape[0]
        objs = np.zeros(m)
        for i in range(m):
            v = Vs[i, :]
            obj = func_fssd_power_criterion(p, Xs, k, v[np.newaxis, :])
            objs[i] = obj
        return objs
    
    # plot the problem
    plot1d_pq(p, Xs, func_obj=score_function, rescale_obj=False,
              margin=3, n_dom=100, n_bins=12, figw=8, figh=5)
    print('p = 0.5*N(0, 1) + 0.5*N({}, {})'.format(pm, pv))
    print('q = 0.5*N({}, {}) + 0.5*N({}, {})'.format(qm1, qv1, qm2, qv2))
    plt.legend(loc='upper right', fontsize=20)

In [ ]:
# from __future__ import print_function
from ipywidgets import interact, interactive, fixed
from IPython.display import display
import ipywidgets as widgets

vs = interactive(func_interactive_1dmixture, pm=(0, 10, 1), pv=(1e-3, 5, 1),
                 qm1=(-5, 10, 1), qv1=(1e-3, 5, 1), qm2=(-5, 10, 1),
                 qv2=(1e-3, 5, 1)
                )
display(vs)

Goodness-of-fit test

Given a known probability density $p$ (the model) and a sample $\{ \mathbf{x}_i \}_{i=1}^n \sim q$, where $q$ is an unknown density, a goodness-of-fit test tests the null hypothesis

$H_0: p = q$

against the alternative hypothesis

$H_1: p \neq q$.

In other words, it tests whether or not the sample $\{ \mathbf{x}_i \}_{i=1}^n $ is distributed according to a known $p$.
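
The statistic used throughout this notebook is the Finite-Set Stein Discrepancy (FSSD). Writing the Stein witness as $\mathbf{g}(\mathbf{v}) = \mathbb{E}_{\mathbf{x}\sim q}\left[\nabla_{\mathbf{x}} \log p(\mathbf{x})\, k(\mathbf{x},\mathbf{v}) + \nabla_{\mathbf{x}} k(\mathbf{x},\mathbf{v})\right]$ for a kernel $k$, the population quantity is (up to the exact normalization used in the code)

$$\mathrm{FSSD}^2 = \frac{1}{dJ}\sum_{j=1}^{J} \left\Vert \mathbf{g}(\mathbf{v}_j) \right\Vert_2^2,$$

where $\mathbf{v}_1, \ldots, \mathbf{v}_J$ are the test locations. Under mild conditions, $\mathrm{FSSD}^2 = 0$ if and only if $p = q$, so a large estimate of $\mathrm{FSSD}^2$ is evidence against $H_0$.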

A Gaussian mean shift problem

Let us assume that the model is

$$p(\mathbf{x})=\mathcal{N}\left(\left[\begin{array}{c} 0\\ 0 \end{array}\right],\left[\begin{array}{cc} 1 & 0\\ 0 & 1 \end{array}\right]\right) $$

in $\mathbb{R}^2$ (two-dimensional space).

Construct the model $p$

The model $p$ is specified by its density function. Specifically, our test requires $\nabla_{\mathbf{x}} \log p(\mathbf{x})$ (the gradient of the log density), which does not depend on the normalizer. So, when specifying the density $p$, it is sufficient to specify it up to the normalizing constant. The gradient $\nabla_{\mathbf{x}} \log p(\mathbf{x})$ will be computed automatically by autograd. Alternatively, we can specify $\nabla_{\mathbf{x}} \log p(\mathbf{x})$ directly. Here, we will construct our model by specifying $p(\mathbf{x})$.
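
As a quick standalone check (plain autograd; not part of the kgof API), we can verify that differentiating the unnormalized log density of $\mathcal{N}(\mathbf{0}, I)$ recovers $\nabla_{\mathbf{x}} \log p(\mathbf{x}) = -\mathbf{x}$, regardless of the dropped normalizer.


In [ ]:
import autograd
import autograd.numpy as np

def log_unden_single(x):
    # log unnormalized density of N(0, I) at a single point x (a d-vector)
    return -np.sum(x**2)/2.0

# autograd differentiates the function with respect to its argument
grad_log_unden = autograd.grad(log_unden_single)
x0 = np.array([1.0, -2.0])
grad_log_unden(x0)   # expect [-1., 2.], i.e., -x0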

Assume that we do the following import

import kgof.density as density

In the kgof package, a model $p$ is specified by implementing the class density.UnnormalizedDensity. Implementing this directly can be tedious, however. An easier way is to use the function

density.from_log_den(d, f)

which takes two arguments:

  1. d: the dimension of the input space
  2. f: a function that takes a 2D numpy array of size n x d and returns a one-dimensional array of length n containing the log unnormalized density values at the n points.

Let us construct an UnnormalizedDensity, the object representing a model. All the implemented goodness-of-fit tests take this object as input.


In [ ]:
# Assume two dimensions.
d = 2
def isogauss_log_den(X):
    """
    Evaluate the log density of the standard isotropic Gaussian 
    at the points (rows) in X.
    Note that the density is NOT normalized. 
    
    X: n x d nd-array
    return a length-n array
    """
    # d = dimension of the input space
    unden = -np.sum(X**2, 1)/2.0
    return unden

# p is an UnnormalizedDensity object
p = density.from_log_den(d, isogauss_log_den)

Generate data from $q$

In practice, the data $\{ \mathbf{x}_i \}_{i=1}^n $ are given to us. Here, as an illustrative example, we will generate them from a $q$ given by

$$q(\mathbf{x})=\mathcal{N}\left(\left[\begin{array}{c} m\\ 0 \end{array}\right],\left[\begin{array}{cc} 1 & 0\\ 0 & 1 \end{array}\right]\right), $$

where $m$ specifies the mean of the first coordinate of $q$. From this setting, if $m\neq 0$, then $H_1$ is true and the test should reject $H_0$. If $m=0$, then $p=q$, and the test should not reject $H_0$.


In [ ]:
# Let's assume that m = 1.
m = 1

# Draw n points from q
seed = 5
np.random.seed(seed)
n = 300
X = np.random.randn(n, 2) + np.array([m, 0])

Plot the data and model $p$.


In [ ]:
plot2d_pq(p, X)
plt.axis('equal');
plt.grid()

All the implemented tests take the data in the form of a data.Data object. This is just an encapsulation of the sample X. To construct a data.Data object, we do the following.


In [ ]:
# dat will be fed to the test.
dat = data.Data(X)

Optimization of test locations

Now that we have the data, let us randomly split it into two disjoint halves: tr and te. The training set tr will be used for parameter optimization. The testing set te will be used for the actual goodness-of-fit test. tr and te are again of type data.Data.


In [ ]:
# We will use some portion of the data for parameter tuning, and the rest for testing.
tr, te = dat.split_tr_te(tr_proportion=0.5, seed=2)

Let us optimize the parameters of the test on tr. The optimization relies on autograd to compute the gradient. We will use a Gaussian kernel for the test.


In [ ]:
# J is the number of test locations (or features). Typically not larger than 10.
J = 1

# There are many options for the optimization. 
# Almost all of them have default values. 
# Here, we will list a few to give you a sense of what you can control.
# Full options can be found in gof.GaussFSSD.optimize_locs_widths(..)
opts = {
    'reg': 1e-2, # regularization parameter in the optimization objective
    'max_iter': 50, # maximum number of gradient ascent iterations
    'tol_fun':1e-4, # termination tolerance of the objective
}

# make sure to give tr (NOT te).
# do the optimization with the options in opts.
V_opt, gw_opt, opt_info = gof.GaussFSSD.optimize_auto_init(p, tr, J, **opts)

The optimization procedure returns

  1. V_opt: optimized test locations (features). A $J \times d$ numpy array.
  2. gw_opt: optimized Gaussian width (for the Gaussian kernel). A floating point number.
  3. opt_info: a dictionary containing information gathered during the optimization.

In [ ]:
opt_info
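
As a quick sanity check (a minimal sketch; variable names follow the cells above), the shapes and the optimized kernel width can be inspected directly.


In [ ]:
# V_opt is J x d; gw_opt is a positive scalar (squared Gaussian width)
print('V_opt shape: {}'.format(V_opt.shape))
print('Optimized Gaussian width^2: {}'.format(gw_opt))
print('Keys recorded in opt_info: {}'.format(list(opt_info.keys())))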

Goodness-of-fit test

Let us use these optimized parameters to construct the FSSD test. The test with a Gaussian kernel is implemented in kgof.goftest.GaussFSSD.


In [ ]:
# alpha = significance level of the test
alpha = 0.01
fssd_opt = gof.GaussFSSD(p, gw_opt, V_opt, alpha)

In [ ]:
# return a dictionary of testing results
test_result = fssd_opt.perform_test(te)
test_result

Recall that the null hypothesis is $H_0: p=q$. The test correctly rejects $H_0$ with a very small p-value.
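
The individual entries can also be read off the returned dictionary (key names as printed in the result above).


In [ ]:
# The result of perform_test is a plain dictionary.
print('p-value: {}'.format(test_result['pvalue']))
print('Reject H0: {}'.format(test_result['h0_rejected']))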

Learned test location $\mathbf{v}$

Let us check the optimized test location(s). We will plot the training data, the learned feature(s) and the contour of the unnormalized density $p$ in red.


In [ ]:
Xtr = tr.data()
plot2d_pq(p, Xtr, V=V_opt, margin=0.5)
plt.axis('equal');
# plt.legend()

Exercise

Go back to where we sample the data from $q$ and change m (the mean of the first coordinate of $q$) to 0. This makes $p=q$, so $H_0$ is now true. Run the whole procedure again and verify that the test does not reject $H_0$. (Technically, the probability of rejecting is about $\alpha$.)

Note that when the test fails to reject, the learned features are not interpretable. They will be arbitrary.
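
A condensed version of this exercise is sketched below (reusing p, n, J, opts, and alpha from the cells above; the seeds are arbitrary).


In [ ]:
# Rerun the pipeline with m = 0, so that p = q and H0 is true.
np.random.seed(seed)
X0 = np.random.randn(n, 2) + np.array([0, 0])
dat0 = data.Data(X0)
tr0, te0 = dat0.split_tr_te(tr_proportion=0.5, seed=2)

V0_opt, gw0_opt, _ = gof.GaussFSSD.optimize_auto_init(p, tr0, J, **opts)
fssd0 = gof.GaussFSSD(p, gw0_opt, V0_opt, alpha)
# With H0 true, the test should fail to reject about 1 - alpha of the time.
fssd0.perform_test(te0)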

Gaussian problem. Variance difference.


In [ ]:
def init_params(Xtr, J, seed=40):
    """
    Return V0, gwidth0 where V0 is a J x d numpy array containing
    J initial test locations before optimization. These are initialized
    by drawing from a Gaussian fitted to the training set.
    gwidth0 is the initial Gaussian width initialized by 
    the median heuristic.
    
    Xtr: n x d numpy array of n data points
    J: number of test locations required
    """
    
    # random test locations
    V0 = util.fit_gaussian_draw(Xtr, J, seed=seed+11)
    gwidth0 = util.meddistance(Xtr, subsample=1000)**2
    return V0, gwidth0


def optimize_params(p, tr, V0, gwidth0):
    """
    Optimize the test locations and the Gaussian width on the training data.
    
    p: the model. UnnormalizedDensity.
    tr: data.Data object representing the training set
    V0: J x d numpy array of J initial test locations
    gwidth0: initial Gaussian width
    
    Return V_opt, gw_opt, opt_info
    """
    # There are many options for the optimization. 
    # Almost all of them have default values. 
    # Here, we will list a few to give you a sense of what you can control.
    # Full options can be found in gof.GaussFSSD.optimize_locs_widths(..)
    opts = {
        'reg': 1e-2, # regularization parameter in the optimization objective
        'max_iter': 50, # maximum number of gradient ascent iterations
        'tol_fun':1e-5, # termination tolerance of the objective
        'gwidth_lb': 1**2, # absolute lower bound on the squared Gaussian width
        'gwidth_ub': 10**2, # absolute upper bound on the squared Gaussian width
    }

    # make sure to give tr (NOT te).
    # do the optimization with the options in opts.
    # V_opt, gw_opt, opt_info = gof.GaussFSSD.optimize_auto_init(p, tr, J, **opts)
    V_opt, gw_opt, opt_info = gof.GaussFSSD.optimize_locs_widths(p, tr, gwidth0, V0, **opts)
    return V_opt, gw_opt, opt_info

Create the Gaussian variance difference problem


In [ ]:
# p is an UnnormalizedDensity object
seed = 30
n = 1500
p, ds = prob2d_pqgauss(qmean=np.array([0, 0]), QVar=np.diag([2, 0.5]), seed=seed)
dat = ds.sample(n, seed=seed+2)
X = dat.data()

# plot
plot2d_pq(p, X)
plt.axis('equal');

Optimize the parameters


In [ ]:
# We will use a portion (tr_proportion) of the data for parameter tuning
tr, te = dat.split_tr_te(tr_proportion=0.3, seed=seed+8)
Xtr = tr.data()

# J is the number of test locations (or features). Typically not larger than 10.
J = 2
V0, gwidth0 = init_params(Xtr, J, seed=seed+3)
V_opt, gw_opt, opt_info = optimize_params(p, tr, V0, gwidth0)
opt_info

Goodness-of-fit test


In [ ]:
# alpha = significance level of the test
alpha = 0.01
fssd_opt = gof.GaussFSSD(p, gw_opt, V_opt, alpha)

# Goodness-of-fit test
# return a dictionary of testing results
test_result = fssd_opt.perform_test(te)
test_result

In [ ]:
plot2d_pq(p, Xtr, V0=V0, V=V_opt, margin=0)
plt.axis('equal');
plt.legend(fontsize=20)


A complicated 2D mixture model


In [ ]:
# k x d where k = number of mixture components
pmeans = np.array([[0, 0], [3, 3], [3, 0]])
pvariances = np.ones(3)*1
p = density.IsoGaussianMixture(pmeans, pvariances)

In [ ]:
# q is a Gaussian mixture
qmeans = np.array([[0, 0], [3, 3], [0, 3]])
qvariances = pvariances
q = density.IsoGaussianMixture(qmeans, qvariances)
ds = q.get_datasource()

In [ ]:
# generate some data from q
n = 800
dat = ds.sample(n, seed=seed+23)
X = dat.data()

In [ ]:
plot2d_pq(p, X, margin=1)

In [ ]:
# We will use a portion (tr_proportion) of the data for parameter tuning
tr, te = dat.split_tr_te(tr_proportion=0.5, seed=seed+8)
Xtr = tr.data()

# J is the number of test locations (or features). Typically not larger than 10.
J = 2
V0, gwidth0 = init_params(Xtr, J, seed=seed+3)
V_opt, gw_opt, opt_info = optimize_params(p, tr, V0, gwidth0)
opt_info

In [ ]:
# alpha = significance level of the test
alpha = 0.01
fssd_opt = gof.GaussFSSD(p, gw_opt, V_opt, alpha)

# Goodness-of-fit test
# return a dictionary of testing results
test_result = fssd_opt.perform_test(te)
test_result

In [ ]:
plot2d_pq(p, Xtr, V0=V0, V=V_opt, margin=0)
plt.axis('equal');
plt.legend(fontsize=20)

In [ ]:
V_opt

In [ ]:
gw_opt

Complicated model: 1D Restricted Boltzmann Machine


In [ ]:
def prob_rbm_perturb(B_scale=1, b_scale=1, c_scale=1, 
                     B_perturb=0, b_perturb=0, c_perturb=0,
                    d=1, dh=8, seed=38):
    """
    B_scale, b_scale, c_scale: control scaling of the parameters of the RBMs
    B_perturb, b_perturb, c_perturb: how much q differs from p
    d: dimension of the input
    dh: dimension of the hidden variables
    
    Return p and q (both are UnnormalizedDensity)
    """
    assert B_scale >= 0
    assert b_scale >= 0
    assert c_scale >= 0
    assert B_perturb >= 0
    assert b_perturb >= 0
    assert c_perturb >= 0
    
    with util.NumpySeedContext(seed=seed):
        B = np.random.randn(d, dh)*B_scale
        b = np.random.rand(d)*b_scale - b_scale/2.0
        c = np.random.rand(dh)*c_scale - c_scale/2.0

        # the model p
        p = density.GaussBernRBM(B, b, c)

        # perturb parameters of p to construct q
        Bq = B + np.random.randn(d, dh)*B_perturb
        bq = b + np.random.randn(d)*b_perturb
        cq = c + np.random.randn(dh)*c_perturb
        # construct the density q
        q = density.GaussBernRBM(Bq, bq, cq)
    return p, q

def func_interactive_rbm_problem(B_scale=1, b_scale=1, c_scale=1, 
                     B_perturb=0, b_perturb=0, c_perturb=0):
    dh = 7
    d = 1
    seed = 84
    # n = sample size to draw from q
    n = 500
    gwidth2 = 3**2
    
    p, q = prob_rbm_perturb(B_scale=B_scale, b_scale=b_scale, c_scale=c_scale, 
                     B_perturb=B_perturb, b_perturb=b_perturb, c_perturb=c_perturb,
                    d=d, dh=dh, seed=seed)
    
    # generate data from q
    ds = q.get_datasource()
    # Sampling from the RBM relies on Gibbs. Just to make it cheaper,
    # let us set the burnin iterations to be reasonably small.
    ds.burnin = 1000
    dat = ds.sample(n, seed=seed+37)
    Xs = dat.data()
    # kernel
    k = kernel.KGauss(sigma2=gwidth2)
    def score_function(Vs):
        """
        Vs: m x d test locations. 
        Evaluate the score at m locations
        """
        m = Vs.shape[0]
        objs = np.zeros(m)
        for i in range(m):
            v = Vs[i, :]
            obj = func_fssd_power_criterion(p, Xs, k, v[np.newaxis, :])
            objs[i] = obj
        return objs
    
    # plot the problem
    plot1d_pq(p, Xs, func_obj=score_function, rescale_obj=False,
              margin=1, n_dom=100, n_bins=12, figw=10, figh=6)
    plt.legend()

In [ ]:
vs = interactive(func_interactive_rbm_problem, B_scale=(0, 2, 0.1), 
                 b_scale=(0, 3, 0.2), c_scale=(0, 3, 0.2), 
                     B_perturb=(0, 1, 0.05),
                 b_perturb=fixed(0), c_perturb=fixed(0)
                )
display(vs)

In the widget above, B_scale, b_scale, and c_scale control the parameters of the model $p$. $q$ is the same as $p$ unless B_perturb is nonzero, i.e., the weight matrix of $p$ is perturbed to form $q$.
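
Beyond the interactive widget, the full test can be run on a single RBM perturbation setting. The sketch below reuses prob_rbm_perturb, init_params, and optimize_params defined above; the perturbation size and sample size are illustrative.


In [ ]:
# p and q differ only through a small perturbation of the weight matrix B.
seed = 84
p_rbm, q_rbm = prob_rbm_perturb(B_scale=1, b_scale=1, c_scale=1,
                                B_perturb=0.3, b_perturb=0, c_perturb=0,
                                d=1, dh=7, seed=seed)

# Draw a sample from q (Gibbs sampling with a modest burn-in)
ds_rbm = q_rbm.get_datasource()
ds_rbm.burnin = 1000
dat_rbm = ds_rbm.sample(800, seed=seed + 37)

# Optimize the test locations and the Gaussian width on the training split
tr_rbm, te_rbm = dat_rbm.split_tr_te(tr_proportion=0.5, seed=seed + 1)
V0_rbm, gw0_rbm = init_params(tr_rbm.data(), J=2, seed=seed + 3)
V_rbm, gw_rbm, _ = optimize_params(p_rbm, tr_rbm, V0_rbm, gw0_rbm)

# Run the goodness-of-fit test on the held-out split
fssd_rbm = gof.GaussFSSD(p_rbm, gw_rbm, V_rbm, alpha=0.01)
fssd_rbm.perform_test(te_rbm)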

