This importable notebook provides the tooling necessary to handle the processing for the Gather Phases in the ETL process for the NOAA HDTA project. This tooling supports Approaches 1 and 2 using CSV files.
Each process phase requires a dictionary to drive the workflow.
project_layout = {
'Content_Version': '',
'Daily_Input_Files': '',
'Raw_Details': '',
'Missing_Details': '',
'Station_Summary': '',
'Station_Details': '',
}
| Process Phase | Function to run |
| --- | --- |
| Phase 1 Approach 1 | noaa_run_phase1_approach1(project_layout) |
| Phase 2 Approach 1 | noaa_run_phase2_approach1(project_layout) |
import os
import time
import glob
import struct
import collections
import pandas as pd
# Container returned by the approach-1 gather helpers: ``raw`` holds the
# usable daily readings and ``missing`` holds the days without a reading.
approach1_bag = collections.namedtuple('GatherBag', ['raw', 'missing'])

# Historical Raw Daily Detail: blank record copied for every usable reading.
raw_daily_detail_rec_template = {
    'StationID': "",
    'Year': "",
    'Month': "",
    'Day': "",
    'Type': "",
    'FahrenheitTemp': "",
}

# Historical Daily Missing Record Detail: blank record copied for every
# day whose reading is missing (same keys as above, minus the temperature).
missing_detail_rec_template = {
    'StationID': "",
    'Year': "",
    'Month': "",
    'Day': "",
    'Type': "",
}
def get_filename(pathname):
    '''Fetch the filename portion of pathname, without its extension.

    Uses os.path so that the platform's own path separators are honoured
    (for example the backslashes that glob returns on Windows) instead of
    assuming '/'.

    pathname -- path to a file, relative or absolute.
    Returns the last path component with its final extension removed;
    an empty string when pathname ends with a separator.
    '''
    return os.path.splitext(os.path.basename(pathname))[0]
def elapsed_time(secs):
    '''Format a duration given in seconds as an H:MM:SS string.

    secs -- elapsed seconds (int or float); fractional seconds are
            truncated in the output.
    '''
    total_minutes, seconds = divmod(secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return "{0:d}:{1:02d}:{2:02d}".format(int(hours), int(minutes), int(seconds))
def noaa_convert_c2f(noaa_temp):
    '''Return the Fahrenheit value for a NOAA temperature (tenths of degrees C).

    noaa_temp -- integer, or a string that int() accepts (surrounding
                 whitespace is fine), expressed in tenths of a degree Celsius.
    Returns the temperature in degrees Fahrenheit rounded to one decimal.
    '''
    # Divide by a float: on Python 2 "int / 10" is floor division and would
    # silently discard the tenths digit before the conversion.
    celsius = int(noaa_temp) / 10.0
    fahrenheit = 9.0 / 5.0 * celsius + 32
    return round(fahrenheit, 1)
def noaa_gather_lines(lines):
    '''Return dataframes for raw and missing detail from a list of lines.

    lines -- iterable of daily record lines, each handed to
             noaa_gather_daily_detail.
    Returns an approach1_bag whose raw and missing fields are DataFrames
    built from the records gathered across all lines.
    '''
    raw_records, missing_records = [], []
    for record_line in lines:
        gathered = noaa_gather_daily_detail(record_line)
        raw_records.extend(gathered.raw)
        missing_records.extend(gathered.missing)
    return approach1_bag(pd.DataFrame(raw_records), pd.DataFrame(missing_records))
def noaa_gather_daily_detail(line):
    '''Extract content from one daily record line into raw and missing records.

    The line is fixed width: station id (11 characters), year (4),
    month (2) and element type (4), followed by one 8-character group per
    day of the month (a 5-character value plus three 1-character fields).
    Only TMIN and TMAX lines produce records. A value of '-9999' yields a
    missing-detail record; any other value yields a raw-detail record with
    the temperature converted to Fahrenheit.

    line -- one text line of a daily file (bytes are decoded as ASCII).
    Returns approach1_bag(raw, missing), two lists of dicts shaped like
    raw_daily_detail_rec_template and missing_detail_rec_template. Lines
    that are too short to carry a TMIN/TMAX header return two empty lists.
    '''
    # Plain slicing works on text under both Python 2 and 3, whereas
    # struct.unpack needs bytes on Python 3 and raises on short lines.
    if not isinstance(line, str):
        line = line.decode('ascii')
    station_id = line[0:11]
    year = line[11:15]
    month = line[15:17]
    element = line[17:21]

    raw_tuple_list = []
    missing_tuple_list = []
    if element in ('TMIN', 'TMAX'):
        values = line[21:]
        day_of_month = 0
        # Consume one 8-character day group per iteration; a trailing
        # newline or partial group (fewer than 8 characters) ends the loop.
        while len(values) > 7:
            day_of_month += 1
            reading = values[0:5]
            if reading != '-9999':
                # Construct raw detail record
                raw_tuple = dict(raw_daily_detail_rec_template)
                raw_tuple['StationID'] = station_id
                raw_tuple['Year'] = year
                raw_tuple['Month'] = month
                raw_tuple['Day'] = day_of_month
                raw_tuple['Type'] = element
                raw_tuple['FahrenheitTemp'] = noaa_convert_c2f(reading)
                raw_tuple_list.append(raw_tuple)
            else:
                # Construct missing detail record
                missing_tuple = dict(missing_detail_rec_template)
                missing_tuple['StationID'] = station_id
                missing_tuple['Year'] = year
                missing_tuple['Month'] = month
                missing_tuple['Day'] = day_of_month
                missing_tuple['Type'] = element
                missing_tuple_list.append(missing_tuple)
            # Adjust offset for next day
            values = values[8:]
    return approach1_bag(raw_tuple_list, missing_tuple_list)
def noaa_process_hcn_daily_file(fname):
    '''Return dataframes for raw and missing detail from the lines in a file.

    fname -- path of the daily file to read.
    Returns approach1_bag(raw, missing): two DataFrames sorted by, and with
    columns ordered as, StationID/Year/Month/Day/Type (plus FahrenheitTemp
    for raw). Progress and elapsed time are printed.
    '''
    print("Extracting content from file {0}.".format(fname))
    raw_cols = ['StationID', 'Year', 'Month', 'Day', 'Type', 'FahrenheitTemp']
    missing_cols = ['StationID', 'Year', 'Month', 'Day', 'Type']
    raw_list = []
    missing_list = []
    line_count = 0
    # Start Timer
    start_time = time.time()
    with open(fname, 'r') as f:
        # Stream the file instead of loading every line up front.
        for line in f:
            line_count += 1
            r = noaa_gather_daily_detail(line)
            raw_list.extend(r.raw)
            missing_list.extend(r.missing)
    seconds = time.time() - start_time
    print(">> Processing Complete: {0} lines of file {1}.".format(line_count, fname))
    print(">> Elapsed file execution time {0}".format(elapsed_time(seconds)))
    # Capture and sort results in DataFrames. Passing columns= fixes the
    # column order and keeps the sort valid even when a list is empty.
    df_raw = pd.DataFrame(raw_list, columns=raw_cols).sort_values(raw_cols)
    df_missing = pd.DataFrame(missing_list, columns=missing_cols).sort_values(missing_cols)
    return approach1_bag(df_raw, df_missing)
def noaa_run_phase1_approach1(project_layout):
    '''Process corpus of daily files and store results in CSV files.

    project_layout -- dictionary of project folder details. Reads
        'Daily_Input_Files' (glob pattern of input files),
        'Raw_Details' and 'Missing_Details' (output folders).
    For every matching input file, writes <station>_raw.csv and
    <station>_mis.csv into the respective output folders. Any failure is
    reported on stdout rather than raised.
    '''
    try:
        if not project_layout['Daily_Input_Files']:
            raise Exception("Incomplete or missing dictionary of project folder details.")
        print(">> Processing Started ...")
        # Start Timer
        start_time = time.time()
        for index, fname in enumerate(glob.glob(project_layout['Daily_Input_Files'])):
            station_name = get_filename(fname)
            print(">> Processing file {0}: {1}".format(index, station_name))
            raw_file = os.path.join(project_layout['Raw_Details'], station_name + '_raw.csv')
            missing_file = os.path.join(project_layout['Missing_Details'], station_name + '_mis.csv')
            r = noaa_process_hcn_daily_file(fname)
            # Persist both frames; the row index is kept as the first CSV column.
            r.raw.to_csv(raw_file)
            r.missing.to_csv(missing_file)
        seconds = time.time() - start_time
        print(">> Processing Complete.")
        print(">> Elapsed corpus execution time {0}".format(elapsed_time(seconds)))
    except Exception as e:
        # str(e) works on Python 2 and 3; e.message does not exist on
        # Python 3 and is empty for I/O errors on Python 2.
        print(">> Processing Failed: Error {0}".format(e))
import os
import glob
import time
import datetime
import collections
import pandas as pd
import traceback
# Container returned by the approach-2 record processing: the per-day
# summary rows and the per-record detail rows.
approach2_bag = collections.namedtuple('GatherBag', ['DailySummary', 'DailyDetail'])

# Historical Daily Summary: blank record copied for each calendar day.
summary_template = {
    'StationID': "",
    'Month': "",
    'Day': "",
    'FirstYearOfRecord': "",
    'TMin': "",
    'TMinRecordYear': "",
    'TMax': "",
    'TMaxRecordYear': "",
    'CurTMinMaxDelta': "",
    'CurTMinRecordDur': "",
    'CurTMaxRecordDur': "",
    'MaxDurTMinRecord': "",
    'MinDurTMinRecord': "",
    'MaxDurTMaxRecord': "",
    'MinDurTMaxRecord': "",
    'TMinRecordCount': "",
    'TMaxRecordCount': "",
}

# Column order (and sort order) used when summary rows become a DataFrame.
summary_cols = [
    'StationID', 'Month', 'Day', 'FirstYearOfRecord',
    'TMin', 'TMinRecordYear', 'TMax', 'TMaxRecordYear',
    'CurTMinMaxDelta', 'CurTMinRecordDur', 'CurTMaxRecordDur',
    'MaxDurTMinRecord', 'MinDurTMinRecord',
    'MaxDurTMaxRecord', 'MinDurTMaxRecord',
    'TMinRecordCount', 'TMaxRecordCount',
]

# Historical Daily Detail: blank record copied for each new daily record.
detail_template = {
    'StationID': "",
    'Year': "",
    'Month': "",
    'Day': "",
    'Type': "",
    'OldTemp': "",
    'NewTemp': "",
    'TDelta': "",
}

# Column order (and sort order) used when detail rows become a DataFrame.
# Note that NewTemp precedes OldTemp here, unlike in detail_template.
detail_cols = [
    'StationID', 'Year', 'Month', 'Day', 'Type',
    'NewTemp', 'OldTemp', 'TDelta',
]
def get_filename(pathname):
    '''Fetch the filename portion of pathname, without its extension.

    Relies on os.path rather than splitting on '/', so paths that use the
    platform's native separator (such as glob results on Windows) are
    handled too.

    pathname -- path to a file, relative or absolute.
    Returns the final path component minus its last extension; an empty
    string when pathname ends with a separator.
    '''
    base_name = os.path.basename(pathname)
    return os.path.splitext(base_name)[0]
def elapsed_time(secs):
    '''Format a duration given in seconds as an H:MM:SS string.

    secs -- elapsed seconds (int or float); fractional seconds are
            truncated in the output.
    '''
    total_minutes, seconds = divmod(secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return "{0:d}:{1:02d}:{2:02d}".format(int(hours), int(minutes), int(seconds))
def get_key_list(hdf5file, type='raw_detail'):
    '''Return a list of keys for requested type from specified HDF file.

    hdf5file -- path of the HDF5 store to open read-only.
    type     -- substring a key must contain to be returned.
    Returns the store's keys containing that substring, in store order.
    '''
    print("Fetching keys for type = {0}".format(type))
    store = pd.HDFStore(hdf5file, 'r')
    try:
        return [k for k in store.keys() if type in k]
    finally:
        # Always release the file handle, even if reading the keys fails.
        store.close()
def cleans_invalid_days(df):
    '''Return a dataframe void of invalid days.

    Drops rows for day 31 of the 30-day months (April, June, September,
    November) and for February 30 and 31. February 29 is kept. Every
    other row is returned unchanged, with its original index.

    df -- DataFrame with 'Month' and 'Day' columns holding integers or
          integer-like strings (e.g. '02').
    '''
    short_months = [4, 6, 9, 11]
    # Coerce so the comparisons also work when the columns hold strings.
    month = df['Month'].astype(int)
    day = df['Day'].astype(int)
    # A row is invalid only when BOTH its month and day conditions hold;
    # requiring them separately would discard whole months of valid data.
    day_31_in_short_month = month.isin(short_months) & (day == 31)
    beyond_february = (month == 2) & (day >= 30)
    return df[~(day_31_in_short_month | beyond_february)]
def _new_day_matrix(fill):
    '''Return a fresh 13x32 month-by-day grid with every cell set to fill.'''
    return [[fill for _ in range(32)] for _ in range(13)]


def noaa_process_phase2_records(raw_tuples):
    '''Derive daily summary and record-detail rows from one station's raw tuples.

    raw_tuples -- sequence of tuples laid out as
        (index, StationID, Year, Month, Day, Type, FahrenheitTemp), e.g.
        (0, 'USC00011084', '1926', '01', 21, 'TMAX', 73.4).
        All tuples are taken to belong to the station of the first one.
        NOTE(review): the record bookkeeping presumably expects the tuples
        in chronological order - confirm against the caller.

    Returns approach2_bag(DailySummary, DailyDetail):
        DailySummary -- one dict per (month, day) for months 1-12 and days
                        1-31, shaped like summary_template. Impossible dates
                        such as February 30 are included.
        DailyDetail  -- one dict, shaped like detail_template, for each
                        tuple that set a first or new TMIN/TMAX record.
    Returns two empty lists when raw_tuples is empty.
    '''
    if len(raw_tuples) == 0:
        return approach2_bag([], [])

    # Month-by-day matrices (indexed [month-1][day-1]) per metric of interest.
    # 9999 / -99 / 0 are the "nothing seen yet" markers.
    # NOTE(review): a genuine reading of -99.0 F is indistinguishable from
    # the -99 marker used in the temperature matrices.
    fyr_online_for_day = _new_day_matrix(9999)
    tmin_history = _new_day_matrix(-99)
    tmax_history = _new_day_matrix(-99)
    tminyr_history = _new_day_matrix(-99)
    tmaxyr_history = _new_day_matrix(-99)
    tminrc_history = _new_day_matrix(0)
    tmaxrc_history = _new_day_matrix(0)
    tmax_max_life = _new_day_matrix(0)
    tmax_min_life = _new_day_matrix(9999)
    tmin_max_life = _new_day_matrix(0)
    tmin_min_life = _new_day_matrix(9999)

    # Capture Station ID (all raw tuples are per station)
    station_ID = raw_tuples[0][1]

    # Process each raw daily tuple: create daily detail rows while updating matrices.
    detail_list = []
    for t in raw_tuples:
        year = int(t[2])
        temp = round(t[6], 1)
        month = int(t[3]) - 1
        day = t[4] - 1

        detail_row = dict(detail_template)
        detail_row['StationID'] = t[1]
        detail_row['Year'] = t[2]
        detail_row['Month'] = t[3]
        detail_row['Day'] = str(t[4])

        # For this day, what was the first year in which this station was operational?
        if fyr_online_for_day[month][day] > year:
            fyr_online_for_day[month][day] = year

        # Handle TMAX
        if t[5] == 'TMAX':
            if tmax_history[month][day] == -99:
                # First TMAX seen for this calendar day
                detail_row['Type'] = 'TMAX'
                detail_row['OldTemp'] = temp
                detail_row['NewTemp'] = temp
                detail_row['TDelta'] = 0
                tmax_history[month][day] = temp
                tmaxyr_history[month][day] = year
                tmaxrc_history[month][day] = 1
                tmax_min_life[month][day] = 0
                tmax_max_life[month][day] = 0
                # Add new daily detail row
                detail_list.append(detail_row)
            elif temp > tmax_history[month][day]:
                # New daily TMAX record
                detail_row['Type'] = 'TMAX'
                detail_row['OldTemp'] = tmax_history[month][day]
                detail_row['NewTemp'] = temp
                detail_row['TDelta'] = temp - tmax_history[month][day]
                current_tmin_duration = year - tminyr_history[month][day]
                current_tmax_duration = year - tmaxyr_history[month][day]
                # NOTE(review): the four updates below are kept exactly as
                # written; the way tmin_*/tmax_* durations are cross-assigned
                # looks suspicious and the summary marks two of the resulting
                # fields "Can not explain" - confirm the intended rule.
                if tmin_max_life[month][day] == 0:
                    tmin_max_life[month][day] = year - fyr_online_for_day[month][day]
                if tmax_max_life[month][day] == 0:
                    tmax_max_life[month][day] = year - fyr_online_for_day[month][day]
                if current_tmax_duration > tmax_max_life[month][day]:
                    tmax_max_life[month][day] = current_tmax_duration
                if current_tmin_duration < tmin_max_life[month][day]:
                    tmin_max_life[month][day] = current_tmax_duration
                tmax_history[month][day] = temp
                tmaxyr_history[month][day] = year
                tmaxrc_history[month][day] += 1
                # Add new daily detail row
                detail_list.append(detail_row)

        # Handle TMIN
        if t[5] == 'TMIN':
            if tmin_history[month][day] == -99:
                # First TMIN seen for this calendar day
                detail_row['Type'] = 'TMIN'
                detail_row['OldTemp'] = temp
                detail_row['NewTemp'] = temp
                detail_row['TDelta'] = 0
                tmin_history[month][day] = temp
                tminyr_history[month][day] = year
                tminrc_history[month][day] = 1
                tmin_min_life[month][day] = 0
                tmin_max_life[month][day] = 0
                # Add new daily detail row
                detail_list.append(detail_row)
            elif temp < tmin_history[month][day]:
                # New daily TMIN record
                detail_row['Type'] = 'TMIN'
                detail_row['OldTemp'] = tmin_history[month][day]
                detail_row['NewTemp'] = temp
                detail_row['TDelta'] = tmin_history[month][day] - temp
                current_tmin_duration = year - tminyr_history[month][day]
                current_tmax_duration = year - tmaxyr_history[month][day]
                # NOTE(review): kept exactly as written - see the matching
                # note in the TMAX branch.
                if tmax_min_life[month][day] == 0:
                    tmax_min_life[month][day] = year - fyr_online_for_day[month][day]
                if tmin_min_life[month][day] == 0:
                    tmin_min_life[month][day] = year - fyr_online_for_day[month][day]
                if current_tmax_duration > tmax_min_life[month][day]:
                    tmax_min_life[month][day] = current_tmin_duration
                if current_tmin_duration < tmin_min_life[month][day]:
                    tmin_min_life[month][day] = current_tmin_duration
                tmin_history[month][day] = temp
                tminyr_history[month][day] = year
                tminrc_history[month][day] += 1
                # Add new daily detail row
                detail_list.append(detail_row)

    # Create a daily summary record for each day of the year using our matrices.
    summary_list = []
    current_year = datetime.datetime.now().year
    for mth in range(1, 13):
        for day in range(1, 32):
            m = mth - 1
            d = day - 1
            summary_row = dict(summary_template)
            summary_row['StationID'] = station_ID
            summary_row['Month'] = mth
            summary_row['Day'] = day
            summary_row['FirstYearOfRecord'] = fyr_online_for_day[m][d]
            summary_row['TMin'] = tmin_history[m][d]
            summary_row['TMinRecordYear'] = tminyr_history[m][d]
            summary_row['TMax'] = tmax_history[m][d]
            summary_row['TMaxRecordYear'] = tmaxyr_history[m][d]
            summary_row['CurTMinMaxDelta'] = summary_row['TMax'] - summary_row['TMin']
            summary_row['CurTMinRecordDur'] = current_year - summary_row['TMinRecordYear']
            summary_row['CurTMaxRecordDur'] = current_year - summary_row['TMaxRecordYear']
            summary_row['MaxDurTMinRecord'] = tmax_min_life[m][d]  # Can not explain
            summary_row['MinDurTMinRecord'] = tmin_min_life[m][d]
            summary_row['MaxDurTMaxRecord'] = tmax_max_life[m][d]
            summary_row['MinDurTMaxRecord'] = tmin_max_life[m][d]  # Can not explain
            summary_row['TMinRecordCount'] = tminrc_history[m][d]
            summary_row['TMaxRecordCount'] = tmaxrc_history[m][d]
            # Add new daily summary row
            summary_list.append(summary_row)
    return approach2_bag(summary_list, detail_list)
def noaa_run_phase2_approach1(project_layout, create_details=False):
    '''Parse raw CSV files to create derived datasets.

    project_layout -- dictionary of project folder details. Reads
        'Raw_Details' (folder holding the *_raw.csv inputs),
        'Station_Summary' and 'Station_Details' (output folders).
    create_details -- when True, also write a <station>_std.csv detail
        file per station in addition to the <station>_sum.csv summary.

    Invalid calendar days are removed from each frame before it is
    written. Any failure is reported on stdout, with a traceback, rather
    than raised.
    '''
    try:
        # Start Key Processing Timer
        start_time = time.time()
        raw_files = os.path.join(project_layout['Raw_Details'], '*_raw.csv')
        for index, fname in enumerate(glob.glob(raw_files)):
            f = get_filename(fname).split('_')[0]
            print("Processing dataset {0}: {1}".format(index, f))
            summary_file = os.path.join(project_layout['Station_Summary'], f + '_sum.csv')
            detail_file = os.path.join(project_layout['Station_Details'], f + '_std.csv')
            # The first CSV column is the saved row index, not data.
            dataset = pd.read_csv(fname, index_col=0)
            raw_tuples = list(dataset.itertuples())
            r = noaa_process_phase2_records(raw_tuples)
            # columns= fixes the column order and keeps the sort valid
            # even when no rows were produced.
            df_summary = pd.DataFrame(r.DailySummary, columns=summary_cols).sort_values(summary_cols)
            cleans_invalid_days(df_summary).to_csv(summary_file)
            if create_details:
                df_detail = pd.DataFrame(r.DailyDetail, columns=detail_cols).sort_values(detail_cols)
                cleans_invalid_days(df_detail).to_csv(detail_file)
        seconds = time.time() - start_time
        print(">> Processing Complete.")
        print(">> Elapsed corpus execution time {0}".format(elapsed_time(seconds)))
    except Exception as e:
        print(traceback.format_exc())
        # str(e) works on Python 2 and 3; e.message is Python 2 only.
        print(">> Processing Failed: Error {0}".format(e))
Takes a dictionary of project folder details to drive the processing of Gather Phase 2 Approach 1 using CSV files.
Optionally, run it with `create_details=True` to also generate the station detail files. You will need additional free space to support this feature. Estimated requirement: **150MB**
