In [1]:
from ipyparallel import Client
cl = Client()
cl.ids
Out[1]:
In [2]:
# clear the namespace in engines
cl.clear()
%px %who
In [3]:
%%px --local
# run whole cell on all engines as well as in the local IPython session
import numpy as np
import sys
sys.path.insert(0, '/home/claudius/Downloads/dadi')
import dadi
In [4]:
%%px --local
# import 1D spectrum of ery on all engines:
fs_ery = dadi.Spectrum.from_file('dadiExercises/ERY.FOLDED.sfs.dadi_format')
# import 1D spectrum of par on all engines:
fs_par = dadi.Spectrum.from_file('dadiExercises/PAR.FOLDED.sfs.dadi_format')
In [5]:
%%px --local
ns = fs_ery.sample_sizes # both populations have the same sample size
fs_ery.pop_ids = ['ery']
fs_par.pop_ids = ['par']
# setting the smallest grid size slightly larger than the largest population sample size (36)
pts_l = [50, 60, 70]
In [ ]:
?dadi.Demographics1D.bottlegrowth
The built-in bottlegrowth model specifies an instantaneous size change $T \times 2N_{ref}$ generations in the past, immediately followed by the start of exponential growth (or decline) toward the contemporary population size. The model has three parameters: nuB, the ratio of the population size immediately after the instantaneous change to the ancient population size $N_{ref}$; nuF, the ratio of the contemporary to the ancient population size; and T, the time of the instantaneous size change in units of $2N_{ref}$ generations.
In [6]:
%%px --local
# create link to function that specifies the model
func = dadi.Demographics1D.bottlegrowth
# create extrapolating version of the model function
func_ex = dadi.Numerics.make_extrap_log_func(func)
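To make the parameter order concrete, the extrapolating function can be evaluated for an arbitrary parameter combination. This is only a sketch with placeholder values, not an analysis step:
# sketch only: evaluate the bottlegrowth model for an arbitrary (nuB, nuF, T) combination
p_example = [2.0, 0.5, 0.1]                  # placeholders: 2-fold increase, decline to half of N_ref, T = 0.1
model_sfs = func_ex(p_example, ns, pts_l)    # expected SFS under the model (for theta = 1)
model_sfs_folded = model_sfs.fold()          # fold, since the observed spectra are folded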
In [7]:
%%px
# set lower and upper bounds to nuB, nuF and T
upper_bound = [1e4, 1e4, 4]
lower_bound = [1e-4, 1e-4, 0]
In [8]:
import numpy as np
# create range of starting values evenly distributed in log space
p0_nuB = np.logspace(-3, 3, base=10.0, num=6)
p0_nuF = np.logspace(-3, 3, base=10.0, num=6)
p0_T = np.logspace(-4, np.log10(4), base=10, num=6)
In [9]:
# number of starting parameter combinations:
6**3
Out[9]:
In [10]:
# create load balanced view of engines
lbview = cl.load_balanced_view()
In [11]:
def run_dadi(p_init): # for the function to be called with map, it needs to have one input variable
    """
    p_init: initial parameter values to run optimisation from
    """
    if perturb == True:
        p_init = dadi.Misc.perturb_params(p_init, fold=fold,
                                          upper_bound=upper_bound, lower_bound=lower_bound)
        # note upper_bound and lower_bound variables are expected to be in the namespace of each engine
    # run optimisation of parameters
    popt = dadi_opt_func(p0=p_init, data=sfs, model_func=func_ex, pts=pts_l, \
                         lower_bound=lower_bound, upper_bound=upper_bound, \
                         verbose=verbose, maxiter=maxiter, full_output=full_output)
    return popt
The following cell is very important for the run_dadi
function above to work. It creates global variables on all engines, which are used by the function (instead of being passed as arguments, which is difficult to implement with map).
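The same variables could alternatively be pushed to all engines in a single call with the DirectView's push method; the following is only a sketch of that alternative, not what is run below:
# sketch only: push the required global variables into the namespace of every engine
cl[:].push(dict(
    dadi_opt_func=dadi.Inference.optimize_log,   # gradient-based BFGS optimiser
    sfs=fs_ery,                                  # observed spectrum to fit
    perturb=False,
    fold=1,
    maxiter=3,
    verbose=0,
    full_output=True
))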
In [21]:
%%px
# set up global variables on engines required for run_dadi function call
dadi_opt_func = dadi.Inference.optimize_log # uses gradient based BFGS algorithm
sfs = fs_ery # use ERY spectrum
perturb = False
fold = 1
maxiter = 3 # run a maximum of 3 iterations
verbose = 0
full_output = True # need to have full output to get the warnflags (see below)
Check what's in the namespace of the engines:
In [13]:
%px %who
In [14]:
from itertools import product, izip
In [15]:
for i, c in enumerate(product(p0_nuB, p0_nuF, p0_T)):
    if i > 10: break
    print c
During these optimisations, there are usually a few that run much longer than the rest. I would like to be able to interrupt those runs. The following is taken from an issue thread at the ipyparallel github repo:
In [16]:
%%px --local
import os
In [17]:
engine_pids = cl[:].apply(os.getpid).get_dict()
In [18]:
engine_pids
Out[18]:
In [20]:
import signal
# define function to interrupt an engine
def signal_engine(engine_id, sig=signal.SIGINT):
    """send a signal to a local engine"""
    pid = engine_pids[engine_id]
    os.kill(pid, sig)
In [33]:
# run optimisations with all combinations of starting values
# DO NOT RUN
ar_ery = lbview.map(run_dadi, product(p0_nuB, p0_nuF, p0_T), block=False, order=True)
In [34]:
ar_ery.progress
Out[34]:
In [35]:
for ID in cl.ids:
signal_engine(ID)
In [36]:
cl.ids
Out[36]:
This has no effect. Engines keep running. The only way to stop them is by shutting down the whole ipcluster.
In [31]:
# total running time in minutes
ar_ery.elapsed/60
Out[31]:
In [23]:
from collections import defaultdict
In [24]:
def get_flag_count(ar, NM=True):
    """
    ar: asyncresult object from BFGS or Nelder-Mead optimisation
    """
    if NM: # if ar from Nelder-Mead
        i = 4 # the warnflag is reported at index position 4 in the output array
    else: # ar from BFGS optimisation
        i = 6
    warnflag = defaultdict(int)
    for res in ar:
        if res[i] == 1:
            warnflag[1] += 1
        elif res[i] == 2:
            warnflag[2] += 1
        elif res[i] == 0:
            warnflag[0] += 1
        else:
            warnflag[999] += 1
    if NM:
        print "success", warnflag[0]
        print "Maximum number of function evaluations made.", warnflag[1]
        print "Maximum number of iterations reached.", warnflag[2]
        print "unknown flag", warnflag[999]
    else:
        print "success", warnflag[0]
        print "Maximum number of iterations exceeded.", warnflag[1]
        print "Gradient and/or function calls not changing.", warnflag[2]
        print "unknown flag", warnflag[999]
In [ ]:
get_flag_count(ar_ery, NM=False)
Almost no optimisation was successful, mostly because of a lack of gradient in the likelihood function. This calls for the other optimisation algorithm, which is more robust to noise in the data: Nelder-Mead.
In [ ]:
?dadi.Inference.optimize_log_fmin
In [ ]:
%%px
dadi_opt_func = dadi.Inference.optimize_log_fmin # uses Nelder-Mead algorithm
In [ ]:
# run optimisations with all combinations of starting values
ar_ery = lbview.map(run_dadi, product(p0_nuB, p0_nuF, p0_T), block=False, order=True)
In [ ]:
ar_ery.progress
In [ ]:
ar_ery.elapsed/60
In [ ]:
get_flag_count(ar_ery, NM=True)
In [ ]:
cl[0]['maxiter']
I need to extend the maximum number of iterations allowed per optimisation in order to get any results.
In [ ]:
%%px
maxiter = 50
In [ ]:
cl[:]['maxiter']
In [ ]:
# run optimisations with all combinations of starting values
# DO NOT RUN
ar_ery = lbview.map(run_dadi, product(p0_nuB, p0_nuF, p0_T), block=False, order=True)
In [ ]:
ar_ery.progress
In [ ]:
ar_ery.elapsed/60
This runs for too long! Again, most of the time only two cores are busy. I cannot wait that long for exploratory analyses. Unfortunately, I cannot interrupt those long-lasting jobs on the remote engines once they have started. If I shut down the whole IPython cluster of engines, I cannot retrieve the information from the jobs that finished earlier (and did not get aborted). I need to be able to abort jobs that last too long while still having access to the results of the other jobs.
The only solution I can think of at the moment is to write a Python script that takes initial starting values as input (like the function run_dadi
), runs the dadi optimisation with them and writes the optimal parameter values to a separate file. I can then start this script in many subprocesses, running many instances in parallel on different initial parameter values, and I would be able to interrupt the parent process (which initiates the subprocesses) without losing the output from already completed subprocesses.
In [ ]:
def flatten(array):
    """
    Returns a list of the flattened elements of all inner lists (or tuples)
    ****RECURSIVE****
    """
    res = []
    for el in array:
        if isinstance(el, (list, tuple)):
            res.extend(flatten(el))
            continue
        res.append(el)
    return res
In [ ]:
# create parallel function with load balancing
@lbview.parallel(block=True)
def get_ll(p):
    """
    p: parameter combination
    First, calculates the best-fit model SFS given parameter combination p.
    Then returns the log likelihood of the expected SFS given the observed SFS.
    """
    expected_sfs = func_ex(p, ns, pts_l)
    return dadi.Inference.ll_multinom(expected_sfs, sfs)
    # expected_sfs does not need to be folded, ll_multinom does that automatically
    # make sure that sfs points to the right spectrum
Due to the issues with job control in the ipyparallel package, I am forced to use a more low-level parallel framework.
Do a kernel restart from the menu above in order to clear the namespace for a fresh start!
In [1]:
%whos
I have written a Python script called run_dadi.py
, which takes several command line arguments and, in the following parallel framework, replaces the run_dadi
function from above.
In [2]:
% ll
Let's have a look at run_dadi.py.
In [3]:
%less run_dadi.py
In [4]:
! ./run_dadi.py -h
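Since the %less output is not captured here, the core of the script might look like the following minimal sketch. This is a hypothetical reconstruction (assuming argparse for the command line flags used below and pickle for the output), not the actual run_dadi.py:
#!/usr/bin/env python
# hypothetical sketch of a script like run_dadi.py (not the actual script)
import argparse, pickle, sys
sys.path.insert(0, '/home/claudius/Downloads/dadi')
import dadi

parser = argparse.ArgumentParser()
parser.add_argument("-p", dest="sfs_file")    # path to dadi-formatted spectrum
parser.add_argument("-m", dest="model")       # e.g. "dadi.Demographics1D.bottlegrowth"
parser.add_argument("-u", dest="upper")       # upper parameter bounds, e.g. "[10000.0, 10000.0, 4]"
parser.add_argument("-l", dest="lower")       # lower parameter bounds
parser.add_argument("-d", dest="opt_func")    # e.g. "dadi.Inference.optimize_log_fmin"
parser.add_argument("-s", dest="stub")        # stub for naming the output file
parser.add_argument("--maxiter", type=int)
parser.add_argument("-i", dest="p_init")      # initial parameter values, e.g. "(0.001, 0.001, 0.0001)"
args = parser.parse_args()

sfs = dadi.Spectrum.from_file(args.sfs_file)
func_ex = dadi.Numerics.make_extrap_log_func(eval(args.model))
popt = eval(args.opt_func)(p0=eval(args.p_init), data=sfs, model_func=func_ex,
                           pts=[50, 60, 70], lower_bound=eval(args.lower),
                           upper_bound=eval(args.upper), maxiter=args.maxiter,
                           full_output=True)
# pickle (p_init, popt) to its own file; the naming scheme here is made up
out_name = "OUT_run_dadi/%s_%s.pickle" % (args.stub, args.p_init.replace(" ", ""))
pickle.dump((eval(args.p_init), popt), open(out_name, "w"))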
In [46]:
import multiprocessing
import subprocess32 as subprocess
import time, signal, os
In [2]:
%pdoc subprocess
See the readme of the subprocess32 module.
In [6]:
subprocess.__name__
Out[6]:
In [7]:
multiprocessing.cpu_count()
Out[7]:
There are only 12 physical cores on huluvu (6 dual-core CPUs).
In [47]:
from itertools import product
import numpy as np
In [48]:
# set lower and upper bounds to nuB, nuF and T
upper_bound = [1e4, 1e4, 4]
lower_bound = [1e-4, 1e-4, 0]
In [10]:
str(upper_bound)
Out[10]:
In [49]:
# create range of starting values evenly distributed in log space
p0_nuB = np.logspace(-3, 3, base=10.0, num=6)
p0_nuF = np.logspace(-3, 3, base=10.0, num=6)
p0_T = np.logspace(-4, np.log10(4), base=10, num=6)
In [12]:
for i, comb in enumerate(product(p0_nuB, p0_nuF, p0_T)):
    print comb
    if i > 10: break
In [50]:
import sys
sys.path.insert(0, '/home/claudius/Downloads/dadi')
import dadi
The following variables will be passed as string values to run_dadi.py:
In [14]:
path_to_spectrum_file = "dadiExercises/ERY.FOLDED.sfs.dadi_format"
dadi_model = "dadi.Demographics1D.bottlegrowth"
upper = str(upper_bound)
lower = str(lower_bound)
dadi_opt_func = "dadi.Inference.optimize_log_fmin"
stub = "bottlegrowth_NM"
maxiter = 100
In [16]:
% ll dadiExercises/
In [15]:
# check creation of command line
for i, p_init in enumerate(product(p0_nuB, p0_nuF, p0_T)):
    cmd = "./run_dadi.py -p %s -m %s -u '%s' -l '%s' -d %s -s %s --maxiter %s -i '%s'" \
        % (path_to_spectrum_file, dadi_model, upper, lower, dadi_opt_func, stub, str(maxiter), str(p_init))
    print cmd
    if i >= 0: break
Next, I need to define an executor class. This is taken from Tiago Antao's Cookbook:
In [54]:
class executor:
    def __init__(self, limit):
        self.limit = limit
        self.ncores = multiprocessing.cpu_count()
        self.running = []
        self.finished = 0
    def submit(self, cmd, p_init):
        #self.progress()
        self.wait()
        if hasattr(self, 'out'):
            out = self.out
        else:
            out = '/dev/null'
        if hasattr(self, 'err'):
            err = self.err
        else:
            err = '/dev/null'
        if err == 'stderr':
            errSt = ''
        else:
            errSt = '2> ' + err
        proc = subprocess.Popen('%s > %s %s' % (cmd, out, errSt), shell=True)
        self.running.append(proc)
        #print "started running optimisation with " + str(p_init)
        #print "started subprocess with process id " + str(proc.pid)
        if hasattr(self, 'out'):
            del self.out
        if hasattr(self, 'err'):
            del self.err
    def wait(self, for_all=False):
        self.clean_done()
        #numWaits = 0
        if self.limit > 0 and type(self.limit) == int:
            cond = 'len(self.running) >= self.ncores - self.limit'
        elif self.limit < 0:
            cond = 'len(self.running) >= - self.limit'
        else:
            cond = 'len(self.running) >= self.ncores * self.limit'
        while eval(cond) or (for_all and len(self.running) > 0):
            time.sleep(1)
            self.clean_done() # updates self.running, removing finished jobs from the running queue
            #numWaits += 1
    def clean_done(self):
        terminated = []
        for i, p in enumerate(self.running):
            if p.poll() is not None: # poll() returns None while the process is still running
                terminated.append(i)
        for idx in reversed(terminated):
            del self.running[idx]
            self.finished += 1
    def progress(self):
        self.clean_done()
        #for p in self.running:
        #    if p.poll() is not None:
        #        print "subprocess with process id %d reports that it has finished" % (p.pid)
        #    else:
        #        print "subprocess with process id %d reports that it is still running" % (p.pid)
        print self.finished
    # def kill_all(self):
    #     import os, signal
    #     self.clean_done()
    #     for p in self.running:
    #         os.kill(p.pid, signal.SIGINT)
    #     self.clean_done()
    # None of my attempts to kill the subprocesses has worked.
    # It seems they simply become disconnected, but still continue running.
In [ ]:
%%time
E = executor(limit=-12) # run at most 12 optimisations concurrently
# this will block
for i, p_init in enumerate(product(p0_nuB, p0_nuF, p0_T)):
    cmd = "./run_dadi.py -p %s -m %s -u '%s' -l '%s' -d %s -s %s --maxiter %s -i '%s'" \
        % (path_to_spectrum_file, dadi_model, upper, lower, dadi_opt_func, stub, str(maxiter), str(p_init))
    E.err = "stderr"
    E.submit(cmd, p_init)
    #if i >= 9: break
In [21]:
E.progress()
The last 8 optimisations take an exceptionally long time to run.
When I want to kill all processes that are still running, I can execute the following command line in the shell:
sudo pgrep -f "python ./run_dadi" | xargs kill
In [22]:
E.progress()
In [24]:
% ll OUT_run_dadi/*pickle | wc -l
Of 216 initial parameter combinations, 208 produced an optimisation output file.
This parallel framework, albeit much more laborious to implement than ipyparallel, has proven useful so far. It allows me to run many optimisations in parallel, save the output of individual optimisation runs to file, abort the final long-lasting runs from the command line and still retrieve the information from the successful runs. This is something I could not do with ipyparallel, which forced me to wait until all optimisation jobs had finished or else lose all the output.
The script run_dadi.py
has pickled the optimisation results to individual files, one for each initial parameter combination. They are located in the directory OUT_run_dadi
. I now want to read these output files into a single data structure.
In [11]:
import pickle
In [12]:
import glob
pickled = glob.glob('OUT_run_dadi/*pickle')
sorted(pickled[:10])
Out[12]:
In [13]:
opt_out = []
for filename in glob.iglob('OUT_run_dadi/*pickle'):
    opt_out.append( pickle.load(open(filename, "r")) )
In [14]:
len(opt_out)
Out[14]:
I killed the 8 last running processes.
In [202]:
# check
opt_out[0]
Out[202]:
Looks good.
In order to get the flag count over these optimisations, I need to modify the previous function slightly, as it now handles slightly different input (a list of tuples instead of an AsyncResult object).
In [38]:
def get_flag_count(out, NM=True):
    """
    out: list of tuples, each containing p_init and popt + additional info, including warnflags
    as produced by run_dadi.py
    """
    from collections import defaultdict
    if NM: # if ar from Nelder-Mead
        i = 4 # the warnflag is reported at index position 4 in the output array
    else: # ar from BFGS optimisation
        i = 6
    warnflag = defaultdict(int)
    for res in out:
        if res[1][i] == 1: # notice the change in indexing
            warnflag[1] += 1
        elif res[1][i] == 2:
            warnflag[2] += 1
        elif res[1][i] == 0:
            warnflag[0] += 1
        else:
            warnflag[999] += 1
    if NM:
        print "success", warnflag[0]
        print "Maximum number of function evaluations made.", warnflag[1]
        print "Maximum number of iterations reached.", warnflag[2]
        print "unknown flag", warnflag[999]
    else:
        print "success", warnflag[0]
        print "Maximum number of iterations exceeded.", warnflag[1]
        print "Gradient and/or function calls not changing.", warnflag[2]
        print "unknown flag", warnflag[999]
In [31]:
get_flag_count(opt_out, NM=True)
Only 27 optimisations were successful.
In [207]:
maxiter
Out[207]:
I have run each optimisation allowing for up to 100 iterations. This should be plenty.
In [32]:
dadi_opt_func
Out[32]:
I have done the optimisations with the Nelder-Mead algorithm, which should be more robust to noisy data than gradient-based algorithms (like BFGS).
In [33]:
type(opt_out[0][1][0])
Out[33]:
Add this type of object to the list of object types to flatten in the following function.
In [15]:
def flatten(array):
    """
    Returns a list of the flattened elements of all inner lists (or tuples)
    ****RECURSIVE****
    """
    import numpy
    res = []
    for el in array:
        if isinstance(el, (list, tuple, numpy.ndarray)):
            res.extend(flatten(el))
            continue
        res.append(el)
    return list( res )
In [16]:
def get_ll(p):
    """
    p: parameter combination
    First, calculates the best-fit model SFS given parameter combination p.
    Then returns the log likelihood of the expected SFS given the observed SFS.
    """
    expected_sfs = func_ex(p, ns, pts_l)
    return dadi.Inference.ll_multinom(expected_sfs, sfs)
    # expected_sfs does not need to be folded, ll_multinom does that automatically
    # make sure that sfs points to the right spectrum
In [17]:
# import 1D folded spectrum of ery:
fs_ery = dadi.Spectrum.from_file('dadiExercises/ERY.FOLDED.sfs.dadi_format')
# create link to function that specifies the model
func = dadi.Demographics1D.bottlegrowth
# create extrapolating version of the model function
func_ex = dadi.Numerics.make_extrap_log_func(func)
ns = fs_ery.sample_sizes # both populations have the same sample size
fs_ery.pop_ids = ['ery']
# setting the smallest grid size slightly larger than the largest population sample size (36)
pts_l = [50, 60, 70]
sfs = fs_ery
In [18]:
# check get_ll works
ll = get_ll(opt_out[0][1][0])
ll
Out[18]:
In [40]:
# check addition of logL to list of parameters
flatten(opt_out[0])[:6] + ['aaaaa']
Out[40]:
In [19]:
successfull_popt = [flatten(out)[:6] + [get_ll(out[1][0])] for out in opt_out if out[1][4] == 0]
In [42]:
successfull_popt[:3]
Out[42]:
In [20]:
import pandas as pd
In [21]:
df = pd.DataFrame(data=successfull_popt, columns=['nuB_0', 'nuF_0', 'T_0', 'nuB_opt', 'nuF_opt', 'T_opt', 'logL'])
In [22]:
df.sort_values(by='logL', ascending=False)
Out[22]:
In [26]:
p_opt = df.sort_values(by='logL', ascending=False).iloc[0,3:6]
p_opt
Out[26]:
It looks promising that the best parameter combination was found twice from quite different starting values. The best parameter combination is more than 100 logL units more likely than the second-best parameter combination. This result should be verified by running a few more optimisations with starting values in the vicinity of these optimal parameter values (via perturb).
These optimal parameter values suggest that the ancient population first underwent a 40-fold size increase about $0.37 \times 2N_{ref}$ generations ago. It then underwent exponential population decline (note that the exponential form is assumed by the model and may not be very realistic) to 45% of the ancient population size $N_{ref}$.
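For intuition, the time estimate can be converted into absolute units, but only under assumed values for $N_{ref}$ and the generation time. The numbers below are purely illustrative placeholders, not estimates:
# purely illustrative conversion of T into generations and years;
# Nref and gen_time are hypothetical placeholder values, not estimates
T = 0.37          # estimated time of the size change, in units of 2*Nref generations
Nref = 1e6        # hypothetical ancestral effective population size
gen_time = 1.0    # hypothetical generation time in years
generations_ago = T * 2 * Nref
print "size change began roughly %.0f generations (%.0f years) ago" % (generations_ago, generations_ago * gen_time)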
I am going to try and use ipyparallel again for the optimisations with perturbed initial parameter values.
In [1]:
from ipyparallel import Client
cl = Client()
cl.ids
Out[1]:
In [2]:
%%px --local
# run whole cell on all engines as well as in the local IPython session
import numpy as np
import sys
sys.path.insert(0, '/home/claudius/Downloads/dadi')
import dadi
Note, in the following analysis, I am going to use a slightly different spectrum: unfolded from ANGSD, then folded in dadi.
In [3]:
%%px --local
# import 1D spectrum of ery on all engines:
fs_ery = dadi.Spectrum.from_file('ERY.unfolded.sfs').fold()
In [4]:
%%px --local
ns = fs_ery.sample_sizes # both populations have the same sample size
fs_ery.pop_ids = ['ery']
# setting the smallest grid size slightly larger than the largest population sample size (36)
pts_l = [50, 60, 70]
In [5]:
%%px --local
# create link to function that specifies the model
func = dadi.Demographics1D.bottlegrowth
# create extrapolating version of the model function
func_ex = dadi.Numerics.make_extrap_log_func(func)
In [6]:
%%px
# set lower and upper bounds to nuB, nuF and T
upper_bound = [1e4, 1e4, 4]
lower_bound = [1e-4, 1e-4, 0]
In [7]:
def run_dadi(p_init): # for the function to be called with map, it needs to have one input variable
    """
    p_init: initial parameter values to run optimisation from
    """
    if perturb == True:
        p_init = dadi.Misc.perturb_params(p_init, fold=fold,
                                          upper_bound=upper_bound, lower_bound=lower_bound)
        # note upper_bound and lower_bound variables are expected to be in the namespace of each engine
    # run optimisation of parameters
    popt = dadi_opt_func(p0=p_init, data=sfs, model_func=func_ex, pts=pts_l, \
                         lower_bound=lower_bound, upper_bound=upper_bound, \
                         verbose=verbose, maxiter=maxiter, full_output=full_output)
    return p_init, popt
In [8]:
# create load balanced view of engines
lbview = cl.load_balanced_view()
In [9]:
%%px
# set up global variables on engines required for run_dadi function call
dadi_opt_func = dadi.Inference.optimize_log_fmin # use Nelder-Mead algorithm
sfs = fs_ery # use ERY spectrum
perturb = True
fold = 1.5
maxiter = 100 # run a maximum of 100 iterations
verbose = 0
full_output = True # need to have full output to get the warnflags (see below)
In [10]:
p_opt = [39.786289160859184, 0.45082070925507933, 0.37350201677769701]
In [11]:
from itertools import repeat
In [12]:
# run optimisations with perturbed optimal parameter values as starting values
# repeat this 20 times
ar_ery = lbview.map(run_dadi, repeat(list(p_opt), 20), block=False, order=True)
In [14]:
ar_ery.wall_time
Out[14]:
This just takes 9 seconds to complete!
In [15]:
ar_ery.get()[0]
Out[15]:
In [17]:
def get_flag_count(out, NM=True):
    """
    out: list of tuples, each containing p_init and popt + additional info, including warnflags
    as produced by run_dadi.py
    """
    from collections import defaultdict
    if NM: # if ar from Nelder-Mead
        i = 4 # the warnflag is reported at index position 4 in the output array
    else: # ar from BFGS optimisation
        i = 6
    warnflag = defaultdict(int)
    for res in out:
        if res[1][i] == 1: # notice the change in indexing
            warnflag[1] += 1
        elif res[1][i] == 2:
            warnflag[2] += 1
        elif res[1][i] == 0:
            warnflag[0] += 1
        else:
            warnflag[999] += 1
    if NM:
        print "success", warnflag[0]
        print "Maximum number of function evaluations made.", warnflag[1]
        print "Maximum number of iterations reached.", warnflag[2]
        print "unknown flag", warnflag[999]
    else:
        print "success", warnflag[0]
        print "Maximum number of iterations exceeded.", warnflag[1]
        print "Gradient and/or function calls not changing.", warnflag[2]
        print "unknown flag", warnflag[999]
In [18]:
get_flag_count(ar_ery, NM=True)
13 of 20 optimisations were successful.
In [20]:
def flatten(array):
    """
    Returns a list of the flattened elements of all inner lists (or tuples)
    ****RECURSIVE****
    """
    import numpy
    res = []
    for el in array:
        if isinstance(el, (list, tuple, numpy.ndarray)):
            res.extend(flatten(el))
            continue
        res.append(el)
    return list( res )
In [22]:
successfull_popt = [flatten(out)[:7] for out in ar_ery if out[1][4] == 0]
In [24]:
import pandas as pd
In [25]:
df = pd.DataFrame(data=successfull_popt, columns=['nuB_0', 'nuF_0', 'T_0', 'nuB_opt', 'nuF_opt', 'T_opt', '-logL'])
In [26]:
df.sort_values(by='-logL', ascending=False)
Out[26]:
Those optimisations that were successful all return the same optimal parameter combination.
In [27]:
import dill
dill.dump(list(ar_ery.get()), open("OUT_bottlegrowth/ERY_perturb_ar_ery.dill", "w"))
What were the initial parameter values for unsuccessful optimisations?
In [45]:
for out in ar_ery:
    if out[1][4] == 0:
        continue
    print out[0]
It is worrying that the optimisation from these starting values does not converge within 100 iterations.
In [51]:
% ll dadiExercises/
In [52]:
path_to_spectrum_file = "dadiExercises/PAR.FOLDED.sfs.dadi_format"
dadi_model = "dadi.Demographics1D.bottlegrowth"
upper = str(upper_bound)
lower = str(lower_bound)
dadi_opt_func = "dadi.Inference.optimize_log_fmin"
stub = "PAR_bottlegrowth_NM"
maxiter = 100
In [53]:
# check creation of command line
for i, p_init in enumerate(product(p0_nuB, p0_nuF, p0_T)):
    cmd = "./run_dadi.py -p %s -m %s -u '%s' -l '%s' -d %s -s %s --maxiter %s -i '%s'" \
        % (path_to_spectrum_file, dadi_model, upper, lower, dadi_opt_func, stub, str(maxiter), str(p_init))
    print cmd
    if i >= 0: break
In [ ]:
E = executor(limit=4) # leave 4 cores unused
# this will block
for i, p_init in enumerate(product(p0_nuB, p0_nuF, p0_T)):
    cmd = "./run_dadi.py -p %s -m %s -u '%s' -l '%s' -d %s -s %s --maxiter %s -i '%s'" \
        % (path_to_spectrum_file, dadi_model, upper, lower, dadi_opt_func, stub, str(maxiter), str(p_init))
    E.err = "stderr"
    E.submit(cmd, p_init)
This runs for a very long time!
Several optimisation runs take more than two hours. I have terminated those runs from the terminal with several executions of the following command:
pgrep -of "python ./run_dadi.py" | xargs kill
This kills the oldest (i.e. longest-running) process with a matching command line, thereby freeing a slot for a new optimisation.
In [56]:
E.progress()
I needed to kill 26 optimisation runs because they could not finish within 1 hour.
In [57]:
pickled = glob.glob('OUT_run_dadi/PAR*pickle')
sorted(pickled[:10])
Out[57]:
In [58]:
opt_out = []
for filename in glob.glob('OUT_run_dadi/PAR*pickle'):
    opt_out.append( pickle.load(open(filename)) )
In [59]:
len(opt_out)
Out[59]:
In [60]:
opt_out[0]
Out[60]:
In [61]:
get_flag_count(opt_out, NM=True)
The vast majority of optimisations could not converge to an optimum within the 100 allowed iterations.
In [63]:
# this is very important for the correct calculation of likelihoods
# import 1D folded spectrum of par:
fs_par = dadi.Spectrum.from_file('dadiExercises/PAR.FOLDED.sfs.dadi_format')
sfs = fs_par
In [ ]:
# this takes too long
# DO NOT RUN
#successfull_popt = [flatten(out[:6]) + [get_ll(out[1][0])] for out in opt_out if out[1][4] == 0]
In [67]:
# create parallel function with load balancing
@lbview.parallel(block=True)
def get_ll(p):
    """
    p: parameter combination
    First, calculates the best-fit model SFS given parameter combination p.
    Then returns the log likelihood of the expected SFS given the observed SFS.
    """
    expected_sfs = func_ex(p, ns, pts_l)
    return dadi.Inference.ll_multinom(expected_sfs, sfs)
    # expected_sfs does not need to be folded, ll_multinom does that automatically
    # make sure that sfs points to the right spectrum
In [68]:
# this takes less than 1 minute running on the virtual 20 cores in parallel
ll = get_ll.map([out[1][0] for out in opt_out if out[1][4] == 0])
In [73]:
successfull_popt = [flatten(out)[:6] for out in opt_out if out[1][4] == 0]
In [74]:
successfull_popt[:3]
Out[74]:
In [75]:
print len(successfull_popt)
print len(ll)
In [77]:
from itertools import izip
In [80]:
i = 0
for row in izip(successfull_popt, ll):
    print flatten(row)
    i += 1
    if i > 3: break
In [81]:
p_opt = [flatten(row) for row in izip(successfull_popt, ll)]
In [82]:
df = pd.DataFrame(data=p_opt, columns=['nuB_0', 'nuF_0', 'T_0', 'nuB_opt', 'nuF_opt', 'T_opt', 'logL'])
In [85]:
df.sort_values(by='logL', ascending=False).head(20)
Out[85]:
There is no convergence on a set of optimal parameter values. Fitting the bottlegrowth
model to the 1D SFS of parallelus must be considered to have failed.
In [90]:
import matplotlib.pyplot as plt
%matplotlib inline
plt.rcParams['figure.figsize'] = [12.0, 10.0]
plt.plot(fs_par[:19])
Out[90]:
Unfortunately, the first and second frequency classes show counts that are inconsistent with any demographic history, so it might be worthwhile to mask them out. However, I think masking out just one of the first two frequency classes will lead to highly biased inferences, while masking both frequency classes will remove a lot of the power to infer the demographic history. I therefore think that, at least for this SFS, masking will most likely not lead to better estimates.
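For reference, masking individual frequency classes would be straightforward, because a dadi Spectrum is a numpy masked array. The following is only a sketch of how it could be done, not something applied here:
# sketch only: mask the first two polymorphic frequency classes of the folded spectrum
fs_par_masked = fs_par.copy()
fs_par_masked.mask[1] = True   # mask singletons
fs_par_masked.mask[2] = True   # mask doubletons
# subsequent likelihood calculations, e.g. dadi.Inference.ll_multinom(model, fs_par_masked),
# then ignore the masked entries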
In [28]:
plt.plot(fs_ery[:19])
I think the same assessment needs to be made for the SFS of ery. Again, masking out just one of the first two frequency classes