In [ ]:
# NOTE: after changing any of these settings you may need to Reconnect
# (more than Restart) the kernel for them to be picked up.
import os

# Individual groups of spark-submit options.
master = '--master spark://127.0.0.1:47077'
conf = '--conf spark.cores.max=1 --conf spark.executor.memory=512m'
packages = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1'
jars = '--jars /root/lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar'
py_files = '--py-files /root/lib/jpmml.py'

# Join the option groups with single spaces; the value ends with the
# 'pyspark-shell' token.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(
    [master, conf, packages, jars, py_files, 'pyspark-shell']
)
print(os.environ['PYSPARK_SUBMIT_ARGS'])
In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import DecisionTreeClassifier
In [ ]:
from pyspark.sql import SparkSession

# Obtain a SparkSession through the builder: an existing session is reused,
# otherwise a new one is created.
sparkSession = (
    SparkSession
    .builder
    .getOrCreate()
)
In [ ]:
# Read the census CSV from HDFS with the "inferSchema" and "header" reader
# options both enabled.
data = (
    sparkSession.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("hdfs://127.0.0.1:39000/datasets/census/census.csv")
)

# Display the first row as a quick sanity check of the loaded data.
data.head()
In [ ]:
# R-style formula: "income" is the label, "." stands for the other columns.
formula = RFormula(formula="income ~ .")
classifier = DecisionTreeClassifier()

# Two-stage pipeline (formula first, then the classifier), fitted on the
# loaded census data.
pipeline = Pipeline(stages=[formula, classifier])
pipelineModel = pipeline.fit(data)
print(pipelineModel)
In [ ]:
# Index 1 is the second pipeline stage; print its debug description.
treeModel = pipelineModel.stages[1]
print(treeModel.toDebugString)
In [ ]:
from jpmml import toPMMLBytes

# Convert the fitted pipeline to PMML bytes with the jpmml helper, then
# print the document decoded as UTF-8 text.
pmmlBytes = toPMMLBytes(sparkSession, data, pipelineModel)
pmmlText = pmmlBytes.decode("utf-8")
print(pmmlText)
In [ ]:
from urllib import request

# Upload the PMML document to the update-pmml endpoint.
# Replace <your-ip> with the address of the serving host before running.
update_url = 'http://<your-ip>:39040/update-pmml/pmml_census'
update_headers = {'Content-type': 'application/xml'}

# Supplying `data` makes urllib send the PMML bytes as a POST body.
req = request.Request(update_url, headers=update_headers, data=pmmlBytes)

# Use the response as a context manager so the underlying HTTP connection is
# closed deterministically instead of being leaked until garbage collection.
with request.urlopen(req) as resp:
    print(resp.status) # Should return Http Status 200
In [ ]:
from urllib import request

# Send one sample census record to the evaluate-pmml endpoint.
# Replace <your-ip> with the address of the serving host before running.
evaluate_url = 'http://<your-ip>:39040/evaluate-pmml/pmml_census'
evaluate_headers = {'Content-type': 'application/json'}

# JSON payload holding the input fields of a single record.
input_params = '{"age":39,"workclass":"State-gov","education":"Bachelors","education_num":13,"marital_status":"Never-married","occupation":"Adm-clerical","relationship":"Not-in-family","race":"White","sex":"Male","capital_gain":2174,"capital_loss":0,"hours_per_week":40,"native_country":"United-States"}'
encoded_input_params = input_params.encode('utf-8')

# Supplying `data` makes urllib send the encoded JSON as a POST body.
req = request.Request(evaluate_url, headers=evaluate_headers, data=encoded_input_params)

# Use the response as a context manager so the underlying HTTP connection is
# closed deterministically instead of being leaked until garbage collection.
with request.urlopen(req) as resp:
    print(resp.read()) # Should return valid classification with probabilities
Fill in `<your-ip>` in the Hystrix dashboard URL below:
http://<your-ip>:47979/hystrix-dashboard/monitor/monitor.html?streams=%5B%7B%22name%22%3A%22%22%2C%22stream%22%3A%22http%3A%2F%2F<your-ip>%3A39043%2Fhystrix.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22%22%2C%22stream%22%3A%22http%3A%2F%2F<your-ip>%3A39042%2Fhystrix.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22%22%2C%22stream%22%3A%22http%3A%2F%2F<your-ip>%3A39041%2Fhystrix.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22%22%2C%22stream%22%3A%22http%3A%2F%2F<your-ip>%3A39040%2Fhystrix.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%5D
Save Model to Disk
In [ ]:
!mkdir -p /root/src/pmml/census/
with open('/root/src/pmml/census/pmml_census.pmml', 'wb') as f:
f.write(pmmlBytes)
!ls /root/src/pmml/census/pmml_census.pmml
TODO: Trigger Airflow to Build New Docker Image (i.e., via GitHub commit)
In [ ]:
# Run the start-loadtest.sh script (IPython shell escape), passing it the
# census .jmx file under $SOURCE_HOME/loadtest.
# NOTE(review): the .jmx file is presumably a JMeter test plan — confirm.
!start-loadtest.sh $SOURCE_HOME/loadtest/RecommendationServiceStressTest-local-census.jmx
In [ ]: