MNIST Preprocessing

Joeri Hermans (Technical Student, IT-DB-SAS, CERN)
Department of Knowledge Engineering
Maastricht University, The Netherlands


In [1]:
!(date +%d\ %B\ %G)


07 February 2017

Preparation

To get started, we first load all the required imports. Please make sure you have installed dist-keras and seaborn. Furthermore, we assume that you have access to an installation which provides Apache Spark.

Before you start this notebook, place the MNIST dataset (which is provided as a zip in examples/data within this repository) on HDFS. Or, in case HDFS is not available, place it on the local filesystem, but make sure the path to the file is identical on all computing nodes.
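
For example, assuming the extracted CSV files reside in your current working directory, they could be placed on HDFS as follows (the data/ target directory matches the paths configured further below; adapt it to your environment):


In [ ]:
!hdfs dfs -mkdir -p data
!hdfs dfs -put mnist_train.csv data/
!hdfs dfs -put mnist_test.csv data/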


In [2]:
%matplotlib inline

import numpy as np

import seaborn as sns

import time

from pyspark import SparkContext
from pyspark import SparkConf

from matplotlib import pyplot as plt

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer

from distkeras.transformers import *
from distkeras.utils import *


Using TensorFlow backend.

In the following cell, adapt the parameters to fit your personal requirements.


In [3]:
# Modify these variables according to your needs.
application_name = "MNIST Preprocessing"
using_spark_2 = False
local = False
path_train = "data/mnist_train.csv"
path_test = "data/mnist_test.csv"
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_processes = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 20
    num_processes = 1

In [4]:
# This variable is derived from the number of executors and the number of
# processes per executor, and will be used to set the number of model trainers.
num_workers = num_executors * num_processes

print("Number of desired executors: " + `num_executors`)
print("Number of desired processes / executor: " + `num_processes`)
print("Total number of workers: " + `num_workers`)


Number of desired executors: 20
Number of desired processes / executor: 1
Total number of workers: 20

In [5]:
import os

# Use the Databricks CSV reader; it has some nice functionality for handling invalid values.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'

In [6]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_processes`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.executor.memory", "20g")
conf.set("spark.yarn.executor.memoryOverhead", "2")
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

# Check if the user is running Spark 2.0 +
if using_spark_2:
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports.
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)

In [7]:
# Record time of starting point.
time_start = time.time()

In [8]:
# Check if we are using Spark 2.0
if using_spark_2:
    reader = sc
else:
    reader = sqlContext
# Read the training set.
raw_dataset_train = reader.read.format('com.databricks.spark.csv') \
                          .options(header='true', inferSchema='true') \
                          .load(path_train)
# Read the test set.
raw_dataset_test = reader.read.format('com.databricks.spark.csv') \
                         .options(header='true', inferSchema='true') \
                         .load(path_test)
# Repartition the datasets.
raw_dataset_train = raw_dataset_train.repartition(num_workers)
raw_dataset_test = raw_dataset_test.repartition(num_workers)
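
You can verify the layout of the raw dataset by inspecting its first few column names (the slice size here is arbitrary):


In [ ]:
# Every pixel has its own column; print a few of the column names.
print(raw_dataset_train.columns[:5])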

As the cell above illustrates, every pixel is associated with a separate column. In order to ensure compatibility with Apache Spark, we vectorize the columns, and add the resulting vectors to a separate column. However, to achieve this, we first need a list of the required feature columns. This is shown in the cell below.


In [9]:
# First, we would like to extract the desired features from the raw dataset.
# We do this by constructing a list with all desired columns.
features = raw_dataset_train.columns
features.remove('label')

Once we have a list of column names, we can pass it to Spark's VectorAssembler. The VectorAssembler takes a list of feature columns, vectorizes them, and places the result in the column defined by outputCol.


In [10]:
# Next, we use Spark's VectorAssembler to "assemble" (create) a vector of all desired features.
# http://spark.apache.org/docs/latest/ml-features.html#vectorassembler
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
# This transformer takes all columns specified in `features`, and creates an
# additional column "features" which aggregates them into a single vector.
training_set = vector_assembler.transform(raw_dataset_train)
test_set = vector_assembler.transform(raw_dataset_test)

Once we have the inputs for our neural network (the features column) after applying the VectorAssembler, we also need to define the outputs. Since we are dealing with a classification task, the output of our neural network should be a one-hot encoded vector with 10 elements. For this, we provide a OneHotTransformer which accomplishes this exact task.


In [11]:
# Define the number of output classes.
nb_classes = 10
encoder = OneHotTransformer(nb_classes, input_col="label", output_col="label_encoded")
training_set = encoder.transform(training_set)
test_set = encoder.transform(test_set)
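
To illustrate: a label of 3 is mapped to a 10-element vector with a single 1 at index 3. The same idea in plain numpy (illustration only; it reuses the nb_classes variable defined above):


In [ ]:
# Illustration: one-hot encoding a single label with numpy.
label = 3
label_encoded = np.zeros(nb_classes)
label_encoded[label] = 1.0
print(label_encoded)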

MNIST

MNIST is a dataset of handwritten digits. Every image is a 28 by 28 pixel grayscale image, which means that every pixel holds an intensity value between 0 and 255. An example instance is plotted in the cell below.
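
This is a minimal plotting sketch; it assumes the training_set DataFrame constructed above and uses the matplotlib import from the preparation cell:


In [ ]:
# Fetch a single instance and render its raw feature vector as a 28 x 28 image.
instance = training_set.select("features").take(1)[0]
image = instance.features.toArray().reshape((28, 28))
plt.imshow(image, cmap='gray')
plt.show()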

Normalization

In this section, we normalize the feature vectors to the [0, 1] range.


In [12]:
# Keep only the required columns, in case you ran this cell before.
training_set = training_set.select("features", "label", "label_encoded")
test_set = test_set.select("features", "label", "label_encoded")
# Allocate a MinMaxTransformer using Distributed Keras.
# o_min -> original_minimum
# n_min -> new_minimum
transformer = MinMaxTransformer(n_min=0.0, n_max=1.0, \
                                o_min=0.0, o_max=255.0, \
                                input_col="features", \
                                output_col="features_normalized")
# Transform the datasets.
training_set = transformer.transform(training_set)
test_set = transformer.transform(test_set)
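
As a quick sanity check, you can peek at a single row to confirm that the normalized features indeed fall within the [0, 1] range:


In [ ]:
# Inspect the normalized feature vector of a single instance.
row = training_set.select("features_normalized").take(1)[0]
values = row.features_normalized.toArray()
print("min: " + str(values.min()) + " max: " + str(values.max()))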

Convolutions

In order to make the dense vectors compatible with convolution operations in Keras, we add another column which contains the matrix form of these images. We provide a utility class (ReshapeTransformer) which helps you with this.


In [13]:
reshape_transformer = ReshapeTransformer("features_normalized", "matrix", (28, 28, 1))
training_set = reshape_transformer.transform(training_set)
test_set = reshape_transformer.transform(test_set)

Dense Transformation

At the moment, dist-keras does not support SparseVectors due to the numpy dependency. As a result, we have to convert every SparseVector to a DenseVector. We added a simple utility transformer which does this for you.


In [14]:
dense_transformer = DenseTransformer(input_col="features_normalized", output_col="features_normalized_dense")
training_set = dense_transformer.transform(training_set)
test_set = dense_transformer.transform(test_set)

Artificial Enlargement

We artificially enlarge the training set by appending additional copies of itself. This simulates a larger dataset and allows us to evaluate optimizer performance; with expansion = 10, the loop below yields a training set 11 times its original size.


In [ ]:
df = training_set
expansion = 10
# Append `expansion` additional copies of the training set.
for i in range(expansion):
    df = df.unionAll(training_set)
training_set = df
training_set.cache()


Out[ ]:
DataFrame[features: vector, label: bigint, label_encoded: vector, features_normalized: vector, matrix: array<array<array<double>>>, features_normalized_dense: vector]

Writing to HDFS

To avoid repeating the preprocessing on every run, and to improve optimizer performance, we write the data to HDFS in Parquet format.


In [ ]:
training_set.write.parquet("data/mnist_train.parquet")
test_set.write.parquet("data/mnist_test.parquet")
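
In a subsequent training notebook, the preprocessed datasets can then be loaded directly from Parquet, for example (the variable names here are just for illustration):


In [ ]:
# Load the preprocessed datasets from Parquet using the reader defined above.
train_df = reader.read.parquet("data/mnist_train.parquet")
test_df = reader.read.parquet("data/mnist_test.parquet")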

In [ ]:
# Record end of transformation.
time_end = time.time()

In [ ]:
dt = time_end - time_start
print("Took " + str(dt) + " seconds.")

In [ ]:
# Clean up the Parquet files of a previous run. Run this cell before the
# write cell above if the files already exist.
!hdfs dfs -rm -r data/mnist_test.parquet
!hdfs dfs -rm -r data/mnist_train.parquet