CIFAR-10 Preprocessing

Joeri Hermans (Technical Student, IT-DB-SAS, CERN)
Departement of Data Science & Knowledge Engineering
Maastricht University, The Netherlands

In this notebook we download the CIFAR-10 dataset, and prepare it in such a way it can be processed by Spark.


In [1]:
import cPickle as pickle

import csv

import numpy as np

from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *


Using TensorFlow backend.

Downloading and decompressing the dataset


In [2]:
!rm cifar-10-python.tar.gz
!rm -r cifar-10-batches-py
!wget https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


--2017-01-26 15:42:04--  https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
Resolving www.cs.toronto.edu... 128.100.3.30
Connecting to www.cs.toronto.edu|128.100.3.30|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 170498071 (163M) [application/x-gzip]
Saving to: “cifar-10-python.tar.gz”

100%[======================================>] 170,498,071 4.88M/s   in 33s     

2017-01-26 15:42:40 (4.89 MB/s) - “cifar-10-python.tar.gz” saved [170498071/170498071]


In [3]:
!tar -xvzf cifar-10-python.tar.gz


cifar-10-batches-py/
cifar-10-batches-py/data_batch_4
cifar-10-batches-py/readme.html
cifar-10-batches-py/test_batch
cifar-10-batches-py/data_batch_3
cifar-10-batches-py/batches.meta
cifar-10-batches-py/data_batch_2
cifar-10-batches-py/data_batch_5
cifar-10-batches-py/data_batch_1

Loading the dataset in memory for further processing


In [4]:
# Define the required datastructures.
training_instances = []
training_labels = []

# Iterate through all training batches, and load them in memory.
for i in range(1, 6):
    path = "cifar-10-batches-py/data_batch_" + str(i)
    fd = open(path, "rb")
    d = pickle.load(fd)
    fd.close()
    # Add the training data to our datastructures.
    num_instances = len(d['data'])
    for j in range(0, num_instances):
        training_instances.append(d['data'][j])
        training_labels.append(d['labels'][j])
        
print("Number of training instances: " + str(len(training_instances)))


Number of training instances: 50000

In [5]:
# Define the reuiqred datastructures.
test_instances = []
test_labels = []

# Load the test batch.
path = "cifar-10-batches-py/test_batch"
fd = open(path, "rb")
d = pickle.load(fd)
fd.close()
# Add the testset to our datastructures.
num_instances = len(d['data'])
for j in range(0, num_instances):
    test_instances.append(d['data'][j])
    test_labels.append(d['labels'][j])
    
print("Number of test instances: " + str(len(test_instances)))


Number of test instances: 10000

At this point we have the training and test set in memory. At this point we basically have 2 options to prepare it for Apache Spark. First, we simply "parallelize" the data, and continue from there. However, this requires some additional logic. The second approach is to write it to a file which Spark will be able to read (CSV, Parquet, Avro...). Due to the simplicity of the second approach, we will choose to write the contents of our datastructures in a CSV file.


In [6]:
# First, prepare the column names.
columns = ['label']
# Now, add the pixel column names. Note, first 1024 pixels are red, then green and finally blue.
for c in ['r','g','b']:
    for i in range(0, 1024):
        column_name = "p_" + str(i) + "_" + c
        columns.append(column_name)
        
# Now, we should have 3072 (data) + 1 (label) column names.
print("Number of columns: " + str(len(columns)))


Number of columns: 3073

In [7]:
training_set = []
test_set = []

# Prepare the training set.
for i in range(0, len(training_instances)):
    row = np.insert(training_instances[i], 0, training_labels[i])
    training_set.append(row)

# Prepare the test set.
for i in range(0, len(test_instances)):
    row = np.insert(test_instances[i], 0, test_labels[i])
    test_set.append(row)
    
print("Size training set: " + str(len(training_set)))
print("Size test set: " + str(len(test_set)))


Size training set: 50000
Size test set: 10000

In [8]:
def save(path, columns, dataset):
    with open(path, 'wb') as f:
        w = csv.writer(f)
        # Write the columns.
        w.writerow(columns)
        # Iterate through all instances in the training set.
        n = len(dataset)
        for i in range(0, n):
            w.writerow(dataset[i].tolist())

In [9]:
# Save the datasets to disk.
save("cifar-10-training.csv", columns, training_set)
save("cifar-10-test.csv", columns, test_set)

In [10]:
# Confirming that produced CSV's are present
!ls | grep cifar | grep csv


cifar-10-test.csv
cifar-10-training.csv

In [11]:
# Remove the old training and test set from HDFS.
!hdfs dfs -rm data/cifar-10-training.csv
!hdfs dfs -rm data/cifar-10-test.csv
# Copy the training and test set to HDFS.
!hdfs dfs -copyFromLocal cifar-10-training.csv data/cifar-10-training.csv
!hdfs dfs -copyFromLocal cifar-10-test.csv data/cifar-10-test.csv


Deleted data/cifar-10-training.csv
Deleted data/cifar-10-test.csv

Further distributed preprocessing with Apache Spark

Setting up a Spark Context


In [12]:
# Modify these variables according to your needs.
application_name = "CIFAR-10 Preprocessing Notebook"
using_spark_2 = False
local = False
path_train = "data/cifar-10-training.csv"
path_test = "data/cifar-10-test.csv"
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_processes = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 20
    num_processes = 1
    
num_workers = num_executors * num_processes

In [13]:
import os

# Use the DataBricks CSV reader, this has some nice functionality regarding invalid values.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'

In [14]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_processes`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.executor.memory", "4g")
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

# Check if the user is running Spark 2.0 +
if using_spark_2:
    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)

Reading the raw CSV files


In [15]:
# Check if we are using Spark 2.0
if using_spark_2:
    reader = sc
else:
    reader = sqlContext
# Read the training set.
raw_dataset_train = reader.read.format('com.databricks.spark.csv') \
                          .options(header='true', inferSchema='true') \
                          .load(path_train)
# Read the testing set.
raw_dataset_test = reader.read.format('com.databricks.spark.csv') \
                         .options(header='true', inferSchema='true') \
                         .load(path_test)

In [16]:
# Count the number of instances in the training and test set (to check).
print("Training set size: " + str(raw_dataset_train.count()))
print("Test set size: " + str(raw_dataset_test.count()))


Training set size: 50000
Test set size: 10000

Preparing for further preprocessing, training and testing

In order to ensure compatibility with Apache Spark, we vectorize the columns, and add the resulting vectors as a seperate column. However, in order to achieve this, we first need a list of the required columns. This is shown in the cell below.


In [17]:
features = raw_dataset_train.columns
features.remove('label')

Once we have a list of columns names, we can pass this to Spark's VectorAssembler. This VectorAssembler will take a list of features, vectorize them, and place them in a column defined in outputCol.


In [18]:
# Assemble the columns.
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset_train = vector_assembler.transform(raw_dataset_train)
dataset_test = vector_assembler.transform(raw_dataset_test)
# Repartition the dataset.
dataset_train = dataset_train.repartition(num_workers)
dataset_test = dataset_test.repartition(num_workers)

Once we have the inputs for our Neural Network (features column) after applying the VectorAssembler, we should also define the outputs. Since we are dealing with a classification task, the output of our Neural Network should be a one-hot encoded vector with 10 elements. For this, we provide a OneHotTransformer which accomplish this exact task.


In [19]:
nb_classes = 10
encoder = OneHotTransformer(nb_classes, input_col="label", output_col="label_encoded")
dataset_train = encoder.transform(dataset_train)
dataset_test = encoder.transform(dataset_test)

Finally, normalize the pixel intensities with the range [0, 1].


In [20]:
# Allocate a MinMaxTransformer.
transformer = MinMaxTransformer(n_min=0.0, n_max=1.0, \
                                o_min=0.0, o_max=250.0, \
                                input_col="features", \
                                output_col="features_normalized")
# Transform the datasets.
dataset_train = transformer.transform(dataset_train)
dataset_test = transformer.transform(dataset_test)

Saving the datasets to Parquet.


In [21]:
# Delete the old preprocessed Parquet files.
!hdfs dfs -rm -r data/cifar-10-train-preprocessed.parquet
!hdfs dfs -rm -r data/cifar-10-test-preprocessed.parquet


Deleted data/cifar-10-train-preprocessed.parquet
Deleted data/cifar-10-test-preprocessed.parquet

In [22]:
dataset_train.write.parquet("data/cifar-10-train-preprocessed.parquet")
dataset_test.write.parquet("data/cifar-10-test-preprocessed.parquet")