Distributed Deep Learning with Apache Spark and Keras

Joeri Hermans (Technical Student, IT-DB-SAS, CERN)
Department of Knowledge Engineering
Maastricht University, The Netherlands


In [1]:
!(date +%d\ %B\ %G)


06 April 2017

This presentation will give the reader an introduction to the topic of distributed deep learning (DDL) and to the issues which need to be taken into consideration when applying this technique. We will also introduce a DDL framework built on Apache Spark, a fast and general engine for large-scale data processing, and the neural network library Keras.

The project was initiated by the CMS experiment. CMS is exploring the possibility of using a deep learning model for the high level trigger in order to be able to handle the data rates of LHC Run 3 and beyond. Furthermore, they would like to train their models faster using distributed algorithms, which allows them to tune their models more frequently. Another requirement was that these models should be trained on their complete dataset, which is on the order of a terabyte. At this point, production use-cases for ATLAS are also being evaluated. These focus more on the serving of models to classify instances.

Distributed Deep Learning, an introduction.

Unsupervised feature learning and deep learning have shown that being able to train large models can dramatically improve performance. However, consider the problem of training a deep network with billions of parameters. How do we achieve this without waiting for days or even weeks, thus leaving more time to tune the model? Dean et al. [1] proposed a training paradigm which allows us to train a model on multiple physical machines. The authors describe two methods to achieve this, i.e., data parallelism and model parallelism1.

Model parallelism2

In model parallelism a single model is distributed over multiple machines. The performance benefit of distributing a deep network across multiple machines depends mainly on the structure of the model. Models with a large number of parameters typically benefit from access to more CPUs and memory, up to the point where communication costs, i.e., the propagation of weight updates and synchronization mechanisms, dominate [1].

Data parallelism

As stated in the introduction, in order to train a large network in a reasonable amount of time, we need to parallelize the optimization process (i.e., the learning of the model). In this setting, we take several model replicas and distribute them over multiple machines. Of course, it would also be possible to combine this with the model parallelism approach. However, for the sake of simplicity, let us assume that a model (or several models) can be contained on a single machine. In order to parallelize the training, and to improve the usage of the cluster resources, we distribute the models over several machines.

In order to build a distributed learning scheme using data parallelism, you would in the simplest case need at least one parameter server. A parameter server is basically a thread (or a collection of threads) which aggregates the incoming gradient updates of the workers into a so-called center variable, which acts as a global consensus variable. The weights stored in the center variable will eventually constitute the produced model.
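To make this concrete, consider the following toy sketch (plain Python/NumPy; it is not part of Distributed Keras and all names are hypothetical). The parameter server holds the center variable and applies incoming gradients, while a worker repeatedly pulls the center variable, computes a gradient on a mini-batch from its own data partition, and pushes that gradient back.

import numpy as np

class ToyParameterServer(object):
    """Hypothetical parameter server: holds the center variable and applies incoming gradients."""

    def __init__(self, num_parameters, learning_rate=0.01):
        self.center_variable = np.zeros(num_parameters)
        self.learning_rate = learning_rate

    def pull(self):
        # Workers fetch the most recent center variable.
        return self.center_variable.copy()

    def push(self, gradient):
        # Aggregate an incoming worker gradient into the center variable.
        self.center_variable -= self.learning_rate * gradient

def toy_worker(server, data_partition, compute_gradient):
    # A worker pulls the center variable, computes a gradient on a mini-batch
    # from its own data partition, and pushes the gradient back to the server.
    for mini_batch in data_partition:
        weights = server.pull()
        gradient = compute_gradient(weights, mini_batch)
        server.push(gradient)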

There are two general approaches towards data parallelism. The most straightforward is the synchronous approach. In short, a synchronous data parallel method waits for all workers to finish the current mini-batch or stochastic sample before continuing to the next iteration. Synchronous methods have the advantage that all workers use the most recent center variable, i.e., a worker knows that all other workers use the same center variable. However, the main disadvantage of this approach is the synchronization itself: a synchronous method will never be truly synchronous, due to the many, and possibly heterogeneous, machines involved. Furthermore, every machine can have a different workload, which influences the training speed of a worker. As a result, synchronous methods need additional locking mechanisms to synchronize all workers; these make sure that all workers compute the next gradient based on the same center variable. However, such locking mechanisms introduce waiting, which significantly slows down training. For example, imagine a cluster node with an unusually high load. This high load will, due to CPU sharing, cause the training procedure on that node to slow down, which in turn causes all other workers to wait for this single node. Of course, this is a simple and possibly extreme example, but it shows how a single worker can significantly influence the training time of all workers.

A very simple, but radical "solution" to this synchronization problem is to not synchronize the workers at all :) Workers simply fetch the center variable and update the parameter server with the computed gradient whenever they are ready. This approach is called an asynchronous data parallel method. Asynchronous methods, compared to synchronous methods, come with a different set of problems. One of these is the so-called stale gradient: a gradient computed with respect to an older version of the center variable, while the current center variable has already been updated by other workers. One approach to mitigate this is to apply an exponential decay factor to stale gradient updates. This wastes some computational effort, but of course, one could also simply fetch the most recent weights from the parameter server and start again. However, as we will show later, it is actually stale gradients (a result of asynchrony) that induce implicit momentum to the learning process [2].
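The exponential decay idea can be sketched as follows (again a toy illustration, not the Distributed Keras implementation; the decay constant and the clock-based staleness bookkeeping are assumptions): the parameter server keeps a global clock, and an incoming gradient is scaled down according to how many updates happened since the worker pulled its parameters.

import numpy as np

class StalenessAwareParameterServer(object):
    """Toy parameter server that exponentially down-weights stale gradients."""

    def __init__(self, num_parameters, learning_rate=0.01, decay=0.5):
        self.center_variable = np.zeros(num_parameters)
        self.learning_rate = learning_rate
        self.decay = decay
        self.clock = 0  # Incremented on every update of the center variable.

    def pull(self):
        # A worker fetches the parameters together with the current clock value.
        return self.center_variable.copy(), self.clock

    def push(self, gradient, worker_clock):
        # Staleness = number of center variable updates since this worker pulled.
        staleness = self.clock - worker_clock
        scale = np.exp(-self.decay * staleness)
        self.center_variable -= self.learning_rate * scale * gradient
        self.clock += 1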

At this point you probably ask: why does this actually work? Many people suggest this is due to the sparsity of the gradients. Intuitively, imagine having multiple workers processing different data (since every worker has its own data partition); chances are the weight updates will be quite dissimilar, since we are training a large network with many tunable parameters. Furthermore, techniques such as dropout (if they are applied differently among the replicas) only increase the sparsity of the updates3.

Formalization

We would also like to inform the reader that the general problem to be solved is the so-called global consensus optimization problem. A popular approach towards solving it is the Alternating Direction Method of Multipliers (ADMM) [3] [4]. However, since this is outside the scope of this notebook, we will not review it in depth. We would, however, like to note that the Elastic Averaging methods [5] by Zhang et al., which we include in Distributed Keras, are based on ADMM.

**1:** Hybrids are possible as well.
**2:** This is mainly used for the computation of the network outputs [[1]](https://papers.nips.cc/paper/4687-large-scale-distributed-deep-networks.pdf).
**3:** A way to check the sparsity between two gradients is to put all the weights into a one-dimensional vector, and then compute the cosine similarity.
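As an illustration of footnote 3 (a small sketch; the per-layer gradient lists are placeholders), one can flatten the gradients of two workers into one-dimensional vectors and compute their cosine similarity; a value close to zero indicates that the two updates hardly overlap.

import numpy as np

def cosine_similarity_of_gradients(gradients_a, gradients_b):
    """Flatten two lists of per-layer gradient arrays and compute their cosine similarity."""
    a = np.concatenate([np.ravel(g) for g in gradients_a])
    b = np.concatenate([np.ravel(g) for g in gradients_b])
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))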

Distributed Keras

Distributed Keras is a framework which uses Apache Spark and Keras. We chose Spark because of its distributed data processing environment. This allows us to preprocess the data in a distributed manner and train our deep learning models on the same architecture, while still having the modeling simplicity of Keras.

Architecture

Our architecture is very similar to the architecture discussed in [1]. However, we employ Apache Spark for data-parallel reading and for handling larger-than-memory datasets. The parameter server is always created in the Spark Driver, which is the program that creates the Spark Context. For example, if the Jupyter installation running this notebook is hosted on the Spark cluster, then a cluster node will host the parameter server. However, if you run a Python script which connects to a remote Spark cluster, then your computer will run the Spark Driver and, as a result, will run the parameter server. In that case, make sure your network connection is able to handle the load, otherwise your computer will be the bottleneck in the learning process.

Implementation of a custom distributed optimizer

In order to implement your own optimizer you need two classes. First, define your optimizer using the Trainer interface. We already supply an AsynchronousDistributedTrainer and a SynchronousDistributedTrainer; however, if you require a different procedure, feel free to implement your own. Finally, you need a worker class. This class must have a train method with the required arguments, as specified by Apache Spark.
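A rough skeleton of such a pair of classes is sketched below. It is only an illustration: the class names are hypothetical, the attribute names inherited from the Trainer base class (e.g. self.keras_model, self.worker_optimizer) are assumptions, and the actual Distributed Keras trainers contain considerably more logic (parameter server setup, model serialization, history collection).

from distkeras.trainers import Trainer  # assumption: the Trainer interface mentioned above

class CustomWorker(object):
    """Hypothetical worker whose train method is shipped to the Spark executors."""

    def __init__(self, keras_model, optimizer, loss, features_col, label_col):
        self.keras_model = keras_model
        self.optimizer = optimizer
        self.loss = loss
        self.features_col = features_col
        self.label_col = label_col

    def train(self, iterator):
        # 'iterator' yields the rows of one Spark partition, as required by mapPartitions.
        # Here one would deserialize the Keras model, compute gradients on the rows,
        # and exchange parameters with the parameter server.
        return iter([])

class CustomTrainer(Trainer):
    """Hypothetical trainer that runs a CustomWorker on every partition of the dataset."""

    def train(self, dataframe, shuffle=False):
        worker = CustomWorker(self.keras_model, self.worker_optimizer, self.loss,
                              "features_normalized", "label")
        # Ship the worker's train method to the executors.
        dataframe.rdd.mapPartitions(worker.train).collect()
        return self.keras_model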

Usage

In the following sections, we will give an example of what a complete workflow looks like. This includes setting up a Spark Context, and reading, preprocessing, and normalizing the data. Finally, we create a relatively simple model (feel free to adjust the parameters) with Keras and optimize it using the different distributed optimizers which are included by default.

Dataset

We are using the ATLAS Higgs dataset constructed for the Kaggle machine learning challenge. This dataset is quite limited; it contains only 250,000 instances, 40% of which we will use as a test set. For future experiments, it would be useful to integrate well-understood datasets such as CIFAR or MNIST in order to evaluate against other optimizers. However, it would be nice to have a "well understood" HEP (High Energy Physics) dataset for this task :)


In [2]:
import numpy as np

import time

import requests

from keras.optimizers import *
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation

from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *


Using TensorFlow backend.

In [3]:
# Modify these variables according to your needs.
application_name = "Distributed Keras Notebook"
using_spark_2 = False
local = False
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_cores = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 6
    num_cores = 2

In [4]:
# This variable is derived from the number of cores and executors, and will be used to assign the number of model trainers.
num_workers = num_executors * num_cores

print("Number of desired executors: " + `num_executors`)
print("Number of desired cores / executor: " + `num_cores`)
print("Total number of workers: " + `num_workers`)


Number of desired executors: 6
Number of desired cores / executor: 2
Total number of workers: 12

In [5]:
import os

# Use the DataBricks CSV reader, this has some nice functionality regarding invalid values.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'

Preparing a Spark Context

In order to read our (big) dataset into our Spark cluster, we first need a Spark Context. However, since Spark 2.0 there have been some changes regarding the initialization of a Spark Context. For example, the SQLContext and HiveContext no longer have to be initialized separately.


In [6]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_cores`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

# Check if the user is running Spark 2.0 +
if using_spark_2:
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)

In [8]:
# Check if we are using Spark 2.0
if using_spark_2:
    reader = sc
else:
    reader = sqlContext
# Read the dataset.
raw_dataset = reader.read.format('com.databricks.spark.csv') \
                    .options(header='true', inferSchema='true').load("data/atlas_higgs.csv")

In [9]:
# Double-check the inferred schema, and fetch a row to show what the dataset looks like.
raw_dataset.printSchema()


root
 |-- EventId: integer (nullable = true)
 |-- DER_mass_MMC: double (nullable = true)
 |-- DER_mass_transverse_met_lep: double (nullable = true)
 |-- DER_mass_vis: double (nullable = true)
 |-- DER_pt_h: double (nullable = true)
 |-- DER_deltaeta_jet_jet: double (nullable = true)
 |-- DER_mass_jet_jet: double (nullable = true)
 |-- DER_prodeta_jet_jet: double (nullable = true)
 |-- DER_deltar_tau_lep: double (nullable = true)
 |-- DER_pt_tot: double (nullable = true)
 |-- DER_sum_pt: double (nullable = true)
 |-- DER_pt_ratio_lep_tau: double (nullable = true)
 |-- DER_met_phi_centrality: double (nullable = true)
 |-- DER_lep_eta_centrality: double (nullable = true)
 |-- PRI_tau_pt: double (nullable = true)
 |-- PRI_tau_eta: double (nullable = true)
 |-- PRI_tau_phi: double (nullable = true)
 |-- PRI_lep_pt: double (nullable = true)
 |-- PRI_lep_eta: double (nullable = true)
 |-- PRI_lep_phi: double (nullable = true)
 |-- PRI_met: double (nullable = true)
 |-- PRI_met_phi: double (nullable = true)
 |-- PRI_met_sumet: double (nullable = true)
 |-- PRI_jet_num: integer (nullable = true)
 |-- PRI_jet_leading_pt: double (nullable = true)
 |-- PRI_jet_leading_eta: double (nullable = true)
 |-- PRI_jet_leading_phi: double (nullable = true)
 |-- PRI_jet_subleading_pt: double (nullable = true)
 |-- PRI_jet_subleading_eta: double (nullable = true)
 |-- PRI_jet_subleading_phi: double (nullable = true)
 |-- PRI_jet_all_pt: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Label: string (nullable = true)

Dataset preprocessing and normalization

Since Spark's MLlib has some nice features for distributed preprocessing, we made sure we comply with the DataFrame API in order to ensure compatibility. What it basically boils down to is that all the features (which can have different types) will be aggregated into a single column. More information on Spark MLlib (and other APIs) can be found here: http://spark.apache.org/docs/latest/ml-guide.html

In the following steps we will show you how to extract the desired columns from the dataset and prepare them for further processing.


In [10]:
# First, we would like to extract the desired features from the raw dataset.
# We do this by constructing a list with all desired columns.
features = raw_dataset.columns
features.remove('EventId')
features.remove('Weight')
features.remove('Label')
# Next, we use Spark's VectorAssembler to "assemble" (create) a vector of all desired features.
# http://spark.apache.org/docs/latest/ml-features.html#vectorassembler
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
# This transformer will take all columns specified in features, and create an additional column "features" which will contain all the desired features aggregated into a single vector.
dataset = vector_assembler.transform(raw_dataset)

# Show what happened after applying the vector assembler.
# Note: "features" column got appended to the end.
dataset.select("features").take(1)


Out[10]:
[Row(features=DenseVector([138.47, 51.655, 97.827, 27.98, 0.91, 124.711, 2.666, 3.064, 41.928, 197.76, 1.582, 1.396, 0.2, 32.638, 1.017, 0.381, 51.626, 2.273, -2.414, 16.824, -0.277, 258.733, 2.0, 67.435, 2.15, 0.444, 46.062, 1.24, -2.475, 113.497]))]

In [11]:
# Apply feature normalization with standard scaling. This will transform a feature to have mean 0, and std 1.
# http://spark.apache.org/docs/latest/ml-features.html#standardscaler
standard_scaler = StandardScaler(inputCol="features", outputCol="features_normalized", withStd=True, withMean=True)
standard_scaler_model = standard_scaler.fit(dataset)
dataset = standard_scaler_model.transform(dataset)

In [12]:
# If we look at the dataset, the Label column contains two distinct values, i.e., b (background) and s (signal).
# Our neural network will not be able to handle these characters, so instead, we convert them to an index so we can indicate that the output neuron with index 0 is background, and the one with index 1 is signal.
# http://spark.apache.org/docs/latest/ml-features.html#stringindexer
label_indexer = StringIndexer(inputCol="Label", outputCol="label_index").fit(dataset)
dataset = label_indexer.transform(dataset)

# Show the result of the label transformation.
dataset.select("Label", "label_index").take(5)


Out[12]:
[Row(Label=u's', label_index=1.0),
 Row(Label=u'b', label_index=0.0),
 Row(Label=u'b', label_index=0.0),
 Row(Label=u'b', label_index=0.0),
 Row(Label=u'b', label_index=0.0)]

In [13]:
# Define some properties of the neural network for later use.
nb_classes = 2 # Number of output classes (signal and background)
nb_features = len(features)

In [ ]:
# We observe that Keras is not able to work with these label indexes directly.
# What it actually expects is a vector with a size identical to the output layer.
# Our framework provides functionality to do this with ease.
# What it basically does is, given an expected vector dimensionality,
# prepare a zero vector of that dimensionality and set the element
# at the specified label index to one (one-hot encoding).

# For example:
# 1. Assume we have a label index: 3
# 2. Output dimensionality: 5
# With these parameters, we obtain the following vector in the DataFrame column: [0,0,0,1,0]

transformer = OneHotTransformer(output_dim=nb_classes, input_col="label_index", output_col="label")
dataset = transformer.transform(dataset)
# Only select the columns we need (less data shuffling) while training.
dataset = dataset.select("features_normalized", "label_index", "label")

# Show the expected output vectors of the neural network.
dataset.select("label_index", "label").take(1)

Warning: shuffling a large dataset will take some time.

We recommend that users first preprocess and shuffle their data, as described in the data preprocessing notebook.


In [ ]:
# Shuffle the dataset.
dataset = shuffle(dataset)

# Note: we also support shuffling in the trainers by default.
# However, since this would require a shuffle for every training we will only do it once here.
# If you want, you can enable the training shuffling by specifying shuffle=True in the train() function.

In [ ]:
# Finally, we create a trainingset and a testset.
(training_set, test_set) = dataset.randomSplit([0.6, 0.4])
training_set.cache()
test_set.cache()

Model construction

We will now construct a relatively simple Keras model (without any modifications) which, hopefully, will be able to classify the dataset.


In [ ]:
model = Sequential()
model.add(Dense(500, input_shape=(nb_features,)))
model.add(Activation('relu'))
model.add(Dropout(0.4))
model.add(Dense(500))
model.add(Activation('relu'))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.summary()
Worker Optimizer and Loss

In order to evaluate the gradient on the model replicas, we have to specify an optimizer and a loss function. For this, we just follow the Keras API as defined in the documentation: https://keras.io/optimizers/ and https://keras.io/objectives/.


In [ ]:
optimizer = 'adagrad'
loss = 'categorical_crossentropy'

Training

In the following cells we will train and evaluate the model using different distributed trainers. We will also provide a baseline metric using a SingleTrainer, which trains the model on a single worker with the specified optimizer (Adagrad in this case).

Furthermore, we will also evaluate every training using Spark's MulticlassClassificationEvaluator https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator.

Evaluation

We will evaluate all algorithms using the F1 score https://en.wikipedia.org/wiki/F1_score and the accuracy metric.
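Accuracy is computed with the helper defined in the next cell. For the F1 score, one could use the MulticlassClassificationEvaluator imported earlier; a possible sketch (assuming the test set already contains the "prediction_index" column produced by the LabelIndexTransformer, and that the column types are accepted by the evaluator) is:

def evaluate_f1(dataset):
    # Use Spark's MulticlassClassificationEvaluator to compute the F1 score on a
    # DataFrame holding both the true and the predicted label index.
    evaluator = MulticlassClassificationEvaluator(metricName="f1", labelCol="label_index",
                                                  predictionCol="prediction_index")
    return evaluator.evaluate(dataset)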


In [ ]:
def evaluate_accuracy(model):
    global test_set
    
    # Allocate a Distributed Keras Accuracy evaluator.
    evaluator = AccuracyEvaluator(prediction_col="prediction_index", label_col="label_index")
    # Clear the prediction column from the testset.
    test_set = test_set.select("features_normalized", "label_index", "label")
    # Apply a prediction from a trained model.
    predictor = ModelPredictor(keras_model=model, features_col="features_normalized")
    test_set = predictor.predict(test_set)
    # Allocate an index transformer.
    index_transformer = LabelIndexTransformer(output_dim=nb_classes)
    # Transform the prediction vector to an indexed label.
    test_set = index_transformer.transform(test_set)
    # Fetch the score.
    score = evaluator.evaluate(test_set)
    
    return score

In [ ]:
def add_result(trainer, accuracy, dt):
    global results

    # Store the metrics.
    results[trainer] = {}
    results[trainer]['accuracy'] = accuracy
    results[trainer]['time_spent'] = dt
    # Display the metrics.
    print("Trainer: " + str(trainer))
    print(" - Accuracy: " + str(accuracy))
    print(" - Training time: " + str(dt))

But first, we will allocate a simple data structure which will hold the results.


In [ ]:
results = {}
SingleTrainer

A SingleTrainer is used as a benchmarking trainer to compare against the distributed trainers. However, one could also use this trainer if the dataset is too large to fit in memory.


In [ ]:
trainer = SingleTrainer(keras_model=model, worker_optimizer=optimizer,
                        loss=loss, features_col="features_normalized",
                        label_col="label", num_epoch=1, batch_size=32)
trained_model = trainer.train(training_set)

In [ ]:
# Fetch the evaluation metrics.
accuracy = evaluate_accuracy(trained_model)
dt = trainer.get_training_time()
# Add the metrics to the results.
add_result('single', accuracy, dt)
Asynchronous EASGD

EASGD based methods, proposed by Zhang et al., transmit the complete parametrization instead of the gradient. These methods then "average" the difference between the center variable and the locally updated worker variable. This elastic difference is used to compute a new center (master) variable, on which the worker nodes will base their computations in the next iteration.

Asynchronous EASGD does this in an asynchronous fashion, meaning that whenever a worker node has processed its mini-batches for a certain number of iterations (the communication window), the computed parameters are communicated to the parameter server, which updates the center (master) variable immediately, without waiting for the other workers.
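Numerically, the elastic exchange of asynchronous EASGD [5] can be sketched as follows (a minimal NumPy illustration with our own variable names, not the Distributed Keras implementation; between two such exchanges the worker simply performs ordinary SGD steps on its data partition):

import numpy as np

def easgd_elastic_update(worker_variable, center_variable, learning_rate, rho):
    """One elastic exchange between a single worker and the center variable."""
    alpha = learning_rate * rho
    # Elastic difference between the worker's parameters and the center variable.
    elastic_difference = alpha * (worker_variable - center_variable)
    # The worker is pulled towards the center variable...
    worker_variable = worker_variable - elastic_difference
    # ...and the center variable is pulled towards the worker.
    center_variable = center_variable + elastic_difference
    return worker_variable, center_variable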


In [ ]:
trainer = AEASGD(keras_model=model, worker_optimizer=optimizer, loss=loss, num_workers=num_workers, 
                 batch_size=32, features_col="features_normalized", label_col="label", num_epoch=1,
                 communication_window=32, rho=5.0, learning_rate=0.1)
trainer.set_parallelism_factor(1)
trained_model = trainer.train(training_set)

In [ ]:
# Fetch the evaluation metrics.
accuracy = evaluate_accuracy(trained_model)
dt = trainer.get_training_time()
# Add the metrics to the results.
add_result('aeasgd', accuracy, dt)
Asynchronous EAMSGD

The only difference between asynchronous EAMSGD and asynchronous EASGD is the possibility of specifying an explicit momentum term.
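In terms of the local updates this means that, in addition to its parameters, each worker maintains a velocity vector. A minimal sketch of such a momentum step (our own naming; in the original EAMSGD formulation the gradient is evaluated at the Nesterov look-ahead point worker_variable + momentum * velocity):

import numpy as np

def momentum_local_step(worker_variable, velocity, gradient, learning_rate, momentum):
    """One local momentum update, performed between the elastic exchanges sketched above."""
    velocity = momentum * velocity - learning_rate * gradient
    worker_variable = worker_variable + velocity
    return worker_variable, velocity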


In [ ]:
trainer = EAMSGD(keras_model=model, worker_optimizer=optimizer, loss=loss, num_workers=num_workers,
                 batch_size=32, features_col="features_normalized", label_col="label", num_epoch=1,
                 communication_window=32, rho=5.0, learning_rate=0.1, momentum=0.6)
trainer.set_parallelism_factor(1)
trained_model = trainer.train(training_set)

In [ ]:
# Fetch the evaluation metrics.
accuracy = evaluate_accuracy(trained_model)
dt = trainer.get_training_time()
# Add the metrics to the results.
add_result('eamsgd', accuracy, dt)
DOWNPOUR SGD
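DOWNPOUR, introduced by Dean et al. [1], is an asynchronous data parallel method in which every worker periodically pushes its accumulated gradient to the parameter server and pulls a fresh copy of the center variable; the communication window controls how many mini-batches are processed between these exchanges. A toy sketch of the worker loop (reusing the hypothetical ToyParameterServer from the introduction, and simplified in that the local replica is only refreshed when it is pulled):

def downpour_worker_loop(server, data_partition, compute_gradient, communication_window):
    """Toy DOWNPOUR worker: accumulate gradients locally, exchange with the server periodically."""
    weights = server.pull()
    accumulated_gradient = 0.0
    for step, mini_batch in enumerate(data_partition, start=1):
        accumulated_gradient = accumulated_gradient + compute_gradient(weights, mini_batch)
        if step % communication_window == 0:
            # Push the accumulated gradient and fetch a fresh copy of the center variable.
            server.push(accumulated_gradient)
            weights = server.pull()
            accumulated_gradient = 0.0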

In [ ]:
trainer = DOWNPOUR(keras_model=model, worker_optimizer=optimizer, loss=loss, num_workers=num_workers,
                   batch_size=32, communication_window=5, learning_rate=0.05, num_epoch=1,
                   features_col="features_normalized", label_col="label")
trainer.set_parallelism_factor(1)
trained_model = trainer.train(training_set)

In [ ]:
# Fetch the evaluation metrics.
accuracy = evaluate_accuracy(trained_model)
dt = trainer.get_training_time()
# Add the metrics to the results.
add_result('downpour', accuracy, dt)

Experimental observations

  • DOWNPOUR converges well when a small communication window ($< 5$) is used.
  • EASGD based methods, on the other hand, thrive with large communication windows ($> 25$).
  • Asynchronous methods induce implicit momentum.

Summary

Distributed Deep Learning can significantly speed up the learning process. We provide such a framework built on top of Keras and Apache Spark. The latter provides a nice framework for distributed data processing and model evaluation. We can easily integrate our workflows with Apache Spark, thus speeding up the data preprocessing and the model optimization procedure, while still having the same modelling simplicity.

Our group is always open to further collaboration on this work, and would like to assist the physics community in their machine learning efforts.

Contact: joeri.hermans@cern.ch luca.canali@cern.ch zbigniew.baranowski@cern.ch

Future work

  • Understanding the "theoretical" meaning of the communication window.
  • Apply compression to big weight updates when sending them to the parameter server.
  • Keep track of a gradient residual (this will reduce the bandwidth due to sparsity).
  • Evaluation of the algorithms with GPUs.
  • Optimization of parameter sharing (e.g., sockets instead of a REST API).
  • Use "famous" ConvNet architectures with well-known datasets in order to have a sounder evaluation.
  • Add a threaded queue to process asynchronous updates.
  • Report training accuracy during training.
  • Stop on target loss.

Acknowledgments

Many thanks to Zbigniew Baranowski and Luca Canali of the IT-DB group, and to Jean-Roch Vlimant, Maurizio Pierini, and Federico Presutti of the EP-UCM group for their collaboration on this work.

GitHub repository

https://github.com/JoeriHermans/dist-keras/