301 - Ingesting CIFAR Images into Spark DataFrames and Evaluating Pre-Trained CNTK Models


In [ ]:
from mmlspark import CNTKModel, ModelDownloader
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from os.path import abspath

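Set some paths. Two `modelDir` cells follow: one points at WASB storage and one at a local `models` directory. If both are run in order, the later (local) assignment takes effect.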


In [ ]:
# Root URL under which the dataset archive is published.
cdnURL = "https://mmlspark.azureedge.net/datasets"

# The archive fetched here is a copy of the CIFAR10 dataset; the original is
# published at http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
dataFile = "cifar-10-python.tar.gz"
# Download location: <cdnURL>/CIFAR10/<dataFile>
dataURL = "".join([cdnURL, "/CIFAR10/", dataFile])

In [ ]:
# Model repository location given as a WASB (Azure Blob Storage) URI.
# NOTE(review): the following cell assigns modelName/modelDir again with a
# local file: path -- presumably only one of the two cells is meant to run in
# a given environment; confirm.
modelDir = "wasb:///models/"
modelName = "ConvNet"

In [ ]:
# Model repository location as a file: URI for the "models" directory,
# resolved against the current working directory by abspath.
# NOTE(review): the preceding cell assigns the same two names with a wasb:
# location -- presumably only one of the two cells is meant to run in a given
# environment; confirm.
modelDir = "file:" + abspath("models")
modelName = "ConvNet"

Get the model and extract the data.


In [ ]:
import os, tarfile, pickle
import urllib.request

# Fetch the named pre-trained model into modelDir via the model downloader.
d = ModelDownloader(spark, modelDir)
model = d.downloadByName(modelName)

# Download the archive only when it is not already present. The data is
# written to a temporary name first and renamed afterwards, so an interrupted
# download can never leave a truncated archive that would satisfy the
# isfile() check on the next run.
if not os.path.isfile(dataFile):
    partFile = dataFile + ".part"
    urllib.request.urlretrieve(dataURL, partFile)
    os.replace(partFile, dataFile)

# NOTE(security): pickle.load can execute arbitrary code embedded in the
# stream. Only unpickle this archive when dataURL points at a trusted source.
with tarfile.open(dataFile, "r:gz") as f:
    # Close the member's file object explicitly instead of leaking it.
    with f.extractfile("cifar-10-batches-py/test_batch") as batch:
        test_dict = pickle.load(batch, encoding="latin1")

Preprocess the images.


In [ ]:
import array
from pyspark.sql.functions import col
from pyspark.sql.types import *

def reshape_image(record):
    """Turn one (image, label, filename) record into plain Python floats.

    The image must support ``reshape``/``flatten`` and hold exactly
    3 * 32 * 32 values; it is viewed as 3 x 32 x 32 and flattened again,
    and every element is converted with ``float``.

    Returns a ``(data, label, filename)`` tuple where ``data`` is a list of
    floats and the other two items are passed through unchanged.
    """
    pixels, label, filename = record
    # reshape raises if the record does not contain exactly 3*32*32 values.
    cube = pixels.reshape(3, 32, 32)
    data = list(map(float, cube.flatten()))
    return data, label, filename

# Identity UDF: it returns its input untouched and exists only to declare the
# column type as array<float>.
convert_to_float = udf(lambda x: x, ArrayType(FloatType()))

# Pair each pixel row with its label and file name, distribute the records
# and run reshape_image over every one of them.
image_rdd = spark.sparkContext.parallelize(
    zip(test_dict["data"], test_dict["labels"], test_dict["filenames"])
).map(reshape_image)

# Name the three columns, then pass "images" through the typing UDF.
imagesWithLabels = (
    image_rdd
    .toDF(["images", "labels", "filename"])
    .withColumn("images", convert_to_float(col("images")))
)
imagesWithLabels.printSchema()

imagesWithLabels.cache()

Evaluate CNTK model.


In [ ]:
import time
# Wall-clock start of the scoring run; the matching end timestamp is taken
# further down in this cell.
start = time.time()

# Configure the CNTK model: read the "images" column, write the values of
# output node "z" to an "output" column (the notebook treats these as log
# probabilities).
cntkModel = (
    CNTKModel()
    .setInputCol("images")
    .setOutputCol("output")
    .setModelLocation(spark, model.uri)
    .setOutputNodeName("z")
)
scoredImages = cntkModel.transform(imagesWithLabels)

# Transform the log probabilities to predictions
def argmax(x):
    """Return the position of the largest element of ``x``.

    On ties the earliest position wins; an empty ``x`` raises ``ValueError``
    (propagated from ``max``).
    """
    best_index, _ = max(enumerate(x), key=lambda pair: pair[1])
    return best_index

# Wrap argmax as a UDF returning an integer and use it to turn each "output"
# vector into a single predicted class index.
argmaxUDF = udf(argmax, IntegerType())
imagePredictions = scoredImages.withColumn("predictions", argmaxUDF("output")) \
                               .select("predictions", "labels")

# count() is an action, so it forces the scoring to actually run before the
# end timestamp is taken.
numRows = imagePredictions.count()

end = time.time()
print("classifying {} images took {} seconds".format(numRows,end-start))

# Register the predictions as a temporary view for further analysis using SQL.
# createOrReplaceTempView supersedes registerTempTable, which has been
# deprecated since Spark 2.0.
imagePredictions.createOrReplaceTempView("ImagePredictions")

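Plot the confusion matrix. The predictions are first brought into a local pandas DataFrame, either with the `%%sql -o` / `%%local` cell magics or with `toPandas()`, and are then plotted.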


In [ ]:
%%sql -q -o imagePredictions
-- Fetch every row of ImagePredictions (columns: predictions, labels).
-- NOTE(review): the -q / -o options on the magic line above are presumably
-- sparkmagic's "no visualisation" / "export to local variable" flags; confirm.
select * from ImagePredictions

In [ ]:
%%local
# True class labels and predicted class indices, taken column-wise from the
# imagePredictions table.
y = imagePredictions["labels"]
y_hat = imagePredictions["predictions"]

In [ ]:
# Bring the predictions into a pandas DataFrame. The conversion is guarded so
# that re-running this cell (when imagePredictions is already a pandas frame
# and has no toPandas method) does not fail.
if hasattr(imagePredictions, "toPandas"):
    imagePredictions = imagePredictions.toPandas()

# True class labels and predicted class indices.
y, y_hat = imagePredictions["labels"], imagePredictions["predictions"]

In [ ]:
%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix

# Class names used for the axis ticks.
# NOTE(review): assumed to be listed in label-id order (0..9); the tick
# positions below rely on that -- confirm against the dataset metadata.
labels = ["airplane", "automobile", "bird", "cat", "deer", "dog", "frog",
          "horse", "ship", "truck"]

# Pin the matrix to one row/column per class id. Without labels=, a class that
# appears in neither y nor y_hat would be dropped, shrinking the matrix and
# shifting every following tick label onto the wrong row/column.
cm = confusion_matrix(y, y_hat, labels=list(range(len(labels))))

# Rows are true labels, columns are predicted labels.
plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
plt.colorbar()
tick_marks = np.arange(len(labels))
plt.xticks(tick_marks, labels, rotation=90)
plt.yticks(tick_marks, labels)
plt.xlabel("Predicted label")
plt.ylabel("True Label")
plt.show()