Where Am I?

Startup.ML Conference - San Francisco - Jan 20, 2017

Who Am I?

Chris Fregly

Research Scientist @ PipelineIO

Video Series Author "High Performance Tensorflow in Production" @ O'Reilly (Coming Soon)

Founder @ Advanced Spark and Tensorflow Meetup

Github Repo

DockerHub Repo

Slideshare

YouTube

Who Was I?

Software Engineer @ Netflix, Databricks, IBM Spark Tech Center

1. Infrastructure and Tools

Docker

Images, Containers

Useful Docker Image: AWS + GPU + Docker + Tensorflow + Spark

Kubernetes

Container Orchestration Across Clusters

Weavescope

Kubernetes Cluster Visualization

Jupyter Notebooks

What We're Using Here for Everything!

Airflow

Invoke Any Type of Workflow on Any Type of Schedule

Github

Commit New Model to Github, Airflow Workflow Triggered for Continuous Deployment

DockerHub

Maintains Docker Images

Continuous Deployment

Not Just for Code, Also for ML/AI Models!

Canary Release

Deploy and Compare New Model Alongside Existing
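
A canary release can be pictured as a router that sends a small share of live traffic to the new model and counts what each model served. The sketch below is only an illustration under assumptions: `make_canary_router`, the two stand-in models, and the 10% split are hypothetical names and values, not part of the PipelineIO stack.

```python
import random

def make_canary_router(existing_model, canary_model, canary_fraction=0.1, seed=None):
    """Return a predict function that sends a fraction of requests to the canary."""
    rng = random.Random(seed)
    counts = {"existing": 0, "canary": 0}

    def predict(inputs):
        # Route a random share of traffic to the new model, the rest to the old one
        if rng.random() < canary_fraction:
            counts["canary"] += 1
            return "canary", canary_model(inputs)
        counts["existing"] += 1
        return "existing", existing_model(inputs)

    return predict, counts

# Hypothetical stand-in models (assumptions for illustration only)
def existing_model(x):
    return 2.0 * x

def canary_model(x):
    return 2.1 * x

predict, counts = make_canary_router(existing_model, canary_model,
                                     canary_fraction=0.1, seed=42)
for i in range(1000):
    predict(float(i))
print(counts)
```

Tagging each prediction with the model that produced it is what makes the side-by-side comparison on a dashboard possible.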

Metrics and Dashboards

Not Just System Metrics, ML/AI Model Prediction Metrics

NetflixOSS-based

Prometheus

Grafana

Elasticsearch

Separate Cluster Concerns

Training/Admin Cluster

Prediction Cluster

Hybrid Cloud Deployment for eXtreme High Availability (XHA)

AWS, Google Cloud, and Azure

Apache Spark

Tensorflow + Tensorflow Serving

2. Model Deployment Bundles

KeyValue

e.g. Recommendations

In-memory: Redis, Memcache

On-disk: Cassandra, RocksDB

First-class Servable in Tensorflow Serving

PMML

It's Useful and Well-Supported

Apple, Cisco, Airbnb, HomeAway, etc

Please Don't Re-build It - Reduce Your Technical Debt!

Native Code

Hand-coded (Python + Pickling)

Generate Java Code from PMML?

Tensorflow Model Exports

freeze_graph.py: Combine Tensorflow Graph (Static) with Trained Weights (Checkpoints) into Single Deployable Model

3. Model Deployments and Rollbacks

Mutable

Each New Model is Deployed to Live, Running Container

Immutable

Each New Model is a New Docker Image

4. Optimizing Tensorflow Models for Serving

Python Scripts

optimize_graph_for_inference.py

Pete Warden's Blog

Graph Transform Tool

Compile (Tensorflow 1.0+)

XLA Compiler

Compiles 3 graph operations (input, operation, output) into 1 operation

Removes need for Tensorflow Runtime (20 MB is significant on tiny devices)

Allows new backends for hardware-specific optimizations (better portability)
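
The idea of compiling several graph operations into one can be illustrated without XLA itself. The sketch below is conceptual plain Python, not the XLA API: `unfused` and `fused` are hypothetical helpers showing how a multiply, an add, and a ReLU can run as three passes with intermediate buffers or as a single pass with none.

```python
def unfused(xs, w, b):
    # Three separate passes, each materializing an intermediate list
    multiplied = [x * w for x in xs]
    added = [m + b for m in multiplied]
    return [max(a, 0.0) for a in added]

def fused(xs, w, b):
    # One pass over the data, no intermediate buffers
    return [max(x * w + b, 0.0) for x in xs]

inputs = [-2.0, -0.5, 0.0, 1.5, 3.0]
assert unfused(inputs, 2.0, 1.0) == fused(inputs, 2.0, 1.0)
print(fused(inputs, 2.0, 1.0))
```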

tfcompile

Convert Graph into executable code

Compress/Distill Ensemble Models

Convert ensembles or other complex models into smaller models

Re-score training data with output of model being distilled

Train smaller model to produce same output

Smaller model learns from the larger model's output, which carries more information than the original label
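
One common way to realize the re-scoring step is to soften the large model's outputs with a temperature and train the small model against those soft targets. The sketch below assumes that approach; the logits, the temperature of 2.0, and the helper names are made up for illustration and are not taken from the talk.

```python
import math

def softmax(logits, temperature=1.0):
    """Softmax with temperature; higher temperature gives softer probabilities."""
    scaled = [z / temperature for z in logits]
    m = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

def cross_entropy(target, predicted):
    """Cross-entropy between a target distribution and a predicted one."""
    return -sum(t * math.log(p) for t, p in zip(target, predicted) if t > 0.0)

# Hypothetical output of the large (teacher) model for one training example
teacher_logits = [5.0, 3.0, -1.0]
hard_label = [1.0, 0.0, 0.0]  # the original label only says "class 0"

# Soft targets also say how similar the other classes are to class 0
soft_targets = softmax(teacher_logits, temperature=2.0)

# Hypothetical output of the small (student) model, scored against the soft targets
student_probs = softmax([2.0, 1.0, 0.0], temperature=2.0)
distill_loss = cross_entropy(soft_targets, student_probs)
print(soft_targets, distill_loss)
```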

5. Optimizing Serving Runtime Environment

Throughput

Option 1: Add more Tensorflow Serving servers behind load balancer

Option 2: Enable request batching in each Tensorflow Serving

Option Trade-offs: Higher Latency (bad) for Higher Throughput (good)

$TENSORFLOW_SERVING_HOME/bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server \
  --port=9000 \
  --model_name=tensorflow_minimal \
  --model_base_path=/root/models/tensorflow_minimal/export \
  --enable_batching=true \
  --max_batch_size=1000000 \
  --batch_timeout_micros=10000 \
  --max_enqueued_batches=1000000
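
The latency cost of batching is easier to see with a toy model of the two main knobs, `max_batch_size` and `batch_timeout_micros`. The function below is a simplified simulation written for this note, not Tensorflow Serving's actual scheduler: `form_batches` and the arrival times are assumptions. A batch is dispatched when it is full or when the timeout has elapsed since its first request.

```python
def form_batches(arrival_times_us, max_batch_size, batch_timeout_micros):
    """Group sorted arrival times (microseconds) into batches.

    Returns a list of (dispatch_time_us, [arrival times in the batch]).
    """
    batches = []
    current = []
    for t in arrival_times_us:
        # The open batch timed out before this request arrived
        if current and t - current[0] >= batch_timeout_micros:
            batches.append((current[0] + batch_timeout_micros, current))
            current = []
        current.append(t)
        # The batch is full, so dispatch immediately
        if len(current) == max_batch_size:
            batches.append((t, current))
            current = []
    if current:
        # The last batch waits for its timeout
        batches.append((current[0] + batch_timeout_micros, current))
    return batches

arrivals = [0, 1000, 2000, 3000, 15000, 15500]
batches = form_batches(arrivals, max_batch_size=3, batch_timeout_micros=10000)
worst_wait = max(dispatch - t for dispatch, batch in batches for t in batch)
print([len(batch) for _, batch in batches], worst_wait)
```

In this toy model a request that arrives alone waits the full timeout before it is served, which is the "higher latency for higher throughput" trade-off in miniature.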

Latency

The deeper the model, the longer the latency

Start inference in parallel where possible (e.g. user inference in parallel with item inference)

Pre-load common inputs from database (e.g. user attributes, item attributes)

Pre-compute/partial-compute common inputs (e.g. popular word embeddings)
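
The latency ideas above can be combined in a few lines: run the user side and the item side concurrently, and cache results for inputs that repeat. This is a minimal sketch under assumptions; `user_features`, `item_features`, the 50 ms sleeps, and the dot-product "model" are hypothetical stand-ins for real lookups and inference.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

@lru_cache(maxsize=1024)
def user_features(user_id):
    time.sleep(0.05)  # stands in for a database lookup plus user-side inference
    return [float(len(user_id)), 1.0]

@lru_cache(maxsize=1024)
def item_features(item_id):
    time.sleep(0.05)  # stands in for a database lookup plus item-side inference
    return [0.5, float(len(item_id))]

def predict(user_id, item_id, executor):
    # Start both sides at once instead of one after the other
    user_future = executor.submit(user_features, user_id)
    item_future = executor.submit(item_features, item_id)
    u, v = user_future.result(), item_future.result()
    return sum(a * b for a, b in zip(u, v))

with ThreadPoolExecutor(max_workers=2) as executor:
    start = time.perf_counter()
    cold = predict("21619", "10006", executor)
    cold_seconds = time.perf_counter() - start

    start = time.perf_counter()
    warm = predict("21619", "10006", executor)  # both inputs now come from the cache
    warm_seconds = time.perf_counter() - start

print(cold, warm, cold_seconds, warm_seconds)
```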

Memory

Word embeddings are huge!

Use hashId for each word

Off-load embedding matrices to parameter server and share between serving servers
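
One reading of "use hashId for each word" is the hashing trick: map every word to one of a fixed number of buckets so the embedding table has a bounded size no matter how large the vocabulary grows. The sketch below assumes that reading; `word_to_hash_id`, the bucket count, and the tiny embedding table are illustrative only.

```python
import hashlib

def word_to_hash_id(word, num_buckets):
    """Map a word to a stable bucket id in [0, num_buckets)."""
    digest = hashlib.md5(word.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_buckets

NUM_BUCKETS = 1000   # assumed table size; real systems pick this from memory budgets
EMBEDDING_DIM = 4    # assumed embedding width

# The table has NUM_BUCKETS rows regardless of how many distinct words exist
embedding_table = [[0.0] * EMBEDDING_DIM for _ in range(NUM_BUCKETS)]

def lookup(word):
    return embedding_table[word_to_hash_id(word, NUM_BUCKETS)]

print(word_to_hash_id("tensorflow", NUM_BUCKETS), len(lookup("tensorflow")))
```

Different words can collide in the same bucket; that loss of precision is the price of the fixed memory footprint.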

6. Demos!!

Train and Deploy Tensorflow AI Model (Simple Model, Immutable Deploy)

Train Tensorflow AI Model


In [ ]:
import numpy as np
import os
import tensorflow as tf
from tensorflow.contrib.session_bundle import exporter
import time

In [ ]:
# make things wide
from IPython.display import clear_output, Image, display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

def strip_consts(graph_def, max_const_size=32):
    """Strip large constant values from graph_def."""
    strip_def = tf.GraphDef()
    for n0 in graph_def.node:
        n = strip_def.node.add() 
        n.MergeFrom(n0)
        if n.op == 'Const':
            tensor = n.attr['value'].tensor
            size = len(tensor.tensor_content)
            if size > max_const_size:
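                # tensor_content is a bytes field, so encode the placeholder (required on Python 3)
                tensor.tensor_content = ("<stripped %d bytes>" % size).encode("utf-8")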
    return strip_def

def show_graph(graph_def=None, width=1200, height=800, max_const_size=32, ungroup_gradients=False):
    """Visualize TensorFlow graph."""
    if not graph_def:
        graph_def = tf.get_default_graph().as_graph_def()

    if hasattr(graph_def, 'as_graph_def'):
        graph_def = graph_def.as_graph_def()
    strip_def = strip_consts(graph_def, max_const_size=max_const_size)
    data = str(strip_def)
    if ungroup_gradients:
        data = data.replace('"gradients/', '"b_')
        #print(data)
    code = """
        <script>
          function load() {{
            document.getElementById("{id}").pbtxt = {data};
          }}
        </script>
        <link rel="import" href="https://tensorboard.appspot.com/tf-graph-basic.build.html" onload=load()>
        <div style="height:600px">
          <tf-graph-basic id="{id}"></tf-graph-basic>
        </div>
    """.format(data=repr(data), id='graph'+str(np.random.rand()))

    iframe = """
        <iframe seamless style="width:{}px;height:{}px;border:0" srcdoc="{}"></iframe>
    """.format(width, height, code.replace('"', '&quot;'))
    display(HTML(iframe))

In [ ]:
# If this errors out, increment the `export_version` variable, restart the Kernel, and re-run

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer("batch_size", 10, "The batch size to train")
flags.DEFINE_integer("epoch_number", 10, "Number of epochs to run trainer")
flags.DEFINE_integer("steps_to_validate", 1,
                     "Steps to validate and print loss")
flags.DEFINE_string("checkpoint_dir", "./checkpoint/",
                    "indicates the checkpoint directory")
#flags.DEFINE_string("model_path", "./model/", "The export path of the model")
flags.DEFINE_string("model_path", "/root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/", "The export path of the model")
flags.DEFINE_integer("export_version", 37, "The version number of the model")

In [ ]:
# If this errors out, increment the `export_version` variable, restart the Kernel, and re-run

def main():
  # Define training data
  x = np.ones(FLAGS.batch_size)
  y = np.ones(FLAGS.batch_size)

  # Define the model
  X = tf.placeholder(tf.float32, shape=[None], name="X")
  Y = tf.placeholder(tf.float32, shape=[None], name="yhat")
  w = tf.Variable(1.0, name="weight")
  b = tf.Variable(1.0, name="bias")
  # `X * w` is used instead of tf.mul, which was removed in Tensorflow 1.0
  loss = tf.square(Y - X * w - b)
  train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)
  predict_op  = X * w + b

  saver = tf.train.Saver()
  checkpoint_dir = FLAGS.checkpoint_dir
  checkpoint_file = os.path.join(checkpoint_dir, "checkpoint.ckpt")
  if not os.path.exists(checkpoint_dir):
    os.makedirs(checkpoint_dir)
    
  # Start the session
  with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())

    ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
    if ckpt and ckpt.model_checkpoint_path:
      print("Continue training from the model {}".format(ckpt.model_checkpoint_path))
      saver.restore(sess, ckpt.model_checkpoint_path)

    saver_def = saver.as_saver_def()
    print(saver_def.filename_tensor_name)
    print(saver_def.restore_op_name)

    # Start training
    start_time = time.time()
    for epoch in range(FLAGS.epoch_number):
      sess.run(train_op, feed_dict={X: x, Y: y})

      # Start validating
      if epoch % FLAGS.steps_to_validate == 0:
        end_time = time.time()
        print("[{}] Epoch: {}".format(end_time - start_time, epoch))

        saver.save(sess, checkpoint_file)
        tf.train.write_graph(sess.graph_def, checkpoint_dir, 'trained_model.pb', as_text=False)
        tf.train.write_graph(sess.graph_def, checkpoint_dir, 'trained_model.txt', as_text=True)

        start_time = end_time

    # Print model variables
    w_value, b_value = sess.run([w, b])
    print("The model of w: {}, b: {}".format(w_value, b_value))

    # Export the model
    print("Exporting trained model to {}".format(FLAGS.model_path))
    model_exporter = exporter.Exporter(saver)
    model_exporter.init(
      sess.graph.as_graph_def(),
      named_graph_signatures={
        'inputs': exporter.generic_signature({"features": X}),
        'outputs': exporter.generic_signature({"prediction": predict_op})
      })
    model_exporter.export(FLAGS.model_path, tf.constant(FLAGS.export_version), sess)
    print('Done exporting!')

if __name__ == "__main__":
  main()
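
To see what the training loop above converges to, the same updates can be reproduced by hand. This sketch assumes the optimizer sums the gradient of the per-example loss vector over the batch, which is how gradients of a non-scalar tensor are accumulated; `train` is a hypothetical helper written for this note, and the defaults mirror the flags defined earlier.

```python
def train(batch_size=10, epochs=10, learning_rate=0.01):
    """Plain-Python gradient descent on loss = (y - (w * x + b))^2, summed over the batch."""
    xs = [1.0] * batch_size
    ys = [1.0] * batch_size
    w, b = 1.0, 1.0  # same initial values as the tf.Variable definitions
    for _ in range(epochs):
        # d(loss)/dw and d(loss)/db, summed over every example in the batch
        grad_w = sum(-2.0 * (y - w * x - b) * x for x, y in zip(xs, ys))
        grad_b = sum(-2.0 * (y - w * x - b) for x, y in zip(xs, ys))
        w -= learning_rate * grad_w
        b -= learning_rate * grad_b
    return w, b

w_value, b_value = train()
print("w: {}, b: {}, w + b: {}".format(w_value, b_value, w_value + b_value))
```

Because every input and label is 1.0, any pair with w + b = 1 fits the data perfectly; starting from w = b = 1.0 the two variables stay equal and their sum moves toward 1.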

In [ ]:
show_graph()

Commit and Deploy New Tensorflow AI Model

Commit Model to Github


In [ ]:
!ls -l /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export

In [ ]:
!ls -l /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/00000037

In [ ]:
!git status

In [ ]:
!git add --all /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/00000037/

In [ ]:
!git status

In [ ]:
!git commit -m "updated tensorflow model"

In [ ]:
# If this fails with "Permission denied", use terminal within jupyter to manually `git push`
!git push

Github Post-Commit Webhook Triggers Airflow Workflow to Deploy New Model


In [ ]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

html = '<iframe width=100% height=500px src="http://demo.pipeline.io:8080/admin"></iframe>'
display(HTML(html))

Train and Deploy Spark ML Model (Airbnb Model, Mutable Deploy)

Scale Out Spark Training Cluster

Kubernetes CLI


In [ ]:
!kubectl scale --context=awsdemo --replicas=2 rc spark-worker-2-0-1

In [ ]:
!kubectl get pod --context=awsdemo

Weavescope Kubernetes AWS Cluster Visualization


In [ ]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

html = '<iframe width=100% height=500px src="http://kubernetes-aws.demo.pipeline.io"></iframe>'
display(HTML(html))

Generate PMML from Spark ML Model


In [ ]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression

In [ ]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Step 0: Load Libraries and Data


In [ ]:
df = spark.read.format("csv") \
  .option("inferSchema", "true").option("header", "true") \
  .load("s3a://datapalooza/airbnb/airbnb.csv.bz2")

# createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0
df.createOrReplaceTempView("df")

print(df.head())

In [ ]:
print(df.count())

Step 1: Clean, Filter, and Summarize the Data


In [ ]:
df_filtered = df.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0 AND bedrooms is not null")

df_filtered.createOrReplaceTempView("df_filtered")

df_final = spark.sql("""
    select
        id,
        city,
        case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        space,
        cast(price as double) as price,
        cast(bathrooms as double) as bathrooms,
        cast(bedrooms as double) as bedrooms,
        room_type,
        host_is_super_host,
        cancellation_policy,
        cast(case when security_deposit is null
            then 0.0
            else security_deposit
        end as double) as security_deposit,
        price_per_bedroom,
        cast(case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as double) as number_of_reviews,
        cast(case when extra_people is null
            then 0.0
            else extra_people
        end as double) as extra_people,
        instant_bookable,
        cast(case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as double) as cleaning_fee,
        cast(case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as double) as review_scores_rating,
        cast(case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as double) as square_feet
    from df_filtered
""").persist()

df_final.createOrReplaceTempView("df_final")

df_final.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()

In [ ]:
print(df_final.count())

In [ ]:
print(df_final.schema)

In [ ]:
# Most popular states

spark.sql("""
    select 
        state,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by state
    order by count(*) desc
""").show()

In [ ]:
# Most expensive popular cities

spark.sql("""
    select 
        city,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by city
    order by avg(price) desc
""").filter("ct > 25").show()

Step 2: Define Continuous and Categorical Features


In [ ]:
continuous_features = ["bathrooms", \
                       "bedrooms", \
                       "security_deposit", \
                       "cleaning_fee", \
                       "extra_people", \
                       "number_of_reviews", \
                       "square_feet", \
                       "review_scores_rating"]

categorical_features = ["room_type", \
                        "host_is_super_host", \
                        "cancellation_policy", \
                        "instant_bookable", \
                        "state"]

Step 3: Split Data into Training and Validation


In [ ]:
[training_dataset, validation_dataset] = df_final.randomSplit([0.8, 0.2])

Step 4: Continuous Feature Pipeline


In [ ]:
continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")

continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features", \
                                           withStd=True, withMean=False)
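
With `withStd=True, withMean=False`, each continuous feature is divided by its standard deviation but not centered, so zeros stay zeros. The plain-Python sketch below illustrates that arithmetic on one column; `scale_to_unit_std` and the sample values are assumptions for illustration, not Spark code.

```python
import statistics

def scale_to_unit_std(column):
    """Divide a column by its sample standard deviation without centering it."""
    std = statistics.stdev(column)  # corrected sample standard deviation (n - 1)
    if std == 0.0:
        # A constant column carries no signal, so it maps to 0.0
        return [0.0 for _ in column]
    return [value / std for value in column]

bedrooms = [1.0, 2.0, 3.0, 4.0]
scaled = scale_to_unit_std(bedrooms)
print(scaled, statistics.stdev(scaled), statistics.mean(scaled))
```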

Step 5: Categorical Feature Pipeline


In [ ]:
categorical_feature_indexers = [StringIndexer(inputCol=x, \
                                              outputCol="{}_index".format(x)) \
                                for x in categorical_features]

categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), \
                                                      outputCol="oh_encoder_{}".format(x.getOutputCol() )) \
                                        for x in categorical_feature_indexers]
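
What the two categorical stages do to a single column can be sketched in plain Python. This illustration assumes the default behavior: the indexer gives the most frequent label index 0, and the encoder drops the last category so it becomes the all-zeros vector. `fit_string_indexer`, `one_hot`, the alphabetical tie-break, and the sample data are hypothetical.

```python
from collections import Counter

def fit_string_indexer(values):
    """Assign indices by descending frequency (ties broken alphabetically here)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """One-hot encode an index; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector

room_types = ["Entire home/apt"] * 3 + ["Private room"] * 2 + ["Shared room"]
index_of = fit_string_indexer(room_types)
encoded = {label: one_hot(idx, len(index_of)) for label, idx in index_of.items()}
print(index_of)
print(encoded)
```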

Step 6: Assemble our Features and Feature Pipeline


In [ ]:
feature_cols_lr = [x.getOutputCol() \
                   for x in categorical_feature_one_hot_encoders]
feature_cols_lr.append("scaled_continuous_features")

feature_assembler_lr = VectorAssembler(inputCols=feature_cols_lr, \
                                       outputCol="features_lr")

Step 7: Train a Linear Regression Model


In [ ]:
linear_regression = LinearRegression(featuresCol="features_lr", \
                                     labelCol="price", \
                                     predictionCol="price_prediction", \
                                     maxIter=10, \
                                     regParam=0.3, \
                                     elasticNetParam=0.8)

estimators_lr = \
  [continuous_feature_assembler, continuous_feature_scaler] \
  + categorical_feature_indexers + categorical_feature_one_hot_encoders \
  + [feature_assembler_lr] + [linear_regression]

pipeline = Pipeline(stages=estimators_lr)

pipeline_model = pipeline.fit(training_dataset)

print(pipeline_model)
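
The `regParam=0.3` and `elasticNetParam=0.8` settings mix an L1 and an L2 penalty on the model weights. As a sketch of that arithmetic, assuming the usual elastic net form regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2), the hypothetical helper below computes the penalty for a made-up weight vector.

```python
def elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8):
    """Elastic net penalty: a weighted mix of the L1 norm and half the squared L2 norm."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_squared)

weights = [1.0, -2.0, 0.0]  # made-up coefficients for illustration
print(elastic_net_penalty(weights))                          # mostly L1 (alpha = 0.8)
print(elastic_net_penalty(weights, elastic_net_param=1.0))   # pure L1 (lasso)
print(elastic_net_penalty(weights, elastic_net_param=0.0))   # pure L2 (ridge)
```

With alpha close to 1 the penalty behaves mostly like lasso, which tends to push the weights of uninformative one-hot columns to exactly zero.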

Step 8: Convert PipelineModel to PMML


In [ ]:
from jpmml import toPMMLBytes

pmmlBytes = toPMMLBytes(spark, training_dataset, pipeline_model)

print(pmmlBytes.decode("utf-8"))

Push PMML to Live, Running Spark ML Model Server (Mutable)


In [ ]:
import urllib.request

update_url = 'http://prediction-pmml-aws.demo.pipeline.io/update-pmml/pmml_airbnb'

update_headers = {}
update_headers['Content-type'] = 'application/xml'

req = urllib.request.Request(update_url, \
                             headers=update_headers, \
                             data=pmmlBytes)

resp = urllib.request.urlopen(req)

print(resp.status) # Should return Http Status 200

In [ ]:
import urllib.request

update_url = 'http://prediction-pmml-gcp.demo.pipeline.io/update-pmml/pmml_airbnb'

update_headers = {}
update_headers['Content-type'] = 'application/xml'

req = urllib.request.Request(update_url, \
                             headers=update_headers, \
                             data=pmmlBytes)

resp = urllib.request.urlopen(req)

print(resp.status) # Should return Http Status 200

In [ ]:
import urllib.request

update_url = 'http://prediction-pmml-azure.demo.pipeline.io/update-pmml/pmml_airbnb'

update_headers = {}
update_headers['Content-type'] = 'application/xml'

req = urllib.request.Request(update_url, \
                             headers=update_headers, \
                             data=pmmlBytes)

resp = urllib.request.urlopen(req)

print(resp.status) # Should return Http Status 200
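
The three update cells above differ only in the cloud name, so the request could be built by one small helper and sent in a loop. This is a sketch, not part of the original notebook: `build_update_request` and `CLOUDS` are hypothetical names, the `b"<PMML/>"` payload is a placeholder for `pmmlBytes`, and the actual network call is left commented out.

```python
import urllib.request

CLOUDS = ["aws", "gcp", "azure"]

def build_update_request(cloud, model_name, pmml_bytes):
    """Build (but do not send) the POST request that pushes PMML to one cloud."""
    url = "http://prediction-pmml-{}.demo.pipeline.io/update-pmml/{}".format(cloud, model_name)
    headers = {"Content-type": "application/xml"}
    # Passing data makes urllib issue a POST
    return urllib.request.Request(url, headers=headers, data=pmml_bytes)

# Placeholder payload; in the notebook this would be pmmlBytes
requests_by_cloud = {cloud: build_update_request(cloud, "pmml_airbnb", b"<PMML/>")
                     for cloud in CLOUDS}

for cloud, req in requests_by_cloud.items():
    print(cloud, req.get_method(), req.full_url)
    # resp = urllib.request.urlopen(req)  # uncomment to send; expect HTTP status 200
```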

In [ ]:
import urllib.request
import json

evaluate_url = 'http://prediction-pmml-aws.demo.pipeline.io/evaluate-pmml/pmml_airbnb'

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'

input_params = '{"bathrooms":2.0, \
                 "bedrooms":2.0, \
                 "security_deposit":175.00, \
                 "cleaning_fee":25.0, \
                 "extra_people":1.0, \
                 "number_of_reviews": 2.0, \
                 "square_feet": 250.0, \
                 "review_scores_rating": 2.0, \
                 "room_type": "Entire home/apt", \
                 "host_is_super_host": "0.0", \
                 "cancellation_policy": "flexible", \
                 "instant_bookable": "1.0", \
                 "state": "CA"}' 
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, \
                             headers=evaluate_headers, \
                             data=encoded_input_params)

resp = urllib.request.urlopen(req)

print(resp.read())

In [ ]:
import urllib.request
import json

evaluate_url = 'http://prediction-pmml-gcp.demo.pipeline.io/evaluate-pmml/pmml_airbnb'

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'

input_params = '{"bathrooms":2.0, \
                 "bedrooms":2.0, \
                 "security_deposit":175.00, \
                 "cleaning_fee":25.0, \
                 "extra_people":1.0, \
                 "number_of_reviews": 2.0, \
                 "square_feet": 250.0, \
                 "review_scores_rating": 2.0, \
                 "room_type": "Entire home/apt", \
                 "host_is_super_host": "0.0", \
                 "cancellation_policy": "flexible", \
                 "instant_bookable": "1.0", \
                 "state": "CA"}' 
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, \
                             headers=evaluate_headers, \
                             data=encoded_input_params)

resp = urllib.request.urlopen(req)

print(resp.read())

In [ ]:
import urllib.request
import json

evaluate_url = 'http://prediction-pmml-azure.demo.pipeline.io/evaluate-pmml/pmml_airbnb'

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'

input_params = '{"bathrooms":2.0, \
                 "bedrooms":2.0, \
                 "security_deposit":175.00, \
                 "cleaning_fee":25.0, \
                 "extra_people":1.0, \
                 "number_of_reviews": 2.0, \
                 "square_feet": 250.0, \
                 "review_scores_rating": 2.0, \
                 "room_type": "Entire home/apt", \
                 "host_is_super_host": "0.0", \
                 "cancellation_policy": "flexible", \
                 "instant_bookable": "1.0", \
                 "state": "CA"}' 
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, \
                             headers=evaluate_headers, \
                             data=encoded_input_params)

resp = urllib.request.urlopen(req)

print(resp.read())

Deploy Java-based Model (Simple Model, Mutable Deploy)


In [ ]:
from urllib import request

sourceBytes = '                                                      \n\
  private String str;                                                \n\
                                                                     \n\
  public void initialize(Map<String, Object> args) {                 \n\
  }                                                                  \n\
                                                                     \n\
  public Object predict(Map<String, Object> inputs) {                \n\
      String id = (String)inputs.get("id");                          \n\
                                                                     \n\
      return id.equals("21619");                                     \n\
  }                                                                  \n\
'.encode('utf-8')

In [ ]:
from urllib import request

name = 'codegen_equals'
update_url = 'http://prediction-codegen-aws.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

generated_code = resp.read()
print(generated_code.decode('utf-8'))

In [ ]:
from urllib import request

name = 'codegen_equals'
update_url = 'http://prediction-codegen-gcp.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

generated_code = resp.read()
print(generated_code.decode('utf-8'))

In [ ]:
from urllib import request

name = 'codegen_equals'
update_url = 'http://prediction-codegen-azure.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

generated_code = resp.read()
print(generated_code.decode('utf-8'))

In [ ]:
from urllib import request

name = 'codegen_equals'
evaluate_url = 'http://prediction-codegen-aws.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21618"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return false (the id sent is not "21619")

In [ ]:
from urllib import request

name = 'codegen_equals'
evaluate_url = 'http://prediction-codegen-gcp.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21619"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return true (the id sent is "21619")

In [ ]:
from urllib import request

name = 'codegen_equals'
evaluate_url = 'http://prediction-codegen-azure.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21619"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return true (the id sent is "21619")

Deploy Java Model (HttpClient Model, Mutable Deploy)


In [ ]:
from urllib import request

sourceBytes = '                                                         \n\
  public Map<String, Object> data = new HashMap<String, Object>();      \n\
                                                                        \n\
  public void initialize(Map<String, Object> args) {                    \n\
    data.put("url", "http://demo.pipeline.io:9040/prediction/");        \n\
  }                                                                     \n\
                                                                        \n\
  public Object predict(Map<String, Object> inputs) {                   \n\
    try {                                                               \n\
      String userId = (String)inputs.get("userId");                     \n\
      String itemId = (String)inputs.get("itemId");                     \n\
      String url = data.get("url") + "/" + userId + "/" + itemId;       \n\
                                                                        \n\
      return org.apache.http.client.fluent.Request                      \n\
        .Get(url)                                                       \n\
        .execute()                                                      \n\
        .returnContent();                                               \n\
                                                                        \n\
    } catch(Exception exc) {                                            \n\
      System.out.println(exc);                                          \n\
      throw exc;                                                        \n\
    }                                                                   \n\
  }                                                                     \n\
'.encode('utf-8')

In [ ]:
from urllib import request

name = 'codegen_httpclient'
# Note:  Must have trailing '/'
update_url = 'http://prediction-codegen-aws.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

print(resp.status) # Should return Http Status 200 
generated_code = resp.read()
print(generated_code.decode('utf-8'))

In [ ]:
from urllib import request

name = 'codegen_httpclient'
# Note:  Must have trailing '/'
update_url = 'http://prediction-codegen-gcp.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

print(resp.status) # Should return Http Status 200 
generated_code = resp.read()
print(generated_code.decode('utf-8'))

In [ ]:
from urllib import request

name = 'codegen_httpclient'
# Note:  Must have trailing '/'
update_url = 'http://prediction-codegen-azure.demo.pipeline.io/update-codegen/%s/' % name

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

print(resp.status) # Should return Http Status 200 
generated_code = resp.read()
print(generated_code.decode('utf-8'))

In [ ]:
from urllib import request

name = 'codegen_httpclient'
evaluate_url = 'http://prediction-codegen-aws.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"userId":"21619", "itemId":"10006"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return float

In [ ]:
from urllib import request

name = 'codegen_httpclient'
evaluate_url = 'http://prediction-codegen-gcp.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"userId":"21619", "itemId":"10006"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return float

In [ ]:
from urllib import request

name = 'codegen_httpclient'
evaluate_url = 'http://prediction-codegen-azure.demo.pipeline.io/evaluate-codegen/%s' % name

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"userId":"21619", "itemId":"10006"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return float

Load Test and Compare Cloud Providers (AWS, Google, Azure)

Monitor Performance Across Cloud Providers

NetflixOSS Services Dashboard (Hystrix)


In [ ]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

html = '<iframe width=100% height=500px src="http://hystrix.demo.pipeline.io/hystrix-dashboard/monitor/monitor.html?streams=%5B%7B%22name%22%3A%22Model%20Servers%20AWS%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-aws.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22Model%20Servers%20GCP%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-gcp.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22Model%20Servers%20Azure%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-azure.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%5D">'
display(HTML(html))

Start Load Tests

Run JMeter Tests from Local Laptop (Limited by Laptop)

Run Headless JMeter Tests from Training Clusters in Cloud


In [ ]:
# Spark ML - PMML - Airbnb
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-airbnb-rc.yaml
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-gcp-airbnb-rc.yaml

In [ ]:
# Codegen - Java - Simple
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-equals-rc.yaml
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-gcp-equals-rc.yaml

In [1]:
# Tensorflow AI - Tensorflow Serving - Simple 
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-minimal-rc.yaml
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-gcp-minimal-rc.yaml


replicationcontroller "loadtest-aws-minimal" created
replicationcontroller "loadtest-gcp-minimal" created

End Load Tests


In [ ]:
!kubectl delete --context=awsdemo rc loadtest-aws-airbnb
!kubectl delete --context=awsdemo rc loadtest-gcp-airbnb
!kubectl delete --context=awsdemo rc loadtest-aws-equals
!kubectl delete --context=awsdemo rc loadtest-gcp-equals
!kubectl delete --context=awsdemo rc loadtest-aws-minimal
!kubectl delete --context=awsdemo rc loadtest-gcp-minimal

Rolling Deploy Tensorflow AI (Simple Model, Immutable Deploy)

Kubernetes CLI


In [ ]:
!kubectl rolling-update prediction-tensorflow --context=awsdemo --image-pull-policy=Always --image=fluxcapacitor/prediction-tensorflow

In [ ]:
!kubectl get pod --context=awsdemo

In [ ]:
!kubectl rolling-update prediction-tensorflow --context=gcpdemo --image-pull-policy=Always --image=fluxcapacitor/prediction-tensorflow

In [ ]:
!kubectl get pod --context=gcpdemo

7. Q&A