In [3]:
import numpy as np
import os
import pandas as pd
import warnings
import nelpy as nel
warnings.filterwarnings("ignore")
In [4]:
datadirs = ['/home/etienne/Dropbox/neoReader/',
'C:/Etienne/Dropbox/neoReader/',
'D:/Dropbox/neoReader/']
fileroot = next((datadir for datadir in datadirs if os.path.isdir(datadir)), None)
# conda install pandas=0.19.2
if fileroot is None:
    raise FileNotFoundError('datadir not found')
load_from_nel = True
# load from nel file:
if load_from_nel:
    jar = nel.load_pkl(fileroot + 'gor01vvp01_processed_speed.nel')
    exp_data = jar.exp_data
    aux_data = jar.aux_data
    del jar

    jar = nel.load_pkl(fileroot + 'gor01vvp01_tables_speed.nel')
    df = jar.df
    df2 = jar.df2
    del jar
In [5]:
# restrict sessions to explore to a smaller subset
min_n_placecells = 16
min_n_PBEs = 27 # 27 total events ==> minimum 21 events in training set
df2_subset = df2[(df2.n_PBEs >= min_n_PBEs) & (df2.n_placecells >= min_n_placecells)]
sessions = df2_subset['time'].values.tolist()
segments = df2_subset['segment'].values.tolist()
print('Evaluating subset of {} sessions'.format(len(sessions)))
df2_subset.sort_values(by=['n_PBEs', 'n_placecells'], ascending=[False, False])
Out[5]:
In [52]:
m1 = [58]
m2 = [35]
m3 = [73,59]
m4 = [72,0,83,5]
m5 = [6,36]
m6 = [82,14,28,51,1,61]
m7 = [75,41]
m8 = [50,60]
print('machine 1 (AMZ): {}'.format(np.sum([df2_subset.loc[mid].n_PBEs for mid in m1])))
print('machine 2 (AMZ): {}'.format(np.sum([df2_subset.loc[mid].n_PBEs for mid in m2])))
print('machine 3 (AMZ): {}'.format(np.sum([df2_subset.loc[mid].n_PBEs for mid in m3])))
print('machine 4 (AMZ): {}'.format(np.sum([df2_subset.loc[mid].n_PBEs for mid in m4])))
print('machine 5 (AMZ): {}'.format(np.sum([df2_subset.loc[mid].n_PBEs for mid in m5])))
print('machine 6 (AMZ): {}'.format(np.sum([df2_subset.loc[mid].n_PBEs for mid in m6])))
print('machine 7 (ETN): {}'.format(np.sum([df2_subset.loc[mid].n_PBEs for mid in m7])))
print('machine 8 (ALX): {}'.format(np.sum([df2_subset.loc[mid].n_PBEs for mid in m8])))
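The machine assignments above were chosen by hand. As a rough cross-check (not part of the original workflow), a simple greedy partition of sessions by event count gives an idea of how balanced an 8-way split can be. This is a minimal sketch that only assumes the df2_subset frame defined earlier; n_machines is an illustrative parameter.
In [ ]:
# hypothetical helper (illustration only): greedily assign each session to the
# machine with the smallest running total of PBE counts
n_machines = 8  # illustrative; matches the number of hand-assigned machines above
order = df2_subset['n_PBEs'].sort_values(ascending=False)
loads = np.zeros(n_machines, dtype=int)
assignment = {m: [] for m in range(n_machines)}
for session_idx, n_pbes in zip(order.index, order.values):
    m = int(np.argmin(loads))           # least-loaded machine so far
    loads[m] += int(n_pbes)
    assignment[m].append(session_idx)
print('greedy per-machine PBE totals: {}'.format(loads.tolist()))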
NOTE: it is relatively easy (syntax-wise) to score each session as a parallel task, but since the Bayesian scoring takes so long to compute, we can achieve higher core utilization by parallelizing over individual events as well, and not just over sessions. This additional level of parallelization makes the bookkeeping a little ugly, so I provide the code for both approaches here.
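To make the event-level bookkeeping more concrete, here is a minimal, self-contained toy sketch (with made-up event counts, not the real data) of the same pattern used later: a flat list of per-event results is sliced back into per-session groups using cumulative boundaries.
In [ ]:
import numpy as np

# toy example: three sessions with 2, 3, and 1 events, respectively
n_events_per_session = [2, 3, 1]
flat_scores = np.arange(sum(n_events_per_session))  # stand-in for per-event scores

# boundaries into the flat array: [0, 2, 5, 6]
bdries = np.cumsum(np.insert(n_events_per_session, 0, 0))

# regroup the flat results by session
grouped = [flat_scores[bdries[nn]:bdries[nn+1]] for nn in range(len(bdries)-1)]
print(grouped)  # [array([0, 1]), array([2, 3, 4]), array([5])]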
In [22]:
# Amazon c4.8xlarge
from nelpy.utils import PrettyDuration
n_cores = 36*7
n_shuff = 5000
n_samp = 35000
print('It is estimated to take {} to score 1500 events using {} cores'.format(
    PrettyDuration(32*4*60*60/4200000 * n_shuff * n_samp / n_cores), n_cores))
print('It is estimated to take {} to score {} events using {} cores'.format(
    PrettyDuration(32*4*60*60/4200000 * n_shuff * n_samp / 1500), n_cores, n_cores))
In [15]:
# Etienne's local machine
from nelpy.utils import PrettyDuration
n_cores = 8
n_shuff = 5000
n_samp = 35000
print('It is estimated to take {} to score 1500 events using {} cores'.format(
    PrettyDuration(21*4*60*60/4200000 * n_shuff * n_samp / n_cores), n_cores))
print('It is estimated to take {} to score {} events using {} cores'.format(
    PrettyDuration(21*4*60*60/4200000 * n_shuff * n_samp / 1500), n_cores, n_cores))
In [23]:
parallelize_by_session = False
parallelize_by_event = True
n_jobs = 8 # set this equal to number of cores
n_shuffles = 5000
n_samples = 35000
w = 3  # single-sided bandwidth (0 means only the bin whose center is under the line is included; 3 means a total of 7 bins)
In [21]:
if parallelize_by_session:
    from joblib import Parallel, delayed

    # A function that can be called to do work:
    def work_sessions(arg):
        # Split the list to individual variables:
        ii, bst, tc = arg
        scores, shuffled_scores, percentiles = nel.analysis.replay.score_Davidson_final_bst_fast(
            bst=bst,
            tuningcurve=tc,
            w=w,
            n_shuffles=n_shuffles,
            n_samples=n_samples)
        return (ii, scores, shuffled_scores, percentiles)

    # List of instances to pass to work():
    parallel_sessions = [(ii, aux_data[session][segment]['PBEs'], aux_data[session][segment]['tc'])
                         for (ii, (session, segment)) in enumerate(zip(sessions, segments))]

    # Anything returned by work() can be stored:
    parallel_results = Parallel(n_jobs=n_jobs, verbose=51)(map(delayed(work_sessions), parallel_sessions))

    # standardize parallel results
    idx = [result[0] for result in parallel_results]

    # check that parallel results came back IN ORDER:
    if nel.utils.is_sorted(idx):
        print('parallel results are ordered...')
    else:
        raise ValueError('results are not ordered! handle it here before proceeding...')

    scores_bayes = [result[1] for result in parallel_results]
    scores_bayes_shuffled = [result[2] for result in parallel_results]
    scores_bayes_percentile = [result[3] for result in parallel_results]

    results = dict()
    for ii, (session, segment) in enumerate(zip(sessions, segments)):
        try:
            results[session][segment] = dict()
        except KeyError:
            results[session] = dict()
            results[session][segment] = dict()
        results[session][segment]['scores_bayes'] = scores_bayes[ii]
        results[session][segment]['scores_bayes_shuffled'] = scores_bayes_shuffled[ii]
        results[session][segment]['scores_bayes_percentile'] = scores_bayes_percentile[ii]

    print('done packing results')
In [24]:
if parallelize_by_event:
    from joblib import Parallel, delayed

    # A function that can be called to do work:
    def work_events(arg):
        # Split the list to individual variables:
        session, segment, ii, bst, tc = arg
        scores, shuffled_scores, percentiles = nel.analysis.replay.score_Davidson_final_bst_fast(
            bst=bst,
            tuningcurve=tc,
            w=w,
            n_shuffles=n_shuffles,
            n_samples=n_samples)
        return (session, segment, ii, scores, shuffled_scores, percentiles)

    # List of instances to pass to work():
    # unroll all events:
    parallel_events = []
    for session, segment in zip(sessions, segments):
        for nn in range(aux_data[session][segment]['PBEs'].n_epochs):
            parallel_events.append((session, segment, nn,
                                    aux_data[session][segment]['PBEs'][nn],
                                    aux_data[session][segment]['tc']))

    # Anything returned by work() can be stored:
    parallel_results = Parallel(n_jobs=n_jobs, verbose=51)(map(delayed(work_events), parallel_events))

    # standardize parallel results
    bdries_ = [aux_data[session][segment]['PBEs'].n_epochs for session, segment in zip(sessions, segments)]
    bdries = np.cumsum(np.insert(bdries_, 0, 0))

    sessions_ = np.array([result[0] for result in parallel_results])
    segments_ = np.array([result[1] for result in parallel_results])
    idx = [result[2] for result in parallel_results]
    scores_bayes_evt = np.array([float(result[3]) for result in parallel_results])
    scores_bayes_shuffled_evt = np.array([result[4].squeeze() for result in parallel_results])
    scores_bayes_percentile_evt = np.array([float(result[5]) for result in parallel_results])

    results = {}
    for nn in range(len(bdries)-1):
        session = np.unique(sessions_[bdries[nn]:bdries[nn+1]])
        if len(session) > 1:
            raise ValueError("parallel results in different format / order than expected!")
        session = session[0]
        segment = np.unique(segments_[bdries[nn]:bdries[nn+1]])
        if len(segment) > 1:
            raise ValueError("parallel results in different format / order than expected!")
        segment = segment[0]
        try:
            results[session][segment]['scores_bayes'] = scores_bayes_evt[bdries[nn]:bdries[nn+1]]
        except KeyError:
            try:
                results[session][segment] = dict()
                results[session][segment]['scores_bayes'] = scores_bayes_evt[bdries[nn]:bdries[nn+1]]
            except KeyError:
                results[session] = dict()
                results[session][segment] = dict()
                results[session][segment]['scores_bayes'] = scores_bayes_evt[bdries[nn]:bdries[nn+1]]
        results[session][segment]['scores_bayes_shuffled'] = scores_bayes_shuffled_evt[bdries[nn]:bdries[nn+1]]
        results[session][segment]['scores_bayes_percentile'] = scores_bayes_percentile_evt[bdries[nn]:bdries[nn+1]]

    print('done packing results')
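As an optional sanity check (not in the original notebook), the packed per-session arrays can be compared against the event counts in aux_data. This sketch assumes the results dict, aux_data, sessions, and segments from the cells above.
In [ ]:
# optional sanity check: packed score arrays should have one entry per PBE
if parallelize_by_event:
    for session, segment in zip(sessions, segments):
        n_expected = aux_data[session][segment]['PBEs'].n_epochs
        n_packed = len(results[session][segment]['scores_bayes'])
        assert n_packed == n_expected, \
            'session {}, segment {}: packed {} scores but expected {} events'.format(
                session, segment, n_packed, n_expected)
    print('all packed score arrays match the per-session event counts')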
In [ ]:
jar = nel.ResultsContainer(results=results, description='gor01 and vvp01 speed restricted results for best 20 candidate sessions')
jar.save_pkl('score_bayes_all_sessions.nel')
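The saved container can later be reloaded in the same way as the other .nel files above. A minimal sketch, assuming the file was written to the current working directory as in the call above, and assuming (as the load cells earlier suggest) that the container exposes its constructor kwargs as attributes:
In [ ]:
# reload the saved results for downstream analysis (assumes the file above exists)
jar = nel.load_pkl('score_bayes_all_sessions.nel')
results = jar.results        # assumption: kwargs stored as attributes, as with exp_data/aux_data above
print(jar.description)
del jar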