In [2]:
from cogrecon.core.data_flexing.time_travel_task.time_travel_task_binary_reader import get_items_solutions
In [ ]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import KernelDensity
from scipy.stats.distributions import norm
from sklearn.grid_search import GridSearchCV
def tuples_to_xy(tuples):
    tmp = np.transpose(tuples)
    return tmp[0], tmp[1]

def norm_on_range(x, y, x_range=None, y_range=None):
    if x_range is None:
        x_range = (min(x), max(x))
    if y_range is None:
        y_range = (min(y), max(y))
    x_measured_range = [min(x), max(x)]
    y_measured_range = [min(y), max(y)]
    # Rescale the measured values to [0, 1], then map them onto the requested
    # output range (scale first, then offset).
    x = [(xx - x_measured_range[0]) / (x_measured_range[1] - x_measured_range[0]) for xx in x]
    y = [(yy - y_measured_range[0]) / (y_measured_range[1] - y_measured_range[0]) for yy in y]
    x = [xx * (x_range[1] - x_range[0]) + x_range[0] for xx in x]
    y = [yy * (y_range[1] - y_range[0]) + y_range[0] for yy in y]
    return x, y
def KDEiPosition(points, title=None, show=False, save_to=None, bandwidth=None, show_points=False):
    x, y = tuples_to_xy(points)
    x, y = norm_on_range(x, y, x_range=(0, 1), y_range=(0, 1))
    x = np.array(x)
    y = np.array(y)
    xy_train = np.vstack([y, x]).T
    if bandwidth is None:
        grid = GridSearchCV(KernelDensity(kernel='gaussian'),  # 'tophat' is the other option that allows sampling
                            {'bandwidth': np.linspace(0.1, 1.0, 5)},
                            cv=20)  # 20-fold cross-validation
        grid.fit(xy_train)
        print(grid.best_params_)
        kde = grid.best_estimator_
    else:
        grid = GridSearchCV(KernelDensity(kernel='gaussian'),  # 'tophat' is the other option that allows sampling
                            {'bandwidth': np.linspace(bandwidth, bandwidth, 1)},  # single fixed bandwidth
                            cv=20)  # 20-fold cross-validation
        grid.fit(xy_train)
        print(grid.best_params_)
        kde = grid.best_estimator_
    # Create grid of sample locations (default: 100x100)
    xx, yy = np.mgrid[0:1:100j,
                      0:1:100j]
    xy_sample = np.vstack([yy.ravel(), xx.ravel()]).T
    pdf = np.exp(kde.score_samples(xy_sample))
    if show or save_to is not None:
        with plt.rc_context():
            ax = plt.subplot(111)
            plt.pcolormesh(xx, yy, np.reshape(pdf, xx.shape))
            if show_points:
                plt.scatter(x, y, s=0.5, color='white')
            plt.xlim(0, 1)
            plt.ylim(0, 1)
            plt.axis([0., 1., 0., 1.])
            # plt.axis('equal')
            if title is not None:
                plt.title(title)
            if save_to is not None:
                plt.savefig(save_to, dpi=600)
            if show:
                plt.show()
    return kde
def KDEiPosition3D(points, bandwidth=None):
    x, y, z = np.transpose([[xx, yy, zz] for xx, yy, zz in points])
    # [0, 1] is passed as a dummy second argument; only the first return value is used.
    x, _ = norm_on_range(x, [0, 1], x_range=(0, 1), y_range=(0, 1))
    y, _ = norm_on_range(y, [0, 1], x_range=(0, 1), y_range=(0, 1))
    z, _ = norm_on_range(z, [0, 1], x_range=(0, 2), y_range=(0, 1))
    x = np.array(x)
    y = np.array(y)
    z = np.array(z)
    xyz_train = np.vstack([z, y, x]).T
    if bandwidth is None:
        grid = GridSearchCV(KernelDensity(kernel='gaussian'),  # 'tophat' is the other option that allows sampling
                            {'bandwidth': np.linspace(0.1, 1.0, 5)},
                            cv=20)  # 20-fold cross-validation
        grid.fit(xyz_train)
        print(grid.best_params_)
        kde = grid.best_estimator_
    else:
        grid = GridSearchCV(KernelDensity(kernel='gaussian'),  # 'tophat' is the other option that allows sampling
                            {'bandwidth': np.linspace(bandwidth, bandwidth, 1)},  # single fixed bandwidth
                            cv=20)  # 20-fold cross-validation
        grid.fit(xyz_train)
        print(grid.best_params_)
        kde = grid.best_estimator_
    # Create a grid of sample locations (e.g. 50x50x100) if a sampled PDF is needed:
    # xx, yy, zz = np.mgrid[0:1:50j,
    #                       0:1:50j,
    #                       0:2:100j]
    # xyz_sample = np.vstack([zz.ravel(), yy.ravel(), xx.ravel()]).T
    # pdf = np.exp(kde.score_samples(xyz_sample))
    return kde

def save_model(model, filename):
    from sklearn.externals import joblib
    joblib.dump(model, filename)
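A quick way to exercise KDEiPosition after defining it is on a small set of synthetic points with a fixed bandwidth; the points below are fabricated purely for illustration and are not task data.
In [ ]:
# Hypothetical smoke test: fit a KDE to 50 random 2D points with a fixed bandwidth.
# The data here are made up; the real analysis uses positions parsed from the .dat files below.
rng = np.random.RandomState(0)
demo_points = list(zip(rng.rand(50), rng.rand(50)))
demo_kde = KDEiPosition(demo_points, title='demo', show=True, bandwidth=0.1, show_points=True)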
In [ ]:
_, times, _ = get_items_solutions({'inverse': '0', 'phase': '1'})
print(times)
bin_boundaries = [[x - 1, x + 1] for x in times]
print(bin_boundaries)
In [ ]:
from cogrecon.core.data_flexing.time_travel_task_binary_reader import read_binary_file, find_data_files_in_directory, get_filename_meta_data
import numpy as np
import datetime
def bin_iterations(iterations, bins, bin_width=2):
    bin_boundaries = [[x - bin_width / 2, x + bin_width / 2] for x in bins]
    points = [[] for _ in bins]
    for i in iterations:
        p = (i['x'], i['z'])
        t = i['time_val']
        for idx, b in enumerate(bin_boundaries):
            if b[0] <= t <= b[1]:
                points[idx].append(p)
    return points, bins
def get_raw_points_from_iterations(iterations, flip=False):
    points = []
    for i in iterations:
        if flip:
            # Mirror the time value within the 0-60 range for inverse-timeline trials
            p = (i['x'], i['z'], 60. - i['time_val'])
        else:
            p = (i['x'], i['z'], i['time_val'])
        points.append(p)
    return points
# TODO: Invert timelines as appropriate
def get_points_from_file_paths(files, bins):
    all_points = [[] for _ in bins]
    for idx, f in enumerate(files):
        print('Parsing file {0}/{1}.'.format(idx + 1, len(files)))
        data = read_binary_file(f)
        points, bins = bin_iterations(data, bins)
        meta = get_filename_meta_data(f)
        if meta['inverse'] == '1':
            # Reverse the bin order for inverse-timeline trials
            points = list(reversed(points))
        for bin_idx, ps in enumerate(points):
            all_points[bin_idx].extend(ps)
    return all_points

def get_raw_points_from_file_paths(files):
    all_points = []
    for idx, f in enumerate(files):
        print('Parsing file {0}/{1}.'.format(idx + 1, len(files)))
        meta = get_filename_meta_data(f)
        data = read_binary_file(f)
        points = get_raw_points_from_iterations(data, flip=(meta['inverse'] == '1'))
        all_points.extend(points)
    return all_points
def generate_kdes(points, bandwidth=None, start_idx=0, directory=''):
    kdes = []
    for idx, ps in enumerate(points):
        if idx < start_idx:
            continue
        print('{2} : Finding KDE for bin {0}/{1}.'.format(idx, len(points), str(datetime.datetime.now())))
        kde = KDEiPosition(ps, title=str(idx), save_to='{1}/{0}.png'.format(idx, directory), bandwidth=bandwidth)
        save_model(kde, '{1}/{0}.pkl'.format(idx, directory))
        kdes.append(kde)
    return kdes
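As a quick sanity check, bin_iterations can be run on a few hand-made iteration records (dictionaries with the same 'x', 'z', and 'time_val' keys that read_binary_file produces); the values below are fabricated for illustration only.
In [ ]:
# Hypothetical sanity check with fabricated iteration records: times 4.9 and 10.2 fall
# inside the +/-1 s bins around 5 and 10, while 7.5 falls in no bin.
fake_iterations = [{'x': 0.1, 'z': 0.2, 'time_val': 4.9},
                   {'x': 0.3, 'z': 0.4, 'time_val': 7.5},
                   {'x': 0.5, 'z': 0.6, 'time_val': 10.2}]
fake_points, fake_bins = bin_iterations(fake_iterations, [5, 10])
print(fake_points)  # expected: [[(0.1, 0.2)], [(0.5, 0.6)]]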
In [ ]:
trial4study = find_data_files_in_directory(r'C:\Users\Kevin\Desktop\Work\Time Travel Task\v2',
                                           file_regex=r"\d\d\d_4_1_\d_\d\d\d\d-\d\d-\d\d_\d\d-\d\d-\d\d.dat")
In [ ]:
get_items_solutions({'inverse': '0', 'phase': '1'})[1]
In [ ]:
# all_points = get_points_from_file_paths(trial4study, get_items_solutions({'inverse': '0', 'phase': '1'})[1])
all_points = get_raw_points_from_file_paths(trial4study)
In [ ]:
import pickle
pickle.dump(all_points, open('all_points_trial4.p', 'wb'))
print(len(all_points))
In [ ]:
import pickle
all_points = pickle.load(open('all_points_item_bins_width2s_trial2.p', 'rb'))
In [ ]:
print(len(all_points))
for p in all_points:
    print(len(p))
In [ ]:
import numpy as np
from sklearn.cluster import MeanShift, estimate_bandwidth
# The bandwidth could also be estimated automatically with estimate_bandwidth(X);
# a fixed value is used here.
bandwidth = 1.
X = np.array([[x, y] for x, y in all_points[0]])
ms = MeanShift(bandwidth=bandwidth, bin_seeding=True)
ms.fit(X)
labels = ms.labels_
cluster_centers = ms.cluster_centers_
labels_unique = np.unique(labels)
n_clusters_ = len(labels_unique)
print("number of estimated clusters : %d" % n_clusters_)
In [ ]:
import matplotlib.pyplot as plt
from itertools import cycle
plt.figure(1)
plt.clf()
colors = cycle('bgrcmykbgrcmykbgrcmykbgrcmyk')
for k, col in zip(range(n_clusters_), colors):
    my_members = labels == k
    cluster_center = cluster_centers[k]
    # plt.plot(X[my_members, 0], X[my_members, 1], col + '.')
    plt.plot(cluster_center[0], cluster_center[1], 'o', markerfacecolor=col,
             markeredgecolor='k', markersize=14)
plt.title('Estimated number of clusters: %d' % n_clusters_)
plt.show()
In [ ]:
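# Note: the output directory passed below must exist before KDEiPosition/save_model can
# write into it; this small guard assumes the 'temporal_item_kde' name used in the call below.
import os
if not os.path.exists('temporal_item_kde'):
    os.makedirs('temporal_item_kde')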
generate_kdes(all_points, bandwidth=0.1, directory='temporal_item_kde')
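Once generate_kdes has written its models, any of them can be reloaded and evaluated at a normalized (x, y) location; the filename below assumes bin 0 in the 'temporal_item_kde' directory from the call above.
In [ ]:
# Sketch: reload one of the pickled KernelDensity models written above and query it.
# 'temporal_item_kde/0.pkl' assumes bin 0 from the generate_kdes call above.
from sklearn.externals import joblib
kde_loaded = joblib.load('temporal_item_kde/0.pkl')
# score_samples returns log-density; exponentiate to recover the density value.
print(np.exp(kde_loaded.score_samples([[0.5, 0.5]])))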
In [ ]:
kde = KDEiPosition3D(all_points, bandwidth=0.1)
save_model(kde, '3d_kde_trial4.pkl')
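The commented-out block in KDEiPosition3D hints at how the 3D density could be sampled on a voxel grid; a rough sketch along those lines follows, where the 50x50x100 resolution mirrors that commented code and is only a suggestion.
In [ ]:
# Sketch: evaluate the saved 3D KDE on a coarse voxel grid, mirroring the commented-out
# sampling code in KDEiPosition3D. The grid resolution here is an assumption.
xx, yy, zz = np.mgrid[0:1:50j,
                      0:1:50j,
                      0:2:100j]
xyz_sample = np.vstack([zz.ravel(), yy.ravel(), xx.ravel()]).T
pdf_3d = np.exp(kde.score_samples(xyz_sample))
print(pdf_3d.shape)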
In [ ]: