For a more detailed guide, refer to the TensorFlow or PyTorch example, or to the documentation at https://clipper.ai.
In [2]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.sql import SparkSession
import numpy as np
# Local Spark session; its SparkContext is used both to build the training
# RDD below and, later, by Clipper's pyspark deployer to serialize the model.
spark = SparkSession.builder.appName("example").getOrCreate()
sc = spark.sparkContext
In [3]:
from pyspark.mllib.regression import LabeledPoint
In [4]:
# Dummy data and a simple model: label 1.0 when the last feature dominates,
# label 0.0 when the first feature dominates.
# NOTE(review): pyspark.mllib is the older RDD-based API (maintenance mode);
# the DataFrame-based pyspark.ml API is the modern equivalent.
_training_examples = [
    (1.0, [1.0, 0.0, 3.0]),
    (1.0, [1.0, 0.0, 4.0]),
    (1.0, [1.0, 0.0, 5.0]),
    (1.0, [2.0, 0.0, 5.0]),
    (0.0, [4.0, 0.0, 2.0]),
    (0.0, [4.0, 0.0, 1.0]),
    (0.0, [5.0, 0.0, 1.5]),
    (0.0, [3.0, 0.0, 1.0]),
]
trainRDD = sc.parallelize(
    [LabeledPoint(label, features) for label, features in _training_examples])
model = LogisticRegressionWithSGD.train(trainRDD, iterations=10)
def shift(x):
    """Mean-center *x*: subtract its mean from every element.

    Returns a numpy array (numpy broadcasting handles plain lists too).
    """
    center = np.mean(x)
    return x - center
def predict(spark, model, inputs):
    """Prediction function handed to Clipper's pyspark deployer.

    Clipper invokes it as func(spark, model, inputs) with a batch of input
    vectors; each vector is mean-centered (see `shift`) before being fed to
    the model, and one string label is returned per input.
    """
    labels = []
    for features in inputs:
        labels.append(str(model.predict(shift(features))))
    return labels
In [5]:
from clipper_admin import ClipperConnection, DockerContainerManager
from clipper_admin.deployers.pyspark import deploy_pyspark_model
# Manage a local Clipper cluster through Docker containers.
clipper_conn = ClipperConnection(DockerContainerManager())
In [6]:
# Start the Clipper cluster (launches the management/query Docker containers).
clipper_conn.start_clipper()
In [7]:
# Attach to the running cluster and list registered applications
# (expected to be empty on a fresh start).
clipper_conn.connect()
clipper_conn.get_all_apps()
Out[7]:
In [8]:
# List deployed models (expected to be empty on a fresh start).
clipper_conn.get_all_models()
Out[8]:
In [9]:
# Register a REST endpoint taking double vectors; queries not answered
# within slo_micros (100 ms) receive default_output ("-1.0") instead.
clipper_conn.register_application(name="pyspark-app", input_type="doubles",
default_output="-1.0", slo_micros=100000)
In [10]:
# Package `predict` together with the Spark model into a model container
# and deploy it; `sc` lets the deployer serialize the Spark model.
deploy_pyspark_model(
clipper_conn,
name="pyspark-mod",
input_type="doubles",
func=predict,
pyspark_model=model,
version='1',
sc=sc)
In [11]:
# Route queries sent to the "pyspark-app" endpoint to the deployed model.
clipper_conn.link_model_to_app(
app_name="pyspark-app",
model_name="pyspark-mod")
In [12]:
# The app should now appear with its linked model.
clipper_conn.get_all_apps()
Out[12]:
In [13]:
# Get query address (host:port of Clipper's REST query frontend)
query_address = clipper_conn.get_query_addr()
In [14]:
# Run a query against the app's REST endpoint; this point should come back
# with label "1".
import requests
import json
import numpy as np

headers = {"Content-type": "application/json"}
url = "http://" + query_address + "/pyspark-app/predict"
requests.post(url, headers=headers,
              data=json.dumps({"input": [2.1, 0.0, 4.2]})).json()
# returns label `1`
Out[14]:
In [15]:
# Second query; this point should come back with label "0".
# Fix: use the resolved query_address (as in the query above) instead of
# hard-coding localhost:1337, which breaks if Clipper binds elsewhere.
requests.post("http://" + query_address + "/pyspark-app/predict",
              headers=headers,
              data=json.dumps({"input": [4.1, 0.0, 1.2]})).json()
# returns label `0`
Out[15]:
In [16]:
# Tear down: unlink the model from the app so it stops receiving queries.
clipper_conn.unlink_model_from_app(model_name="pyspark-mod", app_name="pyspark-app")
In [17]:
# Stop the model's serving container(s).
clipper_conn.stop_models('pyspark-mod')
In [18]:
# Remove the registered REST endpoint.
clipper_conn.delete_application('pyspark-app')
In [19]:
# Shut down all Clipper containers.
clipper_conn.stop_all()
In [ ]: