In [1]:
%matplotlib inline
import pandas as pd, imp
from sqlalchemy import create_engine
Tore has previously written code to calculate Mann-Kendall (M-K) trend statistics and Sen's slope estimates for data series in RESA2. According to my notes from a meeting with Tore on 13/05/2016, the workflow goes something like this:
1. Run code to extract and summarise time series from RESA2, insert this data into Mann-Kendall_Sen.xls, then read the results back into a new table in RESA2 called e.g. ICPW_STATISTICS.
2. Run the ICPStat query in Icp-waters2001_2000.accdb to summarise the data in ICPW_STATISTICS. This creates a new table currently called aaa, but Tore says he'll rename it to something more descriptive before he leaves.
3. Run the export() subroutine in the Export module of Icp-waters2001_2000.accdb to reformat the aaa table and write the results to an Excel file.
Mann-Kendall_Sen.xls is an early version of the popular Excel macro MULTIMK/CONDMK, which Tore has modified slightly for use in this analysis. (A more recent version of the same file is available here). This Excel macro permits some quite sophisticated multivariate and conditional analyses, but as far as I can tell the TOC trends code is only making use of the most basic functionality - performing repeated independent trend tests on annually summarised time series.
Unfortunately, although the workflow above makes sense, I've so far failed to find and run Tore's code for step 1 (I can find everything for steps 2 and 3, but not the code for interacting with the Excel workbook). It also seems a bit messy to be switching back and forth between RESA2, Excel and Access in this way, so the code here is a first step towards refactoring the whole analysis into Python.
The Mann-Kendall_Sen.xls file on the network already had some example ICPW data in it, which I can use to test my code. The raw input data and the results obtained from the Excel macro are saved as mk_sen_test_data.xlsx.
In [2]:
# Read data and results from the Excel macro
in_xlsx = (r'C:\Data\James_Work\Staff\Heleen_d_W\ICP_Waters\TOC_Trends_Analysis_2015'
r'\Data\mk_sen_test_data.xlsx')
raw_df = pd.read_excel(in_xlsx, sheetname='input')
res_df = pd.read_excel(in_xlsx, sheetname='results')
raw_df
Out[2]:
In [3]:
res_df
Out[3]:
Looking at the output in the ICPW_STATISTICS3 table of RESA2, we need to calculate the following statistics (only some of which are output by the Excel macro): the number of non-missing values, the median, mean and standard deviation, the period of record, the M-K statistic, the normalised M-K statistic and its standard deviation, the M-K p-value, the trend direction and the Sen's slope.
Most of these should be quite straightforward. We'll start off by defining a function to calculate the M-K statistic (note that Scipy already has a function for the Theil-Sen slope). We'll also define another function to bundle everything together and return a dataframe of the results.
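For reference, the quantities calculated by the mk_test function below are the usual M-K test statistic

$$S = \sum_{k=1}^{n-1} \sum_{j=k+1}^{n} sgn(x_j - x_k)$$

its tie-corrected variance

$$Var(S) = \frac{n(n-1)(2n+5) - \sum_{i=1}^{g} t_i(t_i-1)(2t_i+5)}{18}$$

(where $n$ is the number of values, $g$ is the number of distinct values and $t_i$ is the number of values tied at the $i$-th distinct value), and the normalised statistic $z = (S - 1)/\sqrt{Var(S)}$ for $S > 0$ (or $(S + 1)/\sqrt{Var(S)}$ for $S < 0$), which is compared to the standard normal distribution to get the p-value.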
In [4]:
def mk_test(x, stn_id, par, alpha=0.05):
""" Adapted from http://pydoc.net/Python/ambhas/0.4.0/ambhas.stats/
by Sat Kumar Tomer.
Perform the MK test for monotonic trends. Uses the "normal
approximation" to determine significance and therefore should
only be used if the number of values is >= 10.
Args:
x: 1D array of data
stn_id: Station ID for the series (used in the warning message)
par: Parameter name for the series (used in the warning message)
alpha: Significance level
Returns:
var_s: Variance of test statistic
s: M-K test statistic
z: Normalised test statistic
p: p-value of the significance test
trend: Whether to reject the null hypothesis (no trend) at
the specified significance level. One of:
'increasing', 'decreasing' or 'no trend'
"""
import numpy as np
from scipy.stats import norm
n = len(x)
if n < 10:
print (' Data series for %s at site %s has fewer than 10 non-null values. '
'Significance estimates may be unreliable.' % (par, int(stn_id)))
# calculate S
s = 0
for k in xrange(n-1):
for j in xrange(k+1,n):
s += np.sign(x[j] - x[k])
# calculate the unique data
unique_x = np.unique(x)
g = len(unique_x)
# calculate the var(s)
if n == g: # there is no tie
var_s = (n*(n-1)*(2*n+5))/18.
else: # there are some ties in data
tp = np.zeros(unique_x.shape)
for i in xrange(len(unique_x)):
tp[i] = sum(unique_x[i] == x)
# Sat Kumar's code has "+ np.sum", which is incorrect
var_s = (n*(n-1)*(2*n+5) - np.sum(tp*(tp-1)*(2*tp+5)))/18.
if s>0:
z = (s - 1)/np.sqrt(var_s)
elif s == 0:
z = 0
elif s<0:
z = (s + 1)/np.sqrt(var_s)
else:
z = np.nan
# calculate the p_value
p = 2*(1-norm.cdf(abs(z))) # two tail test
h = abs(z) > norm.ppf(1-alpha/2.)
if (z<0) and h:
trend = 'decreasing'
elif (z>0) and h:
trend = 'increasing'
elif np.isnan(z):
trend = np.nan
else:
trend = 'no trend'
return var_s, s, z, p, trend
def wc_stats(raw_df, st_yr=None, end_yr=None):
""" Calculate key statistics for the TOC trends analysis:
'station_id'
'par_id'
'non_missing'
'median'
'mean'
'std_dev'
'period'
'mk_std_dev'
'mk_stat'
'norm_mk_stat'
'mk_p_val'
'trend'
'sen_slp'
Args:
raw_df: Dataframe with annual data for a single station. Columns must
be: [station_id, year, par1, par2, ... parn]
st_yr: First year to include in analysis. Pass None to start
at the beginning of the series
end_yr: Last year to include in analysis. Pass None to continue
to the end of the series
Returns:
df of key statistics.
"""
import numpy as np, pandas as pd
from scipy.stats import theilslopes
# Checking
df = raw_df.copy()
assert list(df.columns[:2]) == ['STATION_ID', 'YEAR'], 'Columns must be: [STATION_ID, YEAR, par1, par2, ... parn]'
assert len(df['STATION_ID'].unique()) == 1, 'You can only process data for one site at a time'
# Get just the period of interest
if st_yr:
df = df.query('YEAR >= @st_yr')
if end_yr:
df = df.query('YEAR <= @end_yr')
# Get stn_id
stn_id = df['STATION_ID'].iloc[0]
# Tidy up df
df.index = df['YEAR']
df.sort_index(inplace=True)
del df['STATION_ID'], df['YEAR']
# Container for results
data_dict = {'station_id':[],
'par_id':[],
'non_missing':[],
'median':[],
'mean':[],
'std_dev':[],
'period':[],
'mk_std_dev':[],
'mk_stat':[],
'norm_mk_stat':[],
'mk_p_val':[],
'trend':[],
'sen_slp':[]}
# Loop over pars
for col in df.columns:
# 1. Station ID
data_dict['station_id'].append(stn_id)
# 2. Par ID
data_dict['par_id'].append(col)
# 3. Non-missing
data_dict['non_missing'].append(pd.notnull(df[col]).sum())
# 4. Median
data_dict['median'].append(df[col].median())
# 5. Mean
data_dict['mean'].append(df[col].mean())
# 6. Std dev
data_dict['std_dev'].append(df[col].std())
# 7. Period
st_yr = df.index.min()
end_yr = df.index.max()
per = '%s-%s' % (st_yr, end_yr)
data_dict['period'].append(per)
# 8. M-K test
# Drop missing values
mk_df = df[[col]].dropna(how='any')
# Only run stats if more than 1 valid value
if len(mk_df) > 1:
var_s, s, z, p, trend = mk_test(mk_df[col].values, stn_id, col)
data_dict['mk_std_dev'].append(np.sqrt(var_s))
data_dict['mk_stat'].append(s)
data_dict['norm_mk_stat'].append(z)
data_dict['mk_p_val'].append(p)
data_dict['trend'].append(trend)
# 9. Sen's slope
# First element of output gives median slope. Other results could
# also be useful - see docs
sslp = theilslopes(mk_df[col].values, mk_df.index, 0.95)[0]
data_dict['sen_slp'].append(sslp)
# Otherwise all NaN
else:
for par in ['mk_std_dev', 'mk_stat', 'norm_mk_stat',
'mk_p_val', 'trend', 'sen_slp']:
data_dict[par].append(np.nan)
# Build to df
res_df = pd.DataFrame(data_dict)
res_df = res_df[['station_id', 'par_id', 'period', 'non_missing',
'mean', 'median', 'std_dev', 'mk_stat', 'norm_mk_stat',
'mk_p_val', 'mk_std_dev', 'trend', 'sen_slp']]
return res_df
In [5]:
# Run analysis on test data and print results
out_df = wc_stats(raw_df)
del out_df['station_id']
out_df
Out[5]:
And below is the output from the Excel macro for comparison.
In [6]:
res_df
Out[6]:
My code gives near-identical results to those from the Excel macro, although there are a few edge cases that might be worth investigating further. For example, if there are fewer than 10 non-null values, my code currently prints a warning. I'm not sure exactly what the Excel macro does yet, but in general it seems that for fewer than 10 values it's necessary to use a lookup table (see e.g. the Instructions sheet of the file here).
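As a possible alternative to the lookup table, for very small samples the exact null distribution of S can be generated by brute force. The sketch below is just an illustration (not part of the workflow above) and is only practical for small n, since it enumerates all n! orderings of the series.

import itertools
import numpy as np

def mk_exact_p(x):
    """ Two-sided p-value for the M-K S statistic based on the exact
    permutation distribution. Illustrative only - very slow for
    n > 8 or so, because all n! orderings are enumerated.
    """
    x = np.asarray(x)
    n = len(x)

    def s_stat(arr):
        # M-K S statistic for a single ordering
        return sum(np.sign(arr[j] - arr[k])
                   for k in range(n - 1) for j in range(k + 1, n))

    s_obs = s_stat(x)
    s_null = [s_stat(np.asarray(perm)) for perm in itertools.permutations(x)]

    # Proportion of orderings with |S| at least as extreme as observed
    p = np.mean([abs(s) >= abs(s_obs) for s in s_null])

    return s_obs, p

# e.g. mk_exact_p([3.1, 2.7, 3.5, 4.0, 3.9, 4.2])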
The next step is to read the correct data directly from RESA2 and summarise it to look like raw_df, above. Start off by connecting to the database.
In [7]:
# Use custom RESA2 function to connect to db
r2_func_path = r'C:\Data\James_Work\Staff\Heleen_d_W\ICP_Waters\Upload_Template\useful_resa2_code.py'
resa2 = imp.load_source('useful_resa2_code', r2_func_path)
engine, conn = resa2.connect_to_resa2()
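I haven't included the connection details here, but connect_to_resa2 presumably just builds an Oracle connection string and returns a SQLAlchemy engine plus an open connection, something like the sketch below (the user, password and DSN are placeholders, not the real values, which live in useful_resa2_code.py).

from sqlalchemy import create_engine

def connect_to_resa2_sketch(user='resa2_user', pw='********', dsn='dbhost:1521/resa2'):
    """ Hypothetical sketch of the custom connection function. """
    conn_str = 'oracle+cx_oracle://%s:%s@%s' % (user, pw, dsn)
    engine = create_engine(conn_str)
    conn = engine.connect()
    return engine, conn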
Looking at the ICPW_STATISTICS table in RESA2, it seems as though trends have been assessed for 14 parameters and several different time periods for each site of interest. The length and number of time periods vary from site to site, so I'll need to check with Heleen regarding how these variables should be chosen. The 14 parameters are as follows:
Many of these quantities are unfamiliar to me, but presumably the equations for calculating them can be found in Tore's code (which I can't find at present). Check with Heleen whether all of these are still required and find equations as necessary.
The other issue is how to aggregate the values in the database from their original temporal resolution to annual summaries. I assume the median annual value is probably appropriate in most cases, but it would be good to know what Tore did previously.
For now, I'll focus on:
1. Extracting the data from the database for a specified time period,
2. Calculating the required water chemistry parameters,
3. Taking annual medians and
4. Estimating the trend statistics.
It should then be fairly easy to modify this code later as necessary.
Some of the quantities listed above are straightforward to calculate.
The Es in the parameter names are just unit conversions to micro-equivalents per litre:
$$EPAR \; (\mu eq/l) = \frac{10^6 \times valency}{molar \; mass \; (g/mol)} \times PAR \; (g/l)$$

Molar masses and valencies for the key species listed above are given in the table below.
In [8]:
# Tabulate chemical properties
chem_dict = {'molar_mass':[96, 35, 40, 24, 14],
'valency':[2, 1, 2, 2, 1],
'resa2_ref_ratio':[0.103, 1., 0.037, 0.196, 'N/A']}
chem_df = pd.DataFrame(chem_dict, index=['SO4', 'Cl', 'Ca', 'Mg', 'NO3-N'])
chem_df = chem_df[['molar_mass', 'valency', 'resa2_ref_ratio']]
chem_df
Out[8]:
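As a quick worked example using the values in this table (and a made-up concentration), 5 mg/l of sulphate converts to roughly 104 µeq/l, which is the same as applying the factor valency * 1000 / molar_mass to the mg/l values, as done further below.

# Hypothetical example: convert 5 mg/l SO4 to ueq/l
so4_gl = 5. / 1000.                  # made-up concentration, converted from mg/l to g/l
eso4 = 1e6 * 2 / 96. * so4_gl        # valency 2, molar mass 96 g/mol
print(eso4)                          # ~104.2 ueq/l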
The Xs are sea-salt corrected values (also sometimes denoted with an asterisk e.g. Ca*). They are calculated by comparison to chloride concentrations, which are generally assumed to be conservative. The usual equation is:
$$PARX = PAR_{sample} - \left[ \left( \frac{PAR}{Cl} \right)_{ref} \times Cl_{sample} \right]$$

where $PAR_{sample}$ and $Cl_{sample}$ are the concentrations measured in the lake or river and $\left( \frac{PAR}{Cl} \right)_{ref}$ is (ideally) the long-term average ratio in incoming rainwater. In some cases the reference values are simply taken from sea water concentrations (ignoring effects such as evaporative fractionation etc.).
I'm not sure what values to assume, but by rearranging the above equation and applying it to data extracted from RESA2 I can back-calculate the reference values. For example, brief testing using data from Italy, Switzerland and the Czech Republic implies that RESA2 uses a standard reference value for sulphate of 0.103.
The reference ratios inferred from RESA2 for the key species listed are given in the table above.
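For illustration, the back-calculation just rearranges the correction equation to give $\left( \frac{PAR}{Cl} \right)_{ref} = (PAR_{sample} - PARX) / Cl_{sample}$. With made-up numbers:

# Made-up values in ueq/l, purely to illustrate the rearrangement
eso4, ecl, eso4x = 120., 250., 94.25
ref_ratio = (eso4 - eso4x) / ecl
print(ref_ratio)    # 0.103, i.e. the standard SO4 reference value mentioned above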
NB: In doing this I've identified some additional errors in the database, where this correction has not been performed correctly. For some reason, ESO4X values have been set to zero, despite valid ESO4 and ECl measurements being available. The problem only affects a handful of samples, but could be enough to generate false trends. Return to this later?
NB2: Leah's experiences with the RECOVER project suggest that assuming a single reference concentration for all countries in the world is a bad idea. For example, I believe that in the Czech Republic and Italy it is usual not to calculate sea-salt corrected concentrations at all, because most of the chloride input comes from industry rather than marine sources. Rainwater concentrations are also likely to vary dramatically from place to place, especially given the range of geographic and climatic conditions covered by this project. Check with Heleen.
I still need to work out how to calculate ANC, ALK, HPLUS and ENO3DIVENO3ESO4X.
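As a starting point for two of these (to be checked against Tore's code and with Heleen), I'd assume the textbook definitions: ANC as the charge balance of base cations minus strong acid anions, and H+ back-calculated from pH. A minimal sketch with made-up values:

# Assumed (textbook) definitions - these may not match what Tore did
# Hypothetical single-sample values in ueq/l, plus pH
eca, emg, ena, ek = 150., 40., 60., 10.
ecl, eso4, eno3 = 80., 95., 12.
ph = 5.6

anc = (eca + emg + ena + ek) - (ecl + eso4 + eno3)   # ueq/l
hplus = 1e6 * 10**(-ph)                              # ueq/l

print(anc, hplus)    # 73.0 and ~2.5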
In [9]:
# Get stations for a specified list of projects
proj_list = ['ICPW_TOCTRENDS_2015_CZ', 'ICPW_TOCTRENDS_2015_IT']
sql = ('SELECT station_id, station_code '
'FROM resa2.stations '
'WHERE station_id IN (SELECT UNIQUE(station_id) '
'FROM resa2.projects_stations '
'WHERE project_id IN (SELECT project_id '
'FROM resa2.projects '
'WHERE project_name IN %s))'
% str(tuple(proj_list)))
stn_df = pd.read_sql(sql, engine)
stn_df
Out[9]:
In [10]:
# Specify parameters of interest
par_list = ['SO4', 'Cl', 'Ca', 'Mg', 'NO3-N', 'TOC', 'Al']
if 'DOC' in par_list:
print ('The database treats DOC and TOC similarly.\n'
'You should probably enter "TOC" instead')
# Check pars are valid
if len(par_list)==1:
sql = ("SELECT * FROM resa2.parameter_definitions "
"WHERE name = '%s'" % par_list[0])
else:
sql = ('SELECT * FROM resa2.parameter_definitions '
'WHERE name in %s' % str(tuple(par_list)))
par_df = pd.read_sql_query(sql, engine)
assert len(par_df) == len(par_list), 'One or more parameters not valid.'
In [11]:
# Get results for ALL pars for sites and period of interest
if len(stn_df)==1:
sql = ("SELECT * FROM resa2.water_chemistry_values2 "
"WHERE sample_id IN (SELECT water_sample_id FROM resa2.water_samples "
"WHERE station_id = %s)"
% stn_df['station_id'].iloc[0])
else:
sql = ("SELECT * FROM resa2.water_chemistry_values2 "
"WHERE sample_id IN (SELECT water_sample_id FROM resa2.water_samples "
"WHERE station_id IN %s)"
% str(tuple(stn_df['station_id'].values)))
wc_df = pd.read_sql_query(sql, engine)
# Get all sample dates for sites and period of interest
if len(stn_df)==1:
sql = ("SELECT water_sample_id, station_id, sample_date "
"FROM resa2.water_samples "
"WHERE station_id = %s " % stn_df['station_id'].iloc[0])
else:
sql = ("SELECT water_sample_id, station_id, sample_date "
"FROM resa2.water_samples "
"WHERE station_id IN %s " % str(tuple(stn_df['station_id'].values)))
samp_df = pd.read_sql_query(sql, engine)
# Join in par IDs based on method IDs
sql = ('SELECT * FROM resa2.wc_parameters_methods')
meth_par_df = pd.read_sql_query(sql, engine)
wc_df = pd.merge(wc_df, meth_par_df, how='left',
left_on='method_id', right_on='wc_method_id')
# Get just the parameters of interest
wc_df = wc_df.query('wc_parameter_id in %s' % str(tuple(par_df['parameter_id'].values)))
# Join in sample dates
wc_df = pd.merge(wc_df, samp_df, how='left',
left_on='sample_id', right_on='water_sample_id')
# Join in parameter units
sql = ('SELECT * FROM resa2.parameter_definitions')
all_par_df = pd.read_sql_query(sql, engine)
wc_df = pd.merge(wc_df, all_par_df, how='left',
left_on='wc_parameter_id', right_on='parameter_id')
# Join in station codes
wc_df = pd.merge(wc_df, stn_df, how='left',
left_on='station_id', right_on='station_id')
# Convert units
wc_df['value'] = wc_df['value'] * wc_df['conversion_factor']
# Extract columns of interest
wc_df = wc_df[['station_id', 'sample_date', 'name', 'value']]
# Unstack
wc_df.set_index(['station_id', 'sample_date', 'name'], inplace=True)
wc_df = wc_df.unstack(level='name')
wc_df.columns = wc_df.columns.droplevel()
wc_df.reset_index(inplace=True)
wc_df.columns.name = None
wc_df.head()
Out[11]:
In [12]:
# Extract year from date column
wc_df['year'] = wc_df['sample_date'].map(lambda x: x.year)
del wc_df['sample_date']
# Groupby station_id and year
grpd = wc_df.groupby(['station_id', 'year'])
# Calculate median
wc_df = grpd.agg('median')
wc_df.head()
Out[12]:
In [13]:
# 1. Convert to ueq/l
for par in ['SO4', 'Cl', 'Mg', 'Ca', 'NO3-N']:
val = chem_df.ix[par, 'valency']
mm = chem_df.ix[par, 'molar_mass']
if par == 'NO3-N':
wc_df['ENO3'] = wc_df[par] * val / mm
else:
wc_df['E%s' % par] = wc_df[par] * val * 1000. / mm
# 2. Apply sea-salt correction
for par in ['ESO4', 'EMg', 'ECa']:
ref = chem_df.ix[par[1:], 'resa2_ref_ratio']
wc_df['%sX' % par] = wc_df[par] - (ref*wc_df['ECl'])
# 3. Calculate combinations
# 3.1. ESO4 + ECl
wc_df['ESO4_ECl'] = wc_df['ESO4'] + wc_df['ECl']
# 3.2. ECa + EMg
wc_df['ECa_EMg'] = wc_df['ECa'] + wc_df['EMg']
# 3.3. ECaX + EMgX
wc_df['ECaX_EMgX'] = wc_df['ECaX'] + wc_df['EMgX']
# 3.4. ESO4 + ECl + ENO3
wc_df['ESO4_ECl_ENO3'] = wc_df['ESO4'] + wc_df['ECl'] + wc_df['ENO3']
# 4. Delete unnecessary columns and tidy
for col in ['SO4', 'Cl', 'Mg', 'Ca', 'NO3-N']:
del wc_df[col]
wc_df.reset_index(inplace=True)
wc_df.head()
Out[13]:
In [14]:
def process_water_chem_df(stn_df, wc_df, st_yr=None, end_yr=None):
""" Calculate statistics for the stations, parameters and time
periods specified.
Args:
stn_df: Dataframe of station_ids
wc_df: Dataframe of water chemistry time series for stations
and parameters of interest
st_yr: First year to include in analysis. Pass None to start
at the beginning of the series
end_yr: Last year to include in analysis. Pass None to continue
to the end of the series
Returns:
Dataframe of statistics
"""
# Container for output
df_list = []
# Loop over sites
for stn_id in stn_df['station_id']:
# Extract data for this site
df = wc_df.query('station_id == @stn_id')
# Modify col names
names = list(df.columns)
names[:2] = ['STATION_ID', 'YEAR']
df.columns = names
# Run analysis
df_list.append(wc_stats(df, st_yr=st_yr, end_yr=end_yr))
res_df = pd.concat(df_list, axis=0)
return res_df
res_df = process_water_chem_df(stn_df, wc_df)
res_df.head()
Out[14]:
This seems to be working OK so far, but I need to do some more testing to see that my results more-or-less agree with those calculated previously by Tore. As a start, let's compare the results above with those in the ICPW_STATISTICS3 table of RESA2, which is where (I think) Tore has saved his previous output.
In [15]:
# Get results for test sites from RESA2
sql = ('SELECT * FROM resa2.icpw_statistics3 '
'WHERE station_id IN %s'
% str(tuple(stn_df['station_id'].values)))
stat_df = pd.read_sql(sql, engine)
# Get just the cols to compare to my output
stat_df = stat_df[['station_id', 'parameter', 'period', 'nonmiss',
'average', 'median', 'stdev', 'test_stat',
'mk_stat', 'mkp', 'senslope']]
stat_df.head(14).sort_values(by='parameter')
Out[15]:
For e.g. site 23499, I can now re-run my code for the period from 1990 to 2004 and compare my results to those above.
In [16]:
# Re-run python analysis for the period 1990 - 2004
res_df = process_water_chem_df(stn_df, wc_df, st_yr=1990, end_yr=2004)
# Delete mk_std_dev as not relevant here
del res_df['mk_std_dev']
res_df.head(14).sort_values(by='par_id')
Out[16]:
The numbers in the above two tables are almost identical, which is actually pretty remarkable given that I'm second-guessing a lot of the decisions in Tore's analysis (having never seen his code) and also recoding everything from scratch. It certainly looks as though this will be a viable alternative for re-running the trends analysis.
There are still some loose ends to tie up. In particular, I need to add a few more parameters to the trends calculation, but that shouldn't be difficult once I've spoken to Heleen to find out what they are and how to calculate them. In the meantime, I'm sufficiently happy with this output to move the code into a separate module and then continue to explore the data in a new notebook.
NB: A nice way to visualise these results would be to create a google map, where each point is coloured according to 'increasing', 'decreasing' or 'no trend'. A pop-up on each point could then give the main summary statistics and a time series plot with the Sen's slope line overlaid. This would result in lots of points on top of each other, but users could filter the map to just show one parameter at a time to avoid things becoming too cluttered.
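If I go down this route, something like folium (Leaflet maps rather than Google Maps, but the same idea) would make it straightforward. The sketch below is purely illustrative: the dataframe and its column names are made up, and it just colours each station by trend direction.

import folium
import pandas as pd

# Hypothetical results table (column names are assumptions)
map_df = pd.DataFrame({'lat':    [60.1, 61.5],
                       'lon':    [10.8, 8.9],
                       'par_id': ['ESO4X', 'ESO4X'],
                       'trend':  ['decreasing', 'no trend']})

colours = {'increasing':'red', 'decreasing':'blue', 'no trend':'grey'}

m = folium.Map(location=[61., 10.], zoom_start=5)

for idx, row in map_df.iterrows():
    folium.CircleMarker(location=[row['lat'], row['lon']],
                        radius=5,
                        color=colours[row['trend']],
                        fill_color=colours[row['trend']],
                        popup='%s: %s' % (row['par_id'], row['trend'])).add_to(m)

m.save('trend_map.html')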