In [ ]:
%matplotlib inline
import pandas as pd
import numpy as np
import nivapy3 as nivapy
import matplotlib.pyplot as plt
from sqlalchemy import types

plt.style.use('ggplot')

In [ ]:
# Connect to db
eng = nivapy.da.connect()

Upload ICPW template to RESA2

This notebook can be used to upload data from the ICPW template to the RESA2 database.

1. Format the template

1.1. General QA

  • Are the correct ICPW codes present?
  • Remove any blank rows
  • Do the numbers look reasonable?
  • Do any cells contain anything other than numbers or '<'?

1.2. Check column headings

The tidied template should have a single row of column headings where each entry corresponds to a method in the WC_METHOD_DEFINITIONS table. Open WC_METHOD_DEFINITIONS and filter LABORATORY=ICPW. You then need to find the correct method with the correct units and set the column heading in the template equal to the NAME attribute in the WC_METHOD_DEFINITIONS table.

NB: Take particular care with alkalinity (which is often reported awkwardly) and also with whether people are reporting e.g. $NO_3$ or $NO_3-N$.

Delete the Name column - the only identifier columns should be Code and Date.

1.3. Check data types

Delete any unused columns from the template (and also delete Bente's QA/QC columns).

Make sure the Date column is formated as an Excel date using dd.mm.yyyy and format all other columns as text.

2. Check data


In [ ]:
# Path to template to process
in_xlsx = (r'../../../Call_For_data_2018/replies'
           r'/netherlands/icpw_toc_trends_nl_tidied_core.xls')

In [ ]:
# Read useful tables from database
# Stations
sql = ('SELECT UNIQUE(station_code) '
       'FROM resa2.stations')
stn_df = pd.read_sql_query(sql, eng)

# Methods
sql = ("SELECT UNIQUE(name) "
       "FROM resa2.wc_method_definitions "
       "WHERE laboratory='ICPW'")
meth_df = pd.read_sql_query(sql, eng)

In [ ]:
# Read template
df = pd.read_excel(in_xlsx, sheet_name='Data')

# Check template
# 1. Check code and date cols exist
assert list(df.columns[:2]) == ['Code', 'Date'], 'The first two columns must be "Code" and "Date".'

# 2. Station codes are already in database?
if not set(stn_df['station_code']).issuperset(set(df['Code'])):
    print ('The following site codes are not in the database:')
    for site in list(set(df['Code']) - set(stn_df['station_code'])):
        print ('    %s' % site)
        
# 3. Check method column headings
temp_meths = df.columns
temp_meths = [i for i in temp_meths if i not in ('Code', 'Date')]

if not set(meth_df['name']).issuperset(set(temp_meths)):
    print ('The following methods are not in the database:')
    for meth in list(set(temp_meths) - set(meth_df['name'])):
        print ('    %s' % meth)
        
# 4. Check for duplicates
if df.duplicated(['Code', 'Date']).sum() > 0:
    print ('There are %s duplicated records:' % df.duplicated(['Code', 'Date']).sum())
    print ('\n', df[['Code', 'Date']][df.duplicated(['Code', 'Date'], keep=False)])

In [ ]:
# Drop duplicates (at random - use with caution!)
df = df.drop_duplicates(subset=['Code', 'Date'])

3. Append sample dates


In [ ]:
# Append sample dates
# Get station_ids
sql = ('SELECT station_id, station_code '
       'FROM resa2.stations')
stn_df = pd.read_sql_query(sql, eng)

# Join
samp_df = pd.merge(df, stn_df, how='left',
                   left_on='Code', right_on='station_code')

# Get cols and rename
ws_df = samp_df[['station_id', 'Date']].copy()
ws_df.columns = ['station_id', 'sample_date']

# Assume all depths are zero
ws_df['depth1'] = 0
ws_df['depth2'] = 0

ws_df.head()

In [ ]:
# Improve performance by explicitly setting dtypes. See
# https://stackoverflow.com/a/42769557/505698
dtypes = {c:types.VARCHAR(ws_df[c].str.len().max())
          for c in ws_df.columns[ws_df.dtypes == 'object'].tolist()}

# Add data to 'water_samples'
ws_df.to_sql(name='water_samples', 
             schema='resa2',
             con=eng, 
             if_exists='append', 
             index=False,
             dtype=dtypes)

4. Restructure chemistry data


In [ ]:
def f(row):
    """ Function to deal with flags.
    """
    if '<' in row['value_']:
        val = '<'
    elif '>' in row['value_']:
        val = '>'
    else:
        val = np.nan
    return val

In [ ]:
# Get unique list of stations associated with these samples
stn_ids = samp_df['station_id'].unique()

# Get list of sample_ids for these samples
if len(stn_ids) == 1:
    sql = ('SELECT water_sample_id, station_id, sample_date '
           'FROM resa2.water_samples '
           'WHERE station_id = %s' % stn_ids[0])
else:    
    stn_ids = str(tuple(stn_ids))
    sql = ('SELECT water_sample_id, station_id, sample_date '
           'FROM resa2.water_samples '
           'WHERE station_id IN %s' % stn_ids)
ws_df = pd.read_sql_query(sql, eng)

# Join sample id to chemistry
chem_df = pd.merge(samp_df, ws_df, how='left',
                   left_on=['station_id', 'Date'],
                   right_on=['station_id', 'sample_date'])

# Extract just cols of interest
chem_df = chem_df[['water_sample_id',] + temp_meths]

# Convert to long format
chem_df = pd.melt(chem_df, id_vars='water_sample_id')

# Get method ids
sql = ("SELECT wc_method_id, name "
       "FROM resa2.wc_method_definitions "
       "WHERE laboratory='ICPW'")
meth_df = pd.read_sql_query(sql, eng)

# Join to chem
chem_df = pd.merge(chem_df, meth_df, how='left',
                   left_on='variable', right_on='name')
chem_df = chem_df[['water_sample_id', 'wc_method_id', 'value']]
chem_df.columns = ['sample_id', 'method_id', 'value_']

# Drop NaNs
chem_df.dropna(how='any', inplace=True)

# Deal with flags
chem_df['value_'] = chem_df['value_'].astype(str)
chem_df['flag1'] = chem_df.apply(f, axis=1)

# Extract numeric chars
chem_df['value'] = chem_df['value_'].str.extract("([-+]?\d*\.\d+|\d+)", expand=True)
chem_df['value'] = chem_df['value'].astype(float)
del chem_df['value_']

# Reorder cols
chem_df = chem_df[['sample_id', 'method_id', 'value', 'flag1']]

# Check flags are consistent
if not pd.isnull(chem_df['flag1']).all():
    if not set(chem_df['flag1'].unique()).issubset(['<', '>', np.nan]):
        print ('Some flags are not valid:')
        print (chem_df['flag1'].unique())

chem_df.head()

5. Load chemistry data


In [ ]:
# Improve performance by explicitly setting dtypes. See
# https://stackoverflow.com/a/42769557/505698
dtypes = {c:types.VARCHAR(chem_df[c].str.len().max())
          for c in chem_df.columns[chem_df.dtypes == 'object'].tolist()}

# Add data to 'water_chemistry_values2'
chem_df.to_sql(name='water_chemistry_values2', 
               schema='resa2',
               con=eng, 
               if_exists='append', 
               index=False,
               dtype=dtypes)