In [ ]:
# import the modules
import os
import GPy
import csv
import sys
import numpy as np
import cPickle as pickle
import scipy.stats as stats
import sklearn.metrics as metrics
from matplotlib import pyplot as plt
%matplotlib notebook
In [ ]:
# load the dataset from the pickle file; a context manager closes the
# file handle (the original `pickle.load(open(...))` leaked it)
# NOTE(review): unpickling executes arbitrary code -- only load trusted files
with open('../Data/Data.p', 'rb') as dataFile:
    Data = pickle.load(dataFile)
In [ ]:
# set the overall parameters for bgplvm
qDim = 15  # total shared latent dimensionality
# latent dimensions allotted to each view: 10 for kinect, 5 for mocap
qDims = [10,5]
# index ranges of each view's latent sub-space: [0..9] and [10..14]
qDVals = [np.arange(0,qDims[0]), np.arange(qDims[0],qDims[0]+qDims[1])]
# set the number of inducing inputs
nTrials = 6       # number of recorded trials in the dataset
nInducing = 100   # inducing points for the sparse GP approximation
# subsample the training trials by this factor (every 2nd frame)
samplingFreq = 2
# optimization variables: target signal-to-noise ratios used to fix the
# Gaussian noise variances during the constrained optimization phases
SNR1 = 100
SNR0 = 1000
trainIters = 1500       # phase 3: unconstrained joint optimization
initMod0Iters = 500     # phase 2a: optimize view 0 alone
initMod1Iters = 500     # phase 2b: optimize view 1 alone
initVardistIters = 1500 # phase 1: variational distribution initialization
# define the observation spaces (kinect point cloud -> mocap topology coords)
kinectExt = 'C'
kinectDim = 7500
kinectKey = 'Cloud'
mocapDim = 8
mocapExt = 'T'
mocapKey = 'TopCoord'
# variables for initialization
keys = [kinectKey,mocapKey]
dims = [kinectDim, mocapDim]
YNames = [kinectKey, mocapKey]
# trial names: kinect 1, subject 1, postures 1-6, take 1
names = ['K1S1P1T1','K1S1P2T1','K1S1P3T1','K1S1P4T1','K1S1P5T1','K1S1P6T1']
In [ ]:
# initialize training, validation and test data splits
valData = {}
testData = {}
trainData = {}
# hold out trial 0 for testing; train on the remaining trials
testInd = 0
trainInd = range(nTrials)
del trainInd[testInd]
# NOTE(review): valInd = 1 is still in trainInd, so the validation trial
# overlaps the training set -- confirm this is intentional
valInd = (testInd+1)%nTrials
print valInd, testInd, trainInd
for key,dim in zip(keys,dims):
    # stack the subsampled training trials for this observation space
    trD = np.empty((0,dim))
    for ind in trainInd:
        trD = np.concatenate((trD,Data[names[ind]][key][::samplingFreq,:]),axis=0)
    trainData[key] = trD
    # validation/test trials are kept at the full sampling rate
    valData[key] = Data[names[valInd]][key]
    testData[key] = Data[names[testInd]][key]
# choosing the training dataset
nSamples = trainData[kinectKey].shape[0]
trainList = [trainData[kinectKey], trainData[mocapKey]]
print nSamples
In [ ]:
# initializing the latent space
scales = []
inputX = np.zeros((nSamples,qDim))
for qD,qDV,Y in zip(qDims, qDVals, trainList):
x,frcs = GPy.util.initialization.initialize_latent('PCA',qD, Y)
scales.extend(frcs)
inputX[:,qDV] = x
scales = np.asarray(scales)
# setting up the kernel
mrdKernels = []
for Y in trainList:
mrdKernels.append(GPy.kern.RBF(qDim, variance=1., lengthscale=1./scales, ARD = True))
# initializing MRD model
mrdModel = GPy.models.MRD(trainList, input_dim=qDim, num_inducing=nInducing, kernel=mrdKernels, X=inputX)
print 'Setup Model!'
# Phase 1: Optimizaition by fixing variance parameters
var0 = mrdModel.Y0.Y.var()
var1 = mrdModel.Y1.Y.var()
mrdModel.Y0.rbf.variance.fix(var0)
mrdModel.Y1.rbf.variance.fix(var1)
mrdModel.Y0.Gaussian_noise.variance.fix(var0/SNR0)
mrdModel.Y1.Gaussian_noise.variance.fix(var1/SNR1)
mrdModel.optimize(messages=True, max_iters=initVardistIters)
# Phase 2: Optimize each model individually
# constrain space 0
mrdModel.Y1.constrain_fixed()
mrdModel.optimize(messages=True, max_iters=initMod0Iters)
# constrain space 1
mrdModel.Y0.constrain_fixed()
mrdModel.Y1.unconstrain_fixed()
mrdModel.Y1.rbf.variance.fix(var1)
mrdModel.Y1.Gaussian_noise.variance.fix(var1/SNR1)
mrdModel.optimize(messages=True, max_iters=initMod1Iters)
# Phase 3: Optimize the model without any constraints
# training without constraints
mrdModel.Y0.unconstrain_fixed()
mrdModel.Y1.unconstrain_fixed()
mrdModel.optimize(messages=True, max_iters=trainIters)
print 'Training Done!'
# save the model
dName = '../Models/Exp4'
if not os.path.exists(dName):
os.makedirs(dName)
pickle.dump(mrdModel,open('../Models/Exp4/model.p','wb'))
In [ ]:
import GPy.plotting.Tango as Tango
def plotScales(scales1, scales2, options, yThresh=0.05):
    """Overlay two ARD-weight bar charts on a single axis.

    scales1, scales2: 1-D arrays of per-dimension weights (same length);
                      the first is drawn with wide bars, the second with
                      narrower bars on top.
    options:          dict with 'title', 'ylabel' and 'labels' (two legend
                      entries, one per weight array).
    yThresh:          height of the dashed relevance-threshold line.
    Returns the matplotlib axes the bars were drawn on.
    """
    fontSize = 20
    nDims = scales1.shape[0]
    ax = plt.figure().add_subplot(111)
    positions = np.arange(1, nDims + 1)
    colorA = Tango.colorsHex['mediumBlue']
    colorB = Tango.colorsHex['darkGreen']
    # wide bars for the first model, narrower bars drawn over them
    barsA = ax.bar(positions, height=scales1, width=0.8, align='center',
                   color=colorA, edgecolor='k', linewidth=1.3)
    barsB = ax.bar(positions, height=scales2, width=0.5, align='center',
                   color=colorB, edgecolor='k', linewidth=0.7)
    # dashed horizontal line marking the relevance threshold
    ax.plot([0.4, nDims + 0.6], [yThresh, yThresh], '--',
            linewidth=3, color=Tango.colorsHex['mediumRed'])
    # cosmetic axis setup
    ax.set_xlim(.4, nDims + .6)
    ax.tick_params(axis='both', labelsize=fontSize)
    ax.set_xticks(xrange(1, nDims + 1))
    ax.set_title(options['title'], fontsize=fontSize)
    ax.set_ylabel(options['ylabel'], fontsize=fontSize)
    ax.set_xlabel('Latent Dimensions', fontsize=fontSize)
    ax.legend([barsA, barsB], options['labels'], loc=2, fontsize=fontSize)
    plt.tight_layout()
    return ax
In [ ]:
from matplotlib import cm
import GPy.plotting.Tango as Tango
def plotLatent(inX, title, model=None, which_indices=[0,1], plot_inducing=False, plot_variance=False, max_points=[800,300]):
s = 100
fSize = 20
marker = 'o'
resolution = 50
fig = plt.figure()
ax = fig.add_subplot(111)
Tango.reset()
input1, input2 = which_indices
if inX[0].shape[0] > max_points[0]:
print("Warning".format(inX[0].shape))
subsample = np.random.choice(inX[0].shape[0], size=max_points[0], replace=False)
inX[0] = inX[0][subsample]
if inX[1].shape[0] > max_points[1]:
print("Warning".format(inX[1].shape))
subsample = np.random.choice(inX[1].shape[0], size=max_points[1], replace=False)
inX[1] = inX[1][subsample]
xmin, ymin = inX[0][:, [input1, input2]].min(0)
xmax, ymax = inX[0][:, [input1, input2]].max(0)
x_r, y_r = xmax-xmin, ymax-ymin
xmin -= .1*x_r
xmax += .1*x_r
ymin -= .1*y_r
ymax += .1*y_r
print xmin, xmax, ymin, ymax
if plot_variance:
def plotFunction(x):
Xtest_full = np.zeros((x.shape[0], qDim))
Xtest_full[:, [input1, input2]] = x
_, var = model.predict(np.atleast_2d(Xtest_full))
var = var[:, :1]
return -np.log(var)
qDim = model.X.mean.shape[1]
x, y = np.mgrid[xmin:xmax:1j*resolution, ymin:ymax:1j*resolution]
gridData = np.hstack((x.flatten()[:, None], y.flatten()[:, None]))
gridVariance = (plotFunction(gridData)).reshape((resolution, resolution))
varianceHandle = plt.imshow(gridVariance.T, interpolation='bilinear', origin='lower', cmap=cm.gray,
extent=(xmin, xmax, ymin, ymax))
trainH = ax.scatter(inX[0][:, input1], inX[0][:, input2], marker=marker, s=s, c=Tango.colorsHex['mediumBlue'], linewidth=.2, edgecolor='k', alpha=1.)
testH = ax.scatter(inX[1][:, input1], inX[1][:, input2], marker=marker, s=s, c=Tango.colorsHex['mediumRed'], linewidth=.2, edgecolor='k', alpha=0.9)
ax.grid(b=False)
ax.set_aspect('auto')
ax.tick_params(axis='both', labelsize=20)
ax.legend([trainH,testH],['Train','Test'],loc=2,fontsize=fSize)
ax.set_xlabel('Latent Dimension %i' % (input1+1), fontsize=fSize)
ax.set_ylabel('Latent Dimension %i' % (input2+1), fontsize=fSize)
if plot_inducing:
Z = model.Z
ax.scatter(Z[:, input1], Z[:, input2], c='w', s=25, marker="^", edgecolor='k', linewidth=.3, alpha=.6)
ax.set_xlim((xmin, xmax))
ax.set_ylim((ymin, ymax))
plt.tight_layout()
return ax
In [ ]:
# function to compute reconstruction error
def reconstructionError(model, valData, testData, mKey, kKey, optimizeFlag=False):
    """Reconstruct mocap outputs from kinect inputs through the latent space.

    For every frame, the input observation (kKey) is mapped to a latent
    point via model.Y0.infer_newX, and the output observation (mKey) is
    predicted from that latent point via model.predict(..., Yindex=1).

    model:        trained GPy MRD model (Y0 = input view, Y1 = output view)
    valData:      dict of validation arrays keyed by kKey/mKey
    testData:     dict of test arrays keyed by kKey/mKey
    mKey:         key of the output (mocap) observation space
    kKey:         key of the input (kinect) observation space
    optimizeFlag: if True, optimize the inferred latent point per test frame

    Returns {'train': valResults, 'test': testResults}, each holding
    'pred', 'latent', 'rmse', 'nrmse' and 'corr' arrays.
    NOTE(review): validation results are stored under the 'train' key;
    kept as-is for backward compatibility with downstream consumers.
    """
    nSamplesVal = valData[mKey].shape[0]
    nSamplesTest = testData[mKey].shape[0]
    nDimOut = valData[mKey].shape[1]
    qDim = model.X.mean.shape[1]
    # per-frame predictions and inferred latent points
    predictVal = np.zeros((nSamplesVal,nDimOut))
    predictTest = np.zeros((nSamplesTest,nDimOut))
    latentVal = np.zeros((nSamplesVal,qDim))
    latentTest = np.zeros((nSamplesTest,qDim))
    for n in range(nSamplesVal):
        yIn = valData[kKey][n,:]
        [xPredict, infX] = model.Y0.infer_newX(yIn[None,:], optimize=False)
        yOut = model.predict(xPredict.mean, Yindex=1)
        sys.stdout.write('.')  # progress marker, one dot per frame
        predictVal[n,:] = yOut[0]
        latentVal[n,:] = xPredict.mean
    sys.stdout.write('\n')
    for n in range(nSamplesTest):
        yIn = testData[kKey][n,:]
        [xPredict, infX] = model.Y0.infer_newX(yIn[None,:], optimize=optimizeFlag)
        yOut = model.predict(xPredict.mean, Yindex=1)
        sys.stdout.write('.')
        predictTest[n,:] = yOut[0]
        latentTest[n,:] = xPredict.mean
    sys.stdout.write('\n')
    # per-dimension RMSE; reused below instead of recomputing the MSE
    valErrors = np.sqrt(metrics.mean_squared_error(valData[mKey],predictVal,multioutput='raw_values'))
    testErrors = np.sqrt(metrics.mean_squared_error(testData[mKey],predictTest,multioutput='raw_values'))
    # RMSE normalized by each dimension's observed range
    valNormErrors = valErrors/(valData[mKey].max(axis=0) - valData[mKey].min(axis=0))
    testNormErrors = testErrors/(testData[mKey].max(axis=0) - testData[mKey].min(axis=0))
    # per-dimension Pearson correlation between truth and prediction;
    # iterates the local nDimOut (was: the module-level global dims[1])
    valCorr = np.zeros((1,nDimOut))
    testCorr = np.zeros((1,nDimOut))
    for d in range(nDimOut):
        valCorr[0,d],_ = stats.pearsonr(valData[mKey][:,d],predictVal[:,d])
        testCorr[0,d],_ = stats.pearsonr(testData[mKey][:,d],predictTest[:,d])
    valResults = {'pred': predictVal, 'latent': latentVal,
                  'rmse': valErrors, 'nrmse': valNormErrors, 'corr': valCorr}
    testResults = {'pred': predictTest, 'latent': latentTest,
                   'rmse': testErrors, 'nrmse': testNormErrors, 'corr': testCorr}
    return {'train': valResults, 'test': testResults}
In [ ]:
# load the feature data; the context manager closes the file handle
# (the original `pickle.load(open(...))` leaked it)
with open('../Data/FeatureData.p','rb') as dataFile:
    Data = pickle.load(dataFile)
# create val and test splits; note these rebind the module-level
# valData/testData/dims/keys used during training above
valNames = ['K1S1P2T1']
testNames = ['K1S1P1T1']
dims = [7500,8,2]
samplingFreq = 2
keys = ['Cloud','TopCoord','Time']
valData = {}
testData = {}
for key,dim in zip(keys,dims):
    # stack all named trials for this observation space
    vaD = np.empty((0,dim))
    teD = np.empty((0,dim))
    for fileName in valNames:
        vaD = np.concatenate((vaD,Data[fileName][key]),axis=0)
    for fileName in testNames:
        teD = np.concatenate((teD,Data[fileName][key]),axis=0)
    valData[key] = vaD
    testData[key] = teD
In [ ]:
# perform test inference for models; context managers close both the
# model file and the results file (the original open() calls leaked them)
with open('../Models/Exp4/model.p', 'rb') as modelFile:
    mrdModel = pickle.load(modelFile)
mrdResults = reconstructionError(mrdModel,valData,testData,'TopCoord','Cloud',optimizeFlag=True)
with open('Result/Results.p', 'wb') as resultFile:
    pickle.dump(mrdResults, resultFile)
In [ ]:
# reload the trained model and its results; context managers close the
# file handles (the original open() calls leaked them)
with open('../Models/Exp4/model.p','rb') as modelFile:
    mrdModel = pickle.load(modelFile)
with open('Result/Results.p','rb') as resultFile:
    mrdResults = pickle.load(resultFile)
In [ ]:
# plot the latent scales and latent space
# per-dimension ARD sensitivities for each view, normalized to [0,1]
scales1 = mrdModel.Y0.kern.input_sensitivity(summarize=False)
scales2 = mrdModel.Y1.kern.input_sensitivity(summarize=False)
scales1 = scales1/scales1.max()
scales2 = scales2/scales2.max()
options = {'title':'','ylabel':'ARD Weight','labels':['PointCloud','TopCoord']}
plotScales(scales1,scales2,options)
plt.savefig('Result/LatentScales.pdf', format='pdf')
# scatter train vs test latent points on latent dims 12 and 13
# (0-indexed 11, 12) with the predictive-variance background
testX = mrdResults['test']['latent']
trainX = mrdModel.X.mean
mrdX = [trainX, testX]
plotLatent(mrdX, 'Latent Space', model=mrdModel.Y0, which_indices=[11,12], plot_variance=True, max_points=[2000,700])
plt.savefig('Result/LatentSpace.pdf', format='pdf')
In [ ]:
# visualize predictive performance of MRD for test trajectory;
# the four copy-pasted figure blocks are folded into one helper
fSize = 20
lWidth = 4
mKey = 'TopCoord'
time = testData['Time'][:,1]
nSamplesTest = testData[mKey].shape[0]
predictTest = mrdResults['test']['pred']

def _plotTrueVsPredicted(dim, figTitle, yLabel, outFile, legendLoc=None):
    # plot ground truth (black) vs prediction (blue) for one output
    # dimension of the test trajectory and save the figure as a pdf
    plt.figure()
    plt.plot(time,testData[mKey][:,dim],c='k',linewidth=lWidth)
    plt.plot(time,predictTest[:,dim],c='b',linewidth=lWidth)
    plt.title(figTitle, fontsize=fSize)
    plt.xlabel('Time (secs)', fontsize=fSize)
    plt.tick_params(axis='both', labelsize=fSize)
    plt.ylabel(yLabel, fontsize=fSize)
    if legendLoc is not None:
        plt.legend(['True','Predict'], fontsize=fSize-5, loc=legendLoc)
    plt.tight_layout()
    plt.savefig(outFile, format='pdf')

# only the first figure carries a legend, matching the original output
_plotTrueVsPredicted(1, 'Collar-Head', 'Center of Twist', 'Result/Inference1.pdf', legendLoc=4)
_plotTrueVsPredicted(2, 'Collar-Body', 'Writhe', 'Result/Inference2.pdf')
_plotTrueVsPredicted(5, 'Left Sleeve-Arm', 'Center of Twist', 'Result/Inference3.pdf')
_plotTrueVsPredicted(7, 'Right Arm-Sleeve', 'Center of Twist', 'Result/Inference4.pdf')