In [1]:
import pandas as pd
import datetime
import os
DATA_DIR = '../data/raw/'
#The files are given as fixed width files.
#See Documentation in ../data/
#All columns in the WY2 files. Most of this information isn't useful to me
#WY2_colspecs = [(0, 5), (5, 6), (6, 16), (16, 20), (20, 24),
# (24, 26), (26, 30), (30, 32), (32, 36), (36, 38),
# (38, 42), (42, 43), (43, 47), (47, 48), (48, 52),
# (52, 53), (53, 57), (57, 58), (58, 60), (60, 61),
# (61, 65), (65, 66), (66, 70), (70, 71), (71, 75),
# (75, 76), (76, 84), (84, 85), (85, 90), (90, 91),
# (91, 92), (92, 95), (95, 96), (96, 100), (100, 101),
# (101, 104), (104, 105), (105, 109), (109, 110),
# (110, 112), (112, 113), (113, 115), (115, 116),
# (116, 117), (117, 118)]
There is a metadata file that contains the latitude/longitude coordinates of the stations, identified by their WBAN numbers.
Further, the time recordings at each station are given in "local standard time" (LST), and the mlong coordinate (the station's standard-time meridian, in degrees) provides a means to convert to UTC (universal time): LST = UTC - MLONG/15, or equivalently UTC = LST + MLONG/15 (all times in hours). I will convert all of the times into UTC.
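As a quick sanity check on the sign convention, here is a sketch with a made-up mlong of 105 degrees (the meridian of UTC-7 standard time, since 105/15 = 7): noon LST should map to 19:00 UTC.
In [ ]:
#Sketch: LST -> UTC for a hypothetical station on the 105 degree meridian
lst = datetime.datetime(2000, 1, 1, 12) #noon, local standard time
lst + datetime.timedelta(hours = 105 / 15) #-> 2000-01-01 19:00:00 UTC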
In [2]:
col_names = ['Name', 'WBAN', 'lat', 'long', 'mlong', 'first_year', 'last_year']
loc_cols = [(0,24), (24, 30), (45, 52), (52, 58), (59, 65), (74, 76), (77, 79)]
In [3]:
D_loc = pd.read_fwf(DATA_DIR + 'locations.txt', colspecs = loc_cols, comment = '#',
header = None, names = col_names)
In [4]:
len(D_loc) #Total number of time series records.
Out[4]:
In [5]:
D_loc.head()
Out[5]:
In [6]:
#The year is specified with only its last 2 digits,
#but data collection started after 1950 and ended before 2050
def fix_year(yr : int):
    if yr > 50:
        yr += 1900
    else:
        yr += 2000
    return yr
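A quick spot check of the two-digit pivot (a sketch; the inputs here are illustrative):
In [ ]:
#Sketch: years above the pivot land in the 1900s, the rest in the 2000s
fix_year(61), fix_year(5) #-> (1961, 2005)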
In [7]:
D_loc.loc[:, ['first_year', 'last_year']] = D_loc.loc[:, ['first_year', 'last_year']].applymap(fix_year)
In [8]:
D_loc.head()
Out[8]:
In [9]:
def time_correction(mlong : float):
    '''The time delta to add to an LST time to yield a UTC time,
    given the station's standard-time meridian mlong in degrees.'''
    return datetime.timedelta(hours = mlong / 15)
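For example, a station on the 105 degree meridian should get a +7 hour correction (sketch):
In [ ]:
time_correction(105.0) #-> a timedelta of +7 hours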
In [10]:
D_loc['time_correction'] = D_loc.loc[:, 'mlong'].apply(time_correction)
In [11]:
D_loc.head()
Out[11]:
In [12]:
del D_loc['mlong'] #No longer needed
In [13]:
D_loc.head()
Out[13]:
In [14]:
def wban_fname(wban):
    '''Convert a WBAN string into the path of the matching WY2 file.'''
    for root, dirs, files in os.walk(DATA_DIR):
        for f in files:
            if f.endswith('WY2') and f.startswith(wban):
                return os.path.join(root, f)
    return None #No matching file found
%timeit wban_fname('xxx') #check time to traverse whole directory structure
In [15]:
D_loc['WBAN_file'] = D_loc['WBAN'].apply(wban_fname)
In [16]:
D_loc.head()
Out[16]:
In [17]:
WY2_cols = [(6, 16), (91, 95), (95, 96)] #Time, temperature, temp flag
col_names = ['Time', 'T', 'T_flag'] #Flags indicate missing or estimated data
In [18]:
#Hours are from 01...24, so we must subtract 1 from the hour
#Parsing the date takes by far the most time
def process_wy2f(file_path, time_correction):
    #T_flag codes:
    # 0: Observed
    #-1: Missing data
    # 1: Algorithmically adjusted
    # 2: Hand estimate
    # 3: Interpolated
    # 4: Determined from a model
    # 5: Derived
    Tflags = {'': 0, '9': -1, 'A': 1, 'E': 2, 'I': 3, 'M': 4, 'Q': 5}
    convert_Tflag = lambda tf: Tflags[tf]
    #date_parser = lambda d: pd.to_datetime(str(int(d) - 1), format = '%Y%m%d%H') + td_1hr
    #This is about 5x faster:
    date_parser = lambda d: datetime.datetime(int(d[0:4]), int(d[4:6]), int(d[6:8]), int(d[8:10]) - 1)
    D = pd.read_fwf(file_path, colspecs = WY2_cols, nrows = 100, #NOTE: nrows = 100 limits this to a test read
                    header = None, names = col_names, parse_dates = ['Time'],
                    converters = {'T_flag' : convert_Tflag},
                    date_parser = date_parser)
    D['T_flag'] = D['T_flag'].fillna(0) #fillna returns a copy, so assign it back
    D.loc[:, 'T'] = D.loc[:, 'T'] / 10 #Raw temperatures are in tenths of a degree
    D.loc[:, 'Time'] = D.loc[:, 'Time'] + time_correction #LST -> UTC
    D.index = pd.DatetimeIndex(D.loc[:, 'Time'])
    del D['Time']
    return D
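The hour shift in the date parser can be spot-checked in isolation (a sketch with a made-up timestamp string):
In [ ]:
#Sketch: raw hour 24 maps to hour 23 of the same LST day
sample = '1961010124'
datetime.datetime(int(sample[0:4]), int(sample[4:6]), int(sample[6:8]), int(sample[8:10]) - 1)
#-> datetime.datetime(1961, 1, 1, 23, 0)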
In [23]:
D = process_wy2f(D_loc['WBAN_file'][0], D_loc['time_correction'][0])
%timeit process_wy2f(D_loc['WBAN_file'][0], D_loc['time_correction'][0])
In [24]:
D.head()
Out[24]:
In [25]:
D.to_hdf('test.hdf', key = 'test')
D = pd.read_hdf('test.hdf', key = 'test')
D.head()
Out[25]:
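The same round trip should extend to the full dataset, e.g. with one key per station (a sketch; the per-WBAN key scheme here is hypothetical):
In [ ]:
#Sketch: store each station's frame under its own (hypothetical) per-WBAN key
key = 'wban_' + str(D_loc['WBAN'][0])
D.to_hdf('test.hdf', key = key)
pd.read_hdf('test.hdf', key = key).head()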
In [26]:
#Iterator over row pd.Series objects.
#This can be used with map to process
#all the files in parallel (see the sketch below).
(r for i, r in D_loc.iterrows())
Out[26]:
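One way that iterator could be consumed (a sketch, assuming a fork-based multiprocessing.Pool and a hypothetical process_row wrapper; the map call is left commented out rather than run here):
In [ ]:
import multiprocessing as mp

def process_row(row):
    '''Hypothetical wrapper: process one station's WY2 file with its time correction.'''
    return process_wy2f(row['WBAN_file'], row['time_correction'])

#pool = mp.Pool()
#frames = pool.map(process_row, (r for i, r in D_loc.iterrows()))
#pool.close()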
In [ ]: