In [ ]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import KernelDensity
from scipy.stats.distributions import norm
from sklearn.grid_search import GridSearchCV
def tuples_to_xy(tuples):
tmp = np.transpose(tuples)
return tmp[0], tmp[1]
def norm_on_range(x, y, x_range=None, y_range=None):
if x_range is None:
x_range = (min(x), max(x))
if y_range is None:
y_range = (min(y), max(y))
x_measured_range = [min(x), max(x)]
y_measured_range = [min(y), max(y)]
x = [(xx - x_measured_range[0]) / (x_measured_range[1] - x_measured_range[0]) for xx in x]
y = [(yy - y_measured_range[0]) / (y_measured_range[1] - y_measured_range[0]) for yy in y]
x = [(xx + x_range[0]) * (x_range[1] - x_range[0]) for xx in x]
y = [(yy + y_range[0]) * (y_range[1] - y_range[0]) for yy in y]
return x, y
def KDEiPosition(points, title=None, show=False, save_to=None, bandwidth=None, show_points=False):
x, y = tuples_to_xy(points)
x, y = norm_on_range(x, y, x_range=(0, 1), y_range=(0, 1))
x = np.array(x)
y = np.array(y)
xy_train = np.vstack([y, x]).T
if bandwidth is None:
grid = GridSearchCV(KernelDensity(kernel='gaussian'), # 'tophat' is other option that allows sampling
{'bandwidth': np.linspace(0.1, 1.0, 5)},
cv=20) # 20-fold cross-validation
grid.fit(xy_train)
print(grid.best_params_)
kde = grid.best_estimator_
else:
grid = GridSearchCV(KernelDensity(kernel='gaussian'), # 'tophat' is other option that allows sampling
{'bandwidth': np.linspace(bandwidth, bandwidth, 1)},
cv=20) # 20-fold cross-validation
grid.fit(xy_train)
print(grid.best_params_)
kde = grid.best_estimator_
# Create grid of sample locations (default: 100x100)
xx, yy = np.mgrid[0:1:100j,
0:1:100j]
xy_sample = np.vstack([yy.ravel(), xx.ravel()]).T
pdf = np.exp(kde.score_samples(xy_sample))
if show or save_to is not None:
with plt.rc_context():
ax = plt.subplot(111)
plt.pcolormesh(xx, yy, np.reshape(pdf, xx.shape))
if show_points:
plt.scatter(x, y, s=0.5, color='white')
plt.xlim(0, 1)
plt.ylim(0, 1)
plt.axis([0., 1., 0., 1.])
#plt.axis('equal')
if title is not None:
plt.title(title)
if save_to is not None:
plt.savefig(save_to, dpi=600)
if show:
plt.show()
return kde
def save_model(model, filename):
from sklearn.externals import joblib
joblib.dump(model, filename)
In [ ]:
from cogrecon.core.data_flexing.time_travel_task_binary_reader import read_binary_file, find_data_files_in_directory, get_filename_meta_data
import numpy as np
import datetime
def bin_iterations(iterations, exclude_repeated_points=False):
bins = np.linspace(0, 60, 61, endpoint=True)
points = [[] for _ in bins]
prev_point = None
for i in iterations:
p = (i['x'], i['z'])
bin_idx = int(np.floor(i['time_val']))
if not exclude_repeated_points:
points[bin_idx].append(p)
else:
if len(points[bin_idx]) == 0:
points[bin_idx].append(p)
elif points[bin_idx][-1] == p:
print("removed point")
else:
points[bin_idx].append(p)
return points, bins
# TODO: Invert timelines as appropriate
def get_points_from_file_paths(files, exclude_repeated_points=False):
bins = np.linspace(0, 60, 61, endpoint=True)
all_points = [[] for _ in bins]
for idx, f in enumerate(files):
print('Parsing file {0}/{1}.'.format(idx, len(files)))
data = read_binary_file(f)
points, bins = bin_iterations(data, exclude_repeated_points=exclude_repeated_points)
meta = get_filename_meta_data(f)
if meta['inverse'] == '1':
points = np.flip(points, axis=0).tolist()
for idx, ps in enumerate(points):
all_points[idx].extend(ps)
return all_points
def generate_kdes(points, bandwidth=None, start_idx=0, directory=''):
kdes = []
for idx, ps in enumerate(points):
if idx < start_idx:
continue
print('{2} : Finding KDE for bin {0}/{1}.'.format(idx, len(points), str(datetime.datetime.now())))
kde = KDEiPosition(ps, title=str(idx), save_to='{1}/{0}.png'.format(idx, directory), bandwidth=bandwidth)
save_model(kde, '{1}/{0}.pkl'.format(idx, directory))
kdes.append(kde)
return kdes
In [ ]:
trial4study = find_data_files_in_directory(r'C:\Users\Kevin\Desktop\Work\Time Travel Task\v2', file_regex="\d\d\d_1_1_\d_\d\d\d\d-\d\d-\d\d_\d\d-\d\d-\d\d.dat")
In [ ]:
all_points = get_points_from_file_paths(trial4study, exclude_repeated_points=False)
In [ ]:
print(len(all_points))
print(len(all_points[3]))
x, y = tuples_to_xy(all_points[1])
x, y = norm_on_range(x, y, x_range=(0., 1.), y_range=(0., 1.))
plt.scatter(x, y)
plt.show()
x, y = tuples_to_xy(all_points[1])
plt.scatter(x, y)
plt.show()
In [ ]:
generate_kdes(all_points, bandwidth=0.1, directory=r'C:\Users\Kevin\Documents\GitHub\msl-iposition-pipeline\tests\all_points_trial_1_study')
In [ ]: