In [1]:
from __future__ import division
import pandas as pd
import numpy as np
import os
import re
import copy
from pprint import pprint
from glob import glob
import cPickle as pkl
In [2]:
def create_localizer_block_single_effector(n_trials, response_modality='eye',
block_number=None, pseudorandomize=True,
add_timing=True, null_trials=0, TR=2):
# Only two trial types here: 'left' is correct, or 'right' is correct
trial_types = np.repeat([0, 1], repeats=n_trials/2) # left/right
# Initialize arrays
cue_by_trial = np.zeros(n_trials, dtype='<U5')
correct_answers = np.zeros(n_trials, dtype=np.int8)
# Define the cues for every trial
cue_by_trial[(trial_types == 0)] = 'LEFT'
cue_by_trial[(trial_types == 1)] = 'RIGHT'
# cue_by_trial[(trial_types == 2) | (trial_types == 3)] = 'anti'
# Define the responses ('correct answers')/directions for every trial
correct_answers[trial_types == 0] = 0
correct_answers[trial_types == 1] = 1
# correct_answers[(trial_types == 0) | (trial_types == 2)] = 0 # 0 = respond LEFT
# correct_answers[(trial_types == 1) | (trial_types == 3)] = 1 # 1 = respond RIGHT
# Create dataframe for easier handling
trial_data = pd.DataFrame({'correct_answer': correct_answers,
'cue': cue_by_trial,
'trial_type': trial_types})
# Should we pseudorandomize?
if pseudorandomize:
trial_data = Pseudorandomizer(trial_data, max_identical_iters={'correct_answer': 3,
'cue': 3}).run()
trial_data['null_trial'] = False
if null_trials > 0:
trial_data = add_pseudorandom_null_trials(trial_data, n_null_trials=null_trials)
# Add block number for completeness
if block_number is not None:
trial_data['block'] = block_number
trial_data['block_type'] = 'localizer'
# Usually, we also want to add the duration of all the 'trial phases'
if add_timing:
# Set phase_3 and phase_0 to 0 (no post-cue fixcross, no feedback)
trial_data = get_localizer_timing(trial_data, TR=TR)
trial_data['response_modality'] = response_modality.lower()
return trial_data
In [3]:
def create_cognitive_block(n_trials, block_number=None, response_modality=None,
add_timing=True, pseudorandomize=True, n_null_trials=0, TR=2):
"""
Creates a block of SAT-trials; mixing speed and accuracy trials
"""
trial_types = np.hstack((np.repeat([0, 1], repeats=n_trials / 4), # SPEED cue, left/right corr
np.repeat([2, 3], repeats=n_trials / 4))) # ACCURACY cue, left/right corr
if trial_types.shape[0] != n_trials:
raise(ValueError('The provided n_trials (%d) could not be split into the correct number of trial types. '
'Closest option is %d trials' % (n_trials, trial_types.shape[0])))
cue_by_trial = np.zeros(n_trials, dtype='<U5')
correct_answers = np.zeros(n_trials, dtype=np.int8)
cue_by_trial[(trial_types == 0) | (trial_types == 1)] = 'SPD'
cue_by_trial[(trial_types == 2) | (trial_types == 3)] = 'ACC'
correct_answers[(trial_types == 0) | (trial_types == 2)] = 0 # 0 = left is correct
correct_answers[(trial_types == 1) | (trial_types == 3)] = 1 # 1 = right is correct
# Create dataframe for easier handling
trial_data = pd.DataFrame({'correct_answer': correct_answers,
'cue': cue_by_trial,
'trial_type': trial_types})
if pseudorandomize:
trial_data = Pseudorandomizer(trial_data,
max_identical_iters={'cue': 5, 'correct_answer': 5}).run()
if n_null_trials > 0:
trial_data['null_trial'] = False
trial_data = add_pseudorandom_null_trials(trial_data,
n_null_trials=n_null_trials,
null_column_name='null_trial')
if block_number is not None:
trial_data['block'] = block_number
if response_modality is not None:
trial_data['response_modality'] = response_modality
trial_data['block_type'] = 'cognitive_%s' % response_modality
if add_timing:
while True:
trial_data = get_block_timing(trial_data, TR=TR) # Add default timing
if check_good_ITI_phase0(trial_data):
break
return trial_data
In [4]:
def create_limbic_block(n_trials, subject_number=1, block_number=None,
response_modality=None, add_timing=True, pseudorandomize=True,
n_null_trials=0, TR=2):
trial_types = np.hstack((np.repeat([0, 1], repeats=n_trials/4), # Neutral cue, left/right corr
np.repeat([2, 3], repeats=n_trials/8), # Left cue, left/right corr
np.repeat([4, 5], repeats=n_trials/8))) # Right cue, left/right corr
if trial_types.shape[0] != n_trials:
raise(ValueError('The provided n_trials (%d) could not be split into the correct number of trial types. '
'Closest option is %d trials' % (n_trials, trial_types.shape[0])))
cue_by_trial = np.zeros(n_trials, dtype='<U5')
correct_answers = np.zeros(n_trials, dtype=np.int8)
cue_by_trial[(trial_types == 0) | (trial_types == 1)] = 'NEU'
cue_by_trial[(trial_types == 2) | (trial_types == 3)] = 'LEFT'
cue_by_trial[(trial_types == 4) | (trial_types == 5)] = 'RIGHT'
correct_answers[(trial_types == 0) |
(trial_types == 2) |
(trial_types == 4)] = 0 # 0 = left is correct
correct_answers[(trial_types == 1) |
(trial_types == 3) |
(trial_types == 5)] = 1 # 1 = right is correct
# Create dataframe for easier handling
trial_data = pd.DataFrame({'correct_answer': correct_answers,
'cue': cue_by_trial,
'trial_type': trial_types})
if pseudorandomize:
trial_data = Pseudorandomizer(trial_data,
max_identical_iters={'cue': 4, 'correct_answer': 4}).run()
if n_null_trials > 0:
trial_data['null_trial'] = False
trial_data = add_pseudorandom_null_trials(trial_data,
n_null_trials=n_null_trials,
null_column_name='null_trial')
if block_number is not None:
trial_data['block'] = block_number
if response_modality is not None:
trial_data['response_modality'] = response_modality
trial_data['block_type'] = 'limbic_%s' % response_modality
if add_timing:
while True:
trial_data = get_block_timing(trial_data, TR=TR) # Add default timing
if check_good_ITI_phase0(trial_data):
break
return trial_data
In [5]:
def get_localizer_timing(trial_data, phase_0=None, phase_1=None, phase_2=None, phase_3=None, phase_4=None, phase_5=None, phase_6=None, TR=2):
"""
Each localizer trial consists of 7 phases.
In phase_0, we wait for the scanner pulse. Note that phase_0 of trial n is the ITI after trial n-1. Set this timing always to 0: it is the `minimum` time to wait for the pulse
In phase_1, we show the pre-cue fixation cross. By default, timing is jittered (0s, 0.5s, 1s, 1.5s if TR=2 -- 0, .75, 1.5, 2.25 if TR=3).
In phase_2, we show the cue. Follows an exponential distrbibu.
In phase_3, we show the post-cue fixation cross. Defaults to 0s.
In phase_4, we assume the participant responds, and wait a bit until we show the fix cross. Defaults to 0.6s
In phase_5 and phase_6, we do nothing (exist for compatibility with the experimental blocks)
Phase_7 is ITI
"""
if TR == 2:
trial_data['phase_0'] = 0 if phase_0 is None else phase_0
trial_data['phase_1'] = np.random.choice([0.2, .7, 1.2, 1.7], size=trial_data.shape[0]) if phase_1 is None else phase_1
trial_data['phase_2'] = 0.8 if phase_2 is None else phase_2
trial_data['phase_3'] = 0 if phase_3 is None else phase_3
trial_data['phase_4'] = 0.6 if phase_4 is None else phase_4
trial_data['phase_5'] = 0 if phase_5 is None else phase_5
trial_data['phase_6'] = 0 if phase_6 is None else phase_6
elif TR == 3:
trial_data['phase_0'] = 0 if phase_0 is None else phase_0
trial_data['phase_1'] = np.random.choice([0, .750, 1.500, 2.250], size=trial_data.shape[0]) if phase_1 is None else phase_1
trial_data['phase_2'] = np.round(np.random.exponential(scale=1/6, size=trial_data.shape[0])+.8, 3) if phase_2 is None else phase_2
# trial_data['phase_2'] = np.round(np.random.uniform() ) if phase_2 is None else phase_2
trial_data['phase_3'] = 0 if phase_3 is None else phase_3
trial_data['phase_4'] = 0.8 if phase_4 is None else phase_4
trial_data['phase_5'] = 0 if phase_5 is None else phase_5
trial_data['phase_6'] = 0 if phase_6 is None else phase_6
# Calculate duration of trial (depends on random, jittered durations of the fix cross)
trial_data['trial_duration'] = trial_data[['phase_' + str(x) for x in range(7)]].sum(axis=1)
# Because of TR = 2s, some trials can last 8 seconds, but most will last 10. Find trials with total time < 8 seconds
# We calculate the ITI as the difference between the minimum number of pulses necessary for all phases to show.
# [same applies to TR = 3]
# min_TRs = np.ceil(trial_data['trial_duration'].values / TR)
# trial_data['phase_7'] = min_TRs*TR - trial_data['trial_duration'].values
trial_data['phase_7'] = 6 - trial_data['trial_duration'].values
# Recalculate trial duration so it includes the ITI
trial_data['trial_duration'] = trial_data[['phase_' + str(x) for x in range(8)]].sum(axis=1)
# Add trial start times relative to start of block
trial_data['trial_start_time_block'] = trial_data['trial_duration'].shift(1).cumsum()
trial_data.loc[0, 'trial_start_time_block'] = 0
# Add cue onset times relative to start of block
trial_data['cue_onset_time_block'] = trial_data['trial_start_time_block'] + \
trial_data['phase_1']
# Add stimulus onset times relative to start of block
trial_data['stimulus_onset_time_block'] = trial_data['trial_start_time_block'] + \
trial_data['phase_1'] + \
trial_data['phase_2'] + \
trial_data['phase_3']
return trial_data
In [6]:
def get_block_timing(trial_data, phase_0=None, phase_1=None, phase_2=None, phase_3=None, phase_4=None, phase_5=None, phase_6=None, TR=2):
"""
Each trial consists of 7 phases.
In phase_0, we wait for the scanner pulse. Note that phase_0 of trial n is the ITI after trial n-1. Set this timing always to 0: it is the `minimum` time to wait for the pulse
In phase_1, we show the pre-cue fixation cross. By default, timing is jittered (0s, 0.5s, 1s, 1.5s)
In phase_2, we show the cue. In decision-making trials, this is 4.8 seconds.
In phase_3, we show the post-cue fixation cross. Timing is jittered (0s, 0.5s, 1s, 1.5s)
In phase_4, we show the stimulus. Default is 1.5s.
Phase 5 is defined as the period of stimulus presentation, after the participant made a response. The duration is determined by the participant RT, so not set here.
In phase_6, we show feedback. Default is 0.35s.
"""
if TR == 2:
trial_data['phase_0'] = 0 if phase_0 is None else phase_0
trial_data['phase_1'] = np.random.choice([0, .5, 1, 1.5], size=trial_data.shape[0]) if phase_1 is None else phase_1
trial_data['phase_2'] = 4.8 if phase_2 is None else phase_2
trial_data['phase_3'] = np.random.choice([0, .5, 1, 1.5], size=trial_data.shape[0]) if phase_3 is None else phase_3
trial_data['phase_4'] = 2 if phase_4 is None else phase_4
trial_data['phase_5'] = 0 if phase_5 is None else phase_5
trial_data['phase_6'] = 0.35 if phase_6 is None else phase_6
elif TR == 3:
trial_data['phase_0'] = 0 if phase_0 is None else phase_0
trial_data['phase_1'] = np.random.choice([0, .750, 1.500, 2.250], size=trial_data.shape[0]) if phase_1 is None else phase_1
trial_data['phase_2'] = 1 if phase_2 is None else phase_2
trial_data['phase_3'] = np.random.choice([0.750, 1.500, 2.250, 3.000], size=trial_data.shape[0]) if phase_3 is None else phase_3
trial_data['phase_4'] = 2 if phase_4 is None else phase_4
trial_data['phase_5'] = 0 if phase_5 is None else phase_5
trial_data['phase_6'] = 0.5 if phase_6 is None else phase_6
# Calculate duration of trial (depends on random, jittered durations of the fix cross)
trial_data['trial_duration'] = trial_data[['phase_' + str(x) for x in range(7)]].sum(axis=1)
if TR == 2:
# Because of TR = 2s, some trials can last 8 seconds, but most will last 10. Find trials with total time < 8 seconds
# We calculate the ITI as the difference between the minimum number of pulses necessary for all phases to show.
min_TRs = np.ceil(trial_data['trial_duration'].values / TR)
trial_data['phase_7'] = min_TRs*TR - trial_data['trial_duration'].values
elif TR == 3:
# In this case, fill all trials until 9s have passed.
trial_data['phase_7'] = 9 - trial_data['trial_duration'].values
# Recalculate trial duration so it includes the ITI
trial_data['trial_duration'] = trial_data[['phase_' + str(x) for x in range(8)]].sum(axis=1)
# Add trial start times relative to start of block
trial_data['trial_start_time_block'] = trial_data['trial_duration'].shift(1).cumsum()
trial_data.loc[0, 'trial_start_time_block'] = 0
# Add cue onset times relative to start of block
trial_data['cue_onset_time_block'] = trial_data['trial_start_time_block'] + \
trial_data['phase_1']
# Add stimulus onset times relative to start of block
trial_data['stimulus_onset_time_block'] = trial_data['trial_start_time_block'] + \
trial_data['phase_1'] + \
trial_data['phase_2'] + \
trial_data['phase_3']
return trial_data
In [7]:
def check_good_ITI_phase0(data):
# Get rid of Null Trials, and the trials before the Null Trials
nulls = data[data['null_trial'] == True].index.values
nulls = np.hstack((nulls, data.shape[0]-1))
start_id = 0
for end_id in nulls:
data_subset = data.iloc[np.arange(start_id, end_id)].copy()
# Shift rows in column phase_1
data_subset['phase_1'] = data_subset['phase_1'].shift(-1)
# Check whether (shifted) phase_1 values == phase_7 values, AND phase_1 values is 0.
idx = (data_subset['phase_1'].values == data_subset['phase_7'].values) & (data_subset['phase_1'].values == 0.00)
if np.sum(idx) > 0:
# print(data_subset[['phase_1', 'phase_7']])
return False
start_id = end_id + 1
return True
# dat = create_cognitive_block(n_trials=100,
# block_number=1,
# response_modality='hand',
# n_null_trials=7,
# TR=3)
# print(dat[['phase_1', 'phase_7']].head(6))
# check_good_ITI_phase0(dat)
In [8]:
class Pseudorandomizer(object):
def __init__(self, data, max_identical_iters={'cue': 4, 'correct_answer': 4}):
self.data = data
self.max_identical_iters = {x: y+1 for x, y in max_identical_iters.items()}
# add 1: if 4 rows is allowed, only give an error after 5 identical rows
def check_trial_rows(self, data, row_n):
"""
Returns True if any of the conditions for pseudorandomization are violated for the given rows,
False if they are fine.
"""
# First, check for the maximum iterations
for column, max_iter in self.max_identical_iters.items():
if row_n - max_iter < 0:
continue
# Select rows [max_iter-1 - row_n] we're going to check. Never select any row with index < 0
row_selection = [x for x in np.arange(row_n, row_n-max_iter, -1)]
# Next, we check if the selected rows only contain *1* trial type.
# If so, this means we have max_iter rows of the same trials, and we need to change something.
if data.iloc[row_selection][column].nunique() == 1:
return True
return False
def run(self, debug=False):
"""
Pseudorandomizes: makes sure that it is not possible to have more than x iterations for every type of column, specified in columns.
"""
# Start by copying from original data, and shuffle
self.data = self.data.sample(frac=1,
random_state=np.random.randint(0, 1e7, dtype='int')).reset_index(drop=True)
if debug:
outer_while_i = 0
debug_print_after_i = 100
good_set = False
while not good_set:
if debug:
outer_while_i += 1
reshuffle = False # Assume the dataset does not need reshuffling.
for row_n in range(0, self.data.shape[0]):
# Select rows [max_iter-1 - row_n] we're going to check
# Check if the current row, and the (max_iters-1) rows before, are the same value (number of unique values = 1).
# If so, then move the current row number to the bottom of the dataframe. However, we need to re-check the same four rows again
# after moving a row to the bottom: therefore, a while loop is necessary.
checked_row = False
n_attempts_at_moving = 0
if debug:
inner_while_i = 0
while not checked_row:
if debug:
inner_while_i += 1
if inner_while_i > debug_print_after_i:
print('New inner loop started for current row')
if self.check_trial_rows(self.data, row_n):
if debug and inner_while_i > debug_print_after_i:
print('Found too many consecutively identical rows.')
# If there are too many consecutively identical rows at the bottom of the dataframe,
# break and start over/shuffle
if row_n >= (self.data.shape[0] - self.max_identical_iters[self.max_identical_iters.keys()[0]]):
if debug and inner_while_i > debug_print_after_i:
print('These occurred at row_n %d, which is at the bottom of the DF.' % row_n)
checked_row = True
reshuffle = True
# Too many consecutive identical rows? Move row_n to the bottom, and check again with the new row_n.
else:
if debug and inner_while_i > debug_print_after_i:
print('These occurred at row_n %d. Checking the remainder of the DF.' % row_n)
# Check if moving to the bottom even makes sense: if all remaining values are identical, it doesn't.
if (self.data.iloc[row_n:][self.max_identical_iters.keys()].nunique().values < 2).any():
if debug and inner_while_i > debug_print_after_i:
print('All remaining values are identical. I should stop the for-loop, and start over.')
checked_row = True
reshuffle = True
else:
if n_attempts_at_moving < 50:
n_attempts_at_moving += 1
if debug and inner_while_i > debug_print_after_i:
print('Not all remaining values are identical. I should move the final part to the bottom.')
# If not, move the current row to the bottom
row_to_move = self.data.iloc[row_n,:]
# Delete row from df
self.data.drop(row_n, axis=0, inplace=True)
# Append original row to end. Make sure to reset index
self.data = self.data.append(row_to_move).reset_index(drop=True)
# If we already tried moving the current row to the bottom for 50 times, let's forget about it and restart
else:
checked_row = True
reshuffle = True
else:
if debug and inner_while_i > debug_print_after_i:
print('Checked row, but the row is fine. Next row.')
checked_row = True
if reshuffle:
good_set = False
break # out of the for loop
# Reached the bottom of the dataframe, but no reshuffle call? Then we're set.
if row_n == self.data.shape[0]-1:
good_set = True
if reshuffle:
# Shuffle, reset index to ensure trial_data.drop(row_n) rows
self.data = self.data.sample(frac=1, random_state=np.random.randint(0, 1e7, dtype='int')).reset_index(drop=True)
return self.data
def add_pseudorandom_null_trials(data, min_row=4, max_row=4, min_n_rows_separate=7,
n_null_trials=10, null_column_name=''):
"""
Adds null trials interspersed at pseudorandom locations. You can determine the minimum
number of trials at the start before a null trial, the minimum number of trials at the end in which no
nulls are shown, and the minimum number of trials that the null trials have to be separated
"""
good_idx = False
while not good_idx:
indx = np.random.choice(np.arange(min_row, data.shape[0]-max_row),
replace=False, size=n_null_trials)
diffs = np.diff(np.sort(indx))
if (diffs >= min_n_rows_separate).all():
good_idx = True
data.index = np.setdiff1d(np.arange(data.shape[0] + n_null_trials), indx)
new_rows = pd.DataFrame({null_column_name: [True]*n_null_trials}, columns=data.columns, index=indx)
data = data.append(new_rows).sort_index()
# Always end with a null trial
last_row = pd.DataFrame({null_column_name: [True]*1}, columns=data.columns, index=[data.shape[0]])
data = data.append(last_row).sort_index()
return data
In [9]:
def create_localizer(n_localizer_blocks, n_trials_per_localizer_block, localizer_order, block_number=0, pseudorandomize=True, TR=3):
# Initiate empty dataframe, generate block
block_data = pd.DataFrame()
for localizer_block in range(int(n_localizer_blocks/2)):
loc_block = create_localizer_block_single_effector(n_trials=n_trials_per_localizer_block,
response_modality=localizer_order[0],
block_number=block_number,
pseudorandomize=pseudorandomize, TR=TR)
block_data = block_data.append(loc_block)
loc_block = create_localizer_block_single_effector(n_trials=n_trials_per_localizer_block,
response_modality=localizer_order[1],
block_number=block_number,
pseudorandomize=pseudorandomize, TR=TR)
block_data = block_data.append(loc_block)
return block_data
In [10]:
# Function to convolve design
from nipy.modalities.fmri import hrf, utils
def stim_to_design(pp_design, block=None, silent=False):
# Check if we only need to do a subset of the design
if block is not None:
pp_design = pp_design.loc[pp_design['block'] == block]
# Get rid of null trials
pp_design = pp_design.loc[pp_design['null_trial'] == False,:]
# hrf.glover is a symbolic function; get a function of time to work on arrays
hrf_func = utils.lambdify_t(hrf.glover(utils.T))
max_time = np.ceil((pp_design['stimulus_onset_time'].max()+25)*10)
if 0 in pp_design['block'].unique():
block0_trials = pp_design.loc[pp_design['block'] == 0]
# Get cue-types and response types for the first block
loc_cue_vec = np.zeros(shape=(int(max_time), 4))
response_vec = np.zeros(shape=(int(max_time), 4))
loc_cue_names = []
response_names = []
i = -1
for effector_type in block0_trials['response_modality'].unique():
for cue_type in block0_trials['cue'].unique():
subset = pp_design.loc[(pp_design['block'] == 0 ) &
(pp_design['response_modality'] == effector_type) &
(pp_design['cue'] == cue_type)]
i += 1
response_names.append('resp_%s_%s' % (effector_type, cue_type))
loc_cue_names.append('cue_%s_%s' % (effector_type, cue_type))
# Get cue onsets & durations
onsets = np.round(subset['cue_onset_time'].values*10)
durations = np.round(subset['phase_2'].values*10)
for onset, duration in zip(onsets, durations):
loc_cue_vec[np.arange(onset, onset+duration, dtype='int'), i] = 1
# Get response onsets & durations
onsets = np.round(subset['stimulus_onset_time'].values*10)
durations = np.round(subset['phase_4'].values*10)
for onset, duration in zip(onsets, durations):
response_vec[np.arange(onset, onset+duration, dtype='int'), i] = 1
# For all further EVs, make sure not to include the localizer trials.
pp_design = pp_design.loc[pp_design['block'] > 0]
# 10 types of stimuli: (n_cue_types) * (n_stim_types)
stim_vec = np.zeros(shape=(int(max_time), pp_design['correct_answer'].nunique()*pp_design['cue'].nunique()))
stim_names = []
# Get stimulus onsets and durations
i = -1
for stim_type in pp_design['correct_answer'].unique():
for cue_type in pp_design['cue'].unique():
i += 1
stim_names.append('stimulus_' + str(int(stim_type)) + '_' + cue_type)
subset = pp_design.loc[(pp_design['correct_answer'] == stim_type) &
(pp_design['cue'] == cue_type)]
stim_onsets = np.round(subset['stimulus_onset_time'].values*10)
stim_durations = np.round(subset['phase_4'].values*10)
for onset, duration in zip(stim_onsets, stim_durations):
# stim_vec = np.hstack((stim_vec, np.zeros(shape=(int(max_time), 1))))
stim_vec[np.arange(onset, onset+duration, dtype='int'), i] = 1
# Get cue onsets by cue type
cue_names = []
n_conditions = len(np.unique(pp_design['cue']))
cue_vec = np.zeros(shape=(int(max_time), n_conditions)) # A column per cue type condition
i = -1
for condition in pp_design['cue'].unique():
i += 1
cue_names.append('cue_' + condition)
# Find cue onsets
onsets = np.round(pp_design.loc[pp_design['cue'] == condition, 'cue_onset_time'].values*10)
durations = np.round(pp_design.loc[pp_design['cue'] == condition, 'phase_2'].values*10)
for onset, duration in zip(onsets, durations):
cue_vec[np.arange(onset, onset+duration, dtype='int'), i] = 1
# Combine everything in a single array
if 'loc_cue_vec' in locals():
ev_vec = np.hstack((loc_cue_vec, response_vec, cue_vec, stim_vec))
ev_names = loc_cue_names + response_names + cue_names + stim_names
else:
ev_vec = np.hstack((cue_vec, stim_vec))
ev_names = cue_names + stim_names
# Create hrf to convolve with
hrf_full = hrf_func(np.linspace(0, stop=int(max_time/10), num=int(max_time)))
# Pre-allocate output. This will be an n_timepoints x n_conditions+1 matrix.
X = np.empty(shape=(int(max_time), ev_vec.shape[1]))
# Convolve everything: the stimulus first, the cues afterwards.
for i, ev_name in enumerate(ev_names):
if not silent:
print('Convolving %s...' % ev_name)
X[:, i] = np.convolve(hrf_full, ev_vec[:, i])[:int(max_time)]
return X, ev_names
In [11]:
def optimize_brute_force(n_trials,
c, # contrasts to be optimized
run_type='localizer',
block_number=0,
pseudorandomize=True,
TR=3,
n_attempts=1e4,
# For non-localizer:
n_null_trials=0,
response_modality='hand',
# for localizer:
n_localizer_blocks=None, localizer_order=None):
""" Performs a brute force search of the best possible trial order & jitter times for a single run """
n_attempts = int(n_attempts)
# Generate n_attempt seeds to check
seeds = np.round(np.random.uniform(low=int(0), high=int(2**32 - 1), size=n_attempts)).astype(int)
effs = np.zeros(shape=seeds.shape)
best_eff = 0
best_block = None
for i in range(n_attempts):
# Set seed
np.random.seed(seed=seeds[i])
# Generate run
if run_type == 'localizer':
block_data = create_localizer(n_localizer_blocks=n_localizer_blocks,
n_trials_per_localizer_block=n_trials,
localizer_order=localizer_order,
block_number=block_number,
pseudorandomize=pseudorandomize, TR=TR)
elif run_type == 'cognitive':
block_data = create_cognitive_block(n_trials=n_trials,
block_number=block_number,
response_modality=response_modality,
n_null_trials=n_null_trials,
TR=TR)
elif run_type == 'limbic':
block_data = create_limbic_block(n_trials=n_trials,
block_number=block_number,
response_modality=response_modality,
n_null_trials=n_null_trials,
TR=TR)
# Add trial start times (relative to start of experiment)
block_data['trial_start_time'] = block_data['trial_duration'].shift(1).cumsum()
block_data.loc[0, 'trial_start_time'] = 0
# Add cue onset times (relative to start of experiment)
block_data['cue_onset_time'] = block_data['trial_start_time'] + \
block_data['phase_1']
# Add stimulus onset times (relative to start of experiment)
block_data['stimulus_onset_time'] = block_data['trial_start_time'] + \
block_data['phase_1'] + \
block_data['phase_2'] + \
block_data['phase_3']
# Calculate efficiency
X, ev_names = stim_to_design(block_data, block=block_number, silent=True)
# Loop over the contrasts
dvars = [(c[ii, :].dot(np.linalg.pinv(X.T.dot(X))).dot(c[ii, :].T))
for ii in range(c.shape[0])]
eff = c.shape[0] / np.sum(dvars)
effs[i] = eff
# Found a better block than anything earlier? Save this
if eff > best_eff:
best_eff = eff
best_seed = seeds[i]
best_block = block_data
# Save everything
out_dict = {'seeds': seeds,
'efficiencies': effs,
'best_eff': best_eff,
'best_seed': best_seed,
'best_block': best_block,
'contrasts': c,
'contrast_ev_names': ev_names}
print('Done optimizing, tried %d seeds. Best efficiency: %.4f (mean eff: %.3f (SD %.3f))' %
(n_attempts, best_eff, np.mean(effs), np.std(effs)))
# return block
return best_block, out_dict
In [12]:
import itertools
from pprint import pprint
# Use itertools permutations to get all possible orders of both blocks and localizers
block_order = list(itertools.permutations("ABCD"))
loc_order = list(itertools.permutations("XY"))
# Repeat all elements in loc_orders to match sizes
loc_order = [item for item in loc_order for i in range(len(block_order))]
# Merge localizer and blocks
block_order = [(x[0], x[1], y[0], y[1], y[2], y[3]) for x, y in zip(loc_order, block_order*2)]
pprint(block_order)
len(block_order) # 48 possible conditions!
Out[12]:
In [13]:
a = [x for x in range(120) if x % 4 == 0 and x % 8 == 0]
print(a)
In [23]:
participant_range = [25, 26]
In [ ]:
n_trials_per_localizer_block = 8
n_localizer_blocks = 6
TR = 3
os.chdir('/Users/steven/Sync/PhDprojects/subcortex/flashtask/designs')
n_optim_attempts = 1e3
n_trials_limbic = 80
n_trials_cognitive = 72
n_null_trials_limbic = 8
n_null_trials_cognitive = 7
## Contrasts to optim for
c_localizer = np.array([
[0, 0, 0, 0, 1, 1, 0, 0], # hand vs baseline
[0, 0, 0, 0, 0, 0, 1, 1], # eye vs baseline
[0, 0, 0, 0, 1, 1, -1, -1] # hand - eye
])
# Cognitive: [u'cue_ACC', u'cue_SPD', u'stimulus_0_ACC', u'stimulus_0_SPD', u'stimulus_1_ACC', u'stimulus_1_SPD']
# Of interest: ACC vs SPD cue
# ACC vs SPD stimulus
# ACC vs SPD trial (cue + stim)
c_cognitive = np.array([
[1, -1, 0, 0, 0, 0], # ACC vs SPD cue
[0, 0, 1, -1, 1, -1], # ACC vs SPD stimulus
[1, -1, 1, -1, 1, -1] # ACC vs SPD cue and stimulus
])
# Limbic:
# [u'cue_RIGHT', u'cue_NEU', u'cue_LEFT',
# u'stimulus_1_RIGHT', u'stimulus_1_NEU', u'stimulus_1_LEFT',
# u'stimulus_0_RIGHT', u'stimulus_0_NEU', u'stimulus_0_LEFT']
# Of interest: direction vs neutral cue; direction vs neutral stim; direction vs neutral cue+stim
c_limbic = np.array([
[1, -2, 1, 0, 0, 0, 0, 0, 0], # direction vs neutral cue
[0, 0, 0, 1, -2, 1, 1, -2, 1], # direction vs neutral stim
[1, -2, 1, 1, -2, 1, 1, -2, 1] # direction vs neutral cue+stim
])
# Loop & Run
for pp in range(participant_range[0], participant_range[1]):
pp_str = str(pp).zfill(3)
print('Processing pp %s...' % pp_str)
block_order_this_pp = block_order[pp % len(block_order)]
# Empty DataFrame
design_this_pp = pd.DataFrame()
# Get localizer block
if block_order_this_pp[0] == 'X':
localizer_order = ['hand', 'eye']
else:
localizer_order = ['eye', 'hand']
design_this_pp, optim_res = optimize_brute_force(n_trials=n_trials_per_localizer_block,
c=c_localizer,
run_type='localizer',
block_number=0,
pseudorandomize=True,
TR=3,
n_attempts=n_optim_attempts,
n_localizer_blocks=6,
localizer_order=localizer_order)
with open('pp_' + pp_str + '_block_0_optim.pkl', 'wb') as f:
pkl.dump(optim_res, f)
# Get blocks
for block_number, block_char in enumerate(block_order_this_pp):
if block_char in ['X', 'Y']:
continue
if block_char == 'A':
block_data, optim_res = optimize_brute_force(n_trials=n_trials_cognitive,
c=c_cognitive,
run_type='cognitive',
block_number=block_number-1,
response_modality='hand',
n_null_trials=n_null_trials_cognitive,
TR=TR,
pseudorandomize=True,
n_attempts=n_optim_attempts)
elif block_char == 'B':
block_data, optim_res = optimize_brute_force(n_trials=n_trials_cognitive,
c=c_cognitive,
run_type='cognitive',
block_number=block_number-1,
response_modality='eye',
n_null_trials=n_null_trials_cognitive,
TR=TR,
pseudorandomize=True,
n_attempts=n_optim_attempts)
elif block_char == 'C':
block_data, optim_res = optimize_brute_force(n_trials=n_trials_limbic,
c=c_limbic,
run_type='limbic',
block_number=block_number-1,
response_modality='hand',
n_null_trials=n_null_trials_limbic,
TR=TR,
pseudorandomize=True,
n_attempts=n_optim_attempts)
elif block_char == 'D':
block_data, optim_res = optimize_brute_force(n_trials=n_trials_limbic,
c=c_limbic,
run_type='limbic',
block_number=block_number-1,
response_modality='eye',
n_null_trials=n_null_trials_limbic,
TR=TR,
pseudorandomize=True,
n_attempts=n_optim_attempts)
with open('pp_' + pp_str + '_block_' + str(block_number-1) + '_optim.pkl', 'wb') as f:
pkl.dump(optim_res, f)
design_this_pp = design_this_pp.append(block_data)
# Set indices
design_this_pp.index.name = 'block_trial_ID'
design_this_pp.reset_index(inplace=True)
design_this_pp.index.name = 'trial_ID'
# Add trial start times (relative to start of experiment)
design_this_pp['trial_start_time'] = design_this_pp['trial_duration'].shift(1).cumsum()
design_this_pp.loc[0, 'trial_start_time'] = 0
# Add cue onset times (relative to start of experiment)
design_this_pp['cue_onset_time'] = design_this_pp['trial_start_time'] + \
design_this_pp['phase_1']
# Add stimulus onset times (relative to start of experiment)
design_this_pp['stimulus_onset_time'] = design_this_pp['trial_start_time'] + \
design_this_pp['phase_1'] + \
design_this_pp['phase_2'] + \
design_this_pp['phase_3']
# Re-order column order for nicety
design_this_pp = design_this_pp[['block_trial_ID', 'block', 'block_type', 'null_trial', 'correct_answer', 'cue', 'response_modality', 'trial_type',
'phase_0', 'phase_1', 'phase_2', 'phase_3', 'phase_4', 'phase_5', 'phase_6', 'phase_7', 'trial_duration',
'trial_start_time', 'cue_onset_time', 'stimulus_onset_time',
'trial_start_time_block', 'cue_onset_time_block', 'stimulus_onset_time_block']]
# Save full data
if not os.path.exists(os.path.join('pp_%s' % pp_str, 'all_blocks')):
os.makedirs(os.path.join('pp_%s' % pp_str, 'all_blocks'))
design_this_pp.to_csv(os.path.join('pp_%s' % pp_str, 'all_blocks', 'trials.csv'), index=True)
# Save individual blocks
for block_num, block_type in zip(design_this_pp['block'].unique(), design_this_pp['block_type'].unique()):
block = design_this_pp.loc[design_this_pp['block'] == block_num]
if not os.path.exists(os.path.join('pp_%s' % pp_str, 'block_%d_type_%s' % (block_num, block_type))):
os.makedirs(os.path.join('pp_%s' % pp_str, 'block_%d_type_%s' % (block_num, block_type)))
block.to_csv(os.path.join('pp_%s' % pp_str, 'block_%d_type_%s' % (block_num, block_type), 'trials.csv'), index=True)
All following code is used to transform the created designs into FSL-accepted formats. First, we create 3-column .txt-files for every regressor. Then, we manually and painfully make the .fsf file for a single subject (for all designs) in the FSL gui (if someone can point me to a CLI for this, I'd be very grateful). Finally, we copy the created .fsf-file and substitute all references to the first pp for each other pp.
The current set-up is to model the following EVs:
In total, the first-level design has 23 EVs (currently not including responses). In the following code, we extract the EV timing from the design files.
In [ ]:
for pp_num in range(participant_range[0], participant_range[1]):
print('Processing pp %d...' % (pp_num))
pp_str = str(pp_num).zfill(3)
# Get all blocks directories of this subject, making sure to ignore any existing .feat-dirs
pp_block_dirs = glob('pp_%s/*' % pp_str)
pp_block_dirs = [x for x in pp_block_dirs if not x.endswith('.feat')]
# Loop over blocks
for pp_block_dir in pp_block_dirs:
# For all blocks, or the localizer block, we can use the "global" timing (not within-block) to create EVs.
if 'all_blocks' in pp_block_dir or 'localizer' in pp_block_dir:
stim_onset_col = 'stimulus_onset_time'
cue_onset_col = 'cue_onset_time'
else: # Otherwise, use only the block timing
stim_onset_col = 'stimulus_onset_time_block'
cue_onset_col = 'cue_onset_time_block'
# Define output directory, create if it doesnt exist
output_dir = os.path.join(pp_block_dir, 'evs')
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Read design from csv
design = pd.read_csv(os.path.join(pp_block_dir, 'trials.csv'))
design = design[['correct_answer','trial_type', 'block', 'phase_2', 'phase_4', 'cue', cue_onset_col,
stim_onset_col, 'response_modality', 'null_trial']]
# Get rid of all null trials
design = design.loc[design['null_trial'] != True]
# Weights of all EVs are 1
design['weight'] = 1
# For the localizer block
if 0 in design['block'].unique():
subs = design.loc[design['block'] == 0]
# EVs for cues and responses, separated for the response modality (eye/hand) and direction (left/right):
# 4 EVs for cue (left/right x eye/hand), and 4 EVs for the responses (left/right x eye/hand)
for effector in subs['response_modality'].unique():
for cue in subs['cue'].unique():
# response
ev = subs.loc[(subs['response_modality'] == effector) &
(subs['cue'] == cue), [stim_onset_col, 'phase_4', 'weight']].values.tolist()
with open(os.path.join(output_dir, 'ev_resp_%s_%s.txt' % (effector, cue)), 'wb') as f:
for _list in ev:
for _string in _list:
f.write(str(_string) + '\n')
f.write('\n')
# responses
ev = subs.loc[(subs['response_modality'] == effector) &
(subs['cue'] == cue), [cue_onset_col, 'phase_2', 'weight']].values.tolist()
with open(os.path.join(output_dir, 'ev_cue_%s_%s.txt' % (effector, cue)), 'wb') as f:
for _list in ev:
for _string in _list:
f.write(str(_string) + '\n')
f.write('\n')
# Get rid of localizer block here
design = design.loc[design['block'] > 0]
# For all decision-making blocks, we model the cue types (spd/acc or left/neu/right), so create EVs for these
for cue_type in design['cue'].unique():
# Get cue onset time, cue duration, and cue weight
ev = design.loc[(design['cue'] == cue_type), [cue_onset_col, 'phase_2', 'weight']].values.tolist()
with open(os.path.join(output_dir, 'ev_cue_%s.txt' % cue_type), 'wb') as f:
for _list in ev:
for _string in _list:
f.write(str(_string) + '\n')
f.write('\n')
# We also model the stimuli, separate for type (left/right), but separate for every cue type (spd/acc OR left/neu/right)
for stim_type in design['correct_answer'].unique():
if np.isnan(stim_type): # a nan stimtype corresponds to a null trial, so skip these
continue
for cue_type in design['cue'].unique():
# Get stimulus onset time, stimulus duration, and weight
ev = design.loc[(design['correct_answer'] == stim_type) &
(design['cue'] == cue_type),
[stim_onset_col, 'phase_4', 'weight']].values.tolist()
with open(os.path.join(output_dir, 'ev_stimulus_%d_%s.txt' % (stim_type, cue_type)), 'wb') as f:
for _list in ev:
for _string in _list:
f.write(str(_string) + '\n')
f.write('\n')
In [ ]:
for block_name in ['all_blocks', '_type_localizer', '_type_cognitive_hand', '_type_cognitive_eye', '_type_limbic_eye', '*_type_limbic_hand']:
# Get .fsf-file from pp001
pp_001_fn = glob(os.path.join('pp_001', '*' + block_name, 'design.fsf'))[0]
pp_001_block_name = pp_001_fn.split('/')[1]
with open(pp_001_fn, 'rb') as f:
fsf_templ = f.readlines()
for pp in range(participant_range[0], participant_range[1]):
fsf_thispp = copy.copy(fsf_templ)
# Get path to save the design.fsf file for this pp
this_pp_fn = os.path.join(glob(os.path.join('pp_' + str(pp).zfill(3), '*' + block_name))[0], 'design.fsf')
this_pp_block_name = this_pp_fn.split('/')[1]
for i, line in enumerate(fsf_thispp):
if not block_name == 'all_blocks':
fsf_thispp[i] = re.sub(pp_001_block_name,
this_pp_block_name, line)
fsf_thispp[i] = re.sub('pp_001', 'pp_' + str(pp).zfill(3), fsf_thispp[i])
# Find fn for current pp
with open(this_pp_fn, 'wb') as f:
f.writelines(fsf_thispp)
In [ ]:
wd = os.getcwd()
for sub in range(participant_range[0], participant_range[1]):
os.chdir(wd)
sub_dir = 'pp_' + str(sub).zfill(3)
block_dirs = [f for f in os.listdir(sub_dir) if os.path.isdir(os.path.join(sub_dir, f))]
block_dirs = [f for f in block_dirs if not '.feat' in f]
for block_dir in block_dirs:
design_dir = os.path.join(wd, sub_dir, block_dir)
os.chdir(design_dir)
os.system('feat design.fsf')
os.chdir(wd)
design creation without optimizing: