Imports!
In [ ]:
%matplotlib inline
import h5py
from keras.models import Sequential, Graph, model_from_json
from keras.layers.core import Flatten, Dense
from keras.layers.convolutional import Convolution2D
from keras.optimizers import SGD
from keras.utils.visualize_util import to_graph
from IPython.display import SVG
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import hsv_to_rgb
from scipy.ndimage.interpolation import rotate
from scipy.io import loadmat
from scipy.stats import entropy
from collections import namedtuple
import copy
import cPickle
from pprint import pprint
from os import path
import train
import evaluate
from train import infer_sizes
import models
from utils import get_centroids, label_to_coords
Now load up our HDF5 data files and grab some trained weights for our model.
In [ ]:
# Load data and get a model
cache_dir = '../cache/mpii-cooking/'
train_h5_path = path.join(cache_dir, 'train-patches/samples-000001.h5')
train_neg_h5_path = path.join(cache_dir, 'train-patches/negatives.h5')
val_h5_path = path.join(cache_dir, 'val-patches/samples-000001.h5')
val_neg_h5_path = path.join(cache_dir, 'val-patches/negatives.h5')
train_h5 = h5py.File(train_h5_path, 'r')
train_neg_h5 = h5py.File(train_neg_h5_path, 'r')
val_h5 = h5py.File(val_h5_path, 'r')
val_neg_h5 = h5py.File(val_neg_h5_path, 'r')
In [ ]:
train_images, train_flow = train_h5['images'], train_h5['flow']
train_neg_images, train_neg_flow = train_neg_h5['images'], train_neg_h5['flow']
val_images, val_flow = val_h5['images'], val_h5['flow']
val_neg_images, val_neg_flow = val_neg_h5['images'], val_neg_h5['flow']
ds_shape = infer_sizes(train_h5_path)
In [ ]:
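# NB: the evaluation cells below assume poselet_model (and friends) have
# been built and loaded; uncomment these lines, with the right weights
# path, to do so.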
#sgd = SGD(lr=0.0001, nesterov=True, momentum=0.9)
#poselet_model = models.vggnet16_poselet_class_flow(ds_shape, sgd, 'glorot_normal')
#poselet_model.load_weights('../cache/kcnn-flow-rgb-tripose-from-3840-plus-1024/model-iter-10240-r181250.h5')
Now write some functions to look at our data, plus a few utilities for doing forward prop. These will be useful for inspecting activations and gradients, as well as for verifying that I've written what I intended to the file.
Note that some of these images will look weird because they've been padded (where necessary) with their edge pixel values. This is true of the flow as well.
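To illustrate that edge padding, here's a toy sketch (illustrative only; the real padding happens upstream in the patch extraction code):
In [ ]:
# Toy illustration of edge padding: border values are repeated outwards,
# which produces the smeared-looking margins mentioned above.
patch = np.arange(9).reshape(3, 3)
print(np.pad(patch, pad_width=2, mode='edge'))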
In [ ]:
centroids = loadmat(path.join(cache_dir, 'centroids.mat'))['centroids'][0].tolist()
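The helpers below lean on label_to_coords from utils. Assuming it simply reshapes a flattened (x1, y1, x2, y2, ...) label vector (or a batch of them) into rows of (x, y) coordinates, a minimal sketch of its behaviour:
In [ ]:
def label_to_coords_sketch(label):
    # assumed behaviour of utils.label_to_coords: reshape a flattened
    # (x1, y1, x2, y2, ...) vector, or a batch of them, into (x, y) rows
    label = np.asarray(label)
    return label.reshape(label.shape[:-1] + (-1, 2))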
In [ ]:
def _reshape_im(im):
    # images are stored channels-first, but matplotlib
    # expects channels-last
return np.transpose(im, axes=(1, 2, 0))
def _vis_flow(flow):
# clean visualisation of flow with angle of movement as
# hue, magnitude as saturation and a constant V of 1
x, y = flow
# normed-log makes things stand out quite a bit
mags = np.log(np.sqrt(x**2 + y**2) + 1)
    norm_mags = mags / max(mags.max(), 1e-8)  # guard against all-zero flow
angles = (np.arctan2(x, y) + np.pi) / (2 * np.pi)
ones = np.ones_like(angles)
hsv = np.stack((angles, norm_mags, ones), axis=2)
return hsv_to_rgb(hsv)
def _plot_coords(coords):
# plot a label corresponding to a flattened joint vector
for idx, coord in enumerate(coords):
plt.plot(coord[0], coord[1], marker='+')
plt.text(coord[0], coord[1], str(idx))
def show_datum(image, flow, label=None):
# First frame
im1 = _reshape_im(image[:3])
plt.subplot(131)
plt.imshow(im1)
plt.axis('off')
plt.text(-10, -10, 'frame1')
if label is not None:
if label.ndim == 1:
coords = label_to_coords(label)
else:
coords = label
first_coords = coords[:len(coords)//2]
_plot_coords(first_coords)
# Second frame
im2 = _reshape_im(image[3:6])
plt.subplot(132)
plt.imshow(im2)
plt.axis('off')
plt.text(-10, -10, 'frame2')
if label is not None:
second_coords = coords[len(coords)//2:]
_plot_coords(second_coords)
# Optical flow
if flow is not None:
im_flow = _vis_flow(flow)
plt.subplot(133)
plt.imshow(im_flow)
plt.axis('off')
plt.text(-10, -10, 'flow')
plt.show()
def get_joints(fp, index, ds_order=('left', 'right', 'head')):
    class_num = np.argmax(fp['class'][index])
    # class 0 is background, which has no joints
    assert class_num > 0, 'no joints for background patches'
    ds_name = ds_order[(class_num - 1) % 3]
    return fp[ds_name][index]
for i in np.random.permutation(len(train_images))[:3]:
# Just visualise the input data so that I know I'm writing it out correctly
    print('Training ground truth (NOT prediction) {}'.format(i))
j = get_joints(train_h5, i)
show_datum(train_images[i], train_flow[i], j)
for ds in ('left', 'right', 'head'):
jx = train_h5[ds][i]
print('{}: {}'.format(ds, jx))
print('Class: {}'.format(train_h5['class'][i]))
for i in np.random.permutation(len(train_neg_images))[:3]:
# Just visualise the input data so that I know I'm writing it out correctly
    print('Training negative {}'.format(i))
show_datum(train_neg_images[i], train_neg_flow[i])
for ds in ('left', 'right', 'head'):
jx = train_neg_h5[ds][i]
print(jx.shape)
print('Class: {}'.format(train_neg_h5['class'][i]))
train_centroid_classes = train_h5['poselet']
train_centroids = get_centroids(train_centroid_classes[:], centroids)
for i in np.random.permutation(len(train_images))[:3]:
    print('Training ground truth poselet (NOT prediction) {}'.format(i))
cls, pslt, coords = train_centroids[i]
show_datum(train_images[i], train_flow[i], coords)
true_cls = np.argmax(train_h5['class'][i])
assert true_cls == cls, '%i (true) vs. %i (from gc)' % (true_cls, cls)
print('Class: {}, poselet: {}\n\n\n'.format(true_cls, pslt))
for i in np.random.permutation(len(val_neg_images))[:3]:
    print('Validation negative {}'.format(i))
show_datum(val_neg_images[i], val_neg_flow[i])
for ds in ('left', 'right', 'head'):
jx = val_neg_h5[ds][i]
print(jx.shape)
print('Class: {}'.format(val_neg_h5['class'][i]))
val_centroid_classes = val_h5['poselet']
val_centroids = get_centroids(val_centroid_classes[:], centroids)
for i in np.random.permutation(len(val_images))[:3]:
    print('Validation ground truth (NOT prediction) {}'.format(i))
cls, pslt, coords = val_centroids[i]
show_datum(val_images[i], val_flow[i], coords)
true_cls = np.argmax(val_h5['class'][i])
assert true_cls == cls, '%i (true) vs. %i (from gc)' % (true_cls, cls)
print('Class: {}, poselet: {}\n\n\n'.format(true_cls, pslt))
Now we can try evaluating the CNN on some of our training and evaluation data, just to see whether it's learning anything useful.
In [ ]:
def evaluate_on_datum(data, model):
batch_size = 64
mps = train.read_mean_pixels('../cache/mean_pixel.mat')
rv = None
# We're manually handling batches because this way we can deal with
# mean pixel subtraction as we go. This is important for HDF5 files
# which we can't fit into memory all at once (and hence need to perform
# iterative mean subtraction on).
dataset_len = len(data[data.keys()[0]])
for start_idx in xrange(0, dataset_len, batch_size):
print('Evaluating on batch {}'.format(start_idx / batch_size + 1))
this_batch = {}
for k in data.keys():
this_batch[k] = data[k][start_idx:start_idx+batch_size]
batch_data = train.sub_mean_pixels(mps, this_batch)
preds = model.predict(batch_data, batch_size=batch_size)
if rv is None:
rv = preds
else:
assert set(rv.keys()) == set(preds.keys())
for k in rv:
rv[k] = np.concatenate((rv[k], preds[k]), axis=0)
return rv
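Once poselet_model has actually been loaded, a quick smoke test might look like this (illustrative usage only, not run here):
In [ ]:
# Smoke test (assumes poselet_model was loaded in the commented-out cell above)
sample_preds = evaluate_on_datum(
    {'images': val_images[:8], 'flow': val_flow[:8]}, poselet_model
)
print({k: v.shape for k, v in sample_preds.items()})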
In [ ]:
def evaluate_random_samples(images, flow, true_classes, num_samples, title='Sample'):
for i in np.random.permutation(len(images))[:num_samples]:
print('\n\n\n{} {}'.format(title, i))
# Evaluate
preds = evaluate_on_datum({
'images': images[i:i+1], 'flow': flow[i:i+1]
}, poselet_model)
# Get class info
class_names = ('background', 'left', 'right', 'head')
tc_idx = np.argmax(true_classes[i])
out_probs = preds['class'][0]
pc_idx = np.argmax(preds['class'][0])
pc_prob = out_probs[pc_idx] * 100
print('Class confidences: {}'.format(preds['class'][0]))
print('True class: {}; Predicted class: {} ({}%)'.format(
class_names[tc_idx],
class_names[pc_idx], pc_prob
))
print(u'\u2713 Correct class' if pc_idx == tc_idx
else u'\u2717 Incorrect class')
# Visualise
if tc_idx > 0:
label = preds[class_names[tc_idx]]
else:
label = None
show_datum(images[i], flow[i], label=label)
# Get error
# pos_mask = true_classes[i].astype('bool')
# cross_entropy = -np.log(out_probs[pos_mask]).sum() - np.log(out_probs[~pos_mask]).sum()
# tc_name = class_names[tc_idx]
# l1_dist = preds[class_names[tc_idx]]
# print('# Validation images')
# evaluate_random_samples(val_images, val_flow, val_h5['class'], 100, title='Validation datum')
# print('\n\n\n# Training images')
# evaluate_random_samples(train_images, train_flow, train_h5['class'], 100, title='Training datum')
# These are much less interesting because the classifier is good at picking out background patches.
# print('\n\n\n# Validation negatives')
# evaluate_random_samples(val_neg_images, val_neg_flow, val_neg_h5['class'], 20, title='Validation negative')
In [ ]:
def dist_valid(dist):
    return (dist >= 0).all() and (dist <= 1).all() and abs(dist.sum() - 1) < 1e-5
def evaluate_random_poselet_scrapes(images, flow, true_pslts, centroids, num_samples, title='Sample'):
for i in np.random.permutation(len(images))[:num_samples]:
print('\n\n\n{} {}'.format(title, i))
preds = evaluate_on_datum({
'images': images[i:i+1], 'flow': flow[i:i+1]
}, poselet_model)
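        # NB: upgraded_poselet_model (a fully convolutional variant of
        # poselet_model, per the "FC net" check below) is assumed to have
        # been built and loaded elsewhere; it is not defined in this notebook.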
upgraded_preds = evaluate_on_datum({
'images': images[i:i+1], 'flow': flow[i:i+1]
}, upgraded_poselet_model)
# Did my FC net screw things up (hint: yes)
normal_pslt = preds['poselet'][0, :]
assert dist_valid(normal_pslt)
fc_pslt_raw = upgraded_preds['poselet']
assert fc_pslt_raw.ndim == 4 and ((1, 1, 1) == np.array(fc_pslt_raw.shape)[[0, 2, 3]]).all()
fc_pslt = fc_pslt_raw[0, :, 0, 0]
assert dist_valid(fc_pslt)
kl_div = entropy(normal_pslt, fc_pslt)
print('D_KL(normal || fc) = {}'.format(kl_div))
        preds = {'poselet': upgraded_preds['poselet'][:, :, 0, 0]}  # XXX: Just seeing what happens :)
# Get class info
        class_names = ('background', 'left', 'right', 'head')  # same order as the 'class' dataset
pred_cls, pred_pslt, pred_coords = get_centroids(preds['poselet'][0:1], centroids)[0]
print('Max confidence: {}'.format(preds['poselet'][0].max()))
if true_pslts is not None:
true_cls, true_pslt, true_coords = get_centroids(true_pslts[i:i+1], centroids)[0]
print(u'{} True class: {}; Predicted class: {}'.format(
u'\u2713' if true_cls == pred_cls else u'\u2717',
class_names[true_cls],
class_names[pred_cls],
))
print(u'{} True poselet: {}; Predicted poselet: {}'.format(
u'\u2713' if true_cls == pred_cls and true_pslt == pred_pslt else u'\u2717',
true_pslt,
pred_pslt,
))
else:
print('Predicted class: {}; predicted poselet: {}'.format(
class_names[pred_cls],
pred_pslt
))
# Visualise
if pred_cls > 0:
label = centroids[pred_cls-1][pred_pslt]
else:
label = None
show_datum(images[i], flow[i], label=label)
In [ ]:
evaluate_random_poselet_scrapes(train_images, train_flow, train_h5['poselet'], centroids, 3, 'Train sample')
evaluate_random_poselet_scrapes(val_images, val_flow, val_h5['poselet'], centroids, 3, 'Validation sample')
evaluate_random_poselet_scrapes(val_neg_images, val_neg_flow, None, centroids, 3, 'Negative sample')
In [ ]:
Predictions = namedtuple('Predictions', ['type', 'results', 'classes', 'coords'])
def get_all_evaluations_poselet(images, flow, centroids):
print('Beginning poselet evaluation')
all_evaluations = evaluate_on_datum({'images': images, 'flow': flow}, poselet_model)
cls_pslt_coord_tuples = get_centroids(all_evaluations['poselet'], centroids)
poselet_coords = [t[2] for t in cls_pslt_coord_tuples]
classes = np.array([t[0] for t in cls_pslt_coord_tuples])
return Predictions(
type='poselet', results=all_evaluations, classes=classes, coords=poselet_coords
)
def get_all_evaluations_regressor(images, flow):
print('Beginning regressor evaluation')
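    # NB: `model` is the joint-coordinate regressor network, assumed to have
    # been built and loaded elsewhere; it is not defined in this notebook.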
all_evaluations = evaluate_on_datum({'images': images, 'flow': flow}, model)
classes = np.argmax(all_evaluations['class'], axis=1)
    pred_coords = []
    for idx, cls in enumerate(classes):
        if cls == 0:
            coord = None
        else:
            ds_name = ('left', 'right', 'head')[cls-1]
            coord = label_to_coords(all_evaluations[ds_name][idx])
        pred_coords.append(coord)
    return Predictions(
        type='regressor', results=all_evaluations, classes=classes, coords=pred_coords
    )
In [ ]:
poselet_evals = get_all_evaluations_poselet(val_images, val_flow, centroids)
In [ ]:
with open('../cache/poselet_model_preds-30k-instead-of-14k.pickle', 'wb') as fp:
cPickle.dump(poselet_evals, fp)
In [ ]:
regressor_evals = get_all_evaluations_regressor(val_images, val_flow)
In [ ]:
with open('../cache/regressor_model_preds.pickle', 'wb') as fp:
cPickle.dump(regressor_evals, fp)
A brief explanation of each of the measures used below:
- Classification accuracy: the fraction of patches whose sub-pose class (background/left/right/head) is predicted correctly.
- Class split: the proportion of each class among the predictions versus the ground truth, to spot a degenerate classifier.
- Regressor MAE: absolute error of the predicted joint coordinates, computed only over correctly classified patches.
- PCP (Percentage of Correct Parts): the fraction of limbs whose endpoints are predicted close enough to the ground truth; here it is scaled by classification accuracy so that misclassified patches count as failures.
- Accuracy at threshold: the fraction of joints within a given pixel distance of the ground truth, plotted over a range of thresholds.
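For reference, a minimal sketch of the usual PCP criterion; the project's evaluate.score_predictions_pcp may differ in details such as the exact threshold:
In [ ]:
def pcp_sketch(gt_joints, pred_joints, limb, thresh=0.5):
    # Standard PCP: a limb counts as correct when both predicted endpoints
    # lie within thresh * (ground-truth limb length) of their true positions.
    # gt_joints, pred_joints: N*J*2 arrays; limb: (i, j) joint index pair.
    i, j = limb
    limb_len = np.linalg.norm(gt_joints[:, i] - gt_joints[:, j], axis=-1)
    err_i = np.linalg.norm(pred_joints[:, i] - gt_joints[:, i], axis=-1)
    err_j = np.linalg.norm(pred_joints[:, j] - gt_joints[:, j], axis=-1)
    return ((err_i <= thresh * limb_len) & (err_j <= thresh * limb_len)).mean()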
In [ ]:
# Complete list of indices giving the endpoints of limbs,
# arranged in a dictionary according to which sub-pose the
# limbs belong to. Used for PCP calculations.
limbs = {
'left': {
'indices': [
# First frame
(0, 1),
(1, 2),
(2, 3),
# Second frame
(4, 5),
(5, 6),
(6, 7)
],
'names': [
'luarm1',
'lfarm1',
'lhand1',
'luarm2',
'lfarm2',
'lhand2',
],
'partnames': [
'lhand1',
'lwrist1',
'lelb1',
'lshol1',
'lhand2',
'lwrist2',
'lelb2',
'lshol2'
]
},
'right': {
'indices': [
# First frame
(0, 1),
(1, 2),
(2, 3),
# Second frame
(4, 5),
(5, 6),
(6, 7)
],
'names': [
'ruarm1',
'rfarm1',
'rhand1',
'ruarm2',
'rfarm2',
'rhand2',
],
'partnames': [
'rhand1',
'rwrist1',
'relb1',
'rshol1',
'rhand2',
'rwrist2',
'relb2',
'rshol2'
]
},
'head': {
'indices': [
# First frame
(0, 3),
(1, 3),
(2, 3),
# Second frame
(4, 7),
(5, 7),
(6, 7),
],
'names': [
'rshol1',
'lshol1',
'head1',
'rshol2',
'lshol2',
'head2',
],
'partnames': [
'rshol1',
'lshol1',
'head1',
'chin1',
'rshol2',
'lshol2',
'head2',
'chin2',
]
}
}
_lr_12 = [(s, str(f)) for s in ('l', 'r') for f in (1, 2)]
_avg_names = lambda n: (n, {s + n + f for s, f in _lr_12})
# Equivalent limbs for the purposes of PCP calculation (can average PCP)
pcps_to_average = [
_avg_names('hand'),
_avg_names('uarm'),
_avg_names('farm'),
_avg_names('shol'),
('head', {'head1', 'head2'})
]
# Equivalent parts for the purposes of accuracy calculations (can combine accuracies)
equiv_parts = [
_avg_names('shol'),
_avg_names('elb'),
_avg_names('wrist'),
_avg_names('hand'),
('head', {'head1', 'head2'}),
('chin', {'chin1', 'chin2'})
]
# Now we can invert equiv_parts to map part names to combined part names
aggregate_part_table = {}
for agg_name, partname_set in equiv_parts:
for partname in partname_set:
aggregate_part_table[partname] = agg_name
In [ ]:
def plot_acc(thresholds, accs_dict, plot_title):
for label, acc in accs_dict.iteritems():
plt.plot(thresholds, acc, label=label)
plt.ylim((0, 1))
plt.xlim((min(thresholds), max(thresholds)))
plt.ylabel('accuracy')
plt.xlabel('threshold (px)')
plt.title(plot_title)
plt.legend(loc='lower right')
plt.grid()
plt.show()
# test_in_vals = np.linspace(0, 50, 100)
# plot_acc(test_in_vals, {'example': -1 / (test_in_vals + 1) + 1}, 'Example plot')
def get_classification_acc(true_class_nums, pred_class_nums):
assert true_class_nums.shape == pred_class_nums.shape
assert true_class_nums.ndim == 1
return (pred_class_nums == true_class_nums).sum() / float(len(true_class_nums))
def get_class_split(class_nums, num_classes):
rv = np.zeros((num_classes,))
total = float(len(class_nums))
for i in xrange(num_classes):
rv[i] = (class_nums == i).sum() / total
return rv
def get_reg_mae(ground_truth, predictions, class_names=(None, 'left', 'right', 'head')):
num_classes = len(class_names)
rv = np.zeros((num_classes,))
gt_class_nums = np.argmax(ground_truth['class'][:], axis=1)
pred_class_nums = predictions.classes
for i in xrange(num_classes):
if i == 0:
# This is the background class, so there are no regressor outputs
rv[i] = 0
continue
class_mask = (gt_class_nums == i) & (gt_class_nums == pred_class_nums)
cls_name = class_names[i]
assert cls_name is not None
true_locs = label_to_coords(ground_truth[cls_name][class_mask, :])
pred_locs_lists = np.array(predictions.coords)[class_mask]
pred_locs = np.array(pred_locs_lists.tolist(), dtype='float')
num_samples = float(class_mask.sum())
        # NB: summed absolute error over all joints and both frames,
        # averaged per sample
        rv[i] = np.abs(true_locs - pred_locs).sum() / max(num_samples, 1)
return rv
def get_all_pcps(ground_truth, predictions, limbs=limbs):
    # uses evaluate.score_predictions_pcp(gt_joints, predictions, limbs)
all_pcps = {}
subpose_indices = {
'left': 1,
'right': 2,
'head': 3
}
for subpose_name in limbs:
names = limbs[subpose_name]['names']
assert set(names).isdisjoint(set(all_pcps.keys())), \
"Duplicate names detected"
indices = limbs[subpose_name]['indices']
# Next calculate accuracy and a mask to select only predictions which
# are correct. We will feed the PCP calculator only correctly
# classified poses, but we will then multiply the returned PCP
# values by the accuracy to account for the incorrect poses.
gt_classes = np.argmax(ground_truth['class'][:], axis=1)
pred_classes = predictions.classes
class_num = subpose_indices[subpose_name]
pos_samples = float((gt_classes == class_num).sum())
correct_mask = (gt_classes == class_num) & (pred_classes == class_num)
accuracy = correct_mask.sum() / pos_samples
gt_joints = label_to_coords(ground_truth[subpose_name][correct_mask, ...])
masked_pred_joints = np.array(predictions.coords)[correct_mask, ...]
pred_joints = np.array(masked_pred_joints.tolist())
assert gt_joints.shape == pred_joints.shape
subpose_pcps = evaluate.score_predictions_pcp(
gt_joints, pred_joints, indices
)
assert len(subpose_pcps) == len(names)
named_subpose_pcps = dict(zip(names, (accuracy * p for p in subpose_pcps)))
all_pcps.update(named_subpose_pcps)
return all_pcps
def show_pcp(pcp_dict):
sorted_items = sorted(pcp_dict.items())
print('name' + ''.join('{:>10}'.format(l) for l, _ in sorted_items))
print('pcp ' + ''.join('{:>10.4f}'.format(v) for _, v in sorted_items))
def per_class_show(values, classes=('bkgnd', 'left', 'right', 'head')):
    return ', '.join('{:>5}: {:>7.3f}'.format(c, v) for c, v in zip(classes, values))
def get_all_accs(ground_truth, predictions, thresholds):
all_accs = {}
# TODO: subpose_indices should be factored out, since it's also used in get_all_pcps
subpose_indices = {
'left': 1,
'right': 2,
'head': 3
}
for subpose_name in limbs:
part_names = limbs[subpose_name]['partnames']
indices = limbs[subpose_name]['indices']
# We need a class mask just like we used for the PCP detector
gt_classes = np.argmax(ground_truth['class'][:], axis=1)
pred_classes = predictions.classes
class_num = subpose_indices[subpose_name]
pos_samples = float((gt_classes == class_num).sum())
correct_mask = (gt_classes == class_num) & (pred_classes == class_num)
accuracy = correct_mask.sum() / pos_samples
gt_joints = label_to_coords(ground_truth[subpose_name][correct_mask, ...])
masked_pred_joints = np.array(predictions.coords)[correct_mask, ...]
pred_joints = np.array(masked_pred_joints.tolist())
subpose_accs = np.vstack(evaluate.score_predictions_acc(
gt_joints, pred_joints, thresholds
)).T
# Make sure that we collect all relevant accuracies for each part, then
# average over them later.
assert len(part_names) == len(subpose_accs)
for part_name, accs in zip(part_names, subpose_accs):
agg_pn = aggregate_part_table.get(part_name, part_name)
true_acc = accuracy * accs
all_accs.setdefault(agg_pn, []).append(true_acc)
combined_accs = {}
for part_name, accs_list in all_accs.iteritems():
combined_accs[part_name] = np.mean(accs_list, axis=0)
return combined_accs
def average_pcps(pcp_dict, to_average):
removed_keys = set().union(*(s for n, s in to_average))
rv = {
k: v for k, v in pcp_dict.iteritems() if k not in removed_keys
}
for combined_name, components in to_average:
rv[combined_name] = np.mean([pcp_dict[k] for k in components])
return rv
def print_evaluation_summary(ground_truth, predictions):
assert {'class', 'left', 'right', 'head'}.issubset(set(ground_truth.keys()))
# Classification accuracy
class_nums = np.argmax(ground_truth['class'][:], axis=1)
class_acc = get_classification_acc(class_nums, predictions.classes)
# Comparison of class split
pred_class_split = get_class_split(predictions.classes, 4)
true_class_split = get_class_split(class_nums, 4)
# Regressor MAE
reg_mae = get_reg_mae(ground_truth, predictions)
# PCP
pcp_dict = get_all_pcps(ground_truth, predictions)
# Accuracy (variable pixel threshold)
thresholds = np.linspace(0, 80, 80)
accs_dict = get_all_accs(ground_truth, predictions, thresholds)
# Display everything
print('Evaluation summary for {} model'.format(predictions.type))
print(
'Classifier accuracy: {}\n'
'Class split in predictions: {}\n'
        'Class split in ground truth: {}\n'
'MAE for correct classifications: {}\n'
'PCPs (class-sensitive):'.format(
class_acc, per_class_show(pred_class_split),
per_class_show(true_class_split),
per_class_show(reg_mae)
))
show_pcp(average_pcps(pcp_dict, pcps_to_average))
plot_acc(thresholds, accs_dict, 'Accuracies ({})'.format(predictions.type))
In [ ]:
val_gt = {
'left': val_h5['left'],
'right': val_h5['right'],
'head': val_h5['head'],
'class': val_h5['class']
}
print_evaluation_summary(val_gt, poselet_evals)
print_evaluation_summary(val_gt, regressor_evals)
In [ ]:
def poselet_weighted_average(poselet_model_results, centroids=centroids, num_subposes=3):
"""Instead of picking the highest-scoring poselet
and returning the centroid of that, we take a
weighted average of poselets that have the same
class. Hopefully this produces better results."""
poselet_probs = poselet_model_results['poselet']
    ppc = (poselet_probs.shape[1] - 1) // num_subposes  # poselets per subpose class
num_classes = num_subposes + 1
class_probs = np.zeros((len(poselet_probs), num_classes))
class_probs[:, 0] = poselet_probs[:, 0]
all_weighted_means = []
for subpose_num in xrange(num_subposes):
start_idx = subpose_num * ppc + 1
end_idx = start_idx + ppc
# Start by figuring out the probability that this subpose is the correct one
subpose_probs = np.sum(
poselet_probs[:, start_idx:end_idx], axis=1
)
class_probs[:, subpose_num+1] = subpose_probs
# Now get poselet probs and find joint locations by taking expectation of
# centroids.
sp_centroids = centroids[subpose_num]
# XXX: What if subpose_probs is 0 somewhere? Should probably just make the
# numerator zero, since if subpose_probs[i] is 0 then the given subpose
# is almost certainly not in sample i.
norm_probs = poselet_probs[:, start_idx:end_idx] / subpose_probs[:, np.newaxis]
norm_probs = np.nan_to_num(norm_probs)
# Make sure all of our probabilities are normalised
assert (np.abs(norm_probs.sum(axis=1) - 1) < 0.01).all()
# Let N be the number of samples and P be the number of poselets.
# We now have an N*P array of probabilities and a P*J array of centroids.
# What we want is an N*J array of means. Broadcasting to the rescue!
centroids_bc = sp_centroids[np.newaxis, :, :]
pprobs_bc = norm_probs[:, :, np.newaxis]
# 'combined' should be N*P*J
combined = centroids_bc * pprobs_bc
assert combined.shape == norm_probs.shape + sp_centroids.shape[1:]
true_means = np.sum(combined, axis=1)
assert true_means.shape == (len(poselet_probs), sp_centroids.shape[1])
all_weighted_means.append(true_means)
# Select only the weighted means corresponding to the best class
class_nums = np.argmax(class_probs, axis=1)
np_awm = np.array(all_weighted_means)
num_samples = len(poselet_probs)
best_coords = np.ndarray((num_samples,), dtype='object')
best_coords[class_nums == 0] = None
for subpose_num in xrange(num_subposes):
mask = class_nums == subpose_num + 1
# We prepend None and drop it again so that numpy gives us a
# 1D object array, each entry of which is a JD float array giving
# joint coordinates. If we don't do this then Numpy gives us a
# 2D array which doesn't play nice with our 1D return value
# array.
label_slice = np_awm[subpose_num, mask, ...]
coord_slice = label_to_coords(label_slice)
best_coords[mask] = np.array([None] + list(coord_slice))[1:]
return Predictions(
type='weighted poselet', results=poselet_model_results,
classes=class_nums, coords=list(best_coords)
)
weighted_average_preds = poselet_weighted_average(poselet_evals.results)
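As a sanity check, the broadcasting trick in poselet_weighted_average is just an expectation, i.e. a matrix product of probabilities and centroids:
In [ ]:
# Toy check of the expectation-by-broadcasting above:
# N=2 samples, P=3 poselets, J=4 label dimensions.
probs = np.random.dirichlet(np.ones(3), size=2)   # N*P, rows sum to 1
cents = np.random.randn(3, 4)                     # P*J centroid labels
means = (probs[:, :, np.newaxis] * cents[np.newaxis, :, :]).sum(axis=1)
assert np.allclose(means, probs.dot(cents))       # same as a matrix product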
In [ ]:
# Yep, classification error goes down too because now I'm marginalising
# over poselets properly to find the right subpose class (thought I was
# doing that before, but it turned out that I wasn't)
print_evaluation_summary(val_gt, weighted_average_preds)