In [2]:
from com.yahoo.ml.caffe.DisplayUtils import *
from com.yahoo.ml.caffe.CaffeOnSpark import *
from com.yahoo.ml.caffe.Config import *
from com.yahoo.ml.caffe.DataSource import *
In [1]:
# Load the DataFrame stored in Parquet format at the path below.
# `sqlContext` is used instead of the `sqlCtx` alias so this cell names the SQL
# context the same way as the later cell that calls `sqlContext.createDataFrame`.
# NOTE(review): the path is specific to one machine; point it at your own
# CaffeOnSpark data directory before running.
df = sqlContext.read.parquet('/Users/afeng/dev/ml/CaffeOnSpark/data/mnist_test_dataframe')
In [3]:
# Preview `df` with the `show_df` helper; the call is the cell's last expression
# so its return value is rendered as the cell output.
# NOTE(review): the second argument (5) is presumably the number of rows to
# display -- confirm against the helper's definition.
show_df(df, 5)
Out[3]:
In [4]:
# Build the CaffeOnSpark handle from the SparkContext `sc`; later cells call
# train / trainWithValidation / test / features on it.
cos = CaffeOnSpark(sc)
In [5]:
# Settings handed to Config. All values are strings.
# NOTE(review): both paths are machine-specific ('/Users/afeng/...' and
# '/tmp/...'); adjust them before running elsewhere.
args = {
    'conf': '/Users/afeng/dev/ml/CaffeOnSpark/data/lenet_dataframe_solver.prototxt',
    'model': 'file:///tmp/lenet.model',
    'devices': '1',
    'clusterSize': '1',
}
# `args` is kept as a module-level dict because a later cell adds more keys to
# it and rebuilds `cfg`.
cfg = Config(sc, args)
In [6]:
# Obtain the two data sources used for training with validation.
# NOTE(review): the boolean passed to getSource presumably selects the
# training source (True) versus the test/validation source (False) -- confirm
# against DataSource.getSource.
dl_train_source = DataSource(sc).getSource(cfg, True)
dl_validation_source = DataSource(sc).getSource(cfg, False)
In [7]:
# Run training on the training source only.
# NOTE(review): the following cell calls trainWithValidation on the same
# source, so training is started twice -- confirm that both runs are intended.
cos.train(dl_train_source)
In [8]:
# Train again, this time also passing the validation source. The returned
# object is used below as a Spark DataFrame (.count(), .show(), .toPandas())
# whose `accuracy` and `loss` columns are plotted.
validation_result_df = cos.trainWithValidation(dl_train_source, dl_validation_source)
In [31]:
# Number of rows in the validation result.
validation_result_df.count()
Out[31]:
In [39]:
# Print the first 20 rows of the validation result.
validation_result_df.show(20)
In [12]:
# Select matplotlib's `notebook` backend via the IPython magic, then import
# pyplot as `plt` for the plotting cell that follows.
%matplotlib notebook
from matplotlib import pyplot as plt
In [14]:
# Plot validation accuracy and loss on one explicitly created Axes.
#
# Changes from the pyplot-state-machine version:
#   * a fresh figure/axes is created here, so re-running the cell cannot draw
#     onto whatever figure happens to be current;
#   * each series carries its own label, so the legend no longer depends on the
#     order in which the lines were drawn;
#   * grid(True) switches the grid on explicitly instead of toggling it.
validation_panda_df = validation_result_df.toPandas()

fig, ax = plt.subplots()

# Every entry of the `accuracy` / `loss` columns is indexed with [0] before
# plotting.
# NOTE(review): presumably each entry is a one-element sequence per validation
# round -- confirm against the DataFrame schema.
validation_panda_df.accuracy.apply(lambda x: x[0]).plot(ax=ax, label='Accuracy')
validation_panda_df.loss.apply(lambda x: x[0]).plot(ax=ax, label='Loss')

ax.grid(True)
# The x axis is the pandas row index of the collected frame.
ax.set_xlabel('Iteration')
ax.legend()
Out[14]:
In [15]:
# Data source for the test run; built with the same arguments (cfg, False) as
# the validation source above.
dl_test_source = DataSource(sc).getSource(cfg, False)
In [16]:
# Run the test pass over the test source and keep what it returns.
test_result = cos.test(dl_test_source)
In [17]:
# Display the value returned by cos.test(...) as the cell output.
test_result
Out[17]:
In [18]:
# Extend the existing settings with the feature and label names, then rebuild
# `cfg` from the updated dict. The names in 'features' ('accuracy', 'ip1',
# 'ip2') and 'label' are the columns read from the feature DataFrame in the
# cells below.
args.update({
    'features': 'accuracy,ip1,ip2',
    'label': 'label',
})
cfg = Config(sc, args)
In [19]:
# Data source for feature extraction, created from the rebuilt `cfg`.
dl_feature_source = DataSource(sc).getSource(cfg, False)
In [20]:
# Extract features; `f` is used below as a Spark DataFrame with the columns
# SampleID, accuracy, ip1, ip2 and label.
f = cos.features(dl_feature_source)
In [21]:
# Print the first 5 rows of the extracted features.
f.show(5)
In [22]:
def maxScoreAndIndex(array_of_scores):
    """Locate the largest score in a sequence.

    Args:
        array_of_scores: a non-empty iterable of mutually comparable values.

    Returns:
        A tuple ``(index, score)`` for the largest element. Note the order:
        despite the function's name, the index comes first and the score
        second. When several elements tie for the maximum, the one with the
        lowest index is returned.

    Raises:
        ValueError: if ``array_of_scores`` is empty.
    """
    # enumerate yields (index, score) pairs; the key compares scores only.
    return max(enumerate(array_of_scores), key=lambda x: x[1])
def _to_prediction_row(row):
    """Turn one feature row into (SampleID, Accuracy, Scores, MaxScore, Prediction, Label)."""
    # maxScoreAndIndex returns (index, score); call it once per row rather than
    # once for the score and again for the index.
    prediction, max_score = maxScoreAndIndex(row.ip2)
    return (
        row.SampleID,
        row.accuracy[0],
        row.ip2,
        max_score,
        prediction,
        int(row.label[0]),
    )


# Go through `.rdd` explicitly: DataFrame.map is not available in PySpark 2.x,
# while `.rdd.map` works on both 1.x and 2.x.
g = sqlContext.createDataFrame(
    f.rdd.map(_to_prediction_row),
    ["SampleID", "Accuracy", "Scores", "MaxScore", "Prediction", "Label"])

# Limit on the Spark side so only the 5 displayed rows are collected to the
# driver, instead of converting the whole DataFrame and slicing afterwards.
g.limit(5).toPandas()
Out[22]:
In [23]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
In [24]:
# Build the MLlib training set: one LabeledPoint per feature row, with the
# first element of `label` as the target and the `ip1` values as a dense
# feature vector.
# `.rdd.map` is used because DataFrame.map is not available in PySpark 2.x;
# `.rdd.map` works on both 1.x and 2.x.
data = f.rdd.map(lambda row: LabeledPoint(row.label[0], Vectors.dense(row.ip1)))
In [25]:
# Fit a multiclass logistic regression on the extracted `ip1` features.
# NOTE(review): numClasses=10 presumably matches the ten MNIST digit classes,
# and iterations=10 looks like a short demo setting -- confirm both.
lr = LogisticRegressionWithLBFGS.train(data, numClasses=10, iterations=10)
In [26]:
# Predict on the feature vectors of `data`. This is the same RDD the model was
# trained on in the previous cell, so the results are not a held-out
# evaluation.
predictions = lr.predict(data.map(lambda pt: pt.features))
In [27]:
# Fetch the first 5 predictions to the driver and display them.
predictions.take(5)
Out[27]: