This script is designed to provide a general purpose tool for producing descriptive statistics and visualizations for ATLeS data. The intent is that this notebook will provide a basic framework for you to build on.
Provide experiment details in the 'Parameters' section below, then execute notebook to generate stats.
Every time an experiment is run, ATLeS generates three files.
Broadly this notebook will:
In [1]:
from pathlib import Path
import configparser
import numpy as np
import pandas as pd
import seaborn
import matplotlib.pyplot as plt
import pingouinparametrics as pp
# add src/ directory to path to import ATLeS code
import os
import sys
module_path = os.path.abspath(os.path.join('..', 'src'))
if module_path not in sys.path:
sys.path.append(module_path)
# imported from ATLeS
from analysis.process import TrackProcessor
from analysis.plot import TrackPlotter
# displays plots in notebook output
%matplotlib inline
In [2]:
# --- Parameters ----------------------------------------------------------
# Experiment name: used to glob for the matching track/settings files.
experimentname = 'ACTEST2'
# Root directory searched (recursively) for track data.
trackdirectory = '../data/tracks'
# Set to 'extinction' or 'none'. Supplemental analyses are generated
# for extinction experiments.
experimenttype = 'extinction'
Set analysis options here:
In [3]:
# --- Analysis options ----------------------------------------------------
# Tracks whose valid-datapoint level falls below this cut-off are excluded
# for poor tracking quality.
acquisitionlevel = .85
# If True, excludes tracks where the trigger was never triggered.
# If False, tracks with no trigger are kept in the analysis.
notriggerexclude = True
In [4]:
# Dataframes generated below are accumulated here and combined at the end.
framelist = []
In [5]:
# Recursively locate the track and settings files belonging to this experiment.
data_root = Path(trackdirectory)
trackfiles = list(data_root.glob(f'**/*{experimentname}*track.csv'))
settingsfiles = list(data_root.glob(f'**/*{experimentname}*setup.txt'))

print(f'{len(trackfiles)} track files were found with the name {experimentname}')
print(f'{len(settingsfiles)} settings files were found with the name {experimentname}\n')

# Every run should produce exactly one settings file per track file.
if len(trackfiles) != len(settingsfiles):
    print('WARNING: Mismatched track and settings files.')
The number of experimental phases varies across experiments. This block identifies the phases used for the current experiment and verifies that all tracks have the same phase information.
The settings may vary between tracks within an experiment. This block also identifies the settings for each track and writes them to a dictionary.
In [6]:
# Parse every settings file and collect per-track configuration.
# NOTE(review): a single ConfigParser instance is reused across files;
# ConfigParser.read() merges into existing state, so values from a previous
# file survive unless the next file overwrites them — assumes all settings
# files share the same keys. TODO confirm.
Config = configparser.ConfigParser()
settingsdic = {}  # Dictionary used to store all settings information, keyed by track name.
phaselist = []  # List of phases used to verify phases are consistent across tracks.

# reads and organizes information from each settings file
for file in settingsfiles:
    Config.read(file)
    # generate clean list of stimuli
    stiminfo = Config.get('experiment', 'stimulus')  # gets stim info
    # strip parentheses/spaces and drop the leading element, leaving bare stim values
    stiminfo = stiminfo.replace('(', ',').replace(')', '').replace(' ', '').split(',')[1:]  # cleans stim list
    # generate clean list of phases
    phaselisttemp = Config.get('phases', 'phases_argstrings')  # gets phase info
    # strip the '-p ' flags and spaces; the trailing split element is empty, hence [:-1]
    phaselisttemp = phaselisttemp.replace('-p ', '').replace(' ', '').split(',')[:-1]  # cleans phase list
    # compare each phase list with the list from the previous settings file
    if len(phaselist) == 0:
        phaselist = phaselisttemp
    elif phaselist != phaselisttemp:
        print('Warning: Inconsistent phases between settings files.')
    else:
        pass
    # counts phases and generates phase variable names
    # (each phase contributes a time entry and a stim entry, hence the // 2)
    phasenumber = len(phaselist)//2
    phasenames = []
    for i in range(phasenumber):
        p, t, s = 'phase', 'time', 'stim'
        phase = p+str(i+1)
        phasetime = phase + t  # e.g. 'phase1time'
        phasestim = phase + s  # e.g. 'phase1stim'
        phasenames.extend((phasetime, phasestim))
    # gets settings info from filename (track/box)
    trackname = file.parts[-1].replace("-setup.txt", "")
    box = file.parts[-2]  # the parent directory names the recording box
    # gets settings info from setting file
    controller = Config.get('experiment', 'controller')
    trigger = Config.get('experiment', 'trigger')
    settings = [phaselisttemp, controller, trigger, stiminfo, box, str(file)]
    # puts all settings in dic keyed to trackname
    settingsdic[trackname] = settings
    # NOTE(review): `trackname`, `phasenames` and `phasenumber` leak out of this
    # loop and are used by later cells — after the loop they hold values from
    # the LAST settings file only.

# creates settings dataframe from settingsdic
dfsettings = pd.DataFrame(settingsdic).transpose()
dfsettings.columns = ['phases', 'controller', 'trigger', 'stimulus', 'box', 'file']
dfsettings['track'] = dfsettings.index

# creates stimulus dataframe, splits up and names stims
dfstim = pd.DataFrame(dfsettings.stimulus.values.tolist(), index=dfsettings.index).fillna('-')
for col in range(dfstim.shape[1]):
    dfstim = dfstim.rename(columns={col: ('stim_setting' + str(col))})
# NOTE(review): dfstim is built and renamed but never appended to framelist —
# confirm whether it should be part of the combined dataframe.

framelist.append(dfsettings)
dfsettings.head(3)
Out[6]:
This block extracts phase info from settings with trackname and calculates phase times.
This code currently assumes all phase times are the same across tracks within the experiment. This will need to be rewritten if we want to start running analyses across multiple studies with different phase times.
In [7]:
# Extract phase durations for this experiment and derive cumulative phase
# boundary times (in seconds).
# NOTE(review): `trackname` is whatever the settings loop above left behind,
# i.e. the LAST settings file read — this assumes phase times are identical
# across all tracks in the experiment.
phaseinfo = settingsdic.get(trackname)[0]
# keep only the entries containing digits (the durations), as ints, in seconds
phaseinfo = [x for x in phaseinfo if any(c.isdigit() for c in x)]
phaseinfo = [int(x) * 60 for x in phaseinfo]  # minutes -> seconds
phaselen = len(phaseinfo)

# FIX: build the cumulative boundaries [0, t1, t1+t2, ...] directly instead of
# the previous sum-slices + insert-zero hack (flagged in the original comment).
phaset = [0]
for duration in phaseinfo:
    phaset.append(phaset[-1] + duration)

# phase number (1-based) -> [start second, end second]
phasedic = {}
for i in range(phaselen):
    phasedic[i + 1] = [phaset[i], phaset[i + 1]]

# splits up and names the phases
dfphase = pd.DataFrame(dfsettings.phases.values.tolist(), index=dfsettings.index).fillna('-')
dfphase.columns = phasenames
phasenum = len(dfphase.columns) // 2  # each phase has a time column and a stim column
framelist.append(dfphase)
dfphase.head(3)
Out[7]:
In [8]:
# Compute per-track statistics via the ATLeS TrackProcessor and assemble them
# into one dataframe (one row per track).
# FIX: DataFrame.append() was removed in pandas 2.0 and growing a frame inside
# a loop is quadratic — collect the per-track frames and concatenate once.
statframes = []
for track in trackfiles:
    # gets track name from file name
    trackname = track.parts[-1].replace("-track.csv", "")
    # gets stats from TrackProcessor (ATLeS analysis class)
    processor = TrackProcessor(str(track), normalize_x_with_trigger='xpos < 0.50')
    tempstatsdic = processor.get_stats(include_phases=True)  # nested dict: {phase: {stat: value}}
    # flattens dictionary into dataframe, from https://stackoverflow.com/questions/13575090/
    dftemp = pd.DataFrame.from_dict(
        {(i, j): tempstatsdic[i][j] for i in tempstatsdic.keys() for j in tempstatsdic[i].keys()},
        orient='index')
    # transposes dataframe and adds track as index
    dftemp = dftemp.transpose()
    dftemp['track'] = trackname
    dftemp.set_index('track', inplace=True)
    statframes.append(dftemp)

dfstats = pd.concat(statframes, sort=True) if statframes else pd.DataFrame()

# normalize phase column labels, then flatten (phase, stat) tuples to 'phase|stat'
if 'phase 0' in dfstats.columns:
    dfstats.rename({'phase 0': 'p1', 'phase 1': 'p2', 'phase 2': 'p3'}, axis='columns', inplace=True)
dfstats.columns = dfstats.columns.map('|'.join)
framelist.append(dfstats)
dfstats.head(3)
Out[8]:
In [9]:
# Supplemental extinction-experiment statistics (one row per track).
# FIX: dfextstats is now initialized unconditionally — previously the final
# .head(3) raised a NameError whenever experimenttype != 'extinction'.
dfextstats = pd.DataFrame()
if experimenttype == 'extinction':
    # FIX: DataFrame.append() was removed in pandas 2.0; collect and concat once.
    extframes = []
    for track in trackfiles:
        # gets track name from file name
        trackname = track.parts[-1].replace("-track.csv", "")
        # gets advanced stats from TrackProcessor (ATLeS analysis class)
        processor = TrackProcessor(str(track))  # passes track to track processor and returns track object
        tempstatsdic = processor.get_exp_stats('extinction')  # gets stats from track object
        dftemp3 = pd.DataFrame(tempstatsdic, index=[0])
        dftemp3['track'] = trackname
        dftemp3.set_index('track', inplace=True)
        extframes.append(dftemp3)
    if extframes:
        dfextstats = pd.concat(extframes, sort=True)
    framelist.append(dfextstats)
else:
    print('Extinction experiment not selected in Parameters section.')
dfextstats.head(3)
Out[9]:
In [10]:
# Merge the settings, phase, stats (and extinction) frames into one master df.
df = pd.concat(framelist, axis=1, sort=False)
# Discard index entries that picked up no data in any frame.
df = df.dropna(axis=0, how='all')
df.head(3)
Out[10]:
In [11]:
# Quick structural overview of the combined dataframe: shape and column
# names grouped by dtype.
print(f'Dataframe Shape:{df.shape}')
print()
print('Column Names by DataType')
for dtype in df.dtypes.unique():
    print(f'Data Type, {dtype}:')
    dtype_cols = df.select_dtypes(include=[dtype]).columns
    print(*dtype_cols, sep=', ')
    print()
# print('Number of Tracks with Null Data by Column:') #fix this
# print(df[df.isnull().any(axis=1)][df.columns[df.isnull().any()]].count())
# print()
In [12]:
# Distribution of total track time per box — a sanity check for early exits.
tracktime = df['all|Total time (sec)']
print(f'Track Times: Mean {tracktime.mean()}, Minimum {tracktime.min()}, Maximum {tracktime.max()}, Count {tracktime.count()}')
fig, ax = plt.subplots(1, 1, figsize=(6, 6))
ax.ticklabel_format(useOffset=False)  # prevents appearance of scientific notation on y axis
df.boxplot(column='all|Total time (sec)', by='box', ax=ax)
Out[12]:
In [13]:
# Tracking quality (valid datapoints) per box.
valid = df['all|%Valid datapoints']
print(f'Valid Datapoints: Mean {valid.mean()}, Minimum {valid.min()}, Maximum {valid.max()}, Count {valid.count()}')
fig, ax = plt.subplots(1, 1, figsize=(6, 6))
df.boxplot(column='all|%Valid datapoints', by='box', ax=ax)
Out[13]:
In [14]:
# Trigger counts during the learning phase (phase 2), summarized per box.
# FIX: the summary line previously mixed columns — the mean came from
# 'phase 2|#Triggers' while min/max/count came from 'all|#Triggers'. Use the
# phase-2 column throughout, matching the boxplot below and the later
# no-trigger exclusion rule.
trig = df['phase 2|#Triggers']
print(f'Number of Triggers: Mean {trig.mean()}, Minimum {trig.min()}, Maximum {trig.max()}, Count {trig.count()}')
fig, ax = plt.subplots(1, 1, figsize=(6, 6))
df.boxplot(column='phase 2|#Triggers', by='box', ax=ax)
Out[14]:
In [15]:
# Exclude unusable tracks, reporting the remaining count after each filter.
print(f'Raw Track Number: {df.shape[0]}')

# Drop tracks shorter than 75% of the mean run length — these are early
# terminations. (The original comment claimed this dropped rows with missing
# data; it does not.)
tracktime = df['all|Total time (sec)']
df = df.drop(df[tracktime < tracktime.mean() * .75].index)
print(f'Modified Track Number: {df.shape[0]} (following removal of tracks less than 75% the length of the experiment mean)')

# Drop tracks with poor acquisition quality.
# NOTE(review): acquisitionlevel is 0.85 but the message renders it as '0.85%';
# confirm whether 'all|%Valid datapoints' is a fraction (0-1) or a percentage —
# if it is 0-100, this filter removes nothing.
df = df.drop(df[df['all|%Valid datapoints'] < acquisitionlevel].index)
print(f'Modified Track Number: {df.shape[0]} (following removal for poor tracking set at less than {acquisitionlevel}% valid datapoints)')

if notriggerexclude:  # idiom fix: was 'notriggerexclude == True'
    # drops rows if there was no trigger during phase 2
    # TODO: make this work when the learning phase is not phase 2
    df = df.drop(df[df['phase 2|#Triggers'] == 0].index)
    print(f'Modified Track Number: {df.shape[0]} (following removal of tracks with no triggers during the learning)')
In [16]:
# Per-box summary of the trigger conditions (count / unique / top / freq).
dftrig = df.groupby('box')['trigger'].describe()
dftrig
Out[16]:
In [17]:
# Build the list of boxes usable for trigger-condition comparisons: boxes with
# fewer than two trigger conditions cannot support a between-condition test.
boxlist = df.box.unique().tolist()  # all boxes in the experiment
# FIX: bracket access instead of `dftrig.unique` — attribute access on a column
# named 'unique' reads like a method call and breaks if pandas ever adds a
# DataFrame.unique attribute; removed the dead `else: pass` branch.
onetriglist = dftrig.index[dftrig['unique'] < 2].tolist()
boxlist = [box for box in boxlist if box not in onetriglist]
if len(onetriglist) > 0:
    print(f'WARNING: The following boxes had only one trigger condition: {onetriglist}. These boxes removed from trigger analyses below.')
In [18]:
# Welch's t-tests comparing every float column between the two trigger
# conditions, per box, against a Bonferroni-style corrected alpha.
print(f'Trigger Conditions: {df.trigger.unique()}')
print()
from scipy.stats import ttest_ind

# FIX: hoisted loop invariants — the float-column list and corrected alpha
# were previously recomputed on every inner-loop iteration, and each group
# subset was computed up to three times.
floatcols = df.select_dtypes(include=['float64']).columns
alpha = .05 / len(floatcols)

for col in floatcols:
    for b in boxlist:
        dfbox = df[df.box == b]
        conditions = dfbox.trigger.unique()  # boxlist guarantees >= 2 conditions
        group1 = dfbox[dfbox.trigger == conditions[0]][col]
        group2 = dfbox[dfbox.trigger == conditions[1]][col]
        # Welch's t-test (does not assume equal variances), ignoring NaNs
        ttest_result = ttest_ind(group1, group2, equal_var=False, nan_policy='omit')
        if ttest_result.pvalue < alpha:
            print(col)
            print(f' {b}: Welchs T-Test indicates significant difference by trigger condition, p = {ttest_result.pvalue}')
            print(f' Trigger Condition 1 Mean: {group1.mean()}')
            print(f' Trigger Condition 2 Mean: {group2.mean()}')
            print()
In [31]:
def betweensubjectANOVA(dependentvar, betweenfactor, suppress):
    """Run a one-way between-subjects ANOVA of `dependentvar` by `betweenfactor`
    on the global `df`, printing (and plotting) significant results.

    Significance uses a Bonferroni-style alpha: .05 divided by the number of
    float columns screened. If `suppress` is True, non-significant results
    are not printed.
    """
    try:
        anovaresult = pp.anova(dv=dependentvar, between=betweenfactor, data=df, detailed=True, export_filename=None)
        # p-value of the between-factor row in the ANOVA table
        pvalue = anovaresult.loc[anovaresult.Source == betweenfactor]['p-unc'].values[0]
        alpha = .05 / len(df.select_dtypes(include=['float64']).columns)
        if pvalue >= alpha:
            if not suppress:  # idiom fix: was 'suppress == False'
                print(f'{dependentvar}')
                print(f' NOT significant: One-way ANOVA conducted testing {betweenfactor} as significant predictor of {dependentvar}. P = {pvalue}')
                print()
        else:
            print(f'{dependentvar}')
            print(f' SIGNIFICANT: One-way ANOVA conducted testing {betweenfactor} as significant predictor of {dependentvar}. P = {pvalue}')
            fig, ax = plt.subplots(1, 1, figsize=(6, 6))
            df.boxplot(column=dependentvar, by=betweenfactor, ax=ax)
            print()
    # FIX: was a bare `except:` that silently swallowed every error (including
    # KeyboardInterrupt); catch Exception and report the reason.
    except Exception as err:
        print(f'{dependentvar} analysis failed. Check descriptives. ({err})')


# Screen every float column for a box effect, suppressing non-significant output.
for col in df.select_dtypes(include=['float64']).columns:
    betweensubjectANOVA(col, 'box', True)
In [33]:
# Per-phase average normed x position split by box (panels share the y axis).
fig, axes = plt.subplots(1, 3, figsize=(15, 6), sharey=True)
df.boxplot(
    column=['phase 1|Avg. normed x coordinate', 'phase 2|Avg. normed x coordinate', 'phase 3|Avg. normed x coordinate'],
    by='box', ax=axes)
Out[33]:
In [21]:
# Per-phase average y position split by box (panels share the y axis).
fig, axes = plt.subplots(1, 3, figsize=(15, 6), sharey=True)
df.boxplot(
    column=['phase 1|Avg. y coordinate', 'phase 2|Avg. y coordinate', 'phase 3|Avg. y coordinate'],
    by='box', ax=axes)
Out[21]:
In [22]:
# Per-phase average normed x position, pooled across all boxes.
fig, ax = plt.subplots(1, 1, figsize=(15, 6))
df.boxplot(
    column=['phase 1|Avg. normed x coordinate', 'phase 2|Avg. normed x coordinate', 'phase 3|Avg. normed x coordinate'],
    ax=ax)
Out[22]:
In [34]:
# Per-phase average y position, pooled across all boxes.
fig, ax = plt.subplots(1, 1, figsize=(15, 6))
df.boxplot(
    column=['phase 1|Avg. y coordinate', 'phase 2|Avg. y coordinate', 'phase 3|Avg. y coordinate'],
    ax=ax)
Out[34]:
In [26]:
# Per-phase position heatmaps for a single track.
# NOTE(review): `processor` is whatever the last loop iteration above left
# behind, so this plots only the LAST track processed — confirm that this is
# the intended example track rather than a hidden-state accident.
plotter = TrackPlotter(processor)
plotter.plot_heatmap(plot_type='per-phase')
In [28]:
# 'phase 1|Avg. normed x coordinate', 'phase 2|Avg. normed x coordinate', 'phase 3|Avg. normed x coordinate'
# aov = rm_anova(dv='DV', within='Time', data=df, correction='auto', remove_na=True, detailed=True, export_filename=None)
# print_table(aov)
In [29]:
# Repeated-measures ANOVA: does the average normed x coordinate differ across
# phases? Reshape from wide (one column per phase) to long (one row per
# track-phase combination) first.
dependentvar = 'Avg. normed x coordinate'
# FIX: DataFrame.append() was removed in pandas 2.0 — collect the per-phase
# frames and concatenate once. A for-loop replaces the manual while counter,
# and .rename() replaces the in-place mutation of columns.values.
phaseframes = []
for phasenumcount in range(1, phasenum + 1):
    colname = f'phase {str(phasenumcount)}|{dependentvar}'
    dftemp = df[[colname]].rename(columns={colname: dependentvar})
    dftemp['phase'] = phasenumcount  # within-subject factor level
    phaseframes.append(dftemp)
dfanova = pd.concat(phaseframes) if phaseframes else pd.DataFrame()
pp.rm_anova(dv='Avg. normed x coordinate', within='phase', data=dfanova, correction='auto', remove_na=True, detailed=False, export_filename=None)
Out[29]: