This notebook demonstrates the code required to train two regression models (a linear regression and a random forest) on Airbnb listing data and deploy them to an MLeap server.
The dataset used for the demo was assembled from the individual cities' data available at http://insideairbnb.com/get-the-data.html
The sample code has the following sections: loading and exploring the Airbnb dataset, building a shared feature pipeline, training the random forest and linear regression models, converting the Spark pipelines to MLeap transformers, validating the predictions, and serializing the MLeap pipelines so they can be served.
In [2]:
// imports
import java.io.File
import com.esotericsoftware.kryo.io.Output
import com.truecar.mleap.serialization.ml.v1.MlJsonSerializer
import com.truecar.mleap.runtime.transformer.Transformer
import com.truecar.mleap.runtime.transformer
import com.truecar.mleap.spark.MleapSparkSupport._
import org.apache.spark.ml.feature.{StandardScaler, StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import ml.bundle.fs.DirectoryBundle
import com.truecar.mleap.runtime.util.LeapFrameUtil
import com.truecar.mleap.runtime.{LocalLeapFrame, LeapFrame}
import spray.json._
import com.truecar.mleap.serialization.mleap.v1.MleapJsonSupport._
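The notebook assumes that sc and sqlContext are already provided by the environment. If you run this code outside such a notebook, a minimal Spark 1.x setup might look like the sketch below (the app name and master are assumptions, and the spark-avro package must be on the classpath).
In [ ]:
// Minimal setup sketch for running outside a notebook that already provides sc/sqlContext.
// App name and master are assumptions; adjust to your environment.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("mleap-airbnb-demo").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)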
In [6]:
// Step 1. Load our Airbnb dataset
val inputFile = "file:///tmp/airbnb.avro"
val outputFileRf = "/tmp/transformer.rf.ml"
val outputFileLr = "/tmp/transformer.lr.ml"
var dataset = sqlContext.read.format("com.databricks.spark.avro").
  load(inputFile)
var datasetFiltered = dataset.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0")
println(dataset.count())
println(datasetFiltered.count())
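As a quick sanity check (not part of the original flow), you can print the loaded schema to confirm the columns used below exist with the expected types.
In [ ]:
// Optional sanity check: inspect the Avro-derived schema.
datasetFiltered.printSchema()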
In [7]:
datasetFiltered.select("price", "bedrooms", "bathrooms", "number_of_reviews", "cleaning_fee").describe().show()
In [8]:
// Most popular cities (original dataset)
dataset.registerTempTable("df")
sqlContext.sql(f"""
select
city,
count(*) as n,
cast(avg(price) as decimal(12,2)) as avg_price,
max(price) as max_price
from df
group by city
order by count(*) desc
""").show()
In [9]:
// Most expensive popular cities (original dataset)
dataset.registerTempTable("df")
sqlContext.sql(f"""
select
city,
count(*) as n,
cast(avg(price) as decimal(12,2)) as avg_price,
max(price) as max_price
from df
group by city
order by avg(price) desc
""").filter("n>25").show()
In [10]:
// Step 2. Create our feature pipeline and train it on the entire dataset
val continuousFeatures = Array("bathrooms",
"bedrooms",
"security_deposit",
"cleaning_fee",
"extra_people",
"number_of_reviews",
"review_scores_rating")
val categoricalFeatures = Array("room_type",
"host_is_superhost",
"cancellation_policy",
"instant_bookable")
val allFeatures = continuousFeatures.union(categoricalFeatures)
In [11]:
// Filter all null values
val allCols = allFeatures.union(Seq("price")).map(datasetFiltered.col)
val nullFilter = allCols.map(_.isNotNull).reduce(_ && _)
datasetFiltered = datasetFiltered.select(allCols: _*).filter(nullFilter).persist()
In [12]:
val Array(trainingDataset, validationDataset) = datasetFiltered.randomSplit(Array(0.7, 0.3))
val continuousFeatureAssembler = new VectorAssembler().
  setInputCols(continuousFeatures).
  setOutputCol("unscaled_continuous_features")
val continuousFeatureScaler = new StandardScaler().
  setInputCol("unscaled_continuous_features").
  setOutputCol("scaled_continuous_features")
val categoricalFeatureIndexers = categoricalFeatures.map {
  feature => new StringIndexer().
    setInputCol(feature).
    setOutputCol(s"${feature}_index")
}
val featureCols = categoricalFeatureIndexers.map(_.getOutputCol).union(Seq("scaled_continuous_features"))
val featureAssembler = new VectorAssembler().
  setInputCols(featureCols).
  setOutputCol("features")
val estimators: Array[PipelineStage] = Array(continuousFeatureAssembler, continuousFeatureScaler).
  union(categoricalFeatureIndexers).
  union(Seq(featureAssembler))
val featurePipeline = new Pipeline().
  setStages(estimators)
val sparkFeaturePipelineModel = featurePipeline.fit(datasetFiltered)
println("Finished constructing the pipeline")
In [13]:
// Step 3.1 Create our random forest model
val randomForest = new RandomForestRegressor().
  setFeaturesCol("features").
  setLabelCol("price").
  setPredictionCol("price_prediction")
val sparkPipelineEstimatorRf = new Pipeline().setStages(Array(sparkFeaturePipelineModel, randomForest))
val sparkPipelineRf = sparkPipelineEstimatorRf.fit(trainingDataset)
println("Complete: Training Random Forest")
In [14]:
// Step 3.2 Create our linear regression model
val linearRegression = new LinearRegression().
  setFeaturesCol("features").
  setLabelCol("price").
  setPredictionCol("price_prediction")
val sparkPipelineEstimatorLr = new Pipeline().setStages(Array(sparkFeaturePipelineModel, linearRegression))
val sparkPipelineLr = sparkPipelineEstimatorLr.fit(trainingDataset)
println("Complete: Training Linear Regression")
In [15]:
// Step 4.1 Assemble the final pipeline (random forest) by implicit conversion to MLeap models
val mleapPipelineRf: transformer.PipelineModel = mleapPipelineModelToMleap.toMleap(sparkPipelineRf)
val mleapRandomForest = mleapPipelineRf.
  transformers(1).
  asInstanceOf[transformer.RandomForestRegressionModel].
  copy(predictionCol = "price_prediction_mleap")
In [16]:
// Step 4.2 Assemble the final pipeline (linear regression) by implicit conversion to MLeap models
val mleapPipelineLr: transformer.PipelineModel = mleapPipelineModelToMleap.toMleap(sparkPipelineLr)
val mleapLinearRegression = mleapPipelineLr.
  transformers(1).
  asInstanceOf[transformer.LinearRegressionModel].
  copy(predictionCol = "price_prediction_mleap")
In [17]:
var scoredRf = sparkPipelineRf.transform(validationDataset)
scoredRf = mleapRandomForest.sparkTransform(scoredRf)
scoredRf.select("bathrooms", "bedrooms", "security_deposit", "number_of_reviews", "price", "price_prediction", "price_prediction_mleap").
  //where("bedrooms > 0 and bathrooms > 0").
  limit(10).
  show()
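To put a number on the agreement with actual prices, the scored validation frame can be passed to Spark's RegressionEvaluator; RMSE is one reasonable metric. This cell is a sketch, not part of the original notebook.
In [ ]:
// Optional: RMSE of the random forest predictions on the validation set.
import org.apache.spark.ml.evaluation.RegressionEvaluator

val rfEvaluator = new RegressionEvaluator().
  setLabelCol("price").
  setPredictionCol("price_prediction").
  setMetricName("rmse")
println(s"Random forest validation RMSE: ${rfEvaluator.evaluate(scoredRf)}")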
In [18]:
var scoredLr = sparkPipelineLr.transform(validationDataset)
scoredLr = mleapLinearRegression.sparkTransform(scoredLr)
scoredLr.select("bathrooms", "bedrooms", "security_deposit", "number_of_reviews", "price", "price_prediction", "price_prediction_mleap").
  where("bedrooms > 0 and bathrooms > 0").
  limit(10).
  show()
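The same evaluation sketch applies to the linear regression predictions.
In [ ]:
// Optional: RMSE of the linear regression predictions on the validation set.
import org.apache.spark.ml.evaluation.RegressionEvaluator

val lrEvaluator = new RegressionEvaluator().
  setLabelCol("price").
  setPredictionCol("price_prediction").
  setMetricName("rmse")
println(s"Linear regression validation RMSE: ${lrEvaluator.evaluate(scoredLr)}")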
In [19]:
// Step 5. Save our MLeap pipelines to a directory
val mleapFileRf = new File(outputFileRf)
val mleapFileLr = new File(outputFileLr)
// if you want to save to S3
// val bundleWriter = S3BundleWriter(s3Path)
val bundleWriterRf = DirectoryBundle(mleapFileRf)
val bundleWriterLr = DirectoryBundle(mleapFileLr)
mleapFileRf.mkdirs()
mleapFileLr.mkdirs()
val serializer = MlJsonSerializer
serializer.serializeWithClass(mleapPipelineRf, bundleWriterRf)
serializer.serializeWithClass(mleapPipelineLr, bundleWriterLr)
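As a quick check that serialization succeeded, you can list what was written to each bundle directory (not part of the original notebook).
In [ ]:
// Optional: list the files the serializer wrote for each pipeline bundle.
mleapFileRf.listFiles().foreach(f => println(s"rf bundle: ${f.getName}"))
mleapFileLr.listFiles().foreach(f => println(s"lr bundle: ${f.getName}"))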
In [ ]:
// Start an MLeap server for each serialized pipeline:
// sbt "server/run /tmp/transformer.rf.ml 8080"
// sbt "server/run /tmp/transformer.lr.ml 8081"
//
// Score a frame against the random forest server:
// curl -v -XPOST \
//   -H "content-type: application/json" \
//   -d @/Users/hollinwilkins/Workspace/scratch/frame.json http://localhost:8080/transform
//
// Score the same frame against the linear regression server:
// curl -v -XPOST \
//   -H "content-type: application/json" \
//   -d @/Users/hollinwilkins/Workspace/scratch/frame.json http://localhost:8081/transform
In [ ]:
/*
{
"schema": {
"fields": [{
"name": "bathrooms",
"dataType": "double"
}, {
"name": "bedrooms",
"dataType": "double"
}, {
"name": "security_deposit",
"dataType": "double"
}, {
"name": "cleaning_fee",
"dataType": "double"
}, {
"name": "extra_people",
"dataType": "double"
}, {
"name": "number_of_reviews",
"dataType": "double"
}, {
"name": "review_scores_rating",
"dataType": "double"
}, {
"name": "room_type",
"dataType": "string"
}, {
"name": "host_is_superhost",
"dataType": "string"
}, {
"name": "cancellation_policy",
"dataType": "string"
}, {
"name": "instant_bookable",
"dataType": "string"
}]
},
"rows": [[2.0, 3.0, 50.0, 30.0, 2.0, 56.0, 90.0, "Entire home/apt", "1.0", "strict", "1.0"]]
}
*/
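If you prefer to call the server from Scala instead of curl, a minimal JDK-only client might look like the sketch below; the frame path and port are assumptions, so save the JSON above to that path (or adjust it) before running.
In [ ]:
// Hypothetical client sketch: POST a LeapFrame JSON file to a running MLeap server.
// "/tmp/frame.json" and port 8080 are assumptions, not values from the notebook.
import java.net.{HttpURLConnection, URL}
import java.nio.file.{Files, Paths}

val frameBytes = Files.readAllBytes(Paths.get("/tmp/frame.json"))
val conn = new URL("http://localhost:8080/transform").openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setRequestProperty("content-type", "application/json")
conn.setDoOutput(true)
conn.getOutputStream.write(frameBytes)
println(s"status: ${conn.getResponseCode}")
println(scala.io.Source.fromInputStream(conn.getInputStream).mkString)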