Using the Lending Club data set, this demo will demonstrate how to:
- Load and clean the raw Avro data with Spark SQL
- Build a feature pipeline (scaling, polynomial expansion, string indexing, one-hot encoding)
- Train Logistic Regression and Random Forest classifiers and evaluate them on a validation set
- Tune the Random Forest with cross-validation
- Serialize the trained pipelines to MLeap bundles and deploy one to a scoring service
In [5]:
%%configure -f
{"kind": "spark",
"driverMemory": "2048M",
"executorCores": 2,
"conf":{"spark.jars.packages":"org.apache.hadoop:hadoop-aws:2.7.3,ml.combust.mleap:mleap-spark_2.11:0.7.0,com.databricks:spark-avro_2.11:3.0.1", "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"}
}
In [2]:
// Spark Training Pipeline Libraries
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.feature.{StandardScaler, StringIndexer, VectorAssembler, PolynomialExpansion}
import org.apache.spark.ml.classification.{RandomForestClassifier, LogisticRegression, RandomForestClassificationModel}
import org.apache.spark.ml.{Pipeline, PipelineModel, Transformer, PipelineStage}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator}
import com.databricks.spark.avro._
// MLeap/Bundle.ML Serialization Libraries
import ml.combust.mleap.spark.SparkSupport._
import resource._
import ml.combust.bundle.BundleFile
import org.apache.spark.ml.bundle.SparkBundleContext
In [3]:
//val filePath = "s3a://mleap-demo/datasources/lending_club_2017060401.avro"
val filePath = "hdfs:///data/lending_club_2017060401.avro"
val dataset = spark.read.avro(filePath)
println("Total N Records: " + dataset.count())
In [4]:
println("Total N Approved Records: " + dataset.filter("approved == 1.0").count())
In [5]:
// View the individual records
dataset.select("loan_amount", "fico_score_group_fnl", "dti", "emp_length", "state", "approved", "loan_title").show(5)
In [6]:
dataset.createOrReplaceTempView("df")
println(dataset.count())
val datasetFnl = spark.sql("""
select
loan_amount,
fico_score_group_fnl,
case when dti >= 10.0
then 10.0
else dti
end as dti,
emp_length,
case when state in ('CA', 'NY', 'MN', 'IL', 'FL', 'WA', 'MA', 'TX', 'GA', 'OH', 'NJ', 'VA', 'MI')
then state
else 'Other'
end as state,
loan_title,
approved
from df
where loan_title in('Debt Consolidation', 'Other', 'Home/Home Improvement', 'Payoff Credit Card', 'Car Payment/Loan',
'Business Loan', 'Health/Medical', 'Moving', 'Wedding/Engagement', 'Vacation', 'College', 'Renewable Energy', 'Payoff Bills',
'Personal Loan', 'Motorcycle')
""")
datasetFnl.createOrReplaceTempView("dfFnl")
println(datasetFnl.count())
In [7]:
// Approval rate and averages by state (cleaned dataset)
spark.sql("""
select
state,
count(*) as n,
cast(avg(loan_amount) as decimal(12,2)) as loan_amount,
cast(avg(dti) as decimal(12,2)) as dti,
avg(cast(approved as decimal(12,2))) as approved
from dfFnl
group by state
order by avg(cast(approved as decimal(12,2))) desc
""").show(15)
In [8]:
// Approval rate and averages by loan title (cleaned dataset)
spark.sql("""
select
loan_title,
count(*) as n,
cast(avg(loan_amount) as decimal(12,2)) as loan_amount,
cast(avg(dti) as decimal(12,2)) as dti,
avg(cast(approved as decimal(12,2))) as approved
from dfFnl
group by loan_title
order by avg(cast(approved as decimal(12,2))) desc
""").show(15)
In [9]:
// Step 2. Create our feature pipeline and fit it on the training dataset
val continuousFeatures = Array("loan_amount", "dti")
val categoricalFeatures = Array("loan_title",
"emp_length",
"state",
"fico_score_group_fnl")
val allFeatures = continuousFeatures.union(categoricalFeatures)
In [10]:
// Drop rows that have nulls in any feature or label column
val allCols = allFeatures.union(Seq("approved")).map(datasetFnl.col)
val nullFilter = allCols.map(_.isNotNull).reduce(_ && _)
val datasetImputedFiltered = datasetFnl.select(allCols: _*).filter(nullFilter).persist()
println(datasetImputedFiltered.count())
In [11]:
val Array(trainingDataset, validationDataset) = datasetImputedFiltered.randomSplit(Array(0.7, 0.3))
In [12]:
// Pipeline 1: assemble and scale the continuous features
val continuousFeatureAssembler = new VectorAssembler(uid = "continuous_feature_assembler").
setInputCols(continuousFeatures).
setOutputCol("unscaled_continuous_features")
val continuousFeatureScaler = new StandardScaler(uid = "continuous_feature_scaler").
setInputCol("unscaled_continuous_features").
setOutputCol("scaled_continuous_features")
// Pipeline 2: polynomial expansion of the continuous features
val polyExpansionAssembler = new VectorAssembler(uid = "poly_expansion_feature_assembler").
setInputCols(Array("loan_amount", "dti")).
setOutputCol("poly_expansions_features")
val continuousFeaturePolynomialExpansion = new PolynomialExpansion(uid = "polynomial_expansion_loan_amount").
setInputCol("poly_expansions_features").
setOutputCol("loan_amount_polynomial_expansion_features")
In [13]:
val categoricalFeatureIndexers = categoricalFeatures.map {
feature => new StringIndexer(uid = s"string_indexer_$feature").
setInputCol(feature).
setOutputCol(s"${feature}_index")
}
val categoricalFeatureOneHotEncoders = categoricalFeatureIndexers.map {
indexer => new OneHotEncoder(uid = s"oh_encoder_${indexer.getOutputCol}").
setInputCol(indexer.getOutputCol).
setOutputCol(s"${indexer.getOutputCol}_oh")
}
In [14]:
// Assemble Feature Vector For Random Forest Classifier
val featureColsRf = categoricalFeatureIndexers.map(_.getOutputCol).union(Seq("scaled_continuous_features", "loan_amount_polynomial_expansion_features"))
// Assemble Feature Vector For Logistic Regression
val featureColsLr = categoricalFeatureOneHotEncoders.map(_.getOutputCol).union(Seq("scaled_continuous_features"))
// Vector-assemble all categorical and continuous features into a single feature vector
val featureAssemblerLr = new VectorAssembler(uid = "feature_assembler_lr").
setInputCols(featureColsLr).
setOutputCol("features_lr")
val featureAssemblerRf = new VectorAssembler(uid = "feature_assembler_rf").
setInputCols(featureColsRf).
setOutputCol("features_rf")
// Define an array of all of our feature estimators that we need to fit
val estimators: Array[PipelineStage] = Array(continuousFeatureAssembler, continuousFeatureScaler, polyExpansionAssembler, continuousFeaturePolynomialExpansion).
union(categoricalFeatureIndexers).
union(categoricalFeatureOneHotEncoders).
union(Seq(featureAssemblerLr, featureAssemblerRf))
// Build our pipeline based on the estimators
val featurePipeline = new Pipeline(uid = "feature_pipeline").
setStages(estimators)
// Fit our pipeline
val sparkFeaturePipelineModel = featurePipeline.fit(trainingDataset)
println("Finished constructing the pipeline")
In [21]:
val dfWithFeatures = sparkFeaturePipelineModel.transform(trainingDataset)
// Explore our generated features - Logistic Regression
dfWithFeatures.select("features_lr").head(1).head.get(0)
In [16]:
// Explore our generated features - Random Forest
dfWithFeatures.select("features_rf").head(1)
In [ ]:
In this section we train a Logistic Regression model and a Random Forest Classifier on the training data and evaluate each on the validation dataset using the area under the ROC curve.
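Before fitting, it is worth a quick sanity check of the class balance of the approved label in each split; a minimal sketch:
In [ ]:
// Sanity check: label distribution in the training and validation splits
trainingDataset.groupBy("approved").count().show()
validationDataset.groupBy("approved").count().show()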
In [22]:
// Step 3.1 Create our random forest model
val randomForest = new RandomForestClassifier(uid = "random_forest_classifier").
setFeaturesCol("features_rf").
setLabelCol("approved").
setPredictionCol("approved_prediction").
setNumTrees(5).
setMaxDepth(5)
val sparkPipelineEstimatorRf = new Pipeline().setStages(Array(sparkFeaturePipelineModel, randomForest))
val sparkPipelineRf = sparkPipelineEstimatorRf.fit(trainingDataset)
println("Complete: Training Random Forest")
In [23]:
val randomForestModel = randomForest.fit(dfWithFeatures)
randomForestModel.toDebugString
In [24]:
randomForestModel.extractParamMap
In [27]:
val logisticRegression = new LogisticRegression(uid = "logistic_regression").
setFeaturesCol("features_lr").
setLabelCol("approved").
setPredictionCol("approved_prediction")
val sparkPipelineEstimatorLr = new Pipeline().setStages(Array(sparkFeaturePipelineModel, logisticRegression))
val sparkPipelineLr = sparkPipelineEstimatorLr.fit(trainingDataset)
println("Complete: Training Logistic Regression")
In [25]:
// Run the feature transformers and score the validation dataset with RF model
val validationDataWithPrediction = sparkPipelineRf.transform(validationDataset)
In [26]:
// Compute the Area Under the Receiver Operating Characteristics (ROC) Curve
val evaluator = new BinaryClassificationEvaluator().
setLabelCol(randomForest.getLabelCol).
setRawPredictionCol(randomForest.getRawPredictionCol).
setMetricName("areaUnderROC")
val areaUnderROC = evaluator.evaluate(validationDataWithPrediction)
println("Area Under ROC (Random Forest): " + areaUnderROC)
In [28]:
// Run the feature transformers and score the validation dataset with LR model
val validationDataWithPrediction = sparkPipelineLr.transform(validationDataset)
In [29]:
val evaluator = new BinaryClassificationEvaluator().
setLabelCol(logisticRegression.getLabelCol).
setRawPredictionCol(logisticRegression.getRawPredictionCol).
setMetricName("areaUnderROC")
val areaUnderROC = evaluator.evaluate(validationDataWithPrediction)
println("Area Under ROC (Logistic Regression): " + areaUnderROC)
In [ ]:
In [ ]:
// Define our ParamGridBuilder
val paramGrid = new ParamGridBuilder().
addGrid(randomForest.maxDepth, Array(6, 8, 10)).
addGrid(randomForest.numTrees, Array(10, 20, 25)).
build()
In [ ]:
// Define our evaluator (area under the ROC curve)
val evaluator = new BinaryClassificationEvaluator().
setLabelCol(randomForest.getLabelCol).
setRawPredictionCol(randomForest.getRawPredictionCol).
setMetricName("areaUnderROC")
In [ ]:
// Define the CrossValidator
val cv = new CrossValidator().
setEstimator(sparkPipelineEstimatorRf).
setEvaluator(evaluator).
setEstimatorParamMaps(paramGrid).
setNumFolds(3)
val cvPipeline = cv.fit(trainingDataset)
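To see how each parameter combination performed, you can pair the evaluated param maps with their average metrics; a minimal sketch:
In [ ]:
// Inspect the average AUC for each parameter combination tried by the CrossValidator
cvPipeline.getEstimatorParamMaps.zip(cvPipeline.avgMetrics).foreach {
  case (params, auc) => println(s"$params -> average AUC: $auc")
}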
In [ ]:
val bestPipeline = cvPipeline.bestModel.asInstanceOf[PipelineModel]
bestPipeline.stages
In [ ]:
val bestRf = bestPipeline.stages(1).asInstanceOf[RandomForestClassificationModel]
bestRf
In [ ]:
println("Best Number of Trees: " + bestRf.getNumTrees)
In [ ]:
println("Best Max Depth: " + bestRf.getMaxDepth)
In [ ]:
println(bestRf.explainParams)
In [ ]:
val validationDataWithPrediction = bestPipeline.transform(validationDataset)
val evaluator = new BinaryClassificationEvaluator().
setLabelCol(randomForest.getLabelCol).
setRawPredictionCol(randomForest.getRawPredictionCol).
setMetricName("areaUnderROC")
val areaUnderROC = evaluator.evaluate(validationDataWithPrediction)
println("Area Under ROC (best CV pipeline): " + areaUnderROC)
In [ ]:
// Create the bundle context, which captures the metadata (transformed dataset schema) needed to serialize the pipeline
val sbc = SparkBundleContext().withDataset(sparkPipelineLr.transform(datasetImputedFiltered))
for(bf <- managed(BundleFile("jar:file:/tmp/lc.model.lr_demo.zip"))) {
sparkPipelineLr.writeBundle.save(bf)(sbc).get
}
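To verify the round trip, the bundle can be loaded back as a Spark transformer and used to re-score a few rows; a sketch, assuming MLeap 0.7's loadSparkBundle API:
In [ ]:
// Optional: deserialize the LR bundle and re-score the validation set to verify the round trip
// (assumes MLeap 0.7's loadSparkBundle API)
val loadedLrBundle = (for(bf <- managed(BundleFile("jar:file:/tmp/lc.model.lr_demo.zip"))) yield {
bf.loadSparkBundle().get
}).opt.get
val loadedLrPipeline = loadedLrBundle.root
loadedLrPipeline.transform(validationDataset).select("approved", "approved_prediction").show(5)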
In [ ]:
val sbcRf = SparkBundleContext().withDataset(sparkPipelineRf.transform(datasetImputedFiltered))
for(bf <- managed(BundleFile("jar:file:/tmp/lc.model.rf.zip"))) {
sparkPipelineRf.writeBundle.save(bf)(sbcRf).get
}
In [8]:
import sys.process._
// Use curl to upload our model to the Heroku app
// Make sure to replace <App Name> with the name
// Heroku assigned to your app
val herokuAppName = "frozen-sands-62833"
val command = Seq("curl", "-XPUT",
"-H", "content-type: application/binary",
"--data-binary", "@/tmp/lc.model.lr_demo.zip",
s"http://$herokuAppName.herokuapp.com/model")
val output = command.!!
// You should see an empty set of curly braces if this command succeeds
// "{}"
println(output)
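As an additional check, the serving app can be queried for the model it is now hosting; a sketch, assuming the demo server exposes a GET /model endpoint as in the MLeap serving docs:
In [ ]:
// Optional check: ask the serving app for the uploaded model's metadata
// (assumes a GET /model endpoint, as in the MLeap serving documentation)
val checkCommand = Seq("curl", "-XGET",
"-H", "content-type: application/json",
s"http://$herokuAppName.herokuapp.com/model")
println(checkCommand.!!)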
In [ ]: