Step 0: Load Libraries, Data, and SparkSession


In [1]:
# You may need to Reconnect (not just Restart) the Kernel to pick up changes to these settings.
import os

master = '--master spark://127.0.0.1:47077'
conf = '--conf spark.cores.max=1 --conf spark.executor.memory=512m'
packages = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1'
jars = '--jars /root/lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar'
py_files = '--py-files /root/lib/jpmml.py'

os.environ['PYSPARK_SUBMIT_ARGS'] = master \
  + ' ' + conf \
  + ' ' + packages \
  + ' ' + jars \
  + ' ' + py_files \
  + ' ' + 'pyspark-shell'

print(os.environ['PYSPARK_SUBMIT_ARGS'])


--master spark://127.0.0.1:47077 --conf spark.cores.max=1 --conf spark.executor.memory=512m --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1 --jars /root/lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar --py-files /root/lib/jpmml.py pyspark-shell

In [2]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [4]:
df = spark.read.format("csv") \
  .option("inferSchema", "true").option("header", "true") \
  .load("hdfs://127.0.0.1:39000/datasets/airbnb/airbnb.csv.bz2")

df.createOrReplaceTempView("df")

print(df.head())


Row(id=5731498, name='A 2-bdrm house in Plaka of Athens', space='Ideally located a unique house in a very peaceful neighborhood of Plaka, near Acropolis. It is a traditional house in the heart of the historical center of Athens, in Plaka. The kitchen is fully equipped with oven, fridge with freezer. Cutlery, dishes and pans, kettle, espresso coffee maker (espresso capsules are provided), toaster. There is also a vacuum cleaner and a laundry machine. One big closet will make your stay more comfortable. Bed linen, towels and bath amenities are provided. Moreover, the apartment is fully airconditioned. The apartment is very close to a greek traditional tavernas, a pharmacy, banks and public transport.  Airport or any other transport is available upon demand at an additional but very reasonable cost. ', price='120.0', bathrooms='1.0', bedrooms='2.0', room_type='Entire home/apt', square_feet=None, host_is_super_host='0.0', city='Athina', state=None, cancellation_policy='moderate', security_deposit='200.0', cleaning_fee='20.0', extra_people='15.0', minimum_nights='2', first_review='2015-04-07', instant_bookable='1.0', number_of_reviews='16', review_scores_rating='94.0', price_per_bedroom='60.0')

In [5]:
print(df.count())


198454

Step 1: Clean, Filter, and Summarize the Data


In [6]:
df_filtered = df.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0 AND bedrooms is not null")

df_filtered.createOrReplaceTempView("df_filtered")

df_final = spark.sql("""
    select
        id,
        city,
        case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        space,
        cast(price as double) as price,
        cast(bathrooms as double) as bathrooms,
        cast(bedrooms as double) as bedrooms,
        room_type,
        host_is_super_host,
        cancellation_policy,
        cast(case when security_deposit is null
            then 0.0
            else security_deposit
        end as double) as security_deposit,
        price_per_bedroom,
        cast(case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as double) as number_of_reviews,
        cast(case when extra_people is null
            then 0.0
            else extra_people
        end as double) as extra_people,
        instant_bookable,
        cast(case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as double) as cleaning_fee,
        cast(case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as double) as review_scores_rating,
        cast(case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as double) as square_feet
    from df_filtered
""").persist()

df_final.createOrReplaceTempView("df_final")

df_final.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()


+-------+------------------+------------------+------------------+------------------+-----------------+
|summary|       square_feet|             price|          bedrooms|         bathrooms|     cleaning_fee|
+-------+------------------+------------------+------------------+------------------+-----------------+
|  count|            151770|            151770|            151770|            151770|           151770|
|   mean| 545.5359161889702|130.99831982605258|    1.333537589774|1.1987316333926337|37.24688014759175|
| stddev|363.18878116083886| 89.57902021660226|0.8457954996720247|0.4834034060595014|  42.621395309755|
|    min|             104.0|              50.0|               0.0|               0.5|              0.0|
|    max|           32292.0|             750.0|              10.0|               8.0|            700.0|
+-------+------------------+------------------+------------------+------------------+-----------------+

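The `square_feet` CASE expression above imputes missing or implausible values. The same rule can be sketched in plain Python (the function name is mine, not part of the notebook):

```python
def impute_square_feet(square_feet, bedrooms):
    """Mirror the SQL CASE expression:

    - keep square_feet when present and > 100
    - fall back to 350.0 when bedrooms is also missing or zero
    - otherwise estimate 380 sq ft per bedroom
    """
    if square_feet is not None and square_feet > 100:
        return float(square_feet)
    if bedrooms is None or bedrooms == 0:
        return 350.0
    return 380.0 * bedrooms

print(impute_square_feet(None, 2.0))   # 760.0
print(impute_square_feet(90, None))    # 350.0
print(impute_square_feet(1200, 3.0))   # 1200.0
```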

In [7]:
print(df_final.count())


151770

In [8]:
print(df_final.schema)


StructType(List(StructField(id,IntegerType,true),StructField(city,StringType,true),StructField(state,StringType,true),StructField(space,StringType,true),StructField(price,DoubleType,true),StructField(bathrooms,DoubleType,true),StructField(bedrooms,DoubleType,true),StructField(room_type,StringType,true),StructField(host_is_super_host,StringType,true),StructField(cancellation_policy,StringType,true),StructField(security_deposit,DoubleType,true),StructField(price_per_bedroom,StringType,true),StructField(number_of_reviews,DoubleType,true),StructField(extra_people,DoubleType,true),StructField(instant_bookable,StringType,true),StructField(cleaning_fee,DoubleType,true),StructField(review_scores_rating,DoubleType,true),StructField(square_feet,DoubleType,true)))

In [9]:
# Most popular states

spark.sql("""
    select 
        state,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by state
    order by count(*) desc
""").show()


+------+-----+------------------+---------+
| state|   ct|         avg_price|max_price|
+------+-----+------------------+---------+
| Other|87420|122.00251658659346|    750.0|
|    NY|22887|145.93550924105386|    750.0|
|    CA|20726|157.41102962462608|    750.0|
|Berlin| 6031| 80.64118719946941|    650.0|
|    IL| 3552|141.46903153153153|    690.0|
|    TX| 3104|195.02673969072166|    750.0|
|    WA| 2698|131.48999258710157|    750.0|
|    DC| 2590|136.64015444015445|    720.0|
|    OR| 1954|114.02661207778915|    700.0|
|London|  808|108.85396039603961|    600.0|
+------+-----+------------------+---------+


In [10]:
# Most expensive cities (among those with more than 25 listings)

spark.sql("""
    select 
        city,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by city
    order by avg(price) desc
""").filter("ct > 25").show()


+-------------------+---+------------------+---------+
|               city| ct|         avg_price|max_price|
+-------------------+---+------------------+---------+
|         Palm Beach| 26| 348.7692307692308|    701.0|
|        Watsonville| 38| 313.3157894736842|    670.0|
|             Malibu|136| 280.9852941176471|    750.0|
|             Avalon| 38|262.42105263157896|    701.0|
|           Capitola| 35|             246.4|    650.0|
|           Tamarama| 72|             238.5|    750.0|
|    Manhattan Beach|109|232.10091743119267|    700.0|
|Rancho Palos Verdes| 39|230.02564102564102|    750.0|
|       Avalon Beach| 38|229.60526315789474|    620.0|
|            Newport| 52| 223.8653846153846|    750.0|
|      Darling Point| 29|221.51724137931035|    623.0|
|        Middle Park| 34| 212.7941176470588|    671.0|
|            Balmain| 55|212.56363636363636|    712.0|
|        North Bondi|180|206.68333333333334|    750.0|
|             Bronte|144|203.70833333333334|    750.0|
|        Queenscliff| 40|           201.925|    650.0|
|          Lilyfield| 26|198.92307692307693|    701.0|
|         Freshwater| 54| 198.5185185185185|    650.0|
|           La Jolla| 52|197.82692307692307|    649.0|
|     Marina del Rey|205| 196.6390243902439|    550.0|
+-------------------+---+------------------+---------+
only showing top 20 rows

Step 2: Define Continuous and Categorical Features


In [11]:
continuous_features = ["bathrooms", \
                       "bedrooms", \
                       "security_deposit", \
                       "cleaning_fee", \
                       "extra_people", \
                       "number_of_reviews", \
                       "square_feet", \
                       "review_scores_rating"]

categorical_features = ["room_type", \
                        "host_is_super_host", \
                        "cancellation_policy", \
                        "instant_bookable", \
                        "state"]

Step 3: Split Data into Training and Validation


In [12]:
[training_dataset, validation_dataset] = df_final.randomSplit([0.8, 0.2])
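
Note that `randomSplit` assigns each row independently with the given probabilities, so the split sizes are only approximately 80/20, and the result varies across runs unless you pass a seed (e.g. `randomSplit([0.8, 0.2], seed=42)`). A minimal pure-Python sketch of the same idea (not Spark's implementation):

```python
import random

def random_split(rows, weights, seed=42):
    """Assign each row independently to a bucket with probability
    proportional to its weight (Spark's randomSplit works similarly)."""
    total = sum(weights)
    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for row in rows:
        r = rng.random() * total
        cum = 0.0
        for i, w in enumerate(weights):
            cum += w
            if r < cum:
                buckets[i].append(row)
                break
    return buckets

train, valid = random_split(list(range(1000)), [0.8, 0.2])
print(len(train), len(valid))  # roughly 800 / 200
```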

Step 4: Continuous Feature Pipeline


In [13]:
continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")

continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features", \
                                           withStd=True, withMean=False)
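
With `withMean=False`, `StandardScaler` only divides each feature by its sample standard deviation; the per-feature scale constants in the PMML output later (e.g. ~2.067 for `bathrooms`) are simply `1/stddev`. A quick pure-Python illustration of that math using the `statistics` module (not Spark):

```python
import statistics

values = [1.0, 2.0, 1.5, 1.0, 3.0]

# StandardScaler(withStd=True, withMean=False) multiplies each value
# by 1/stdev; the mean is left untouched.
scale = 1.0 / statistics.stdev(values)
scaled = [v * scale for v in values]

print(round(scale, 4))
print([round(v, 4) for v in scaled])
```

After scaling, the values have unit standard deviation, which keeps the regression coefficients on comparable scales.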

Step 5: Categorical Feature Pipeline


In [14]:
categorical_feature_indexers = [StringIndexer(inputCol=x, \
                                              outputCol="{}_index".format(x)) \
                                for x in categorical_features]

categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), \
                                                      outputCol="oh_encoder_{}".format(x.getOutputCol() )) \
                                        for x in categorical_feature_indexers]
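
`StringIndexer` assigns indices by descending frequency (0 = most frequent), and `OneHotEncoder` then emits a dummy vector that by default drops the last category. A pure-Python sketch of that behavior (an illustration, not Spark's implementation):

```python
from collections import Counter

def string_index(values):
    """Index categories by descending frequency, like StringIndexer."""
    ordered = [cat for cat, _ in Counter(values).most_common()]
    return {cat: i for i, cat in enumerate(ordered)}

def one_hot(index, n_categories):
    """Dummy-encode an index, dropping the last category
    (OneHotEncoder's default dropLast=True)."""
    vec = [0.0] * (n_categories - 1)
    if index < n_categories - 1:
        vec[index] = 1.0
    return vec

room_types = ["Entire home/apt", "Entire home/apt", "Private room",
              "Private room", "Private room", "Shared room"]
idx = string_index(room_types)
print(idx)
print(one_hot(idx["Shared room"], len(idx)))  # all-zeros: dropped category
```

Dropping the last category avoids a perfectly collinear column, which is why the PMML later has no coefficient for one value of each categorical field.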

Step 6: Assemble our features and feature pipeline


In [15]:
feature_cols_lr = [x.getOutputCol() \
                   for x in categorical_feature_one_hot_encoders]
feature_cols_lr.append("scaled_continuous_features")

feature_assembler_lr = VectorAssembler(inputCols=feature_cols_lr, \
                                       outputCol="features_lr")

Step 7: Train a Linear Regression Model


In [16]:
linear_regression = LinearRegression(featuresCol="features_lr", \
                                     labelCol="price", \
                                     predictionCol="price_prediction", \
                                     maxIter=10, \
                                     regParam=0.3, \
                                     elasticNetParam=0.8)

estimators_lr = \
  [continuous_feature_assembler, continuous_feature_scaler] \
  + categorical_feature_indexers + categorical_feature_one_hot_encoders \
  + [feature_assembler_lr] + [linear_regression]

pipeline = Pipeline(stages=estimators_lr)

pipeline_model = pipeline.fit(training_dataset)

print(pipeline_model)


PipelineModel_44b4876b2f1052c4d138

TODO: Step 8: Validate Linear Regression Model
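
Until Step 8 is filled in, the metric it would report (Spark's `RegressionEvaluator` with `metricName="rmse"` on the validation set) is just the root-mean-squared error between `price` and `price_prediction`. In pure Python, on hypothetical label/prediction pairs:

```python
import math

def rmse(labels, predictions):
    """Root-mean-squared error, RegressionEvaluator's default metric."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)

# Hypothetical (price, price_prediction) pairs, for illustration only.
actual = [120.0, 80.0, 250.0]
predicted = [110.0, 95.0, 240.0]
print(round(rmse(actual, predicted), 2))  # 11.9
```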

Step 9: Convert PipelineModel to PMML


In [17]:
from jpmml import toPMMLBytes

pmmlBytes = toPMMLBytes(spark, training_dataset, pipeline_model)

print(pmmlBytes.decode("utf-8"))


<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<PMML xmlns="http://www.dmg.org/PMML-4_3" version="4.3">
	<Header>
		<Application/>
		<Timestamp>2017-03-01T15:56:23Z</Timestamp>
	</Header>
	<DataDictionary>
		<DataField name="bathrooms" optype="continuous" dataType="double"/>
		<DataField name="bedrooms" optype="continuous" dataType="double"/>
		<DataField name="security_deposit" optype="continuous" dataType="double"/>
		<DataField name="cleaning_fee" optype="continuous" dataType="double"/>
		<DataField name="extra_people" optype="continuous" dataType="double"/>
		<DataField name="number_of_reviews" optype="continuous" dataType="double"/>
		<DataField name="square_feet" optype="continuous" dataType="double"/>
		<DataField name="review_scores_rating" optype="continuous" dataType="double"/>
		<DataField name="room_type" optype="categorical" dataType="string">
			<Value value="Entire home/apt"/>
			<Value value="Private room"/>
			<Value value="Shared room"/>
		</DataField>
		<DataField name="host_is_super_host" optype="categorical" dataType="string">
			<Value value="0.0"/>
			<Value value="1.0"/>
		</DataField>
		<DataField name="cancellation_policy" optype="categorical" dataType="string">
			<Value value="strict"/>
			<Value value="moderate"/>
			<Value value="flexible"/>
			<Value value="super_strict_30"/>
			<Value value="no_refunds"/>
			<Value value="super_strict_60"/>
			<Value value="long_term"/>
		</DataField>
		<DataField name="instant_bookable" optype="categorical" dataType="string">
			<Value value="0.0"/>
			<Value value="1.0"/>
		</DataField>
		<DataField name="state" optype="categorical" dataType="string">
			<Value value="Other"/>
			<Value value="NY"/>
			<Value value="CA"/>
			<Value value="Berlin"/>
			<Value value="IL"/>
			<Value value="TX"/>
			<Value value="WA"/>
			<Value value="DC"/>
			<Value value="OR"/>
			<Value value="London"/>
		</DataField>
		<DataField name="price" optype="continuous" dataType="double"/>
	</DataDictionary>
	<TransformationDictionary>
		<DerivedField name="scaled_continuous_features[0]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="bathrooms"/>
				<Constant dataType="double">2.066948662722852</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[1]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="bedrooms"/>
				<Constant dataType="double">1.180521103246704</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[2]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="security_deposit"/>
				<Constant dataType="double">0.005528871611005103</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[3]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="cleaning_fee"/>
				<Constant dataType="double">0.023388321106418194</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[4]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="extra_people"/>
				<Constant dataType="double">0.05377243067347896</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[5]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="number_of_reviews"/>
				<Constant dataType="double">0.03702381794115088</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[6]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="square_feet"/>
				<Constant dataType="double">0.002699814182534678</Constant>
			</Apply>
		</DerivedField>
		<DerivedField name="scaled_continuous_features[7]" optype="continuous" dataType="double">
			<Apply function="*">
				<FieldRef field="review_scores_rating"/>
				<Constant dataType="double">0.11342208392584212</Constant>
			</Apply>
		</DerivedField>
	</TransformationDictionary>
	<RegressionModel functionName="regression">
		<MiningSchema>
			<MiningField name="price" usageType="target"/>
			<MiningField name="bathrooms"/>
			<MiningField name="bedrooms"/>
			<MiningField name="security_deposit"/>
			<MiningField name="cleaning_fee"/>
			<MiningField name="extra_people"/>
			<MiningField name="number_of_reviews"/>
			<MiningField name="square_feet"/>
			<MiningField name="review_scores_rating"/>
			<MiningField name="room_type"/>
			<MiningField name="host_is_super_host"/>
			<MiningField name="cancellation_policy"/>
			<MiningField name="instant_bookable"/>
			<MiningField name="state"/>
		</MiningSchema>
		<RegressionTable intercept="-34.4106991521102">
			<NumericPredictor name="scaled_continuous_features[0]" coefficient="16.847748561646224"/>
			<NumericPredictor name="scaled_continuous_features[1]" coefficient="21.92709156483596"/>
			<NumericPredictor name="scaled_continuous_features[2]" coefficient="0.9172950622239107"/>
			<NumericPredictor name="scaled_continuous_features[3]" coefficient="24.23345392494766"/>
			<NumericPredictor name="scaled_continuous_features[4]" coefficient="2.791294479271732"/>
			<NumericPredictor name="scaled_continuous_features[5]" coefficient="-2.732570793417158"/>
			<NumericPredictor name="scaled_continuous_features[6]" coefficient="3.2598541490766517"/>
			<NumericPredictor name="scaled_continuous_features[7]" coefficient="4.546195202685065"/>
			<CategoricalPredictor name="room_type" value="Entire home/apt" coefficient="27.62034753659175"/>
			<CategoricalPredictor name="room_type" value="Private room" coefficient="-12.122302489552254"/>
			<CategoricalPredictor name="host_is_super_host" value="0.0" coefficient="-5.283908158249202"/>
			<CategoricalPredictor name="cancellation_policy" value="strict" coefficient="2.0448928156029966"/>
			<CategoricalPredictor name="cancellation_policy" value="moderate" coefficient="-5.019287620695403"/>
			<CategoricalPredictor name="cancellation_policy" value="flexible" coefficient="0.0"/>
			<CategoricalPredictor name="cancellation_policy" value="super_strict_30" coefficient="69.4653087542643"/>
			<CategoricalPredictor name="cancellation_policy" value="no_refunds" coefficient="0.0"/>
			<CategoricalPredictor name="cancellation_policy" value="super_strict_60" coefficient="74.8258849462191"/>
			<CategoricalPredictor name="instant_bookable" value="0.0" coefficient="7.058634421712414"/>
			<CategoricalPredictor name="state" value="Other" coefficient="-11.185614866757914"/>
			<CategoricalPredictor name="state" value="NY" coefficient="20.85540225955847"/>
			<CategoricalPredictor name="state" value="CA" coefficient="12.986033731266012"/>
			<CategoricalPredictor name="state" value="Berlin" coefficient="-49.890885827932564"/>
			<CategoricalPredictor name="state" value="IL" coefficient="15.225180408712143"/>
			<CategoricalPredictor name="state" value="TX" coefficient="32.48034256281033"/>
			<CategoricalPredictor name="state" value="WA" coefficient="-7.750903977521724"/>
			<CategoricalPredictor name="state" value="DC" coefficient="5.179406693202477"/>
			<CategoricalPredictor name="state" value="OR" coefficient="-17.11373330831209"/>
		</RegressionTable>
	</RegressionModel>
</PMML>
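
For a continuous feature, the PMML above scores in two steps: the `DerivedField` multiplies the raw value by its scale constant, and the `RegressionTable` multiplies the result by the coefficient. Using the `bathrooms` entries copied from the PMML (the helper function is mine):

```python
# Scale constant and coefficient for "bathrooms", copied from the PMML above.
BATHROOMS_SCALE = 2.066948662722852
BATHROOMS_COEF = 16.847748561646224

def bathrooms_contribution(bathrooms):
    """Contribution of the bathrooms feature to the regression score:
    DerivedField (scaling) followed by NumericPredictor (coefficient)."""
    scaled = bathrooms * BATHROOMS_SCALE   # scaled_continuous_features[0]
    return scaled * BATHROOMS_COEF

print(round(bathrooms_contribution(2.0), 2))
```

The full prediction sums the intercept, every `NumericPredictor` contribution like this one, and the `CategoricalPredictor` coefficients matching the row's category values.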

Deployment Option 1: Mutable Model Deployment

Deploy New Model to Live, Running Model Server


In [18]:
import urllib.request

# Replace <your-ip> below with your Model Server host before running.
update_url = 'http://<your-ip>:39040/update-pmml/pmml_airbnb'

update_headers = {}
update_headers['Content-type'] = 'application/xml'

req = urllib.request.Request(update_url, \
                             headers=update_headers, \
                             data=pmmlBytes)

resp = urllib.request.urlopen(req)

print(resp.status) # Should return Http Status 200



gaierrorTraceback (most recent call last)
/opt/conda/lib/python3.5/urllib/request.py in do_open(self, http_class, req, **http_conn_args)
   1253             try:
-> 1254                 h.request(req.get_method(), req.selector, req.data, headers)
   1255             except OSError as err: # timeout error

/opt/conda/lib/python3.5/http/client.py in request(self, method, url, body, headers)
   1106         """Send a complete request to the server."""
-> 1107         self._send_request(method, url, body, headers)
   1108 

/opt/conda/lib/python3.5/http/client.py in _send_request(self, method, url, body, headers)
   1151             body = _encode(body, 'body')
-> 1152         self.endheaders(body)
   1153 

/opt/conda/lib/python3.5/http/client.py in endheaders(self, message_body)
   1102             raise CannotSendHeader()
-> 1103         self._send_output(message_body)
   1104 

/opt/conda/lib/python3.5/http/client.py in _send_output(self, message_body)
    933 
--> 934         self.send(msg)
    935         if message_body is not None:

/opt/conda/lib/python3.5/http/client.py in send(self, data)
    876             if self.auto_open:
--> 877                 self.connect()
    878             else:

/opt/conda/lib/python3.5/http/client.py in connect(self)
    848         self.sock = self._create_connection(
--> 849             (self.host,self.port), self.timeout, self.source_address)
    850         self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

/opt/conda/lib/python3.5/socket.py in create_connection(address, timeout, source_address)
    693     err = None
--> 694     for res in getaddrinfo(host, port, 0, SOCK_STREAM):
    695         af, socktype, proto, canonname, sa = res

/opt/conda/lib/python3.5/socket.py in getaddrinfo(host, port, family, type, proto, flags)
    732     addrlist = []
--> 733     for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
    734         af, socktype, proto, canonname, sa = res

gaierror: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

URLErrorTraceback (most recent call last)
<ipython-input-18-ee54834438a7> in <module>()
      8 req = urllib.request.Request(update_url,                              headers=update_headers,                              data=pmmlBytes)
      9 
---> 10 resp = urllib.request.urlopen(req)
     11 
     12 print(resp.status) # Should return Http Status 200

/opt/conda/lib/python3.5/urllib/request.py in urlopen(url, data, timeout, cafile, capath, cadefault, context)
    161     else:
    162         opener = _opener
--> 163     return opener.open(url, data, timeout)
    164 
    165 def install_opener(opener):

/opt/conda/lib/python3.5/urllib/request.py in open(self, fullurl, data, timeout)
    464             req = meth(req)
    465 
--> 466         response = self._open(req, data)
    467 
    468         # post-process response

/opt/conda/lib/python3.5/urllib/request.py in _open(self, req, data)
    482         protocol = req.type
    483         result = self._call_chain(self.handle_open, protocol, protocol +
--> 484                                   '_open', req)
    485         if result:
    486             return result

/opt/conda/lib/python3.5/urllib/request.py in _call_chain(self, chain, kind, meth_name, *args)
    442         for handler in handlers:
    443             func = getattr(handler, meth_name)
--> 444             result = func(*args)
    445             if result is not None:
    446                 return result

/opt/conda/lib/python3.5/urllib/request.py in http_open(self, req)
   1280 
   1281     def http_open(self, req):
-> 1282         return self.do_open(http.client.HTTPConnection, req)
   1283 
   1284     http_request = AbstractHTTPHandler.do_request_

/opt/conda/lib/python3.5/urllib/request.py in do_open(self, http_class, req, **http_conn_args)
   1254                 h.request(req.get_method(), req.selector, req.data, headers)
   1255             except OSError as err: # timeout error
-> 1256                 raise URLError(err)
   1257             r = h.getresponse()
   1258         except:

URLError: <urlopen error [Errno -2] Name or service not known>

In [ ]:
import urllib.parse
import json

# Note: Replace <your-ip> below with your Model Server host.
#       You may need to run this cell twice: a fallback response
#       is returned the first time. (known bug)
evaluate_url = 'http://<your-ip>:39040/evaluate-pmml/pmml_airbnb'

evaluate_headers = {}
evaluate_headers['Content-type'] = 'application/json'

input_params = '{"bathrooms":2.0, \
                 "bedrooms":2.0, \
                 "security_deposit":175.00, \
                 "cleaning_fee":25.0, \
                 "extra_people":1.0, \
                 "number_of_reviews": 2.0, \
                 "square_feet": 250.0, \
                 "review_scores_rating": 2.0, \
                 "room_type": "Entire home/apt", \
                 "host_is_super_host": "0.0", \
                 "cancellation_policy": "flexible", \
                 "instant_bookable": "1.0", \
                 "state": "CA"}' 
encoded_input_params = input_params.encode('utf-8')

req = urllib.request.Request(evaluate_url, \
                             headers=evaluate_headers, \
                             data=encoded_input_params)

resp = urllib.request.urlopen(req)

print(resp.read())
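
Hand-writing the JSON string with backslash continuations is error-prone; building the same payload from a dict with `json.dumps` guarantees valid JSON:

```python
import json

# Same example record as above, built as a dict instead of a raw string.
record = {
    "bathrooms": 2.0,
    "bedrooms": 2.0,
    "security_deposit": 175.00,
    "cleaning_fee": 25.0,
    "extra_people": 1.0,
    "number_of_reviews": 2.0,
    "square_feet": 250.0,
    "review_scores_rating": 2.0,
    "room_type": "Entire home/apt",
    "host_is_super_host": "0.0",
    "cancellation_policy": "flexible",
    "instant_bookable": "1.0",
    "state": "CA",
}

encoded_input_params = json.dumps(record).encode("utf-8")
print(encoded_input_params[:40])
```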

Model Server Dashboard

Fill in below, then copy/paste to your browser

http://<your-ip>:47979/hystrix-dashboard/monitor/monitor.html?streams=%5B%7B%22name%22%3A%22%22%2C%22stream%22%3A%22http%3A%2F%2F<your-ip>%3A39043%2Fhystrix.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22%22%2C%22stream%22%3A%22http%3A%2F%2F<your-ip>%3A39042%2Fhystrix.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22%22%2C%22stream%22%3A%22http%3A%2F%2F<your-ip>%3A39041%2Fhystrix.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%2C%7B%22name%22%3A%22%22%2C%22stream%22%3A%22http%3A%2F%2F<your-ip>%3A39040%2Fhystrix.stream%22%2C%22auth%22%3A%22%22%2C%22delay%22%3A%22%22%7D%5D

Deployment Option 2: Immutable Model Deployment

Save Model to Disk


In [ ]:
!mkdir -p /root/src/pmml/airbnb/

with open('/root/src/pmml/airbnb/pmml_airbnb.pmml', 'wb') as f:
  f.write(pmmlBytes)

!ls /root/src/pmml/airbnb/pmml_airbnb.pmml

TODO: Trigger Airflow to Build a New Docker Image (i.e., via a GitHub commit)

Load Test


In [19]:
!start-loadtest.sh $SOURCE_HOME/loadtest/RecommendationServiceStressTest-local-airbnb.jmx


Writing log file to: /root/pipeline/education.ml/serving/src/notebooks/spark/jmeter.log
Creating summariser <summary>
Created the tree successfully using /root/src/loadtest/RecommendationServiceStressTest-local-airbnb.jmx
Starting the test @ Wed Mar 01 16:07:43 UTC 2017 (1488384463893)
Waiting for possible Shutdown/StopTestNow/Heapdump message on port 4446
Killed