We will input a noisy single-molecule trace of, for instance, the activity of a molecular motor protein. These traces are very often step-like, with the size and duration of the steps containing important information regarding the enzymatic cycle of the machine. This information, unfortunately, is masked by noise that is intrinsic to the experiment and comes from many different sources (the Brownian motion of the particle of interest, for instance, etc). Our task is to unveil the molecular motor's behavior despite the underlying noise.
In the case of optical tweezers experiments conducted in passive mode, the molecular motor actively pulls itself into regions of higher or lower external forces (increase/decrease depends on the geometry of the experiment). In this case the constrained Brownian motion of the motor contributes noise that is not stationary, but changes as a function of the external force. Many other related, but seemingly different types of experiments similarly have non-stationary noise. Here we implement an algorithm to specifically address this kind of scenario. Assumptions:
The program can be nicely organized by following the order of events required to do the fit.
Trace:
This should be populated with an input file.
The params should be populated with input file that contains entire range of possible parameters.
Slices:
The initial fit has exactly one dwell per slice. The optimal position for each dwell is at the mean. Will need to calculate SICP, too
For each iteration, choose the optimal location by minimizing SICP. Stop adding steps once SICP no longer gets smaller.
Define function that calculates logarithm of factorial
In [1]:
def log_factorial(x):
sum = 0
for i in range(x):
sum += log(x-i)
return sum
In [2]:
log_factorial(4)
Out[2]:
In [3]:
log(4) + log(3) + log(2) + log(1)
Out[3]:
Trace data is contained in external txt file, space delimited. Format is: time force position
In [4]:
# import data
def data_point(time, force, position):
'''
Inputs: a data point will have a time, a position, and a force (floats)
Returns a dictionary with these values
'''
# each data point is a dict having time, force, and position
return {'t': time,
'f': force,
'p': position}
def load_trace(trace_filename):
'''
Opens datafile, loads in each row, and dumps time, position, and force into a trace datastructure
--Requires data_point function
--Trace datastructure is a list of dictionaries. Each dictionary entry is a datapoint with time, force, and position
Returns trace datastructure
'''
dataFile = open(trace_filename, 'r')
trace = []
for row in dataFile:
d = [float(f) for f in row.strip().split(' ')]
# Have to cast each string value to a float before append
trace.append(data_point(*d))
dataFile.close()
# data is now dumped in trace list (each list item is a dictionary)
return trace
In [5]:
# we will select the trace file using a very basic GUI selection
# these are the libraries
from Tkinter import Tk
from tkFileDialog import askopenfilename
Tk().withdraw() # we don't want a full GUI, so keep the root window from appearing
trace_filename = askopenfilename(title="Select Trace File!") # show an "Open" dialog box and return the path to the selected file
trace = load_trace(trace_filename) # load this file
# print first and last couple list items to confirm it's set up correctly
print "Len:", len(trace)
print "First:", trace[0]
print "Last:", trace[-1]
Import list of nu and So for each possible force slice
In [6]:
# import nu, So for each slice
# we will select the parameter file using a very basic GUI selection
# these are the required libraries
from Tkinter import Tk
from tkFileDialog import askopenfilename
# a slice of data will have a start/end force and
# corresponding (nu, So) bias params
def slice_params(startForce, endForce, nu, So):
'''
Inputs SICP parameters (nu, So) for all force intervals,
bounded by startForce and endForce.
Returns dictionary of this information
'''
return {'sF': startForce, 'eF': endForce, 'nu': nu, 'So': So}
# each possible slice interval is unordered list of params
Tk().withdraw() # we don't want a full GUI, so keep the root window from appearing
param_filename = askopenfilename(title="Select (nu, So) Parameter File!") # show an "Open" dialog box and return the path to the selected file
sliceFile = open(param_filename, 'r')
# this is the input file that contains the (nu, So)
# values for each force interval
params = []; tmp = [];
for row in sliceFile:
tmp.append(row.strip().split(' '))
[params.append(slice_params(float(tmp[i][0]),float(tmp[i][1]),float(tmp[i][2]),float(tmp[i][3]))) for i in range(0,len(tmp))]
del tmp
sliceFile.close()
# possible parameters are now dumped in params list
# (each list item is a dictionary)
In [7]:
print params
Slice the data according to force by recording start/end indices of each slice. Attach approprate (nu, So) to each slice
In [8]:
stF=sorted(trace, key=lambda x: x['f'])
def slice_indexing(startForce, startIndex, endForce, endIndex, nu, So): # slice the data, keeping track of indexes and bias params
return {'sF': startForce,
'sI': startIndex,
'eF': endForce,
'eI': endIndex,
'nu': nu,
'So': So}
def find_nearest(array, value):
index=argmin([abs(array[i]-value) for i in range(0,len(array))])
return array[index]
slices = [];
# very messy, but loop creates integer force list. because of how range() works, the last force isn't included
for forces in range(int(round(stF[0]['f'])),int(round(stF[len(stF)-1]['f']))):
''' sometimes the starting force will only be just below a certain
force (i.e. 4.7 will be starting force. If you sliced the trace to
include a 4 to 5 pN interval there would hardly be any data in that
interval. A similar thing might happen at the ending force where it
will be only just above a certain force (i.e. 10.2, in this case you
wouldn't want to incluede a 10 to 11 pN interval) the int(round(...
in the for loop above will specifically treat these possible
starting/ending potential issues.'''
# the if/else statements correct the indices of the starting and
# end force intervals (if forces is either the first or last
# force interval, we set the indices accordingly
if forces == int(round(stF[0]['f'])):
sI=0;
else:
sI=[trace[i]['f'] for i in range(0,len(trace))].index(find_nearest([trace[i]['f'] for i in range(0,len(trace))],forces))
if forces+1 == int(round(stF[len(stF)-1]['f'])):
eI = len(trace)-1
else:
eI=[trace[i]['f'] for i in range(0,len(trace))].index(find_nearest([trace[i]['f'] for i in range(0,len(trace))],forces+1))
sF=forces; eF=forces+1;
nu=params[[params[i]['sF'] for i in range(0,len(params))].index(sF)]['nu'];
So=params[[params[i]['sF'] for i in range(0,len(params))].index(sF)]['So'];
slices.append(slice_indexing(forces, sI, forces+1, eI, nu, So))
del stF
Confirm that slicing is working graphically
In [9]:
figure(figsize=(20,6))
subplot(1,2,1)
for s in range(0,len(slices)): # iterate over each slice, s indexes slices
force = [trace[i]['f'] for i in range(slices[s]['sI'], slices[s]['eI'])]
time = [trace[i]['t'] for i in range(slices[s]['sI'], slices[s]['eI'])]
plot(time, force); title('Sliced by Force', fontsize=30);
xlabel('$t$', fontsize=30); ylabel('force', style='italic', fontsize=30); tick_params(labelsize=25)
subplot(1,2,2)
plot([trace[i]['f'] for i in range(0,len(trace))]); title('Original Trace',fontsize=30)
xlabel('$t$', fontsize=30); ylabel('force', style='italic', fontsize=30); tick_params(labelsize=25)
Optimal single dwell of each slice is at mean of data
In [10]:
def dwell_params(startIndex, endIndex, positionLoc, forceLoc, sliceLoc): # when dwell is added, keep track of this information
return {'sI': startIndex, 'eI': endIndex, 'p': positionLoc, 'f': forceLoc, 'slice': sliceLoc}
def slice_initial_sicp(dwellSigSq, dwellNumPts, numSteps, whichSlice): # list, list, list
nu = slices[whichSlice]['nu']
So = slices[whichSlice]['So']
d = numSteps+1
n = dwellNumPts
sicp = 0.5*(n-d)*log(2*pi) + 0.5*(log(n)) + 0.5*(n+nu-d-1)*log(0.5*(n*dwellSigSq+So)) + log(2) - log_factorial(int(round(0.5*(n+nu-d-3))))
#0.5*(n+nu-d-3)*log(0.5*(n+nu-d-3)) + 0.5*(n+nu-d-3) - 0.5*(log(0.5*(n-d-3+nu)))
#(numSteps+1)*(log(2*pi)+1) + log(dwellNumPts) + (dwellNumPts+nu-(numSteps+1)-1)*log(dwellNumPts*dwellSigSq+So) - (dwellNumPts+nu-(numSteps+1)-3)*log(dwellNumPts+nu-(numSteps+1)-3) - log(dwellNumPts+nu-(numSteps+1)-3)
return sicp
dwellList=[]; sicpSlice=[];
for s in range(0,len(slices)): # iterate over each slice, s indexes list
pos=[trace[i]['p'] for i in range(slices[s]['sI'], slices[s]['eI'])] # get position list only within current slice
force=[trace[i]['f'] for i in range(slices[s]['sI'], slices[s]['eI'])] # get force list only within current slice
startIndex=slices[s]['sI']
endIndex=slices[s]['eI']
positionLoc=mean(pos)
forceLoc=mean(force)
sliceLoc=s
dwellSigSq=var(pos); dwellNumPts=(endIndex-startIndex); numSteps=0;
sicpSlice.append(slice_initial_sicp(dwellSigSq, dwellNumPts, numSteps, s))
dwellList.append(dwell_params(startIndex, endIndex, positionLoc, forceLoc, sliceLoc))
del pos; del force; del startIndex; del endIndex; del positionLoc; del forceLoc; del sliceLoc; del dwellSigSq; del dwellNumPts; del numSteps
# clear variable I won't use anymore
Recap of data structures:
Scheme:
In [11]:
def calc_sicp(proposedDwellList, sliceID): # list, list, list, int
'''
'''
# we need to generate list of dwells in given slice
dwellsInSlice = [];
for dwell in range(len(proposedDwellList)):
if proposedDwellList[dwell]['slice'] == sliceID:
dwellsInSlice.append(proposedDwellList[dwell])
# generate list of position lists bounded by each dwell
# generate list of force lists bounded by each dwell
posDwell = []; forceDwell = []; numPtsDwell = []; dwellSigSq = [];
for dwell in range(len(dwellsInSlice)):
posDwell.append([trace[i]['p'] for i in range(dwellsInSlice[dwell]['sI'],dwellsInSlice[dwell]['eI'])])
forceDwell.append([trace[i]['f'] for i in range(dwellsInSlice[dwell]['sI'],dwellsInSlice[dwell]['eI'])])
numPtsDwell.append(abs(dwellsInSlice[dwell]['eI'] - dwellsInSlice[dwell]['sI']))
dwellSigSq.append(var(posDwell[dwell]))
# get (nu, So) for this particular slice
# print dwellsInSlice
n = (dwellsInSlice[-1]['eI']-dwellsInSlice[0]['sI'])
overallSigSq = sum([a*b for a,b in zip(numPtsDwell,dwellSigSq)])/n
nu = slices[sliceID]['nu']
So = slices[sliceID]['So']
d = len(dwellsInSlice)
# calculate sicp for the slice
sicp = 0
# add all components to sicp except for the # dp's per dwell in slice
sicp += 0.5*(n-d)*log(2*pi) + 0.5*(n+nu-d-1)*log(0.5*(n*overallSigSq+So)) + log(2) - log_factorial(int(round(0.5*(n+nu-d-3))))
#0.5*(n+nu-d-3)*log(0.5*(n+nu-d-3)) + 0.5*(n+nu-d-3) - 0.5*log(0.5*(n-d-3+nu))
#d*(log(2*pi)+1) + (n+nu-d-1)*log(n*overallSigSq + So) - (n+nu-d-3)*log(n+nu-d-3) - log(n+nu-d-3)
for dwell in range(len(dwellsInSlice)):
sicp += 0.5*log(numPtsDwell[dwell])
#log(numPtsDwell[dwell]) # now add dp's per dwell component
return sicp # list of sicp's for each slice
In [12]:
def trial_step(sicpList, dwellList, l): # input the list of dwells
'''
'''
for dwell in range(0,len(dwellList)): # loop through all dwells
# print 'sIndex ' + str(dwellList[dwell]['sI']) + ' list item ' + str(l) + ' eIndex ' + str(dwellList[dwell]['eI'])
if dwellList[dwell]['sI'] < l < dwellList[dwell]['eI']: # use if statement to find which dwell to split
# split this dwell by adding step, calculate updated sicp
# then remove the original dwell
leftPos=[trace[i]['p'] for i in range(dwellList[dwell]['sI'], l)] # get position list only within current slice
rightPos=[trace[i]['p'] for i in range(l, dwellList[dwell]['eI'])] # get position list only within current slice
leftForce=[trace[i]['f'] for i in range(dwellList[dwell]['sI'], l)] # get force list only within current slice
rightForce=[trace[i]['f'] for i in range(l, dwellList[dwell]['eI'])] # get force list only within current slice
sliceID = dwellList[dwell]['slice'] # ID slice location of dwell we're splitting
proposedDwellList = list(dwellList)
# print 'l value: ' + str(l) + '...' + 'last dwell list position: ' + str(proposedDwellList[-1])
proposedDwellList.pop(dwell) # error here
proposedDwellList.insert(dwell,{'sI': dwellList[dwell]['sI'],'eI': l,'p': mean(leftPos),'f': mean(leftForce),'slice': sliceID})
proposedDwellList.insert(dwell+1,{'sI': l,'eI': dwellList[dwell]['eI'],'p': mean(rightPos),'f': mean(rightForce),'slice': sliceID})
sicpList.pop(sliceID) # remove the sicp from the slice we added a step in, we have to recalculate this
break # once you find the dwell to split and have removed the previous sicp, break from for loop
sicp = calc_sicp(proposedDwellList, sliceID)
sicpList.insert(sliceID,sicp)
return {'dwell list': proposedDwellList, 'sicp slice list': sicpList, 'sicp total': sum(sicpList)}
In [13]:
def add_step(prevDwellList,sliceSicpList): # finds optimal location for next step
'''
'''
existingDwellIndices = [prevDwellList[i]['sI'] for i in range(0,len(prevDwellList))]
output = []; trialStepSicp = []; trialDwellList = [];
for l in range(0,len(trace)-1): # for every point in the trace,
if l not in existingDwellIndices: # if it's already the location of a step, try one there
trialStepSicp = list(sliceSicpList)
trialDwellList = list(prevDwellList)
output.append(trial_step(trialStepSicp, trialDwellList, l)) # unfinished here, what datastructure will we append to?
trialLocs=sorted(output, key=lambda x: x['sicp total']) # sort trial step locations by sicp
return trialLocs[0] # return the step location that minimizes sicp
Add steps until SICP converges
In [14]:
tmp = add_step(dwellList,sicpSlice)
cnt = 1; sicp=[] #cnt counts the number of fit iterations (prop. to # steps)
while(tmp['sicp total'] < sum(sicpSlice)):
dwellList = tmp['dwell list']
sicpSlice = tmp['sicp slice list']
tmp = add_step(dwellList,sicpSlice)
#print "old: " + str(sum(sicpSlice)) + " new: " + str(sum(tmp['sicp slice list']))
sicp.append([cnt, sum(sicpSlice)])
cnt+=1
Let's take the final fit and plot it on top of the trace.
In [15]:
# these values are the indices of the start and end points of each dwell
# for a real trace they would have to be converted to time
a=[dwellList[i]['sI'] for i in range(len(dwellList))]
b=[dwellList[i]['eI'] for i in range(len(dwellList))]
x = [item for sublist in zip(a,b) for item in sublist]
print x
In [16]:
# these values are the positions of each dwell
a=[dwellList[i]['p'] for i in range(len(dwellList))]
y = [item for sublist in zip(a,a) for item in sublist]
print ['%.2f' %y[i] for i in range(len(dwellList))]
In [17]:
# these values are the positions of each dwell
a=[dwellList[i]['f'] for i in range(len(dwellList))]
z = [item for sublist in zip(a,a) for item in sublist]
print ['%.2f' %z[i] for i in range(len(dwellList))]
Save the fit to file: time force position delimited by ' ' (space)
In [18]:
data = []
data = [[x[i], z[i], y[i]] for i in range(len(x))] #time force position
#modify the name of the input trace file to create fit filename
tmp = list(trace_filename)
tmp.insert(-5,"final_SICP_fit_")
sicpFitFilename = "".join(tmp)
np.savetxt(sicpFitFilename, data, delimiter = ' ') # save the final fit to this filename
Save the sicp vs. iteration to file: iteration sicp delimited by ' ' (space)
In [19]:
data=[]; iteration = [sicp[i][0] for i in range(len(sicp))]; sicpi = [sicp[i][1] for i in range(len(sicp))];
data=[[iteration[i], sicpi[i]] for i in range(len(iteration))] # iteration sicp_val
#modify the name of the input trace file to create fit filename
tmp = list(trace_filename)
tmp.insert(-5,"SICP_progression_")
sicpConvFilename = "".join(tmp)
np.savetxt(sicpConvFilename, data, delimiter = ' ') # save the SICP vs. iteration to this filename
This is the resulting fit (in cyan) with the first term positive
In [20]:
figure(figsize=(20,12))
subplot(2,2,1)
for s in range(0,len(slices)): # iterate over each slice, s indexes slices
force = [trace[i]['p'] for i in range(slices[s]['sI'], slices[s]['eI'])]
time = [trace[i]['t'] for i in range(slices[s]['sI'], slices[s]['eI'])]
plot(time, force); title('Sliced by Force', fontsize=30);
xlabel('$time$', fontsize=30); ylabel('$position$', fontsize=30); tick_params(labelsize=25);
plot(x,y,linewidth=1.5);
subplot(2,2,2)
for s in range(0,len(slices)): # iterate over each slice, s indexes slices
force = [trace[i]['p'] for i in range(slices[s]['sI'], slices[s]['eI'])]
time = [trace[i]['t'] for i in range(slices[s]['sI'], slices[s]['eI'])]
plot(time, force); title('Sliced by Force', fontsize=30);
xlabel('$time$', fontsize=30); ylabel('$position$', fontsize=30); tick_params(labelsize=25);
plot(x,y,linewidth=1.5);
#xlim(750,850); ylim(30,50);
xlim(0,200); ylim(0,40)
subplot(2,2,3)
for s in range(0,len(slices)): # iterate over each slice, s indexes slices
force = [trace[i]['p'] for i in range(slices[s]['sI'], slices[s]['eI'])]
time = [trace[i]['t'] for i in range(slices[s]['sI'], slices[s]['eI'])]
plot(time, force); title('Sliced by Force', fontsize=30);
xlabel('$time$', fontsize=30); ylabel('$position$', fontsize=30); tick_params(labelsize=25);
plot(x,y,linewidth=1.5);
#xlim(750,850); ylim(30,50);
xlim(800,1100); ylim(80,110)
subplot(2,2,4)
for s in range(0,len(slices)): # iterate over each slice, s indexes slices
force = [trace[i]['p'] for i in range(slices[s]['sI'], slices[s]['eI'])]
time = [trace[i]['t'] for i in range(slices[s]['sI'], slices[s]['eI'])]
plot(time, force); title('Sliced by Force', fontsize=30);
xlabel('$time$', fontsize=30); ylabel('$position$', fontsize=30); tick_params(labelsize=25);
plot(x,y,linewidth=1.5);
#xlim(750,850); ylim(30,50);
xlim(600,800); ylim(55,90)
Out[20]:
Let's look at the SICP convergence
In [21]:
plot([sicp[i][0] for i in range(len(sicp))],[sicp[i][1] for i in range(len(sicp))],linewidth=1.5);
tick_params(labelsize=20);
title("First term positive", fontsize=20);
xlabel("Iteration", fontsize=20);
ylabel("SICP", fontsize=20);
#savefig('first_term_positive_SICPvsIteration.png')
Out[21]:
Plot the histogram of step sizes
In [22]:
# generate list of step sizes
ss = []
for i in range(1,len(y)):
if i % 2 == 0:
ss.append(y[i]-y[i-1])
print ['%.2f' % ss[i] for i in range(len(ss))]
In [26]:
hist(ss, bins=int(len(ss)/3), histtype='step');
xlabel("measured step size", fontsize=15);
ylabel("counts", fontsize=15);
Plot histogram of dwell durations
In [24]:
# generate list of dwell durations
dd = []
for i in range(1,len(x)):
if i % 2 != 0:
dd.append(x[i]-x[i-1])
print dd
In [30]:
hist(dd, bins=len(dd)/10, histtype='step');
xlabel("measured dwell duration", fontsize=15);
ylabel("counts", fontsize=15)
Out[30]:
In [25]: