In [1]:
!(date +%d\ %B\ %G)
To get started, we first load all the required imports. Please make sure you have installed dist-keras and seaborn. Furthermore, we assume that you have access to an installation which provides Apache Spark.
Before you start this notebook, place the MNIST dataset (provided as a zip in examples/data within this repository) on HDFS. If HDFS is not available, place it on the local filesystem instead, but make sure the path to the file is identical on all computing nodes.
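If the CSV files are not yet on HDFS, a minimal sketch of the upload is shown below; it assumes you have already extracted the zip and that the CSV files sit in your current working directory, which may differ on your machine.
In [ ]:
!hdfs dfs -mkdir -p data
!hdfs dfs -put mnist_train.csv data/mnist_train.csv
!hdfs dfs -put mnist_test.csv data/mnist_test.csv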
In [2]:
%matplotlib inline
import numpy as np
import seaborn as sns
import time
from pyspark import SparkContext
from pyspark import SparkConf
from matplotlib import pyplot as plt
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from distkeras.transformers import *
from distkeras.utils import *
In the following cell, adapt the parameters to fit your personal requirements.
In [3]:
# Modify these variables according to your needs.
application_name = "MNIST Preprocessing"
using_spark_2 = False
local = False
path_train = "data/mnist_train.csv"
path_test = "data/mnist_test.csv"
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_processes = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 20
    num_processes = 1
In [4]:
# This variable is derived from the number of cores and executors, and will be used to assign the number of model trainers.
num_workers = num_executors * num_processes
print("Number of desired executors: " + `num_executors`)
print("Number of desired processes / executor: " + `num_processes`)
print("Total number of workers: " + `num_workers`)
In [5]:
import os
# Use the DataBricks CSV reader, this has some nice functionality regarding invalid values.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'
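Note that the spark-csv package is only required on Spark 1.x; from Spark 2.0 onwards CSV support is built into Spark itself, so this cell can be skipped and the reads further down can use the built-in data source instead. A minimal sketch is left commented out below, since it reuses the reader and path_train variables defined later and assumes Spark 2.x.
In [ ]:
# Spark 2.x only: the built-in CSV data source replaces com.databricks.spark.csv.
# raw_dataset_train = reader.read.format('csv') \
#                           .options(header='true', inferSchema='true') \
#                           .load(path_train)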
In [6]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_processes`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.executor.memory", "20g")
conf.set("spark.yarn.executor.memoryOverhead", "2")
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
# Check if the user is running Spark 2.0+.
if using_spark_2:
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.config(conf=conf) \
                     .appName(application_name) \
                     .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports.
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)
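As an optional sanity check that the context came up correctly, we can print the Spark version; both SparkContext (Spark 1.x) and SparkSession (Spark 2.x) expose a version attribute.
In [ ]:
# Print the version of the Spark installation we are connected to.
print("Spark version: " + sc.version)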
In [7]:
# Record time of starting point.
time_start = time.time()
In [8]:
# Check if we are using Spark 2.0
if using_spark_2:
    reader = sc
else:
    reader = sqlContext
# Read the training set.
raw_dataset_train = reader.read.format('com.databricks.spark.csv') \
                          .options(header='true', inferSchema='true') \
                          .load(path_train)
# Read the test set.
raw_dataset_test = reader.read.format('com.databricks.spark.csv') \
                         .options(header='true', inferSchema='true') \
                         .load(path_test)
# Repartition the datasets.
raw_dataset_train = raw_dataset_train.repartition(num_workers)
raw_dataset_test = raw_dataset_test.repartition(num_workers)
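The raw dataset stores one column per pixel plus a label column; a quick way to verify this is to inspect the number of columns and a few column names. Note that counting the rows triggers a full Spark job and may take a moment.
In [ ]:
# Inspect the raw training set: expect 1 label column and 784 pixel columns.
print("Number of columns: " + str(len(raw_dataset_train.columns)))
print("First columns: " + str(raw_dataset_train.columns[:5]))
print("Number of training instances: " + str(raw_dataset_train.count()))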
As shown in the output of the cell above, every pixel is associated with a separate column. To ensure compatibility with Apache Spark, we vectorize these columns and add the resulting vector as a separate column. However, to achieve this we first need a list of the required columns, which is constructed in the cell below.
In [9]:
# First, we would like to extract the desired features from the raw dataset.
# We do this by constructing a list with all desired columns.
features = raw_dataset_train.columns
features.remove('label')
Once we have a list of column names, we can pass it to Spark's VectorAssembler. The VectorAssembler takes the listed feature columns, assembles them into a single vector, and places that vector in the column specified by outputCol.
In [10]:
# Next, we use Spark's VectorAssembler to "assemble" (create) a vector of all desired features.
# http://spark.apache.org/docs/latest/ml-features.html#vectorassembler
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
# This transformer will take all columns specified in features, and create an additional column "features" which will contain all the desired features aggregated into a single vector.
training_set = vector_assembler.transform(raw_dataset_train)
test_set = vector_assembler.transform(raw_dataset_test)
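To verify the assembly step, we can look at a single row; the features column should now hold a vector of length 784 (Spark may store it as a SparseVector, since most MNIST pixels are zero).
In [ ]:
# Inspect the assembled feature vector of the first training instance.
first_row = training_set.select("features").first()
print("Feature vector size: " + str(len(first_row["features"])))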
Now that the VectorAssembler provides the inputs of our neural network (the features column), we also need to define its outputs. Since we are dealing with a classification task, the output of our neural network should be a one-hot encoded vector with 10 elements. For this, we provide a OneHotTransformer which accomplishes exactly this task.
In [11]:
# Define the number of output classes.
nb_classes = 10
encoder = OneHotTransformer(nb_classes, input_col="label", output_col="label_encoded")
training_set = encoder.transform(training_set)
test_set = encoder.transform(test_set)
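As a quick check of the encoding, we can compare the label and label_encoded columns for a few rows; every encoded value should be a vector of 10 elements with a single 1.0 at the index of the digit.
In [ ]:
# Show a couple of labels together with their one-hot encoded counterparts.
for row in training_set.select("label", "label_encoded").take(2):
    print(str(row["label"]) + " -> " + str(row["label_encoded"]))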
MNIST is a dataset of handwritten digits. Every instance is a 28 by 28 pixel grayscale image, which means that every pixel holds an intensity value between 0 and 255. Some examples of instances within this dataset are shown in the cell below.
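The following sketch converts the feature vector of a single instance back to a 28 by 28 array and draws it with matplotlib (imported above); it assumes the pixels are stored in row-major order, as is usual for MNIST CSV exports.
In [ ]:
# Fetch one instance and display its pixels as a 28x28 grayscale image.
row = training_set.select("features", "label").first()
image = row["features"].toArray().reshape(28, 28)
plt.imshow(image, cmap='gray')
plt.title("Label: " + str(row["label"]))
plt.show()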
In [12]:
# Clear the datasets in case you ran this cell before.
training_set = training_set.select("features", "label", "label_encoded")
test_set = test_set.select("features", "label", "label_encoded")
# Allocate a MinMaxTransformer using Distributed Keras.
# o_min -> original_minimum
# n_min -> new_minimum
transformer = MinMaxTransformer(n_min=0.0, n_max=1.0, \
                                o_min=0.0, o_max=250.0, \
                                input_col="features", \
                                output_col="features_normalized")
# Transform the datasets.
training_set = transformer.transform(training_set)
test_set = transformer.transform(test_set)
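As a quick check of the normalization, the minimum and maximum of a transformed feature vector should now lie roughly in the [0, 1] range. Since o_max is set to 250 while raw MNIST pixels go up to 255, the maximum can end up slightly above 1.0; the sketch below assumes the transformer returns an iterable vector.
In [ ]:
# Verify the value range of the normalized pixels of the first instance.
row = training_set.select("features_normalized").first()
values = np.asarray(list(row["features_normalized"]))
print("Min: " + str(values.min()) + ", max: " + str(values.max()))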
In [13]:
# Reshape the flat vector of 784 normalized pixels into a 28x28x1 tensor,
# which is the input shape expected by convolutional models.
reshape_transformer = ReshapeTransformer("features_normalized", "matrix", (28, 28, 1))
training_set = reshape_transformer.transform(training_set)
test_set = reshape_transformer.transform(test_set)
In [14]:
# Convert the (possibly sparse) normalized feature vectors to a dense representation.
dense_transformer = DenseTransformer(input_col="features_normalized", output_col="features_normalized_dense")
training_set = dense_transformer.transform(training_set)
test_set = dense_transformer.transform(test_set)
In [ ]:
# Artificially enlarge the training set by appending it to itself `expansion` times.
df = training_set
expansion = 10
for i in range(0, expansion):
    df = df.unionAll(training_set)
training_set = df
training_set.cache()
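As a sanity check (note that counting materializes the cache and triggers a full Spark job), the expanded set should contain expansion + 1 times the original number of rows, i.e. 660,000 for the standard 60,000-row MNIST training set, since the loop appends the original set ten more times.
In [ ]:
# Counting forces evaluation; expect (expansion + 1) * 60000 = 660000 rows.
print("Number of training instances after expansion: " + str(training_set.count()))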
In [ ]:
training_set.write.parquet("data/mnist_train.parquet")
test_set.write.parquet("data/mnist_test.parquet")
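Note that Spark's default save mode raises an error when the target path already exists. If you plan to re-run this notebook, either remove the old Parquet files first (as done in the last cell) or, as a hedged alternative, write with overwrite mode:
In [ ]:
# Overwrite any Parquet output left behind by a previous run.
training_set.write.mode("overwrite").parquet("data/mnist_train.parquet")
test_set.write.mode("overwrite").parquet("data/mnist_test.parquet")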
In [ ]:
# Record end of transformation.
time_end = time.time()
In [ ]:
dt = time_end - time_start
print("Took " + str(dt) + " seconds.")
In [ ]:
!hdfs dfs -rm -r data/mnist_test.parquet
!hdfs dfs -rm -r data/mnist_train.parquet