Spark ML Pipelines - Lending Club Demo

Using the Lending Club dataset, this demo shows how to:

  1. Load the research dataset (from S3 or HDFS)
  2. Perform feature extraction using Spark ML APIs
  3. Train a Logistic Regression model and a Random Forest Classifier
  4. Perform hyperparameter tuning using ParamGridBuilder, BinaryClassificationEvaluator, and CrossValidator
  5. Export the best pipeline (feature transformers and LR/RF models) to an MLeap Bundle, which we'll use to deploy the pipeline to a RESTful API service.

Cluster Configuration


In [5]:
%%configure -f
{"kind": "spark",
"driverMemory": "2048M",
"executorCores": 2,
 "conf":{"spark.jars.packages":"org.apache.hadoop:hadoop-aws:2.7.3,ml.combust.mleap:mleap-spark_2.11:0.7.0,com.databricks:spark-avro_2.11:3.0.1", "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"}
}


Starting Spark application

ID  YARN Application ID              Kind   State
84  application_1497282488964_0084   spark  idle

SparkSession available as 'spark'.
Current session configs: {'conf': {'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:2.7.3,ml.combust.mleap:mleap-spark_2.11:0.7.0,com.databricks:spark-avro_2.11:3.0.1', 'spark.jars.excludes': 'org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11'}, 'driverMemory': '2048M', 'kind': 'spark', 'executorCores': 2}

ID  YARN Application ID              Kind   State
81  application_1497282488964_0081   spark  idle
82  application_1497282488964_0082   spark  idle
84  application_1497282488964_0084   spark  idle

Part 2: Spark ML Feature Extraction

Load Required Libraries


In [2]:
// Spark Training Pipeline Libraries
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.feature.{StandardScaler, StringIndexer, VectorAssembler, PolynomialExpansion}
import org.apache.spark.ml.classification.{RandomForestClassifier, LogisticRegression, RandomForestClassificationModel}
import org.apache.spark.ml.{Pipeline, PipelineModel, Transformer, PipelineStage}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator}
import com.databricks.spark.avro._

// MLeap/Bundle.ML Serialization Libraries
import ml.combust.mleap.spark.SparkSupport._
import resource._
import ml.combust.bundle.BundleFile
import org.apache.spark.ml.bundle.SparkBundleContext


Starting Spark application

ID  YARN Application ID              Kind   State
80  application_1497282488964_0080   spark  idle

SparkSession available as 'spark'.
import org.apache.spark.ml.bundle.SparkBundleContext

Step 1: Import and Explore the Research Dataset


In [3]:
//val filePath = "s3a://mleap-demo/datasources/lending_club_2017060401.avro"
val filePath = "hdfs:///data/lending_club_2017060401.avro"
val dataset = spark.read.avro(filePath)

println("Total N Records: " + dataset.count())


Total N Records: 797987

In [4]:
println("Total N Approved Records: " + dataset.filter("approved == 1.0").count())


Total N Approved Records: 42535
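
Only about 5% of applications in the dataset are approved, so the classes are heavily imbalanced; a quick sketch to confirm the approval rate from the counts above:

In [ ]:
// Approval rate (class balance check) - roughly 5% of applications are approved
val approvalRate = dataset.filter("approved == 1.0").count().toDouble / dataset.count()
println(f"Approval rate: $approvalRate%.4f")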

In [5]:
// View the individual records
dataset.select("loan_amount", "fico_score_group_fnl", "dti", "emp_length", "state", "approved", "loan_title").show(5)


+-----------+--------------------+-----+----------+-----+--------+-------------+
|loan_amount|fico_score_group_fnl|  dti|emp_length|state|approved|   loan_title|
+-----------+--------------------+-----+----------+-----+--------+-------------+
|     5000.0|           650 - 700|27.65| 10+ years|   AZ|     1.0|  Electronics|
|     2500.0|           600 - 650|  1.0|  < 1 year|   GA|     1.0|   Motorcycle|
|     2400.0|           600 - 650| 8.72| 10+ years|   IL|     1.0|Business Loan|
|    10000.0|           600 - 650| 20.0| 10+ years|   CA|     1.0|Personal Loan|
|     3000.0|           650 - 700|17.94|    1 year|   OR|     1.0|Personal Loan|
+-----------+--------------------+-----+----------+-----+--------+-------------+
only showing top 5 rows

Alter the Data Slightly For The Demo

  • Cap DTI
  • Limit the number of states for OneHotEncoding
  • Keep only certain loan categories
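
For reference, the same reshaping can also be written with the DataFrame API; the sketch below is assumed equivalent to the SQL cell that follows (the SQL version is what the demo actually runs, and datasetFnlAlt is a hypothetical name):

In [ ]:
// DataFrame-API equivalent of the SQL reshaping below (sketch, not run in this demo)
import org.apache.spark.sql.functions.{col, when, lit}

val keptStates = Seq("CA", "NY", "MN", "IL", "FL", "WA", "MA", "TX", "GA", "OH", "NJ", "VA", "MI")
val keptTitles = Seq("Debt Consolidation", "Other", "Home/Home Improvement", "Payoff Credit Card",
  "Car Payment/Loan", "Business Loan", "Health/Medical", "Moving", "Wedding/Engagement",
  "Vacation", "College", "Renewable Energy", "Payoff Bills", "Personal Loan", "Motorcycle")

val datasetFnlAlt = dataset.
  filter(col("loan_title").isin(keptTitles: _*)).
  select(
    col("loan_amount"),
    col("fico_score_group_fnl"),
    when(col("dti") >= 10.0, 10.0).otherwise(col("dti")).as("dti"),                        // cap DTI at 10
    col("emp_length"),
    when(col("state").isin(keptStates: _*), col("state")).otherwise(lit("Other")).as("state"), // limit states
    col("loan_title"),
    col("approved"))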

In [6]:
dataset.createOrReplaceTempView("df")
println(dataset.count())

val datasetFnl = spark.sqlContext.sql(f"""
    select
        loan_amount,
        fico_score_group_fnl,
        case when dti >= 10.0
            then 10.0
            else dti
        end as dti,
        emp_length,
        case when state in ('CA', 'NY', 'MN', 'IL', 'FL', 'WA', 'MA', 'TX', 'GA', 'OH', 'NJ', 'VA', 'MI')
            then state
            else 'Other'
        end as state,
        loan_title,
        approved
    from df
    where loan_title in('Debt Consolidation', 'Other', 'Home/Home Improvement', 'Payoff Credit Card', 'Car Payment/Loan',
    'Business Loan', 'Health/Medical', 'Moving', 'Wedding/Engagement', 'Vacation', 'College', 'Renewable Energy', 'Payoff Bills',
    'Personal Loan', 'Motorcycle')
""")

datasetFnl.createOrReplaceTempView("dfFnl")
println(datasetFnl.count())


797987
796751

Summary Statistics


In [7]:
// Summary statistics by state (filtered dataset), ordered by approval rate

spark.sqlContext.sql(f"""
    select 
        state,
        count(*) as n,
        cast(avg(loan_amount) as decimal(12,2)) as loan_amount,
        cast(avg(dti) as decimal(12,2)) as dti,
        avg(cast(approved as decimal(12,2))) as approved
    from dfFnl
    group by state
    order by avg(cast(approved as decimal(12,2))) desc
""").show(15)


+-----+------+-----------+----+--------+
|state|     n|loan_amount| dti|approved|
+-----+------+-----------+----+--------+
|   MA| 20217|   12371.90|0.86|0.070782|
|   CA|107084|   13524.24|0.87|0.069021|
|   NJ| 29613|   13781.77|0.84|0.066761|
|   NY| 64701|   13120.19|0.79|0.062565|
|   WA| 14878|   13517.80|0.80|0.059282|
|   VA| 25018|   12865.42|0.87|0.059197|
|   MN| 11036|   12790.27|0.80|0.058536|
|   IL| 33109|   13138.97|0.73|0.050198|
|   FL| 63016|   12390.05|0.70|0.048543|
|   GA| 30428|   12307.86|0.74|0.048541|
|Other|281543|   12508.86|0.70|0.046622|
|   TX| 64858|   13090.81|0.73|0.044466|
|   OH| 29796|   12094.84|0.71|0.044234|
|   MI| 21454|   12582.70|0.62|0.036590|
+-----+------+-----------+----+--------+

In [8]:
// Summary statistics by loan title (filtered dataset), ordered by approval rate

spark.sqlContext.sql(f"""
    select 
        loan_title,
        count(*) as n,
        cast(avg(loan_amount) as decimal(12,2)) as loan_amount,
        cast(avg(dti) as decimal(12,2)) as dti,
        avg(cast(approved as decimal(12,2))) as approved
    from dfFnl
    group by loan_title
    order by avg(cast(approved as decimal(12,2))) desc
""").show(15)


+--------------------+------+-----------+----+--------+
|          loan_title|     n|loan_amount| dti|approved|
+--------------------+------+-----------+----+--------+
|          Motorcycle|   732|    7059.05|2.49|0.289617|
|        Payoff Bills|  3162|   10763.05|2.61|0.271980|
|       Personal Loan|  2981|    9366.92|2.24|0.243207|
|  Debt Consolidation|315519|   14769.15|0.90|0.070924|
|               Other|190221|    9442.56|0.86|0.054605|
|  Wedding/Engagement| 13568|   10148.23|0.63|0.051739|
|             College|  7989|    7923.65|0.83|0.044812|
|  Payoff Credit Card| 60197|   15821.83|0.70|0.039387|
|Home/Home Improve...| 60779|   15176.65|0.42|0.028085|
|      Health/Medical| 20615|    7470.27|0.43|0.023575|
|    Car Payment/Loan| 47387|   10308.56|0.38|0.021504|
|            Vacation|  9905|    5626.50|0.38|0.017769|
|              Moving| 18733|    6621.09|0.38|0.014573|
|       Business Loan| 41792|   18151.20|0.36|0.012132|
|    Renewable Energy|  3171|    9786.62|0.34|0.001892|
+--------------------+------+-----------+----+--------+

Step 2: Define continuous and categorical features and filter nulls


In [9]:
// Define the continuous and categorical features used by the feature pipeline
val continuousFeatures = Array("loan_amount", "dti")

val categoricalFeatures = Array("loan_title",
  "emp_length",
  "state",
  "fico_score_group_fnl")

val allFeatures = continuousFeatures.union(categoricalFeatures)


allFeatures: Array[String] = Array(loan_amount, dti, loan_title, emp_length, state, fico_score_group_fnl)

In [10]:
// Filter all null values
val allCols = allFeatures.union(Seq("approved")).map(datasetFnl.col)
val nullFilter = allCols.map(_.isNotNull).reduce(_ && _)
val datasetImputedFiltered = datasetFnl.select(allCols: _*).filter(nullFilter).persist()

println(datasetImputedFiltered.count())


796751
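
An equivalent, more compact way to drop rows with nulls in the selected columns is DataFrameNaFunctions; note that na.drop also removes NaN values, which the isNotNull filter above does not (a sketch; the explicit filter is what the demo runs):

In [ ]:
// Equivalent null filtering via na.drop (sketch)
val datasetImputedFilteredAlt = datasetFnl.select(allCols: _*).na.drop()
println(datasetImputedFilteredAlt.count())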

Step 3: Split data into training and validation


In [11]:
val Array(trainingDataset, validationDataset) = datasetImputedFiltered.randomSplit(Array(0.7, 0.3))


trainingDataset: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_amount: double, dti: double ... 5 more fields]
validationDataset: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_amount: double, dti: double ... 5 more fields]
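
randomSplit is non-deterministic across runs unless a seed is supplied; a sketch of a reproducible 70/30 split (the demo above uses the default random seed, and the seeded names are hypothetical):

In [ ]:
// Reproducible 70/30 split with an explicit seed (sketch)
val Array(trainingDatasetSeeded, validationDatasetSeeded) =
  datasetImputedFiltered.randomSplit(Array(0.7, 0.3), seed = 42L)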

Step 4: Continuous Feature Pipelines

Pipeline 1: Scale Features

- VectorAssembler
- StandardScaler

Pipeline 2: Polynomial Expansion

- VectorAssembler
- PolynomialExpansion
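
With its default degree of 2, PolynomialExpansion maps the assembled (loan_amount, dti) vector to (a, a², d, a·d, d²), consistent with the tail of the features_rf vector shown later. A minimal toy illustration of the transformer (a sketch, not part of the demo pipeline):

In [ ]:
// Toy example: degree-2 polynomial expansion of a 2-element vector
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val toyDf = spark.createDataFrame(Seq(Tuple1(Vectors.dense(500.0, -0.01)))).toDF("poly_expansions_features")

new PolynomialExpansion().
  setInputCol("poly_expansions_features").
  setOutputCol("expanded").
  setDegree(2).
  transform(toyDf).
  show(false)
// Expected expansion: [500.0, 250000.0, -0.01, -5.0, 1.0E-4]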

In [12]:
// Pipeline 1
val continuousFeatureAssembler = new VectorAssembler(uid = "continuous_feature_assembler").
    setInputCols(continuousFeatures).
    setOutputCol("unscaled_continuous_features")

val continuousFeatureScaler = new StandardScaler(uid = "continuous_feature_scaler").
    setInputCol("unscaled_continuous_features").
    setOutputCol("scaled_continuous_features")

// Pipeline 2
val polyExpansionAssembler = new VectorAssembler(uid = "poly_expansion_feature_assembler").
    setInputCols(Array("loan_amount", "dti")).
    setOutputCol("poly_expansions_features")

val continuousFeaturePolynomialExpansion = new PolynomialExpansion(uid = "polynomial_expansion_loan_amount").
    setInputCol("poly_expansions_features").
    setOutputCol("loan_amount_polynomial_expansion_features")


continuousFeaturePolynomialExpansion: org.apache.spark.ml.feature.PolynomialExpansion = polynomial_expansion_loan_amount

Step 5: Categorical Feature Pipeline

Pipeline 3: String Index + One-Hot-Encode each categorical feature

- StringIndexer
- OneHotEncoder

In [13]:
val categoricalFeatureIndexers = categoricalFeatures.map {
    feature => new StringIndexer(uid = s"string_indexer_$feature").
      setInputCol(feature).
      setOutputCol(s"${feature}_index")
}

val categoricalFeatureOneHotEncoders = categoricalFeatureIndexers.map {
    indexer => new OneHotEncoder(uid = s"oh_encoder_${indexer.getOutputCol}").
      setInputCol(indexer.getOutputCol).
      setOutputCol(s"${indexer.getOutputCol}_oh")
}


categoricalFeatureOneHotEncoders: Array[org.apache.spark.ml.feature.OneHotEncoder] = Array(oh_encoder_loan_title_index, oh_encoder_emp_length_index, oh_encoder_state_index, oh_encoder_fico_score_group_fnl_index)
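
One practical caveat: if the validation or scoring data contains a category value never seen during training, StringIndexer will throw an error by default. Setting handleInvalid lets the indexer skip such rows instead (a sketch with hypothetical names; the demo keeps the default behavior):

In [ ]:
// Variant of the indexers that skips rows with unseen category values (sketch)
val categoricalFeatureIndexersSafe = categoricalFeatures.map {
    feature => new StringIndexer(uid = s"string_indexer_safe_$feature").
      setInputCol(feature).
      setOutputCol(s"${feature}_index").
      setHandleInvalid("skip")   // default is "error"
}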

Step 6: Assemble our features and feature pipeline

Combine Pipeline 1, Pipeline 2, and Pipeline 3


In [14]:
// Assemble Feature Vector For Random Forest Classifier
val featureColsRf = categoricalFeatureIndexers.map(_.getOutputCol).union(Seq("scaled_continuous_features", "loan_amount_polynomial_expansion_features"))

// Assemble Feature Vector For Logistic Regression
val featureColsLr = categoricalFeatureOneHotEncoders.map(_.getOutputCol).union(Seq("scaled_continuous_features"))

// Vector-assemble all categorical and continuous features into a single feature vector
val featureAssemblerLr = new VectorAssembler(uid = "feature_assembler_lr").
    setInputCols(featureColsLr).
    setOutputCol("features_lr")
    
val featureAssemblerRf = new VectorAssembler(uid = "feature_assembler_rf").
    setInputCols(featureColsRf).
    setOutputCol("features_rf")

// Define an array of all of our feature estimators that we need to fit
val estimators: Array[PipelineStage] = Array(continuousFeatureAssembler, continuousFeatureScaler, polyExpansionAssembler, continuousFeaturePolynomialExpansion).
    union(categoricalFeatureIndexers).
    union(categoricalFeatureOneHotEncoders).
    union(Seq(featureAssemblerLr, featureAssemblerRf))

// Build our pipeline based on the estimators
val featurePipeline = new Pipeline(uid = "feature_pipeline").
    setStages(estimators)

// Fit our pipeline
val sparkFeaturePipelineModel = featurePipeline.fit(trainingDataset)

println("Finished constructing the pipeline")


Finished fitting the feature pipeline

In [21]:
val dfWithFeatures = sparkFeaturePipelineModel.transform(trainingDataset)
// Explore our generated features - Logistic Regression
dfWithFeatures.select("features_lr").head(1).head.get(0)


res64: Any = (46,[0,20,25,42,44,45],[1.0,1.0,1.0,1.0,0.04701321420936851,-0.004627647986286722])

In [16]:
// Explore our generated features - Random Forest
dfWithFeatures.select("features_rf").head(1)


res51: Array[org.apache.spark.sql.Row] = Array([[0.0,6.0,0.0,4.0,0.04701321420936851,-0.004627647986286722,500.0,250000.0,-0.01,-5.0,1.0E-4]])


Part 3: Spark ML Training and Scoring

In this section we train a Logistic Regression model and a Random Forest Classifier on the training data and evaluate each on the validation dataset using the area under the ROC curve.

Step 7: Train Random Forest Classifier


In [22]:
// Create our random forest classifier
val randomForest = new RandomForestClassifier(uid = "random_forest_classifier").
    setFeaturesCol("features_rf").
    setLabelCol("approved").
    setPredictionCol("approved_prediction").
    setNumTrees(5).
    setMaxDepth(5)

val sparkPipelineEstimatorRf = new Pipeline().setStages(Array(sparkFeaturePipelineModel, randomForest))

val sparkPipelineRf = sparkPipelineEstimatorRf.fit(trainingDataset)

println("Complete: Training Random Forest")


Complete: Training Random Forest

In [23]:
val randomForestModel = randomForest.fit(dfWithFeatures)
randomForestModel.toDebugString


res69: String =
"RandomForestClassificationModel (uid=rfc_9df1a346bde4) with 5 trees
  Tree 0 (weight 1.0):
    If (feature 10 <= 7.617599999999999)
     If (feature 10 <= 0.4489000000000001)
      If (feature 1 in {2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0})
       If (feature 3 in {4.0,6.0})
        If (feature 1 in {2.0,3.0,4.0,5.0,6.0,8.0,9.0,10.0,11.0})
         Predict: 0.0
        Else (feature 1 not in {2.0,3.0,4.0,5.0,6.0,8.0,9.0,10.0,11.0})
         Predict: 0.0
       Else (feature 3 not in {4.0,6.0})
        If (feature 7 <= 9.9500625E7)
         Predict: 0.0
        Else (feature 7 > 9.9500625E7)
         Predict: 0.0
      Else (feature 1 not in {2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0})
       If (feature 5 <= 0.20236704644031833)
        If (feature 1 in {0.0})
       ...

In [24]:
randomForestModel.extractParamMap


res70: org.apache.spark.ml.param.ParamMap =
{
	rfc_9df1a346bde4-cacheNodeIds: false,
	rfc_9df1a346bde4-checkpointInterval: 10,
	rfc_9df1a346bde4-featureSubsetStrategy: auto,
	rfc_9df1a346bde4-featuresCol: features_rf,
	rfc_9df1a346bde4-impurity: gini,
	rfc_9df1a346bde4-labelCol: approved,
	rfc_9df1a346bde4-maxBins: 32,
	rfc_9df1a346bde4-maxDepth: 5,
	rfc_9df1a346bde4-maxMemoryInMB: 256,
	rfc_9df1a346bde4-minInfoGain: 0.0,
	rfc_9df1a346bde4-minInstancesPerNode: 1,
	rfc_9df1a346bde4-numTrees: 5,
	rfc_9df1a346bde4-predictionCol: approved_prediction,
	rfc_9df1a346bde4-probabilityCol: probability,
	rfc_9df1a346bde4-rawPredictionCol: rawPrediction,
	rfc_9df1a346bde4-seed: 207336481,
	rfc_9df1a346bde4-subsamplingRate: 1.0
}
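
The fitted forest also exposes Gini-based feature importances, which give a quick sanity check on which inputs drive the prediction (a sketch; indices refer to positions in the assembled features_rf vector):

In [ ]:
// Top-5 feature importances of the fitted random forest (sketch)
// The first four positions are the indexed categoricals, followed by the
// scaled continuous features and the polynomial expansion components
randomForestModel.featureImportances.toArray.zipWithIndex.
  sortBy { case (importance, _) => -importance }.
  take(5).
  foreach { case (importance, idx) => println(f"feature $idx%2d -> importance $importance%.4f") }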

Step 8: Train Logistic Regression Model


In [27]:
val logisticRegression = new LogisticRegression(uid = "logistic_regression").
    setFeaturesCol("features_lr").
    setLabelCol("approved").
    setPredictionCol("approved_prediction")

val sparkPipelineEstimatorLr = new Pipeline().setStages(Array(sparkFeaturePipelineModel, logisticRegression))

val sparkPipelineLr = sparkPipelineEstimatorLr.fit(trainingDataset)

println("Complete: Training Logistic Regression")


Complete: Training Logistic Regression
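
The trained LR model sits in the last stage of the fitted pipeline; a quick sketch (lrModel is a hypothetical name) for pulling it out and peeking at the intercept and coefficient count:

In [ ]:
// Extract the fitted logistic regression model from the pipeline (sketch)
import org.apache.spark.ml.classification.LogisticRegressionModel

val lrModel = sparkPipelineLr.stages.last.asInstanceOf[LogisticRegressionModel]
println("Intercept: " + lrModel.intercept)
println("Number of coefficients: " + lrModel.coefficients.size)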

Step 9: Validate the Random Forest Classifier


In [25]:
// Run the feature transformers and score the validation dataset with RF model
val validationDataWithPrediction = sparkPipelineRf.transform(validationDataset)


validationDataWithPrediction: org.apache.spark.sql.DataFrame = [loan_amount: double, dti: double ... 22 more fields]

In [26]:
// Compute the Area Under the Receiver Operating Characteristic (ROC) Curve
val evaluator = new BinaryClassificationEvaluator().
    setLabelCol(randomForest.getLabelCol).
    setRawPredictionCol(randomForest.getRawPredictionCol).
    setMetricName("areaUnderROC")

val areaUnderROC = evaluator.evaluate(validationDataWithPrediction)
println("Area Under ROC: " + areaUnderROC)


Area Under ROC: 0.995613741352638
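
Because approvals are only about 5% of the data, area under the precision-recall curve can be a useful secondary metric alongside AUC-ROC; a sketch (not run in the original demo):

In [ ]:
// Secondary metric: area under the precision-recall curve (sketch)
val prEvaluator = new BinaryClassificationEvaluator().
    setLabelCol(randomForest.getLabelCol).
    setRawPredictionCol(randomForest.getRawPredictionCol).
    setMetricName("areaUnderPR")

println("Area Under PR: " + prEvaluator.evaluate(validationDataWithPrediction))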

Step 10: Validate the Logistic Regression


In [28]:
// Run the feature transformers and score the validation dataset with LR model
val validationDataWithPrediction = sparkPipelineLr.transform(validationDataset)


validationDataWithPrediction: org.apache.spark.sql.DataFrame = [loan_amount: double, dti: double ... 22 more fields]

In [29]:
val evaluator = new BinaryClassificationEvaluator().
    setLabelCol(logisticRegression.getLabelCol).
    setRawPredictionCol(logisticRegression.getRawPredictionCol).
    setMetricName("areaUnderROC")
    
val areaUnderROC = evaluator.evaluate(validationDataWithPrediction)
println("Area Under ROC: " + areaUnderROC)


Area Under ROC: 0.9915699979510854


Part 4: Spark ML Hyperparameter Tuning


In [ ]:
// Define our ParamGridBuilder
val paramGrid = new ParamGridBuilder().
      addGrid(randomForest.maxDepth, Array(6, 8, 10)).
      addGrid(randomForest.numTrees, Array(10, 20, 25)).
      build()
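
The grid above only tunes the random forest. The same mechanism applies to the logistic regression pipeline; a sketch of a possible LR grid (hypothetical values, not run here):

In [ ]:
// Hypothetical parameter grid for the logistic regression pipeline (sketch)
val paramGridLr = new ParamGridBuilder().
      addGrid(logisticRegression.regParam, Array(0.01, 0.1, 1.0)).
      addGrid(logisticRegression.elasticNetParam, Array(0.0, 0.5, 1.0)).
      build()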

In [ ]:
// Define our Validator
val evaluator = new BinaryClassificationEvaluator().
    setLabelCol(randomForest.getLabelCol).
    setRawPredictionCol(randomForest.getRawPredictionCol).
    setMetricName("areaUnderROC")

In [ ]:
// Define the CrossValidator
val cv = new CrossValidator().
      setEstimator(sparkPipelineEstimatorRf).
      setEvaluator(evaluator).
      setEstimatorParamMaps(paramGrid).
      setNumFolds(3)
    
val cvPipeline = cv.fit(trainingDataset)
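
CrossValidatorModel keeps the average evaluator metric for every parameter combination, which is handy for seeing how sensitive the AUC is to the grid; a sketch:

In [ ]:
// Average cross-validated AUC per parameter combination (sketch)
cvPipeline.avgMetrics.zip(cvPipeline.getEstimatorParamMaps).foreach {
    case (auc, params) => println(f"AUC $auc%.4f for $params")
}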

In [ ]:
val bestPipeline = cvPipeline.bestModel.asInstanceOf[PipelineModel]
bestPipeline.stages

In [ ]:
val bestRf = bestPipeline.stages(1).asInstanceOf[RandomForestClassificationModel]
bestRf

In [ ]:
println("Best Number of Trees: " + bestRf.getNumTrees)

In [ ]:
println("Best Max Depth: " + bestRf.getMaxDepth)

In [ ]:
println(bestRf.explainParams)

In [ ]:
// Run the feature transformers and score the validation dataset with the best RF pipeline
val validationDataWithPrediction = bestPipeline.transform(validationDataset)

val evaluator = new BinaryClassificationEvaluator().
    setLabelCol(randomForest.getLabelCol).
    setRawPredictionCol(randomForest.getRawPredictionCol).
    setMetricName("areaUnderROC")

val areaUnderROC = evaluator.evaluate(validationDataWithPrediction)
println("Area Under ROC: " + areaUnderROC)

Part 5: Serializing ML Pipelines for Deployment


In [ ]:
// Create the bundle context, which contains the metadata describing how the pipeline was trained
val sbc = SparkBundleContext().withDataset(sparkPipelineLr.transform(datasetImputedFiltered))

for(bf <- managed(BundleFile("jar:file:/tmp/lc.model.lr_demo.zip"))) {
        sparkPipelineLr.writeBundle.save(bf)(sbc).get
      }

In [ ]:
val sbcRf = SparkBundleContext().withDataset(sparkPipelineRf.transform(datasetImputedFiltered))
for(bf <- managed(BundleFile("jar:file:/tmp/lc.model.rf.zip"))) {
        sparkPipelineRf.writeBundle.save(bf)(sbcRf).get
      }
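
As a sanity check on the exported artifacts, a bundle can be loaded back into a Spark Transformer and used to score data. A sketch, assuming the MLeap version in use exposes loadSparkBundle via SparkSupport (the exact method name has shifted between MLeap releases):

In [ ]:
// Load the serialized RF pipeline back from the bundle (sketch; API may differ by MLeap version)
val loadedPipeline = (for(bf <- managed(BundleFile("jar:file:/tmp/lc.model.rf.zip"))) yield {
        bf.loadSparkBundle().get.root
      }).opt.get

// Score a few validation rows with the deserialized pipeline
loadedPipeline.transform(validationDataset).select("approved", "approved_prediction").show(5)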

Part 6: Deploy Model with cURL


In [8]:
import sys.process._

// Use curl to upload our model to the Heroku app
// Make sure to replace <App Name> with the name
// Heroku assigned to your app
val herokuAppName = "frozen-sands-62833"
val command = Seq("curl", "-XPUT",
                 "-H", "content-type: application/binary",
                 "--data-binary", "@/tmp/lc.model.lr_demo.zip",
                 s"http://$herokuAppName.herokuapp.com/model")
val output = command !!

// You should see an empty set of curly braces if this command succeeds
// "{}"
println(output)


{}
