In [ ]:
import numpy as np
import os
import pickle
import pandas as pd
import matplotlib.pyplot as plt
import random
import glob
import wfdb
import pandas as pd
from sklearn.model_selection import KFold
from keras.layers import Input,Flatten, Convolution1D,BatchNormalization,Dense,Input,Dropout,MaxPool1D,GlobalAvgPool1D,\
AveragePooling1D,concatenate,Activation
from keras.models import Model
from keras.callbacks import EarlyStopping,ReduceLROnPlateau,ModelCheckpoint
from keras import backend as K
from keras.backend.tensorflow_backend import clear_session
from keras.regularizers import L1L2
In [ ]:
def calculate_scores(prob, y_true, verbose=1):
    """Compute accuracy, sensitivity and specificity for 2-class one-hot predictions.

    Parameters
    ----------
    prob : array-like, shape (n_samples, 2)
        Predicted class scores/probabilities; column 1 is the positive class.
    y_true : array-like, shape (n_samples, 2)
        One-hot ground-truth labels.
    verbose : int, optional
        If truthy, print the three scores.

    Returns
    -------
    dict
        Keys 'accuracy', 'sensitivity', 'specificity'. Sensitivity /
        specificity are the string 'nan' when undefined (no positives /
        no negatives in y_true) — kept as a string sentinel for backward
        compatibility with downstream `!= 'nan'` filtering.
    """
    # Hard decision per row: 1 iff the positive class (column 1) wins argmax.
    pred_pos = np.argmax(np.asarray(prob), axis=1)
    true_pos = np.asarray(y_true)[:, 1]

    # Vectorized replacements for the original per-row counting loops.
    accuracy = float(np.mean(pred_pos == true_pos))
    tp = int(np.sum((pred_pos == 1) & (true_pos == 1)))
    fp = int(np.sum((pred_pos == 1) & (true_pos == 0)))
    tn = int(np.sum((pred_pos == 0) & (true_pos == 0)))
    fn = int(np.sum((pred_pos == 0) & (true_pos == 1)))

    sensitivity = tp / (tp + fn) if (tp + fn) else 'nan'
    specificity = tn / (tn + fp) if (tn + fp) else 'nan'

    scores = {'accuracy': accuracy, 'sensitivity': sensitivity, 'specificity': specificity}
    if verbose:
        print('accuracy: {}\t sensitivity: {}\t specificity: {}'.format(
            accuracy, sensitivity, specificity))
    return scores
def _stack_patient_segments(patient_list, data_dict):
    """Collect per-lead segment arrays and one-hot labels for the given patients.

    Returns ([X_ch0, X_ch1, X_ch2], y) where each X_chk has shape
    (n_segments, segment_len, 1) and y has shape (n_segments, 2).
    """
    channels = [[], [], []]
    labels = []
    for patient in patient_list:
        # NOTE(review): substring matching — a patient id that is a prefix of
        # another (e.g. 'p1' vs 'p10') would match both keys; preserved for
        # compatibility, but confirm ids are non-overlapping.
        for key in data_dict:
            if patient not in key:
                continue
            segments, label_bin = data_dict[key]
            n_seg, seg_len = segments.shape[0], segments.shape[2]
            # Split the (n, 3, len) array into three (n, len, 1) lead stacks.
            for ch in range(3):
                channels[ch].extend(np.reshape(segments[:, ch, :], [n_seg, seg_len, 1]))
            # Every segment of a record shares that record's label.
            labels.extend(np.tile(label_bin, [n_seg, 1]))
    return [np.array(c) for c in channels], np.array(labels)


def get_patient_data(ind_train, ind_test, patients, data_dict=None):
    """Build patient-wise train/test sets from the segment dictionary.

    Parameters
    ----------
    ind_train, ind_test : array-like of int
        Index arrays (e.g. from KFold.split) selecting train/test patients.
    patients : array-like of str
        Patient identifiers; each is matched as a substring of data_dict keys.
    data_dict : dict, optional
        Mapping key -> (segments, one-hot label). Defaults to the
        module-level `data_dict` (backward compatible with the old
        3-argument call).

    Returns
    -------
    X_train, y_train, X_test, y_test
        X_* is a list of three (n_segments, segment_len, 1) arrays (one per
        lead); y_* is a (n_segments, 2) one-hot label array.
    """
    if data_dict is None:
        # Fall back to the notebook-global dataset loaded elsewhere.
        data_dict = globals()['data_dict']
    patients = np.array(patients)
    X_train, y_train = _stack_patient_segments(patients[ind_train], data_dict)
    X_test, y_test = _stack_patient_segments(patients[ind_test], data_dict)
    return X_train, y_train, X_test, y_test
In [ ]:
# Load the preprocessed dataset: a dict mapping record keys (slash-separated
# paths whose second-to-last component is the patient id — see the key.split
# usage below) to (segments, one-hot label) tuples, where segments is indexed
# as (n_segments, 3 leads, samples).
# NOTE(review): pickle.load is unsafe on untrusted files — this assumes the
# local .bin file is trusted. The file handle is also never closed explicitly;
# presumably acceptable in a notebook, but a `with open(...)` would be cleaner.
data_dict=pickle.load(open(os.path.join('..','data','imi_hc_64Hz_3_lead.bin'),'rb'))
In [ ]:
# Unique patient ids (second-to-last path component of each record key).
# n_splits == number of patients -> leave-one-patient-out cross-validation.
unique_ids = {key.split('/')[-2] for key in data_dict}
patients = np.array(list(unique_ids))
kfold_patient = KFold(n_splits=len(patients), shuffle=False)
In [ ]:
def conv_bn(filters, kernel_size, input_layer):
    """Conv1D -> BatchNorm -> ReLU -> MaxPool(2) building block.

    'same' padding keeps the temporal length through the convolution; the
    pooling then halves it.
    """
    conv = Convolution1D(
        filters=filters,
        kernel_size=kernel_size,
        padding='same',
        kernel_regularizer=None,
    )(input_layer)
    normed = BatchNormalization()(conv)
    activated = Activation('relu')(normed)
    return MaxPool1D(pool_size=2)(activated)
def inception_block(input_layer):
    """Inception-style block: parallel conv_bn branches over one input.

    Seven branches with 4 filters each and kernel sizes 3..64 are applied to
    the same input and concatenated along the channel axis (7 * 4 = 28
    output channels).
    """
    kernel_sizes = (3, 5, 7, 9, 16, 32, 64)
    branches = [conv_bn(4, k, input_layer) for k in kernel_sizes]
    return concatenate(branches)
def get_model(input_shape):
    """Build and compile the 3-lead inception-style classifier.

    One input and one inception block per lead; the three branch outputs are
    concatenated, globally average-pooled, and classified into 2 softmax
    classes by an L2-regularized dense layer.

    Parameters
    ----------
    input_shape : sequence
        Shape of a single-lead input, e.g. [segment_len, 1].

    Returns
    -------
    A compiled keras Model taking a list of three inputs.
    """
    inputs = [Input(shape=input_shape) for _ in range(3)]
    branches = [inception_block(layer) for layer in inputs]
    merged = concatenate(branches)
    pooled = GlobalAvgPool1D()(merged)
    output_layer = Dense(
        2,
        activation='softmax',
        kernel_regularizer=L1L2(l1=0.0, l2=0.001),
    )(pooled)
    model_paper = Model(inputs=inputs, outputs=output_layer)
    model_paper.compile(
        loss='categorical_crossentropy',
        metrics=['accuracy'],
        optimizer='adam',
    )
    return model_paper
# Sanity-build a model to inspect its architecture.
# NOTE(review): 196 is presumably the per-lead segment length produced by the
# preprocessing — confirm it matches the arrays in data_dict.
model=get_model([196,1])
model.summary()
In [ ]:
# Leave-one-patient-out cross-validation: for each fold, train a fresh model
# on all other patients and score it on the held-out patient's segments.
cvscores=[]
i=0
# KFold.split only needs X's length here; y is a dummy constant list.
for ind_train,ind_test in kfold_patient.split(X=patients,y=[0]*len(patients)):
    i=i+1
    print('fold: {}/{}'.format(i,kfold_patient.n_splits))
    X_train,y_train,X_test,y_test=get_patient_data(ind_train,ind_test,patients)
    # Fresh (untrained) model per fold; input shape taken from the data.
    model_paper=get_model(X_train[0].shape[1:])
    # Explicitly (re)set the initial learning rate for this fold's optimizer.
    K.set_value(model_paper.optimizer.lr,1e-3)
    model_paper.fit(
        x=X_train,
        y=y_train,
        batch_size=32,
        epochs=200,
        shuffle=True,
        # Held-out patient only used for monitoring/reporting, not selection.
        validation_data=(X_test,y_test),
        verbose=1,
        callbacks=[
            # NOTE(review): both callbacks monitor the *training* loss
            # ('loss'), not 'val_loss' — confirm this is intentional.
            EarlyStopping(monitor='loss',min_delta=0.0,patience=10,verbose=1),
            # NOTE(review): 'epsilon' is the deprecated pre-Keras-2.1 name for
            # 'min_delta' on ReduceLROnPlateau.
            ReduceLROnPlateau(min_lr=1e-5,factor=.1,monitor='loss',epsilon=0.0001,patience=5,verbose=1,),
        ]
    )
    prob = model_paper.predict(X_test)
    scores=calculate_scores(prob,y_test)
    cvscores.append(scores)
    # Drop the TF graph/session between folds to avoid memory growth.
    clear_session()
In [ ]:
# Aggregate the per-fold scores. Folds whose test set contained only one
# class report the string sentinel 'nan' for the undefined metric (see
# calculate_scores) and are excluded from that metric's mean.
sensitivity = [s['sensitivity'] for s in cvscores if s['sensitivity'] != 'nan']
specificity = [s['specificity'] for s in cvscores if s['specificity'] != 'nan']
accuracy = [s['accuracy'] for s in cvscores]
np.mean(np.array(accuracy)), np.mean(np.array(sensitivity)), np.mean(np.array(specificity))