This document illustrates the QA method applied to the first version of our public weather observations dataset, https://data.planetos.com/datasets/noaa_synop2
In this first iteration we apply a simple QA based on a 15-year data history and statistical distribution analysis. This filters out a portion of the clearly bad values, but a large number of suspicious values are not caught by this method.
For the statistical distribution we take a simple approach: for each station and month, find the 0.05 and 0.95 quantiles, take their difference, and use that difference times a certain multiplier above the 0.95 quantile and below the 0.05 quantile as the boundaries for accepting values. This way we get very few or no false positives for variables like temperature and pressure, but a significant number of false negatives (bad values that pass the check) remain in the data.
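As a minimal sketch of this rule (illustrative only: `values` stands for one variable's observations at one station for one month, and the multiplier 3 is the one used for temperature further below; other variables use different factors):
# Illustrative quantile-based acceptance bounds; `values` is an assumed
# pandas Series of one variable for one station and month.
q05, q95 = values.quantile([0.05, 0.95])
spread = q95 - q05
vmax = q95 + 3 * spread    # upper acceptance bound
vmin = q05 - 3 * spread    # lower acceptance bound
accepted = values[(values >= vmin) & (values <= vmax)]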
In [5]:
%matplotlib notebook
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import requests
import time
import pickle
from ipywidgets import FloatProgress
from IPython.display import display
#from API_client.python.datahub import datahub
#from API_client.python.lib.dataset import dataset
from apiclient.datahub import datahub
from apiclient.dataset import dataset
In [67]:
month = 12
In [68]:
%%time
data_file_path = '/data/files/eneli/synop_data_history_decoded/' ## insert data file path here
dtypes = {'cloud_below_station_type': str,
          'cloud_location_type_AC': str,
          'cloud_location_type_AS': str,
          'cloud_location_type_CB': str,
          'cloud_location_type_CI': str,
          'cloud_location_type_CS': str,
          'cloud_location_type_CU': str,
          'cloud_location_type_ST': str,
          'cloud_location_type_SC': str,
          'cloud_type_high_compass_dir': str,
          'cloud_type_low_compass_dir': str,
          'cloud_type_middle_compass_dir': str,
          'hoar_frost': str,
          'hoar_frost_phenom_descr': str,
          'weather_present_simple': str,
          'state_of_ground_snow': str}
#fls = glob.glob('/data/files/eneli/synop_data_state_csv_metaf2xml/2018/5/*/*')
fls = glob.glob(data_file_path + '/2017/{0}/*'.format(month))
fin_csvs = []
usecols = ['time','station','pressure','temperature','station_pressure']
f = FloatProgress(min = 0, max = len(fls))
display(f)
for i in fls:
    #try:
    fin_csvs.append(pd.read_csv(i, usecols=usecols, dtype=dtypes))
    #except:
    #    print("cannot read ", i)
    #    with open(i) as fff:
    #        print(len(fff.readlines()))
    f.value += 1
fin = pd.concat(fin_csvs)
In [69]:
stations = fin['station'].unique()
In [70]:
physical_limits = {
    "temperature": [-80, 65],
    "wind_speed": [0, 90],
    "pressure": [800, 1100],
    "station_pressure": [400, 1100],
    "precipitation_1_hour_accumulation": [0, 200],
    "precipitation_2_hour_accumulation": [0, 250],
    "precipitation_3_hour_accumulation": [0, 400],
    "precipitation_6_hour_accumulation": [0, 400],
    "precipitation_9_hour_accumulation": [0, 400],
    "precipitation_12_hour_accumulation": [0, 400],
    "precipitation_15_hour_accumulation": [0, 400],
    "precipitation_18_hour_accumulation": [0, 400],
    "precipitation_24_hour_accumulation": [0, 400],
    "rel_humidity1": [0, 105],
    "rel_humidity2": [0, 105],
    "rel_humidity3": [0, 105],
    "rel_humidity4": [0, 105]
}
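These hard limits alone can already screen physically impossible values; a minimal sketch of such a check, assuming the `fin` frame loaded above:
# Illustrative hard-limit screen on the loaded observations.
lo, hi = physical_limits['temperature']
impossible = fin[(fin['temperature'] < lo) | (fin['temperature'] > hi)]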
In [71]:
%%time
## st_table -- statistics of observations by station and month
st_table = {}
f = FloatProgress(min = 0, max = len(stations))
display(f)
for st in stations:
    stn = "{0:05d}".format(int(st))
    st_table[stn] = {}
    t_temp = fin[fin['station'] == st]
    varis = [i for i in t_temp.columns if not i in ['time', 'elevation', 'lon', 'lat', 'station']]
    f.value += 1
    for v in varis:
        v_temp = t_temp[v]
        if v_temp.dtype == np.float64:
            st_table[stn][v] = {}
            if not v_temp.isnull().all():
                st_table[stn][v][v + '_max'] = v_temp.max()
                st_table[stn][v][v + '_min'] = v_temp.min()
                st_table[stn][v][v + '_count'] = len(v_temp)
                st_table[stn][v][v + '_quantiles'] = list(v_temp.quantile([0.05, 0.25, 0.5, 0.75, 0.95]))
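For orientation, the resulting structure for a single station and variable looks like the following (station ID '01001' is only an example; the actual numbers depend on the loaded data):
st_table['01001']['temperature']
## {'temperature_max': ..., 'temperature_min': ..., 'temperature_count': ...,
##  'temperature_quantiles': [q05, q25, q50, q75, q95]}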
In [72]:
def make_limit_dictionary(stations, physical_limits, monthlist, varlist):
    ret_dict = {}
    for st in stations:
        stn = "{0:05d}".format(int(st))
        ret_dict[stn] = {}
        for mon in monthlist:
            ret_dict[stn][mon] = {}
            for var in varlist:
                ret_dict[stn][mon][var] = {}
                if var in physical_limits:
                    ret_dict[stn][mon][var][var + '_max'] = physical_limits[var][1]
                    ret_dict[stn][mon][var][var + '_min'] = physical_limits[var][0]
    return ret_dict
In [73]:
%%time
## limit_dict -- dictionary with stations and limitations
## cols = [i for i in fin.columns if not i in ['time','elevation','lon','lat','station','station_name','report_modifier','station_name','station_type','synop_code']]
monthlist = [month]
limit_dict = make_limit_dictionary(stations, physical_limits, monthlist, physical_limits.keys())
In [74]:
def add_criteria(station, month, variable, st_table, limit_dict):
    def get_vbounds():
        ## statistical bounds from the 0.05/0.95 quantiles, with a
        ## variable-specific margin
        quantiles = st_table[station][variable][variable + '_quantiles']
        if variable == 'temperature':
            maxmindiff = 3 * (quantiles[4] - quantiles[0])
            vmax = quantiles[4] + maxmindiff
            vmin = quantiles[0] - maxmindiff
        elif variable in ['station_pressure', 'pressure']:
            vmax = quantiles[4] * 1.05
            vmin = quantiles[0] / 1.05
        else:
            vmax = quantiles[4] * 1.5
            vmin = quantiles[0] / 1.5
        return vmax, vmin
    assert type(station) == str
    assert type(variable) == str
    assert type(st_table) == dict
    if not variable in st_table[station]:
        return
    ## if data is too sparse, better leave it alone
    if not variable + '_count' in st_table[station][variable]:
        return
    if st_table[station][variable][variable + '_count'] < 2000:
        return
    if variable + '_max' in st_table[station][variable]:
        vmax, vmin = get_vbounds()
        if not variable + '_max' in limit_dict[station][month][variable]:
            limit_dict[station][month][variable][variable + '_max'] = vmax
            limit_dict[station][month][variable][variable + '_min'] = vmin
        else:
            ## keep the tighter of the physical and statistical bounds
            if limit_dict[station][month][variable][variable + '_max'] > vmax:
                limit_dict[station][month][variable][variable + '_max'] = vmax
            if limit_dict[station][month][variable][variable + '_min'] < vmin:
                limit_dict[station][month][variable][variable + '_min'] = vmin
Combine statistics and hard limits
In [75]:
%%time
for st in stations:
    stn = "{0:05d}".format(int(st))
    for v in limit_dict[stn][month].keys():
        add_criteria(stn, month, v, st_table, limit_dict)
Dump the file for later use
In [76]:
pickle.dump(limit_dict, open('limit_dict_{0}.pickle'.format(month),'wb'))
Helper methods to plot the problematic data
In [ ]:
# var = 'station_pressure'
def find_stlistike():
    ## collect indices of stations whose values exceed the accepted maximum
    stlistike = []
    f = FloatProgress(min=0, max=len(stations))
    display(f)
    var = 'pressure'
    for st in range(len(stations)):
        if np.any(fin[fin['station'] == stations[st]][var] > limit_dict["{0:05d}".format(int(stations[st]))][month][var][var + '_max']):
            print(st)
            stlistike.append(st)
        f.value += 1
    return stlistike
In [ ]:
def plot_suspicious(var, stlistike):
    ## plot each suspicious station's series against its acceptance bounds
    for st in stlistike:
        trtr = fin[fin['station'] == stations[st]][['time', var, 'station']].drop_duplicates().sort_values('time')
        trtr['datetime'] = (trtr['time'] * 1.e9).apply(pd.to_datetime)
        fig = plt.figure(figsize=(15, 10))
        plt.plot(trtr['datetime'], trtr[var], '*')
        plt.plot(trtr['datetime'], np.ones(len(trtr)) * limit_dict["{0:05d}".format(int(stations[st]))][month][var][var + '_max'])
        plt.plot(trtr['datetime'], np.ones(len(trtr)) * limit_dict["{0:05d}".format(int(stations[st]))][month][var][var + '_min'])
        plt.title("{0} {1}".format(st, len(trtr.index)))
        plt.show()
In [ ]:
def check_suspicious(var, st, year=2010):
    ## plot a single station's series for one year
    fig = plt.figure(figsize=(15, 10))
    trtr = fin[fin['station'] == stations[st]][['time', var, 'station']].drop_duplicates().sort_values('time')
    trtr['datetime'] = (trtr['time'] * 1.e9).apply(pd.to_datetime)
    trfilt = trtr[trtr['datetime'].apply(lambda x: x.year) == year]
    print(len(trfilt))
    plt.plot(trfilt['datetime'], trfilt[var])
    plt.show()
In [ ]:
stl = find_stlistike()
In [ ]:
plot_suspicious('pressure',stl[160:200])
Merge all .pickle files in folder
In [85]:
a = [pickle.load(open(i,'rb')) for i in glob.glob('limit_dict_*.pickle')]
In [ ]:
from itertools import groupby
In [93]:
fd = {}
for oo in range(1, 13):
    a = pickle.load(open('limit_dict_{0}.pickle'.format(oo), 'rb'))
    for k, v in a.items():  # k is the station ID
        if not k in fd.keys():
            fd[k] = v
        else:
            fd[k].update(v)
pickle.dump(fd, open('limit_dict_year.pickle', 'wb'))
In [92]:
fd['01001'][12]
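The cell above returns the combined per-variable bounds for station '01001' in month 12; the cell output is not preserved here, but given the construction above the structure is (placeholder values, not actual output):
## {'temperature': {'temperature_max': ..., 'temperature_min': ...},
##  'pressure': {'pressure_max': ..., 'pressure_min': ...},
##  ...}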