Pulsar searching is a compute-intensive task. Finding repeating signals in noisy data is difficult because individual pulses tend to have a low signal-to-noise ratio (SNR). Our goal is to load and clean the data, identify candidate periods, and fold the pulses to increase the SNR. This notebook demonstrates the algorithms used for searching for regular pulses within radio spectrograms.
Keep in mind that state-of-the-art pulsar search pipelines use faster algorithms (e.g., tree dedispersion). This notebook implements the simplest pulsar searching technique.
First, we download the data and BLIMPY, an I/O tool developed to interface with the radio data.
Note: run this notebook in Colab, as some operations are resource-intensive (for example, downloading 5+ GB files and loading them into RAM). This notebook is not optimized.
In [0]:
!pip install blimpy
# Pulsar data
!wget http://blpd13.ssl.berkeley.edu/borisov/AGBT19B_999_124/spliced_blc40414243444546o7o0515253545556o7o061626364656667_guppi_58837_86186_PSR_B0355+54_0013.gpuspec.8.0001.fil
# For more info on pulsar searches check out this deck
# http://ipta.phys.wvu.edu/files/student-week-2017/IPTA2017_KuoLiu_pulsartiming.pdf
First, we load the data. Note: targets whose names start with PSR, such as PSR_B0355+54_0013, are radio scans of known pulsars, while files for cataloged targets such as HIP65960 should not show pulsar characteristics. If you wish to learn more about the data, check out https://ui.adsabs.harvard.edu/abs/2019PASP..131l4505L/abstract
The header gives vital information about the observational setup of the telescope, for example the coarse channel width, the observation time, and the duration.
In [0]:
from blimpy import Waterfall
import pylab as plt
import numpy as np
import math
from scipy import stats, interpolate
from copy import deepcopy
%matplotlib inline
obs = Waterfall('/content/spliced_blc40414243444546o7o0515253545556o7o061626364656667_guppi_58837_86186_PSR_B0355+54_0013.gpuspec.8.0001.fil',
t_start=0,t_stop= 80000,max_load=10)
obs.info()
# Loads data into numpy array
data = obs.data
data.shape
coarse_channel_width = int(np.round(187.5/64/abs(obs.header['foff'])))
# Here we plot the integrated signal over time.
obs.plot_spectrum()
In [0]:
fig = plt.figure(figsize=(10,8))
plt.title('Spectrogram With Bandpass')
plt.xlabel("Fchans")
plt.ylabel("Time")
plt.imshow(data[:3000,0,1500:3000], aspect='auto')
plt.colorbar()
Out[0]:
The goal of this step is to clean the data of the artifacts created by combining multiple bands. The data are produced by taking sliding windows of the raw voltage stream and computing an FFT of each window. With these FFTs (each containing frequency information about a timestamp), a bandpass filter is applied per coarse channel to cut off frequencies that don't belong to that channel's frequency range. The cut can't be perfect, which is why the power rolls off at the channel edges.
These filters are called band-pass because they only allow signals in a particular frequency range, called a band, to pass through. When the products are assembled, the roll-offs appear as dips in the spectrogram. In other words, they aren't real signals.
To remove the bandpass features, we fit a spline to each coarse channel to get a model of that channel's bandpass shape. By using splines, we can fit the bandpass without fitting the narrower, more significant signals.
If you want more details on this check out https://github.com/FX196/SETI-Energy-Detection for a detailed explanation.
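As a minimal, self-contained sketch of the idea (synthetic spectrum, not the notebook's data; the channel width, noise level, and smoothing factor are all made up), a smoothing spline follows the slow bandpass shape while largely ignoring a narrow signal:

```python
import numpy as np
from scipy import interpolate

rng = np.random.default_rng(0)
n = 256  # fchans in one synthetic coarse channel
x = np.arange(n)

# Smooth bandpass shape (power rolls off at the edges) plus noise...
bandpass = 1.0 - ((x - n / 2) / (n / 2)) ** 4
spectrum = bandpass + 0.01 * rng.standard_normal(n)
# ...plus one narrow "real" signal we do NOT want to fit away
spectrum[100:104] += 0.5

# Smoothing spline: the smoothing factor s is large enough that the
# 4-bin spike barely influences the fitted model
tck = interpolate.splrep(x, spectrum, k=2, s=n * 0.01)
model = interpolate.splev(x, tck)
flattened = spectrum - model

# The spike survives flattening; the edge roll-off is mostly removed
print(flattened[100:104].mean(), flattened[:20].mean())
```

The same trade-off drives the notebook's fit below: a low-order spline with few knots can track the coarse channel's shape but is too stiff to chase narrow features.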
In [0]:
average_power = np.zeros((data.shape[2]))
shifted_power = np.zeros((int(data.shape[2]/8)))
x = []
spl_order = 2
print("Fitting Spline")
data_adjust = np.zeros(data.shape)
average_power = data.mean(axis=0)
# We adjust each coarse channel to correct the bandpass artifacts
# (here each coarse channel is coarse_channel_width = 8 fchans wide)
for i in range(0, data.shape[2], coarse_channel_width):
    average_channel = average_power[0, i:i+coarse_channel_width]
    x = np.arange(0, coarse_channel_width, 1)
    knots = np.arange(0, coarse_channel_width, coarse_channel_width//spl_order+1)
    tck = interpolate.splrep(x, average_channel, s=knots[1:])
    xnew = np.arange(0, coarse_channel_width, 1)
    ynew = interpolate.splev(xnew, tck, der=0)
    data_adjust[:, 0, i:i+coarse_channel_width] = data[:, 0, i:i+coarse_channel_width] - ynew
plt.figure()
plt.plot( data_adjust.mean(axis=0)[0,:])
plt.title('Spline Fit - adjusted')
plt.xlabel("Fchans")
plt.ylabel("Power")
fig = plt.figure(figsize=(10,8))
plt.title('After bandpass correction')
plt.imshow(data_adjust[:3000,0,:], aspect='auto')
plt.colorbar()
Out[0]:
When pulses reach Earth, different frequencies arrive at different times due to dispersion: the interstellar medium delays lower frequencies more than higher ones. This turns the pulse into a "swooping curve" on the radio spectrogram instead of a straight vertical line. Since folding the pulses to increase the SNR assumes the pulses arrive at the same time across all frequencies, we need to correct the dispersion by shifting each frequency channel by the time delay appropriate for its frequency: we index a frequency column in the spectrogram, split it at the delay offset, and swap the two pieces.
The problem is that we don't know the dispersion measure (DM) of the signal in advance. The DM is the integral of the electron density $n_e$ along the signal's path through the interstellar medium:
$$DM =\int_0^d n_e dl$$
We brute-force the DM by running multiple trial DMs and keeping the trial whose dedispersion produces the highest SNR.
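To make the scaling concrete, the cold-plasma dispersion delay used below is $\Delta t \approx \mathrm{DM}/(2.41\times10^{-4}f^2)$ seconds, with $f$ in MHz and DM in pc cm$^{-3}$. A quick sanity check (the DM and frequencies are chosen for illustration only):

```python
def dispersion_delay(dm, freq_mhz):
    """Cold-plasma dispersion delay in seconds (freq in MHz, DM in pc/cm^3)."""
    return dm / (2.41e-4 * freq_mhz ** 2)

# Example: DM = 6.4 pc/cm^3 at 8000 MHz vs 4000 MHz
hi = dispersion_delay(6.4, 8000.0)  # ~0.4 ms
lo = dispersion_delay(6.4, 4000.0)  # halving the frequency quadruples the delay
print(hi, lo, lo / hi)
```

The $1/f^2$ dependence is why the sweep is much steeper at the low-frequency end of the band.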
In [0]:
def delay_from_DM(DM, freq_emitted):
    """Dispersion delay in seconds for a given DM (pc/cm^3) and frequency (MHz)."""
    if type(freq_emitted) == type(0.0):
        if freq_emitted > 0.0:
            return DM / (0.000241 * freq_emitted * freq_emitted)
        else:
            return 0.0
    else:
        return np.where(freq_emitted > 0.0,
                        DM / (0.000241 * freq_emitted * freq_emitted), 0.0)
def de_disperse(data, DM, fchan, width, tsamp):
    clean = deepcopy(data)
    for i in range(clean.shape[1]):
        end = clean.shape[0]
        freq_emitted = i*width + fchan
        # Delay for this channel, in whole time samples
        time = int(delay_from_DM(DM, freq_emitted) / tsamp)
        if time != 0 and time < clean.shape[0]:
            # Shift the channel down by `time` samples,
            # keeping the first `time` samples in place
            zero_block = clean[:time, i]
            shift_block = clean[:end-time, i]
            clean[time:end, i] = shift_block
            clean[:time, i] = zero_block
        elif time != 0:
            # Delay exceeds the observation length: blank the channel
            clean[:, i] = np.zeros(clean[:, i].shape)
    return clean
def DM_can(data, data_base, sens, DM_base, candidates, fchan, width, tsamp):
    snrs = np.zeros((candidates, 2))
    for i in range(candidates):
        DM = DM_base + sens*i
        # Dedisperse the unshifted data with each trial DM
        # (de_disperse copies, so data_base is not modified)
        data = de_disperse(data_base, DM, fchan, width, tsamp)
        time_series = data.sum(axis=1)
        snrs[i, 1] = SNR(time_series)
        snrs[i, 0] = DM
        # Stop once the largest delay exceeds the observation length
        if int(delay_from_DM(DM, fchan) / tsamp) + 1 > data.shape[0]:
            break
        print("Candidate "+str(i)+"\t SNR: "+str(round(snrs[i, 1], 4))
              + "\t Largest Time Delay: "+str(round(delay_from_DM(DM, fchan), 6))+' seconds'
              + "\t DM val: "+str(DM)+" pc/cm^3")
    return snrs
In [0]:
# Functions to determine SNR and top candidates
def SNR(arr):
    index = np.argmax(arr)
    average_noise = abs(arr.mean(axis=0))
    return math.log(arr[index] / average_noise)

def top(arr, top=10):
    candidate = []
    # Zero out the first two bins of the Fourier transform
    arr[0] = 0
    arr[1] = 0
    for i in range(top):
        # We add 1 because the 0th index corresponds to a period of 1, not 0
        index = np.argmax(arr)
        candidate.append(index + 1)
        arr[index] = 0
    return candidate
We now run multiple trial DMs, shifting each frequency channel accordingly and recording the resulting SNR. The trial DM is incremented by a tunable parameter, sens. After the trials, we keep the dedispersion that produced the largest SNR, and use that data for the FFTs and the folded profiles.
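As a self-contained sketch of this brute-force search (synthetic data; a constant per-channel shift stands in for the DM-dependent delay, and all the numbers are made up):

```python
import numpy as np

rng = np.random.default_rng(2)
n_time, n_freq = 512, 32
true_shift_per_chan = 3  # stand-in for the DM-dependent delay, in samples

# Build a "dispersed" pulse: channel i arrives i*true_shift_per_chan samples late
spec = 0.1 * rng.standard_normal((n_time, n_freq))
for i in range(n_freq):
    spec[100 + i * true_shift_per_chan, i] += 1.0

def snr_after_shift(spec, shift_per_chan):
    """Undo a linear per-channel delay, then measure the peak of the time series."""
    fixed = np.empty_like(spec)
    for i in range(spec.shape[1]):
        fixed[:, i] = np.roll(spec[:, i], -shift_per_chan * i)
    ts = fixed.sum(axis=1)
    return (ts.max() - ts.mean()) / ts.std()

# Brute-force the trial shifts and keep the best, as DM_can does with trial DMs
best = max(range(8), key=lambda s: snr_after_shift(spec, s))
print(best)
```

Only the correct shift lines the pulse up across channels, so the summed time series peaks sharply at that trial and the SNR picks it out.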
In [0]:
small_data = data_adjust[:,0,:]
data_base = data_adjust[:,0,:]
sens = 0.05
DM_base = 6.4
candidates = 50
fchan = obs.header['fch1']
width = obs.header['foff']
tsamp = obs.header['tsamp']
# Shift fchan to the lowest frequency in the band (foff is negative)
fchan = fchan + width*small_data.shape[1]
snrs = DM_can(small_data, data_base, sens, DM_base, candidates, fchan, abs(width), tsamp)
plt.plot(snrs[:,0], snrs[:,1])
plt.title('DM values vs SNR')
plt.xlabel("DM values")
plt.ylabel("SNR of Dedispersion")
Out[0]:
In [0]:
DM = snrs[np.argmax(snrs[:,1]),0]
print(DM)
# fchan was already shifted to the lowest frequency above
data_adjust[:,0,:] = de_disperse(data_adjust[:,0,:], DM, fchan, abs(width), tsamp)
fig = plt.figure(figsize=(10, 8))
plt.imshow(data_adjust[:,0,:], aspect='auto')
plt.title('De-dispersed Data')
plt.xlabel("Fchans")
plt.ylabel("Time")
plt.colorbar()
plt.show()
Next, we apply the discrete Fourier transform to the data to detect periodic pulses: the largest magnitudes of the Fourier transform indicate potential periods within the data. We then check each candidate for consistency by folding the data at the period the Fourier transform suggests.
The folding algorithm is simple: you slice the time series into consecutive windows of one period and stack them on top of each other. If the period you guessed matches the true period, then by superposition the pulses add coherently and the SNR increases. This spike in signal-to-noise ratio appears in the folded profiles below.
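As a toy illustration (synthetic pulse train; the period, length, and noise level are made up), folding at the true period stacks the pulses coherently and lifts the peak well above the noise, while folding at a wrong period smears them out:

```python
import numpy as np

rng = np.random.default_rng(1)
period, n_periods = 50, 200
t = np.arange(period * n_periods)
signal = np.where(t % period == 0, 1.0, 0.0)  # one weak pulse per period
noisy = signal + rng.standard_normal(t.size)  # pulses buried in noise

def fold(series, p):
    """Stack consecutive windows of length p and sum them."""
    n = series.size // p
    return series[:n * p].reshape(n, p).sum(axis=0)

def peak_snr(profile):
    return (profile.max() - profile.mean()) / profile.std()

good = peak_snr(fold(noisy, period))      # fold at the true period
bad = peak_snr(fold(noisy, period + 3))   # fold at a wrong period
print(good, bad)
```

The single pulse is invisible in any one window, but after ~200 coherent folds the profile peak stands far above the residual noise; at the wrong period the peaks land in different phase bins and average away.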
In [0]:
# Performing the Fourier transform
%matplotlib inline
import scipy.fftpack
from scipy.fft import fft
N = 1000
T = 1.0 / 800.0
x = np.linspace(0.0, N*T, N)
y = abs(data_adjust[:,0,:].mean(axis=1))
yf = fft(y)
xf = np.linspace(0.0, 1.0/(2.0*T), N//2)
# Magnitude of the Fourier transform
# Between 0.00035 and 3.5 seconds
mag = np.abs(yf[:60000])
candidates = top(mag, top=15)
plt.plot(2.0/N * mag[1:])
plt.grid()
plt.title('Fourier Transform of Signal')
plt.xlabel("Periods")
plt.ylabel("Magnitude of Fourier Transform")
plt.show()
print("Signal To Noise Ratio for the Fourier Transform is: "+str(SNR(mag)))
print("Most likely Candidates are: "+str(candidates))
The idea of the folding algorithm is to see whether the signal forms a consistent profile as you fold/integrate the values together. If the profile is stable, you're looking at an accurate reading of the pulsar's period, which confirms the candidate suggested by the Fourier transform. Folding also profiles the pulsar: the folded pulse shape forms a "fingerprint" that is unique to the pulsar detected.
$$s_j = \sum^{N/P-1}_{k=0} D_{j+kP}$$

We are summing the data $D$ over regular intervals of the period $P$. This is implemented below.
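The sum maps directly onto a NumPy reshape: stack the $N/P$ length-$P$ windows as rows and sum down the columns to get every $s_j$ at once. A small self-contained sketch (the helper name is ours, not from the notebook; the series is trimmed to a multiple of $P$ first):

```python
import numpy as np

def fold_profile(series, p):
    """s_j = sum_k series[j + k*p], computed by reshaping into k rows."""
    k = series.size // p
    return series[:k * p].reshape(k, p).sum(axis=0)

x = np.arange(12, dtype=float)  # 12 samples, fold at P = 4
print(fold_profile(x, 4))       # [0+4+8, 1+5+9, 2+6+10, 3+7+11]
```

This is equivalent in effect to the accumulation loop below, just vectorized.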
In [0]:
# Let's fold one example period (895 time samples)
period = 895
fold = np.zeros((period, data.shape[2]))
multiples = int(data.shape[0]/period)
results = np.zeros((period))
for i in range(multiples-1):
    fold[:,:] = data_adjust[i*period:(i+1)*period,0,:] + fold
results = fold.mean(axis=1)
results = results - results.min()
results = results / results.max()
print(SNR(results))
plt.plot(results)
plt.title('Folded Signal Profile With Period: '+str(round(period*0.000349,5)))
plt.xlabel("Time (Multiples of 0.00035s)")
plt.ylabel("Normalized Integrated Signal")
Out[0]:
In [0]:
# Fold every candidate period returned by the FFT and record its SNR
can_snr = []
for k in range(len(candidates)):
    period = candidates[k]
    fold = np.zeros((period, data.shape[2]))
    multiples = int(data.shape[0]/period)
    results = np.zeros((period))
    for i in range(multiples-1):
        fold[:,:] = data[i*period:(i+1)*period,0,:] + fold
    results = fold.mean(axis=1)
    results = results - results.min()
    results = results / results.max()
    can_snr.append(SNR(results))
print("Max SNR of Fold Candidates: "+ str(max(can_snr)))
In [0]:
# Generates multiple images saved to create a GIF
from scipy import stats
period = candidates[0]
fold = np.zeros((period, data.shape[2]))
multiples = int(data.shape[0]/period)
results = np.zeros((period))
for i in range(multiples-1):
    fold[:,:] = data[i*period:(i+1)*period,0,:] + fold
    results = fold.mean(axis=1)
    results = results - results.min()
    results = results / results.max()
    # Save one frame of the profile per fold iteration
    plt.plot(results)
    plt.title('Folded Signal Period '+str(period*0.000349)+" seconds | Fold Iteration: "+str(i))
    plt.xlabel("Time (Multiples of 0.00035s)")
    plt.ylabel("Normalized Integrated Signal")
    plt.savefig('/content/drive/My Drive/Deeplearning/Pulsars/output/candidates/CAN_3/multi_chan_'+str(period)+'_'+str(i)+'.png')
    plt.close()
results = fold.mean(axis=1)
results = results - results.min()
results = results / results.max()
print("The Signal To Noise of the Fold is: "+str(SNR(results)))
plt.plot(results)
Out[0]:
Below we show that this algorithm detects pulses and excludes targets without them. We do so by loading a target that isn't a known pulsar: HIP65960, which isn't expected to contain repeating signals.
We apply the same algorithm as above, without repeating the explanations.
In [0]:
!wget http://blpd13.ssl.berkeley.edu/dl/GBT_58402_66282_HIP65960_time.h5
In [0]:
from blimpy import Waterfall
import pylab as plt
import numpy as np
import math
from scipy import stats, interpolate
%matplotlib inline
obs = Waterfall('/content/GBT_58402_66282_HIP65960_time.h5',
f_start=0,f_stop= 361408,max_load=5)
obs.info()
# Loads data into numpy array
data = obs.data
coarse_channel_width = int(np.round(187.5/64/abs(obs.header['foff'])))
obs.plot_spectrum()
In [0]:
average_power = np.zeros((data.shape[2]))
shifted_power = np.zeros((int(data.shape[2]/8)))
x = []
spl_order = 2
print("Fitting Spline")
data_adjust = np.zeros(data.shape)
average_power = data.mean(axis=0)
# We adjust each coarse channel to correct the bandpass artifacts
for i in range(0, data.shape[2], coarse_channel_width):
    average_channel = average_power[0,i:i+coarse_channel_width]
    x = np.arange(0, coarse_channel_width, 1)
    knots = np.arange(0, coarse_channel_width, coarse_channel_width//spl_order+1)
    tck = interpolate.splrep(x, average_channel, s=knots[1:])
    xnew = np.arange(0, coarse_channel_width, 1)
    ynew = interpolate.splev(xnew, tck, der=0)
    data_adjust[:,0,i:i+coarse_channel_width] = data[:,0,i:i+coarse_channel_width] - ynew
In [0]:
from copy import deepcopy
small_data = data[:,0,:]
data_base = data[:,0,:]
sens =0.05
DM_base = 6.4
candidates = 50
fchan = obs.header['fch1']
width = obs.header['foff']
tsamp = obs.header['tsamp']
# fchan = fchan+ width*small_data.shape[1]
fchan = 7501.28173828125
snrs = DM_can(small_data, data_base, sens, DM_base, candidates, fchan, abs(width),tsamp)
plt.plot(snrs[:,0], snrs[:,1])
plt.title('DM values vs SNR')
plt.xlabel("DM values")
plt.ylabel("SNR of Dedispersion")
Out[0]:
In [0]:
DM = snrs[np.argmax(snrs[:,1]),0]
print(DM)
fchan = fchan+ width*small_data.shape[1]
data_adjust[:,0,:] = de_disperse(data_adjust[:,0,:], DM, fchan,abs(width),tsamp)
In [0]:
# Performing the Fourier transform
%matplotlib inline
import scipy.fftpack
from scipy.fft import fft
N = 60000
T = 1.0 / 800.0
x = np.linspace(0.0, N*T, N)
y = data[:,0,:].mean(axis=1)
yf = fft(y)
xf = np.linspace(0.0, 1.0/(2.0*T), N//2)
# Magnitude of the Fourier transform
# We cap this at 200 bins because
# the total number of tchans is only 279
mag = np.abs(yf[:200])
candidates = top(mag, top=15)
plt.plot(2.0/N * mag[1:])
plt.grid()
plt.title('Fourier Transform of Signal')
plt.xlabel("Periods")
plt.ylabel("Magnitude of Fourier Transform")
plt.show()
print("Signal To Noise Ratio for the Fourier Transform is: "+str(SNR(mag)))
print("Most likely Candidates are: "+str(candidates))
In [0]:
# Fold every candidate period returned by the FFT and record its SNR
can_snr = []
for k in range(len(candidates)):
    period = candidates[k]
    fold = np.zeros((period, data.shape[2]))
    multiples = int(data.shape[0]/period)
    results = np.zeros((period))
    for i in range(multiples-1):
        fold[:,:] = data[i*period:(i+1)*period,0,:] + fold
    results = fold.mean(axis=1)
    results = results - results.min()
    results = results / results.max()
    can_snr.append(SNR(results))
print("Max SNR of Fold Candidates: "+ str(max(can_snr)))