Images, Containers
Useful Docker Image: AWS + GPU + Docker + Tensorflow + Spark
Container Orchestration Across Clusters
Kubernetes Cluster Visualization
What We're Using Here for Everything!
Invoke Any Type of Workflow on Any Type of Schedule
Commit New Model to Github, Airflow Workflow Triggered for Continuous Deployment
Maintains Docker Images
Not Just for Code, Also for ML/AI Models!
Deploy and Compare New Model Alongside Existing
Not Just System Metrics, ML/AI Model Prediction Metrics
NetflixOSS-based
Prometheus
Grafana
Elasticsearch
Training/Admin Cluster
Prediction Cluster
AWS, Google Cloud, and Azure
e.g., Recommendations
In-memory: Redis, Memcache
On-disk: Cassandra, RocksDB
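One common way to use the in-memory stores listed above is as a cache in front of precomputed predictions such as recommendations. The sketch below is a minimal illustration of the cache-aside pattern. It assumes a plain Python dict stands in for Redis or Memcache and that results are keyed by user id. `PredictionCache`, `recommend`, and `compute_fn` are hypothetical names, not part of any library.

```python
import time

class PredictionCache:
    """In-memory stand-in (hypothetical) for a Redis/Memcache layer of precomputed predictions."""

    def __init__(self, ttl_seconds=300.0, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._store = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]  # expired: evict lazily on read
            return None
        return value

    def put(self, key, value):
        self._store[key] = (self.clock() + self.ttl_seconds, value)

def recommend(user_id, cache, compute_fn):
    """Cache-aside: serve the cached result if present, else compute it and store it."""
    cached = cache.get(user_id)
    if cached is not None:
        return cached
    result = compute_fn(user_id)
    cache.put(user_id, result)
    return result
```

The TTL bounds how stale a precomputed recommendation can get before the model is consulted again.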
First-class Servable in Tensorflow Serving
It's Useful and Well-Supported
Apple, Cisco, Airbnb, HomeAway, etc.
Please Don't Re-build It - Reduce Your Technical Debt!
Hand-coded (Python + Pickling)
Generate Java Code from PMML?
freeze_graph.py: Combine Tensorflow Graph (Static) with Trained Weights (Checkpoints) into Single Deployable Model
optimize_for_inference.py
Compiles 3 graph operations (input, operation, output) into 1 operation
Removes need for Tensorflow Runtime (20 MB is significant on tiny devices)
Allows new backends for hardware-specific optimizations (better portability)
Convert Graph into executable code
Convert ensembles or other complex models into smaller models
Re-score training data with output of model being distilled
Train smaller model to produce same output
Output of the model being distilled teaches the smaller model more information than the original label
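To make the distillation steps concrete, here is a dependency-free sketch of a soft-target loss. It assumes the common temperature-softmax formulation: teacher and student logits are divided by a temperature before the softmax, and the soft term is scaled by the temperature squared. `distillation_loss`, `temperature`, and `alpha` are illustrative names, not a specific library's API. The soft targets put non-zero probability on every class, which is the extra information a one-hot label lacks.

```python
import math

def softmax(logits, temperature=1.0):
    """Temperature-scaled softmax; a higher temperature spreads probability mass out."""
    scaled = [z / temperature for z in logits]
    peak = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(z - peak) for z in scaled]
    total = sum(exps)
    return [e / total for e in exps]

def cross_entropy(targets, probs, eps=1e-12):
    return -sum(t * math.log(p + eps) for t, p in zip(targets, probs))

def distillation_loss(student_logits, teacher_logits, hard_label, temperature=2.0, alpha=0.5):
    """Blend the teacher's soft targets with the original one-hot label."""
    soft_targets = softmax(teacher_logits, temperature)
    soft_loss = cross_entropy(soft_targets, softmax(student_logits, temperature))
    one_hot = [1.0 if i == hard_label else 0.0 for i in range(len(student_logits))]
    hard_loss = cross_entropy(one_hot, softmax(student_logits))
    # Scaling the soft term by temperature**2 keeps its gradients comparable in size
    return alpha * soft_loss * temperature ** 2 + (1.0 - alpha) * hard_loss

# Soft targets rank the runner-up class far above the unlikely one;
# the one-hot label [1, 0, 0] says nothing about either.
print(softmax([4.0, 3.5, -2.0], temperature=2.0))
```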
Option 1: Add more Tensorflow Serving servers behind a load balancer
Option 2: Enable request batching in each Tensorflow Serving server
Option 2 Trade-off: Higher Latency (bad) for Higher Throughput (good)
$TENSORFLOW_SERVING_HOME/bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server \
  --port=9000 \
  --model_name=tensorflow_minimal \
  --model_base_path=/root/models/tensorflow_minimal/export \
  --enable_batching=true \
  --max_batch_size=1000000 \
  --batch_timeout_micros=10000 \
  --max_enqueued_batches=1000000
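The batching flags above trade latency for throughput. The sketch below is a simplified, single-queue model of that trade-off, not Tensorflow Serving's actual scheduler. It assumes a batch closes either when it holds `max_batch_size` requests or when `batch_timeout_micros` have passed since its first request, and it ignores processing time and `max_enqueued_batches`. `form_batches` and `added_latency_us` are hypothetical helpers.

```python
def form_batches(arrival_times_us, max_batch_size, batch_timeout_micros):
    """Group sorted request arrival times (microseconds) into batches.

    Returns a list of (close_time_us, [arrival times in the batch]).
    """
    batches = []
    current = []
    for t in arrival_times_us:
        # Timeout: the open batch closed at first_arrival + timeout, before t arrived
        if current and t - current[0] >= batch_timeout_micros:
            batches.append((current[0] + batch_timeout_micros, current))
            current = []
        current.append(t)
        # Size limit: the batch closes the moment it fills up
        if len(current) == max_batch_size:
            batches.append((t, current))
            current = []
    if current:  # the last partial batch waits for its timeout
        batches.append((current[0] + batch_timeout_micros, current))
    return batches

def added_latency_us(batches):
    """Queueing delay each request spends waiting for its batch to close."""
    return [close - t for close, batch in batches for t in batch]
```

With a very large `max_batch_size`, almost every request waits close to the full timeout; a smaller batch size lowers the added latency at the cost of smaller batches.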
The deeper the model, the longer the latency
Start inference in parallel where possible (e.g., user inference in parallel with item inference)
Pre-load common inputs from the database (e.g., user attributes, item attributes)
Pre-compute or partially compute common inputs (e.g., popular word embeddings)
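Running independent inferences in parallel means the request latency is roughly that of the slower call rather than the sum of both. Here is a minimal sketch using the standard library's `concurrent.futures`. `user_inference` and `item_inference` are hypothetical stand-ins for remote model calls, and the dot-product combination step is an assumption for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def user_inference(user_id):
    # Hypothetical stand-in for a call to a user-model server
    return [0.1, 0.2]

def item_inference(item_id):
    # Hypothetical stand-in for a call to an item-model server
    return [0.3, 0.4]

# One shared pool for the process; creating a pool per request would add overhead
executor = ThreadPoolExecutor(max_workers=4)

def score(user_id, item_id, pool=executor):
    """Submit both independent inferences at once, then combine their results."""
    user_future = pool.submit(user_inference, user_id)
    item_future = pool.submit(item_inference, item_id)
    user_vector = user_future.result()
    item_vector = item_future.result()
    # Assumed combination step: dot product of the two vectors
    return sum(u * i for u, i in zip(user_vector, item_vector))
```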
Word embeddings are huge!
Use a hash ID for each word
Off-load embedding matrices to a parameter server and share them across serving servers
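Here is a minimal sketch of the hashing idea. It assumes CRC32 as the stable hash, a fixed bucket count, and modulo sharding across parameter servers, all of which are illustrative choices. Each word maps to a row id in a fixed-size embedding table, so no vocabulary dictionary has to be shipped to the serving servers. Python's built-in `hash()` is salted per process for strings, which is why a stable hash is used instead. `word_hash_id` and `shard_for` are hypothetical helpers, and distinct words can collide in the same bucket.

```python
import zlib

def word_hash_id(word, num_buckets=2 ** 20):
    """Stable bucket id in [0, num_buckets) for a word (same result on every server)."""
    return zlib.crc32(word.encode("utf-8")) % num_buckets

def shard_for(hash_id, num_shards):
    """Which parameter-server shard holds this embedding row, assuming modulo sharding."""
    return hash_id % num_shards
```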
In [ ]:
import numpy as np
import os
import tensorflow as tf
from tensorflow.contrib.session_bundle import exporter
import time
In [ ]:
# make things wide
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
from IPython.display import clear_output, Image, display, HTML

def strip_consts(graph_def, max_const_size=32):
    """Strip large constant values from graph_def."""
    strip_def = tf.GraphDef()
    for n0 in graph_def.node:
        n = strip_def.node.add()
        n.MergeFrom(n0)
        if n.op == 'Const':
            tensor = n.attr['value'].tensor
            size = len(tensor.tensor_content)
            if size > max_const_size:
                # tensor_content is a bytes field, so encode the placeholder text
                tensor.tensor_content = ("<stripped %d bytes>" % size).encode("utf-8")
    return strip_def

def show_graph(graph_def=None, width=1200, height=800, max_const_size=32, ungroup_gradients=False):
    """Visualize TensorFlow graph."""
    if not graph_def:
        graph_def = tf.get_default_graph().as_graph_def()
    if hasattr(graph_def, 'as_graph_def'):
        graph_def = graph_def.as_graph_def()
    strip_def = strip_consts(graph_def, max_const_size=max_const_size)
    data = str(strip_def)
    if ungroup_gradients:
        data = data.replace('"gradients/', '"b_')
    #print(data)
    code = """
        <script>
          function load() {{
            document.getElementById("{id}").pbtxt = {data};
          }}
        </script>
        <link rel="import" href="https://tensorboard.appspot.com/tf-graph-basic.build.html" onload=load()>
        <div style="height:600px">
          <tf-graph-basic id="{id}"></tf-graph-basic>
        </div>
    """.format(data=repr(data), id='graph'+str(np.random.rand()))
    # Escape double quotes so the HTML can sit inside the srcdoc="..." attribute
    iframe = """
        <iframe seamless style="width:{}px;height:{}px;border:0" srcdoc="{}"></iframe>
    """.format(width, height, code.replace('"', '&quot;'))
    display(HTML(iframe))
In [ ]:
# If this errors out, increment the `export_version` variable, restart the Kernel, and re-run
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer("batch_size", 10, "The batch size to train")
flags.DEFINE_integer("epoch_number", 10, "Number of epochs to run trainer")
flags.DEFINE_integer("steps_to_validate", 1,
                     "Steps to validate and print loss")
flags.DEFINE_string("checkpoint_dir", "./checkpoint/",
                    "indicates the checkpoint directory")
#flags.DEFINE_string("model_path", "./model/", "The export path of the model")
flags.DEFINE_string("model_path", "/root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/", "The export path of the model")
flags.DEFINE_integer("export_version", 37, "The version number of the model")
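Rather than incrementing `export_version` by hand after each failure, the next version could be derived from the export directory. This is a sketch that assumes version sub-directories are all-digit names such as `00000037`. `next_export_version` and `next_version_from_names` are hypothetical helpers, not part of TensorFlow.

```python
import os

def next_version_from_names(names):
    """1 + the largest all-digit name (e.g. '00000037' -> 38); 1 if there are none."""
    versions = [int(name) for name in names if name.isdigit()]
    return max(versions, default=0) + 1

def next_export_version(export_dir):
    """Inspect export_dir on disk; a missing directory means this is version 1."""
    if not os.path.isdir(export_dir):
        return 1
    subdirs = [name for name in os.listdir(export_dir)
               if os.path.isdir(os.path.join(export_dir, name))]
    return next_version_from_names(subdirs)
```

For example, `next_export_version("/root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/")` could supply the default for the `export_version` flag in place of the hard-coded `37`.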
In [ ]:
# If this errors out, increment the `export_version` variable, restart the Kernel, and re-run
def main():
    # Define training data: y = 1 for every x = 1
    x = np.ones(FLAGS.batch_size)
    y = np.ones(FLAGS.batch_size)

    # Define the model: predict = X * w + b
    X = tf.placeholder(tf.float32, shape=[None], name="X")
    Y = tf.placeholder(tf.float32, shape=[None], name="yhat")
    w = tf.Variable(1.0, name="weight")
    b = tf.Variable(1.0, name="bias")
    loss = tf.square(Y - tf.mul(X, w) - b)
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)
    predict_op = tf.mul(X, w) + b

    saver = tf.train.Saver()
    checkpoint_dir = FLAGS.checkpoint_dir
    checkpoint_file = os.path.join(checkpoint_dir, "checkpoint.ckpt")
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)

    # Start the session
    with tf.Session() as sess:
        sess.run(tf.initialize_all_variables())
        # Resume from the latest checkpoint, if one exists
        ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
        if ckpt and ckpt.model_checkpoint_path:
            print("Continue training from the model {}".format(ckpt.model_checkpoint_path))
            saver.restore(sess, ckpt.model_checkpoint_path)

        saver_def = saver.as_saver_def()
        print(saver_def.filename_tensor_name)
        print(saver_def.restore_op_name)

        # Start training
        start_time = time.time()
        for epoch in range(FLAGS.epoch_number):
            sess.run(train_op, feed_dict={X: x, Y: y})

            # Start validating
            if epoch % FLAGS.steps_to_validate == 0:
                end_time = time.time()
                print("[{}] Epoch: {}".format(end_time - start_time, epoch))
                saver.save(sess, checkpoint_file)
                tf.train.write_graph(sess.graph_def, checkpoint_dir, 'trained_model.pb', as_text=False)
                tf.train.write_graph(sess.graph_def, checkpoint_dir, 'trained_model.txt', as_text=True)
                start_time = end_time

        # Print model variables
        w_value, b_value = sess.run([w, b])
        print("The model of w: {}, b: {}".format(w_value, b_value))

        # Export the model
        print("Exporting trained model to {}".format(FLAGS.model_path))
        model_exporter = exporter.Exporter(saver)
        model_exporter.init(
            sess.graph.as_graph_def(),
            named_graph_signatures={
                'inputs': exporter.generic_signature({"features": X}),
                'outputs': exporter.generic_signature({"prediction": predict_op})
            })
        model_exporter.export(FLAGS.model_path, tf.constant(FLAGS.export_version), sess)
        print('Done exporting!')

if __name__ == "__main__":
    main()
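The graph trains a one-variable linear model with plain gradient descent. The following dependency-free sketch mirrors that arithmetic from a fresh start (no checkpoint restored), using the notebook's defaults: a batch of 10 ones, 10 epochs, learning rate 0.01, and `w = b = 1.0`. Because `loss` is a per-example vector, TensorFlow differentiates the sum of its elements, so the gradients here are summed over the batch. `train_linear` is a hypothetical helper for checking what the exported model should contain.

```python
def train_linear(x, y, w=1.0, b=1.0, learning_rate=0.01, epochs=10):
    """Plain-Python mirror of predict = x * w + b trained on squared error."""
    for _ in range(epochs):
        residuals = [yi - (xi * w + b) for xi, yi in zip(x, y)]
        # d/dw and d/db of sum((y - x*w - b)^2), summed over the batch
        grad_w = sum(-2.0 * r * xi for r, xi in zip(residuals, x))
        grad_b = sum(-2.0 * r for r in residuals)
        w -= learning_rate * grad_w
        b -= learning_rate * grad_b
    return w, b

w_value, b_value = train_linear([1.0] * 10, [1.0] * 10)
print("The model of w: {}, b: {}".format(w_value, b_value))
```

With every `x` and `y` equal to 1, the data only constrain `w + b = 1`; since `w` and `b` start equal and receive identical gradients, both approach 0.5.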
In [ ]:
show_graph()
Commit Model to Github
In [ ]:
!ls -l /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export
In [ ]:
!ls -l /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/00000037
In [ ]:
!git status
In [ ]:
!git add --all /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/00000037/
In [ ]:
!git status
In [ ]:
!git commit -m "updated tensorflow model"
In [ ]:
# If this fails with "Permission denied", open a terminal within Jupyter and run `git push` manually
!git push
Github Post-Commit Webhook Triggers Airflow Workflow to Deploy New Model
In [ ]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
from IPython.display import clear_output, Image, display, HTML
html = '<iframe width=100% height=500px src="http://demo.pipeline.io:8080/admin"></iframe>'
display(HTML(html))
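A post-commit webhook fires on every push, so the deployment workflow may want to skip pushes that do not touch the model. Here is a minimal sketch that inspects the `commits[].added` and `commits[].modified` path lists of a GitHub push event payload. `touches_model_export` is a hypothetical helper, and `MODEL_EXPORT_PREFIX` assumes the repository root is `/root/pipeline`.

```python
# Assumption: the Git repository root is /root/pipeline, so payload paths are relative to it.
MODEL_EXPORT_PREFIX = "prediction.ml/tensorflow/models/tensorflow_minimal/export/"

def touches_model_export(push_payload, export_prefix=MODEL_EXPORT_PREFIX):
    """True if any commit in a GitHub push payload adds or modifies a file under export_prefix."""
    for commit in push_payload.get("commits", []):
        for path in commit.get("added", []) + commit.get("modified", []):
            if path.startswith(export_prefix):
                return True
    return False
```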
Kubernetes CLI
In [ ]:
!kubectl scale --context=awsdemo --replicas=2 rc spark-worker-2-0-1
In [ ]:
!kubectl get pod --context=awsdemo
Weave Scope Kubernetes AWS Cluster Visualization
In [ ]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
from IPython.display import clear_output, Image, display, HTML
html = '<iframe width=100% height=500px src="http://kubernetes-aws.demo.pipeline.io"></iframe>'
display(HTML(html))
In [ ]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression
In [ ]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Step 0: Load Libraries and Data
In [ ]:
df = spark.read.format("csv") \
.option("inferSchema", "true").option("header", "true") \
.load("s3a://datapalooza/airbnb/airbnb.csv.bz2")
df.createOrReplaceTempView("df")
print(df.head())
In [ ]:
print(df.count())
Step 1: Clean, Filter, and Summarize the Data
In [ ]:
df_filtered = df.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0 AND bedrooms is not null")
df_filtered.createOrReplaceTempView("df_filtered")
df_final = spark.sql("""
select
id,
city,
case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
then state
else 'Other'
end as state,
space,
cast(price as double) as price,
cast(bathrooms as double) as bathrooms,
cast(bedrooms as double) as bedrooms,
room_type,
host_is_super_host,
cancellation_policy,
cast(case when security_deposit is null
then 0.0
else security_deposit
end as double) as security_deposit,
price_per_bedroom,
cast(case when number_of_reviews is null
then 0.0
else number_of_reviews
end as double) as number_of_reviews,
cast(case when extra_people is null
then 0.0
else extra_people
end as double) as extra_people,
instant_bookable,
cast(case when cleaning_fee is null
then 0.0
else cleaning_fee
end as double) as cleaning_fee,
cast(case when review_scores_rating is null
then 80.0
else review_scores_rating
end as double) as review_scores_rating,
cast(case when square_feet is not null and square_feet > 100
then square_feet
when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
then 350.0
else 380 * bedrooms
end as double) as square_feet
from df_filtered
""").persist()
df_final.createOrReplaceTempView("df_final")
df_final.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()
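The `square_feet` imputation rule in the query is easy to misread, so here is the same CASE logic as a small Python function for spot-checking individual rows. `impute_square_feet` is a hypothetical helper, and `None` plays the role of SQL NULL; the thresholds (100, 350.0, and 380 per bedroom) are copied from the query.

```python
def impute_square_feet(square_feet, bedrooms):
    """Mirror of the square_feet CASE expression in the query above."""
    if square_feet is not None and square_feet > 100:
        return float(square_feet)  # plausible recorded value: keep it
    if bedrooms is None or bedrooms == 0:
        return 350.0  # missing or implausible size and no bedrooms: flat default
    return 380.0 * bedrooms  # otherwise estimate 380 square feet per bedroom
```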
In [ ]:
print(df_final.count())
In [ ]:
print(df_final.schema)
In [ ]:
# Most popular states
spark.sql("""
select
state,
count(*) as ct,
avg(price) as avg_price,
max(price) as max_price
from df_final
group by state
order by count(*) desc
""").show()
In [ ]:
# Most expensive popular cities
spark.sql("""
select
city,
count(*) as ct,
avg(price) as avg_price,
max(price) as max_price
from df_final
group by city
order by avg(price) desc
""").filter("ct > 25").show()
Step 2: Define Continuous and Categorical Features
In [ ]:
continuous_features = ["bathrooms", \
"bedrooms", \
"security_deposit", \
"cleaning_fee", \
"extra_people", \
"number_of_reviews", \
"square_feet", \
"review_scores_rating"]
categorical_features = ["room_type", \
"host_is_super_host", \
"cancellation_policy", \
"instant_bookable", \
"state"]
Step 3: Split Data into Training and Validation
In [ ]:
[training_dataset, validation_dataset] = df_final.randomSplit([0.8, 0.2])
Step 4: Continuous Feature Pipeline
In [ ]:
continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")
continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features", \
withStd=True, withMean=False)
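With `withStd=True, withMean=False`, the scaler divides each feature by its standard deviation without centering it. Here is a dependency-free sketch of that per-column computation. It assumes the corrected (n - 1) sample standard deviation and a 0.0 output for zero-variance columns, which we believe matches Spark's behavior. `scale_with_std_only` is a hypothetical helper.

```python
import math

def scale_with_std_only(column):
    """Divide values by the column's sample standard deviation, without centering."""
    n = len(column)
    if n < 2:
        raise ValueError("need at least two values to estimate a standard deviation")
    mean = sum(column) / n
    # Corrected (n - 1) sample standard deviation
    std = math.sqrt(sum((v - mean) ** 2 for v in column) / (n - 1))
    # withMean=False: no mean subtraction, so zeros stay zero; zero variance -> 0.0
    return [v / std if std != 0.0 else 0.0 for v in column]
```

Skipping the centering step keeps sparse feature vectors sparse, which is a common reason to leave `withMean=False`.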
Step 5: Categorical Feature Pipeline
In [ ]:
categorical_feature_indexers = [StringIndexer(inputCol=x, \
outputCol="{}_index".format(x)) \
for x in categorical_features]
categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), \
outputCol="oh_encoder_{}".format(x.getOutputCol() )) \
for x in categorical_feature_indexers]
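Roughly, each `StringIndexer` maps category strings to indices ordered by descending frequency, and each `OneHotEncoder` turns that index into a vector that, by default in Spark 2.x, drops the last category. The sketch below imitates both in plain Python. The alphabetical tie-breaking is an assumption, and `fit_string_indexer` and `one_hot` are hypothetical helpers that return dense lists rather than Spark vectors.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Ties broken alphabetically here (an assumption made for determinism)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot list; with drop_last the last category encodes as all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

policies = ["flexible", "strict", "flexible", "moderate", "strict", "flexible"]
index_of = fit_string_indexer(policies)
encoded = [one_hot(index_of[p], len(index_of)) for p in policies]
```

Dropping the last category avoids a set of columns that always sums to 1, which would be collinear with a linear regression's intercept.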
Step 6: Assemble our Features and Feature Pipeline
In [ ]:
feature_cols_lr = [x.getOutputCol() \
for x in categorical_feature_one_hot_encoders]
feature_cols_lr.append("scaled_continuous_features")
feature_assembler_lr = VectorAssembler(inputCols=feature_cols_lr, \
outputCol="features_lr")
Step 7: Train a Linear Regression Model
In [ ]:
linear_regression = LinearRegression(featuresCol="features_lr", \
labelCol="price", \
predictionCol="price_prediction", \
maxIter=10, \
regParam=0.3, \
elasticNetParam=0.8)
estimators_lr = \
[continuous_feature_assembler, continuous_feature_scaler] \
+ categorical_feature_indexers + categorical_feature_one_hot_encoders \
+ [feature_assembler_lr] + [linear_regression]
pipeline = Pipeline(stages=estimators_lr)
pipeline_model = pipeline.fit(training_dataset)
print(pipeline_model)
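`regParam=0.3` and `elasticNetParam=0.8` combine L1 and L2 penalties. As we understand Spark ML's documented elastic-net objective, the penalty term is `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`. The sketch below only evaluates that term for a given weight vector as an illustration; it is not how Spark fits the model (Spark also standardizes features by default before applying the penalty). `elastic_net_penalty` is a hypothetical helper.

```python
def elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8):
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2), alpha = elastic_net_param."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared)
```

With `elasticNetParam=0.8` the penalty is mostly L1, which pushes some coefficients (for example, rarely useful one-hot columns) to exactly zero.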
Step 8: Convert PipelineModel to PMML
In [ ]:
from jpmml import toPMMLBytes
pmmlBytes = toPMMLBytes(spark, training_dataset, pipeline_model)
print(pmmlBytes.decode("utf-8"))
In [ ]:
import urllib.request
update_url = 'http://prediction-pmml-aws.demo.pipeline.io/update-pmml/pmml_airbnb'
update_headers = {}
update_headers['Content-type'] = 'application/xml'
req = urllib.request.Request(update_url, \
headers=update_headers, \
data=pmmlBytes)
resp = urllib.request.urlopen(req)
print(resp.status) # Should return Http Status 200
In [ ]:
import urllib.request
update_url = 'http://prediction-pmml-gcp.demo.pipeline.io/update-pmml/pmml_airbnb'
update_headers = {}
update_headers['Content-type'] = 'application/xml'
req = urllib.request.Request(update_url, \
headers=update_headers, \
data=pmmlBytes)
resp = urllib.request.urlopen(req)
print(resp.status) # Should return Http Status 200
In [ ]:
import urllib.request
update_url = 'http://prediction-pmml-azure.demo.pipeline.io/update-pmml/pmml_airbnb'
update_headers = {}
update_headers['Content-type'] = 'application/xml'
req = urllib.request.Request(update_url, \
headers=update_headers, \
data=pmmlBytes)
resp = urllib.request.urlopen(req)
print(resp.status) # Should return Http Status 200
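The three cells above differ only in the cloud name in the host. Here is a sketch that builds the same three POST requests in a loop, assuming the `prediction-pmml-<cloud>.demo.pipeline.io` host pattern holds for every cloud. `build_update_requests` and `push_to_all_clouds` are hypothetical helpers.

```python
import urllib.request

CLOUDS = ("aws", "gcp", "azure")

def build_update_requests(pmml_bytes, model_name="pmml_airbnb", clouds=CLOUDS):
    """One POST per cloud to the update-pmml endpoint; nothing is sent here."""
    reqs = []
    for cloud in clouds:
        url = "http://prediction-pmml-{}.demo.pipeline.io/update-pmml/{}".format(cloud, model_name)
        reqs.append(urllib.request.Request(
            url, data=pmml_bytes, headers={"Content-type": "application/xml"}))
    return reqs

def push_to_all_clouds(pmml_bytes):
    """Send each request and return {url: HTTP status}."""
    statuses = {}
    for req in build_update_requests(pmml_bytes):
        with urllib.request.urlopen(req) as resp:
            statuses[req.full_url] = resp.status
    return statuses
```

In the notebook, `push_to_all_clouds(pmmlBytes)` would send the PMML from Step 8 to all three clouds; each status should be 200.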
In [ ]:
import urllib.request
import json
evaluate_url = 'http://prediction-pmml-aws.demo.pipeline.io/evaluate-pmml/pmml_airbnb'
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"bathrooms":2.0, \
"bedrooms":2.0, \
"security_deposit":175.00, \
"cleaning_fee":25.0, \
"extra_people":1.0, \
"number_of_reviews": 2.0, \
"square_feet": 250.0, \
"review_scores_rating": 2.0, \
"room_type": "Entire home/apt", \
"host_is_super_host": "0.0", \
"cancellation_policy": "flexible", \
"instant_bookable": "1.0", \
"state": "CA"}'
encoded_input_params = input_params.encode('utf-8')
req = urllib.request.Request(evaluate_url, \
headers=evaluate_headers, \
data=encoded_input_params)
resp = urllib.request.urlopen(req)
print(resp.read())
In [ ]:
import urllib.request
import json
evaluate_url = 'http://prediction-pmml-gcp.demo.pipeline.io/evaluate-pmml/pmml_airbnb'
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"bathrooms":2.0, \
"bedrooms":2.0, \
"security_deposit":175.00, \
"cleaning_fee":25.0, \
"extra_people":1.0, \
"number_of_reviews": 2.0, \
"square_feet": 250.0, \
"review_scores_rating": 2.0, \
"room_type": "Entire home/apt", \
"host_is_super_host": "0.0", \
"cancellation_policy": "flexible", \
"instant_bookable": "1.0", \
"state": "CA"}'
encoded_input_params = input_params.encode('utf-8')
req = urllib.request.Request(evaluate_url, \
headers=evaluate_headers, \
data=encoded_input_params)
resp = urllib.request.urlopen(req)
print(resp.read())
In [ ]:
import urllib.request
import json
evaluate_url = 'http://prediction-pmml-azure.demo.pipeline.io/evaluate-pmml/pmml_airbnb'
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"bathrooms":2.0, \
"bedrooms":2.0, \
"security_deposit":175.00, \
"cleaning_fee":25.0, \
"extra_people":1.0, \
"number_of_reviews": 2.0, \
"square_feet": 250.0, \
"review_scores_rating": 2.0, \
"room_type": "Entire home/apt", \
"host_is_super_host": "0.0", \
"cancellation_policy": "flexible", \
"instant_bookable": "1.0", \
"state": "CA"}'
encoded_input_params = input_params.encode('utf-8')
req = urllib.request.Request(evaluate_url, \
headers=evaluate_headers, \
data=encoded_input_params)
resp = urllib.request.urlopen(req)
print(resp.read())
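Building the request body with `json.dumps` guarantees valid JSON and avoids the backslash-continued string literal, which embeds each continuation line's leading spaces in the payload. Here is a sketch using the same feature values as the cells above. `listing` and `build_evaluate_request` are illustrative names, and the URL pattern is assumed to match the three evaluate cells.

```python
import json
import urllib.request

# Same feature values as the evaluate-pmml cells above
listing = {
    "bathrooms": 2.0,
    "bedrooms": 2.0,
    "security_deposit": 175.00,
    "cleaning_fee": 25.0,
    "extra_people": 1.0,
    "number_of_reviews": 2.0,
    "square_feet": 250.0,
    "review_scores_rating": 2.0,
    "room_type": "Entire home/apt",
    "host_is_super_host": "0.0",
    "cancellation_policy": "flexible",
    "instant_bookable": "1.0",
    "state": "CA",
}

def build_evaluate_request(cloud, features, model_name="pmml_airbnb"):
    """POST request for the evaluate-pmml endpoint on one cloud (built, not sent)."""
    url = "http://prediction-pmml-{}.demo.pipeline.io/evaluate-pmml/{}".format(cloud, model_name)
    body = json.dumps(features).encode("utf-8")
    return urllib.request.Request(url, data=body, headers={"Content-type": "application/json"})

# Usage in the notebook (performs a network call):
# print(urllib.request.urlopen(build_evaluate_request("aws", listing)).read())
```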
In [ ]:
from urllib import request
sourceBytes = ' \n\
private String str; \n\
\n\
public void initialize(Map<String, Object> args) { \n\
} \n\
\n\
public Object predict(Map<String, Object> inputs) { \n\
String id = (String)inputs.get("id"); \n\
\n\
return id.equals("21619"); \n\
} \n\
'.encode('utf-8')
In [ ]:
from urllib import request
name = 'codegen_equals'
update_url = 'http://prediction-codegen-aws.demo.pipeline.io/update-codegen/%s/' % name
update_headers = {}
update_headers['Content-type'] = 'text/plain'
req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)
generated_code = resp.read()
print(generated_code.decode('utf-8'))
In [ ]:
from urllib import request
name = 'codegen_equals'
update_url = 'http://prediction-codegen-gcp.demo.pipeline.io/update-codegen/%s/' % name
update_headers = {}
update_headers['Content-type'] = 'text/plain'
req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)
generated_code = resp.read()
print(generated_code.decode('utf-8'))
In [ ]:
from urllib import request
name = 'codegen_equals'
update_url = 'http://prediction-codegen-azure.demo.pipeline.io/update-codegen/%s/' % name
update_headers = {}
update_headers['Content-type'] = 'text/plain'
req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)
generated_code = resp.read()
print(generated_code.decode('utf-8'))
In [ ]:
from urllib import request
name = 'codegen_equals'
evaluate_url = 'http://prediction-codegen-aws.demo.pipeline.io/evaluate-codegen/%s' % name
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21618"}'
encoded_input_params = input_params.encode('utf-8')
req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)
print(resp.read()) # Should return false ("21618" does not equal "21619")
In [ ]:
from urllib import request
name = 'codegen_equals'
evaluate_url = 'http://prediction-codegen-gcp.demo.pipeline.io/evaluate-codegen/%s' % name
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21619"}'
encoded_input_params = input_params.encode('utf-8')
req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)
print(resp.read()) # Should return true ("21619" equals "21619")
In [ ]:
from urllib import request
name = 'codegen_equals'
evaluate_url = 'http://prediction-codegen-azure.demo.pipeline.io/evaluate-codegen/%s' % name
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"id":"21619"}'
encoded_input_params = input_params.encode('utf-8')
req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)
print(resp.read()) # Should return true ("21619" equals "21619")
In [ ]:
from urllib import request
sourceBytes = ' \n\
public Map<String, Object> data = new HashMap<String, Object>(); \n\
\n\
public void initialize(Map<String, Object> args) { \n\
data.put("url", "http://demo.pipeline.io:9040/prediction/"); \n\
} \n\
\n\
public Object predict(Map<String, Object> inputs) { \n\
try { \n\
String userId = (String)inputs.get("userId"); \n\
String itemId = (String)inputs.get("itemId"); \n\
String url = data.get("url") + "/" + userId + "/" + itemId; \n\
\n\
return org.apache.http.client.fluent.Request \n\
.Get(url) \n\
.execute() \n\
.returnContent(); \n\
\n\
} catch(Exception exc) { \n\
System.out.println(exc); \n\
throw exc; \n\
} \n\
} \n\
'.encode('utf-8')
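The Java snippet above appends `"/"` to a base URL that already ends in `/`, producing paths such as `/prediction//21619/10006`; whether the prediction server tolerates the double slash is not shown here. As a sketch of the intended URL shape (in Python, for checking only), here is a join that normalizes slashes. `build_prediction_url` is a hypothetical helper.

```python
def build_prediction_url(base_url, *segments):
    """Join path segments onto base_url with exactly one '/' between each part."""
    parts = [base_url.rstrip("/")] + [str(segment).strip("/") for segment in segments]
    return "/".join(parts)

# Naive concatenation, as in the Java code, doubles the slash:
naive = "http://demo.pipeline.io:9040/prediction/" + "/" + "21619" + "/" + "10006"
fixed = build_prediction_url("http://demo.pipeline.io:9040/prediction/", "21619", "10006")
```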
In [ ]:
from urllib import request
name = 'codegen_httpclient'
# Note: Must have trailing '/'
update_url = 'http://prediction-codegen-aws.demo.pipeline.io/update-codegen/%s/' % name
update_headers = {}
update_headers['Content-type'] = 'text/plain'
req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)
print(resp.status) # Should return Http Status 200
generated_code = resp.read()
print(generated_code.decode('utf-8'))
In [ ]:
from urllib import request
name = 'codegen_httpclient'
# Note: Must have trailing '/'
update_url = 'http://prediction-codegen-gcp.demo.pipeline.io/update-codegen/%s/' % name
update_headers = {}
update_headers['Content-type'] = 'text/plain'
req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)
print(resp.status) # Should return Http Status 200
generated_code = resp.read()
print(generated_code.decode('utf-8'))
In [ ]:
from urllib import request
name = 'codegen_httpclient'
# Note: Must have trailing '/'
update_url = 'http://prediction-codegen-azure.demo.pipeline.io/update-codegen/%s/' % name
update_headers = {}
update_headers['Content-type'] = 'text/plain'
req = request.Request("%s" % update_url, headers=update_headers, data=sourceBytes)
resp = request.urlopen(req)
print(resp.status) # Should return Http Status 200
generated_code = resp.read()
print(generated_code.decode('utf-8'))
In [ ]:
from urllib import request
name = 'codegen_httpclient'
evaluate_url = 'http://prediction-codegen-aws.demo.pipeline.io/evaluate-codegen/%s' % name
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"userId":"21619", "itemId":"10006"}'
encoded_input_params = input_params.encode('utf-8')
req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)
print(resp.read()) # Should return float
In [ ]:
from urllib import request
name = 'codegen_httpclient'
evaluate_url = 'http://prediction-codegen-gcp.demo.pipeline.io/evaluate-codegen/%s' % name
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"userId":"21619", "itemId":"10006"}'
encoded_input_params = input_params.encode('utf-8')
req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)
print(resp.read()) # Should return float
In [ ]:
from urllib import request
name = 'codegen_httpclient'
evaluate_url = 'http://prediction-codegen-azure.demo.pipeline.io/evaluate-codegen/%s' % name
evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'
input_params = '{"userId":"21619", "itemId":"10006"}'
encoded_input_params = input_params.encode('utf-8')
req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)
resp = request.urlopen(req)
print(resp.read()) # Should return float
NetflixOSS Services Dashboard (Hystrix)
In [ ]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
from IPython.display import clear_output, Image, display, HTML
html = '<iframe width=100% height=500px src="http://hystrix.demo.pipeline.io/hystrix-dashboard/monitor/monitor.html?streams=%5B%7B%22name%22%3A%22Model%20Servers%20AWS%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-aws.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22Model%20Servers%20GCP%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-gcp.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22Model%20Servers%20Azure%22%2C%22stream%22%3A%22http%3A%2F%2Fturbine-azure.demo.pipeline.io%2Fturbine.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%5D"></iframe>'
display(HTML(html))
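The dashboard URL's `streams` parameter is a URL-encoded JSON list of Turbine streams. Here is a sketch that generates it instead of hand-encoding, assuming each entry needs only the `name`, `stream`, `auth`, and `delay` fields visible in the encoded URL above. `hystrix_monitor_url` is a hypothetical helper.

```python
import json
import urllib.parse

DASHBOARD = "http://hystrix.demo.pipeline.io/hystrix-dashboard/monitor/monitor.html"

def hystrix_monitor_url(streams, dashboard=DASHBOARD):
    """streams: list of (display name, Turbine stream URL) pairs."""
    entries = [{"name": name, "stream": stream_url, "auth": "", "delay": ""}
               for name, stream_url in streams]
    # Compact JSON, then percent-encode every reserved character (safe="")
    encoded = urllib.parse.quote(json.dumps(entries, separators=(",", ":")), safe="")
    return "{}?streams={}".format(dashboard, encoded)

monitor_url = hystrix_monitor_url([
    ("Model Servers AWS", "http://turbine-aws.demo.pipeline.io/turbine.stream"),
    ("Model Servers GCP", "http://turbine-gcp.demo.pipeline.io/turbine.stream"),
    ("Model Servers Azure", "http://turbine-azure.demo.pipeline.io/turbine.stream"),
])
html = '<iframe width=100% height=500px src="{}"></iframe>'.format(monitor_url)
```

Adding a new cloud then means adding one tuple rather than re-encoding the whole URL by hand.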
Run JMeter Tests from Local Laptop (Limited by Laptop)
Run Headless JMeter Tests from Training Clusters in Cloud
In [ ]:
# Spark ML - PMML - Airbnb
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-airbnb-rc.yaml
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-gcp-airbnb-rc.yaml
In [ ]:
# Codegen - Java - Simple
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-equals-rc.yaml
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-gcp-equals-rc.yaml
In [1]:
# Tensorflow AI - Tensorflow Serving - Simple
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-aws-minimal-rc.yaml
!kubectl create --context=awsdemo -f /root/pipeline/loadtest.ml/loadtest-gcp-minimal-rc.yaml
In [ ]:
!kubectl delete --context=awsdemo rc loadtest-aws-airbnb
!kubectl delete --context=awsdemo rc loadtest-gcp-airbnb
!kubectl delete --context=awsdemo rc loadtest-aws-equals
!kubectl delete --context=awsdemo rc loadtest-gcp-equals
!kubectl delete --context=awsdemo rc loadtest-aws-minimal
!kubectl delete --context=awsdemo rc loadtest-gcp-minimal
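The create and delete cells follow one naming pattern (`loadtest-<cloud>-<model>`). Here is a sketch that generates those kubectl argument lists, assuming the YAML files and replication controllers keep that pattern. `loadtest_commands` is a hypothetical helper, and the commands are only built here, not executed.

```python
LOADTEST_DIR = "/root/pipeline/loadtest.ml"

def loadtest_commands(action, models=("airbnb", "equals", "minimal"),
                      clouds=("aws", "gcp"), context="awsdemo"):
    """Return kubectl argument lists to create or delete each loadtest-<cloud>-<model> RC."""
    commands = []
    for model in models:
        for cloud in clouds:
            name = "loadtest-{}-{}".format(cloud, model)
            if action == "create":
                commands.append(["kubectl", "create", "--context={}".format(context),
                                 "-f", "{}/{}-rc.yaml".format(LOADTEST_DIR, name)])
            elif action == "delete":
                commands.append(["kubectl", "delete", "--context={}".format(context), "rc", name])
            else:
                raise ValueError("action must be 'create' or 'delete'")
    return commands

# To run them from the notebook (not executed here):
# import subprocess
# for cmd in loadtest_commands("create"):
#     subprocess.run(cmd, check=True)
```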
Kubernetes CLI
In [ ]:
!kubectl rolling-update prediction-tensorflow --context=awsdemo --image-pull-policy=Always --image=fluxcapacitor/prediction-tensorflow
In [ ]:
!kubectl get pod --context=awsdemo
In [ ]:
!kubectl rolling-update prediction-tensorflow --context=gcpdemo --image-pull-policy=Always --image=fluxcapacitor/prediction-tensorflow
In [ ]:
!kubectl get pod --context=gcpdemo