# In [3]:  (Jupyter cell marker, kept as a comment so the file parses as plain Python)
import pyecog as pg
import os
import pandas as pd
import numpy as np
from datetime import datetime
from ipywidgets import FloatProgress # For tracking progress of power data extraction
from IPython.display import display # For tracking progress of power data extraction
###############################
### USER SETTINGS ###
### Directory containing the NDF files to process
ndf_dir = r'C:\Users\asnowball\Desktop\TESTING\NDFs'
### Sampling frequency in Hz
fs = 512
### Transmitter IDs of interest, as a list
### Use tid_list = 'all' to output power for every transmitter in each NDF file
tid_list = [1,2]
### Chunk length: duration in seconds of each window over which power is computed
### Minimum value is roughly 4 / (lower limit of the lowest frequency band)
### E.g. lowest band 1-4Hz  -> a 4s chunk is needed for a reliable read-out at 1Hz
### E.g. lowest band 0.5-1Hz -> an 8s chunk is needed for a reliable read-out at 0.5Hz
chunk_len = 4
### Frequency bands of interest, as a list of (low, high) tuples in Hz
### Use bands = 'full' to output the full power spectral density (PSD) instead
bands = [(1,4),(4,8),(8,12),(12,30),(30,50),(50,70),(70,120),(120,160)]
### Output format:
###   averaged = True  -> one averaged row of power values per NDF file
###   averaged = False -> complete chunk-by-chunk data, no averaging
averaged = True
###############################
###############################
# Power calculation function
def calculate_powerbands(flat_data, chunk_len, fs, bands):
    '''
    Compute band power (or the full PSD) per time chunk of a 1-D signal.

    Inputs:
        - flat_data: np.array with len(arr.shape) == 1
        - chunk_len: duration in seconds of each chunk over which power is computed
        - fs       : sampling frequency in Hz
        - bands    : list of (low, high) tuples in Hz, e.g. [(1,4),(4,8),...],
                     or 'full' to return the full power spectral density (PSD)
    Returns:
        - np.array with one row per chunk; columns are the requested power bands
          (or PSD bins when bands == 'full')
    Notes:
        - Band power units are "y unit"^2
        - rfft is used for efficiency, so a scaling factor is applied when
          forming the PSD
        - A Hanning window is applied, so half of the first and last chunk is
          reflected to keep each window centred on its time span without
          throwing any samples away
    '''
    # Reflect half a chunk at each end so no time is lost at the boundaries.
    half_chunk_samples = int((chunk_len/2) * fs)
    padded = np.pad(flat_data, pad_width=half_chunk_samples, mode='reflect')

    # Cut the padded signal into chunk-sized rows, then stitch each pair of
    # consecutive rows together so every stitched row spans two chunks.
    n_rows = int(len(padded)/fs/chunk_len)
    chunk_rows = np.reshape(padded,
                            newshape=(n_rows, int(chunk_len*fs)),
                            order='C')
    stitched = np.concatenate([chunk_rows[:-1], chunk_rows[1:]], axis=1)

    # Taper every stitched row with a Hanning window.
    taper = np.hanning(stitched.shape[1])
    tapered = np.multiply(stitched, taper)

    # FFT and PSD; stitched rows hold 2*chunk_len*fs points, hence the *2.
    bin_frequencies = np.fft.rfftfreq(int(chunk_len*fs)*2, 1/fs)
    amplitudes = np.abs(np.fft.rfft(tapered, axis=1))
    # amplitudes/(fs*chunk_len*2) is the normalised transform of the tapered data.
    # rfft only returns positive-frequency amplitudes, and the Hanning window
    # has power .375, so the factor 2/.375 corrects for both points.
    # 2*chunk_len turns the result into a density with units "y unit"^2/Hz
    # (below, bin_width*2*chunk_len equals 1, so band powers come out in
    # "y unit"^2) — this keeps the psd usable for other purposes.
    psd = 2*chunk_len*(2/.375)*(amplitudes/(fs*chunk_len*2))**2

    # Full PSD requested: hand it back untouched.
    if bands == 'full':
        return psd

    # Otherwise integrate the PSD over each requested frequency band.
    bin_width = np.diff(bin_frequencies[:3])[1]
    band_powers = np.zeros(shape=(psd.shape[0], len(bands)))
    for col, (lower_freq, upper_freq) in enumerate(bands):
        in_band = np.where(np.logical_and(bin_frequencies > lower_freq,
                                          bin_frequencies <= upper_freq))[0]
        band_powers[:, col] = np.sum(psd[:, in_band], axis=1)*bin_width
    return band_powers
# --- Build the power-band column titles --------------------------------------
if bands == 'full':
    # One title per PSD bin: bins are spaced 1/(2*chunk_len) Hz apart and the
    # rfft of 2*chunk_len*fs points yields chunk_len*fs + 1 of them.
    band_list = []
    for x in range(0, (chunk_len*fs)+1):
        band_list.append(x/(2*chunk_len))
    band_titles = ['0 Hz']
    for x in range(len(band_list)-1):
        band_titles.append(str(band_list[x]) + ' - ' + str(band_list[x+1]) + ' Hz')
else:
    band_titles = [(str(i[0]) + ' - ' + str(i[1]) + ' Hz') for i in bands]

# --- Prepare output destinations ---------------------------------------------
# averaged == False: one CSV per file/transmitter, written into a subfolder.
# averaged == True : per-transmitter DataFrames accumulated in a dictionary.
if averaged == False:
    power_dir = os.path.join(ndf_dir, 'Power Outputs')
    os.makedirs(power_dir, exist_ok=True)  # BUGFIX: don't crash if the folder already exists
if averaged == True:
    batch_data = {}  # tid -> DataFrame with one averaged row per NDF file

# --- Progress tracker for the power data extraction --------------------------
# BUGFIX: count only the .ndf files, so the bar reaches 100% even when the
# directory contains other entries.
ndf_files = [f for f in os.listdir(ndf_dir) if f.endswith('.ndf')]
progress = FloatProgress(min=0, max=len(ndf_files))
display(progress)

# --- Extract power statistics from every NDF file ----------------------------
for file in ndf_files:
    # BUGFIX: the result must NOT be named `datetime` — that shadowed the
    # imported class and broke `fromtimestamp` on the second iteration.
    # Epoch seconds are parsed from filename characters 1..10.
    file_datetime = datetime.fromtimestamp(int(file[1:11]))
    ndf_file = pg.NdfFile(os.path.join(ndf_dir, file))
    if tid_list == 'all':
        ndf_file.load(read_ids='all', auto_glitch_removal=True, auto_resampling=True, auto_filter=True)
        tids_to_load = list(ndf_file.tid_set)
    else:
        ndf_file.load(read_ids=tid_list, auto_glitch_removal=True, auto_resampling=True, auto_filter=True)
        tids_to_load = tid_list
    for tid in tids_to_load:
        if averaged == True:
            if tid not in batch_data:
                batch_data[tid] = pd.DataFrame(columns=band_titles)
        raw_data = ndf_file[tid]['data']
        power_data = calculate_powerbands(raw_data, chunk_len, fs, bands)
        power_df = pd.DataFrame(power_data, columns=band_titles)
        # Packages non-averaged data into a .csv file for output
        if averaged == False:
            power_df.to_csv(os.path.join(power_dir, file) + ' - TRANSMITTER {}.csv'.format(tid), index=False)
        if averaged == True:
            # Append one averaged row for this file, then attach metadata columns.
            power_mean = power_df.mean()
            # BUGFIX: DataFrame.append was removed in pandas 2.x; pd.concat is
            # the supported (and backward-compatible) equivalent.
            batch_data[tid] = pd.concat([batch_data[tid], power_mean.to_frame().T], ignore_index=True)
            row = len(batch_data[tid]) - 1
            batch_data[tid].loc[row, "Coastline"] = sum(abs(np.diff(raw_data)))  # coastline column (optional)
            batch_data[tid].loc[row, "Datetime"] = file_datetime  # datetime column (recommended)
            batch_data[tid].loc[row, "Filename"] = file  # filename column (recommended)
    progress.value += 1  # Updates progress tracker (one tick per NDF file)

# --- Package averaged data into an Excel workbook for output ------------------
if averaged == True:
    # BUGFIX: ExcelWriter.save was removed in pandas 2.x; the context manager
    # saves and closes the workbook on exit (works on older pandas too).
    with pd.ExcelWriter(os.path.join(ndf_dir, 'Power Averages.xlsx')) as writer:
        for tid in batch_data.keys():
            batch_data[tid].sort_values(by="Datetime").to_excel(writer, sheet_name="Transmitter {}".format(tid), index=None)