Tools for CSV FIle Processing

Gather Phase Tools

This importable notebook provides the tooling necessary to handle the processing for the Gather Phases in the ETL process for the NOAA HDTA project. This tooling supports Approaches 1 and 2 using CSV files.

Each of the process phases require a dictionary to drive the workflow.

project_layout = { 
    'Content_Version': '',
    'Daily_Input_Files': '',
    'Raw_Details': '',
    'Missing_Details': '',
    'Station_Summary': '', 
    'Station_Details': '',
    }
Process Phase Function to run
Phase 1 Approach 1 noaa_run_phase1_approach1(project_layout)
Phase 2 Approach 1 noaa_run_phase2_approach1(project_layout)

In [8]:
# <help>

In [9]:
# <api>

import os
import time
import glob
import struct
import collections
import pandas as pd

# Create a collection for returning multiple lists of tuples
approach1_bag = collections.namedtuple('GatherBag', ['raw', 'missing'])

# Historical Raw Daily Detail
raw_daily_detail_rec_template = {'StationID': "",
                                    'Year': "",
                                    'Month': "",
                                    'Day': "",
                                    'Type': "",
                                    'FahrenheitTemp': "",
                                }

# Historical Daily Missing Record Detail
missing_detail_rec_template = {'StationID': "",
                                'Year': "",
                                'Month': "",
                                'Day': "",
                                'Type': "",
                               }

def get_filename(pathname):
    '''Fetch filename portion of pathname.'''
    plist = pathname.split('/')
    fname, fext = os.path.splitext(plist[len(plist)-1])
    return fname

def elapsed_time(secs):
    '''Compute formated time stamp given seconds elapsed. '''
    m, s = divmod(secs, 60)
    h, m = divmod(m, 60)
    et = "%d:%02d:%02d" % (h, m, s)
    return et

def noaa_convert_c2f(noaa_temp):
    '''Returns Fahrenheit temperature value from a NOAA temperature (tenths of degrees C) '''
    celsius = int(noaa_temp)/10
    fahrenheit = 9.0/5.0 * celsius + 32
    return round(fahrenheit,1)

def noaa_gather_lines(lines):
    '''Return dataframes for raw and missing detail from list of lines.'''
    # Create list of tuples 
    raw_list = []
    missing_list = []
    for index, line in enumerate(lines):
        #print("Processing line {0}.").format(index)
        r = noaa_gather_daily_detail(line)
        raw_list += r.raw
        missing_list += r.missing
    # Construct dataframes
    df_raw = pd.DataFrame(raw_list)
    df_missing = pd.DataFrame(missing_list)
    return approach1_bag(df_raw, df_missing)

def noaa_gather_daily_detail(line):
    '''Extract content from daily record, create raw and missing tuples.'''
    station_time_element = struct.unpack('11s4s2s4s', line[0:21])
    raw_tuple_list = []
    missing_tuple_list = []
    if station_time_element[3] == 'TMIN' or station_time_element[3] == 'TMAX':
        values = line[21:]
        day_of_month = 0
        while(len(values) > 7):
            day_of_month = day_of_month + 1
            day_measure = struct.unpack('5ssss', values[0:8])
            if day_measure[0] != '-9999':
                raw_tuple = dict(raw_daily_detail_rec_template)
                # Compute degrees fahrenheit
                fahrenheit = noaa_convert_c2f(int(day_measure[0]))
                # Construct raw detail record
                raw_tuple['StationID'] = station_time_element[0]
                raw_tuple['Year'] = station_time_element[1]
                raw_tuple['Month']= station_time_element[2]
                raw_tuple['Day'] = day_of_month
                raw_tuple['Type'] = station_time_element[3]
                raw_tuple['FahrenheitTemp'] = fahrenheit
                raw_tuple_list.append(raw_tuple)
            else:
                # Construct missing detail record
                missing_tuple = dict(missing_detail_rec_template)
                missing_tuple['StationID'] = station_time_element[0]
                missing_tuple['Year'] = station_time_element[1]
                missing_tuple['Month']= station_time_element[2]
                missing_tuple['Day'] = day_of_month
                missing_tuple['Type'] = station_time_element[3]
                missing_tuple_list.append(missing_tuple)
            # Adjust offest for next day
            values = values[8:]    
    # Return new tuples
    return approach1_bag(raw_tuple_list, missing_tuple_list)    

def noaa_process_hcn_daily_file(fname):
    '''Return dataframes for raw and missing detail from lines in file.'''
    print("Extracting content from file {0}.").format(fname)
    x = 0
    raw_cols = ['StationID', 'Year', 'Month', 'Day', 'Type', 'FahrenheitTemp']
    missing_cols = ['StationID', 'Year', 'Month', 'Day', 'Type']
    # Create list of tuples 
    raw_list = []
    missing_list = []
    # Start Timer
    start_time = time.time()
    with open(fname,'r') as f:
        lines = f.readlines()
        # Changed next 2 lines only.
        for line in lines:
            x += 1
            #print("    .... Processing line {0}.").format(x)
            r = noaa_gather_daily_detail(line)
            raw_list += r.raw
            missing_list += r.missing
    f.close() 
    seconds = (time.time() - start_time)
    print(">> Processing Complete: {0} lines of file {1}.").format(x, fname)
    print(">>   Elapsed file execution time {0}").format(elapsed_time(seconds))
    # Capture and Sort Results in DataFrames
    df_raw = pd.DataFrame(raw_list)
    df_missing = pd.DataFrame(missing_list)
    r = df_raw.sort(raw_cols).reindex(columns=raw_cols)
    m = df_missing.sort(missing_cols).reindex(columns=missing_cols)
    return approach1_bag(r, m)

def noaa_run_phase1_approach1(project_layout):
    '''Process corpus of daily files and store results in CSV files.'''
    try:
        if not project_layout['Daily_Input_Files']:
            raise Exception("Incomplete or missing dictionary of project folder details.")
        print(">> Processing Started ...")
        # Start Timer
        start_time = time.time()
        for index, fname in enumerate(glob.glob(project_layout['Daily_Input_Files'])):
            station_name = get_filename(fname)
            print(">> Processing file {0}: {1}").format(index, station_name)
            raw_file = os.path.join(project_layout['Raw_Details'], '', station_name + '_raw.csv')
            missing_file = os.path.join(project_layout['Missing_Details'], '', station_name + '_mis.csv')
            r = noaa_process_hcn_daily_file(fname)
            r.raw.to_csv(raw_file)
            r.missing.to_csv(missing_file)
        seconds = (time.time() - start_time)
        print(">>      Processing Complete.")
        print(">>   Elapsed corpus execution time {0}").format(elapsed_time(seconds))
    except Exception as e:
        print(">> Processing Failed: Error {0}").format(e.message)

noaa_run_phase1_approach1

Takes a dictionary of project folder details to drive the processing of Gather Phase 1 Approach 1 using CSV files.


In [ ]:
# <help:noaa_run_phase1_approach1>
project_layout = { 
    'Content_Version': '',
    'Daily_Input_Files': '',
    'Raw_Details': '',
    'Missing_Details': '',
    'Station_Summary': '', 
    'Station_Details': '',
    }
noaa_run_phase1_approach1(project_layout)

In [1]:
# <api>

import os
import glob
import time
import datetime
import collections
import pandas as pd
import traceback

# Create a collection for returning multiple lists of tuples
approach2_bag = collections.namedtuple('GatherBag', ['DailySummary', 'DailyDetail'])

# Historical Daily Summary
summary_template = {'StationID': "", 
                    'Month': "", 
                    'Day': "",
                    'FirstYearOfRecord': "", 
                    'TMin': "", 
                    'TMinRecordYear': "", 
                    'TMax': "", 
                    'TMaxRecordYear': "", 
                    'CurTMinMaxDelta': "", 
                    'CurTMinRecordDur': "", 
                    'CurTMaxRecordDur': "", 
                    'MaxDurTMinRecord': "", 
                    'MinDurTMinRecord': "", 
                    'MaxDurTMaxRecord': "", 
                    'MinDurTMaxRecord': "", 
                    'TMinRecordCount': "", 
                    'TMaxRecordCount': "" 
                    }

summary_cols = ['StationID', 'Month', 'Day', 'FirstYearOfRecord', 
                'TMin', 'TMinRecordYear', 'TMax', 'TMaxRecordYear',
                'CurTMinMaxDelta', 'CurTMinRecordDur','CurTMaxRecordDur',
                'MaxDurTMinRecord', 'MinDurTMinRecord',
                'MaxDurTMaxRecord', 'MinDurTMaxRecord',
                'TMinRecordCount', 'TMaxRecordCount'            
                ]

# Historical Daily Detail
detail_template = {'StationID': "",
                    'Year': "",
                    'Month': "",
                    'Day': "",
                    'Type': "",
                    'OldTemp': "",
                    'NewTemp': "",
                    'TDelta': ""
                    }

detail_cols = ['StationID', 'Year', 'Month', 'Day', 'Type', 
               'NewTemp', 'OldTemp', 'TDelta'
              ]

def get_filename(pathname):
    '''Fetch filename portion of pathname.'''
    plist = pathname.split('/')
    fname, fext = os.path.splitext(plist[len(plist)-1])
    return fname

def elapsed_time(secs):
    '''Compute formated time stamp given seconds elapsed. '''
    m, s = divmod(secs, 60)
    h, m = divmod(m, 60)
    et = "%d:%02d:%02d" % (h, m, s)
    return et

def get_key_list(hdf5file, type='raw_detail'):
    '''Return a list of keys for requested type from specified HDF file.'''
    print("Fetching keys for type = {0}").format(type)
    keylist = []
    store = None
    try:
        store = pd.HDFStore(hdf5file,'r')
        h5keys = store.keys()
        store.close()
        for k in h5keys:
            if k.find(type) > -1:
                keylist.append(k)
    except:
        if store:
            store.close()
        raise
    return keylist

def cleans_invalid_days(df):
    '''Return a dataframe void of invalid days'''
    ShortMths = {4,6,9,11}
    df_clean = df.query('(((Month not in @ShortMths) & (Day != 31)) and ((Month != 2) or (Day < 30)) )')
    return df_clean

def noaa_process_phase2_records(raw_tuples):
    '''Compute formated time stamp given seconds elapsed. '''
    # Sample Tuple:
    # (0, 'USC00011084', '1926', '01', 21, 'TMAX', 73.400000000000006)
    #
    # Create several 12x31 matrices to store daily detail per metric of interest.
    fyr_online_for_day = [[9999 for x in range(32)] for x in range(13)]  
    tmin_history = [[-99 for x in range(32)] for x in range(13)] 
    tmax_history = [[-99 for x in range(32)] for x in range(13)] 
    tminyr_history = [[-99 for x in range(32)] for x in range(13)] 
    tmaxyr_history = [[-99 for x in range(32)] for x in range(13)] 
    tminrc_history = [[0 for x in range(32)] for x in range(13)]  
    tmaxrc_history = [[0 for x in range(32)] for x in range(13)]  
    tmax_max_life = [[0 for x in range(32)] for x in range(13)] 
    tmax_min_life = [[9999 for x in range(32)] for x in range(13)] 
    tmin_max_life = [[0 for x in range(32)] for x in range(13)]  
    tmin_min_life = [[9999 for x in range(32)] for x in range(13)]   
    # Capture Station ID (all raw-tuples are per station)
    station_ID  = raw_tuples[0][1]
    # Process each raw daily tuple: create daily retail tuples while updating matrices.
    detail_list = []
    for t in raw_tuples:
        detail_row = dict(detail_template)
        detail_row['StationID'] = t[1]
        detail_row['Year'] = t[2]
        detail_row['Month'] = t[3]
        detail_row['Day'] = str(t[4])
        month = int(t[3])-1
        day = t[4]-1
        # For this day, what was the first year in which this station was operational?
        if fyr_online_for_day[month][day] > int(t[2]):
            fyr_online_for_day[month][day] = int(t[2])
        # Handle TMAX
        if (t[5] == 'TMAX'):
            # Handle TMAX for first record
            if (tmax_history[month][day] == -99):
                # Handle TMAX for first 
                detail_row['Type'] = 'TMAX'
                detail_row['OldTemp'] = round(t[6],1)
                detail_row['NewTemp'] = round(t[6],1)
                detail_row['TDelta'] = 0
                tmax_history[month][day] = round(t[6],1)
                tmaxyr_history[month][day] = int(t[2])
                tmaxrc_history[month][day] = 1
                tmax_min_life[month][day] = 0
                tmax_max_life[month][day] = 0
                # Add new daily detail row
                detail_list.append(detail_row)
            # Handle TMAX for new daily record
            elif (round(t[6],1) > tmax_history[month][day]):
                detail_row['Type'] = 'TMAX'
                detail_row['OldTemp'] = tmax_history[month][day]
                detail_row['NewTemp'] = round(t[6],1)
                detail_row['TDelta'] = round(t[6],1) - tmax_history[month][day]
                current_tmin_duration = int(t[2]) - tminyr_history[month][day]
                current_tmax_duration = int(t[2]) - tmaxyr_history[month][day]
                if tmin_max_life[month][day] == 0:
                    tmin_max_life[month][day] = int(t[2]) - fyr_online_for_day[month][day]
                if tmax_max_life[month][day] == 0:
                    tmax_max_life[month][day] = int(t[2]) - fyr_online_for_day[month][day]
                if current_tmax_duration > tmax_max_life[month][day]:
                    tmax_max_life[month][day] = current_tmax_duration
                if current_tmin_duration < tmin_max_life[month][day]:
                    tmin_max_life[month][day] = current_tmax_duration
                tmax_history[month][day] = round(t[6],1)
                tmaxyr_history[month][day] = int(t[2])
                tmaxrc_history[month][day] += 1
                # Add new daily detail row
                detail_list.append(detail_row)
        if (t[5] == 'TMIN'):
            # Handle TMIN for first record
            if (tmin_history[month][day] == -99):
                # Handle TMIN for first 
                detail_row['Type'] = 'TMIN'
                detail_row['OldTemp'] = round(t[6],1)
                detail_row['NewTemp'] = round(t[6],1)
                detail_row['TDelta'] = 0
                tmin_history[month][day] = round(t[6],1)
                tminyr_history[month][day] = int(t[2])
                tminrc_history[month][day] = 1
                tmin_min_life[month][day] = 0
                tmin_max_life[month][day] = 0
                # Add new daily detail row
                detail_list.append(detail_row)
            # Handle TMIN for new daily record
            elif (round(t[6],1) < tmin_history[month][day]):
                detail_row['Type'] = 'TMIN'
                detail_row['OldTemp'] = tmin_history[month][day]
                detail_row['NewTemp'] = round(t[6],1)
                detail_row['TDelta'] = tmin_history[month][day] - round(t[6],1)
                current_tmin_duration = int(t[2]) - tminyr_history[month][day]
                current_tmax_duration = int(t[2]) - tmaxyr_history[month][day]
                if tmax_min_life[month][day] == 0:
                    tmax_min_life[month][day] = int(t[2]) - fyr_online_for_day[month][day]
                if tmin_min_life[month][day] == 0:
                    tmin_min_life[month][day] = int(t[2]) - fyr_online_for_day[month][day]
                if current_tmax_duration > tmax_min_life[month][day]:
                    tmax_min_life[month][day] = current_tmin_duration
                if current_tmin_duration < tmin_min_life[month][day]:
                    tmin_min_life[month][day] = current_tmin_duration
                tmin_history[month][day] = round(t[6],1)
                tminyr_history[month][day] = int(t[2])
                tminrc_history[month][day] += 1
                # Add new daily detail row
                detail_list.append(detail_row)
    # Create a daily summary record for each day of the year using our matrices.
    summary_list = []
    now = datetime.datetime.now()
    for mth in xrange(1,13):
        for day in xrange(1,32):
            m = mth-1
            d= day-1
            summary_row = dict(summary_template)
            summary_row['StationID'] = station_ID
            summary_row['Month'] = mth
            summary_row['Day'] = day
            summary_row['FirstYearOfRecord'] = fyr_online_for_day[m][d]
            summary_row['TMin'] = tmin_history[m][d]
            summary_row['TMinRecordYear'] = tminyr_history[m][d]
            summary_row['TMax'] = tmax_history[m][d] 
            summary_row['TMaxRecordYear'] = tmaxyr_history[m][d]
            summary_row['CurTMinMaxDelta'] = summary_row['TMax'] - summary_row['TMin']
            summary_row['CurTMinRecordDur'] = int(now.year) - summary_row['TMinRecordYear']
            summary_row['CurTMaxRecordDur'] = int(now.year) - summary_row['TMaxRecordYear']
            summary_row['MaxDurTMinRecord'] = tmax_min_life[m][d] # Can not explain
            summary_row['MinDurTMinRecord'] = tmin_min_life[m][d]
            summary_row['MaxDurTMaxRecord'] = tmax_max_life[m][d]
            summary_row['MinDurTMaxRecord'] = tmin_max_life[m][d] # Can not explain       
            summary_row['TMinRecordCount'] = tminrc_history[m][d]
            summary_row['TMaxRecordCount'] = tmaxrc_history[m][d]
            # Add new daily summary row
            summary_list.append(summary_row)
    return approach2_bag(summary_list, detail_list)

def noaa_run_phase2_approach1(project_layout,create_details=False):
    '''Parse raw CVS files to create derived datasets.'''
    summary_list = []
    detail_list = []
    try:                
        # Start Key Processing Timer
        start_time = time.time()
        raw_files = os.path.join(project_layout['Raw_Details'], '', '*_raw.csv')
        for index, fname in enumerate(glob.glob(raw_files)):
            f = get_filename(fname).split('_')[0]
            print("Processing dataset {0}: {1}").format(index,f)
            summary_file = os.path.join(project_layout['Station_Summary'], '', f + '_sum.csv')
            detail_file = os.path.join(project_layout['Station_Details'], '', f + '_std.csv')
            dataset = pd.DataFrame.from_csv(fname)
            raw_tuples = list(dataset.itertuples())
            r = noaa_process_phase2_records(raw_tuples)
            df_summary = pd.DataFrame(r.DailySummary).sort(summary_cols).reindex(columns=summary_cols)
            df_cleaned_summary = cleans_invalid_days(df_summary)
            df_cleaned_summary.to_csv(summary_file)
            df_detail = pd.DataFrame(r.DailyDetail).sort(detail_cols).reindex(columns=detail_cols)
            df_cleaned_detail = cleans_invalid_days(df_detail)
            df_cleaned_detail.to_csv(detail_file)
        seconds = (time.time() - start_time)
        print(">> Processing Complete.")
        print(">>   Elapsed corpus execution time {0}").format(elapsed_time(seconds))   
    except Exception as e:
        var = traceback.format_exc()
        print var
        print(">> Processing Failed: Error {0}").format(e.message)

noaa_run_phase2_approach1

Takes a dictionary of project folder details to drive the processing of Gather Phase 2 Approach 1 using CSV files.

Disk Storage Requirements

  • This function creates a Station Summaries dataset that requires 25MB of free space.
  • This function can also create a Station Details dataset. If you require this dataset to be generated, modify the call to noaa_run_phase2_approach2() with create_details=True. You will need additional free space to support this feature. Estimated requirement: **150MB**

In [ ]:
# <help:noaa_run_phase2_approach1>
project_layout = { 
    'Content_Version': '',
    'Daily_Input_Files': '',
    'Raw_Details': '',
    'Missing_Details': '',
    'Station_Summary': '', 
    'Station_Details': '',
    }
noaa_run_phase2_approach1(project_layout)