Who Am I?

Chris Fregly

Research Scientist, Founder @ PipelineIO

Video Series Author, "High Performance TensorFlow in Production" @ O'Reilly (Coming Soon)

Founder @ Advanced Spark and TensorFlow Meetup

Github Repo

DockerHub Repo

Slideshare

YouTube

Who Was I?

Software Engineer @ Netflix, Databricks, IBM Spark Tech Center

Types of Model Deployments

Key-Value

e.g., Recommendations

In-memory: Redis, Memcache (see the sketch after this list)

On-disk: Cassandra, RocksDB

First-class Servable in TensorFlow Serving
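
For the in-memory key-value option above, here is a minimal sketch, assuming a local Redis instance and precomputed recommendations; the key layout and values are illustrative, not PipelineIO's:

In [ ]:
import json
import redis  # assumes the redis-py client is installed

r = redis.StrictRedis(host='localhost', port=6379)

# A batch job precomputes recommendations and writes them keyed by user id
r.set('recommendations:user:21619', json.dumps([1001, 1002, 1003]))

# Online serving is then a single O(1) lookup -- no model evaluation per request
recs = json.loads(r.get('recommendations:user:21619').decode('utf-8'))
print(recs)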

PMML

It's Useful and Well-Supported

Apple, Cisco, Airbnb, HomeAway, etc.

Please Don't Re-build It - Reduce Your Technical Debt!

Native Code Generation (CPU and GPU)

Hand-coded (Python + Pickling; see the sketch below)

Generate Java Code from PMML?
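
The hand-coded Python + pickling approach above can be as small as this sketch; the toy model and file name are illustrative:

In [ ]:
import pickle
from sklearn.linear_model import LinearRegression

# Train a toy model as a stand-in for a real training job
model = LinearRegression().fit([[1.0], [2.0], [3.0]], [2.0, 4.0, 6.0])

# "Deployment" is just pickling the fitted model to disk...
with open('model.pkl', 'wb') as f:
    pickle.dump(model, f)

# ...and unpickling it inside the serving process
with open('model.pkl', 'rb') as f:
    served_model = pickle.load(f)

print(served_model.predict([[4.0]]))  # ~8.0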

TensorFlow Models

freeze_graph.py: Combines the Static TensorFlow Graph with Trained Weights (Checkpoints) into a Single Deployable Model
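
A typical freeze_graph.py invocation looks like the following; the paths and output node name are placeholders:

In [ ]:
# Combine the static GraphDef with checkpointed weights into one frozen .pb file
!python -m tensorflow.python.tools.freeze_graph \
  --input_graph=/tmp/model/graph.pbtxt \
  --input_checkpoint=/tmp/model/model.ckpt \
  --output_node_names=softmax \
  --output_graph=/tmp/model/frozen_graph.pb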

Demos!!


In [ ]:
# You may need to Reconnect (more than Restart) the Kernel to pick up changes to these settings
import os

master = '--master spark://spark-master-2-1-0:7077'
conf = '--conf spark.cores.max=1 --conf spark.executor.memory=512m'
packages = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1'
jars = '--jars /root/lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar'
py_files = '--py-files /root/lib/jpmml.py'

os.environ['PYSPARK_SUBMIT_ARGS'] = master \
  + ' ' + conf \
  + ' ' + packages \
  + ' ' + jars \
  + ' ' + py_files \
  + ' ' + 'pyspark-shell'

print(os.environ['PYSPARK_SUBMIT_ARGS'])

Deploy Spark ML Models


In [ ]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression

In [ ]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Step 0: Load Libraries and Data


In [ ]:
df = spark.read.format("csv") \
  .option("inferSchema", "true").option("header", "true") \
  .load("s3a://datapalooza/airbnb/airbnb.csv.bz2")

df.registerTempTable("df")

print(df.head())

In [ ]:
print(df.count())

Step 1: Clean, Filter, and Summarize the Data


In [ ]:
df_filtered = df.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0 AND bedrooms is not null")

df_filtered.registerTempTable("df_filtered")

df_final = spark.sql("""
    select
        id,
        city,
        case when state in ('NY', 'CA', 'London', 'Berlin', 'TX', 'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        space,
        cast(price as double) as price,
        cast(bathrooms as double) as bathrooms,
        cast(bedrooms as double) as bedrooms,
        room_type,
        host_is_super_host,
        cancellation_policy,
        cast(case when security_deposit is null
            then 0.0
            else security_deposit
        end as double) as security_deposit,
        price_per_bedroom,
        cast(case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as double) as number_of_reviews,
        cast(case when extra_people is null
            then 0.0
            else extra_people
        end as double) as extra_people,
        instant_bookable,
        cast(case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as double) as cleaning_fee,
        cast(case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as double) as review_scores_rating,
        cast(case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as double) as square_feet
    from df_filtered
""").persist()

df_final.registerTempTable("df_final")

df_final.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()

In [ ]:
print(df_final.count())

In [ ]:
print(df_final.schema)

In [ ]:
# Most popular states

spark.sql("""
    select 
        state,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by state
    order by count(*) desc
""").show()

In [ ]:
# Most expensive popular cities

spark.sql("""
    select 
        city,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by city
    order by avg(price) desc
""").filter("ct > 25").show()

Step 2: Define Continuous and Categorical Features


In [ ]:
continuous_features = ["bathrooms", \
                       "bedrooms", \
                       "security_deposit", \
                       "cleaning_fee", \
                       "extra_people", \
                       "number_of_reviews", \
                       "square_feet", \
                       "review_scores_rating"]

categorical_features = ["room_type", \
                        "host_is_super_host", \
                        "cancellation_policy", \
                        "instant_bookable", \
                        "state"]

Step 3: Split Data into Training and Validation


In [ ]:
[training_dataset, validation_dataset] = df_final.randomSplit([0.8, 0.2])

Step 4: Continuous Feature Pipeline


In [ ]:
continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")

continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features", \
                                           withStd=True, withMean=False)

Step 5: Categorical Feature Pipeline


In [ ]:
categorical_feature_indexers = [StringIndexer(inputCol=x, \
                                              outputCol="{}_index".format(x)) \
                                for x in categorical_features]

categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), \
                                                      outputCol="oh_encoder_{}".format(x.getOutputCol() )) \
                                        for x in categorical_feature_indexers]

Step 6: Assemble our Features and Feature Pipeline


In [ ]:
feature_cols_lr = [x.getOutputCol() \
                   for x in categorical_feature_one_hot_encoders]
feature_cols_lr.append("scaled_continuous_features")

feature_assembler_lr = VectorAssembler(inputCols=feature_cols_lr, \
                                       outputCol="features_lr")

Step 7: Train a Linear Regression Model


In [ ]:
linear_regression = LinearRegression(featuresCol="features_lr", \
                                     labelCol="price", \
                                     predictionCol="price_prediction", \
                                     maxIter=10, \
                                     regParam=0.3, \
                                     elasticNetParam=0.8)

estimators_lr = \
  [continuous_feature_assembler, continuous_feature_scaler] \
  + categorical_feature_indexers + categorical_feature_one_hot_encoders \
  + [feature_assembler_lr] + [linear_regression]

pipeline = Pipeline(stages=estimators_lr)

pipeline_model = pipeline.fit(training_dataset)

print(pipeline_model)
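
The validation split from Step 3 goes unused above; as a quick sanity check (the RMSE metric choice here is ours, not part of the original demo), score the held-out 20%:

In [ ]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out 20% (note: unseen categorical labels in the validation
# split will raise unless the StringIndexers are configured to handle them)
predictions = pipeline_model.transform(validation_dataset)

evaluator = RegressionEvaluator(labelCol="price", \
                                predictionCol="price_prediction", \
                                metricName="rmse")

print(evaluator.evaluate(predictions))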

Step 8: Serialize PipelineModel


In [ ]:
from jpmml import toPMMLBytes

model_bytes = toPMMLBytes(spark, training_dataset, pipeline_model)

print(model_bytes.decode("utf-8"))

Step 9: Push Model to Live, Running Spark ML Model Server (Mutable)


In [ ]:
import urllib.request

namespace = 'default'
model_name = 'airbnb'
version = '1'
update_url = 'http://prediction-pmml-aws.demo.pipeline.io/update-pmml-model/%s/%s/%s' % (namespace, model_name, version)

update_headers = {}
update_headers['Content-type'] = 'application/xml'

req = urllib.request.Request(update_url, \
                             headers=update_headers, \
                             data=model_bytes)

resp = urllib.request.urlopen(req)

print(resp.status) # Should return HTTP status 200

Step 10: Evaluate Model


In [ ]:
import urllib.request
import urllib.parse
import json

namespace = 'default'
model_name = 'airbnb'
version = '1'
evaluate_url = 'http://prediction-pmml-aws.demo.pipeline.io/evaluate-pmml-model/%s/%s/%s' % (namespace, model_name, version)

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'

input_params = '{"bathrooms":5.0, \
                 "bedrooms":4.0, \
                 "security_deposit":175.00, \
                 "cleaning_fee":25.0, \
                 "extra_people":1.0, \
                 "number_of_reviews": 2.0, \
                 "square_feet": 250.0, \
                 "review_scores_rating": 2.0, \
                 "room_type": "Entire home/apt", \
                 "host_is_super_host": "0.0", \
                 "cancellation_policy": "flexible", \
                 "instant_bookable": "1.0", \
                 "state": "CA"}' 
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, \
                             headers=evaluate_headers, \
                             data=encoded_input_params)

resp = urllib.request.urlopen(req)

print(resp.read())

Bonus Demos!

Deploy Java-based Model

Create Java-based Model


In [1]:
from urllib import request

sourceBytes = '                                                      \n\
  private String str;                                                \n\
                                                                     \n\
  public void initialize(Map<String, Object> args) {                 \n\
  }                                                                  \n\
                                                                     \n\
  public Object predict(Map<String, Object> inputs) {                \n\
      String id = (String)inputs.get("id");                          \n\
                                                                     \n\
      return id.equals("21619");                                     \n\
  }                                                                  \n\
'.encode('utf-8')

Deploy Java-based Model


In [4]:
from urllib import request

namespace = 'default'
model_name = 'java_equals'
version = '1'

update_url = 'http://prediction-java-aws.demo.pipeline.io/update-java/%s/%s/%s' % (namespace, model_name, version)

update_headers = {}
update_headers['Content-type'] = 'text/plain'

req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)

generated_code = resp.read()
print(generated_code.decode('utf-8'))


/* 001 */
/* 002 */ private String str;
/* 003 */
/* 004 */ public void initialize(Map<String, Object> args) {
/* 005 */ }
/* 006 */
/* 007 */ public Object predict(Map<String, Object> inputs) {
/* 008 */   String id = (String)inputs.get("id");
/* 009 */
/* 010 */   return id.equals("21619");
/* 011 */ }

Evaluate Java-based Model


In [5]:
from urllib import request

namespace = 'default'
model_name = 'java_equals'
version = '1'

evaluate_url = 'http://prediction-java-aws.demo.pipeline.io/evaluate-java/%s/%s/%s' % (namespace, model_name, version)

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21618"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return false


b'false'

In [6]:
from urllib import request

namespace = 'default'
model_name = 'java_equals'
version = '1'
evaluate_url = 'http://prediction-java-aws.demo.pipeline.io/evaluate-java/%s/%s/%s' % (namespace, model_name, version)

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21619"}' 
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)

print(resp.read()) # Should return true


b'true'

Deploy Scikit-Learn Model


In [ ]:
!pip install sklearn_pandas
!pip install git+https://github.com/jpmml/sklearn2pmml.git

Create Scikit-Learn Model


In [9]:
import pandas as pd
import numpy as np
import urllib.request
import urllib.parse
import json

from sklearn.datasets import load_diabetes, load_iris
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error as mse, r2_score
from sklearn2pmml import PMMLPipeline
from sklearn.tree import DecisionTreeClassifier
from sklearn2pmml import sklearn2pmml

iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_df['Species'] = iris.target
iris_pipeline = PMMLPipeline([
    ("classifier", DecisionTreeClassifier())
])
iris_pipeline.fit(iris_df[iris_df.columns.difference(["Species"])], iris_df["Species"])


Out[9]:
PMMLPipeline(steps=[('classifier', DecisionTreeClassifier(class_weight=None, criterion='gini', max_depth=None,
            max_features=None, max_leaf_nodes=None,
            min_impurity_split=1e-07, min_samples_leaf=1,
            min_samples_split=2, min_weight_fraction_leaf=0.0,
            presort=False, random_state=None, splitter='best'))])

Serialize Scikit-Learn Model


In [10]:
sklearn2pmml(iris_pipeline, "DecisionTreeIris.pmml", with_repr=True)

with open('DecisionTreeIris.pmml', 'rb') as f:
    model_bytes = f.read()

Deploy Scikit-Learn Model


In [11]:
import urllib.request
import urllib.parse

namespace = 'default'
model_name = 'iris'
version = '1'

update_url = 'http://prediction-pmml-aws.demo.pipeline.io/update-pmml-model/%s/%s/%s' % (namespace, model_name, version)

update_headers = {}
update_headers["Content-type"] = "application/xml"

req = urllib.request.Request(update_url, headers=update_headers, data=model_bytes)

resp = urllib.request.urlopen(req)
print(resp.status)


200

In [12]:
namespace = 'default'
model_name = 'iris'
version = '1'

evaluate_url = 'http://prediction-pmml-aws.demo.pipeline.io/evaluate-pmml-model/%s/%s/%s' % (namespace, model_name, version)
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'

input_params = iris_df.iloc[0, :-1].to_json()
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = urllib.request.urlopen(req)

print(resp.read())


b'{"results":[[{\'Species\': \'NodeScoreDistribution{result=0, probability_entries=[0=1.0, 1=0.0, 2=0.0], entityId=2, confidence_entries=[]}\'}]]}'

Monitoring Your Models

Netflix Microservices Dashboard (Hystrix)


In [ ]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

from IPython.display import clear_output, Image, display, HTML

html = '<iframe width=1200px height=500px src="http://hystrix.demo.pipeline.io/hystrix-dashboard/monitor/monitor.html?streams=%5B%7B%22name%22%3A%22Predictions%20-%20AWS%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-aws.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22Predictions%20-%20GCP%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-gcp.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%5D">'
display(HTML(html))

Grafana + Prometheus Dashboard


In [ ]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

from IPython.display import clear_output, Image, display, HTML

html = '<iframe width=1200px height=500px src="http://grafana.demo.pipeline.io">'
display(HTML(html))

Load-Test Your Model Servers

Run JMeter Tests from Local Laptop (Limited by Laptop Performance)

Run Headless JMeter Tests from Training Clusters in Cloud


In [ ]:
# Spark ML - Airbnb
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-airbnb-rc.yaml

In [ ]:
# Codegen - Java - Simple
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-equals-rc.yaml

In [ ]:
# TensorFlow AI - TensorFlow Serving - Simple
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-minimal-rc.yaml

End Load Tests


In [ ]:
!kubectl delete --context=awsdemo rc loadtest-aws-airbnb
!kubectl delete --context=awsdemo rc loadtest-aws-equals
!kubectl delete --context=awsdemo rc loadtest-aws-minimal

Rolling Deploy


In [ ]:
!kubectl rolling-update prediction-tensorflow --context=awsdemo --image-pull-policy=Always --image=fluxcapacitor/prediction-tensorflow
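
If the new image misbehaves, the same mechanism can abort and reverse an in-progress update (assuming the rolling update has not yet completed):

In [ ]:
!kubectl rolling-update prediction-tensorflow --context=awsdemo --rollback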

PipelineIO Premium Edition

A/B and Multi-armed Bandit Testing (see the sketch after this list)

Continuous, Hybrid-Cloud Deployments

Online Model Training and Deploying

GPU-based Deployments
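
As a flavor of the bandit testing mentioned in the list above, here is a minimal epsilon-greedy router between two model versions; this is an illustration only, not PipelineIO's implementation:

In [ ]:
import random

epsilon = 0.1  # fraction of traffic used for exploration
rewards = {'model_v1': [], 'model_v2': []}

def choose_model():
    # Explore at random until every arm has feedback, then mostly exploit
    if random.random() < epsilon or not all(rewards.values()):
        return random.choice(list(rewards))
    return max(rewards, key=lambda m: sum(rewards[m]) / len(rewards[m]))

def record_reward(model, reward):
    # e.g. 1.0 if the served prediction led to a click or booking, else 0.0
    rewards[model].append(reward)

model = choose_model()
record_reward(model, 1.0)
print(model, rewards)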

