This notebook contains the analyses necessary to generate the results for the Time Travel Task. The Time Travel Task outputs a binary-format log file for space efficiency, which must first be converted to the appropriate intermediate representations (the iPosition format for test results and an intermediate navigation format for study/test navigation results).
In order to run most cells in this notebook, these data directories need to be defined (and filled with the appropriate data - see the Data Conversion section below to generate the data_output_directory contents).
In [1]:
data_directory = r'C:\Users\Kevin\Documents\GitHub\msl-iposition-pipeline\examples\saved_data\Paper Data (cleaned)'
data_output_directory = r'C:\Users\Kevin\Documents\GitHub\msl-iposition-pipeline\examples\saved_data\Paper Data (iPosition Format)'
generate_intermediate_files = False # If True, the cells which generate intermediate files will run (they are very slow)
generate_embedded_animations = False # If True, the embedded animations will be generated such that they are saved in the file
In [2]:
import cogrecon.core.data_flexing.time_travel_task.time_travel_task_to_iposition as ttt2i
In [3]:
if 'generate_intermediate_files' in vars() and generate_intermediate_files:
    ttt2i.time_travel_task_to_iposition(data_directory, data_output_directory)
For the test phase, there are several primary analyses to run. First, a basic analysis determining test-time performance in space and in time is performed. This can be done quite simply with the basic batch processing functions.
Note that these files contain all the output metrics necessary to perform the statistics on Space/Time Misplacement, Number of Incorrect Event Types, Misassignments in Space vs. Time, and Context Boundary Effects.
In [4]:
from cogrecon.core.batch_pipeline import batch_pipeline
from cogrecon.core.data_flexing.time_travel_task.time_travel_task_analytics import summarize_test_data
In [5]:
if 'generate_intermediate_files' in vars() and generate_intermediate_files:
    summarize_test_data(search_directory=data_directory)  # For Time Travel Task specific analyses
This subsection contains the analyses which generate the basic statistics of interest for the values below.
In [6]:
# For Time vs. Space Reconstruction Analyses
# data_shape=(4, 10, 3): 4 trials, 10 items, 3 coordinates (x, y, time)
if 'generate_intermediate_files' in vars() and generate_intermediate_files:
    # Space only: removal_dim_indicies=[2] drops the time coordinate
    batch_pipeline(str(data_output_directory), 'TimeTravelTask_SpaceOnly.csv', data_shape=(4, 10, 3),
                   collapse_trials=False, trial_by_trial_accuracy=False,
                   dimension=3, removal_dim_indicies=[2], actual_coordinate_prefixes=True)
    # Time only: removal_dim_indicies=[0, 1] drops the two spatial coordinates
    batch_pipeline(str(data_output_directory), 'TimeTravelTask_TimeOnly.csv', data_shape=(4, 10, 3),
                   collapse_trials=False, trial_by_trial_accuracy=False,
                   dimension=3, removal_dim_indicies=[0, 1], actual_coordinate_prefixes=True)
    # Combined: all three dimensions are retained
    batch_pipeline(str(data_output_directory), 'TimeTravelTask_CombinedSpaceAndTime.csv', data_shape=(4, 10, 3),
                   collapse_trials=False, trial_by_trial_accuracy=False,
                   dimension=3, actual_coordinate_prefixes=True)
In [7]:
test_summary_filename = 'time_travel_task_test_summary.csv'
test_cogrecon_space_only_filename = 'TimeTravelTask_SpaceOnly.csv'
test_cogrecon_time_only_filename = 'TimeTravelTask_TimeOnly.csv'
In [8]:
import pandas
import numpy as np
import matplotlib.pyplot as plt
import rpy2.robjects.packages as rpackages
from rpy2.robjects.vectors import StrVector
from rpy2.robjects import r, pandas2ri
import rpy2.robjects
pandas2ri.activate()
utils = rpackages.importr('utils')
utils.chooseCRANmirror(ind=1)
packnames = ['afex']
names_to_install = [x for x in packnames if not rpackages.isinstalled(x)]
if len(names_to_install) > 0:
    utils.install_packages(StrVector(names_to_install))
afex = rpackages.importr('afex')
def rANOVA(_data, column, significance_level=0.05, verbose=False):
    r_data = pandas2ri.py2ri(_data)  # Convert the data
    ez = r.aov_ez('subID', column, r_data, within='trial')  # Run the ANOVA
    p_value = np.array(np.array(ez)[0])[-1][0]  # Store the p-value
    print('_'*50)
    if p_value < significance_level:  # Check for significance
        print("Significant (p={0}) change in {1} overall.\r\n".format(p_value, column))
        em = r.emmeans(ez, r.formula('~trial'))  # Calculate the trial statistics
        forward_difference_anova_result = r.pairs(em)  # Generate the Tukey corrected pairwise comparisons
        forward_difference_anova_summary = r.summary(forward_difference_anova_result)  # Summarize the results
        # Of the 6 pairwise comparisons between the 4 trials, the boolean mask keeps only the
        # adjacent ones (1 vs. 2, 2 vs. 3, and 3 vs. 4)
        adjacent_p_values = np.array(forward_difference_anova_summary)[5][[True, False, False, True, False, True]]
        for p, l in zip(adjacent_p_values, ['First vs. Second', 'Second vs. Third', 'Third vs. Fourth']):
            if p < significance_level:
                print("Significant (p={0}) change in {1}.".format(p, l))
            else:
                print("No Significant (p={0}) change in {1}.".format(p, l))
        if verbose:
            print(ez)  # Print the basic ANOVA result
            print(forward_difference_anova_summary)  # Print the pairwise comparisons
    else:
        print("No Significant (p={0}) change in {1} overall.".format(p_value, column))
    print('_'*50)
In [9]:
import pandas
import numpy as np
import matplotlib.pyplot as plt
def visualize_columns(data, column_names, titles, ylabels, fig_size=(15, 5), separate_plots=True, legend_labels=None, subplot_shape=None, verbose_anova=False):
    # Extract the columns of interest
    trial_num = data['trial']
    columns = [data[c_name] for c_name in column_names]
    # Generate useful constants
    trials = list(set(trial_num))
    num_items = 10
    num_participants = len(trial_num)
    means = [[np.mean(column[trial_num == i]) for i in trials] for column in columns]
    std_errs = [[np.std(column[trial_num == i])/np.sqrt(num_participants) for i in trials] for column in columns]
    # Visualize each trial-over-trial mean in a subplot
    if separate_plots:
        if subplot_shape is None:
            f, axarr = plt.subplots(1, len(column_names))
        else:
            f, axarr = plt.subplots(*subplot_shape)
            axarr = [j for i in axarr for j in i]
        if len(column_names) == 1:
            axarr = [axarr]
        f.set_size_inches(fig_size)
        for ax, title, mean, std_err, ylabel in zip(axarr, titles, means, std_errs, ylabels):
            ax.errorbar(trials, mean, std_err)
            ax.set_title(title)
            ax.grid(True)
            ax.set_xlabel('Trials')
            ax.set_ylabel(ylabel)
            ax.xaxis.set_ticks(trials)
        plt.show()
    else:
        f = plt.figure()
        f.set_size_inches(fig_size)
        for idx, (title, mean, std_err, ylabel) in enumerate(zip(titles, means, std_errs, ylabels)):
            label = ''
            if legend_labels is not None:
                label = legend_labels[idx]
            plt.errorbar(trials, mean, std_err, label=label)
            plt.title(title)
            plt.grid(True)
            plt.xlabel('Trials')
            plt.ylabel(ylabel)
            plt.gca().xaxis.set_ticks(trials)
        plt.legend()
        plt.show()
    for column in column_names:
        rANOVA(data, column.replace(' ', '.'), verbose=verbose_anova)
In [10]:
import pandas
In [11]:
# Load the data
data = pandas.read_csv(test_summary_filename)
columns = ['space_misplacement', 'time_misplacement', 'event_type_correct_count']
titles = ['Space Misplacement', 'Time Misplacement', 'Event Type Correct']
ylabels = ['Misplacement (meters)', 'Misplacement (seconds)', 'Incorrect Events']
visualize_columns(data, columns, titles, ylabels)
In [12]:
# Load the data
data_time = pandas.read_csv(test_cogrecon_time_only_filename, skiprows=1)
data_space = pandas.read_csv(test_cogrecon_space_only_filename, skiprows=1)
visualize_columns(data_time, ['Accurate Misassignment'], ['Time Misassignment'], ['Number of Items'], verbose_anova=True)
visualize_columns(data_space, ['Accurate Misassignment'], ['Space Misassignment'], ['Number of Items'], verbose_anova=True)
In [13]:
# Load the data
data_time = pandas.read_csv(test_summary_filename)
columns = ['context_crossing_dist_exclude_wrong_color_pairs', 'context_noncrossing_dist_exclude_wrong_color_pairs']
titles = ['', 'Within vs. Across']
legend_labels = ['Across', 'Within']
ylabels = ['Normalized Distance (1 is perfect)', 'Normalized Distance (1 is perfect)']
visualize_columns(data_time, columns, titles, ylabels, separate_plots=False, legend_labels=legend_labels)
The analysis of the effect of context on misassignment is not included in any of the packages directly. As a result, we need to do some custom computation to extract these numbers.
First, we need to read the data from the Accurate Misassignment Pairs column in the temporal-only test output file. Because we're running this in one notebook, we could simply take this from the output result rather than re-reading the file.
In [14]:
import ast
import pandas
import numpy as np
from cogrecon.core.batch_pipeline import get_header_labels
In [15]:
# Load the data
data = pandas.read_csv('TimeTravelTask_TimeOnly.csv', skiprows=1)
misassignment_pairs = [ast.literal_eval(row) for row in data['Accurate Misassignment Pairs']]
Next we'll process the list, counting the number of within- vs. across-context pairs. For example, the pair (2, 3) shares a context and counts as within, while (1, 2) spans two contexts and counts as across; pairs involving items 8 or 9 are excluded because those items carried no contextual information.
In [16]:
# The pairs which share a context (note that order doesn't matter for this)
within_key = [[0, 1], [1, 0], [2, 3], [3, 2], [4, 5], [5, 4], [6, 7], [7, 6]]
# The items to exclude because they had no contextual information;
# thus, if they were given temporal information, they would not be a valid misassignment
exclusion_items = [8, 9]
within_list = []
across_list = []
totals_list = []
for i, a in enumerate(misassignment_pairs):
    totals_list.append(len(a))
    within_list.append(0)
    across_list.append(0)
    for el in a:
        if all([el_i not in exclusion_items for el_i in el]):
            if el in within_key:
                within_list[-1] += 1
            else:
                across_list[-1] += 1
within_list_proportion = [float(x)/float(y) if y != 0 else np.nan for x, y in zip(within_list, totals_list)]
across_list_proportion = [float(x)/float(y) if y != 0 else np.nan for x, y in zip(across_list, totals_list)]
In [17]:
data = pandas.read_csv('TimeTravelTask_TimeOnly.csv', skiprows=1)
num_elements = len(within_list_proportion)
data['within_list_proportion'] = pandas.Series(within_list_proportion)
data['across_list_proportion'] = pandas.Series(across_list_proportion)
data['within'] = pandas.Series(within_list)
data['across'] = pandas.Series(across_list)
data['top'] = pandas.Series([6./7.]*num_elements)
data['bottom'] = pandas.Series([1./7.]*num_elements)
columns_proportion = ['top', 'bottom', 'within_list_proportion', 'across_list_proportion']
titles_proportion = ['', '', '', 'Within vs. Across Proportion']
legend_labels_proportion = ["Across Chance", "Within Chance", "Within", "Across"]
ylabels_proportion = ['', '', '', 'Proportion of Misassignments']
columns = ['within', 'across']
titles = ['', 'Within vs. Across']
legend_labels = ["Within", "Across"]
ylabels = ['', 'Number of Misassigned Items']
visualize_columns(data, columns_proportion, titles_proportion, ylabels_proportion, separate_plots=False, legend_labels=legend_labels_proportion, verbose_anova=True)
visualize_columns(data, columns, titles, ylabels, separate_plots=False, legend_labels=legend_labels, verbose_anova=True)
Next we can save these results to an intermediate file for running statistics.
In [18]:
if 'generate_intermediate_files' in vars() and generate_intermediate_files:
    with open('misassignments_by_context.csv', 'w') as fp:
        fp.write('subID,trial,total_misassignments,within_misassignments,across_misassignments,within_misassignment_proportions,across_misassignment_proportions\n')
        for sid, tr, t, w, a, wp, ap in zip(data['subID'], data['trial'], totals_list, within_list, across_list, within_list_proportion, across_list_proportion):
            fp.write('{0},{1},{2},{3},{4},{5},{6}\n'.format(sid, tr, t, w, a, wp, ap))
In [19]:
from cogrecon.core.data_flexing.time_travel_task.time_travel_task_analytics import summarize_navigation_data
See FD_Lacunarity.ipynb for details on how the fd_indicies were determined. They can be calculated for individuals by setting fd_indicies_domain=None. Otherwise, a list of scale indices is expected (in this case, determined by the average window across participants).
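For intuition, here is a minimal box-counting sketch of how a fractal dimension over a chosen scale window could be computed. This is an illustrative assumption about the method, not the cogrecon implementation; the function name box_count_fd and all of its details are hypothetical.
import numpy as np

def box_count_fd(points, scale_indices):
    # points: (n, 2) array of path samples; scale_indices: 1-based indices selecting
    # which box sizes enter the log-log fit (the shared "window" mentioned above)
    points = np.asarray(points, dtype=float)
    points = points - points.min(axis=0)
    sizes = points.max() / (2.0 ** np.arange(1, 16))  # 15 shrinking box edge lengths
    counts = np.array([len(np.unique(np.floor(points / s), axis=0)) for s in sizes])
    idx = np.asarray(scale_indices) - 1
    slope, _ = np.polyfit(np.log(1.0 / sizes[idx]), np.log(counts[idx]), 1)
    return slope  # the slope of log(count) vs. log(1/size) estimates the fractal dimension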
In [20]:
if 'generate_intermediate_files' in vars() and generate_intermediate_files:
    summarize_navigation_data(search_directory=data_directory,
                              fd_indicies_time=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
                              fd_indicies_space=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
                              fd_indicies_spacetime=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])  # For Time Travel Task specific analyses
In [21]:
import pandas
# Load the data
data_nav = pandas.read_csv('time_travel_task_navigation_summary.csv')
In [22]:
columns = ['time_travelled', 'space_travelled', 'context_boundary_crossings',
'fd_time', 'fd_space', 'fd_spacetime',
'lacunarity_time', 'lacunarity_space', 'lacunarity_spacetime']
titles = ['Temporal Distance', 'Spatial Distance', 'Boundary Crossings',
'Fractal Dimension Time', 'Fractal Dimension Space', 'Fractal Dimension Spacetime',
'Lacunarity Time', 'Lacunarity Space', 'Lacunarity Spacetime']
ylabels = ['Distance Travelled (seconds)', 'Distance Travelled (meters)', 'Number of Instances',
'', '', '', '', '', '']
visualize_columns(data_nav, columns, titles, ylabels, separate_plots=True, subplot_shape=[3, 3], fig_size=(15, 15), verbose_anova=True)
This section contains the analysis examining changes in click order during study. First we define our primary metrics of evaluation: ktcd will give us a tuple of the two distance metrics for a given permutation relative to the root permutation (i.e. [1,2,3,...,n]).
This next section of code is fairly elaborate. It defines the two distance metrics, a function for generating a look-up table of the probabilities of the given distance coordinates, and a visualization function which animates the trial-over-trial points (a short sanity check of the metrics follows the cell).
In [43]:
from math import factorial
import scipy.stats as stats
import scipy.spatial as spatial
import numpy as np
import itertools as itools
from tqdm import tqdm
from IPython.display import HTML
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib import rc, cm
# Decompose a list into contiguous sections (i.e. 2,3,4,1,5,6,8,7 becomes [[2,3,4],[1],[5,6],[8],[7]])
def continuous_decomposition(seq):
    decomposition = [[]]
    for i in seq:
        if len(decomposition[0]) == 0:
            decomposition[0].append(i)
        elif decomposition[-1][-1] == i - 1:
            decomposition[-1].append(i)
        else:
            decomposition.append([i])
    return decomposition

# Returns the normalized contiguous decomposition of a permutation
# (i.e. (the number of contiguous sublists - 1)/(the length of the permutation - 1)).
# If this function returns 0, the list was the elements [1,2,...,len(p)]. If 1, the list had no contiguous sublists.
def contiguous_distance(p):
    return (float(len(continuous_decomposition(p))) - 1) / (len(p) - 1)

# Wraps scipy.stats.kendalltau, setting the target list to [1,2,...,len(p)] and returning just the correlation coefficient
def kendall_tau(p):
    r, _ = stats.kendalltau([x+1 for x in range(0, len(p))], p)
    return r

# Helper function which returns the kendall_tau and contiguous_distance values for a permutation
def kendall_tau_contiguity_distance(p):
    return kendall_tau(p), contiguous_distance(p)

# Helper function which shortens the name of kendall_tau_contiguity_distance to ktcd
def ktcd(p):
    return kendall_tau_contiguity_distance(p)

# This function returns a probability lookup table given a particular size of permutation for the ktcd methods.
# Note that it can take a VERY long time to execute on set sizes larger than 10.
def get_ktcd_probability_lut(perm_size, verbose=True):
    if perm_size > 10 and verbose:
        print("Warning: Permutation sizes greater than 10 can take a very long time to execute. Set verbose=False to disable this message.")
    target = [i+1 for i in range(0, perm_size)]
    rs0 = []
    rs1 = []
    perm_length = factorial(len(target))
    for idx, perm in enumerate(tqdm(itools.permutations(target), total=factorial(len(target)), desc="Computing Permutations for LUT")):
        r, cr = kendall_tau_contiguity_distance(perm)
        rs0.append(r)
        rs1.append(cr)
    rs_zipped = list(zip(rs0, rs1))
    unique = []
    unique_count = []
    for r0, r1 in rs_zipped:
        if (r0, r1) not in unique:
            unique.append((r0, r1))
            unique_count.append(rs_zipped.count((r0, r1)))
    probability_dictionary = {}
    for u, uc in zip(unique, unique_count):
        probability_dictionary[u] = float(uc)/float(perm_length)
    return probability_dictionary
# Function for visualizing a combination of Kendall Tau and Contiguity Distance
def visualize_trial_over_trial_3d(root_lut, multi_data, n_interpolated_points=50, interval=33, weights=None, fontsize='x-large', animate=True):
    lut = root_lut
    if animate:
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')
        ax.set_xlabel('Kendall Tau Distance')
        ax.set_ylabel('Contiguity Distance')
        ax.set_zlabel('Probability')
    else:
        fig = plt.figure()
        ax1 = fig.add_subplot(221, projection='3d')
        ax2 = fig.add_subplot(222, projection='3d')
        ax3 = fig.add_subplot(223, projection='3d')
        ax4 = fig.add_subplot(224, projection='3d')
        axs = [ax1, ax2, ax3, ax4]
        ax = axs[0]
        for ax in axs:
            ax.set_xlabel('Kendall Tau Distance', fontsize=15, labelpad=10)
            ax.set_ylabel('Contiguity Distance', fontsize=15, labelpad=10)
            ax.set_zlabel('Probability', fontsize=15, labelpad=10)
    keys = list(lut.keys())
    k0, k1 = np.transpose(keys)
    if animate:
        ax.scatter(k0, k1, list(lut.values()), c=list(lut.values()), s=16)
    else:
        for ax in axs:
            ax.scatter(k0, k1, list(lut.values()), c=list(lut.values()), s=16)

    # This function is used for animation
    def update_plot(i, data, scat, txt):
        txt.set_text('Trial:' + str(float(i)/float(n_interpolated_points) + 1))
        pts = []
        for d in data:
            pts.append(list(d[i]))
        scat._offsets3d = np.transpose(pts)
        return scat,

    rgb = cm.get_cmap('viridis')(np.linspace(0.0, 1.0, 4))[np.newaxis, :, :3][0]
    points_data = []
    start_points = []
    distances = [[], [], [], []]
    for data in tqdm(multi_data, desc="Processing Simulated Data"):
        dists = [kendall_tau_contiguity_distance(p) for p in data]
        probs = [lut[d] for d in dists if d in lut]
        d0, d1 = np.transpose(dists)
        for i, (d0t, d1t) in enumerate(zip(d0, d1)):
            distances[i].append((d0t, d1t))
        lerp_points_x = []
        lerp_points_y = []
        lerp_points_z = []
        for idx in range(0, len(d0) - 1):
            lerp_points_x += list(np.linspace(d0[idx], d0[idx + 1], n_interpolated_points, endpoint=True))
            lerp_points_y += list(np.linspace(d1[idx], d1[idx + 1], n_interpolated_points, endpoint=True))
            lerp_points_z += list(np.linspace(probs[idx], probs[idx + 1], n_interpolated_points, endpoint=True))
        point_data = list(zip(lerp_points_x, lerp_points_y, lerp_points_z))
        points_data.append(point_data)
        if animate:
            ax.scatter(d0, d1, probs, c=rgb, marker='s', s=50)
        else:
            for ax, x, y, z in zip(axs, d0, d1, probs):
                ax.scatter([x], [y], [z], c=(1.0, 0.0, 0.0, 0.1), marker='s', s=50, edgecolors='k', linewidths=0.5)
        start_points.append([d0[0], d1[0], probs[0]])
    x, y, z = np.transpose(start_points)
    if animate:
        scatter_plot = ax.scatter(x, y, z, c=['r'], marker='s', s=50)
    x_means = [np.mean(np.transpose(dist)[0]) for dist in distances]
    y_means = [np.mean(np.transpose(dist)[1]) for dist in distances]
    z_means = [np.mean(pps) for pps in [[lut[d] for d in dists if d in lut] for dists in distances]]
    if animate:
        mean_scatter_plot = ax.scatter(x_means, y_means, z_means, c='b', marker='o', s=150)
        txt = ax.text2D(0.05, 0.95, "Trial 4", transform=ax.transAxes, size=fontsize)
        ani = animation.FuncAnimation(fig, update_plot, frames=len(lerp_points_x), interval=interval,
                                      fargs=(points_data, scatter_plot, txt))
        rc('animation', html='html5')
        ani.to_html5_video()
        plt.show()
        return ani
    else:
        for idx, (ax, x, y, z) in enumerate(zip(axs, x_means, y_means, z_means)):
            txt = ax.text2D(0.05, 0.95, "Trial {0}".format(idx+1), transform=ax.transAxes, size=fontsize)
            ax.scatter([x], [y], [z], c='b', marker='o', s=150)
            ax.plot([x, x], [0, 1], [0, 0], c='b', linestyle='--')
            ax.plot([-1, 1], [y, y], [0, 0], c='b', linestyle='--')
        plt.tight_layout()
        plt.show()
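As a quick sanity check of the two metrics defined above (the values follow directly from their definitions): the identity permutation has a perfect Kendall tau correlation and a single contiguous run, while a full reversal is maximally distant on both measures.
print(ktcd([1, 2, 3, 4, 5]))  # (1.0, 0.0): perfectly ordered, one contiguous run
print(ktcd([5, 4, 3, 2, 1]))  # (-1.0, 1.0): fully reversed, five singleton runs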
In [24]:
import pandas
import ast
# Load the data
data_nav = pandas.read_csv('time_travel_task_navigation_summary.csv')
inverse = data_nav['inverse']
# Process the lists of indicies
raw_indicies = [ast.literal_eval(row) for row in data_nav['click_order'].values]
sorted_indicies = []
indicies = []
# Simplify the indicies to their ordinal position (i.e. [0, 2, 3, 1, 5, ...])
for row, inv in zip(raw_indicies, inverse):
    if inv:
        sorted_list, index_list = zip(*sorted(zip(row, list(reversed(range(0, len(row)))))))
    else:
        sorted_list, index_list = zip(*sorted(zip(row, list(range(0, len(row))))))
    sorted_indicies.append(sorted_list)
    indicies.append(index_list)
# Remove items 8 and 9 as they had no temporal information
temporal_only_indicies = []
for row in indicies:
    index_list = list(row)
    index_list.remove(8)
    index_list.remove(9)
    temporal_only_indicies.append(index_list)
trials = data_nav['trial']
distances = [ktcd(perm) for perm in temporal_only_indicies]
In [25]:
vis_data = [np.array(temporal_only_indicies)[list(trials.values == i)] for i in [1, 2, 3, 4]]
vis_data = np.transpose(vis_data, axes=(1,0,2))
lut = get_ktcd_probability_lut(8)
In [26]:
if 'generate_intermediate_files' in vars() and generate_intermediate_files:
    with open('click_order_data.csv', 'w') as fp:
        fp.write('subID,trial,kendall_tau_distance,contiguity_distance\n')
        for sid, tr, kt, c in zip(data_nav['subID'], data_nav['trial'], *np.transpose(distances)):
            fp.write('{0},{1},{2},{3}\n'.format(sid, tr, kt, c))
Finally, call the visualization function. This function interpolates over the trials: the height is the probability of the point given a random permutation, the lighter dots are earlier trials, the darker dots are later trials, and the red dots are an animated interpolation across the trials. Deviation away from the peak-probability region indicates a preference towards some low-probability permutation.
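As a side note, the peak-probability point that serves as the reference region can be read directly off the lut dictionary built above (a one-line check, assuming the lut from In [25]):
peak = max(lut.items(), key=lambda kv: kv[1])
print(peak)  # ((kendall_tau, contiguity), probability) of the most likely random outcome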
In [27]:
# Note: you'll need to change the ffmpeg path to use this on your machine
import os
import base64
from IPython.display import HTML
def embed_animation_as_gif(_animation, ffmpeg_path=r'C:\mingw2\bin\ffmpeg.exe'):
    if not os.path.exists(ffmpeg_path):
        return _animation
    _animation.save('animation.mp4')
    os.system("ffmpeg -i animation.mp4 animation.gif -y")
    IMG_TAG = '<img src="data:image/gif;base64,{0}">'
    with open('animation.gif', 'rb') as f:
        data = f.read()
    data = base64.b64encode(data).decode('ascii')
    return HTML(IMG_TAG.format(data))
In [28]:
data_dist = pandas.DataFrame()
data_dist['subID'] = data_nav['subID']
data_dist['trial'] = trials
data_dist['kendall_tau_distance'] = pandas.Series(list(np.transpose(distances)[0]))
data_dist['contiguity_distance'] = pandas.Series(list(np.transpose(distances)[1]))
columns = ['kendall_tau_distance', 'contiguity_distance']
titles = ['Kendall Tau', 'Contiguity']
ylabels = ['Distance', 'Distance']
visualize_columns(data_dist, columns, titles, ylabels, separate_plots=True, verbose_anova=True)
In [44]:
import matplotlib
%matplotlib inline
matplotlib.rcParams['figure.figsize'] = (20,10)
anim = visualize_trial_over_trial_3d(lut, vis_data, animate=False)
In [46]:
dists = [[kendall_tau_contiguity_distance(p) for p in data] for data in vis_data]
In [49]:
matplotlib.rcParams['figure.figsize'] = (20, 5)
prob = lut[(0.99999999999999978, 0.0)]
plt.bar([1, 2, 3, 4], [len([x for x in dat if np.isclose(x[0], 1.0) and np.isclose(x[1], 0.0)])/43. for dat in np.transpose(dists, axes=(1, 0, 2))], label='Proportion per trial')  # 43 == number of participants
plt.xlabel('Trial', fontsize=15)
plt.ylabel('Proportion of Participants', fontsize=15)
plt.title('Proportion of Participants on the (1, 0) Point', fontsize=15)
plt.xticks([1, 2, 3, 4])
plt.plot([0.5, 4.5], [prob, prob], '--', c='r', label='Chance Level')
plt.legend(fontsize=15)
plt.show()
In [52]:
prob = lut[(0.99999999999999978, 0.0)]
print('{:0.6f}'.format(prob))
In [53]:
%%capture
import matplotlib
matplotlib.rcParams['figure.figsize'] = (20,10)
anim = visualize_trial_over_trial_3d(lut, vis_data)
In [32]:
import IPython.display as display
display_obj = anim
if 'generate_embedded_animations' in vars() and generate_embedded_animations and 'embed_animation_as_gif' in vars():
    display_obj = embed_animation_as_gif(display_obj)
display.display_html(display_obj)
In [54]:
import pandas
# Load all data
data_summary = pandas.read_csv('time_travel_task_test_summary.csv')
data_time = pandas.read_csv('TimeTravelTask_TimeOnly.csv', skiprows=1)
data_space = pandas.read_csv('TimeTravelTask_SpaceOnly.csv', skiprows=1)
data_nav = pandas.read_csv('time_travel_task_navigation_summary.csv')
data_missassignment_by_context = pandas.read_csv('misassignments_by_context.csv')
data_order = pandas.read_csv('click_order_data.csv')
# Fix inconsistent indexing in some files
data_summary['trial'] = [x-1 for x in data_summary['trial'].values]
data_nav['trial'] = [x-1 for x in data_nav['trial'].values]
data_order['trial'] = [x-1 for x in data_order['trial'].values]
# Confirm subID and trial match across all data
assert all([a==b==c==d==e==f for a,b,c,d,e,f in zip(data_order['subID'].values, data_missassignment_by_context['subID'].values, data_nav['subID'].values, data_time['subID'].values, data_space['subID'].values, data_summary['subID'].values)]), 'subIDs do not match in intermediate files'
assert all([a==b==c==d==e==f for a,b,c,d,e,f in zip(data_order['trial'].values, data_missassignment_by_context['trial'].values, data_nav['trial'].values, data_time['trial'].values, data_space['trial'].values, data_summary['trial'].values)]), 'trials do not match in intermediate files'
data = pandas.DataFrame()
# Random Factors
data['subID'] = data_summary['subID']
data['trial'] = data_summary['trial']
# Study Time Factors (independent variables)
# AS) Simple Path Factors
data['space_travelled'] = data_nav['space_travelled']
data['time_travelled'] = data_nav['time_travelled']
# BS) Complex Path Factors
data['fd_time'] = data_nav['fd_time']
data['fd_space'] = data_nav['fd_space']
data['fd_spacetime'] = data_nav['fd_spacetime']
data['lacunarity_time'] = data_nav['lacunarity_time']
data['lacunarity_space'] = data_nav['lacunarity_space']
data['lacunarity_spacetime'] = data_nav['lacunarity_spacetime']
# CS) Context Path Factors
data['context_boundary_crossings'] = data_nav['context_boundary_crossings']
# DS) Order Factors
data['kendall_tau_distance'] = data_order['kendall_tau_distance']
data['contiguity_distance'] = data_order['contiguity_distance']
# Test Time Factors (dependent variables)
# AT) Simple Factors
data['space_misplacement'] = data_summary['space_misplacement']
data['time_misplacement'] = data_summary['time_misplacement']
data['event_type_correct_count'] = data_summary['event_type_correct_count']
# BT) Context Factors
data['across_context_boundary_effect'] = data_summary['context_crossing_dist_exclude_wrong_color_pairs']
data['within_context_boundary_effect'] = data_summary['context_noncrossing_dist_exclude_wrong_color_pairs']
data['context_boundary_effect'] = data_summary['context_crossing_dist_exclude_wrong_color_pairs'] - data_summary['context_noncrossing_dist_exclude_wrong_color_pairs']
# CT) Relational Memory Factors
data['accurate_misassignment_space'] = data_space['Accurate Misassignment']
data['accurate_misassignment_time'] = data_time['Accurate Misassignment']
# DT) Relational Memory and Context Factors
data['within_misassignments'] = data_missassignment_by_context['within_misassignments']
data['across_misassignments'] = data_missassignment_by_context['across_misassignments']
if 'generate_intermediate_files' in vars() and generate_intermediate_files:
    data.to_csv('full_dataset.csv')
In [55]:
from rpy2.robjects import pandas2ri
import rpy2.robjects as robjects
import numpy as np
pandas2ri.activate()
Just in case nlme is not properly installed, we can try to install it. If prompts appear, you'll need to accept the install in order to use the next code sections.
In [56]:
import rpy2.robjects.packages as rpackages
from rpy2.robjects.vectors import StrVector
from rpy2.robjects import r
utils = rpackages.importr('utils')
utils.chooseCRANmirror(ind=1)
# R package names
packnames = ['nlme', 'stats', 'reghelper', 'MuMIn', 'afex']
names_to_install = [x for x in packnames if not rpackages.isinstalled(x)]
if len(names_to_install) > 0:
    utils.install_packages(StrVector(names_to_install))
Now we can convert the data and import the nlme package.
In [57]:
r_dataframe = pandas2ri.py2ri(data)
nlme = rpackages.importr('nlme')
rstats = rpackages.importr('stats')
reghelper = rpackages.importr('reghelper')
mumin = rpackages.importr('MuMIn')
afex = rpackages.importr('afex')
Next we can run the model and print out the t-Table.
In [58]:
# correlation=nlme.corSymm(form=r.formula('~1 | subID/trial'))
random_effects_model = '~ trial | subID'
navigation = 'time_travelled + space_travelled + fd_time + fd_space + fd_spacetime + lacunarity_time + lacunarity_space + lacunarity_spacetime'
order = 'kendall_tau_distance + contiguity_distance'
model_formulas = [
# Misplacement vs. Navigation Models
'time_misplacement ~ ' + navigation,
'space_misplacement ~ ' + navigation,
# Misassignment vs. Navigation Models
'accurate_misassignment_space ~ ' + navigation,
'accurate_misassignment_time ~ ' + navigation,
# Context Boundary Crossing Models
'within_misassignments ~ context_boundary_crossings',
'across_misassignments ~ context_boundary_crossings',
'context_boundary_effect ~ context_boundary_crossings', # Note: na.omit is needed due to CBE having nan values
# Order Models (Note all output variables are used because no prior hypotheses were made about order's influence)
'time_misplacement ~ ' + order,
'space_misplacement ~ ' + order,
'accurate_misassignment_space ~ ' + order,
'accurate_misassignment_time ~ ' + order,
'within_misassignments ~ ' + order,
'across_misassignments ~ ' + order,
'context_boundary_effect ~ ' + order # Note: na.omit is needed due to CBE having nan values
]
transform_functions = [np.sqrt, lambda x: np.log(np.log(np.log(x))),
                       lambda x: x, lambda x: x,  # these are never normal
                       lambda x: x, np.cbrt, lambda x: x]
# np.sqrt, np.cbrt, np.log, lambda x: 1/x, lambda x: x
data_transformed = data.copy(True)
for c, tf in zip(['time_misplacement', 'space_misplacement',
                  'accurate_misassignment_space', 'accurate_misassignment_time',
                  'within_misassignments', 'across_misassignments', 'context_boundary_effect'], transform_functions):
    data_transformed[c] = data_transformed[c].apply(tf)
r_transformed_dataframe = pandas2ri.py2ri(data_transformed)
models = [nlme.lme(r.formula(model_formula),
                   random=r.formula(random_effects_model),
                   data=r_transformed_dataframe,
                   control=nlme.lmeControl(maxIter=100, msMaxIter=100, opt='optim'),  # Note: 'optim' is needed to avoid failure to converge
                   **{'na.action': 'na.omit'})  # Other options can be found here: https://stat.ethz.ch/R-manual/R-devel/library/stats/html/na.fail.html
          for model_formula in model_formulas]
# Uncomment this to view all the possible keys
# print(r.summary(models[0]).names)
# We can pick certain keys to ignore during printing
ignore_keys = ['modelStruct', 'dims', 'contrasts', 'coefficients', 'varFix',
               'sigma', 'apVar', 'numIter', 'groups', 'logLik',
               'call', 'terms', 'fitted', 'method', 'residuals',
               'fixDF', 'na.action', 'data', 'corFixed',  # 'tTable'
               'BIC', 'AIC']
for model, name in zip(models, model_formulas):
    for key in r.summary(model).names:
        if key not in ignore_keys:
            print("_"*len(name))
            print(name)
            print("_"*len(name))
            print(r.summary(model).rx2(key))
    print(reghelper.beta(model).rx2('tTable'))
We can see the overall fit of the various models in this next section. The marginal R² reflects the variance explained by the fixed effects alone, while the conditional R² additionally includes the random effects.
In [59]:
r_squared_values = np.array([np.array(mumin.r_squaredGLMM_lme(model)) for model in models]).transpose()
values = [model_formulas, r_squared_values[0], r_squared_values[1]]
columns = ["Model Formulas", "Marginal R^2 (Fixed Effects)", "Conditional R^2 (Fixed + Random Effects)"]
r_squared_data_frame = pandas.DataFrame({c: v for c, v in zip(columns, values)})
r_squared_data_frame
Out[59]:
We can also plot the random coefficients for subID and trial. First we'll extract the data then plot.
In [60]:
import matplotlib.pyplot as plt
%matplotlib inline
num_rows = int(np.ceil(len(models)/3.))
fig, axes = plt.subplots(num_rows, 3)
fig.set_size_inches((15, num_rows*5))
axes = [item for sublist in axes for item in sublist]
for idx, (model, ax) in enumerate(zip(models, axes)):
    terms = r.summary(model).rx2('terms').r_repr()
    c = str(terms.split('~')[0].strip())
    title = 'model #{0} ({1})'.format(idx, c)
    ax.set_title(title[:30] + '...')
    ax.set_ylabel(c)
    ax.set_xlabel('trial')
    #yd = np.transpose([np.array(pandas.Series(data[c][data['trial'] == n]).fillna(method='backfill')) for n in [0, 1, 2, 3]])
    yd = np.transpose([data_transformed[c][data_transformed['trial'] == n] for n in [0, 1, 2, 3]])
    # Get the subject coefficients
    subject_lines = pandas2ri.ri2py(r.summary(model).rx2('coefficients').rx2('random').rx2('subID'))
    x = np.array([0, 1, 2, 3])
    ys = [x*slope + intercept for intercept, slope in subject_lines]
    # Get the mean line from the t table
    # Note: This section is commented out because I didn't properly look for the variable of interest,
    # so the index may change depending on the model
    #
    # mean_line = pandas2ri.ri2py(r.summary(model).rx2('tTable'))
    # mean = x*mean_line[1][0] + mean_line[1][1]
    # plt.plot(list(x), list(mean), c='r', marker='>')
    # Plot all the lines
    for y, label, y2 in zip(ys, list(np.linspace(0.0, 1.0, len(ys))), yd):
        ax.plot(list(x), list(y), c='b', marker='>', alpha=0.25)
        ax.plot(list(x), y2 - np.nanmean(yd, axis=0), c='g', alpha=0.1)
# Clean up empty extra subplots
for idx in range(1, 3 - len(models) % 3 + 1):
    axes[-idx].axis('off')
plt.show()
We can visualize the correlations as well.
In [61]:
import matplotlib.pyplot as plt
num_rows = int(np.ceil(len(models)/3.))
fig, axes = plt.subplots(num_rows, 3)
fig.set_size_inches((15, num_rows*5))
axes = [item for sublist in axes for item in sublist]
print_buffer = ''
for model, name, ax in zip(models, model_formulas, axes):
    print_buffer += str("_"*len(name)) + '\r\n'
    print_buffer += str(name) + '\r\n'
    print_buffer += str("_"*len(name)) + '\r\n'
    print_buffer += str(r.summary(model).rx2('corFixed')) + '\r\n'
    correlations = pandas2ri.ri2py(r.summary(model).rx2('corFixed'))
    ax.imshow(correlations)
# Clean up empty extra subplots
for idx in range(1, 3 - len(models) % 3 + 1):
    axes[-idx].axis('off')
plt.show()
print(print_buffer)
And we can compare multiple models of interest. For instance, if we create a larger model that combines all simple and complex navigation variables, we can see if the fit is better than if we use restricted models.
In [62]:
# Note: One interesting example of model comparisons is comparing the spatial and temporal path metrics predictive power
# over spatial or temporal misplacement. i.e.:
#
# model_0 = 'space_misplacement ~ (space_travelled + fd_space + lacunarity_space)'
# model_1 = 'space_misplacement ~ (time_travelled + fd_time + lacunarity_time)'
# and
# model_0 = 'time_misplacement ~ (space_travelled + fd_space + lacunarity_space)'
# model_1 = 'time_misplacement ~ (time_travelled + fd_time + lacunarity_time)'
#
# In both cases, the temporal path metrics resulted in a better fit than the spatial ones,
# even when the test metric was spatial.
# Define two models to compare
model_0 = 'time_misplacement ~ (space_travelled + fd_space + lacunarity_space)'
model_1 = model_formulas[0]
# Make a list of formulas to pass to the model function
step_mf = [model_0, model_1]
step_models = [nlme.lme(r.formula(model_formula),
                        random=r.formula(random_effects_model),
                        data=r_transformed_dataframe,
                        control=nlme.lmeControl(maxIter=100, msMaxIter=100, opt='optim'),  # Note: 'optim' is needed to avoid failure to converge
                        **{'na.action': 'na.omit'})  # Other options can be found here: https://stat.ethz.ch/R-manual/R-devel/library/stats/html/na.fail.html
               for model_formula in step_mf]
# Get the model results and run an anova to compare the models
m0, m1 = step_models
result = r.anova(m0, m1)
# Convert the output of the anova into a Pandas DataFrame for convenience
summary_frame = pandas.DataFrame(np.transpose(np.concatenate([[[step_mf[0], step_mf[1]]], np.array(result[2:])])))
for idx, c in enumerate(summary_frame):  # Coerce all but the first column to numeric
    if idx != 0:
        summary_frame[c] = pandas.to_numeric(summary_frame[c], errors='coerce')
# Name the columns for convenience
summary_frame.columns = ['Model Formula', 'Df', 'AIC', 'BIC', 'logLik', 'Model', 'Chisq', 'P Value'][0:len(summary_frame.columns)]
print("_"*50 + "\r\nANOVA Summary")
print(summary_frame)
print("_"*50 + "\r\nResult\r\n" + "_"*50)
# If the P Value is significant, check the BIC to determine which model was a better fit (lower BIC is better fit)
best_model_idx = -1
if len(summary_frame.columns) == 5:
    print('The models are identical.')
elif summary_frame['P Value'][1] < 0.05:
    print('The models were significantly different.')
    if summary_frame['BIC'][0] > summary_frame['BIC'][1]:
        print("The second model ({0}) is significantly better than the first.".format(step_mf[1]))
        best_model_idx = 1
    else:
        print("The first model ({0}) is significantly better than the second.".format(step_mf[0]))
        best_model_idx = 0
else:
    print('The models were NOT significantly different.')
print("_"*50 + "\r\nBest Model t Table(s)\r\n" + "_"*50)
if best_model_idx == -1:
    print(r.summary(m0).rx2('tTable'))
    print(r.summary(m1).rx2('tTable'))
    print("_"*50 + "\r\nBest Model Beta Table(s)\r\n" + "_"*50)
    print(reghelper.beta(m0).rx2('tTable'))
    print(reghelper.beta(m1).rx2('tTable'))
    best_model = step_models[0]
else:
    print(r.summary(step_models[best_model_idx]).rx2('tTable'))
    print("_"*50 + "\r\nBest Model Beta Table(s)\r\n" + "_"*50)
    print(reghelper.beta(step_models[best_model_idx]).rx2('tTable'))
    best_model = step_models[best_model_idx]
Note that the Beta coefficients can be used to determine the relative importance of the various predictor variables in the chosen model. The larger the beta coefficient (i.e. the first column in the Beta Tables), the better a predictor the independent variable is of the dependent variable.
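For a simple single-level regression, the standardized beta relates to the raw coefficient as beta_j = b_j * sd(x_j) / sd(y). Below is a minimal sketch for intuition only; reghelper.beta performs the equivalent standardization within the R model itself, including the mixed-model details.
import numpy as np

def standardized_beta(b_raw, x, y):
    # Rescale a raw coefficient by the predictor/outcome standard deviations (illustrative only)
    return b_raw * np.nanstd(x) / np.nanstd(y)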
Finally, we can test for normality of our residuals to confirm that the model is appropriately capturing the effect.
In [63]:
%matplotlib inline
import matplotlib.pyplot as plt
import scipy.stats as stats
import matplotlib.mlab as mlab
def residual_plot_1d(model, level=0, label=''):
    f, (ax0, ax1) = plt.subplots(1, 2)
    f.set_size_inches(15., 5.)
    data = np.array(r.residuals(model, level=level))
    n, bins, patches = ax0.hist(data, normed=1)
    (mu, sigma) = stats.norm.fit(data)
    y = mlab.normpdf(bins, mu, sigma)
    l = ax0.plot(bins, y, 'r--', linewidth=1)
    res = stats.probplot(data, dist=stats.loggamma, sparams=(2.5,), plot=ax1)
    ax0.set_xlabel(label)
    ax0.set_ylabel('Probability')
    ax0.set_title(r'$\mathrm{Histogram\ of\ Residuals:}\ ' + '\mu={0},\ \sigma={1}$'.format(mu, sigma))
    ax0.grid(True)
    ax1.grid(True)
    return data
mfs = model_formulas # [model_formulas[x] for x in [0, 1, 3, 5, 6]]
norm_models = [nlme.lme(r.formula(model_formula),
                        random=r.formula(random_effects_model),
                        data=r_transformed_dataframe,
                        control=nlme.lmeControl(maxIter=100, msMaxIter=100, opt='optim'),  # Note: 'optim' is needed to avoid failure to converge
                        **{'na.action': 'na.omit'})  # Other options can be found here: https://stat.ethz.ch/R-manual/R-devel/library/stats/html/na.fail.html
               for model_formula in mfs]
for model, form in zip(norm_models, mfs):
    model_under_test = model
    print(form)
    res0 = residual_plot_1d(model_under_test, level=0)
    res1 = residual_plot_1d(model_under_test, level=1)
    k2, p = stats.normaltest(res0)
    print('k2 = {0}, p = {1}'.format(k2, p))
    if p < 0.05:
        print('The residuals were NOT normal.')
    else:
        print('The residuals were normal.')
    k2, p = stats.normaltest(res1)
    print('k2 = {0}, p = {1}'.format(k2, p))
    if p < 0.05:
        print('The residuals were NOT normal.')
    else:
        print('The residuals were normal.')
    plt.show()
In [80]:
%%capture
import matplotlib.pyplot as plt
from matplotlib import animation, rc
from IPython.display import HTML
import scipy.stats as stats
import os
import IPython.display as display
import statsmodels.api as smapi
from statsmodels.formula.api import ols
import statsmodels.graphics as smgraphics
rc('animation', html='html5')
%matplotlib inline
relations = [
['across_misassignments', 'context_boundary_crossings'],
['context_boundary_effect', 'context_boundary_crossings'],
['accurate_misassignment_time', 'fd_space'],
['accurate_misassignment_time', 'lacunarity_space'],
['accurate_misassignment_time', 'lacunarity_spacetime'],
['time_misplacement', 'time_travelled'],
['space_misplacement', 'space_travelled']
]
def get_animation_from_data(xs, ys, title='', x_label='', y_label='', animate=True):
    def update_plot(t):
        if t+1 >= len(xs):  # For pausing at the end
            t = 2.99
        index = int(np.floor(t))
        nt = t % 1.0
        newPoints = np.transpose([xs[index+1], ys[index+1]])
        originalPoints = np.transpose([xs[index], ys[index]])
        interpolation = originalPoints*(1-nt) + newPoints*nt
        scat.set_offsets(interpolation)
        line_x = np.linspace(np.min(xs), np.max(xs), 10)
        newLine = np.transpose([line_x, np.array([fits[index+1][0]*x + fits[index+1][1] for x in line_x])])
        originalLine = np.transpose([line_x, np.array([fits[index][0]*x + fits[index][1] for x in line_x])])
        interpolation = originalLine*(1-nt) + newLine*nt
        line_plot[0].set_xdata(np.transpose(interpolation)[0])
        line_plot[0].set_ydata(np.transpose(interpolation)[1])
        r = fits[index][2]*(1-nt) + fits[index+1][2]*nt
        p = fits[index][3]*(1-nt) + fits[index+1][3]*nt
        txt.set_text('p=' + "{:.2f}".format(p) + '\n' + 'r=' + "{:.2f}".format(r))
        return scat,
    if animate:
        fig = plt.figure()
        ax = plt.gca()
    else:
        fig = plt.figure()
        ax1 = fig.add_subplot(221)
        ax2 = fig.add_subplot(222)
        ax3 = fig.add_subplot(223)
        ax4 = fig.add_subplot(224)
        axs = [ax1, ax2, ax3, ax4]
        ax = axs[0]
        for ax in axs:
            ax.set_xlabel(x_label)
            ax.set_ylabel(y_label)
    masks = [[~np.isnan(varx) & ~np.isnan(vary) for varx, vary in zip(x, y)] for x, y in zip(xs, ys)]
    for idx, (x, y, mask) in enumerate(zip(xs, ys, masks)):
        regression = ols("data ~ x", data=dict(data=y[mask], x=x[mask])).fit()
        test = regression.outlier_test()
        outliers = []
        # Best-effort removal of Bonferroni-flagged outliers; failures are ignored
        try:
            outliers = [(i, x[i], y[i]) for i, t in enumerate(test['bonf(p)']) if t < 0.5]
            print('Outliers: ', outliers)
            for outlier in outliers:
                del masks[idx][outlier[0]]
                del xs[idx][outlier[0]]
                del ys[idx][outlier[0]]
        except Exception:
            pass
    fits = [stats.linregress(x[mask], y[mask]) for x, y, mask in zip(xs, ys, masks)]
    scat = plt.scatter([], [], color='white', edgecolors='black')
    if animate:
        txt = plt.text(1, 0, '', fontsize=12, ha='right', va='bottom', transform=ax.transAxes)
        line_plot = plt.plot([], [], color='black')
        anim = animation.FuncAnimation(fig, update_plot, frames=np.arange(0, 4, 0.01), interval=33)
        plt.title(title)
        [plt.scatter(x, y, label=idx+1, alpha=0.5) for idx, (x, y) in enumerate(zip(xs, ys))]
        plt.legend(loc=2)
        plt.tight_layout()
    else:
        plt.suptitle(title)
        fig.set_size_inches((15, 8))
        for idx, (x, y, ax, fit) in enumerate(zip(xs, ys, axs, fits)):
            ax.set_title('Trial {0}'.format(idx+1))
            txt = plt.text(1, 0, '', fontsize=12, ha='right', va='bottom', transform=ax.transAxes)
            ax.scatter(x, y, label=idx+1, alpha=0.5)
            line_x = np.linspace(np.min(x), np.max(x), 10)
            line_y = np.array([fit[0]*xv + fit[1] for xv in line_x])
            ax.plot(line_x, line_y, 'k')
            r = fit[2]
            p = fit[3]
            txt.set_text('p=' + "{:.2f}".format(p) + '\n' + 'r=' + "{:.2f}".format(r))
        plt.tight_layout()
    if animate:
        plt.close()
        return anim
def get_animation(relation, animate=True):
    xs = [data[relation[1]][data['trial'] == n] for n in [0, 1, 2, 3]]
    ys = [data[relation[0]][data['trial'] == n] for n in [0, 1, 2, 3]]
    return get_animation_from_data(xs, ys, title=relation[1].replace('_', ' ').title() + ' vs. ' + relation[0].replace('_', ' ').title(), x_label=relation[1].replace('_', ' ').title(), y_label=relation[0].replace('_', ' ').title(), animate=animate)
# Note: you'll need to change the ffmpeg path to use this on your machine
import base64
def embed_animation_as_gif(_animation, ffmpeg_path=r'C:\mingw2\bin\ffmpeg.exe'):
    if not os.path.exists(ffmpeg_path):
        return _animation
    _animation.save('animation.mp4')
    os.system("ffmpeg -i animation.mp4 animation.gif -y")
    IMG_TAG = '<img src="data:image/gif;base64,{0}">'
    with open('animation.gif', 'rb') as f:
        data = f.read()
    data = base64.b64encode(data).decode('ascii')
    return HTML(IMG_TAG.format(data))
In [81]:
for relation in relations:
    display_obj = get_animation(relation, animate=False)
    if 'generate_embedded_animations' in vars() and generate_embedded_animations and 'embed_animation_as_gif' in vars():
        display_obj = embed_animation_as_gif(display_obj)
    display.display_html(display_obj)
In [60]:
import matplotlib.pyplot as plt
relations = [
['across_misassignments', 'context_boundary_crossings'],
['context_boundary_effect', 'context_boundary_crossings'],
['accurate_misassignment_time', 'fd_space'],
['accurate_misassignment_time', 'lacunarity_space'],
['accurate_misassignment_time', 'lacunarity_spacetime'],
['time_misplacement', 'time_travelled'],
['space_misplacement', 'space_travelled']
]
relation_models = [models[x] for x in [5, 6, 3, 3, 3, 0, 1]]
def plot_residuals(r0, r1, model, title=''):
    # Make new figure
    plt.figure()
    # Get the subject coefficients
    subject_lines = pandas2ri.ri2py(r.summary(model).rx2('coefficients').rx2('random').rx2('subID'))
    x = np.array([0, 1, 2, 3])
    ys = [x*slope + intercept for intercept, slope in subject_lines]
    # Get the relation data
    r0_data = [data[r0][data['trial'] == n] for n in list(x)]
    r1_data = [data[r1][data['trial'] == n] for n in list(x)]
    # Compute the residuals of the target variable
    r0_resid = np.transpose(np.array(ys)) - np.array(r0_data)
    return get_animation_from_data(r0_resid, r1_data, title=title)
In [61]:
for relation, relation_model in zip(relations, relation_models):
    r0, r1 = relation
    display_obj = plot_residuals(r0, r1, relation_model, title=r0 + ' vs. ' + r1)
    if 'generate_embedded_animations' in vars() and generate_embedded_animations and 'embed_animation_as_gif' in vars():
        display_obj = embed_animation_as_gif(display_obj)
    display.display_html(display_obj)
In [8]:
from cogrecon.core.data_flexing.time_travel_task.time_travel_task_binary_reader import read_binary_file
import os
import numpy as np
In [17]:
#data_directory = r'C:\Users\Kevin\Documents\GitHub\msl-iposition-pipeline\examples\saved_data\Paper Data (cleaned)'
#data_file = '022_4_1_0_2016-10-12_16-10-25.dat'
data_directory = r'C:\Users\Kevin\Desktop\v6.0'
data_file = r'999_1_15_0_2018-04-09_19-13-32.dat'
iterations = read_binary_file(os.path.join(data_directory, data_file))
In [18]:
ts = [iterations[i]['time_val'] for i in range(0, len(iterations))]
xs = [iterations[i]['x'] for i in range(0, len(iterations))]
ys = [iterations[i]['z'] for i in range(0, len(iterations))]
atds = [(iterations[i]['datetime'] - iterations[0]['datetime']).total_seconds() for i in range(0, len(iterations))]
In [19]:
from math import atan2, degrees
def GetAngleOfLineBetweenTwoPoints(p1, p2):
    xDiff = p2['x'] - p1['x']
    yDiff = p2['y'] - p1['y']
    return degrees(atan2(yDiff, xDiff))
In [20]:
import matplotlib.pyplot as plt
from matplotlib.patches import Wedge, Rectangle, Circle
fov = 60.0
idx = int(len(iterations)/1.14)
window = 2.
time_points = [4, 10, 16, 25, 34, 40, 46, 51]
space_points = [[18, -13], [-13, 9], [-10, -2], [6, -2], [17, -8], [-2, -7], [-15, -15], [6, 18], [14, 6], [-2, 10]]
e = 'k'
plt.figure(figsize=(10, 10))
plt.xlabel('Space X')
plt.ylabel('Space Y')
angle = GetAngleOfLineBetweenTwoPoints({'x':xs[idx], 'y': ys[idx]}, {'x': xs[idx+10], 'y': ys[idx+10]})
plt.gca().add_artist(Rectangle((-20,-20),40,40, color='k', alpha=0.4, fill=False))
plt.gca().add_artist(Wedge((xs[idx], ys[idx]), 10, angle-fov, angle+fov, color='r', alpha=0.4))
plt.gca().set_aspect('equal')
for i, sp in enumerate(space_points):
    if i == len(time_points) - 1:
        plt.gca().add_artist(Circle((sp[0], sp[1]), 10, fill=False, alpha=1, edgecolor='k'))
    plt.gca().add_artist(Circle((sp[0], sp[1]), 10, color='g', alpha=0.1, edgecolor=e))
plt.scatter(np.transpose(space_points)[0], np.transpose(space_points)[1], c='g')
plt.scatter([xs[idx]], [ys[idx]], c='k', alpha=1, zorder=10)
plt.plot(xs, ys)
plt.figure(figsize=(10, 10))
plt.xlabel('Participant Time')
plt.ylabel('Timeline Time')
plt.gca().add_artist(Rectangle((0,0), atds[-1], 60, color='k', alpha=0.4, fill=False))
plt.gca().add_artist(Rectangle((0, ts[idx]-window/2.), atds[-1], window, color='r', alpha=0.4))
for i, tp in enumerate(time_points):
    if i == len(time_points) - 1:
        plt.gca().add_artist(Rectangle((0, tp-window/2.), atds[-1], window, fill=False, alpha=1, edgecolor='k'))
    plt.gca().add_artist(Rectangle((0, tp-window/2.), atds[-1], window, color='g', alpha=0.1))
plt.scatter([atds[idx]], [ts[idx]], c='k', alpha=1, zorder=10)
plt.plot(atds, ts)
plt.gca().set_aspect('equal')
plt.show()
In [21]:
import matplotlib.pyplot as plt
from matplotlib.patches import Wedge, Rectangle
from mpl_toolkits.mplot3d import Axes3D
import mpl_toolkits.mplot3d.art3d as pl3d
fov = 60.0
idx = int(len(iterations)/1.13)
window = 2.
fig = plt.figure(figsize=(10, 10))
ax = fig.gca(projection='3d')
ax.set_xlabel('Space X')
ax.set_ylabel('Space Y')
ax.set_zlabel('Participant Time')
angle = GetAngleOfLineBetweenTwoPoints({'x':xs[idx], 'y': ys[idx]}, {'x': xs[idx+10], 'y': ys[idx+10]})
w = Wedge((xs[idx], ys[idx]), 10, angle-fov, angle+fov, color='r', alpha=0.4)
# plt.gca().add_artist(Rectangle((-20,-20),40,40, color='k', alpha=0.4, fill=False))
# plt.gca().add_artist(Wedge((xs[idx], ys[idx]), 10, angle-fov, angle+fov, color='r', alpha=0.4))
plt.gca().set_aspect('equal')
# plt.scatter([xs[idx]], [ys[idx]], c='k', alpha=1, zorder=10)
ax.plot(xs, ys, ts)
# plt.gca().add_artist(Rectangle((0, ts[idx]-window/2.), atds[-1], window, color='r', alpha=0.4))
plt.gca().set_aspect('equal')
window = 2
def extrude_polygon(pts, zmid, zmin, zmax, slices=10):
    x, y = np.transpose(pts)
    zs = [[zmid-offset] * len(x) for offset in np.linspace(zmin, zmax, slices)]
    _polys = [list(zip(x, y, z)) for z in zs]
    return _polys

def graph_polygons_with_highlighted_edges(_polys, ax, color='r', alpha=0.1, edge_color='k', edge_alpha=0.4):
    coll = pl3d.Poly3DCollection(_polys)
    coll_ends = pl3d.Poly3DCollection([_polys[0], _polys[-1]])
    coll.set_alpha(alpha)
    coll_ends.set_alpha(edge_alpha)
    coll.set_edgecolor(edge_color)
    coll_ends.set_edgecolor(edge_color)
    coll.set_facecolor(color)
    coll_ends.set_facecolor(color)
    ax.add_collection3d(coll)
    ax.add_collection3d(coll_ends)

polys = extrude_polygon(w.get_path().vertices, ts[idx], -window/2., window/2.)
graph_polygons_with_highlighted_edges(polys, ax)
for s, t in zip(space_points, time_points):
    c = Circle((s[0], s[1]), 10, color='g', alpha=0.1, edgecolor=e)
    pts = (c.get_path().vertices*10 + s)
    polys = extrude_polygon(pts, t, -window/2., window/2.)
    graph_polygons_with_highlighted_edges(polys, ax, color='g', alpha=0.01, edge_alpha=0.1)
plt.show()
plt.show()
In [31]:
import cogrecon.core.data_flexing.time_travel_task.time_travel_task_to_iposition as ttt2i
In [32]:
data_directory = r'C:\Users\Kevin\Desktop\v6.0\tmp'
data_output_directory = r'C:\Users\Kevin\Desktop\v6.0'
In [40]:
ttt2i.time_travel_task_to_iposition(data_directory, data_output_directory, file_regex=r"\d\d\d_\d_14_\d_\d\d\d\d-\d\d-\d\d_\d\d-\d\d-\d\d.dat")
In [41]:
from cogrecon.core.batch_pipeline import batch_pipeline
from cogrecon.core.data_flexing.time_travel_task.time_travel_task_analytics import summarize_test_data
In [42]:
summarize_test_data(search_directory=data_directory, file_regex=r"\d\d\d_\d_14_\d_\d\d\d\d-\d\d-\d\d_\d\d-\d\d-\d\d.dat")  # For Time Travel Task specific analyses
Out[42]:
In [43]:
batch_pipeline(str(data_output_directory), 'TimeTravelTask_TimeOnly.csv', data_shape=(4, 10, 3),
               collapse_trials=False, trial_by_trial_accuracy=False,
               dimension=3, removal_dim_indicies=[0, 1], actual_coordinate_prefixes=True)
In [ ]: