This notebook trains a toy logistic regression model with PySpark MLlib, deploys it to a local Docker-based Clipper cluster, queries it over REST, and then tears everything down. For a more detailed guide, refer to the TensorFlow or PyTorch examples, or to the documentation at https://clipper.ai.


In [2]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.appName("example").getOrCreate()
sc = spark.sparkContext

In [3]:
from pyspark.mllib.regression import LabeledPoint

In [4]:
# Dummy data and a simple model
trainRDD = sc.parallelize([LabeledPoint(1.0, [1.0, 0.0, 3.0]), 
                           LabeledPoint(1.0, [1.0, 0.0, 4.0]), 
                           LabeledPoint(1.0, [1.0, 0.0, 5.0]),
                           LabeledPoint(1.0, [2.0, 0.0, 5.0]),
                           LabeledPoint(0.0, [4.0, 0.0, 2.0]),
                           LabeledPoint(0.0, [4.0, 0.0, 1.0]),
                           LabeledPoint(0.0, [5.0, 0.0, 1.5]),
                           LabeledPoint(0.0, [3.0, 0.0, 1.0])])

# Train a simple classifier. Note: LogisticRegressionWithSGD is deprecated since
# Spark 2.0; LogisticRegressionWithLBFGS (or the pyspark.ml API) is preferred.
model = LogisticRegressionWithSGD.train(trainRDD, iterations=10)

def shift(x):
    # Center each input vector; this preprocessing is shipped with the predict
    # function and runs inside the model container at serve time.
    return x - np.mean(x)

def predict(spark, model, inputs):
    # Clipper's PySpark deployer calls this with the SparkSession, the deployed
    # model, and a batch of inputs; it must return one string per input.
    return [str(model.predict(shift(x))) for x in inputs]
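
The prediction function can be sanity-checked locally before deploying. These are the same two inputs queried over REST later in the notebook:

predict(spark, model, [np.array([2.1, 0.0, 4.2]), np.array([4.1, 0.0, 1.2])])
# returns ['1', '0']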

In [5]:
from clipper_admin import ClipperConnection, DockerContainerManager
from clipper_admin.deployers.pyspark import deploy_pyspark_model
clipper_conn = ClipperConnection(DockerContainerManager())

In [6]:
clipper_conn.start_clipper()


19-05-31:10:37:56 INFO     [docker_container_manager.py:154] [default-cluster] Starting managed Redis instance in Docker
19-05-31:10:37:58 INFO     [docker_container_manager.py:232] [default-cluster] Metric Configuration Saved at /tmp/tmp5l9n0w3g.yml
19-05-31:10:37:59 INFO     [clipper_admin.py:143] [default-cluster] Clipper is running
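
start_clipper launches the core Clipper services (a managed Redis instance, the query frontend, and the management frontend) as Docker containers, as the log lines above show. If startup fails, the container logs are the first place to look; a minimal sketch, assuming get_clipper_logs from the current clipper_admin API:

log_files = clipper_conn.get_clipper_logs()  # fetches the Clipper container logs to local files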

In [7]:
clipper_conn.connect()
clipper_conn.get_all_apps()


19-05-31:10:38:00 INFO     [clipper_admin.py:156] [default-cluster] Successfully connected to Clipper cluster at localhost:1337
Out[7]:
[]

In [8]:
clipper_conn.get_all_models()


Out[8]:
[]

In [9]:
clipper_conn.register_application(name="pyspark-app", input_type="doubles", 
                                  default_output="-1.0", slo_micros=100000)


19-05-31:10:38:02 INFO     [clipper_admin.py:220] [default-cluster] Application pyspark-app was successfully registered
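
register_application creates a REST endpoint for queries of the given input_type. slo_micros=100000 sets a 100 ms response-time objective; if a query cannot be answered within the SLO, Clipper responds with default_output ("-1.0" here) and marks the response with 'default': True. To double-check the registration, a quick sketch using get_app_info from clipper_admin:

clipper_conn.get_app_info("pyspark-app")  # returns the app's input type, default output, and SLO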

In [10]:
deploy_pyspark_model(
    clipper_conn,
    name="pyspark-mod",
    input_type="doubles",
    func=predict,
    pyspark_model=model,
    version='1',
    sc=sc)


19-05-31:10:38:06 INFO     [deployer_utils.py:41] Saving function to /tmp/tmpwrru4e6vclipper
19-05-31:10:38:06 INFO     [deployer_utils.py:51] Serialized and supplied predict function
19-05-31:10:38:10 INFO     [pyspark.py:234] Spark model saved
19-05-31:10:38:10 INFO     [pyspark.py:248] Using Python 3.6 base image
19-05-31:10:38:10 INFO     [clipper_admin.py:513] [default-cluster] Building model Docker image with model data from /tmp/tmpwrru4e6vclipper
19-05-31:10:38:10 INFO     [clipper_admin.py:518] [default-cluster] Step 1/2 : FROM clipper/pyspark36-container:develop
19-05-31:10:38:10 INFO     [clipper_admin.py:518] [default-cluster]  ---> dc5518780d68
19-05-31:10:38:10 INFO     [clipper_admin.py:518] [default-cluster] Step 2/2 : COPY /tmp/tmpwrru4e6vclipper /model/
19-05-31:10:38:10 INFO     [clipper_admin.py:518] [default-cluster]  ---> 17910751223a
19-05-31:10:38:10 INFO     [clipper_admin.py:518] [default-cluster] Successfully built 17910751223a
19-05-31:10:38:10 INFO     [clipper_admin.py:518] [default-cluster] Successfully tagged default-cluster-pyspark-mod:1
19-05-31:10:38:10 INFO     [clipper_admin.py:520] [default-cluster] Pushing model Docker image to default-cluster-pyspark-mod:1
19-05-31:10:38:12 INFO     [docker_container_manager.py:356] [default-cluster] Found 0 replicas for pyspark-mod:1. Adding 1
19-05-31:10:38:13 INFO     [clipper_admin.py:697] [default-cluster] Successfully registered model pyspark-mod:1
19-05-31:10:38:13 INFO     [clipper_admin.py:615] [default-cluster] Done deploying model pyspark-mod:1.
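
As the log shows, deploy_pyspark_model serializes the predict function together with the Spark model, builds a Docker image on top of Clipper's pyspark container, and starts a single replica. Throughput can later be scaled by adding replicas; a minimal sketch, assuming set_num_replicas from clipper_admin:

clipper_conn.set_num_replicas("pyspark-mod", num_replicas=2, version='1')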

In [11]:
clipper_conn.link_model_to_app(
    app_name="pyspark-app",
    model_name="pyspark-mod")


19-05-31:10:38:15 INFO     [clipper_admin.py:282] [default-cluster] Model pyspark-mod is now linked to application pyspark-app
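
Linking routes queries sent to pyspark-app to the pyspark-mod model. The link can be verified with get_linked_models (a sketch, assuming that clipper_admin call):

clipper_conn.get_linked_models(app_name="pyspark-app")  # expected: ['pyspark-mod']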

In [12]:
clipper_conn.get_all_apps()


Out[12]:
['pyspark-app']

In [13]:
# Get the address of the Clipper query frontend (localhost:1337 here, per the logs above)
query_address = clipper_conn.get_query_addr()

In [14]:
# Run a query against the REST endpoint
import requests, json
headers = {"Content-type": "application/json"}
requests.post("http://"+query_address+"/pyspark-app/predict", headers=headers, data=json.dumps({
    "input": [2.1, 0.0, 4.2]})).json()
# returns label `1`


Out[14]:
{'query_id': 0, 'output': 1, 'default': False}
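
Queries are plain JSON over HTTP: POST to http://<query-addr>/<app-name>/predict with an "input" field matching the registered input_type. Several inputs can be sent in one request via the "input_batch" key; a sketch, assuming Clipper's batch query API:

requests.post("http://"+query_address+"/pyspark-app/predict", headers=headers,
              data=json.dumps({"input_batch": [[2.1, 0.0, 4.2], [4.1, 0.0, 1.2]]})).json()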

In [15]:
requests.post("http://localhost:1337/pyspark-app/predict", headers=headers, data=json.dumps({
    "input": [4.1, 0.0, 1.2]})).json()
# returns label `0`


Out[15]:
{'query_id': 1, 'output': 0, 'default': False}

In [16]:
clipper_conn.unlink_model_from_app(model_name="pyspark-mod", app_name="pyspark-app")


19-05-31:10:38:54 INFO     [clipper_admin.py:323] Model pyspark-mod is now removed to application pyspark-app

In [17]:
clipper_conn.stop_models(['pyspark-mod'])  # stop_models expects a list of model names


19-05-31:10:39:05 INFO     [clipper_admin.py:1238] [default-cluster] Stopped all containers for these models and versions:
{'pyspark-mod': ['1']}
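
stop_models stops the containers but leaves the model registered with Clipper. To stop only particular versions of a model, clipper_admin also provides stop_versioned_models; a sketch:

clipper_conn.stop_versioned_models({'pyspark-mod': ['1']})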

In [18]:
clipper_conn.delete_application('pyspark-app')


19-05-31:10:39:05 INFO     [clipper_admin.py:239] [default-cluster] Application pyspark-app was successfully deleted

In [19]:
clipper_conn.stop_all()


19-05-31:10:39:36 INFO     [clipper_admin.py:1324] [default-cluster] Stopped all Clipper cluster and all model containers
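
stop_all tears down the entire Clipper cluster, including Redis and any remaining model containers. The local Spark session started at the top of the notebook is independent of Clipper and can be shut down separately:

spark.stop()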
