Where Am I?

Startup.ML Conference - San Francisco - Jan 20, 2017

Who Am I?

Chris Fregly

Research Scientist @ PipelineIO

Video Series Author "High Performance Tensorflow in Production" @ O'Reilly (Coming Soon)

Founder @ Advanced Spark and Tensorflow Meetup

GitHub Repo

DockerHub Repo

Slideshare

YouTube

Who Was I?

Software Engineer @ Netflix, Databricks, IBM Spark Tech Center

1. Infrastructure and Tools

Docker

Images, Containers

Useful Docker Image: AWS + GPU + Docker + Tensorflow + Spark

Kubernetes

Container Orchestration Across Clusters

Weavescope

Kubernetes Cluster Visualization

Jupyter Notebooks

What We're Using Here for Everything!

Airflow

Invoke Any Type of Workflow on Any Type of Schedule

GitHub

Commit New Model to GitHub; Airflow Workflow Triggered for Continuous Deployment (see the DAG sketch below)
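
A minimal Airflow DAG sketch of that trigger path (task IDs, paths, and commands are hypothetical):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# schedule_interval=None: the DAG is triggered externally, e.g. by a GitHub
# post-commit webhook calling Airflow's trigger endpoint
dag = DAG('deploy_new_model', schedule_interval=None, start_date=datetime(2017, 1, 1))

pull_model = BashOperator(task_id='pull_model',
                          bash_command='cd /root/pipeline && git pull',
                          dag=dag)

deploy_model = BashOperator(task_id='deploy_model',
                            bash_command='kubectl rolling-update prediction-tensorflow '
                                         '--image=fluxcapacitor/prediction-tensorflow',
                            dag=dag)

pull_model.set_downstream(deploy_model)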

DockerHub

Maintains Docker Images

Continuous Deployment

Not Just for Code, Also for ML/AI Models!

Canary Release

Deploy and Compare New Model Alongside Existing

Metrics and Dashboards

Not Just System Metrics, ML/AI Model Prediction Metrics

NetflixOSS-based

Prometheus

Grafana

Elasticsearch

Separate Cluster Concerns

Training/Admin Cluster

Prediction Cluster

Hybrid Cloud Deployment for eXtreme High Availability (XHA)

AWS and Google Cloud

Apache Spark

Tensorflow + Tensorflow Serving

2. Model Deployment Bundles

KeyValue

e.g., Recommendations (see the lookup sketch below)

In-memory: Redis, Memcache

On-disk: Cassandra, RocksDB

First-class Servable in Tensorflow Serving
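
For the in-memory case, serving a recommendation can be a single key-value get. A minimal sketch using redis-py (the key layout is hypothetical):

import json
import redis

r = redis.StrictRedis(host='localhost', port=6379)

# A batch job writes precomputed recommendations per user...
r.set('recs:user:21619', json.dumps(['item:10006', 'item:10007']))

# ...and the prediction service reads them back at request time
recs = json.loads(r.get('recs:user:21619').decode('utf-8'))
print(recs)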

PMML

It's Useful and Well-Supported

Apple, Cisco, Airbnb, HomeAway, etc.

Please Don't Re-build It - Reduce Your Technical Debt!

Native Code

Hand-coded (Python + Pickling) - see the minimal sketch below

Generate Java Code from PMML?
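
The hand-coded path can be as light as pickling the fitted model at training time and unpickling it inside the serving process. A minimal sketch:

import pickle

class SimpleModel(object):
    """Stand-in for any hand-coded model with a predict() method."""
    def __init__(self, w, b):
        self.w, self.b = w, b
    def predict(self, x):
        return self.w * x + self.b

# Training side: serialize the fitted model
with open('model.pkl', 'wb') as f:
    pickle.dump(SimpleModel(0.5, 0.5), f)

# Serving side: deserialize and predict
with open('model.pkl', 'rb') as f:
    model = pickle.load(f)
print(model.predict(1.0))  # 1.0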

Tensorflow Model Exports

freeze_graph.py: Combine Tensorflow Graph (Static) with Trained Weights (Checkpoints) into Single Deployable Model
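
A representative invocation against the checkpoint layout used in the demo below (the output node name is illustrative; inspect the graph for the real one):

!python -m tensorflow.python.tools.freeze_graph \
    --input_graph=./checkpoint/trained_model.pb \
    --input_binary=true \
    --input_checkpoint=./checkpoint/checkpoint.ckpt \
    --output_node_names=add \
    --output_graph=./frozen_model.pb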

3. Model Deployments and Rollbacks

Mutable

Each New Model is Deployed to Live, Running Container

Immutable

Each New Model is a New Docker Image
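
A minimal sketch of the immutable path (the tag and Dockerfile contents are hypothetical): bake the model export into a fresh image and push it, then roll the serving cluster to the new tag (as in the rolling deploy demo at the end).

# Hypothetical Dockerfile for the model image:
#   FROM fluxcapacitor/prediction-tensorflow
#   COPY export/ /root/models/tensorflow_minimal/export/
!docker build -t fluxcapacitor/prediction-tensorflow:v27 .
!docker push fluxcapacitor/prediction-tensorflow:v27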

4. Optimizing Tensorflow Models for Serving

Python Scripts

optimize_graph_for_inference.py

Pete Warden's Blog

Graph Transform Tool
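
Representative invocations of both tools (the script ships in TensorFlow as optimize_for_inference.py; input/output node names and the transform list are illustrative):

# Strip training-only ops from a frozen graph
!python -m tensorflow.python.tools.optimize_for_inference \
    --input=frozen_model.pb \
    --output=optimized_model.pb \
    --input_names=X \
    --output_names=add

# Apply Graph Transform Tool rewrites (constant folding, weight quantization, etc.)
!bazel-bin/tensorflow/tools/graph_transforms/transform_graph \
    --in_graph=frozen_model.pb \
    --out_graph=transformed_model.pb \
    --inputs='X' \
    --outputs='add' \
    --transforms='strip_unused_nodes fold_constants(ignore_errors=true) quantize_weights'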

Compile (Tensorflow 1.0+)

XLA Compiler

Compiles 3 graph operations (input, operation, output) into 1 operation

Removes need for Tensorflow Runtime (20 MB is significant on tiny devices)

Allows new backends for hardware-specific optimizations (better portability)

tfcompile

Convert Graph into executable code
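
tfcompile is driven from a Bazel BUILD rule; a sketch following the tf_library pattern from the tfcompile docs (names and files are illustrative):

# BUILD file sketch: ahead-of-time compile a frozen subgraph into a C++ class,
# removing the dependency on the TensorFlow runtime
load("//tensorflow/compiler/aot:tfcompile.bzl", "tf_library")

tf_library(
    name = "minimal_model",
    graph = "frozen_model.pb",
    config = "minimal_model.config.pbtxt",  # declares feeds (X) and fetches (add)
    cpp_class = "demo::MinimalModel",
)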

Compress/Distill Ensemble Models

Convert ensembles or other complex models into smaller models

Re-score training data with output of model being distilled

Train smaller model to produce same output

Output of smaller model learns more information than original label
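
A minimal distillation sketch in the same pre-1.0 TensorFlow style as the demo below (the teacher is a stand-in): re-score the training inputs with the large model, then fit the student to those soft outputs instead of the raw labels.

import numpy as np
import tensorflow as tf

x_train = np.random.rand(100).astype(np.float32)

# Step 1: re-score training data with the already-trained teacher model
teacher_predictions = 0.5 * x_train + 0.5   # stand-in for teacher.predict(x_train)

# Step 2: train the smaller student model against the teacher's outputs
X = tf.placeholder(tf.float32, shape=[None])
Y = tf.placeholder(tf.float32, shape=[None])  # soft targets, not raw labels
w = tf.Variable(1.0)
b = tf.Variable(1.0)
loss = tf.reduce_mean(tf.square(Y - (tf.mul(X, w) + b)))
train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())
    for _ in range(100):
        sess.run(train_op, feed_dict={X: x_train, Y: teacher_predictions})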

5. Optimizing Serving Runtime Environment

Throughput

Option 1: Add more Tensorflow Serving servers behind load balancer

Option 2: Enable request batching in each Tensorflow Serving

Option Trade-offs: Higher Latency (bad) for Higher Throughput (good)

$TENSORFLOW_SERVING_HOME/bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server \
  --port=9000 \
  --model_name=tensorflow_minimal \
  --model_base_path=/root/models/tensorflow_minimal/export \
  --enable_batching=true \
  --max_batch_size=1000000 \
  --batch_timeout_micros=10000 \
  --max_enqueued_batches=1000000

Latency

The deeper the model, the longer the latency

Start inference in parallel where possible (e.g., user inference in parallel with item inference; see the sketch below)

Pre-load common inputs from a database (e.g., user attributes, item attributes)

Pre-compute/partially compute common inputs (e.g., popular word embeddings)
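
A sketch of the parallel-inference point using a thread pool (the two model calls are hypothetical stubs):

from concurrent.futures import ThreadPoolExecutor

def predict_user(user_id):
    return {'user_vector': [0.1, 0.2]}   # stand-in for a user-model RPC

def predict_item(item_id):
    return {'item_vector': [0.3, 0.4]}   # stand-in for an item-model RPC

# Kick off the independent inferences concurrently instead of sequentially
with ThreadPoolExecutor(max_workers=2) as pool:
    user_future = pool.submit(predict_user, '21619')
    item_future = pool.submit(predict_item, '10006')
    user_result, item_result = user_future.result(), item_future.result()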

Memory

Word embeddings are huge!

Use a hashed ID for each word instead of storing the full vocabulary (see the sketch below)

Off-load embedding matrices to parameter server and share between serving servers
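
The hashing point, sketched with TensorFlow's built-in hash-bucket op (the bucket count and embedding size are illustrative): words map to fixed integer IDs, so the serving path never ships the vocabulary itself.

import tensorflow as tf

words = tf.constant(["tensorflow", "serving", "tensorflow"])
hash_ids = tf.string_to_hash_bucket_fast(words, num_buckets=100000)

embedding_matrix = tf.Variable(tf.random_uniform([100000, 64]))
embeddings = tf.nn.embedding_lookup(embedding_matrix, hash_ids)

with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())
    print(sess.run(hash_ids))  # same word -> same ID, no vocabulary table needed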

6. Demos!!

Train and Deploy Tensorflow AI Model (Simple Model, Immutable Deploy)

Train Tensorflow AI Model


In [1]:
import numpy as np
import os
import tensorflow as tf
from tensorflow.contrib.session_bundle import exporter
import time

In [2]:
# make things wide
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

from IPython.display import clear_output, Image, display, HTML

def strip_consts(graph_def, max_const_size=32):
    """Strip large constant values from graph_def."""
    strip_def = tf.GraphDef()
    for n0 in graph_def.node:
        n = strip_def.node.add()
        n.MergeFrom(n0)
        if n.op == 'Const':
            tensor = n.attr['value'].tensor
            size = len(tensor.tensor_content)
            if size > max_const_size:
                # tensor_content is a bytes field in Python 3, so encode the placeholder
                tensor.tensor_content = ("<stripped %d bytes>" % size).encode()
    return strip_def

def show_graph(graph_def=None, width=1200, height=800, max_const_size=32, ungroup_gradients=False):
    """Visualize a TensorFlow graph inline in the notebook."""
    if not graph_def:
        graph_def = tf.get_default_graph().as_graph_def()

    if hasattr(graph_def, 'as_graph_def'):
        graph_def = graph_def.as_graph_def()
    strip_def = strip_consts(graph_def, max_const_size=max_const_size)
    data = str(strip_def)
    if ungroup_gradients:
        data = data.replace('"gradients/', '"b_')
        #print(data)
    code = """
        <script>
          function load() {{
            document.getElementById("{id}").pbtxt = {data};
          }}
        </script>
        <link rel="import" href="https://tensorboard.appspot.com/tf-graph-basic.build.html" onload=load()>
        <div style="height:600px">
          <tf-graph-basic id="{id}"></tf-graph-basic>
        </div>
    """.format(data=repr(data), id='graph'+str(np.random.rand()))

    iframe = """
        <iframe seamless style="width:{}px;height:{}px;border:0" srcdoc="{}"></iframe>
    """.format(width, height, code.replace('"', '&quot;'))
    display(HTML(iframe))



In [3]:
# If this errors out, increment the `export_version` variable, restart the Kernel, and re-run

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer("batch_size", 10, "The batch size to train")
flags.DEFINE_integer("epoch_number", 10, "Number of epochs to run trainer")
flags.DEFINE_integer("steps_to_validate", 1,
                     "Steps to validate and print loss")
flags.DEFINE_string("checkpoint_dir", "./checkpoint/",
                    "indicates the checkpoint dirctory")
#flags.DEFINE_string("model_path", "./model/", "The export path of the model")
flags.DEFINE_string("model_path", "/root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/", "The export path of the model")
flags.DEFINE_integer("export_version", 27, "The version number of the model")

In [4]:
# If this errors out, increment the `export_version` variable, restart the Kernel, and re-run

def main():
  # Define training data
  x = np.ones(FLAGS.batch_size)
  y = np.ones(FLAGS.batch_size)

  # Define the model
  X = tf.placeholder(tf.float32, shape=[None], name="X")
  Y = tf.placeholder(tf.float32, shape=[None], name="yhat")
  w = tf.Variable(1.0, name="weight")
  b = tf.Variable(1.0, name="bias")
  loss = tf.square(Y - tf.mul(X, w) - b)
  train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)
  predict_op  = tf.mul(X, w) + b

  saver = tf.train.Saver()
  checkpoint_dir = FLAGS.checkpoint_dir
  checkpoint_file = checkpoint_dir + "/checkpoint.ckpt"
  if not os.path.exists(checkpoint_dir):
    os.makedirs(checkpoint_dir)
    
  # Start the session
  with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())

    ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
    if ckpt and ckpt.model_checkpoint_path:
      print("Continue training from the model {}".format(ckpt.model_checkpoint_path))
      saver.restore(sess, ckpt.model_checkpoint_path)

    saver_def = saver.as_saver_def()
    print(saver_def.filename_tensor_name)
    print(saver_def.restore_op_name)

    # Start training
    start_time = time.time()
    for epoch in range(FLAGS.epoch_number):
      sess.run(train_op, feed_dict={X: x, Y: y})

      # Start validating
      if epoch % FLAGS.steps_to_validate == 0:
        end_time = time.time()
        print("[{}] Epoch: {}".format(end_time - start_time, epoch))

        saver.save(sess, checkpoint_file)
        tf.train.write_graph(sess.graph_def, checkpoint_dir, 'trained_model.pb', as_text=False)
        tf.train.write_graph(sess.graph_def, checkpoint_dir, 'trained_model.txt', as_text=True)

        start_time = end_time

    # Print model variables
    w_value, b_value = sess.run([w, b])
    print("The model of w: {}, b: {}".format(w_value, b_value))

    # Export the model
    print("Exporting trained model to {}".format(FLAGS.model_path))
    model_exporter = exporter.Exporter(saver)
    model_exporter.init(
      sess.graph.as_graph_def(),
      named_graph_signatures={
        'inputs': exporter.generic_signature({"features": X}),
        'outputs': exporter.generic_signature({"prediction": predict_op})
      })
    model_exporter.export(FLAGS.model_path, tf.constant(FLAGS.export_version), sess)
    print('Done exporting!')

if __name__ == "__main__":
  main()


WARNING:tensorflow:From <ipython-input-4-f107a23917ee>:25 in main.: initialize_all_variables (from tensorflow.python.ops.variables) is deprecated and will be removed after 2017-03-02.
Instructions for updating:
Use `tf.global_variables_initializer` instead.
Continue training from the model ./checkpoint/checkpoint.ckpt
save/Const:0
save/restore_all
[0.0028002262115478516] Epoch: 0
[0.1538553237915039] Epoch: 1
[0.11622357368469238] Epoch: 2
[0.1186070442199707] Epoch: 3
[0.1167604923248291] Epoch: 4
[0.17220735549926758] Epoch: 5
[0.17842745780944824] Epoch: 6
[0.11929106712341309] Epoch: 7
[0.14556312561035156] Epoch: 8
[0.20454072952270508] Epoch: 9
The model of w: 0.5000001192092896, b: 0.5000001192092896
Exporting trained model to /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-4-f107a23917ee> in <module>()
     67 
     68 if __name__ == "__main__":
---> 69   main()

<ipython-input-4-f107a23917ee> in main()
     64       })
     65     model_exporter.export(FLAGS.model_path, tf.constant(FLAGS.export_version), sess)
---> 66     print('Done exporting!')
     67 
     68 if __name__ == "__main__":

/opt/conda/lib/python3.5/site-packages/tensorflow/python/client/session.py in __exit__(self, exec_type, exec_value, exec_tb)
   1205       logging.error('Session closing due to OpError: %s', (exec_value,))
   1206     self._default_session_context_manager.__exit__(
-> 1207         exec_type, exec_value, exec_tb)
   1208     self._default_graph_context_manager.__exit__(exec_type, exec_value, exec_tb)
   1209 

/opt/conda/lib/python3.5/contextlib.py in __exit__(self, type, value, traceback)
     75                 value = type()
     76             try:
---> 77                 self.gen.throw(type, value, traceback)
     78                 raise RuntimeError("generator didn't stop after throw()")
     79             except StopIteration as exc:

/opt/conda/lib/python3.5/site-packages/tensorflow/python/framework/ops.py in get_controller(self, default)
   3515     try:
   3516       self.stack.append(default)
-> 3517       yield default
   3518     finally:
   3519       if self._enforce_nesting:

<ipython-input-4-f107a23917ee> in main()
     63         'outputs': exporter.generic_signature({"prediction": predict_op})
     64       })
---> 65     model_exporter.export(FLAGS.model_path, tf.constant(FLAGS.export_version), sess)
     66     print('Done exporting!')
     67 

/opt/conda/lib/python3.5/site-packages/tensorflow/contrib/session_bundle/exporter.py in export(self, export_dir_base, global_step_tensor, sess, exports_to_keep)
    262     if gfile.Exists(export_dir):
    263       raise RuntimeError("Overwriting exports can cause corruption and are "
--> 264                          "not allowed. Duplicate export dir: %s" % export_dir)
    265 
    266     # Output to a temporary directory which is atomically renamed to the final

RuntimeError: Overwriting exports can cause corruption and are not allowed. Duplicate export dir: b'/root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027'

In [ ]:
show_graph()

Commit and Deploy New Tensorflow AI Model

Commit Model to GitHub


In [5]:
!ls -l /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export


total 48
drwxr-xr-x 2 root root 4096 Jan 19 22:39 00000001
drwxr-xr-x 2 root root 4096 Jan 21 20:09 00000005
drwxr-xr-x 2 root root 4096 Jan 21 21:24 00000007
drwxr-xr-x 2 root root 4096 Jan 21 21:30 00000009
drwxr-xr-x 2 root root 4096 Jan 21 21:35 00000011
drwxr-xr-x 2 root root 4096 Jan 21 21:36 00000013
drwxr-xr-x 2 root root 4096 Jan 21 22:02 00000015
drwxr-xr-x 2 root root 4096 Jan 21 22:18 00000017
drwxr-xr-x 2 root root 4096 Jan 22 00:00 00000019
drwxr-xr-x 2 root root 4096 Jan 22 05:15 00000021
drwxr-xr-x 2 root root 4096 Jan 22 06:49 00000025
drwxr-xr-x 2 root root 4096 Jan 25 20:55 00000027

In [6]:
!ls -l /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027


total 32
-rw-r--r-- 1 root root   241 Jan 25 20:55 checkpoint
-rw-r--r-- 1 root root     8 Jan 25 20:55 export.data-00000-of-00001
-rw-r--r-- 1 root root   142 Jan 25 20:55 export.index
-rw-r--r-- 1 root root 19394 Jan 25 20:55 export.meta

In [7]:
!git status


On branch master
Your branch is up-to-date with 'origin/master'.

Changes to be committed:
  (use "git reset HEAD <file>..." to unstage)

	new file:   ../../../../../prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027/checkpoint
	new file:   ../../../../../prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027/export.data-00000-of-00001
	new file:   ../../../../../prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027/export.index
	new file:   ../../../../../prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027/export.meta

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git checkout -- <file>..." to discard changes in working directory)

	modified:   SparkMLTensorflowAI-HybridCloud-ContinuousDeployment.ipynb
	modified:   ../../../TensorFlow/DeepDream/deepdream.ipynb
	modified:   ../../../Theano/Model Save and Load.ipynb

Untracked files:
  (use "git add <file>..." to include in what will be committed)

	SparkMLTensorflowAI-HybridCloud-ContinuousDeployment-Copy1.ipynb
	checkpoint/
	../../../TimeSeries/Untitled.ipynb


In [8]:
!git add --all /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027/

In [2]:
!git status


On branch master
Your branch is ahead of 'origin/master' by 1 commit.
  (use "git push" to publish your local commits)

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git checkout -- <file>..." to discard changes in working directory)

	modified:   SparkMLTensorflowAI-HybridCloud-ContinuousDeployment.ipynb
	modified:   ../../../Scikit-Learn/Scikit-Learn+Demos.ipynb
	modified:   ../../../TensorFlow/DeepDream/deepdream.ipynb
	modified:   ../../../Theano/Model Save and Load.ipynb

Untracked files:
  (use "git add <file>..." to include in what will be committed)

	SparkMLTensorflowAI-HybridCloud-ContinuousDeployment-Azure.ipynb
	checkpoint/
	../../../Spark/Pi/
	../../../TimeSeries/Untitled.ipynb

no changes added to commit (use "git add" and/or "git commit -a")

In [9]:
!git commit -m "updated tensorflow model"


[master 0ce7882] updated tensorflow model
 4 files changed, 2 insertions(+)
 create mode 100644 prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027/checkpoint
 create mode 100644 prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027/export.data-00000-of-00001
 create mode 100644 prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027/export.index
 create mode 100644 prediction.ml/tensorflow/models/tensorflow_minimal/export/00000027/export.meta

In [1]:
!git status


On branch master
Your branch is up-to-date with 'origin/master'.

Changes to be committed:
  (use "git reset HEAD <file>..." to unstage)

	modified:   ../../../../scripts/pi.py

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git checkout -- <file>..." to discard changes in working directory)

	modified:   SparkMLTensorflowAI-HybridCloud-ContinuousDeployment.ipynb
	modified:   ../../../Scikit-Learn/Scikit-Learn+Demos.ipynb
	modified:   ../../../TensorFlow/DeepDream/deepdream.ipynb
	modified:   ../../../Theano/Model Save and Load.ipynb

Untracked files:
  (use "git add <file>..." to include in what will be committed)

	SparkMLTensorflowAI-HybridCloud-ContinuousDeployment-Azure.ipynb
	checkpoint/
	../../../Spark/Pi/
	../../../TimeSeries/Untitled.ipynb


In [10]:
# If this fails with "Permission denied", use terminal within jupyter to manually `git push`
!git push


warning: push.default is unset; its implicit value is changing in
Git 2.0 from 'matching' to 'simple'. To squelch this message
and maintain the current behavior after the default changes, use:

  git config --global push.default matching

To squelch this message and adopt the new behavior now, use:

  git config --global push.default simple

When push.default is set to 'matching', git will push local branches
to the remote branches that already exist with the same name.

In Git 2.0, Git will default to the more conservative 'simple'
behavior, which only pushes the current branch to the corresponding
remote branch that 'git pull' uses to update the current branch.

See 'git help config' and search for 'push.default' for further information.
(the 'simple' mode was introduced in Git 1.7.11. Use the similar mode
'current' instead of 'simple' if you sometimes use older versions of Git)

Permission denied (publickey).
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.

GitHub Post-Commit Webhook Triggers Airflow Workflow to Deploy New Model


In [11]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

from IPython.display import clear_output, Image, display, HTML

html = '<iframe width=100% height=500px src="http://demo.pipeline.io:8080/admin">'
display(HTML(html))


Train and Deploy Spark ML Model (Airbnb Model, Mutable Deploy)

Scale Out Spark Training Cluster

Kubernetes CLI


In [12]:
!kubectl scale --context=awsdemo --replicas=2 rc spark-worker-2-0-1


replicationcontroller "spark-worker-2-0-1" scaled

In [13]:
!kubectl get pod --context=awsdemo


NAME                                                           READY     STATUS    RESTARTS   AGE
airflow-z2txg                                                  1/1       Running   0          51m
hdfs-gd1pf                                                     1/1       Running   0          11d
hystrix-rdx5g                                                  1/1       Running   0          9d
jupyterhub-master-sqqjf                                        1/1       Running   0          11d
metastore-1-2-1-jw8kx                                          1/1       Running   0          11d
mysql-master-586zp                                             1/1       Running   0          11d
prediction-codegen-b8c5b3016dc1eb4e655146be94cd2d56-p5m8f      1/1       Running   0          9d
prediction-pmml-7db0fe5cbac0a14d38069985acd5b119-jv3ln         1/1       Running   0          9d
prediction-tensorflow-60175981688eb64d9663a29c3ced8f45-4zndp   1/1       Running   0          9d
spark-master-2-0-1-28vrg                                       1/1       Running   0          11d
spark-worker-2-0-1-49qqg                                       1/1       Running   0          11d
spark-worker-2-0-1-56hqq                                       1/1       Running   0          5d
turbine-2gvb2                                                  1/1       Running   0          9d
weavescope-app-c9khj                                           1/1       Running   0          11d
weavescope-probe-0mtn7                                         1/1       Running   0          11d
weavescope-probe-7qtz4                                         1/1       Running   0          11d
weavescope-probe-kqt3z                                         1/1       Running   0          11d
web-home-0c9jw                                                 1/1       Running   0          11d

Weavescope Kubernetes AWS Cluster Visualization


In [14]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

from IPython.display import clear_output, Image, display, HTML

html = '<iframe width=100% height=500px src="http://kubernetes-aws.demo.pipeline.io"></iframe>'
display(HTML(html))


Generate PMML from Spark ML Model


In [15]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression

In [ ]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Step 0: Load Libraries and Data


In [ ]:
df = spark.read.format("csv") \
  .option("inferSchema", "true").option("header", "true") \
  .load("s3a://datapalooza/airbnb/airbnb.csv.bz2")

df.registerTempTable("df")

print(df.head())

In [ ]:
print(df.count())

Step 1: Clean, Filter, and Summarize the Data


In [ ]:
df_filtered = df.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0 AND bedrooms is not null")

df_filtered.registerTempTable("df_filtered")

df_final = spark.sql("""
    select
        id,
        city,
        case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        space,
        cast(price as double) as price,
        cast(bathrooms as double) as bathrooms,
        cast(bedrooms as double) as bedrooms,
        room_type,
        host_is_super_host,
        cancellation_policy,
        cast(case when security_deposit is null
            then 0.0
            else security_deposit
        end as double) as security_deposit,
        price_per_bedroom,
        cast(case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as double) as number_of_reviews,
        cast(case when extra_people is null
            then 0.0
            else extra_people
        end as double) as extra_people,
        instant_bookable,
        cast(case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as double) as cleaning_fee,
        cast(case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as double) as review_scores_rating,
        cast(case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as double) as square_feet
    from df_filtered
""").persist()

df_final.registerTempTable("df_final")

df_final.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()

In [ ]:
print(df_final.count())

In [ ]:
print(df_final.schema)

In [ ]:
# Most popular states

spark.sql("""
    select 
        state,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by state
    order by count(*) desc
""").show()

In [ ]:
# Most expensive popular cities

spark.sql("""
    select 
        city,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by city
    order by avg(price) desc
""").filter("ct > 25").show()

Step 2: Define Continuous and Categorical Features


In [29]:
continuous_features = ["bathrooms", \
                       "bedrooms", \
                       "security_deposit", \
                       "cleaning_fee", \
                       "extra_people", \
                       "number_of_reviews", \
                       "square_feet", \
                       "review_scores_rating"]

categorical_features = ["room_type", \
                        "host_is_super_host", \
                        "cancellation_policy", \
                        "instant_bookable", \
                        "state"]

Step 3: Split Data into Training and Validation


In [23]:
[training_dataset, validation_dataset] = df_final.randomSplit([0.8, 0.2])

Step 4: Continuous Feature Pipeline


In [24]:
continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")

continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features", \
                                           withStd=True, withMean=False)

Step 5: Categorical Feature Pipeline


In [25]:
categorical_feature_indexers = [StringIndexer(inputCol=x, \
                                              outputCol="{}_index".format(x)) \
                                for x in categorical_features]

categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), \
                                                      outputCol="oh_encoder_{}".format(x.getOutputCol() )) \
                                        for x in categorical_feature_indexers]

Step 6: Assemble our Features and Feature Pipeline


In [26]:
feature_cols_lr = [x.getOutputCol() \
                   for x in categorical_feature_one_hot_encoders]
feature_cols_lr.append("scaled_continuous_features")

feature_assembler_lr = VectorAssembler(inputCols=feature_cols_lr, \
                                       outputCol="features_lr")

Step 7: Train a Linear Regression Model


In [27]:
linear_regression = LinearRegression(featuresCol="features_lr", \
                                     labelCol="price", \
                                     predictionCol="price_prediction", \
                                     maxIter=10, \
                                     regParam=0.3, \
                                     elasticNetParam=0.8)

estimators_lr = \
  [continuous_feature_assembler, continuous_feature_scaler] \
  + categorical_feature_indexers + categorical_feature_one_hot_encoders \
  + [feature_assembler_lr] + [linear_regression]

pipeline = Pipeline(stages=estimators_lr)

pipeline_model = pipeline.fit(training_dataset)

print(pipeline_model)


PipelineModel_4a97887ec5bffb967963

Step 8: Convert PipelineModel to PMML


In [30]:
from jpmml import toPMMLBytes

pmmlBytes = toPMMLBytes(spark, training_dataset, pipeline_model)

print(pmmlBytes.decode("utf-8"))


<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<PMML xmlns="http://www.dmg.org/PMML-4_3" version="4.3">
	<Header>
		<Application/>
		<Timestamp>2017-01-25T21:03:04Z</Timestamp>
	</Header>
	<DataDictionary>
		<DataField name="bathrooms" optype="continuous" dataType="double"/>
		<DataField name="bedrooms" optype="continuous" dataType="double"/>
		<DataField name="security_deposit" optype="continuous" dataType="double"/>
		<DataField name="cleaning_fee" optype="continuous" dataType="double"/>
		<DataField name="extra_people" optype="continuous" dataType="double"/>
		<DataField name="number_of_reviews" optype="continuous" dataType="double"/>
		<DataField name="square_feet" optype="continuous" dataType="double"/>
		<DataField name="review_scores_rating" optype="continuous" dataType="double"/>
		<DataField name="room_type" optype="categorical" dataType="string">
			<Value value="Entire home/apt"/>
			<Value value="Private room"/>
			<Value value="Shared room"/>
		</DataField>
		<DataField name="host_is_super_host" optype="categorical" dataType="string">
			<Value value="0.0"/>
			<Value value="1.0"/>
		</DataField>
		<DataField name="cancellation_policy" optype="categorical" dataType="string">
			<Value value="strict"/>
			<Value value="moderate"/>
			<Value value="flexible"/>
			<Value value="super_strict_30"/>
			<Value value="no_refunds"/>
			<Value value="super_strict_60"/>
			<Value value="long_term"/>
		</DataField>
		<DataField name="instant_bookable" optype="categorical" dataType="string">
			<Value value="0.0"/>
			<Value value="1.0"/>
		</DataField>
		<DataField name="state" optype="categorical" dataType="string">
			<Value value="Other"/>
			<Value value="NY"/>
			<Value value="CA"/>
			<Value value="Berlin"/>
			<Value value="IL"/>
			<Value value="TX"/>
			<Value value="WA"/>
			<Value value="DC"/>
			<Value value="OR"/>
			<Value value="London"/>
		</DataField>
		<DataField name="price" optype="continuous" dataType="double"/>
	</DataDictionary>
	<TransformationDictionary>
		<DerivedField name="scaled_continuous_features[0]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="bathrooms"/>
				<Constant dataType="double">2.074530888987612</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[1]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="bedrooms"/>
				<Constant dataType="double">1.183342093127935</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[2]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="security_deposit"/>
				<Constant dataType="double">0.005515446077314265</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[3]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="cleaning_fee"/>
				<Constant dataType="double">0.02354892955874355</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[4]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="extra_people"/>
				<Constant dataType="double">0.05420544924382127</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[5]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="number_of_reviews"/>
				<Constant dataType="double">0.037137618066620755</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[6]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="square_feet"/>
				<Constant dataType="double">0.0027741086292747933</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[7]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="review_scores_rating"/>
				<Constant dataType="double">0.11353378155842442</Constant>
			</Apply>
		</DerivedField>
	</TransformationDictionary>
	<RegressionModel functionName="regression">
		<MiningSchema>
			<MiningField name="price" usageType="target"/>
			<MiningField name="bathrooms"/>
			<MiningField name="bedrooms"/>
			<MiningField name="security_deposit"/>
			<MiningField name="cleaning_fee"/>
			<MiningField name="extra_people"/>
			<MiningField name="number_of_reviews"/>
			<MiningField name="square_feet"/>
			<MiningField name="review_scores_rating"/>
			<MiningField name="room_type"/>
			<MiningField name="host_is_super_host"/>
			<MiningField name="cancellation_policy"/>
			<MiningField name="instant_bookable"/>
			<MiningField name="state"/>
		</MiningSchema>
		<RegressionTable intercept="-33.909482016407964">
			<NumericPredictor name="scaled_continuous_features[0]" coefficient="16.846713112950372"/>
			<NumericPredictor name="scaled_continuous_features[1]" coefficient="21.65744359456862"/>
			<NumericPredictor name="scaled_continuous_features[2]" coefficient="1.019676161981778"/>
			<NumericPredictor name="scaled_continuous_features[3]" coefficient="24.128563121458917"/>
			<NumericPredictor name="scaled_continuous_features[4]" coefficient="2.2128469933598076"/>
			<NumericPredictor name="scaled_continuous_features[5]" coefficient="-2.707908890362369"/>
			<NumericPredictor name="scaled_continuous_features[6]" coefficient="3.296611618708919"/>
			<NumericPredictor name="scaled_continuous_features[7]" coefficient="4.565176020759582"/>
			<CategoricalPredictor name="room_type" value="Entire home/apt" coefficient="26.3916496098938"/>
			<CategoricalPredictor name="room_type" value="Private room" coefficient="-12.814292762342133"/>
			<CategoricalPredictor name="host_is_super_host" value="0.0" coefficient="-5.230664391468683"/>
			<CategoricalPredictor name="cancellation_policy" value="strict" coefficient="2.6265387778200915"/>
			<CategoricalPredictor name="cancellation_policy" value="moderate" coefficient="-4.228524610632222"/>
			<CategoricalPredictor name="cancellation_policy" value="flexible" coefficient="0.0"/>
			<CategoricalPredictor name="cancellation_policy" value="super_strict_30" coefficient="66.09702679547543"/>
			<CategoricalPredictor name="cancellation_policy" value="no_refunds" coefficient="0.0"/>
			<CategoricalPredictor name="cancellation_policy" value="super_strict_60" coefficient="62.58306949412152"/>
			<CategoricalPredictor name="instant_bookable" value="0.0" coefficient="6.857977554604009"/>
			<CategoricalPredictor name="state" value="Other" coefficient="-10.872478259439536"/>
			<CategoricalPredictor name="state" value="NY" coefficient="19.8002758153602"/>
			<CategoricalPredictor name="state" value="CA" coefficient="12.244397012780297"/>
			<CategoricalPredictor name="state" value="Berlin" coefficient="-49.91171097312541"/>
			<CategoricalPredictor name="state" value="IL" coefficient="16.65571231582757"/>
			<CategoricalPredictor name="state" value="TX" coefficient="32.76371863225779"/>
			<CategoricalPredictor name="state" value="WA" coefficient="-7.32245281839754"/>
			<CategoricalPredictor name="state" value="DC" coefficient="5.449012606498783"/>
			<CategoricalPredictor name="state" value="OR" coefficient="-16.720781290330176"/>
		</RegressionTable>
	</RegressionModel>
</PMML>

Push PMML to Live, Running Spark ML Model Server (Mutable)


In [31]:
import urllib.request

update_url = 'http://prediction-pmml-aws.demo.pipeline.io/update-pmml/pmml_airbnb'

update_headers = {}
update_headers['Content-type'] = 'application/xml'

req = urllib.request.Request(update_url, \
                             headers=update_headers, \
                             data=pmmlBytes)

resp = urllib.request.urlopen(req)

print(resp.status) # Should return HTTP status 200


200

In [32]:
import urllib.request

update_url = 'http://prediction-pmml-gcp.demo.pipeline.io/update-pmml/pmml_airbnb'

update_headers = {}
update_headers['Content-type'] = 'application/xml'

req = urllib.request.Request(update_url, \
                             headers=update_headers, \
                             data=pmmlBytes)

resp = urllib.request.urlopen(req)

print(resp.status) # Should return HTTP status 200


200

In [33]:
import urllib.parse
import json

evaluate_url = 'http://prediction-pmml-aws.demo.pipeline.io/evaluate-pmml/pmml_airbnb'

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'

input_params = '{"bathrooms":2.0, \
                 "bedrooms":2.0, \
                 "security_deposit":175.00, \
                 "cleaning_fee":25.0, \
                 "extra_people":1.0, \
                 "number_of_reviews": 2.0, \
                 "square_feet": 250.0, \
                 "review_scores_rating": 2.0, \
                 "room_type": "Entire home/apt", \
                 "host_is_super_host": "0.0", \
                 "cancellation_policy": "flexible", \
                 "instant_bookable": "1.0", \
                 "state": "CA"}' 
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, \
                             headers=evaluate_headers, \
                             data=encoded_input_params)

resp = urllib.request.urlopen(req)

print(resp.read())


b'{"results":[[{\'price\': \'139.08123420618884\'}]]'

In [ ]:
import urllib.parse
import json

evaluate_url = 'http://prediction-pmml-gcp.demo.pipeline.io/evaluate-pmml/pmml_airbnb'

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'

input_params = '{"bathrooms":2.0, \
                 "bedrooms":2.0, \
                 "security_deposit":175.00, \
                 "cleaning_fee":25.0, \
                 "extra_people":1.0, \
                 "number_of_reviews": 2.0, \
                 "square_feet": 250.0, \
                 "review_scores_rating": 2.0, \
                 "room_type": "Entire home/apt", \
                 "host_is_super_host": "0.0", \
                 "cancellation_policy": "flexible", \
                 "instant_bookable": "1.0", \
                 "state": "CA"}' 
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, \
                             headers=evaluate_headers, \
                             data=encoded_input_params)

resp = urllib.request.urlopen(req)

print(resp.read())

Deploy Java-based Model (Simple Model, Mutable Deploy)


In [40]:
from urllib import request

sourceBytes = '                                                      \n\
  private String str;                                                \n\
                                                                     \n\
  public void initialize(Map<String, Object> args) {                 \n\
  }                                                                  \n\
                                                                     \n\
  public Object predict(Map<String, Object> inputs) {                \n\
      String id = (String)inputs.get("id");                          \n\
                                                                     \n\
      return id.equals("21619");                                     \n\
  }                                                                  \n\
'.encode('utf-8')

In [41]:
from urllib import request

name = 'codegen_equals'
update_url = 'http://prediction-codegen-aws.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

generated_code = resp.read()
print(generated_code.decode('utf-8'))


/* 001 */
/* 002 */ private String str;
/* 003 */
/* 004 */ public void initialize(Map<String, Object> args) {
/* 005 */ }
/* 006 */
/* 007 */ public Object predict(Map<String, Object> inputs) {
/* 008 */   String id = (String)inputs.get("id");
/* 009 */
/* 010 */   return id.equals("21619");
/* 011 */ }


In [42]:
from urllib import request

name = 'codegen_equals'
update_url = 'http://prediction-codegen-gcp.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

generated_code = resp.read()
print(generated_code.decode('utf-8'))


/* 001 */
/* 002 */ private String str;
/* 003 */
/* 004 */ public void initialize(Map<String, Object> args) {
/* 005 */ }
/* 006 */
/* 007 */ public Object predict(Map<String, Object> inputs) {
/* 008 */   String id = (String)inputs.get("id");
/* 009 */
/* 010 */   return id.equals("21619");
/* 011 */ }


In [43]:
from urllib import request

name = 'codegen_equals'
evaluate_url = 'http://prediction-codegen-aws.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21618"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return false ('21618' != '21619')


b'false'

In [44]:
from urllib import request

name = 'codegen_equals'
evaluate_url = 'http://prediction-codegen-aws.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21619"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return true ('21619' == '21619')


b'true'

Deploy Java Model (HttpClient Model, Mutable Deploy)


In [45]:
from urllib import request

sourceBytes = '                                                         \n\
  public Map<String, Object> data = new HashMap<String, Object>();      \n\
                                                                        \n\
  public void initialize(Map<String, Object> args) {                    \n\
    data.put("url", "http://demo.pipeline.io:9040/prediction/");        \n\
  }                                                                     \n\
                                                                        \n\
  public Object predict(Map<String, Object> inputs) {                   \n\
    try {                                                               \n\
      String userId = (String)inputs.get("userId");                     \n\
      String itemId = (String)inputs.get("itemId");                     \n\
      String url = data.get("url") + "/" + userId + "/" + itemId;       \n\
                                                                        \n\
      return org.apache.http.client.fluent.Request                      \n\
        .Get(url)                                                       \n\
        .execute()                                                      \n\
        .returnContent();                                               \n\
                                                                        \n\
    } catch(Exception exc) {                                            \n\
      System.out.println(exc);                                          \n\
      throw exc;                                                        \n\
    }                                                                   \n\
  }                                                                     \n\
'.encode('utf-8')

In [46]:
from urllib import request

name = 'codegen_httpclient'
# Note:  Must have trailing '/'
update_url = 'http://prediction-codegen-aws.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

generated_code = resp.read()
print(generated_code.decode('utf-8'))


/* 001 */
/* 002 */ public Map<String, Object> data = new HashMap<String, Object>();
/* 003 */
/* 004 */ public void initialize(Map<String, Object> args) {
/* 005 */   data.put("url", "http://demo.pipeline.io:9040/prediction/");
/* 006 */ }
/* 007 */
/* 008 */ public Object predict(Map<String, Object> inputs) {
/* 009 */   try {
/* 010 */     String userId = (String)inputs.get("userId");
/* 011 */     String itemId = (String)inputs.get("itemId");
/* 012 */     String url = data.get("url") + "/" + userId + "/" + itemId;
/* 013 */
/* 014 */     return org.apache.http.client.fluent.Request
/* 015 */     .Get(url)
/* 016 */     .execute()
/* 017 */     .returnContent();
/* 018 */
/* 019 */   } catch(Exception exc) {
/* 020 */     System.out.println(exc);
/* 021 */     throw exc;
/* 022 */   }
/* 023 */ }


In [ ]:
from urllib import request

name = 'codegen_httpclient'
# Note:  Must have trailing '/'
update_url = 'http://prediction-codegen-gcp.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

generated_code = resp.read()
print(generated_code.decode('utf-8'))

In [ ]:
from urllib import request

name = 'codegen_httpclient'
evaluate_url = 'http://prediction-codegen-aws.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"userId":"21619", "itemId":"10006"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return float

In [ ]:
from urllib import request

name = 'codegen_httpclient'
evaluate_url = 'http://prediction-codegen-gcp.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"userId":"21619", "itemId":"10006"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return float

Load Test and Compare Cloud Providers (AWS and Google)

Monitor Performance Across Cloud Providers

NetflixOSS Services Dashboard (Hystrix)


In [34]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

from IPython.display import clear_output, Image, display, HTML

html = '<iframe width=100% height=500px src="http://hystrix.demo.pipeline.io/hystrix-dashboard/monitor/monitor.html?streams=%5B%7B%22name%22%3A%22Predictions%20-%20AWS%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-aws.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22Predictions%20-%20GCP%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-gcp.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%5D"></iframe>'
display(HTML(html))


Start Load Tests

Run JMeter Tests from Local Laptop (Limited by Laptop)

Run Headless JMeter Tests from Training Clusters in Cloud


In [ ]:
# Spark ML - PMML - Airbnb
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-airbnb-rc.yaml
!kubectl create --context=gcpdemo -f /root/pipeline/loadtest.ml/loadtest-aws-airbnb-rc.yaml

In [ ]:
# Codegen - Java - Simple
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-equals-rc.yaml
!kubectl create --context=gcpdemo -f /root/pipeline/loadtest.ml/loadtest-aws-equals-rc.yaml

In [ ]:
# Tensorflow AI - Tensorflow Serving - Simple 
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-minimal-rc.yaml
!kubectl create --context=gcpdemo -f /root/pipeline/loadtest.ml/loadtest-aws-minimal-rc.yaml

End Load Tests


In [ ]:
!kubectl delete --context=awsdemo rc loadtest-aws-airbnb
!kubectl delete --context=gcpdemo rc loadtest-aws-airbnb
!kubectl delete --context=awsdemo rc loadtest-aws-equals
!kubectl delete --context=gcpdemo rc loadtest-aws-equals
!kubectl delete --context=awsdemo rc loadtest-aws-minimal
!kubectl delete --context=gcpdemo rc loadtest-aws-minimal

Rolling Deploy Tensorflow AI (Simple Model, Immutable Deploy)

Kubernetes CLI


In [ ]:
!kubectl rolling-update prediction-tensorflow --context=awsdemo --image-pull-policy=Always --image=fluxcapacitor/prediction-tensorflow

In [ ]:
!kubectl get pod --context=awsdemo

In [ ]:
!kubectl rolling-update prediction-tensorflow --context=gcpdemo --image-pull-policy=Always --image=fluxcapacitor/prediction-tensorflow

In [ ]:
!kubectl get pod --context=gcpdemo

7. Q&A