In [110]:
import os
from datetime import datetime

import numpy as np
import pandas as pd

import elm_autoencoder as elmae
In [111]:
# Load the raw CSV log into a DataFrame.
# NOTE(review): the file name looks like a VIN, so this is presumably one vehicle's log - confirm.
dat = pd.read_csv(filepath_or_buffer="data/LSJA24U64GS058596.csv")
In [112]:
# Parse the 'starttime' strings into timestamps in one vectorised call.
# Unlike a per-row datetime.strptime, this is safe to re-run on a column that
# has already been converted, so the cell stays idempotent.
dat['starttime'] = pd.to_datetime(dat['starttime'], format="%Y-%m-%d %H:%M:%S")
In [113]:
# Order the rows chronologically.  DataFrame.sort() was removed from pandas;
# sort_values() is the supported equivalent.
dat = dat.sort_values('starttime')
In [123]:
def train(training, iternum=5, n_hidden=1000, model_dir="model"):
    """Fit ELM auto-encoders and persist the parameters used by ``detect``.

    An ``elmae.ELMAutoEncoder`` is fitted ``iternum`` times on ``training``.
    After each fit the absolute reconstruction error on the training data is
    summarised by its per-column mean and its covariance matrix; these
    statistics are then averaged over all fits.

    Files written to ``model_dir`` (created if missing):
    ``coef_hidden_.txt``, ``intercept_hidden_.txt``, ``coef_output_.txt``
    (weights of the last fitted model) and ``score_mean.txt``,
    ``score_cov.txt`` (averaged error statistics).

    NOTE(review): only the weights of the *last* model are saved, while the
    error statistics are averaged over every fit - confirm this is intended.

    Parameters
    ----------
    training : array-like, one row per sample
        Data the auto-encoder is fitted on.
    iternum : int, default 5
        Number of independent fits to average the error statistics over.
    n_hidden : int, default 1000
        Forwarded to ``ELMAutoEncoder(n_hidden=...)``.
    model_dir : str, default "model"
        Directory the parameter files are written to.

    Returns
    -------
    The last fitted ``ELMAutoEncoder`` instance.

    Raises
    ------
    ValueError
        If ``iternum`` is smaller than 1.
    """
    if iternum < 1:
        raise ValueError("iternum must be at least 1, got %r" % (iternum,))

    error_means = []
    error_covs = []
    model = None
    for _ in range(iternum):
        model = elmae.ELMAutoEncoder(n_hidden=n_hidden)
        model.fit(training)
        # Compute the likelihood parameters from the absolute reconstruction error.
        diff_train = np.abs(model._get_predictions() - training)
        error_means.append(np.mean(diff_train, axis=0))
        error_covs.append(np.cov(diff_train.T))

    # Average the per-fit statistics across all fits.
    score_mean = np.mean(error_means, axis=0)
    score_cov = np.mean(error_covs, axis=0)

    # np.savetxt does not create missing directories.
    if not os.path.isdir(model_dir):
        os.makedirs(model_dir)

    np.savetxt(os.path.join(model_dir, "coef_hidden_.txt"), model.coef_hidden_)
    np.savetxt(os.path.join(model_dir, "intercept_hidden_.txt"), model.intercept_hidden_)
    np.savetxt(os.path.join(model_dir, "coef_output_.txt"), model.coef_output_)
    np.savetxt(os.path.join(model_dir, "score_mean.txt"), score_mean)
    np.savetxt(os.path.join(model_dir, "score_cov.txt"), score_cov)
    return model
In [124]:
def detect(elm, testing, model_dir="model"):
    """Score every row of ``testing`` with the parameters saved by ``train``.

    The hidden/output weights and the error covariance are read back from
    ``model_dir``.  Each row ``x`` is reconstructed as
    ``tanh(x . coef_hidden_ + intercept_hidden_) . coef_output_`` and scored as
    ``d^T . inv(score_cov) . d`` where ``d = |reconstruction - x|``.

    Parameters
    ----------
    elm : object
        Unused; the weights are loaded from disk instead.  Kept so existing
        callers (``detect(model, data)``) continue to work.
    testing : array-like, one row per sample
        Samples to score.
    model_dir : str, default "model"
        Directory containing ``coef_hidden_.txt``, ``intercept_hidden_.txt``,
        ``coef_output_.txt`` and ``score_cov.txt``.

    Returns
    -------
    numpy.ndarray
        One anomaly score per row of ``testing``.

    Raises
    ------
    numpy.linalg.LinAlgError
        If the stored covariance matrix is singular.
    """
    coef_hidden_ = np.loadtxt(os.path.join(model_dir, "coef_hidden_.txt"))
    intercept_hidden_ = np.loadtxt(os.path.join(model_dir, "intercept_hidden_.txt"))
    coef_output_ = np.loadtxt(os.path.join(model_dir, "coef_output_.txt"))
    score_cov = np.loadtxt(os.path.join(model_dir, "score_cov.txt"))

    samples = np.asarray(testing, dtype=float)

    # Invert the covariance once instead of once per row.
    inv_cov = np.linalg.inv(score_cov)

    # Forward pass for all rows at once.
    hidden_activations = np.tanh(np.dot(samples, coef_hidden_) + intercept_hidden_)
    preds = np.dot(hidden_activations, coef_output_)

    # Row-wise quadratic form d^T . inv_cov . d on the absolute error.
    errors = np.abs(preds - samples)
    return np.sum(np.dot(errors, inv_cov) * errors, axis=1)
In [125]:
# Feature matrix built from the 'tboxaccelx' and 'tboxaccely' columns.
# Other candidate columns, currently unused:
# 'tboxaccelz', 'vehspeed', 'vehrpm', 'vehaccelpos', 'vehbrakepos'
axy = dat[['tboxaccelx', 'tboxaccely']].values.astype(float)

# Standardise each column to zero mean and unit variance (vectorised z-score).
axy = (axy - axy.mean(axis=0)) / axy.std(axis=0)

model = train(axy)
# An incremental-fitting experiment (model.add_fit on 1000-row chunks) was
# sketched here previously and left disabled.
In [118]:
# Score the (standardised) training rows themselves with the saved parameters.
anomaly_score = detect(elm=model, testing=axy)
In [119]:
import matplotlib.pylab as plt
%matplotlib inline
plt.plot(np.log(anomaly_score))
Out[119]:
In [120]:
# Distribution of the log anomaly scores.  A score of exactly 0 maps to -inf,
# which the histogram cannot bin, so non-finite values are left out.
log_scores = np.log(anomaly_score)
fig, ax = plt.subplots()
ax.hist(log_scores[np.isfinite(log_scores)])
ax.set(title="Distribution of log anomaly scores",
       xlabel="log(anomaly score)",
       ylabel="count");
Out[120]:
In [121]:
# Attach the log score to each row; the array is matched to rows by position.
dat = dat.assign(anomaly_score=np.log(anomaly_score))
In [122]:
# Most anomalous rows first.  DataFrame.sort() was removed from pandas;
# sort_values() is the supported equivalent.
dat.sort_values('anomaly_score', ascending=False)
Out[122]:
In [109]:
# Number of rows in the feature matrix.
axy.shape[0]
Out[109]:
In [ ]: