John A. Marohn (jam99@cornell.edu)
Department of Chemistry and Chemical Biology
Cornell University
Ithaca, NY USA; 14853-1301
2014/06/30 -- 2014/07/01
We generate a frequency-modulated sine-wave signal and use freqdemod
to recover the signal's frequency vs time.
In [1]:
import os, sys
module_path = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(module_path)
Now we are ready to load the module:
In [2]:
from freqdemod import Signal
Set the digitization frequency to 100 kHz, corresponding to a time step of 10 us. We want a signal with quite a few points, say 128k = 128 x 1024 = 131072 points. The duration of the signal will thus be 131072 x 10E-6 = 1.31072 seconds.
The sine wave has an initial frequency of 4 kHz. Between 0.25s and 0.75s, the frequency is ramped linearly to 6 kHz. It remains 6 kHz until the end of the signal.
In [3]:
import math
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
Set up the frequency-ramped sine wave. This takes quite a bit of care!
In [4]:
fd = 100E3
dt = 1/fd
f_start = 4.000E3
f_end = 6.000E3
# time array
t1 = dt*np.arange(int(round(0.25/dt)))
t2 = dt*np.arange(int(round((0.75-0.25)/dt)))
t3 = dt*np.arange(128*1024-t1.size-t2.size)
t2plus = t2+t1[-1]+dt
t3plus = t3+t2plus[-1]+dt
t = np.append(np.append(t1,t2plus),t3plus)
# frequency array
f1 = f_start*np.ones(t1.size)
f2 = f_start + t2*(f_end-f_start)/(t2[-1]-t2[0])
f3 = f_end*np.ones(t3.size)
f = np.append(np.append(f1,f2),f3)
# phase accumulator
p = np.zeros(t.size)
p[0] = 0.0
for k in np.arange(1,t.size):
p[k] = p[k-1] + dt*f[k-1]
p = 2*np.pi*p
# the signal
x = np.cos(p)
Now check out the arrays
In [5]:
print "total number of time points: ", t.size
print "number of points in t1: ", t1.size
print "number of points in t2: ", t2.size
print "number of points in t3: ", t3.size
print "number of points in f: ", f.size
In [6]:
print "time array ==>"
print " near first transition: ", 1E6*t[t1.size-3:t1.size+3]
print " near second transition", 1E6*t[t1.size+t2.size-3:t1.size+t2.size+3]
print " at end = ", 1E6*t[-3:]
In [7]:
print "frequency ramp start: ", f[0:3]
print "frequency ramp end: ", f[-3:]
In [8]:
print "phase array ==>"
print " near first transition: ", p[t1.size-3:t1.size+3]/(2*np.pi)
print " near second transition", p[t1.size+t2.size-3:t1.size+t2.size+3]/(2*np.pi)
print " at end = ", 1E6*p[-3:]
In [9]:
plt.plot(t,f)
plt.ylim([3E3,7E3])
plt.show()
plt.plot(t,p)
plt.show()
trans1 = list(np.arange(t1.size-20,t1.size+20))
plt.plot(1E6*t[trans1],x[trans1],"o-")
plt.show()
trans2 = list(np.arange(t1.size+t2.size-20,t1.size+t2.size+20))
plt.plot(1E6*t[trans2],x[trans2],"o-")
plt.show()
In [10]:
S = Signal(x,"x","nm",1/fd)
S.binarate("middle")
S.window(1.0E-3)
S.fft()
S.filter(bw=5.0E3)
S.ifft()
S.trim()
print(S)
In [11]:
plt.plot(S.signal['t']*1E6,S.signal['theta'])
plt.xlim(0,S.signal['t_original'][-1]*1E6)
plt.ylabel("phase [cycles]")
plt.xlabel("t [us]")
plt.show()
plt.plot(S.signal['t']*1E6,S.signal['a'])
plt.xlim(0,S.signal['t_original'][-1]*1E6)
plt.ylabel("amplitude")
plt.xlabel("t [us]")
plt.show()
Pick out 1000 points of data near the beginning and near the end. Fit the phase vs time data to a line. We see that we recover the expected intial and final frequencies.
In [12]:
zf = np.polyfit(S.signal['t'][1000:2000], S.signal['theta'][1000:2000], 1)
print "Best-fit frequency = {0:.3f} Hz".format(zf[0])
zf = np.polyfit(S.signal['t'][-2000:-1000], S.signal['theta'][-2000:-1000], 1)
print "Best-fit frequency = {0:.3f} Hz".format(zf[0])
Some points have been removed from the array to eliminate filter ripple. We can see there are fewer points than the original 131072 points.
In [13]:
S.signal['theta'].size
Out[13]:
We want to break the data into 10 ms chunks and fit each chunk to a line. We thus want to break the data into sub-arrays of length
In [14]:
ct = int(round(10.0E-3/dt))
print ct
Each chunk should be 1000 points long and there should be 131 of them:
In [15]:
n = ct*int(round(S.signal['theta'].size/ct))
s_sub = S.signal['theta'][0:n].reshape((n/ct,ct))
print s_sub.shape
Now set up two time arrays.
The first array, t_chunk
, is the time array we will use as the x-axis for curve fitting. The initial element of this array will not be zero because of the trimming.
The second array, t_fit
, the a time array will will plot the curve-fit results against.
In [16]:
t_chunck = S.signal['t'][0:ct]
print "t_chunck.shape = ", t_chunck.shape
print "first and last t_chunck values [us]: ", \
1E6*np.array([t_chunck[0], t_chunck[-1]])
t_fit = S.signal['t'][0:n:ct]
print "t_fit.shape = ", t_fit.shape
print "first few and last few t_fit values [us]: ", \
1E6*np.array([t_fit[0], t_fit[1], t_fit[2], t_fit[-1]])
In [17]:
a = s_sub[:,:,np.newaxis][:,0,:]
b = np.ones(ct)
c = a*b
print a.shape
print b.shape
print c.shape
print c[0,0:3]
print c[1,0:3]
In [18]:
plt.subplot(131)
plt.plot(1E3*t_chunck,s_sub[0,:])
plt.ylim([0,150])
plt.subplot(132)
plt.plot(1E3*t_chunck,s_sub[1,:])
plt.ylim([0,150])
plt.subplot(133)
plt.plot(1E3*t_chunck,s_sub[2,:])
plt.ylim([0,150])
plt.show()
a = s_sub[:,:,np.newaxis][:,0,:]
b = np.ones(ct)
c = a*b
s_sub_reset = s_sub - c
plt.subplot(131)
plt.plot(1E3*t_chunck,s_sub_reset[0,:])
plt.ylim([0,150])
plt.subplot(132)
plt.plot(1E3*t_chunck,s_sub_reset[1,:])
plt.ylim([0,150])
plt.subplot(133)
plt.plot(1E3*t_chunck,s_sub_reset[2,:])
plt.ylim([0,150])
plt.show()
In [19]:
print s_sub_reset.shape
print s_sub_reset[:,0].shape
Curve fit Method 1.
In [20]:
def myfunction(x):
return np.polyfit(t_chunck, x, 1)[0]
import time
start = time.time()
zf = np.apply_along_axis(myfunction, axis=1, arr=s_sub_reset)
stop = time.time()
print "Time to execute curve fit = {0:.3f} ms".format(1E3*(stop-start))
Curve fit Method 2.
In [21]:
import time
zf = np.zeros(s_sub_reset[:,0].size)
start = time.time()
for k in np.arange(s_sub_reset[:,0].size):
zf[k] = np.polyfit(t_chunck, s_sub_reset[k,:], 1)[0]
stop = time.time()
print "Time to execute curve fit = {0:.3f} ms".format(1E3*(stop-start))
Plot the curve fit results
In [22]:
plt.plot(t_fit,zf)
plt.xlabel("t [us]")
plt.ylabel("frequency [Hz]")
plt.show()
In [23]:
dt_chunk_target = 1000.07E-6
n_per_chunk = int(round(dt_chunk_target/dt))
dt_chunk = dt*n_per_chunk
n_tot_chunk = int(round(S.signal['theta'].size/n_per_chunk))
n_total = n_per_chunk*n_tot_chunk
print "points per chunk =", n_per_chunk
print "time per chunk = {0:.3f} us".format(1E6*dt_chunk)
print "number of chunks =", n_tot_chunk
print "number of points analyzed =", n_total
Now rearrange the arrays, first the phase, then time.
In [24]:
s_sub = S.signal['theta'][0:n_total].reshape((n_tot_chunk,n_per_chunk))
print "rearranging the phase into an array of shape {}".format(s_sub.shape)
a = s_sub[:,:,np.newaxis][:,0,:]
b = np.ones(n_per_chunk)
c = a*b
s_sub_reset = s_sub - c
plt.plot(s_sub_reset.flatten()[0:n_per_chunk*20])
plt.xlabel("array index")
plt.ylabel("phase [cyeles]")
plt.show()
In [25]:
t_sub = S.signal['t'][0:n_total].reshape((n_tot_chunk,n_per_chunk))
print "rearranging the time into an array of shape {}".format(t_sub.shape)
a = t_sub[:,:,np.newaxis][:,0,:]
b = np.ones(n_per_chunk)
c = a*b
t_sub_reset = t_sub - c
plt.plot(1E6*t_sub_reset.flatten()[0:n_per_chunk*20])
plt.xlabel("array index")
plt.ylabel("time [us]")
plt.show()
Impliment a least-squares fit "by hand". We just want the slope:
In [26]:
SXY = np.sum(t_sub_reset*s_sub_reset,axis=1)
SX = np.sum(t_sub_reset,axis=1)
SY = np.sum(s_sub_reset,axis=1)
SXX = np.sum(t_sub_reset*t_sub_reset,axis=1)
slope = (n_per_chunk*SXY-SX*SY)/(n_per_chunk*SXX-SX*SX)
print "the slope array has a shape of {0}".format(slope.shape)
Plot the result -- nice!
In [27]:
plt.plot(t_sub[:,0],slope)
plt.xlabel("t [s]")
plt.ylabel("frequency [Hz]")
plt.show()
Now put the whole procedure together and time it. To be fair, we will not include the setup, just the fitting part of the procedure. The speed up using this brute-force procedure is approximately a factor of 5. We do have to store many more arrays.
In [28]:
import time
start = time.time()
s_sub = S.signal['theta'][0:n_total].reshape((n_tot_chunk,n_per_chunk))
s_sub_reset = s_sub - s_sub[:,:,np.newaxis][:,0,:]*np.ones(n_per_chunk)
t_sub = S.signal['t'][0:n_total].reshape((n_tot_chunk,n_per_chunk))
t_sub_reset = t_sub - t_sub[:,:,np.newaxis][:,0,:]*np.ones(n_per_chunk)
SXY = np.sum(t_sub_reset*s_sub_reset,axis=1)
SX = np.sum(t_sub_reset,axis=1)
SY = np.sum(s_sub_reset,axis=1)
SXX = np.sum(t_sub_reset*t_sub_reset,axis=1)
slope = (n_per_chunk*SXY-SX*SY)/(n_per_chunk*SXX-SX*SX)
stop = time.time()
print "Time to execute curve fit = {0:.3f} ms".format(1E3*(stop-start))
We want to compare the best-fit frequency to the known frequency. We have to evaluate the known frequency at just the right time. This is tricky -- we have to account for the filter delay.
Take a look at size of the original arrays and the chunk-ed down arrays:
In [29]:
# the original data arrays
print t.shape
print f.shape
In [30]:
# the best-fit arrays
print t_sub[:,0].shape
print slope.shape
In [31]:
# the first few starting times of the chunked-down time array
print t_sub[0,0]
print t_sub[1,0]
We want to extract the known frequencies at the times when t == t_sub[:,0]
. How to do this? We know the time offset due to the filter,
In [32]:
S.signal['td']
Out[32]:
which we can convert to an array index
In [33]:
id = int(S.signal['td']/S.signal['dt'])
print "delay index =",id
As a check, let us recreate the chunked time axis from this known delay and the already computed n_per_chunk
.
In [34]:
t_theory = t[id:-1:n_per_chunk][0:n_tot_chunk]
t_theory.size
Out[34]:
We can see that the intial times in the rccreated array agree exactly with the data in the chunked time array. Good -- this tells us we have chosen the correct array offsets.
In [35]:
print t_sub[0:3,0]
print t_theory[0:3]
Now obtain the expected slope by evaluating the known slope at the same array offests. We can now compare the known frequency to the measured frequency.
In [36]:
slope_theory = f[id:-1:n_per_chunk][0:n_tot_chunk]
plt.plot(t_theory,slope_theory,'k-')
plt.plot(t_sub[:,0],slope,'b-')
plt.xlabel("t [s]")
plt.ylabel("frequency [Hz] (theor. and meas.)")
plt.show()
resid = slope - slope_theory
plt.plot(t_theory,resid)
plt.xlabel("t [s]")
plt.ylabel("frequency residual [Hz]")
plt.show()
T_total = dt_chunk*np.ones(resid.size).sum()
mean_square_error = (dt_chunk*resid*resid).sum()/T_total
print "integration period = {} s".format(T_total)
print "rms frequency error = {0} Hz".format(math.sqrt(mean_square_error))
It is hard to see the difference between the calculated and the known frequency. Computing the residual error is more revealing. Plotting the residual error we see a large 2 Hz error in the measured frequency during the ramp. Given that there is no noise in the data, this error is significant: 2/4000 = 0.05 percent. Surprisingly, the difference between the measured and the known frequency is constant during the ramp.
...
Ok, paper-and-pencil calculations tell me that the least squares fit really measures the frequency at the center of the integration window. To account for this correction, we need to compare to the known frequency at an array index n_per_chunk/2
later than we did above. Adding a -1
shift to the time index further improves the agreement.
Making these two corrections -- the expected n_per_chunk/2
shift and the empirical -1
shift -- the agreement improves dramatically. The rms error drops from 1.2 Hz to 0.002 Hz.
In [37]:
slope_theory = f[id-1+n_per_chunk/2:-1:n_per_chunk][0:n_tot_chunk]
plt.plot(t_theory,slope_theory,'k-')
plt.plot(t_sub[:,0],slope,'b-')
plt.xlabel("t [s]")
plt.ylabel("frequency [Hz] (theor. and meas.)")
plt.show()
resid = slope - slope_theory
plt.plot(t_theory,1E6*resid)
plt.ylim([-0.1,0.1])
plt.xlabel("t [s]")
plt.ylabel("frequency residual [uHz]")
plt.show()
T_total = dt_chunk*np.ones(resid.size).sum()
mean_square_error = (dt_chunk*resid*resid).sum()/T_total
print "integration period = {} s".format(T_total)
print "rms frequency error = {0} Hz".format(math.sqrt(mean_square_error))
Care must be taken to generate the oscillating signal when the frequency is a function of time. Above I used a phase-advance to construct the oscillating signal.
We can speed up the curve fitting used to extract the frequency by a factor of 5 or more by implementing our own least-squares slope calculation.
For a linear frequency ramp, the agreement between measured and expected frequency is excellent, provided one compares the measured frequency to the known frequency evaluated at the middle of the fitting window.
In [46]:
import time
start = time.time()
s_sub = S.signal['theta'][0:n_total].reshape((n_tot_chunk,n_per_chunk))
s_sub_reset = s_sub - s_sub[:,:,np.newaxis][:,0,:]*np.ones(n_per_chunk)
t_sub = S.signal['t'][0:n_total].reshape((n_tot_chunk,n_per_chunk))
t_sub_reset = t_sub - t_sub[:,:,np.newaxis][:,0,:]*np.ones(n_per_chunk)
SXY = np.sum(t_sub_reset*s_sub_reset,axis=1)
SX = np.sum(t_sub_reset,axis=1)
SY = np.sum(s_sub_reset,axis=1)
SXX = np.sum(t_sub_reset*t_sub_reset,axis=1)
slope = (n_per_chunk*SXY-SX*SY)/(n_per_chunk*SXX-SX*SX)
stop = time.time()
print "Time to execute curve fit = {0:.3f} ms".format(1E3*(stop-start))
Since the time data are equally spaced, we can calculate the SX
and SXX
sums by hand in closed form. The measured and calculated values agree exactly.
In [47]:
print "SX measured = {}".format(SX[0])
SXcalc = (S.signal['dt'])*0.50*(n_per_chunk-1)*(n_per_chunk)
print "SX calculated = {}".format(SXcalc)
In [48]:
print "SXX measured = {}".format(SXX[0])
SXXcalc = (S.signal['dt'])**2*(1/6.0)*\
(n_per_chunk)*(n_per_chunk-1)*(2*n_per_chunk-1)
print "SXX calculated = {}".format(SXXcalc)
We can go back and further simplify the curve fitting by using the calculated values of SX
and SXX
.
In [49]:
import time
start = time.time()
s_sub = S.signal['theta'][0:n_total].reshape((n_tot_chunk,n_per_chunk))
s_sub_reset = s_sub - s_sub[:,:,np.newaxis][:,0,:]*np.ones(n_per_chunk)
t_sub = S.signal['t'][0:n_total].reshape((n_tot_chunk,n_per_chunk))
t_sub_reset = t_sub - t_sub[:,:,np.newaxis][:,0,:]*np.ones(n_per_chunk)
# SX = np.sum(t_sub_reset,axis=1)
# SXX = np.sum(t_sub_reset*t_sub_reset,axis=1)
SX = (S.signal['dt'])*0.50*(n_per_chunk-1)*(n_per_chunk)
SXX = (S.signal['dt'])**2*(1/6.0)*\
(n_per_chunk)*(n_per_chunk-1)*(2*n_per_chunk-1)
SY = np.sum(s_sub_reset,axis=1)
SXY = np.sum(t_sub_reset*s_sub_reset,axis=1)
slope = (n_per_chunk*SXY-SX*SY)/(n_per_chunk*SXX-SX*SX)
stop = time.time()
print "Time to execute curve fit = {0:.3f} ms".format(1E3*(stop-start))
Surprising, not much of an additional speed up. maybe 10 or 20 percent ... There is a memory reduction, since there are two fewer large arrays to store. Therefore, use the the calculated SX
and SXX
.