In [701]:
%matplotlib inline
# Create python script for UmTRX Rx quality check
# https://app.asana.com/0/22610823221062/23968246579512
import numpy as np
import matplotlib.pyplot as plt
import scipy.fftpack
import os.path
#$ ~/distr/uhd$ ./build/examples/rx_samples_to_file --nsamps=1024 --freq=900000000 --rate=1300000 --file=1024_900Mhz.dat
#DATA_FILE="/Users/ati/Downloads/umtrx_rx_gsm_good2.dat"
SAMPLE_RATE = 1083333 #Hz
CENTER_FREQ = 9470e5 #Hz
FCCH_FREQ = 67700 #Hz
# download DATA_FILE from https://www.dropbox.com/s/q5qcm1jfnf7zdee/umtrx_rx_test_9468.dat?dl=0
# created with /rx_samples_to_file --file umtrx_rx_test_9468.dat --duration=10 --rate 1083333 --freq 9470e5 --gain 15
DATA_FILE="/Users/ati/Downloads/umtrx_rx_test_9468.dat"
DATA_FILE_TYPE=np.short
def get_src_points():
#return get_points_mock()
return get_points_from_file(DATA_FILE)
def get_points_from_file(filename):
is_interleaved = True
if os.path.isfile(filename):
points = np.fromfile(open(filename, "rb"), dtype=DATA_FILE_TYPE)
if is_interleaved:
new_points = np.array([], dtype=np.complex)
for i in range(0, 2*64*1024 , 2):
new_points = np.append(new_points, points[i+1] + points[i]*1j)
return new_points
else:
return points
else:
print "error opening ", filename
sys.exit(1)
def get_bursts(data_points):
reasonably_large_value = 16383 # points in data file
if len(data_points) < reasonably_large_value:
raise ValueError("Too few data points: ", len(data_points))
N = 15 # running mean window width.
data_points_abs = np.abs(data_points)
mean = np.mean(data_points_abs[1024:4096])
running_th = [1 if x > mean/2.0 else 0 for x in np.convolve(data_points_abs, np.ones((N,))/N)]
i = 0
res = []
in_pause = True
while i < len(data_points):
if in_pause and running_th[i]:
in_pause = False
res.append([])
res[len(res)-1].append(data_points[i])
elif not in_pause and not running_th[i]:
in_pause = True
elif not in_pause and running_th[i]:
res[len(res)-1].append(data_points[i])
i += 1
return res[1:-1],running_th # skip first and last (partial) bursts
# detect fcch burst
def is_fcch(x):
#window = np.hamming(NUM_SRC_POINTS)
#y = np.abs(scipy.fftpack.fft(window*x[0:NUM_SRC_POINTS]))
y = fft(x)
acor = np.correlate(y, y, "same")
acor = acor[len(acor)/2:]
ratio = acor[0.2*len(acor)]/acor[0] # compare acor at window start and start + 20% with 0.1
return ratio < 0.1
def join(a,b):
return max(a,b)
def fft(x):
window = np.hamming(len(x))
return np.abs(scipy.fftpack.fft(x))
def average_spectrum(bursts):
min_samples = min(map(len, bursts))
window = np.hamming(min_samples)
y = fft(window*bursts[0][0:min_samples])
for i in range(1, len(bursts)):
y = map(join, y, fft(window*bursts[i][0:min_samples]))
return y
def get_frequency_offset(fcch_burst):
# find number of bin with maximum value
fcch_spectrum = fft(fcch_burst)
plot_spectrum(fcch_spectrum, "original FCCH spectrum")
bin_freq_width = 1.0*SAMPLE_RATE/len(fcch_spectrum)
print "bin_freq_width = %f"%bin_freq_width
#return 0
return -1.0*(bin_freq_width*(np.argmax(fcch_spectrum)) + FCCH_FREQ)
def shift_spectrum(burst, freq_offset):
exp_coeff = 2.0*np.pi*(0+1j)*freq_offset/SAMPLE_RATE
return [burst[idx] * np.exp(exp_coeff*idx) for idx in range(0, len(burst))]
def show_iq(bursts, fcch_burst):
freq_offset = get_frequency_offset(fcch_burst)
print "FCCH frequency offset: %f" % freq_offset
plot_spectrum(fft(shift_spectrum(fcch_burst, freq_offset)), "Shifted FCCH spectrum")
for i in range(0, len(bursts)):
bursts[i] = shift_spectrum(bursts[i], freq_offset)
#burst_number = 0
#plot_spectrum(fft(bursts[burst_number]), "shifted spectrum for burst %d"%burst_number)
plot_xy([point for burst in bursts for point in burst], "I/Q constellation")
def main():
src_points = get_src_points()
show_cnt = 2048
plot_time(src_points[0:show_cnt], "%d source samples"%show_cnt)
bursts, envelope = get_bursts(src_points)
print "Found %d bursts." % len(bursts)
plt.plot(np.array(envelope[0:show_cnt])*15000, c="r")
avg_s = average_spectrum(bursts[0:32])
plot_spectrum(avg_s, "Average spectrum of 32 bursts")
# find FCCH burst. Guaranteed if src №of frames >= 81
fcch_burst = next(burst for burst in bursts if is_fcch(burst)) # raises StopIteration if no burst found
print "Found FCCH burst."
#plot_time(fcch_burst, "FCCH burst")
show_iq(bursts[27:28], fcch_burst)
def plot_spectrum(y, title):
plt.figure(figsize=(19,4))
plt.title(title + " (logarythmic)")
plt.grid(True)
bin_width = 1.0*SAMPLE_RATE/len(y)
x_vals = np.array(np.linspace(0.0, len(y), num=len(y))*bin_width + CENTER_FREQ)
#print "len_x = %d, len_y = %d" % (len(x_vals), len(y))
plt.plot(x_vals, np.log10(np.array(y)/len(y)), c="b")
return True
def plot_time(y, title):
plt.figure(figsize=(19,4))
plt.title(title)
plt.plot(y, c="g")
return True
def plot_xy(xy, title):
plt.figure(figsize=(5,5))
plt.title(title)
plt.scatter(np.real(xy), np.imag(xy), c="r")
return True
main()
In [ ]: