Joeri Hermans (Technical Student, IT-DB-SAS, CERN)
Department of Knowledge Engineering
Maastricht University, The Netherlands
In [1]:
!(date +%d\ %B\ %Y)
In this notebook we show the reader how to set up a production-ready machine learning pipeline using Apache Kafka and Apache Spark, together with our Distributed Deep Learning framework, Distributed Keras, which is built on top of Keras.
Note: before starting this notebook, do not forget to run the Kafka producer (as explained in this notebook).
In [2]:
import json
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *
The problem of building an efficient machine learning production pipeline is quite similar to building an efficient training procedure. However, in contrast to training, in production (model serving) the data usually arrives in a streaming fashion. A common approach is to read directly from a particular source using Spark Streaming; alternatively, Spark Streaming can consume data from Apache Kafka, which allows us to scale the streaming application when a bottleneck occurs. At CERN, we employ Apache Kafka in several use-cases in IT (IT Group), BE (Beams Group), and the ATLAS experiment.
However, building a distributed streaming application involves some practical considerations, as mentioned in [1]. These include the retention of the data in your buffer (i.e., how long data may stay in the buffer, or the maximum buffer size before older data is discarded), the use of compression, the number of brokers and partitions, and how to throttle incoming data. Of course, these settings always depend on the application and the infrastructure. Since this is a general-purpose framework, the following sections show how to build a scalable deep learning production (model serving) pipeline using the technologies mentioned above.
We assume that you already have a running Kafka and Spark cluster. Furthermore, in order to run this example, the topic "Machine_Learning" must be available on this Kafka cluster.
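The retention, compression and partitioning settings discussed above are topic-level Kafka configurations. The following is a minimal sketch, assuming kafka-python 1.4.4 or newer (which provides `KafkaAdminClient` and `NewTopic`) and a broker at the hypothetical address `kafka_host:9092`. It creates the "Machine_Learning" topic with 3 partitions, matching the Spark consumer used later, if the topic does not exist yet. The helper names and the retention and compression values are our own illustrative assumptions, not recommendations.

```python
def topic_settings(retention_hours, retention_megabytes, compression="lz4"):
    """Build topic-level Kafka settings (Kafka expects string values)."""
    return {
        # Discard messages older than this many milliseconds.
        "retention.ms": str(retention_hours * 60 * 60 * 1000),
        # Discard the oldest log segments once a partition exceeds this size.
        "retention.bytes": str(retention_megabytes * 1024 * 1024),
        # Compression codec used by the broker for stored messages.
        "compression.type": compression,
    }


def ensure_topic(bootstrap_servers, topic, partitions, replication_factor, settings):
    """Create `topic` if it does not exist yet (requires a running broker).

    Returns True if the topic was created, False if it already existed.
    """
    # Imported lazily so that topic_settings() works without kafka-python.
    from kafka import KafkaConsumer
    from kafka.admin import KafkaAdminClient, NewTopic

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        existing = consumer.topics()  # Set of topic names known to the cluster.
    finally:
        consumer.close()
    if topic in existing:
        return False

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        new_topic = NewTopic(name=topic,
                             num_partitions=partitions,
                             replication_factor=replication_factor,
                             topic_configs=settings)
        admin.create_topics(new_topics=[new_topic])
    finally:
        admin.close()
    return True
```

A possible call, keeping data for 24 hours or 1 GiB per partition: `ensure_topic("kafka_host:9092", "Machine_Learning", 3, 1, topic_settings(24, 1024))`.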
To manage your Python dependencies, we recommend installing a Python distribution such as Anaconda. In the following sections, we assume that Spark has already been added to your PATH variable. To run our Kafka producer (located in the examples directory), we first need Kafka Python, which is installed by simply running pip in your shell:
pip install kafka-python
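The producer in the examples directory is not reproduced here. As a rough, hypothetical sketch of what such a producer does, the snippet below serializes each event as a JSON object (an `EventId` plus 30 feature columns, which is the shape `prepare_dataframe` expects further down) and sends it to the topic with kafka-python's `KafkaProducer`. The broker address, the feature names `feature_0` through `feature_29`, and the random values are placeholders.

```python
import json
import random

NB_FEATURES = 30


def make_event(event_id, nb_features=NB_FEATURES):
    """Build one hypothetical event: an EventId plus nb_features random features."""
    event = {"EventId": event_id}
    for i in range(nb_features):
        event["feature_%d" % i] = random.random()
    return event


def encode_event(event):
    """Serialize an event as UTF-8 encoded JSON, the payload sent to Kafka."""
    return json.dumps(event).encode("utf-8")


def produce(bootstrap_servers, topic, nb_events):
    """Send nb_events random events to `topic` (requires a running broker)."""
    # Imported lazily so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             value_serializer=encode_event)
    for event_id in range(nb_events):
        producer.send(topic, make_event(event_id))
    # Block until all buffered messages have been delivered, then clean up.
    producer.flush()
    producer.close()
```

For example, `produce("kafka_host:9092", "Machine_Learning", 1000)` would push 1000 synthetic events for the streaming job to consume.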
To run a production classification pipeline, you need access to a trained model. Keras provides an API to save and load trained models, and the same procedures can be used with Distributed Keras and Spark to load a pretrained model for production use-cases. In this example, however, we construct a neural network with randomly initialized weights, which simulates such a pretrained model. The structure of the model (input and output dimensions) is equivalent to the neural network in the workflow notebook. So if you train a model with the distributed training methods described in the workflow notebook and save it, you should be able to use that trained model in this notebook without problems. Just make sure the model variable is set to your trained Keras model.
As defined in the workflow notebook, our neural network uses 30 features and is trained to classify two classes (signal and background).
In [3]:
nb_features = 30
nb_classes = 2
As described above, we construct a randomly initialized neural network to simulate a pretrained network.
In [4]:
model = Sequential()
model.add(Dense(500, input_shape=(nb_features,)))
model.add(Activation('relu'))
model.add(Dropout(0.4))
model.add(Dense(1000))
model.add(Activation('relu'))
model.add(Dropout(0.4))
model.add(Dense(500))
model.add(Activation('relu'))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))
model.summary()
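As a quick sanity check on the output of `model.summary()`: a `Dense` layer has inputs * units weights plus one bias per unit, while `Dropout` and `Activation` layers add no parameters. The small helper below is our own illustration (not part of Keras) and applies that rule to the layer sizes used above.

```python
def dense_parameters(layer_sizes):
    """Count weights plus biases for a stack of fully connected layers.

    layer_sizes[0] is the input dimension; every following entry is the
    number of units of the next Dense layer.
    """
    total = 0
    for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:]):
        total += n_in * n_out + n_out  # Weight matrix plus bias vector.
    return total


# 30 features -> Dense(500) -> Dense(1000) -> Dense(500) -> Dense(2)
print(dense_parameters([30, 500, 1000, 500, 2]))  # 1018002
```

The total is 15,500 + 501,000 + 500,500 + 1,002 = 1,018,002, which is the trainable parameter count the summary should report for this architecture.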
In the following cell, adjust the parameters to match your cluster and Kafka setup.
In [5]:
# Modify these variables according to your needs.
application_name = "Distributed Keras Kafka Pipeline"
using_spark_2 = False
local = False
if local:
    # Tell the master to use local resources.
    master = "local[*]"
    num_cores = 3
    num_executors = 1
else:
    # Tell the master to use YARN (Spark 2.x deprecates "yarn-client" in favour of "yarn").
    master = "yarn-client"
    num_executors = 8
    num_cores = 2
# Define Kafka-specific parameters.
zk = "zookeeper_host:2181"             # ZooKeeper address
topic = "Machine_Learning"             # Topic name
consumer_name = "dist-keras-consumer"  # Consumer group identifier
# Define Spark Streaming-specific parameters.
batch_duration = 10                    # In seconds.
We now allocate a Spark context (sc) and a Spark Streaming context (ssc) using the parameters defined above.
In [6]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", str(num_cores))
conf.set("spark.executor.instances", str(num_executors))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Check if the user is running Spark 2.0+.
if using_spark_2:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.config(conf=conf) \
                                .appName(application_name) \
                                .getOrCreate()
    # The streaming context needs the underlying SparkContext, not the session.
    sc = spark.sparkContext
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports; creating an SQLContext enables rdd.toDF().
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)
# Allocate the streaming context with the batch duration defined above.
ssc = StreamingContext(sc, batch_duration)
Next, we allocate a Kafka stream using the previously defined parameters. The final parameter is a dictionary that maps each topic to the number of partitions to consume; each partition is consumed in its own thread, so in this case a single receiver reads the "Machine_Learning" topic with 3 threads at once.
For additional and more detailed information on Spark's Kafka API, we refer the reader to the documentation at http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html.
In [7]:
# Allocate a Kafka stream.
kafkaStream = KafkaUtils.createStream(ssc, zk, consumer_name, {topic: 3})
In [8]:
def predict(df):
    """Add a prediction column to the specified DataFrame using the pretrained model."""
    predictor = ModelPredictor(keras_model=model, features_col="features_normalized", output_col="prediction")
    return predictor.predict(df)
In [9]:
def post_process(df):
    """
    Add a column to the specified DataFrame by converting the raw
    model prediction (an array) to a predicted class (identified by an index).
    Since we only have two classes, the output dimension is 2. This causes the
    LabelIndexTransformer to output a 0 or a 1 given the raw neural network classification.
    """
    transformer = LabelIndexTransformer(output_dim=2, input_col="prediction", output_col="predicted_index")
    return transformer.transform(df)
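Conceptually, this post-processing step maps each raw softmax output to a single class index. The NumPy sketch below uses a plain argmax as a stand-in for `LabelIndexTransformer`; we have not reproduced the transformer's exact rule (it may, for instance, apply an activation threshold), so treat this only as an illustration of the idea. The helper name `to_class_index` is hypothetical.

```python
import numpy as np


def to_class_index(predictions):
    """Map each row of raw class scores (e.g. softmax outputs) to the index of its largest score."""
    return np.argmax(np.asarray(predictions), axis=1)


raw = np.array([[0.9, 0.1],   # Most likely class 0.
                [0.3, 0.7]])  # Most likely class 1.
print(to_class_index(raw))
```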
In [10]:
def prepare_dataframe(df):
    """
    Take the specified DataFrame and add two columns:
      1. features
         Every row holds a vector of the specified features.
      2. features_normalized
         Every row holds the L2-normalized version of the
         features vector created before.
    """
    # Every column except the event identifier is used as a feature.
    features = [column for column in df.columns if column != 'EventId']
    vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
    df = vector_assembler.transform(df)
    normalizer = Normalizer(inputCol="features", outputCol="features_normalized", p=2.0)
    df = normalizer.transform(df)
    return df
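`Normalizer` with `p=2.0` rescales every row vector to unit Euclidean length independently of the other rows; it does not standardize features across the batch. The NumPy sketch below mimics that per-row operation for illustration only; the helper name is hypothetical, and leaving all-zero rows unchanged is our reading of how Spark handles a zero norm.

```python
import numpy as np


def l2_normalize_rows(features):
    """Scale every row to unit L2 norm; all-zero rows are left unchanged."""
    features = np.asarray(features, dtype=float)
    norms = np.linalg.norm(features, ord=2, axis=1, keepdims=True)
    # Avoid division by zero: rows with norm 0 are divided by 1 instead.
    safe_norms = np.where(norms == 0.0, 1.0, norms)
    return features / safe_norms


batch = np.array([[3.0, 4.0],
                  [0.0, 0.0]])
# The first row becomes [0.6, 0.8]; the zero row stays zero.
print(l2_normalize_rows(batch))
```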
In process_instances, we turn every incoming RDD into predictions. Since this notebook serves demonstration purposes only, we simply print the number of instances that the pretrained model classified as "signal".
In [11]:
def process_instances(rdd):
    # Check if there is new data available.
    if not rdd.isEmpty():
        df = rdd.toDF()             # Convert the RDD to a Spark DataFrame.
        df = prepare_dataframe(df)  # Assemble the feature vectors and L2-normalize every row.
        df = predict(df)            # Add the raw neural network predictions.
        df = post_process(df)       # Convert the raw predictions to a class (index).
        # Extract the instances which are interesting (signal).
        df = df.filter(df['predicted_index'] == 0)
        # TODO: Do something with your DataFrame (e.g., storing it to HDFS).
        print(df.count())
In [ ]:
# Fetch the raw instances from the Kafka stream.
raw_instances = kafkaStream.map(lambda x: x[1])
# Convert the raw instances (which are JSON strings) to Spark rows.
instances = raw_instances.map(json_to_dataframe_row)
# Process every RDD in the DStream.
instances.foreachRDD(process_instances)
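Each Kafka record arrives as a `(key, value)` pair, which is why only `x[1]` (the JSON payload) is kept. `json_to_dataframe_row` is not defined in this notebook and presumably comes from the `distkeras.utils` wildcard import. As a hedged sketch of the parsing step it performs, the hypothetical helper below decodes one payload into a dictionary and checks that it carries an `EventId` and the expected number of features. In Spark, the resulting dictionary could then be turned into a row with `pyspark.sql.Row(**fields)`.

```python
import json


def parse_event(payload, nb_features=30):
    """Decode one JSON payload and validate its shape before building a Row.

    Hypothetical helper: expects an 'EventId' key plus nb_features feature keys.
    """
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")
    fields = json.loads(payload)
    if "EventId" not in fields:
        raise ValueError("missing EventId")
    if len(fields) - 1 != nb_features:
        raise ValueError("expected %d features, got %d" % (nb_features, len(fields) - 1))
    return fields
```

Validating here means a malformed message raises on the executor with a clear error, instead of surfacing later as a schema mismatch in `VectorAssembler`.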
In [ ]:
ssc.start()
ssc.awaitTermination()
In this notebook we demonstrated how to construct a high-throughput model serving pipeline using Apache Spark, Apache Kafka, and Distributed Keras. Furthermore, this infrastructure offers an easily scalable approach for production use-cases, since Kafka partitions and Spark executors can be added as the load grows. However, since Distributed Keras is still under development, some bugs might still show up; please notify us if you encounter any on your system.
Contact: joeri.hermans@cern.ch luca.canali@cern.ch zbigniew.baranowski@cern.ch