In [1]:
import sunspot as r
import scipy
import pandas
def dominant_trial(x):
"""Returns the trial with maximum fitness at the final update."""
final = x[x['update']==x['update'].max()]
dominant = final.ix[final['max_fitness'].idxmax()]
return x[x['trial']==dominant['trial']]
def error_indices(x):
"""Calculates a variety of error indices on x; x must have "observed" and "predicted" values.
Zero handling is a problem in these measurements, so we remove all values
where the denominator is zero. This means that these metrics should be interpreted
as being specific to the non-zero denom samples; i.e, MAPE is the mean absolute
percentage error WHEN THERE IS A NON-ZERO OBSERVED VALUE, MRAE is the mean relative absolute
error WHEN BASE ERROR IS NON-ZERO, and so on."""
# pandas series have some weird properties that make them less useful for these calculations, so let's
# convert them to numpy arrays:
obs = numpy.array(x["observed"], dtype=float)
pre = numpy.array(x["predicted"], dtype=float)
# these are all vectors:
error = obs - pre
sqr_error = error**2
abs_error = fabs(error)
pct_error = 100.0 * (error / obs)[obs!=0]
sym_error = 2.0 * (fabs(error) / (obs + pre))[(obs + pre)!=0]
base_error = obs[1:] - pre[:-1]
rel_error = (error[1:]/base_error)[base_error!=0]
# scalars:
maeb = mean(fabs(base_error))
mseb = mean(base_error**2)
rmseb = sqrt(mean(base_error**2))
return pandas.Series({
'MSE' : mean(sqr_error), # ==> Mean Squared Error = mean(e^2_t)
'RMSE' : sqrt(mean(sqr_error)), # ==> Root Mean Squared Error = sqrt(MSE)
'MAE' : mean(abs_error), # ==> Mean Absolute Error = mean(abs(e_t))
'MDAE' : median(abs_error), # ==> Median Absolute Error = median(abs(e_t))
'MAPE' : mean(fabs(pct_error)), # ==> Mean Absolute Percentage Error = mean((p_t))
'MDAPE' : median(fabs(pct_error)), # ==> Median Absolute Percentage Error = median((p_t))
'SMAPE' : mean(sym_error), # ==> Symmetric Mean Absolute Percentage Error = mean(2abs(Y_t - F_t)/(Y_t + F_t))
'SMDAPE' : median(sym_error), # ==> Symmetric Median Absolute Percentage Error = median(2abs(Y_t - F_t)/(Y_t + F_t))
'MRAE' : mean(fabs(rel_error)), # ==> Mean Relative Absolute Error = mean(abs(r_t))
'MDRAE' : median(fabs(rel_error)), # ==> Median Relative Absolute Error = median(abs(r_t))
'GMRAE' : scipy.stats.gmean(fabs(rel_error[rel_error!=0])), # ==> Geometric Mean Relative Absolute Error = gmean(abs(r_t))
'RELMAE' : mean(abs_error) / maeb, # ==> Relative Mean Absolute Error = MAE / MAE_b
'RELMSE' : mean(sqr_error) / mseb, # ==> Relative Mean Squared Error = MSE / MSE_b
'RELRMSE' : sqrt(mean(sqr_error)) / rmseb, # ==> Relative Root Mean Squared Error = RMSE / RMSE_b
'LMR' : log(sqrt(mean(sqr_error)) / rmseb), # ==> Log Mean Squared Error Ratio = log(RelRMSE)
'PB' : 100.0 * mean(rel_error[rel_error<1]), # ==> Percentage Better = 100 * mean(I{r_t < 1})
'PBMAE' : 100.0 * mean(abs_error[abs_error<maeb]), # ==> Percentage Better Mean Absolute Error = 100 * mean(I{MAE < MAE_b})
'PBMSE' : 100.0 * mean(sqr_error[sqr_error<mseb]), # ==> Percentage Better Mean Squared Error = 100 * mean(I{MSE < MSE_b})
'PBRMSE' : 100.0 * sqrt(mean(sqr_error[sqr_error<rmseb])) # ==> Percentage Better Root Mean Squared Error = 100 * sqrt(mean(I{MSE < MSE_b}))
})
def calc_errors(X):
"""Calculates all error indices for Dataframe X."""
T = X.groupby(["trial","t"]).apply(error_indices).reset_index()
return T
In [2]:
import os
import re
import pandas
import subprocess
import StringIO
def dk_treatment(filename):
m = re.search(r".*\/(\w+)_(\d+)\/[\w\.]*$", filename)
if m is not None:
return m.group(1)
else:
return None
def dk_trial(filename):
m = re.search(r".*\/(\w+)_(\d+)\/[\w\.]*$", filename)
if m is not None:
return m.group(2)
else:
return None
def load_files2(files, treatmentf=dk_treatment, trialf=dk_trial):
"""Load data from files into a pandas.DataFrame.
Keyword arguments:
files -- a list of files to load data from
treatmentf -- a function that assigns a treatment code given the filename
trialf -- a function that assigns a trial code given the filename
"""
D = None
for i in files:
# we have to do all this subprocess junk because the python
# gzip module is buggy...
cmd = 'gunzip -cqf ' + i
p = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
text, stderr = p.communicate()
fh = StringIO.StringIO(text)
d = pandas.read_table(fh, sep=r"\s+(?!$)")
treatment_code = treatmentf(i)
if treatment_code is not None:
d['treatment'] = treatment_code
trial_code = trialf(i)
if trial_code is not None:
d['trial'] = trialf(i)
if D is None:
D = d
else:
D = D.append(d, ignore_index=True)
return D
In [16]:
def sunspot_method(filename):
m = re.search(r".*_([a-zA-Z]+)_([a-zA-Z\d_])+_(\d+)\.txt$", filename)
if m is not None:
return m.group(1)
else:
return None
def sunspot_treatment(filename):
m = re.search(r".*_([a-zA-Z]+)_([a-zA-Z\d_])+_(\d+)\.txt$", filename)
if m is not None:
return m.group(2)
else:
return "*"
def sunspot_trial(filename):
m = re.search(r".*_([a-zA-Z]+)_([a-zA-Z\d_])+_(\d+)\.txt$", filename)
if m is not None:
return m.group(3)
else:
return None
def other_dominants(filenames):
E = {}
for i in filenames:
D = load_files2([i], treatmentf=sunspot_treatment, trialf=sunspot_trial)
err = calc_errors(D)
method = sunspot_method(i)
err['trial'] = sunspot_trial(i)
err['treatment'] = sunspot_treatment(i)
err['method'] = method
if method in E:
if err['RMSE'].sum() < E[method]['RMSE'].sum():
E[method] = err
else:
E[method] = err
return E
def dominants(filenames):
E = {}
for i in filenames:
D = load_files2([i])
err = calc_errors(D)
method = 'MN'
err['trial'] = dk_trial(i)
err['treatment'] = dk_treatment(i)
err['method'] = method
if method in E:
if err['RMSE'].sum() < E[method]['RMSE'].sum():
E[method] = err
else:
E[method] = err
return E
In [23]:
for i in ["19971998", "20002001", "20032004", "20092010"]:
E = other_dominants(r.find_files("/Users/dk/research/src/sunspot/var/daily-ssn/"+i, "PredictionForTestData_.*"))
print i
for k,v in E.items():
print k, v['RMSE'].sum()