i.e., Recommendations
In-memory: Redis, Memcached
On-disk: Cassandra, RocksDB
First-class Servable in Tensorflow Serving
It's Useful and Well-Supported
Apple, Cisco, Airbnb, HomeAway, etc
Please Don't Re-build It - Reduce Your Technical Debt!
Hand-coded (Python + Pickling)
Generate Java Code from PMML?
freeze_graph.py: Combine Tensorflow Graph (Static) with Trained Weights (Checkpoints) into Single Deployable Model
In [ ]:
# NOTE: after changing any of these settings you may need to Reconnect
# (more than Restart) the kernel to pick up the changes.
import os

# Individual option groups that make up the submit arguments.
master = '--master spark://spark-master-2-1-0:7077'
conf = '--conf spark.cores.max=1 --conf spark.executor.memory=512m'
packages = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1'
jars = '--jars /root/lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar'
py_files = '--py-files /root/lib/jpmml.py'

# Space-separated option groups, ending with the 'pyspark-shell' token.
submit_args = [master, conf, packages, jars, py_files, 'pyspark-shell']
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(submit_args)

# Echo the final value so the effective configuration is visible in the notebook.
print(os.environ['PYSPARK_SUBMIT_ARGS'])
In [ ]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression
In [ ]:
from pyspark.sql import SparkSession
# Session handle used by every Spark cell below; getOrCreate() returns the
# already-running session when there is one instead of building a second.
spark = SparkSession.builder.getOrCreate()
In [ ]:
# Load the bz2-compressed Airbnb listings CSV from S3. The first row is
# treated as the header and column types are inferred from the data.
df = spark.read.format("csv") \
    .option("inferSchema", "true").option("header", "true") \
    .load("s3a://datapalooza/airbnb/airbnb.csv.bz2")

# Expose the DataFrame to Spark SQL under the name "df".
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
df.createOrReplaceTempView("df")

# Show the first row as a quick sanity check of the load.
print(df.head())
In [ ]:
# Number of rows in the raw, unfiltered listings DataFrame.
print(df.count())
In [ ]:
# Keep listings priced between 50 and 750 (inclusive) that have a positive
# bathroom count and a non-null bedroom count.
df_filtered = df.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0 AND bedrooms is not null")
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
df_filtered.createOrReplaceTempView("df_filtered")

# Build the modelling table:
#   * states outside the listed set are bucketed into 'Other';
#   * numeric columns are cast to double;
#   * null security_deposit / number_of_reviews / extra_people / cleaning_fee become 0.0;
#   * null review_scores_rating becomes 80.0;
#   * square_feet values that are null or <= 100 are replaced by 350.0 when
#     bedrooms is null or 0, otherwise by 380 * bedrooms.
# The result is persisted because several later cells query it.
df_final = spark.sql("""
select
id,
city,
case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
then state
else 'Other'
end as state,
space,
cast(price as double) as price,
cast(bathrooms as double) as bathrooms,
cast(bedrooms as double) as bedrooms,
room_type,
host_is_super_host,
cancellation_policy,
cast(case when security_deposit is null
then 0.0
else security_deposit
end as double) as security_deposit,
price_per_bedroom,
cast(case when number_of_reviews is null
then 0.0
else number_of_reviews
end as double) as number_of_reviews,
cast(case when extra_people is null
then 0.0
else extra_people
end as double) as extra_people,
instant_bookable,
cast(case when cleaning_fee is null
then 0.0
else cleaning_fee
end as double) as cleaning_fee,
cast(case when review_scores_rating is null
then 80.0
else review_scores_rating
end as double) as review_scores_rating,
cast(case when square_feet is not null and square_feet > 100
then square_feet
when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
then 350.0
else 380 * bedrooms
end as double) as square_feet
from df_filtered
""").persist()
df_final.createOrReplaceTempView("df_final")

# Summary statistics for a handful of the numeric columns.
df_final.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()
In [ ]:
# Number of rows that survived filtering and cleaning.
print(df_final.count())
In [ ]:
# Column names and types of the cleaned modelling table.
print(df_final.schema)
In [ ]:
# Most popular states: listing count, average price and maximum price per
# state (the query groups by state, not city), largest listing count first.
spark.sql("""
select
state,
count(*) as ct,
avg(price) as avg_price,
max(price) as max_price
from df_final
group by state
order by count(*) desc
""").show()
In [ ]:
# Most expensive popular cities: per-city listing count, average price and
# maximum price, ordered by average price (highest first) ...
city_price_stats = spark.sql("""
select
city,
count(*) as ct,
avg(price) as avg_price,
max(price) as max_price
from df_final
group by city
order by avg(price) desc
""")

# ... keeping only cities with more than 25 listings.
city_price_stats.filter("ct > 25").show()
In [ ]:
# Numeric columns of df_final that are used as continuous model inputs.
continuous_features = [
    "bathrooms",
    "bedrooms",
    "security_deposit",
    "cleaning_fee",
    "extra_people",
    "number_of_reviews",
    "square_feet",
    "review_scores_rating",
]

# Columns of df_final that are used as categorical model inputs.
categorical_features = [
    "room_type",
    "host_is_super_host",
    "cancellation_policy",
    "instant_bookable",
    "state",
]
In [ ]:
# Split the cleaned data with weights 0.8 (training) and 0.2 (validation).
# NOTE(review): no seed is passed, so the split is presumably not
# reproducible across runs - confirm whether that is intended.
training_dataset, validation_dataset = df_final.randomSplit([0.8, 0.2])
In [ ]:
# Stage 1: pack all continuous columns into a single vector column.
continuous_feature_assembler = VectorAssembler(
    inputCols=continuous_features,
    outputCol="unscaled_continuous_features",
)

# Stage 2: scale that vector by standard deviation only (withMean=False, so
# values are not mean-centred).
continuous_feature_scaler = StandardScaler(
    inputCol="unscaled_continuous_features",
    outputCol="scaled_continuous_features",
    withStd=True,
    withMean=False,
)
In [ ]:
# One StringIndexer per categorical column, writing to "<column>_index".
categorical_feature_indexers = [
    StringIndexer(inputCol=feature, outputCol="{}_index".format(feature))
    for feature in categorical_features
]

# One OneHotEncoder per indexer, reading the indexer's output column and
# writing to "oh_encoder_<column>_index".
categorical_feature_one_hot_encoders = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol="oh_encoder_{}".format(indexer.getOutputCol()),
    )
    for indexer in categorical_feature_indexers
]
In [ ]:
# Final model input: every one-hot encoded categorical column followed by
# the scaled continuous-feature vector.
feature_cols_lr = [
    encoder.getOutputCol() for encoder in categorical_feature_one_hot_encoders
] + ["scaled_continuous_features"]

# Concatenate those columns into the single vector column the regression reads.
feature_assembler_lr = VectorAssembler(
    inputCols=feature_cols_lr,
    outputCol="features_lr",
)
In [ ]:
# Elastic-net regularised linear regression that predicts `price` from the
# assembled feature vector.
linear_regression = LinearRegression(
    featuresCol="features_lr",
    labelCol="price",
    predictionCol="price_prediction",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# Pipeline stages in execution order: continuous features, categorical
# indexers, one-hot encoders, final assembler, then the estimator.
estimators_lr = [
    continuous_feature_assembler,
    continuous_feature_scaler,
    *categorical_feature_indexers,
    *categorical_feature_one_hot_encoders,
    feature_assembler_lr,
    linear_regression,
]

pipeline = Pipeline(stages=estimators_lr)

# Fit every stage on the training split.
pipeline_model = pipeline.fit(training_dataset)
print(pipeline_model)
In [ ]:
from jpmml import toPMMLBytes
# Convert the fitted Spark ML pipeline to PMML.
# NOTE(review): toPMMLBytes presumably returns the PMML XML document as bytes
# (it is decoded as UTF-8 below) - confirm against /root/lib/jpmml.py.
model_bytes = toPMMLBytes(spark, training_dataset, pipeline_model)

# Print the generated document. The original referenced an undefined name
# (`pmmlBytes`), which raised NameError; the bytes live in `model_bytes`.
print(model_bytes.decode("utf-8"))
In [ ]:
import urllib.request
# Coordinates under which the model is published on the prediction service.
namespace = 'default'
model_name = 'airbnb'
version = '1'

update_url = 'http://prediction-pmml-aws.demo.pipeline.io/update-pmml-model/%s/%s/%s' % (namespace, model_name, version)

# The request body is the PMML document, sent as XML.
update_headers = {'Content-type': 'application/xml'}

req = urllib.request.Request(update_url, data=model_bytes, headers=update_headers)
resp = urllib.request.urlopen(req)

print(resp.status)  # Should return Http Status 200
In [ ]:
import urllib.parse
import json
# Coordinates of the deployed model to evaluate.
namespace = 'default'
model_name = 'airbnb'
version = '1'
evaluate_url = 'http://prediction-pmml-aws.demo.pipeline.io/evaluate-pmml-model/%s/%s/%s' % (namespace, model_name, version)
# The request body is JSON.
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
# One sample listing, written as a JSON object. The literal is continued
# across lines with trailing backslashes, so it is a single string with no
# embedded newlines.
input_params = '{"bathrooms":5.0, \
"bedrooms":4.0, \
"security_deposit":175.00, \
"cleaning_fee":25.0, \
"extra_people":1.0, \
"number_of_reviews": 2.0, \
"square_feet": 250.0, \
"review_scores_rating": 2.0, \
"room_type": "Entire home/apt", \
"host_is_super_host": "0.0", \
"cancellation_policy": "flexible", \
"instant_bookable": "1.0", \
"state": "CA"}'
# urllib requires the request body as bytes.
encoded_input_params = input_params.encode('utf-8')
req = urllib.request.Request(evaluate_url, \
headers=evaluate_headers, \
data=encoded_input_params)
resp = urllib.request.urlopen(req)
# Raw response body returned by the prediction service.
print(resp.read())
In [1]:
from urllib import request
# Java source fragment, UTF-8 encoded, that is uploaded to the Java prediction
# service by a later cell. Each physical line ends with an explicit "\n"
# escape followed by a backslash line continuation, so the newlines in the
# resulting string come only from the escapes. The fragment declares
# initialize(...) with an empty body and predict(...), which returns whether
# the "id" input equals "21619".
sourceBytes = ' \n\
private String str; \n\
\n\
public void initialize(Map<String, Object> args) { \n\
} \n\
\n\
public Object predict(Map<String, Object> inputs) { \n\
String id = (String)inputs.get("id"); \n\
\n\
return id.equals("21619"); \n\
} \n\
'.encode('utf-8')
In [4]:
from urllib import request
# Coordinates under which the Java snippet is published.
namespace = 'default'
model_name = 'java_equals'
version = '1'

update_url = 'http://prediction-java-aws.demo.pipeline.io/update-java/%s/%s/%s' % (namespace, model_name, version)

# The Java source is uploaded as plain text.
update_headers = {'Content-type': 'text/plain'}

req = request.Request("%s" % update_url, data=sourceBytes, headers=update_headers)
resp = request.urlopen(req)

# The service's response body is kept and printed as text.
generated_code = resp.read()
print(generated_code.decode('utf-8'))
In [5]:
from urllib import request
# Coordinates of the deployed Java snippet to evaluate.
namespace = 'default'
model_name = 'java_equals'
version = '1'

evaluate_url = 'http://prediction-java-aws.demo.pipeline.io/evaluate-java/%s/%s/%s' % (namespace, model_name, version)
evaluate_headers = {'Content-type': 'application/json'}

# An id that differs from the one the uploaded snippet compares against.
input_params = '{"id":"21618"}'
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, data=encoded_input_params, headers=evaluate_headers)
resp = request.urlopen(req)

print(resp.read())  # Should return false
In [6]:
from urllib import request
# Coordinates of the deployed Java snippet to evaluate.
namespace = 'default'
model_name = 'java_equals'
version = '1'

evaluate_url = 'http://prediction-java-aws.demo.pipeline.io/evaluate-java/%s/%s/%s' % (namespace, model_name, version)
evaluate_headers = {'Content-type': 'application/json'}

# The id the uploaded snippet compares against.
input_params = '{"id":"21619"}'
encoded_input_params = input_params.encode('utf-8')

req = request.Request(evaluate_url, data=encoded_input_params, headers=evaluate_headers)
resp = request.urlopen(req)

print(resp.read())  # Should return true
In [ ]:
# IPython shell escapes: install sklearn_pandas from the package index, and
# sklearn2pmml straight from its GitHub repository.
!pip install sklearn_pandas
!pip install git+https://github.com/jpmml/sklearn2pmml.git
In [9]:
import pandas as pd
import numpy as np
import urllib.request
import urllib.parse
import json
from sklearn.datasets import load_diabetes,load_iris
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error as mse, r2_score
from sklearn2pmml import PMMLPipeline
from sklearn.tree import DecisionTreeClassifier
from sklearn2pmml import sklearn2pmml
# Load the iris dataset into a DataFrame: one column per feature plus the
# integer class label in "Species".
iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_df['Species'] = iris.target

# A single-step PMML-exportable pipeline wrapping a decision tree classifier.
iris_pipeline = PMMLPipeline([("classifier", DecisionTreeClassifier())])

# Train on every column except the label.
feature_columns = iris_df.columns.difference(["Species"])
iris_pipeline.fit(iris_df[feature_columns], iris_df["Species"])
Out[9]:
In [10]:
# Export the fitted pipeline to a PMML file in the working directory.
sklearn2pmml(iris_pipeline, "DecisionTreeIris.pmml", with_repr = True)

# Read the file back as raw bytes for upload. The context manager closes the
# handle; the original left the file object from a bare open() unclosed.
with open('DecisionTreeIris.pmml', 'rb') as pmml_file:
    model_bytes = bytearray(pmml_file.read())
In [11]:
import urllib.request
import urllib.parse
# Coordinates under which the iris model is published.
namespace = 'default'
model_name = 'iris'
version = '1'

update_url = 'http://prediction-pmml-aws.demo.pipeline.io/update-pmml-model/%s/%s/%s' % (namespace, model_name, version)

# The request body is the PMML document, sent as XML.
update_headers = {"Content-type": "application/xml"}

req = urllib.request.Request(update_url, data=model_bytes, headers=update_headers)
resp = urllib.request.urlopen(req)

# HTTP status code of the upload.
print(resp.status)
In [12]:
# Coordinates of the deployed iris model to evaluate.
namespace = 'default'
model_name = 'iris'
version = '1'
evaluate_url = 'http://prediction-pmml-aws.demo.pipeline.io/evaluate-pmml-model/%s/%s/%s' % (namespace, model_name, version)

# The request body is JSON.
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'

# First row, every column except the last, serialised as a JSON object.
# `.iloc` replaces `DataFrame.ix`, which was deprecated and then removed in
# pandas 1.0; both arguments here are positional, so the selection is the same.
input_params = iris_df.iloc[0, :-1].to_json()
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = urllib.request.urlopen(req)

# Raw response body returned by the prediction service.
print(resp.read())
In [ ]:
from IPython.core.display import display, HTML
# Inject CSS that stretches the notebook's .container element to full width.
display(HTML("<style>.container { width:100% !important; }</style>"))
# NOTE(review): display and HTML are imported a second time here; the names
# are already bound by the import at the top of this cell.
from IPython.display import clear_output, Image, display, HTML
# Embed the Hystrix dashboard in an iframe. The URL-encoded `streams` query
# parameter names two turbine streams, "Predictions - AWS" and
# "Predictions - GCP".
html = '<iframe width=1200px height=500px src="http://hystrix.demo.pipeline.io/hystrix-dashboard/monitor/monitor.html?streams=%5B%7B%22name%22%3A%22Predictions%20-%20AWS%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-aws.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22Predictions%20-%20GCP%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-gcp.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%5D">'
display(HTML(html))
In [ ]:
from IPython.core.display import display, HTML
# Inject CSS that stretches the notebook's .container element to full width.
display(HTML("<style>.container { width:100% !important; }</style>"))
# NOTE(review): display and HTML are imported a second time here; the names
# are already bound by the import at the top of this cell.
from IPython.display import clear_output, Image, display, HTML
# Embed the Grafana front page in an iframe.
html = '<iframe width=1200px height=500px src="http://grafana.demo.pipeline.io">'
display(HTML(html))
In [ ]:
# Spark ML - Airbnb
# IPython shell escape: create the resources described by the airbnb
# load-test manifest, using the "awsdemo" kubectl context.
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-airbnb-rc.yaml
In [ ]:
# Codegen - Java - Simple
# IPython shell escape: create the resources described by the "equals"
# load-test manifest, using the "awsdemo" kubectl context.
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-equals-rc.yaml
In [ ]:
# Tensorflow AI - Tensorflow Serving - Simple
# IPython shell escape: create the resources described by the "minimal"
# load-test manifest, using the "awsdemo" kubectl context.
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-minimal-rc.yaml
In [ ]:
# Tear down the three load tests: delete the replication controllers (rc)
# named loadtest-aws-airbnb, loadtest-aws-equals and loadtest-aws-minimal
# in the "awsdemo" kubectl context.
!kubectl delete --context=awsdemo rc loadtest-aws-airbnb
!kubectl delete --context=awsdemo rc loadtest-aws-equals
!kubectl delete --context=awsdemo rc loadtest-aws-minimal
In [ ]:
# Rolling update of prediction-tensorflow to the
# fluxcapacitor/prediction-tensorflow image, with the image pull policy set
# to Always, in the "awsdemo" kubectl context.
!kubectl rolling-update prediction-tensorflow --context=awsdemo --image-pull-policy=Always --image=fluxcapacitor/prediction-tensorflow
In [ ]: