This importable notebook provides the tooling for the Gather Phases of the ETL process in the NOAA HDTA project. The tooling supports Approaches 1 and 2 using CSV files.
Each process phase requires a dictionary to drive the workflow.
project_layout = { 
    'Content_Version': '',
    'Daily_Input_Files': '',
    'Raw_Details': '',
    'Missing_Details': '',
    'Station_Summary': '', 
    'Station_Details': '',
    }
| Process Phase | Function to run | 
|---|---|
| Phase 1 Approach 1 | noaa_run_phase1_approach1(project_layout) | 
| Phase 2 Approach 1 | noaa_run_phase2_approach1(project_layout) | 
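For example, a populated layout might look like the following (the version label, paths, and file extension are illustrative only; Daily_Input_Files is a glob pattern passed to glob.glob):
project_layout = { 
    'Content_Version': 'v1.0',
    'Daily_Input_Files': '/data/noaa/hcn/*.dly',
    'Raw_Details': '/data/noaa/raw',
    'Missing_Details': '/data/noaa/missing',
    'Station_Summary': '/data/noaa/summary',
    'Station_Details': '/data/noaa/details',
    }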
In [8]:
    
# <help>
    
In [9]:
    
# <api>
import os
import time
import glob
import collections
import pandas as pd
# Create a collection for returning multiple lists of tuples
approach1_bag = collections.namedtuple('GatherBag', ['raw', 'missing'])
# Historical Raw Daily Detail
raw_daily_detail_rec_template = {'StationID': "",
                                    'Year': "",
                                    'Month': "",
                                    'Day': "",
                                    'Type': "",
                                    'FahrenheitTemp': "",
                                }
# Historical Daily Missing Record Detail
missing_detail_rec_template = {'StationID': "",
                                'Year': "",
                                'Month': "",
                                'Day': "",
                                'Type': "",
                               }
def get_filename(pathname):
    '''Fetch the filename portion (without extension) of a pathname.'''
    fname, fext = os.path.splitext(os.path.basename(pathname))
    return fname
def elapsed_time(secs):
    '''Compute a formatted H:MM:SS time stamp from seconds elapsed.'''
    m, s = divmod(secs, 60)
    h, m = divmod(m, 60)
    et = "%d:%02d:%02d" % (h, m, s)
    return et
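# Example: elapsed_time(3725) returns '1:02:05'.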
def noaa_convert_c2f(noaa_temp):
    '''Return the Fahrenheit temperature for a NOAA temperature (tenths of degrees Celsius).'''
    celsius = int(noaa_temp) / 10.0
    fahrenheit = 9.0 / 5.0 * celsius + 32
    return round(fahrenheit, 1)
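# Example: a NOAA value of 250 is 25.0 degrees Celsius,
# so noaa_convert_c2f(250) returns 77.0 (degrees Fahrenheit).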
def noaa_gather_lines(lines):
    '''Return dataframes for raw and missing detail from list of lines.'''
    # Create list of tuples 
    raw_list = []
    missing_list = []
    for index, line in enumerate(lines):
        #print("Processing line {0}.").format(index)
        r = noaa_gather_daily_detail(line)
        raw_list += r.raw
        missing_list += r.missing
    # Construct dataframes
    df_raw = pd.DataFrame(raw_list)
    df_missing = pd.DataFrame(missing_list)
    return approach1_bag(df_raw, df_missing)
def noaa_gather_daily_detail(line):
    '''Extract content from a fixed-width daily record, creating raw and missing tuples.'''
    # Fixed-width layout: chars 0-10 station ID, 11-14 year, 15-16 month,
    # 17-20 element type, then one 8-char slot per day (5-char value + 3 flag chars).
    station_id = line[0:11]
    year = line[11:15]
    month = line[15:17]
    element = line[17:21]
    raw_tuple_list = []
    missing_tuple_list = []
    if element == 'TMIN' or element == 'TMAX':
        values = line[21:]
        day_of_month = 0
        while len(values) > 7:
            day_of_month += 1
            day_value = values[0:5]
            if day_value != '-9999':
                raw_tuple = dict(raw_daily_detail_rec_template)
                # Compute degrees Fahrenheit
                fahrenheit = noaa_convert_c2f(int(day_value))
                # Construct raw detail record
                raw_tuple['StationID'] = station_id
                raw_tuple['Year'] = year
                raw_tuple['Month'] = month
                raw_tuple['Day'] = day_of_month
                raw_tuple['Type'] = element
                raw_tuple['FahrenheitTemp'] = fahrenheit
                raw_tuple_list.append(raw_tuple)
            else:
                # Construct missing detail record
                missing_tuple = dict(missing_detail_rec_template)
                missing_tuple['StationID'] = station_id
                missing_tuple['Year'] = year
                missing_tuple['Month'] = month
                missing_tuple['Day'] = day_of_month
                missing_tuple['Type'] = element
                missing_tuple_list.append(missing_tuple)
            # Advance offset to the next day slot
            values = values[8:]
    # Return new tuples
    return approach1_bag(raw_tuple_list, missing_tuple_list)
def noaa_process_hcn_daily_file(fname):
    '''Return dataframes for raw and missing detail from lines in file.'''
    print("Extracting content from file {0}.".format(fname))
    x = 0
    raw_cols = ['StationID', 'Year', 'Month', 'Day', 'Type', 'FahrenheitTemp']
    missing_cols = ['StationID', 'Year', 'Month', 'Day', 'Type']
    # Create lists of tuples
    raw_list = []
    missing_list = []
    # Start Timer
    start_time = time.time()
    with open(fname, 'r') as f:
        for line in f:
            x += 1
            # print("    .... Processing line {0}.".format(x))
            r = noaa_gather_daily_detail(line)
            raw_list += r.raw
            missing_list += r.missing
    seconds = time.time() - start_time
    print(">> Processing Complete: {0} lines of file {1}.".format(x, fname))
    print(">>   Elapsed file execution time {0}".format(elapsed_time(seconds)))
    # Capture and sort results in DataFrames
    df_raw = pd.DataFrame(raw_list)
    df_missing = pd.DataFrame(missing_list)
    r = df_raw.sort_values(raw_cols).reindex(columns=raw_cols)
    m = df_missing.sort_values(missing_cols).reindex(columns=missing_cols)
    return approach1_bag(r, m)
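# Example usage (the file path below is illustrative, not part of the project):
# r = noaa_process_hcn_daily_file('/data/noaa/hcn/USC00011084.dly')
# r.raw.head()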
def noaa_run_phase1_approach1(project_layout):
    '''Process corpus of daily files and store results in CSV files.'''
    try:
        if not project_layout['Daily_Input_Files']:
            raise Exception("Incomplete or missing dictionary of project folder details.")
        print(">> Processing Started ...")
        # Start Timer
        start_time = time.time()
        for index, fname in enumerate(glob.glob(project_layout['Daily_Input_Files'])):
            station_name = get_filename(fname)
            print(">> Processing file {0}: {1}".format(index, station_name))
            raw_file = os.path.join(project_layout['Raw_Details'], station_name + '_raw.csv')
            missing_file = os.path.join(project_layout['Missing_Details'], station_name + '_mis.csv')
            r = noaa_process_hcn_daily_file(fname)
            r.raw.to_csv(raw_file)
            r.missing.to_csv(missing_file)
        seconds = time.time() - start_time
        print(">>      Processing Complete.")
        print(">>   Elapsed corpus execution time {0}".format(elapsed_time(seconds)))
    except Exception as e:
        print(">> Processing Failed: Error {0}".format(e))
    
In [ ]:
    
# <help:noaa_run_phase1_approach1>
project_layout = { 
    'Content_Version': '',
    'Daily_Input_Files': '',
    'Raw_Details': '',
    'Missing_Details': '',
    'Station_Summary': '', 
    'Station_Details': '',
    }
noaa_run_phase1_approach1(project_layout)
    
In [1]:
    
# <api>
import os
import glob
import time
import datetime
import collections
import pandas as pd
import traceback
# Create a collection for returning multiple lists of tuples
approach2_bag = collections.namedtuple('GatherBag', ['DailySummary', 'DailyDetail'])
# Historical Daily Summary
summary_template = {'StationID': "", 
                    'Month': "", 
                    'Day': "",
                    'FirstYearOfRecord': "", 
                    'TMin': "", 
                    'TMinRecordYear': "", 
                    'TMax': "", 
                    'TMaxRecordYear': "", 
                    'CurTMinMaxDelta': "", 
                    'CurTMinRecordDur': "", 
                    'CurTMaxRecordDur': "", 
                    'MaxDurTMinRecord': "", 
                    'MinDurTMinRecord': "", 
                    'MaxDurTMaxRecord': "", 
                    'MinDurTMaxRecord': "", 
                    'TMinRecordCount': "", 
                    'TMaxRecordCount': "" 
                    }
summary_cols = ['StationID', 'Month', 'Day', 'FirstYearOfRecord', 
                'TMin', 'TMinRecordYear', 'TMax', 'TMaxRecordYear',
                'CurTMinMaxDelta', 'CurTMinRecordDur','CurTMaxRecordDur',
                'MaxDurTMinRecord', 'MinDurTMinRecord',
                'MaxDurTMaxRecord', 'MinDurTMaxRecord',
                'TMinRecordCount', 'TMaxRecordCount'            
                ]
# Historical Daily Detail
detail_template = {'StationID': "",
                    'Year': "",
                    'Month': "",
                    'Day': "",
                    'Type': "",
                    'OldTemp': "",
                    'NewTemp': "",
                    'TDelta': ""
                    }
detail_cols = ['StationID', 'Year', 'Month', 'Day', 'Type', 
               'NewTemp', 'OldTemp', 'TDelta'
              ]
def get_filename(pathname):
    '''Fetch the filename portion (without extension) of a pathname.'''
    fname, fext = os.path.splitext(os.path.basename(pathname))
    return fname
def elapsed_time(secs):
    '''Compute a formatted H:MM:SS time stamp from seconds elapsed.'''
    m, s = divmod(secs, 60)
    h, m = divmod(m, 60)
    et = "%d:%02d:%02d" % (h, m, s)
    return et
def get_key_list(hdf5file, type='raw_detail'):
    '''Return a list of keys for requested type from specified HDF file.'''
    print("Fetching keys for type = {0}".format(type))
    keylist = []
    store = None
    try:
        store = pd.HDFStore(hdf5file, 'r')
        h5keys = store.keys()
        for k in h5keys:
            if k.find(type) > -1:
                keylist.append(k)
    finally:
        # Always close the store, even if an exception propagates.
        if store:
            store.close()
    return keylist
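# Example usage (the store name and key names are illustrative):
# get_key_list('noaa_hdta.h5', type='raw_detail')
# might return keys such as ['/USC00011084/raw_detail']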
def cleans_invalid_days(df):
    '''Return a dataframe void of invalid calendar days (e.g. Apr 31, Feb 30).'''
    ShortMths = {4, 6, 9, 11}
    df_clean = df.query('((Month not in @ShortMths) or (Day != 31)) and ((Month != 2) or (Day < 30))')
    return df_clean
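# Example: given rows (Month=1, Day=31), (Month=4, Day=31), and (Month=2, Day=30),
# cleans_invalid_days keeps only the (Month=1, Day=31) row.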
def noaa_process_phase2_records(raw_tuples):
    '''Build daily summary and detail tuple lists from a station's raw daily tuples.'''
    # Sample Tuple:
    # (0, 'USC00011084', '1926', '01', 21, 'TMAX', 73.400000000000006)
    #
    # Create several 13x32 matrices (month and day are used as 0-based indexes
    # 0-11 and 0-30; the extra row and column are unused slack) to store daily
    # detail per metric of interest.
    fyr_online_for_day = [[9999 for x in range(32)] for x in range(13)]  
    tmin_history = [[-99 for x in range(32)] for x in range(13)] 
    tmax_history = [[-99 for x in range(32)] for x in range(13)] 
    tminyr_history = [[-99 for x in range(32)] for x in range(13)] 
    tmaxyr_history = [[-99 for x in range(32)] for x in range(13)] 
    tminrc_history = [[0 for x in range(32)] for x in range(13)]  
    tmaxrc_history = [[0 for x in range(32)] for x in range(13)]  
    tmax_max_life = [[0 for x in range(32)] for x in range(13)] 
    tmax_min_life = [[9999 for x in range(32)] for x in range(13)] 
    tmin_max_life = [[0 for x in range(32)] for x in range(13)]  
    tmin_min_life = [[9999 for x in range(32)] for x in range(13)]   
    # Capture Station ID (all raw-tuples are per station)
    station_ID  = raw_tuples[0][1]
    # Process each raw daily tuple: create daily detail tuples while updating matrices.
    detail_list = []
    for t in raw_tuples:
        detail_row = dict(detail_template)
        detail_row['StationID'] = t[1]
        detail_row['Year'] = t[2]
        detail_row['Month'] = t[3]
        # Keep Day numeric so downstream sort and query comparisons work.
        detail_row['Day'] = t[4]
        month = int(t[3])-1
        day = t[4]-1
        # For this day, what was the first year in which this station was operational?
        if fyr_online_for_day[month][day] > int(t[2]):
            fyr_online_for_day[month][day] = int(t[2])
        # Handle TMAX
        if (t[5] == 'TMAX'):
            # Handle TMAX for first record
            if (tmax_history[month][day] == -99):
                # Handle TMAX for first 
                detail_row['Type'] = 'TMAX'
                detail_row['OldTemp'] = round(t[6],1)
                detail_row['NewTemp'] = round(t[6],1)
                detail_row['TDelta'] = 0
                tmax_history[month][day] = round(t[6],1)
                tmaxyr_history[month][day] = int(t[2])
                tmaxrc_history[month][day] = 1
                tmax_min_life[month][day] = 0
                tmax_max_life[month][day] = 0
                # Add new daily detail row
                detail_list.append(detail_row)
            # Handle TMAX for new daily record
            elif (round(t[6],1) > tmax_history[month][day]):
                detail_row['Type'] = 'TMAX'
                detail_row['OldTemp'] = tmax_history[month][day]
                detail_row['NewTemp'] = round(t[6],1)
                detail_row['TDelta'] = round(t[6],1) - tmax_history[month][day]
                current_tmin_duration = int(t[2]) - tminyr_history[month][day]
                current_tmax_duration = int(t[2]) - tmaxyr_history[month][day]
                if tmin_max_life[month][day] == 0:
                    tmin_max_life[month][day] = int(t[2]) - fyr_online_for_day[month][day]
                if tmax_max_life[month][day] == 0:
                    tmax_max_life[month][day] = int(t[2]) - fyr_online_for_day[month][day]
                if current_tmax_duration > tmax_max_life[month][day]:
                    tmax_max_life[month][day] = current_tmax_duration
                if current_tmin_duration < tmin_max_life[month][day]:
                    tmin_max_life[month][day] = current_tmin_duration
                tmax_history[month][day] = round(t[6],1)
                tmaxyr_history[month][day] = int(t[2])
                tmaxrc_history[month][day] += 1
                # Add new daily detail row
                detail_list.append(detail_row)
        if (t[5] == 'TMIN'):
            # Handle TMIN for first record
            if (tmin_history[month][day] == -99):
                # Handle TMIN for first 
                detail_row['Type'] = 'TMIN'
                detail_row['OldTemp'] = round(t[6],1)
                detail_row['NewTemp'] = round(t[6],1)
                detail_row['TDelta'] = 0
                tmin_history[month][day] = round(t[6],1)
                tminyr_history[month][day] = int(t[2])
                tminrc_history[month][day] = 1
                tmin_min_life[month][day] = 0
                tmin_max_life[month][day] = 0
                # Add new daily detail row
                detail_list.append(detail_row)
            # Handle TMIN for new daily record
            elif (round(t[6],1) < tmin_history[month][day]):
                detail_row['Type'] = 'TMIN'
                detail_row['OldTemp'] = tmin_history[month][day]
                detail_row['NewTemp'] = round(t[6],1)
                detail_row['TDelta'] = tmin_history[month][day] - round(t[6],1)
                current_tmin_duration = int(t[2]) - tminyr_history[month][day]
                current_tmax_duration = int(t[2]) - tmaxyr_history[month][day]
                if tmax_min_life[month][day] == 0:
                    tmax_min_life[month][day] = int(t[2]) - fyr_online_for_day[month][day]
                if tmin_min_life[month][day] == 0:
                    tmin_min_life[month][day] = int(t[2]) - fyr_online_for_day[month][day]
                if current_tmax_duration > tmax_min_life[month][day]:
                    tmax_min_life[month][day] = current_tmax_duration
                if current_tmin_duration < tmin_min_life[month][day]:
                    tmin_min_life[month][day] = current_tmin_duration
                tmin_history[month][day] = round(t[6],1)
                tminyr_history[month][day] = int(t[2])
                tminrc_history[month][day] += 1
                # Add new daily detail row
                detail_list.append(detail_row)
    # Create a daily summary record for each day of the year using our matrices.
    summary_list = []
    now = datetime.datetime.now()
    for mth in range(1, 13):
        for day in range(1, 32):
            m = mth - 1
            d = day - 1
            summary_row = dict(summary_template)
            summary_row['StationID'] = station_ID
            summary_row['Month'] = mth
            summary_row['Day'] = day
            summary_row['FirstYearOfRecord'] = fyr_online_for_day[m][d]
            summary_row['TMin'] = tmin_history[m][d]
            summary_row['TMinRecordYear'] = tminyr_history[m][d]
            summary_row['TMax'] = tmax_history[m][d] 
            summary_row['TMaxRecordYear'] = tmaxyr_history[m][d]
            summary_row['CurTMinMaxDelta'] = summary_row['TMax'] - summary_row['TMin']
            summary_row['CurTMinRecordDur'] = int(now.year) - summary_row['TMinRecordYear']
            summary_row['CurTMaxRecordDur'] = int(now.year) - summary_row['TMaxRecordYear']
            summary_row['MaxDurTMinRecord'] = tmax_min_life[m][d] # Cannot explain this matrix/field pairing
            summary_row['MinDurTMinRecord'] = tmin_min_life[m][d]
            summary_row['MaxDurTMaxRecord'] = tmax_max_life[m][d]
            summary_row['MinDurTMaxRecord'] = tmin_max_life[m][d] # Cannot explain this matrix/field pairing
            summary_row['TMinRecordCount'] = tminrc_history[m][d]
            summary_row['TMaxRecordCount'] = tmaxrc_history[m][d]
            # Add new daily summary row
            summary_list.append(summary_row)
    return approach2_bag(summary_list, detail_list)
def noaa_run_phase2_approach1(project_layout, create_details=False):
    '''Parse raw CSV files to create derived datasets.'''
    try:
        # Start Key Processing Timer
        start_time = time.time()
        raw_files = os.path.join(project_layout['Raw_Details'], '*_raw.csv')
        for index, fname in enumerate(glob.glob(raw_files)):
            f = get_filename(fname).split('_')[0]
            print("Processing dataset {0}: {1}".format(index, f))
            summary_file = os.path.join(project_layout['Station_Summary'], f + '_sum.csv')
            detail_file = os.path.join(project_layout['Station_Details'], f + '_std.csv')
            dataset = pd.read_csv(fname, index_col=0)
            raw_tuples = list(dataset.itertuples())
            r = noaa_process_phase2_records(raw_tuples)
            df_summary = pd.DataFrame(r.DailySummary).sort_values(summary_cols).reindex(columns=summary_cols)
            df_cleaned_summary = cleans_invalid_days(df_summary)
            df_cleaned_summary.to_csv(summary_file)
            # Detail files are optional; they require additional disk space.
            if create_details:
                df_detail = pd.DataFrame(r.DailyDetail).sort_values(detail_cols).reindex(columns=detail_cols)
                df_cleaned_detail = cleans_invalid_days(df_detail)
                df_cleaned_detail.to_csv(detail_file)
        seconds = time.time() - start_time
        print(">> Processing Complete.")
        print(">>   Elapsed corpus execution time {0}".format(elapsed_time(seconds)))
    except Exception as e:
        print(traceback.format_exc())
        print(">> Processing Failed: Error {0}".format(e))
    
Takes a dictionary of project folder details to drive the processing of Gather Phase 2 Approach 1 using CSV files.
To also generate the per-station daily detail files, call noaa_run_phase2_approach1() with create_details=True. You will need additional free space to support this feature. Estimated requirement: **150MB**
In [ ]:
    
# <help:noaa_run_phase2_approach1>
project_layout = { 
    'Content_Version': '',
    'Daily_Input_Files': '',
    'Raw_Details': '',
    'Missing_Details': '',
    'Station_Summary': '', 
    'Station_Details': '',
    }
noaa_run_phase2_approach1(project_layout)
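To also produce the per-station daily detail files (requires the additional free space noted above):
noaa_run_phase2_approach1(project_layout, create_details=True)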