Weather station data limit detection

This document illustrates the QA method applied to the first version of our public weather observation dataset, https://data.planetos.com/datasets/noaa_synop2

In the first iteration we apply a simple QA, based on a 15-year data history and statistical distribution analysis. This filters out a portion of clearly bad values, but a large number of suspicious values are not discovered by this method.

For the statistical distribution we take a simple approach: find the 0.05 and 0.95 quantiles for each station by month, compute their difference, and use that difference times a certain multiplier above the 0.95 quantile and below the 0.05 quantile as the boundaries for accepting values. This way we get very few or no false positives for variables like temperature and pressure, but a significant number of false negatives remain in the data.
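
As a minimal sketch of the idea (the helper name quantile_bounds is hypothetical and not part of the pipeline; the multiplier of 3 matches the temperature rule in add_criteria further below, other variables use different margins):

In [ ]:
import pandas as pd

def quantile_bounds(series, multiplier=3.0):
    ## accept-bounds from the 0.05 / 0.95 quantiles of one station-month series
    q05, q95 = series.quantile([0.05, 0.95])
    spread = q95 - q05
    return q05 - multiplier * spread, q95 + multiplier * spread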


In [5]:
%matplotlib notebook
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import requests
import time
import pickle

from ipywidgets import FloatProgress
from IPython.display import display


from apiclient.datahub import datahub
from apiclient.dataset import dataset

In [67]:
month = 12  ## month to process (1-12); selects input files and keys the limit dictionary

In [68]:
%%time
data_file_path = '/data/files/eneli/synop_data_history_decoded/' ## insert data file path here

dtypes = {'cloud_below_station_type':str,
          'cloud_location_type_AC':str,
          'cloud_location_type_AS':str,
          'cloud_location_type_CB':str,
          'cloud_location_type_CI':str,
          'cloud_location_type_CS':str,
          'cloud_location_type_CU':str,
          'cloud_location_type_ST':str,
          'cloud_location_type_SC':str,
          'cloud_type_high_compass_dir':str,
          'cloud_type_low_compass_dir':str,
          'cloud_type_middle_compass_dir':str,
          'hoar_frost':str,
          'hoar_frost_phenom_descr':str,
          'weather_present_simple':str,
          'state_of_ground_snow':str}

fls = glob.glob(data_file_path + '/2017/{0}/*'.format(month))
fin_csvs = []
usecols = ['time','station','pressure','temperature','station_pressure']
f = FloatProgress(min = 0, max = len(fls))
display(f)
for i in fls:
    try:
        fin_csvs.append(pd.read_csv(i, usecols=usecols, dtype=dtypes))
    except Exception:
        print("cannot read", i)
    f.value += 1
fin = pd.concat(fin_csvs)


CPU times: user 3.84 s, sys: 216 ms, total: 4.06 s
Wall time: 6.43 s

In [69]:
stations = fin['station'].unique()

In [70]:
physical_limits = {  ## hard limits: temperature [°C], wind_speed [m/s], pressure [hPa], precipitation [mm], rel_humidity [%]
    "temperature": [-80, 65],
    "wind_speed": [0, 90],
    "pressure": [800,1100],
    "station_pressure": [400,1100],
    "precipitation_1_hour_accumulation": [0,200],
    "precipitation_2_hour_accumulation": [0,250],
    "precipitation_3_hour_accumulation": [0,400],
    "precipitation_6_hour_accumulation": [0,400],
    "precipitation_9_hour_accumulation": [0,400],
    "precipitation_12_hour_accumulation": [0,400],
    "precipitation_15_hour_accumulation": [0,400],
    "precipitation_18_hour_accumulation": [0,400],
    "precipitation_24_hour_accumulation": [0,400],
    "rel_humidity1":[0,105],
    "rel_humidity2":[0,105],
    "rel_humidity3":[0,105],
    "rel_humidity4":[0,105]
  }
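
The hard limits above act as a first-pass range check. A minimal sketch of how one variable could be screened against them (the helper within_physical_limits is hypothetical, not part of this pipeline):

In [ ]:
def within_physical_limits(series, var, limits=physical_limits):
    ## True where the value falls inside the hard limits for this variable
    lo, hi = limits[var]
    return series.between(lo, hi)

## e.g. within_physical_limits(fin['temperature'], 'temperature') would mask
## readings outside [-80, 65]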

In [71]:
%%time
## st_table -- statistics of observations by station and month


st_table = {}
f = FloatProgress(min = 0, max = len(stations))
display(f)
for st in stations:
    stn = "{0:05d}".format(int(st))  ## zero-padded 5-digit station ID
    st_table[stn] = {}
    t_temp = fin[fin['station'] == st]
    varis = [i for i in t_temp.columns if not i in ['time','elevation','lon','lat','station']]
    f.value += 1
    for v in varis:
        v_temp = t_temp[v]
        if v_temp.dtype == np.float64:
            st_table[stn][v] = {}
            if not np.all(v_temp.isnull()):
                st_table[stn][v][v+'_max'] = v_temp.max()
                st_table[stn][v][v+'_min'] = v_temp.min()
                st_table[stn][v][v+'_count'] = len(v_temp)
                st_table[stn][v][v+'_quantiles'] = list(v_temp.quantile([0.05,0.25,0.5,0.75,0.95]))


CPU times: user 47.8 s, sys: 2.41 s, total: 50.2 s
Wall time: 43.3 s
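
For reference, each st_table entry maps a variable to its per-month summary statistics; one entry can be inspected directly (station '01001' is an example here, assuming it is present in this month's files; it appears again at the end of this notebook):

In [ ]:
## shape: {'<var>_max': ..., '<var>_min': ..., '<var>_count': ...,
##         '<var>_quantiles': [q0.05, q0.25, q0.5, q0.75, q0.95]}
st_table['01001']['temperature']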

In [72]:
def make_limit_dictionary(stations, physical_limits, monthlist, varlist):
    ## build a nested dict {station: {month: {variable: {'<var>_max', '<var>_min'}}}} seeded with the hard physical limits
    ret_dict = {}
    for st in stations:
        stn = "{0:05d}".format(int(st))
        ret_dict[stn] = {}
        for mon in monthlist:
            ret_dict[stn][mon] = {}
            for var in varlist:
                ret_dict[stn][mon][var] = {}
                if var in physical_limits:
                    ret_dict[stn][mon][var][var+'_max'] = physical_limits[var][1]
                    ret_dict[stn][mon][var][var+'_min'] = physical_limits[var][0]
    return ret_dict

In [73]:
%%time
## limit_dict -- dictionary with stations and limitations

monthlist = [month]
limit_dict = make_limit_dictionary(stations, physical_limits, monthlist, physical_limits.keys())


CPU times: user 228 ms, sys: 16.6 ms, total: 244 ms
Wall time: 244 ms

In [74]:
def add_criteria(station, month, variable, st_table, limit_dict):
    ## tighten the hard physical limits with statistical bounds where enough data exists
    def get_vbounds():
        quantiles = st_table[station][variable][variable + '_quantiles']
        if variable == 'temperature':
            ## additive margin: three times the 0.05-0.95 quantile spread
            maxmindiff = 3 * (quantiles[4] - quantiles[0])
            vmax = quantiles[4] + maxmindiff
            vmin = quantiles[0] - maxmindiff
        elif variable in ['station_pressure', 'pressure']:
            ## multiplicative margin of 5% for pressure variables
            vmax = quantiles[4] * 1.05
            vmin = quantiles[0] / 1.05
        else:
            ## multiplicative margin of 50% for everything else
            vmax = quantiles[4] * 1.5
            vmin = quantiles[0] / 1.5
        return vmax, vmin
    assert type(station) == str
    assert type(variable) == str
    assert type(st_table) == dict

    if not variable in st_table[station]:
        return

    ## if data is too sparse, better leave the hard limits alone
    if not variable + '_count' in st_table[station][variable]:
        return

    if st_table[station][variable][variable + '_count'] < 2000:
        return

    if variable + '_max' in st_table[station][variable]:
        vmax, vmin = get_vbounds()
        if not variable + '_max' in limit_dict[station][month][variable]:
            limit_dict[station][month][variable][variable + '_max'] = vmax
            limit_dict[station][month][variable][variable + '_min'] = vmin
        else:
            ## keep whichever limit is tighter: statistical or physical
            if limit_dict[station][month][variable][variable + '_max'] > vmax:
                limit_dict[station][month][variable][variable + '_max'] = vmax
            if limit_dict[station][month][variable][variable + '_min'] < vmin:
                limit_dict[station][month][variable][variable + '_min'] = vmin
Combine statistics and hard limits


In [75]:
%%time
for idx in range(len(stations)):
    stn = "{0:05d}".format(int(stations[idx]))
    for var in limit_dict[stn][month].keys():
        add_criteria(stn, month, var, st_table, limit_dict)


CPU times: user 303 ms, sys: 2.47 ms, total: 305 ms
Wall time: 305 ms

Dump the limit dictionary to a file for later use


In [76]:
with open('limit_dict_{0}.pickle'.format(month), 'wb') as fh:
    pickle.dump(limit_dict, fh)

Helper methods to plot the problematic data


In [ ]:
def find_stlistike(var='pressure'):
    ## collect indices of stations that report values above their accepted upper limit
    stlistike = []
    f = FloatProgress(min=0, max=len(stations))
    display(f)
    for st in range(len(stations)):
        stn = "{0:05d}".format(int(stations[st]))
        if np.any(fin[fin['station'] == stations[st]][var] > limit_dict[stn][month][var][var + '_max']):
            print(st)
            stlistike.append(st)
        f.value += 1
    return stlistike

In [ ]:
def plot_suspicious(var, stlistike):
    ## plot each suspicious station's time series together with its accepted min/max limits
    for st in stlistike:
        trtr = fin[fin['station'] == stations[st]][['time', var, 'station']].drop_duplicates().sort_values('time')
        trtr['datetime'] = (trtr['time'] * 1.e9).apply(pd.to_datetime)  ## epoch seconds -> nanoseconds
        fig = plt.figure(figsize=(15, 10))
        plt.plot(trtr['datetime'], trtr[var], '*')
        plt.plot(trtr['datetime'], np.ones(len(trtr)) * limit_dict["{0:05d}".format(int(stations[st]))][month][var][var + '_max'])
        plt.plot(trtr['datetime'], np.ones(len(trtr)) * limit_dict["{0:05d}".format(int(stations[st]))][month][var][var + '_min'])
        plt.title("{0}   {1}".format(st, len(trtr.index)))
    plt.show()

In [ ]:
def check_suspicious(var, st, year=2010):
    ## plot a single station's time series for one year
    fig = plt.figure(figsize=(15, 10))
    trtr = fin[fin['station'] == stations[st]][['time', var, 'station']].drop_duplicates().sort_values('time')
    trtr['datetime'] = (trtr['time'] * 1.e9).apply(pd.to_datetime)
    trfilt = trtr[trtr['datetime'].apply(lambda x: x.year) == year]
    print(len(trfilt))
    plt.plot(trfilt['datetime'], trfilt[var])
    plt.show()

In [ ]:
stl = find_stlistike()

In [ ]:
plot_suspicious('pressure',stl[160:200])

Merge all monthly .pickle files in the folder


In [85]:
a = [pickle.load(open(i, 'rb')) for i in glob.glob('limit_dict_*.pickle')]  ## sanity check: all monthly pickles load


In [93]:
## merge the monthly limit dictionaries into one dict keyed by station, then month
fd = {}
for oo in range(1, 13):
    a = pickle.load(open('limit_dict_{0}.pickle'.format(oo), 'rb'))
    for k, v in a.items():  # k is station ID
        if not k in fd.keys():
            fd[k] = v
        else:
            fd[k].update(v)
with open('limit_dict_year.pickle', 'wb') as fh:
    pickle.dump(fd, fh)

In [92]:
fd['01001'][12]


Out[92]:
{'temperature': {'temperature_max': 65, 'temperature_min': -80},
 'wind_speed': {'wind_speed_max': 90, 'wind_speed_min': 0},
 'pressure': {'pressure_max': 1100, 'pressure_min': 800},
 'station_pressure': {'station_pressure_max': 1100,
  'station_pressure_min': 400},
 'precipitation_1_hour_accumulation': {'precipitation_1_hour_accumulation_max': 200,
  'precipitation_1_hour_accumulation_min': 0},
 'precipitation_2_hour_accumulation': {'precipitation_2_hour_accumulation_max': 250,
  'precipitation_2_hour_accumulation_min': 0},
 'precipitation_3_hour_accumulation': {'precipitation_3_hour_accumulation_max': 400,
  'precipitation_3_hour_accumulation_min': 0},
 'precipitation_6_hour_accumulation': {'precipitation_6_hour_accumulation_max': 400,
  'precipitation_6_hour_accumulation_min': 0},
 'precipitation_9_hour_accumulation': {'precipitation_9_hour_accumulation_max': 400,
  'precipitation_9_hour_accumulation_min': 0},
 'precipitation_12_hour_accumulation': {'precipitation_12_hour_accumulation_max': 400,
  'precipitation_12_hour_accumulation_min': 0},
 'precipitation_15_hour_accumulation': {'precipitation_15_hour_accumulation_max': 400,
  'precipitation_15_hour_accumulation_min': 0},
 'precipitation_18_hour_accumulation': {'precipitation_18_hour_accumulation_max': 400,
  'precipitation_18_hour_accumulation_min': 0},
 'precipitation_24_hour_accumulation': {'precipitation_24_hour_accumulation_max': 400,
  'precipitation_24_hour_accumulation_min': 0},
 'rel_humidity1': {'rel_humidity1_max': 105, 'rel_humidity1_min': 0},
 'rel_humidity2': {'rel_humidity2_max': 105, 'rel_humidity2_min': 0},
 'rel_humidity3': {'rel_humidity3_max': 105, 'rel_humidity3_min': 0},
 'rel_humidity4': {'rel_humidity4_max': 105, 'rel_humidity4_min': 0}}