In [1]:
from com.yahoo.ml.caffe.DisplayUtils import *
In [2]:
df = sqlContext.read.parquet('mnist_test_dataframe')
In [3]:
show_df(df)
Out[3]:
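show_df renders a sample of the DataFrame inline. For a quick sanity check without the CaffeOnSpark helper, the stock DataFrame API works as well (a sketch; assumes the standard 10,000-row MNIST test split):
In [ ]:
# Inspect the data with plain Spark calls:
df.printSchema()   # expect an image/data column plus the 'label' column
df.count()         # the MNIST test split has 10,000 rows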
In [4]:
show_network('lenet_dataframe_train_test.prototxt','LR')
Out[4]:
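show_network draws the network topology from the prototxt (the second argument is presumably the graphviz rankdir, so 'LR' lays the graph out left to right). To eyeball the raw definition instead, the file can be read directly (assuming it sits in the driver's working directory, as the cell above already does):
In [ ]:
# Dump the raw network definition the graph was drawn from:
print(open('lenet_dataframe_train_test.prototxt').read())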
In [5]:
from com.yahoo.ml.caffe.CaffeOnSpark import *
from com.yahoo.ml.caffe.Config import *
from com.yahoo.ml.caffe.DataSource import *
from pyspark.sql import DataFrame
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
In [6]:
cos=CaffeOnSpark(sc,sqlContext)
In [7]:
args={}
args['conf']='lenet_dataframe_solver.prototxt'
args['model']='file:///tmp/lenet.model'
args['label']='label'
args['output']='file:outputlenet'
args['devices']='1'
args['outputFormat']='json'
args['train']='True'
In [8]:
cfg=Config(sc,args)
dl_train_source = DataSource(sc).getSource(cfg,True)
In [ ]:
cos.train(dl_train_source)
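Training writes the learned weights to the configured model path (file:///tmp/lenet.model above). A quick check that the snapshot landed (a sketch; only meaningful when the model path points at the driver's local filesystem):
In [ ]:
import os.path
os.path.exists('/tmp/lenet.model')   # should be True once training completes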
In [ ]:
cfg=Config(sc)
cfg.protoFile='lenet_dataframe_solver.prototxt'
cfg.modelPath = 'file:/tmp/lenet.model'
cfg.label = 'label'
cfg.outputPath = 'file:outputlenet'
cfg.devices = 1
cfg.outputFormat = 'json'
cfg.isTest = True
cfg.clusterSize = 1
In [8]:
dl_test_source = DataSource(sc).getSource(cfg,False)
In [9]:
test_result=cos.test(dl_test_source)
In [10]:
test_result
Out[10]:
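test_result holds the averaged test metrics. Assuming it comes back as a plain dict keyed by output-layer name (e.g. accuracy and loss), it can be pretty-printed like this:
In [ ]:
# Print one metric per line (a sketch; assumes a dict of layer name -> value):
for layer, value in test_result.items():
    print('%s: %s' % (layer, value))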
In [11]:
args['conf']='lenet_dataframe_solver.prototxt'
args['model']='file:/tmp/lenet.model'
args['features']='accuracy,ip1,ip2'  # ip1 is needed later as input to the MLlib LogisticRegression
args['label']='label'
args['output']='file:outputlenet'
args['devices']='1'
args['outputFormat']='json'
In [12]:
cfg=Config(sc,args)
dl_features_source = DataSource(sc).getSource(cfg,False)
In [13]:
f=cos.features(dl_features_source)
In [14]:
f.show(10)
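printSchema() confirms that the feature DataFrame carries one column per blob requested in args['features'], plus SampleID and the label column:
In [ ]:
f.printSchema()   # expect SampleID, accuracy, ip1, ip2 and label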
In [15]:
f.take(1)
Out[15]:
In [16]:
def maxScoreAndIndex(array_of_scores):
    # enumerate yields (index, score) pairs; taking the max by score
    # returns the (index, score) of the strongest prediction.
    return max(enumerate(array_of_scores), key=lambda x: x[1])
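A quick check of the helper on a toy vector: the highest score (0.7) sits at index 2, so the call returns that (index, score) pair.
In [ ]:
maxScoreAndIndex([0.1, 0.2, 0.7])   # -> (2, 0.7)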
In [17]:
g = sqlContext.createDataFrame(f.map(lambda row: (
        row.SampleID,
        row.accuracy[0],
        row.ip2,
        maxScoreAndIndex(row.ip2)[1],
        maxScoreAndIndex(row.ip2)[0],
        int(row.label[0]))),
    ["SampleID", "Accuracy", "Scores", "MaxScore", "Prediction", "Label"])
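The mapping above evaluates maxScoreAndIndex twice per row. An equivalent version that computes it once (a sketch; produces the same columns):
In [ ]:
def to_output_row(row):
    idx, score = maxScoreAndIndex(row.ip2)
    return (row.SampleID, row.accuracy[0], row.ip2, score, idx, int(row.label[0]))

g = sqlContext.createDataFrame(
    f.map(to_output_row),
    ["SampleID", "Accuracy", "Scores", "MaxScore", "Prediction", "Label"])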
In [18]:
g.toPandas()[:10]
Out[18]:
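With Prediction and Label side by side, the overall arg-max accuracy falls out of a simple filter (the float() keeps the division fractional on Python 2):
In [ ]:
g.filter(g.Prediction == g.Label).count() / float(g.count())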
In [ ]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
In [ ]:
data = f.map(lambda row: LabeledPoint(row.label[0], Vectors.dense(row.ip1)))
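LBFGS makes several passes over the training set, so caching the RDD avoids re-running the Caffe feature-extraction lineage on every iteration (optional but cheap):
In [ ]:
data.cache()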
In [ ]:
lr = LogisticRegressionWithLBFGS.train(data, numClasses=10, iterations=10)
In [ ]:
predictions = lr.predict(data.map(lambda pt: pt.features))
In [ ]:
predictions.take(5)
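Zipping the labels against the predictions gives a quick accuracy figure for the MLlib model (a sketch; the model is evaluated on its own training data, so treat the number as optimistic):
In [ ]:
labels_and_preds = data.map(lambda pt: pt.label).zip(predictions)
labels_and_preds.filter(lambda lp: lp[0] == lp[1]).count() / float(data.count())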