Kafka and Spark High Throughput Deep Learning Production Pipeline

Joeri Hermans (Technical Student, IT-DB-SAS, CERN)
Department of Knowledge Engineering
Maastricht University, The Netherlands


In [1]:
!(date +%d\ %B\ %G)


15 November 2016

In this notebook we show how to set up a production-ready machine learning pipeline using Apache Kafka and Apache Spark, together with our distributed deep learning framework Distributed Keras, which is built on top of Keras.

Note before starting this notebook: do not forget to run the Kafka producer (as explained further down in this notebook).


In [2]:
import json

from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *


Using TensorFlow backend.

Problem statement

Building an efficient machine learning production pipeline is quite similar to building an efficient training procedure. However, in contrast to training, in production (model serving) most of the data arrives in a streaming fashion. Usually, one simply reads from a particular source using Spark Streaming, but integration with Apache Kafka is also possible. Kafka allows us to scale the streaming application if a bottleneck occurs. At CERN, Apache Kafka is employed for different use-cases in IT, BE (Beams), and ATLAS.

However, building a distributed streaming application involves some practical considerations, as mentioned in [1]. These include specifying the retention of the data in your buffer (i.e., how long data is allowed to stay in the buffer, or how large the buffer may grow before older data is discarded), the use of compression, the number of brokers and partitions, and how to throttle incoming data. Of course, these settings always depend on the application and the infrastructure. But since this is a general-purpose framework, the following sections show how to build a scalable deep learning production (model serving) pipeline using the technologies mentioned above.
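As an illustration of throttling on the Spark side, a few configuration properties can bound the ingestion rate. The sketch below is only a minimal example; the values are placeholders, not tuned recommendations.

# Minimal sketch: throttling the Spark Streaming side of the pipeline.
# The values below are illustrative placeholders, not recommendations.
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.streaming.backpressure.enabled", "true")  # Adapt the ingestion rate to the processing rate.
conf.set("spark.streaming.receiver.maxRate", "10000")     # Upper bound on records per second per receiver.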

Preliminaries

Installation and requirements

Cluster requirements

We assume that you already have a running Kafka and Spark cluster. Furthermore, in order to run this example, the topic "Machine_Learning" must be available on this Kafka cluster.
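If the topic does not exist yet, it can be created with the tools shipped with Kafka. The command below is only a sketch: the ZooKeeper address, partition count, replication factor, and retention value are placeholders that should match your cluster.

kafka-topics.sh --create --zookeeper zookeeper_host:2181 --topic Machine_Learning --partitions 3 --replication-factor 2 --config retention.ms=3600000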

Kafka Python

In order to manage your Python dependencies, it is recommended to install a Python distribution such as Anaconda. In the following sections, we assume that Spark has already been added to your PATH variable. To run our Kafka producer (located in the examples directory), we first need kafka-python, which can be installed by simply running pip in your shell:

pip install kafka-python

Pretrained model

In order to run a production classification pipeline you need access to a trained model. Keras provides an API to save and load trained models, and the same procedures can be used with Distributed Keras and Spark to load a pretrained model for production use-cases. However, in this example we construct a neural network with randomly initialized weights to simulate such a pretrained model. The structure of the model (input and output dimensions) is equivalent to the neural network in the workflow notebook. So if you train a model with the distributed training methods described in the workflow notebook and save it afterwards, you can load it in this notebook without any problems. Just make sure the model variable is set to your trained Keras model.
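As a reminder, saving and loading a trained Keras model is straightforward. The file name below is just a placeholder (saving to HDF5 requires h5py to be installed).

from keras.models import load_model

# Persist the trained model (architecture, weights, and optimizer state).
model.save('pretrained_model.h5')
# Later, in the serving notebook, restore it and assign it to the model variable.
model = load_model('pretrained_model.h5')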

As defined in the workflow notebook, our neural network will use 30 features and will be trained to classify two classes (signal and background).


In [3]:
nb_features = 30
nb_classes = 2

As described above, we construct a randomly initialized neural network to simulate a pretrained network.


In [4]:
model = Sequential()
model.add(Dense(500, input_shape=(nb_features,)))
model.add(Activation('relu'))
model.add(Dropout(0.4))
model.add(Dense(1000))
model.add(Activation('relu'))
model.add(Dropout(0.4))
model.add(Dense(500))
model.add(Activation('relu'))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.summary()


____________________________________________________________________________________________________
Layer (type)                     Output Shape          Param #     Connected to                     
====================================================================================================
dense_1 (Dense)                  (None, 500)           15500       dense_input_1[0][0]              
____________________________________________________________________________________________________
activation_1 (Activation)        (None, 500)           0           dense_1[0][0]                    
____________________________________________________________________________________________________
dropout_1 (Dropout)              (None, 500)           0           activation_1[0][0]               
____________________________________________________________________________________________________
dense_2 (Dense)                  (None, 1000)          501000      dropout_1[0][0]                  
____________________________________________________________________________________________________
activation_2 (Activation)        (None, 1000)          0           dense_2[0][0]                    
____________________________________________________________________________________________________
dropout_2 (Dropout)              (None, 1000)          0           activation_2[0][0]               
____________________________________________________________________________________________________
dense_3 (Dense)                  (None, 500)           500500      dropout_2[0][0]                  
____________________________________________________________________________________________________
activation_3 (Activation)        (None, 500)           0           dense_3[0][0]                    
____________________________________________________________________________________________________
dense_4 (Dense)                  (None, 2)             1002        activation_3[0][0]               
____________________________________________________________________________________________________
activation_4 (Activation)        (None, 2)             0           dense_4[0][0]                    
====================================================================================================
Total params: 1018002
____________________________________________________________________________________________________

Kafka producer

In order to run the Kafka producer, change to the examples directory. Next, fetch the address of a Kafka bootstrap server. Once you have this address, run the following command in a separate shell:

python kafka_producer.py [bootstrap_server]
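For reference, a minimal producer built with kafka-python could look like the sketch below. This is not the actual examples/kafka_producer.py; the input file name and JSON payload are placeholders, and the bootstrap server is taken from the command line as above.

# Hypothetical sketch of a Kafka producer using kafka-python.
import json
import sys

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=sys.argv[1],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# Send every instance (a dictionary of feature values) to the Machine_Learning topic.
with open('instances.json') as f:   # Placeholder input file with one JSON object per line.
    for line in f:
        producer.send('Machine_Learning', json.loads(line))
producer.flush()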

Usage

In the following cell, please modify the parameters according to your setup.


In [5]:
# Modify these variables according to your needs.
application_name = "Distributed Keras Kafka Pipeline"
using_spark_2 = False
local = False
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_cores = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 8
    num_cores = 2
# Define Kafka-specific parameters.
zk = "zookeeper_host:2181"              # ZooKeeper address
topic = "Machine_Learning"              # Topic name
consumer_name = "dist-keras-consumer"   # Consumer identifier
# Define Spark Streaming specific parameters.
batch_duration = 10 # In seconds.

We will allocate a Spark Context (sc) with a Spark Streaming Context (ssc) using the parameters you provided above.


In [6]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", str(num_cores))
conf.set("spark.executor.instances", str(num_executors))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# Check if the user is running Spark 2.0 or later.
if using_spark_2:
    from pyspark.sql import SparkSession
    session = SparkSession.builder.config(conf=conf) \
                  .appName(application_name) \
                  .getOrCreate()
    # The streaming context requires the underlying Spark context.
    sc = session.sparkContext
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports.
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)
# Allocate the streaming context with the batch duration specified above.
ssc = StreamingContext(sc, batch_duration)

Next, we allocate a Kafka stream using the previously defined parameters. The final parameter, a dictionary mapping topic names to a number of partitions, tells the consumer group to read (in this case) from 3 partitions of the topic at once, each in its own thread.

For additional and more detailed information on Spark's Kafka API, we refer to the documentation: http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html.
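Note that the receiver-based createStream used below is not the only option; the same documentation also describes a receiver-less "direct" approach, which often achieves higher throughput. A minimal sketch (the broker address is a placeholder) would look like the following, and it yields the same (key, value) pairs as the receiver-based stream.

# Alternative sketch: a direct (receiver-less) Kafka stream reading the topic's partitions in parallel.
directStream = KafkaUtils.createDirectStream(ssc, [topic],
                                             {"metadata.broker.list": "broker_host:9092"})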


In [7]:
# Allocate a Kafka stream.
kafkaStream = KafkaUtils.createStream(ssc, zk, consumer_name, {topic: 3})

In [8]:
def predict(df):
    """This method will add a prediction column to the specified DataFrame using the pretrained model."""
    predictor = ModelPredictor(keras_model=model, features_col="features_normalized", output_col="prediction")
    
    return predictor.predict(df)

In [9]:
def post_process(df):
    """
    Will add a column to the specified DataFrame by converting the raw
    model prediction (which is an array) to a predicted class (identified by an index).
    Since we only have two classes, the output dimension is 2. This will cause the
    LabelIndexTransformer to output a 0 or a 1 given the raw neural network classification.
    """
    transformer = LabelIndexTransformer(output_dim=2, input_col="prediction", output_col="predicted_index")
    
    return transformer.transform(df)

In [10]:
def prepare_dataframe(df):
    """
    Takes the specified DataFrame and adds two columns:
    
    1. features
       Every row will hold a vector of the specified features.
    2. features_normalized
       Every row will hold a normalized vector of features based
       on the features vector created before.
    """
    features = df.columns
    features.remove('EventId')
    vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
    df = vector_assembler.transform(df)
    normalizer = Normalizer(inputCol="features", outputCol="features_normalized", p=2.0)
    df = normalizer.transform(df)
    
    return df

In process_instances we process the incoming RDDs into predictions. Since this notebook serves demonstration purposes only, we simply print the number of instances which were classified as "signal" by the pretrained model.


In [11]:
def process_instances(rdd):
    # Check if there is new data available.
    if not rdd.isEmpty():
        df = rdd.toDF()             # Convert the RDD to a Spark DataFrame.
        df = prepare_dataframe(df)  # Create a feature column and normalize the batch.
        df = predict(df)            # Add the raw Neural Network predictions.
        df = post_process(df)       # Convert the raw Neural Network predictions to a class (index).
        # Extract the instances which are interesting (signal).
        df = df.filter(df['predicted_index'] == 0)
        # TODO: Do something with your DataFrame (e.g., storing to HDFS).
        print(df.count())
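
As an illustration of what could replace the TODO above, each batch of interesting instances might be appended to HDFS. This is only a sketch under the assumption that Parquet output is acceptable; the output path and function name are placeholders.

def store_signal_instances(df):
    """Hypothetical example: append the filtered instances to HDFS as Parquet (path is a placeholder)."""
    df.write.mode("append").parquet("hdfs:///user/your_user/signal_candidates")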

In [ ]:
# Fetch the raw instances from the Kafka stream.
raw_instances = kafkaStream.map(lambda x: x[1])
# Convert the raw instances (which are JSON strings) to Spark rows.
instances = raw_instances.map(json_to_dataframe_row)
# Process every RDD in the DStream.
instances.foreachRDD(process_instances)

In [ ]:
ssc.start()
ssc.awaitTermination()


33023
46801
45446
48116
22459
45999

Experiments

TODO

Conclusion

In this notebook we demonstrated how to construct a high-throughput model serving pipeline using Apache Spark, Apache Kafka, and Distributed Keras. Furthermore, we showed that this infrastructure provides an easily scalable approach for production use-cases. However, since Distributed Keras is still under development, some bugs might show up. Please notify us if any occur on your system.

Contact: joeri.hermans@cern.ch luca.canali@cern.ch zbigniew.baranowski@cern.ch

Acknowledgements

Many thanks to Zbigniew Baranowski and Luca Canali of the IT-DB group for their collaboration on this work.