The magic commands become automatically available when you create a client:
In [1]:
from ipyparallel import Client
cl = Client()
cl.ids
Out[1]:
In [2]:
cl[:].targets
Out[2]:
In [3]:
%%px --noblock
# run a whole cell in non-blocking mode, by default on all engines
# note: the magic has to be at the top of the cell
import time
time.sleep(1)
time.time()
Out[3]:
In [4]:
%pxresult
# get the result from the AsyncResult object
In [5]:
%%px --local
# run whole cell on all engines as well as in the local IPython session
import sys
sys.path.insert(0, '/home/claudius/Downloads/dadi')
sys.path[0]
Out[5]:
In [6]:
%%px --local
import dadi
Dadi should now be imported on all remote engines as well as locally.
In [7]:
%whos
The remote namespace can be checked with:
In [8]:
%px %who
As can be seen, dadi is imported on all engines.
Like the dadi module, I would like to load the 1D folded SFS's of par and ery into the namespace of the local IPython session as well as into the namespaces of all engines.
In [9]:
%ll
In [10]:
%ll dadiExercises/
In [11]:
! cat dadiExercises/ERY.FOLDED.sfs.dadi_format
In [12]:
! cat dadiExercises/PAR.FOLDED.sfs.dadi_format
In [13]:
%%px --local
# import 1D spectrum of ery on all engines:
fs_ery = dadi.Spectrum.from_file('dadiExercises/ERY.FOLDED.sfs.dadi_format')
# import 1D spectrum of par on all engines:
fs_par = dadi.Spectrum.from_file('dadiExercises/PAR.FOLDED.sfs.dadi_format')
In [14]:
%px %who
In [15]:
%who
In [16]:
%%px --local
ns = fs_ery.sample_sizes # both populations have the same sample size
fs_ery.pop_ids = ['ery']
fs_par.pop_ids = ['par']
# setting the smallest grid size slightly larger than the largest population sample size (36)
pts_l = [50, 60, 70]
In [10]:
%psource dadi.Demographics1D.growth
This model assumes that exponential growth (or decline) started some time $T$ in the past and that the current effective population size is a multiple $\nu$ of the ancient population size, i.e. the size before exponential growth began. So this model estimates just two parameters. If $\nu$ is estimated to be 1, this indicates that the population size hasn't changed (although see Myers2008). If it is below 1, this indicates exponential population decline (how realistic is that?). If it is above 1, this indicates exponential population growth.
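For orientation, this is roughly what such a model function looks like in dadi (a minimal sketch of the general pattern, assuming the standard dadi building blocks; it is not the source displayed by %psource above):

import numpy

def growth_sketch(params, ns, pts):
    nu, T = params                            # nu: ratio of contemporary to ancient size; T: time since growth began (in 2*Na generations)
    xx = dadi.Numerics.default_grid(pts)      # frequency grid for the diffusion approximation
    phi = dadi.PhiManip.phi_1D(xx)            # equilibrium density for the ancestral population
    nu_func = lambda t: numpy.exp(numpy.log(nu) * t/T)   # population size changes exponentially from 1 to nu over time T
    phi = dadi.Integration.one_pop(phi, xx, T, nu_func)
    return dadi.Spectrum.from_phi(phi, ns, (xx,))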
In [17]:
%%px --local
# create link to function that specifies a simple growth or decline model
func = dadi.Demographics1D.growth
# create extrapolating version of the function
func_ex = dadi.Numerics.make_extrap_log_func(func)
# set lower and upper bounds to nu and T
upper_bound = [100, 5]
lower_bound = [1e-3, 0]
In [18]:
%px %who
In [22]:
from itertools import product
import numpy as np
from __future__ import print_function
# create range of starting values for nu and T, evenly distributed in log space
p0_nu = np.logspace(-3, np.log10(50), base=10.0, num=10)
p0_T = np.logspace(-3, np.log10(2), base=10, num=10)
# create all possible pairwise combinations of nu and T
i = 0
num_comb = len(p0_nu)*len(p0_T)
for p0 in product(p0_nu, p0_T):
i+=1
if i>20 and i<=(num_comb-20):
continue # continue with the next iteration of the loop
print( i , p0 )
print("The total number of parameter combinations is ", i)
These starting values cover a very wide range of possible parameter combinations. This should be better than merely randomly perturbing a single parameter combination a certain number of times (and running the optimisation from those perturbed starting values).
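For comparison, the perturbation-based alternative mentioned above would look roughly like this (a sketch using dadi.Misc.perturb_params; the central values and fold are arbitrary here):

# generate 100 starting values by randomly perturbing a single parameter combination
p_center = [1, 1]   # arbitrary central values for nu and T
p0_perturbed = [dadi.Misc.perturb_params(p_center, fold=2, upper_bound=upper_bound, lower_bound=lower_bound) for _ in range(100)]
print(p0_perturbed[:3])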
In [23]:
# create load balanced view of engines
lbview = cl.load_balanced_view()
In [47]:
%pdoc dadi.Inference.optimize_log
dadi also offers other optimisation algorithms: optimize_log_fmin and optimize_grid.
In [25]:
# create parallel function for ery spectrum
@lbview.parallel(block=False)
def run_dadi_on_ery(p_init): # for the function to be called with map, it needs to have one input variable
"""
p_init: initial parameter values to run optimisation from
"""
# run optimisation of parameters
popt = dadi.Inference.optimize_log(p0=p_init, data=fs_ery, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=100, full_output=False)
return (p_init, popt) # returns a tuple with starting values and optimal nu and T
In [26]:
# run dadi in parallel on all combinations of starting values
ar = run_dadi_on_ery( product(p0_nu, p0_T) )
# wait for results to come in and print progress report
#ar.wait_interactive()
In [31]:
ar.progress
Out[31]:
In [32]:
# run after optimisations have finished
ar.wall_time/60
Out[32]:
It took 7 minutes to run these 100 optimisations.
In [40]:
# save optimisation results to file
# can be loaded back into another IPython session with: ar = pickle.load(filehandle)
# unfortunately, I cannot save the AsyncResult object with pickle (dill can save it, but emits a
# "maximum recursion depth exceeded" error when I try to load the object from file again).
# I need to save the result as a plain list
import dill
with open("exp_growth_optim_res_ERY.pickle", "w") as fhandle:
dill.dump(list(ar.get()), fhandle)
In [41]:
# this can now be loaded back from file into memory:
with open("exp_growth_optim_res_ERY.pickle", "r") as fhandle:
ra = dill.load(fhandle)
In [39]:
?pickle
In [51]:
ar.get()[:10]
Out[51]:
The tuples returned from my parallel function have been flattened into one array. That's not nice.
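One way to get the (start, optimum) pairs back would be to zip even- and odd-indexed elements (a sketch; the loop below achieves the same by checking the index parity):

res = list(ar)                     # flattened: start, opt, start, opt, ...
pairs = zip(res[::2], res[1::2])   # re-pair starting values with optimised values
print(pairs[:2])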
In [46]:
?print # print_function from __future__
In [42]:
# print result of optimisations
for i, p in enumerate(ar):
if (i+1) % 2:
#print(i, p)
print("starting values: nu = %.4f, T = %.6f" % (p[0][0], p[0][1]), end=" ==> ")
else:
#print(i, p)
print("optim. values: nu = %.4f, T = %.6f" % (p[0], p[1]))
Clearly, different starting parameter values lead to different optimised parameter values. There is great variance in the returned optimised parameters. Maybe the maximum number of iterations (maxiter) is not enough?
I don't understand the maxiter option to the optimisation function. Could it be that 100 iterations are not enough to allow some optimisations with starting values far from the optimal values to find the optimum?
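One way to check whether a run stopped because it hit maxiter would be to request the full optimiser output and inspect the warnflag (a sketch; for the BFGS-based optimize_log the warnflag sits at index 6 of the returned tuple, as also used further below):

out = dadi.Inference.optimize_log(p0=[1, 1], data=fs_ery, model_func=func_ex, pts=pts_l, \
                                  lower_bound=lower_bound, upper_bound=upper_bound, \
                                  verbose=0, maxiter=100, full_output=True)
print("warnflag:", out[6])   # 0 = converged, 1 = maxiter exceeded, 2 = gradient/function values not changing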
In [43]:
# get the optimized parameter values
popt = [p for i, p in enumerate(ar) if not (i+1) % 2]
print( popt[:10] )
len(popt)
Out[43]:
In [44]:
# get the starting values
pstart = [p for i, p in enumerate(ar) if (i+1) % 2]
pstart[:10]
Out[44]:
In [45]:
# get log likelihood values for parameter combinations
@lbview.parallel(block=True)
def get_ll(p):
# calculate the best-fit model SFS given a parameter combination
expected_sfs = func_ex(p, ns, pts_l)
# calculate the log likelihood of the expected SFS given the observed SFS
return dadi.Inference.ll_multinom(expected_sfs, fs_ery)
# run get_ll on all sets of parameters in parallel
ll = get_ll.map(popt)
In [46]:
sorted(ll, reverse=True)[:20]
Out[46]:
In [47]:
# get arrays of individual parameters from each optimisation run
nu = [p_set[0] for p_set in popt]
T = [p_set[1] for p_set in popt]
nu_start = [p_set[0][0] for p_set in pstart]
T_start = [p_set[0][1] for p_set in pstart]
In [48]:
print(len(ll), len(nu), len(T), len(nu_start), len(T_start))
I would like to create a pandas data frame from these arrays and then sort the table by ll.
In [50]:
import pandas as pd
In [51]:
# create pandas data frame
df = pd.DataFrame({
'a_nu_start' : nu_start,
'b_T_start' : T_start,
'c_nu' : nu,
'd_T' : T,
'e_log ll' : ll
})
df.head()
Out[51]:
In [52]:
df.dtypes
Out[52]:
In [53]:
# get statistics for each column
df.describe()
Out[53]:
In [54]:
df.sort_values(by='e_log ll', ascending=False).head(50)
Out[54]:
Given the range of starting values that result in almost exactly the same optimal parameter values, I am fairly convinced that the optimal parameters for this model are close to $\nu = 0.136$ and $T = 0.0075$. Given the model, the optimal parameters indicate that the ancestral population of the erythropus sample started to decline exponentially (how realistic is that?) $0.0075 \times 2N_a$ generations ago, to a contemporary population size of about 14% of the ancient effective population size ($N_a$).
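Purely for illustration, converting these scaled estimates into absolute numbers would require an estimate of $N_a$, which this analysis does not provide; with a hypothetical placeholder value the arithmetic would be:

N_a = 1e6                       # hypothetical ancient effective population size, only to illustrate the conversion
nu_hat, T_hat = 0.136, 0.0075   # approximate optimal parameters from the table above
print("decline started about %.0f generations ago" % (T_hat * 2 * N_a))
print("contemporary effective population size about %.0f" % (nu_hat * N_a))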
In [55]:
%matplotlib inline
In [56]:
import matplotlib.pyplot as plt
In [58]:
plt.rcParams['font.size'] = 20.0
plt.figure(figsize=[12.0, 10.0])
plt.scatter(np.log(nu), np.log(T), c=ll, s=75, alpha=0.5)
plt.xlabel(r'log $\nu$')
plt.ylabel("log T")
plt.title("log likelihoods for parameter combinations")
plt.grid()
plt.colorbar()
Out[58]:
The parameter combination $\nu = 0.001049$ and $T = 0.000023$ has an only slightly lower log likelihood. This parameter combination corresponds to a much more recent and much more drastic population reduction. I think a likelihood ratio test would be required to see whether the data can distinguish between these two scenarios.
Maybe I should do a longer run, starting from the best-fit parameter sets found here and setting maxiter to something ridiculously high, so that it is guaranteed that the ML values will be found within gtol deviation.
In [59]:
p0 = df.sort_values(by='e_log ll', ascending=False).iloc[0, 2:4]
p_init = np.array(p0)
p_init
Out[59]:
In [60]:
# create direct view of engines
dview = cl[:]
# define function to run dadi with perturbed starting values
# note the high maxiter
def run_dadi_on_ery(p_init): # for the function to be called with map, it needs to have one input variable
"""
p_init: initial parameter values to run optimisation from
"""
# perturb starting values by up to a factor of 2 times 'fold'
p0 = dadi.Misc.perturb_params(p_init, fold=2, upper_bound=upper_bound, lower_bound=lower_bound)
# run optimisation of parameters
popt = dadi.Inference.optimize_log(p0=p0, data=fs_ery, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=1e9, full_output=False)
return popt # returns optimal nu and T
In [89]:
%pdoc run_dadi_on_ery
In [61]:
# run optimisation with perturbed initial values on all 11 engines
fine_opt_res = dview.apply_async( run_dadi_on_ery, p_init )
In [63]:
# get running time
fine_opt_res.wall_time
Out[63]:
This took only 6 seconds to compute on 11 engines.
In [64]:
# get optimised parameters
fine_opt_res.get()
Out[64]:
In [65]:
# run get_ll on all sets of parameters in parallel
ll = get_ll.map(fine_opt_res.get())
ll
Out[65]:
In [77]:
nu_opt = [p[0] for p in fine_opt_res.get()]
T_opt = [p[1] for p in fine_opt_res.get()]
df = pd.DataFrame({
'a_nu' : nu_opt,
'b_T' : T_opt,
'c_ll' : ll
})
df
Out[77]:
There are still three outlier results. Maybe I need to use a different optimisation function.
I would like to compare model-predicted and observed SFS for a couple of the optimisation results.
In [78]:
p_init
Out[78]:
In [85]:
# calculate the best-fit model SFS given the parameter combination with the highest likelihood
expected_sfs = func_ex(p_init, ns, pts_l)
expected_sfs.fold()
Out[85]:
In [86]:
plt.rcParams['figure.figsize'] = [12.0, 10.0]
print(p_init)
dadi.Plotting.plot_1d_comp_multinom(expected_sfs.fold()[:19], fs_ery[:19], residual='linear')
Compared to the standard neutral model prediction, it seems that only the first frequency class has a markedly reduced residual with this model (about 20 standard deviations).
In [84]:
p = np.array(df.iloc[0, 0:2])
print(p)
expected_sfs = func_ex(p, ns, pts_l)
dadi.Plotting.plot_1d_comp_multinom(expected_sfs.fold()[:19], fs_ery[:19], residual='linear')
I cannot make out a clear difference between this and the previous parameter set.
In [87]:
df
Out[87]:
In [88]:
p = np.array(df.iloc[6, 0:2])
print(p)
expected_sfs = func_ex(p, ns, pts_l)
dadi.Plotting.plot_1d_comp_multinom(expected_sfs.fold()[:19], fs_ery[:19], residual='linear')
Again, even this very different parameter set seems to produce a very similar SFS to the ones produced by the two more likely parameter sets.
In [91]:
# define function to run dadi with perturbed starting values
def run_dadi_on_ery(p_init, dadi_opt_func, maxiter=100): # for the function to be called with map, it needs to have one input variable
"""
p_init: initial parameter values to perturb and then run optimisation from
dadi_opt_func:
+ optimize_log (BFGS based)
+ optimize_log_fmin (Nelder-Mead)
maxiter: maximum number of iterations
"""
# perturb starting values by up to a factor of 2 times 'fold'
p0 = dadi.Misc.perturb_params(p_init, fold=2, upper_bound=upper_bound, lower_bound=lower_bound)
# run optimisation of parameters
popt = dadi_opt_func(p0=p0, data=fs_ery, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=maxiter, full_output=False)
return popt # returns optimal nu and T
In [92]:
%pdoc run_dadi_on_ery
Now run the dadi optimisation with the Nelder-Mead algorithm.
In [93]:
# run optimisation with perturbed initial values on all 11 engines
fine_opt_res = dview.apply_async( run_dadi_on_ery, p_init, dadi.Inference.optimize_log_fmin, maxiter=1e9 )
In [94]:
fine_opt_res.wall_time
Out[94]:
In [96]:
ll = get_ll.map( fine_opt_res.get() )
In [97]:
nu = [p[0] for p in fine_opt_res.get()]
T = [p[1] for p in fine_opt_res.get()]
df = pd.DataFrame({
'a_nu' : nu,
'b_T' : T,
'c_ll' : ll
})
df
Out[97]:
Half of the optimisations result in a combination of parameters that do not have the highest likelihood. So it does seem as though Nelder-Mead is more robust than BFGS (but note the random perturbation of starting values).
In [101]:
# create parallel function for par spectrum
@lbview.parallel(block=False)
def run_dadi_on_par(p_init): # for the function to be called with map, it needs to have one input variable
"""
p_init: initial parameter values to run optimisation from
"""
# run optimisation of parameters
popt = dadi.Inference.optimize_log(p0=p_init, data=fs_par, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=100, full_output=False)
return (p_init, popt) # returns a tuple with starting values and optimal nu and T
In [ ]:
# run dadi in parallel on all combinations of starting values
#ar = run_dadi_on_par( product(p0_nu, p0_T) )
# wait for results to come in and print progress report
#ar.wait_interactive()
In [108]:
ar.wall_time/60
Out[108]:
This definitely runs too long! run_dadi_on_par spends most of its time on the last three tasks. A maxiter of 100, therefore, does not seem to be limiting the search. Set maxiter lower next time, maybe to 50.
Ryan Gutenkunst writes on the dadi forum:
The parameter bounds are primarily about avoiding really slow fits. Calculation with high migration rates and times or small population sizes is much slower, and the optimizer can explore extreme values before settling down. So typically we set migrations rates to be bounded [0, 20ish], times to be [0, 5ish], population sizes to be [1e-3, 1e6ish].
Maybe I should set the lower bound of the time parameter to something above 0.
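Following that advice, the bounds could be set along these lines (the values are only suggestions; the lower bound on T is lifted slightly above 0):

# illustrative bounds for [nu, T], following the quoted advice
upper_bound = [1e6, 5]
lower_bound = [1e-3, 1e-4]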
In [107]:
%psource dadi.Inference.optimize_log
In [109]:
# save optimisation results to file
# can be loaded back into another IPython session with: ar = pickle.load(filehandle)
# unfortunately, I cannot save the AsyncResult object with pickle (dill can save it, but emits a
# "maximum recursion depth exceeded" error when I try to load the object from file again).
# I need to save the result as a plain list
import dill
with open("exp_growth_optim_res_PAR.pickle", "w") as fhandle:
dill.dump(list(ar.get()), fhandle)
In [110]:
# get the optimized parameter values
popt = [p for i, p in enumerate(ar) if not (i+1) % 2]
# get the starting values
pstart = [p for i, p in enumerate(ar) if (i+1) % 2]
In [111]:
# get arrays of individual parameters from each optimisation run
nu = [p_set[0] for p_set in popt]
T = [p_set[1] for p_set in popt]
nu_start = [p_set[0][0] for p_set in pstart]
T_start = [p_set[0][1] for p_set in pstart]
In [112]:
# run get_ll on all sets of parameters in parallel
ll = get_ll.map(popt)
In [113]:
# create pandas data frame
df = pd.DataFrame({
'a_nu_start' : nu_start,
'b_T_start' : T_start,
'c_nu' : nu,
'd_T' : T,
'e_log ll' : ll
})
df.head()
Out[113]:
In [114]:
df.sort_values(by='e_log ll', ascending=False).head(30)
Out[114]:
The parameter combination with the highest likelihood is not found several times from different starting values. This does not give me much confidence in the maximum likelihood parameter set.
In [115]:
plt.rcParams['font.size'] = 20.0
plt.figure(figsize=[12.0, 10.0])
plt.scatter(np.log(nu), np.log(T), c=ll, s=75, alpha=0.5)
plt.xlabel(r'log $\nu$')
plt.ylabel("log T")
plt.title("log likelihoods for parameter combinations")
plt.grid()
plt.colorbar()
Out[115]:
In [123]:
# define function to run dadi with perturbed starting values
# note the high maxiter
def run_dadi_on_par(p_init): # for the function to be called with map, it needs to have one input variable
"""
p_init: initial parameter values to run optimisation from
"""
# perturb starting values by up to a factor of 2 times 'fold'
p0 = dadi.Misc.perturb_params(p_init, fold=2, upper_bound=upper_bound, lower_bound=lower_bound)
# run optimisation of parameters
popt = dadi.Inference.optimize_log(p0=p0, data=fs_par, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=1e9, full_output=False)
return popt # returns optimal nu and T
In [136]:
p_init
Out[136]:
In [124]:
# get parameter combination with highest likelihood as starting values
p_init = np.array( df.sort_values(by='e_log ll', ascending=False).head().iloc[0, 2:4] )
# run optimisation with perturbed initial values on all 11 engines
fine_opt_res = dview.apply_async( run_dadi_on_par, p_init )
In [126]:
fine_opt_res.wall_time
Out[126]:
In [127]:
ll = get_ll.map(fine_opt_res.get())
In [128]:
nu_opt = [p[0] for p in fine_opt_res.get()]
T_opt = [p[1] for p in fine_opt_res.get()]
df = pd.DataFrame({
'a_nu' : nu_opt,
'b_T' : T_opt,
'c_ll' : ll
})
df
Out[128]:
None of these parameter combinations matches the most likely one determined above! I therefore cannot be very confident that I have found the most likely parameter combination.
Maybe another optimisation algorithm is better at finding the global maximum.
In [129]:
# define function to run dadi with perturbed starting values
def run_dadi_optimisation(p_init, dadi_opt_func, sfs, maxiter=100): # for the function to be called with map, it needs to have one input variable
"""
p_init: initial parameter values to perturb and then run optimisation from
dadi_opt_func:
+ optimize_log (BFGS based)
+ optimize_log_fmin (Nelder-Mead)
sfs: ipyparallel Reference to dadi Spectrum object (previously imported into all engines)
maxiter: maximum number of iterations
"""
# perturb starting values by up to a factor of 2 times 'fold'
p0 = dadi.Misc.perturb_params(p_init, fold=2, upper_bound=upper_bound, lower_bound=lower_bound)
# run optimisation of parameters
popt = dadi_opt_func(p0=p0, data=sfs, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=maxiter, full_output=False)
return popt # returns optimal nu and T
In [130]:
from ipyparallel import Reference
In [131]:
sfs = Reference('fs_par')
In [132]:
# run optimisation with perturbed initial values on all 11 engines
# use Nelder-Mead algorithm this time
fine_opt_res = dview.apply_async( run_dadi_optimisation, \
p_init, dadi.Inference.optimize_log_fmin, sfs, maxiter=100 )
In [134]:
fine_opt_res.wall_time
Out[134]:
In [135]:
ll = get_ll.map(fine_opt_res.get())
nu_opt = [p[0] for p in fine_opt_res.get()]
T_opt = [p[1] for p in fine_opt_res.get()]
df = pd.DataFrame({
'a_nu' : nu_opt,
'b_T' : T_opt,
'c_ll' : ll
})
df
Out[135]:
Again, none of these parameter combinations is close to the most likely ones determined above.
Let's see how the best-fitting model and data compare to each other.
In [140]:
# calculate the best-fit model SFS given the parameter combination with the highest likelihood
expected_sfs = func_ex(p_init, ns, pts_l)
print(p_init)
dadi.Plotting.plot_1d_comp_multinom(expected_sfs.fold()[:19], fs_par[:19], residual='linear')
These logarithmic plots are not very helpful.
In [145]:
# get optimally scaled (by theta) expected SFS for given data
model = dadi.Inference.optimally_scaled_sfs(expected_sfs.fold(), fs_par)
print(p_init)
plt.plot(model, 'ro-', label="model")
plt.plot(fs_par[:19], 'bo-', label="data")
plt.legend()
plt.xlabel('minor allele frequency')
plt.ylabel('SNP count')
Out[145]:
So, if I could believe the optimal parameter combination found here to be the global maximum, it would imply that the ancestral population of the parallelus sample has undergone an exponential decline to about 1/1000th of the ancient population size $N_{ref}$ (i.e. a very strong bottleneck), and that this happened in the very recent past, i.e. only $2.3 \times 10^{-5} \times 2N_{ref}$ generations ago.
In [ ]:
In [ ]:
In [42]:
%time run_dadi.map(range(20))
Out[42]:
Most of the time this just runs on a single core! I don't know why. Note the difference between the wall time and the total time.
Running the simulation locally on a single core:
In [44]:
%%time
# import 1D spectrum of ery on all engines:
fs_ery = dadi.Spectrum.from_file('dadiExercises/ERY.FOLDED.sfs.dadi_format')
fs_ery.pop_ids = ['ery']
ns = fs_ery.sample_sizes
# create link to function that specifies a simple growth or decline model
func = dadi.Demographics1D.growth
# create extrapolating version of the function
func_ex = dadi.Numerics.make_extrap_log_func(func)
# set lower and upper bounds to nu and T
upper_bound = [100, 3]
lower_bound = [1e-2, 0]
# set starting value
p0 = [1, 1] # corresponds to constant population size
# setting the smallest grid size slightly larger than the largest population sample size (36)
pts_l = [40, 50, 60]
# perturb starting values by up to a factor of 2 times fold:
p1 = dadi.Misc.perturb_params(p0, fold=1.5, upper_bound=upper_bound, lower_bound=lower_bound)
# run optimisation of parameters
popt = dadi.Inference.optimize_log(p0=p1, data=fs_ery, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=100, full_output=False)
popt
Out[44]:
In [45]:
%%time
# perturb starting values by up to a factor of 2 times fold:
p1 = dadi.Misc.perturb_params(p0, fold=1.5, upper_bound=upper_bound, lower_bound=lower_bound)
# run optimisation of parameters
popt = dadi.Inference.optimize_log(p0=p1, data=fs_ery, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=100, full_output=False)
popt
Out[45]:
It really shouldn't take 2 minutes to run this 20 times with different starting values.
In [46]:
%px %who
Note that p1 and popt are not in the namespace of the remote engines. They apparently have been cleared automatically.
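If p1 and popt were actually needed on the engines, they could be pushed there explicitly through a direct view (a sketch):

dview = cl[:]                          # direct view of all engines
dview.push(dict(p1=p1, popt=popt))     # copy the local objects into each engine's namespace
dview['popt']                          # pull them back to check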
In [47]:
?run_dadi.map
In [48]:
lbview.block
Out[48]:
Jobs submitted to the load balanced view run asynchronously in non-blocking mode.
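If blocking behaviour is wanted for a particular call, it can be requested explicitly; a minimal sketch:

res = lbview.apply_sync(sum, range(5))   # blocks until the result (10) is returned
res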
In [52]:
cl.metadata
Out[52]:
In [53]:
# create serial function
def serial_run_dadi(x): # for the function to be called with map, it needs to have one input variable
# perturb starting values by up to a factor of 2 times fold:
p1 = dadi.Misc.perturb_params(p0, fold=1.5, upper_bound=upper_bound, lower_bound=lower_bound)
# run optimisation of parameters
popt = dadi.Inference.optimize_log(p0=p1, data=fs_ery, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=100, full_output=False)
return popt # returns a numpy array of optimal nu and T
In [55]:
# run dadi optimisation in non blocking mode
ar = lbview.map(serial_run_dadi, range(20))
ar.wait_interactive()
In [59]:
# the sum of the computation time of all of the tasks:
ar.serial_time
Out[59]:
In [61]:
# the time between the first task submitted and last result received.
# This is the actual cost of computation, including IPython overhead
ar.wall_time
Out[61]:
In [62]:
parallel_speedup = ar.serial_time / ar.wall_time
parallel_speedup
Out[62]:
In [56]:
ar.get()
Out[56]:
In [63]:
for i, r in enumerate(ar):
print "Optimsation number %i: %.3f (nu), %.3f (T)" % (i, r[0], r[1])
In [78]:
# create serial function
def serial_run_dadi_on_data(sfs):
"""
sfs: Spectrum object, provided to 'data' option of dadi.Inference.optimize_log
"""
# perturb starting values by up to a factor of 2 times fold:
p1 = dadi.Misc.perturb_params(p0, fold=1.5, upper_bound=upper_bound, lower_bound=lower_bound)
# run optimisation of parameters
popt = dadi.Inference.optimize_log(p0=p1, data=sfs, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=0, maxiter=100, full_output=False)
return popt # returns a numpy array of optimal nu and T
In [65]:
from ipyparallel import Reference
In [66]:
# create a Reference to `fs_ery` on the engines.
data = Reference('fs_ery')
In [69]:
ar = lbview.map(serial_run_dadi_on_data, [data] * 20)
In [70]:
for i, r in enumerate(ar):
print "Optimsation number %i: %.3f (nu), %.3f (T)" % (i, r[0], r[1])
In [79]:
# run parallel optimisation on 1D SFS of par
ar = lbview.map(serial_run_dadi_on_data, [data] * 20)
# print result
for i, r in enumerate(ar):
print "Optimsation number %i: %.3f (nu), %.3f (T)" % (i, r[0], r[1])
In [80]:
# run parallel optimisation on 1D SFS of par
# return results as they become ready (unordered)
# 3 calls of the function per task (chunksize)
ar = lbview.map(serial_run_dadi_on_data, [data] * 50, ordered=False, chunksize=3)
# print result
for i, r in enumerate(ar):
print "Optimsation number %i: %.3f (nu), %.3f (T)" % (i, r[0], r[1])
It should be a good idea to use a load-balanced view of the engines, since some optimisations take longer than others.
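To check that assumption, the per-task runtimes can be pulled from the metadata of the AsyncResult (a sketch, assuming the ar object from the last map and the standard 'started'/'completed' metadata fields):

for md in ar.metadata:
    print md['engine_id'], (md['completed'] - md['started']).total_seconds()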
In [81]:
numpy.mean(ar[0])
Out[81]:
Look at the maxiter option of dadi.Inference.optimize_log.
Restart the kernel from the menu to clear the namespace!
In [1]:
%who
In [145]:
from ipyparallel import Client
cl = Client()
cl.ids
Out[145]:
In [146]:
%%px --local
# run whole cell on all engines as well as in the local IPython session
import numpy as np
import sys
sys.path.insert(0, '/home/claudius/Downloads/dadi')
import dadi
Use unfolded ANGSD spectra and fold with dadi:
In [147]:
%%px --local
# import 1D spectrum of ery on all engines:
fs_ery = dadi.Spectrum.from_file('ERY.unfolded.sfs').fold()
# import 1D spectrum of par on all engines:
fs_par = dadi.Spectrum.from_file('PAR.unfolded.sfs').fold()
In [148]:
cl[0]['fs_ery']
Out[148]:
In [149]:
%%px --local
ns = fs_ery.sample_sizes # both populations have the same sample size
fs_ery.pop_ids = ['ery']
fs_par.pop_ids = ['par']
# setting the smallest grid size slightly larger than the largest population sample size (36)
pts_l = [40, 50, 60]
In [150]:
%%px --local
# create link to function that specifies the model
func = dadi.Demographics1D.growth
# create extrapolating version of the model function
func_ex = dadi.Numerics.make_extrap_log_func(func)
In [10]:
%pdoc dadi.Demographics1D.growth
In [151]:
# create load balanced view of engines
lbview = cl.load_balanced_view()
In [13]:
def run_dadi(p_init): # for the function to be called with map, it needs to have one input variable
"""
p_init: initial parameter values to run optimisation from
"""
if perturb == True:
p_init = dadi.Misc.perturb_params(p_init, fold=fold,
upper_bound=upper_bound, lower_bound=lower_bound)
# note upper_bound and lower_bound variables are expected to be in the namespace of each engine
# run optimisation of parameters
popt = dadi_opt_func(p0=p_init, data=sfs, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=verbose, maxiter=maxiter, full_output=full_output)
return p_init, popt
The following sets global variables on the remote engines that are used by run_dadi:
In [13]:
%%px
# set up global variables on engines required for run_dadi function call
dadi_opt_func = dadi.Inference.optimize_log # uses BFGS algorithm
sfs = fs_ery # use ERY spectrum
perturb = False
fold = 1
maxiter = 10 # run a maximum of 10 iterations
verbose = 0
full_output = True # need to have full output to get the warnflags (see below)
This lets the optimisations run with the faster BFGS algorithm and stop after a maximum of 10 iterations. This is intended for a broad sweep across the parameter space. Also note that this sets the optimisations to fit the ery spectrum.
In [14]:
%%px
# set lower and upper bounds to nu and T
upper_bound = [1e4, 4]
lower_bound = [1e-4, 0]
In [15]:
p0_nu = np.logspace(-3, 3, base=10.0, num=15)
p0_T = np.logspace(-3, np.log10(3), base=10, num=15)
In [16]:
from itertools import product
In [18]:
# DO NOT RUN
#ar_ery = lbview.map(run_dadi, product(p0_nu, p0_T), block=False, order=True)
In [29]:
ar_ery.progress
Out[29]:
In [30]:
ar_ery.get()[0]
I needed to kill a single very long-running optimisation job from the terminal (because it is not possible to do that with ipyparallel). Unfortunately, that also means that the results from all 224 completed (or terminated) optimisation runs are lost as well. This is the major shortcoming of ipyparallel at the moment.
Couldn't I make ipyparallel pickle the results of the optimisations to file?
In [17]:
%%px --local
import dill # better version of pickle
In [24]:
%%px --local
# set outname stub on all engines
outname = "OUT_exp_growth_model/ERY"
In [25]:
print outname
In [28]:
i=0
for p_init in product(p0_nu, p0_T):
i+=1
if i > 3: break
name = outname[:]
for p in p_init:
if p < 1:
name += "_%.4f" % (p)
else:
name += "_%4d" % (p)
print name
Now, define a new run_dadi function that pickles output to file:
In [84]:
def run_dadi(p_init): # for the function to be called with map, it needs to have one input variable
"""
p_init: initial parameter values to run optimisation from
"""
if perturb == True:
p_init = dadi.Misc.perturb_params(p_init, fold=fold,
upper_bound=upper_bound, lower_bound=lower_bound)
# note upper_bound and lower_bound variables are expected to be in the namespace of each engine
# run optimisation of parameters
popt = dadi_opt_func(p0=p_init, data=sfs, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=verbose, maxiter=maxiter, full_output=full_output)
# pickle to file
import dill
name = outname[:] # make copy of file name stub!
for p in p_init:
name += "_%.4f" % (p)
with open(name + ".dill", "w") as fh:
dill.dump((p_init, popt), fh)
return p_init, popt
In [31]:
%ll
In [32]:
ar_ery = lbview.map(run_dadi, product(p0_nu, p0_T), block=False, order=True)
In [41]:
ar_ery.progress
Out[41]:
I killed the last optimisation.
Load pickled output:
In [42]:
import glob
In [46]:
len( glob.glob('OUT_exp_growth_model/*dill') )
Out[46]:
In [43]:
ar_ery = []
for filename in glob.glob('OUT_exp_growth_model/*dill'):
ar_ery.append( dill.load(open(filename)) )
In [48]:
ar_ery[1]
Out[48]:
Get flag count from optimisations:
In [49]:
def get_flag_count(out, NM=True):
"""
out: list of tuples, each containing p_init and popt + additional info, including warnflags
as produced by run_dadi.py
"""
from collections import defaultdict
if NM: # if ar from Nelder-Mead
i = 4 # the warnflag is reported at index position 4 in the output array
else: # ar from BFGS optimisation
i = 6
warnflag = defaultdict(int)
for res in out:
if res[1][i] == 1: # notice the change in indexing
warnflag[1] +=1
elif res[1][i] == 2:
warnflag[2] += 1
elif res[1][i] == 0:
warnflag[0] += 1
else:
warnflag[999] +=1
if NM:
print "success", warnflag[0]
print "Maximum number of function evaluations made.", warnflag[1]
print "Maximum number of iterations reached.", warnflag[2]
print "unknown flag", warnflag[999]
else:
print "success", warnflag[0]
print "Maximum number of iterations exceeded.", warnflag[1]
print "Gradient and/or function calls not changing.", warnflag[2]
print "unknown flag", warnflag[999]
In [50]:
get_flag_count(ar_ery, NM=False)
There were 39 successful optimisations.
Create table of results from successful optimisations:
In [53]:
def flatten(array):
"""
Returns a list of flattened elements of every inner lists (or tuples)
****RECURSIVE****
"""
import numpy
res = []
for el in array:
if isinstance(el, (list, tuple, numpy.ndarray)):
res.extend(flatten(el))
continue
res.append(el)
return list( res )
In [54]:
import pandas as pd
In [59]:
i = 6 # where to find flag, 6 for BFGS, 4 for Nelder-Mead
successfull_popt_ery = [flatten(out)[:5] for out in ar_ery if out[1][i] == 0]
df = pd.DataFrame(data=successfull_popt_ery, \
columns=['nu_0', 'T_0', 'nu_opt', 'T_opt', '-logL'])
df.sort_values(by='-logL', ascending=True) # smaller is better
Out[59]:
These results are completely inconclusive. I think it would make sense to limit the search space for $T$ to something well above 0 and start a few optimisations from parameter values close to a neutral model.
In [61]:
%%px
lower_bound = [1e-4, 1e-6]
In [63]:
cl[0]['lower_bound']
Out[63]:
In [65]:
%%px
# set up global variables on engines required for run_dadi function call
dadi_opt_func = dadi.Inference.optimize_log # uses BFGS algorithm
sfs = fs_ery # use ERY spectrum
perturb = True
fold = 1
maxiter = 10 # run a maximum of 10 iterations
verbose = 0
full_output = True # need to have full output to get the warnflags (see below)
outname = "OUT_exp_growth_model/ERY_perturb"
In [66]:
p_init = [1, 1] # this should be a neutral model, i.e. constant population size
In [67]:
from itertools import repeat
ar_ery = lbview.map(run_dadi, repeat(p_init, 20), block=False, order=True)
In [69]:
ar_ery.progress
Out[69]:
In [70]:
get_flag_count(ar_ery, NM=False)
None of these optimisations was able to converge on optimal parameter values.
In [71]:
%ll OUT_exp_growth_model/ERY_perturb*
The perturbation has not been very effective.
In [73]:
! ls -l OUT_exp_growth_model/ERY_perturb* | wc -l
Why are there only 19 output files? There should be 20! Two output filenames happened to be identical.
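Inside run_dadi the collision could be avoided by appending a unique suffix to the file name, for example (a sketch; uuid is just one option):

import uuid
name += "_" + uuid.uuid4().hex[:8]   # make each output file name unique before adding the ".dill" extension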
In [72]:
%%px
# use Nelder-Mead algorithm
dadi_opt_func = dadi.Inference.optimize_log_fmin
In [74]:
! rm -f OUT_exp_growth_model/ERY_perturb*
In [75]:
! ls -l OUT_exp_growth_model/ERY_perturb* | wc -l
In [76]:
ar_ery = lbview.map(run_dadi, repeat(p_init, 20), block=False, order=False)
In [78]:
ar_ery.elapsed
Out[78]:
In [79]:
get_flag_count(ar_ery, NM=True)
In [80]:
%%px
# increase perturbation
fold = 3
In [81]:
ar_ery = lbview.map(run_dadi, repeat(p_init, 20), block=False, order=False)
In [82]:
ar_ery.progress
Out[82]:
In [88]:
ar_ery.elapsed
Out[88]:
In [83]:
get_flag_count(ar_ery, NM=True)
In [85]:
! rm -f OUT_exp_growth_model/ERY_perturb*
In [87]:
%%px
maxiter = 50
In [89]:
ar_ery = lbview.map(run_dadi, repeat(p_init, 20), block=False, order=False)
In [90]:
ar_ery.progress
Out[90]:
In [91]:
ar_ery.elapsed
Out[91]:
In [92]:
get_flag_count(ar_ery, NM=True)
OK. Three successful optimisations.
In [93]:
i = 4 # where to find flag, 6 for BFGS, 4 for Nelder-Mead
successfull_popt_ery = [flatten(out)[:5] for out in ar_ery if out[1][i] == 0]
df = pd.DataFrame(data=successfull_popt_ery, \
columns=['nu_0', 'T_0', 'nu_opt', 'T_opt', '-logL'])
df.sort_values(by='-logL', ascending=True) # smaller is better
Out[93]:
OK, that looks promising. Two quite different parameter combinations have led to the same optimal parameter combination.
In [98]:
p_init = np.array( df.sort_values(by='-logL').iloc[0, 2:4] )
p_init
Out[98]:
In [99]:
ar_ery = lbview.map(run_dadi, repeat(p_init, 20), block=False, order=False)
In [100]:
ar_ery.elapsed
Out[100]:
In [101]:
get_flag_count(ar_ery, NM=True)
In [102]:
i = 4 # where to find flag, 6 for BFGS, 4 for Nelder-Mead
successfull_popt_ery = [flatten(out)[:5] for out in ar_ery if out[1][i] == 0]
df = pd.DataFrame(data=successfull_popt_ery, \
columns=['nu_0', 'T_0', 'nu_opt', 'T_opt', '-logL'])
df.sort_values(by='-logL', ascending=True) # smaller is better
Out[102]:
This looks like convergence.
In [104]:
%%px
maxiter = 100
In [105]:
ar_ery = lbview.map(run_dadi, repeat(p_init, 20), block=False, order=False)
In [107]:
ar_ery.elapsed
Out[107]:
This is still extremely fast!
In [106]:
get_flag_count(ar_ery, NM=True)
In [108]:
i = 4 # where to find flag, 6 for BFGS, 4 for Nelder-Mead
successfull_popt_ery = [flatten(out)[:5] for out in ar_ery if out[1][i] == 0]
df = pd.DataFrame(data=successfull_popt_ery, \
columns=['nu_0', 'T_0', 'nu_opt', 'T_opt', '-logL'])
df.sort_values(by='-logL', ascending=True) # smaller is better
Out[108]:
In [111]:
# save optimisation result to file
# dill can dump but not load an AsyncResult object
dill.dump(list(ar_ery.get()), open("OUT_exp_growth_model/ERY_perturb_ar_ery.dill", "w"))
In [114]:
test = dill.load(open("OUT_exp_growth_model/ERY_perturb_ar_ery.dill"))
get_flag_count(test, NM=True)
Since starting optimisations from a broad range of initial values did not lead to useful results with the ery spectrum, I am going to apply to the par spectrum the same strategy that eventually led to a successful optimisation with ery: creating initial parameter values by perturbing the parameters of a neutral model.
In [117]:
%%px
# set up global variables on engines required for run_dadi function call
dadi_opt_func = dadi.Inference.optimize_log_fmin # uses Nelder-Mead algorithm
sfs = fs_par # use PAR spectrum
perturb = True
fold = 3
maxiter = 50 # run a maximum of 50 iterations
verbose = 0
full_output = True # need to have full output to get the warnflags (see below)
outname = "OUT_exp_growth_model/PAR_perturb"
In [118]:
p_init = [1, 1]
In [119]:
ar_par = lbview.map(run_dadi, repeat(p_init, 20), block=False, order=False)
In [126]:
ar_par.elapsed/60
Out[126]:
In [127]:
ar_par.progress
Out[127]:
I needed to kill the last two processes in the terminal because they had been running for more than 15 minutes. Because of that I cannot use the AsyncResult object and instead need to load the pickled output from file.
In [130]:
ar_par = []
for filename in glob.glob('OUT_exp_growth_model/PAR_perturb*'):
ar_par.append( dill.load(open(filename)) )
In [131]:
get_flag_count(ar_par, NM=True)
Only 1 successful optimisation.
In [132]:
i = 4 # where to find flag, 6 for BFGS, 4 for Nelder-Mead
successfull_popt_par = [flatten(out)[:5] for out in ar_par if out[1][i] == 0]
df = pd.DataFrame(data=successfull_popt_par, \
columns=['nu_0', 'T_0', 'nu_opt', 'T_opt', '-logL'])
df.sort_values(by='-logL', ascending=True) # smaller is better
Out[132]:
That is hitting the upper limit for the time parameter.
Maybe a sweep through the parameter space could help to find an optimum.
In [157]:
%%px
# set up global variables on engines required for run_dadi function call
dadi_opt_func = dadi.Inference.optimize_log # uses BFGS algorithm
sfs = fs_par # use PAR spectrum
perturb = False
fold = 3
maxiter = 10 # run a maximum of 10 iterations
verbose = 0
full_output = True # need to have full output to get the warnflags (see below)
outname = "OUT_exp_growth_model/PAR"
In [135]:
ar_par = lbview.map(run_dadi, product(p0_nu, p0_T), block=False, order=False)
In [143]:
ar_par.progress
Out[143]:
In [144]:
ar_par.elapsed/60
Out[144]:
Unfortunately, this is too slow. I need to abort this by stopping the cluster of engines.
In [19]:
import glob, dill
In [25]:
ar_par = []
for filename in glob.glob('OUT_exp_growth_model/PAR_[0123456789]*'):
try:
ar_par.append( dill.load( open(filename) ) )
except:
pass
In [26]:
get_flag_count(ar_par, NM=False)
After restarting the cluster, it is important to be EXTREMELY careful to load all required variables and modules on all engines again.
In [2]:
%who
In [27]:
! rm -f OUT_exp_growth_model/PAR_[0123456789]*
In [28]:
from ipyparallel import Client
cl = Client()
cl.ids
Out[28]:
In [29]:
%%px --local
# run whole cell on all engines as well as in the local IPython session
import numpy as np
import sys
sys.path.insert(0, '/home/claudius/Downloads/dadi')
import dadi
In [30]:
%%px --local
# import 1D spectrum of ery on all engines:
fs_ery = dadi.Spectrum.from_file('ERY.unfolded.sfs').fold()
# import 1D spectrum of par on all engines:
fs_par = dadi.Spectrum.from_file('PAR.unfolded.sfs').fold()
In [31]:
%%px --local
ns = fs_ery.sample_sizes # both populations have the same sample size
fs_ery.pop_ids = ['ery']
fs_par.pop_ids = ['par']
# setting the smallest grid size slightly larger than the largest population sample size (36)
pts_l = [40, 50, 60]
In [32]:
%%px --local
# create link to function that specifies the model
func = dadi.Demographics1D.growth
# create extrapolating version of the model function
func_ex = dadi.Numerics.make_extrap_log_func(func)
In [33]:
# create load balanced view of engines
lbview = cl.load_balanced_view()
In [34]:
%%px --local
import dill # better version of pickle
In [35]:
from itertools import product
In [36]:
def run_dadi(p_init): # for the function to be called with map, it needs to have one input variable
"""
p_init: initial parameter values to run optimisation from
"""
if perturb == True:
p_init = dadi.Misc.perturb_params(p_init, fold=fold,
upper_bound=upper_bound, lower_bound=lower_bound)
# note upper_bound and lower_bound variables are expected to be in the namespace of each engine
# run optimisation of parameters
popt = dadi_opt_func(p0=p_init, data=sfs, model_func=func_ex, pts=pts_l, \
lower_bound=lower_bound, upper_bound=upper_bound, \
verbose=verbose, maxiter=maxiter, full_output=full_output)
import dill
# pickle to file
name = outname[:] # make copy of file name stub!
for p in p_init:
name += "_%.4f" % (p)
with open(name + ".dill", "w") as fh:
dill.dump((p_init, popt), fh)
return p_init, popt
In [37]:
%%px
# set lower and upper bounds to nu and T
upper_bound = [1e4, 4]
lower_bound = [1e-4, 1e-6]
I am trying to reduce the initial parameter sweep.
In [38]:
p0_nu = np.logspace(-2, 2, base=10.0, num=10)
p0_T = np.logspace(-2, np.log10(2), base=10, num=10)
In [40]:
%%px
# set up global variables on engines required for run_dadi function call
dadi_opt_func = dadi.Inference.optimize_log # uses BFGS algorithm
sfs = fs_par # use PAR spectrum
perturb = False
fold = 3
maxiter = 10 # run a maximum of 10 iterations
verbose = 0
full_output = True # need to have full output to get the warnflags (see below)
outname = "OUT_exp_growth_model/PAR"
In [41]:
ar_par = lbview.map(run_dadi, product(p0_nu, p0_T), block=False, order=False)
In [51]:
def get_flag_count(out, NM=True):
"""
out: list of tuples, each containing p_init and popt + additional info, including warnflags
as produced by run_dadi.py
"""
from collections import defaultdict
if NM: # if ar from Nelder-Mead
i = 4 # the warnflag is reported at index position 4 in the output array
else: # ar from BFGS optimisation
i = 6
warnflag = defaultdict(int)
for res in out:
if res[1][i] == 1: # notice the change in indexing
warnflag[1] +=1
elif res[1][i] == 2:
warnflag[2] += 1
elif res[1][i] == 0:
warnflag[0] += 1
else:
warnflag[999] +=1
if NM:
print "success", warnflag[0]
print "Maximum number of function evaluations made.", warnflag[1]
print "Maximum number of iterations reached.", warnflag[2]
print "unknown flag", warnflag[999]
else:
print "success", warnflag[0]
print "Maximum number of iterations exceeded.", warnflag[1]
print "Gradient and/or function calls not changing.", warnflag[2]
print "unknown flag", warnflag[999]
In [67]:
ar_par = []
for filename in glob.glob('OUT_exp_growth_model/PAR_[0123456789]*'):
try:
ar_par.append( dill.load( open(filename) ) )
except:
pass
In [68]:
get_flag_count(ar_par, NM=False)
OK, the reduced sweep through the parameter space was not successful for the parallelus spectrum. The exponential growth/decline model could not be fitted to the par spectrum.
In [ ]: