In [ ]:
import ibmos2spark

# @hidden_cell
# Object Storage credentials redacted -- generate your own via the
# "Insert to code" option on the data asset in DSX / Watson Studio.
credentials = {
    'auth_url': 'https://identity.open.softlayer.com',
    'project_id': '***',
    'region': 'dallas',
    'user_id': '***',
    'username': '***',
    'password': '***'
}

configuration_name = 'os_d3bd5b94a9334de59a55a7fed2bedeaa_configs'
bmos = ibmos2spark.bluemix(sc, credentials, configuration_name)

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# The SparkSession object is already initialized for you.
# Please read the PySpark documentation to learn more about the ways to load data files:
# https://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.SparkSession
# The following variable contains the path to your file on your Object Storage.
path_1 = bmos.url('dwlive', 'part-00000-ddf48c42-d9ea-4d72-aa4e-0b879587b372.snappy.parquet')

In [ ]:
path_2 = bmos.url('dwlive', 'part-00000-abf604d6-bc0a-4ea3-8ffa-7838c5912fa2.snappy.parquet')
df_categorical = spark.read.parquet(path_2)

In [ ]:
df_numeric = spark.read.parquet(path_1)

In [ ]:
df_categorical.printSchema()

In [ ]:
df_numeric.printSchema()

In [ ]:
df_numeric.first()

In [ ]:
df_categorical.createOrReplaceTempView("dfcat")
dfcat = spark.sql("select Id, L0_S22_F545 from dfcat")

In [ ]:
df_numeric.createOrReplaceTempView("dfnum")
dfnum = spark.sql("select Id, L0_S0_F0, L0_S0_F2, L0_S0_F4, Response from dfnum")

In [ ]:
df = dfcat.join(dfnum,"Id")
df.createOrReplaceTempView("df")

df_notnull = spark.sql("""
select
    Response as label,
    case 
       when L0_S22_F545 is null then 'NA'
       when L0_S22_F545 = '' then 'NA' 
       else L0_S22_F545 end as L0_S22_F545, 
    case
       when L0_S0_F0 is null then 0.0 
       else L0_S0_F0 end as L0_S0_F0, 
    case
       when L0_S0_F2 is null then 0.0 
       else L0_S0_F2 end as L0_S0_F2,
    case
       when L0_S0_F4 is null then 0.0 
       else L0_S0_F4 end as L0_S0_F4
from df
""")

In [ ]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer() \
  .setInputCol("L0_S22_F545") \
  .setOutputCol("L0_S22_F545Index")

indexed = indexer.setHandleInvalid("skip").fit(df_notnull).transform(df_notnull)
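
In [ ]:
# (Toy sketch, not part of the original notebook) StringIndexer assigns each
# distinct string a numeric index ordered by frequency, most frequent first:
toy = spark.createDataFrame([("a",), ("b",), ("a",)], ["cat"])
StringIndexer(inputCol="cat", outputCol="catIndex").fit(toy).transform(toy).show()
# "a" (most frequent) -> 0.0, "b" -> 1.0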

In [ ]:
indexed.printSchema()

In [ ]:
indexed.first()

In [ ]:
indexed.select("L0_S22_F545Index").show()

In [ ]:
indexed.select("L0_S22_F545Index").distinct().show()

In [ ]:
encoder = OneHotEncoder().setInputCol("L0_S22_F545Index").setOutputCol("L0_S22_F545Vec")
encoded = encoder.transform(indexed)
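
In [ ]:
# Note: in Spark 2.x OneHotEncoder is a plain Transformer, as used above.
# From Spark 3.0 on it is an Estimator and must be fit first -- a sketch:
# encoder = OneHotEncoder(inputCols=["L0_S22_F545Index"],
#                         outputCols=["L0_S22_F545Vec"])
# encoded = encoder.fit(indexed).transform(indexed)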

In [ ]:
encoded.printSchema()

In [ ]:
encoded.first().L0_S22_F545Vec

In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

transformers = [
    indexer,
    encoder,
    VectorAssembler()
        .setInputCols(["L0_S22_F545Vec", "L0_S0_F0", "L0_S0_F2","L0_S0_F4"])
        .setOutputCol("features")
]

In [ ]:
pipeline = Pipeline().setStages(transformers).fit(df_notnull)

transformed = pipeline.transform(df_notnull)

In [ ]:
transformed.printSchema()

In [ ]:
transformed.first().features

In [ ]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier() \
  .setLabelCol("label") \
  .setFeaturesCol("features")

model = Pipeline().setStages([rf]).fit(transformed)

result = model.transform(transformed)
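
In [ ]:
# (Sketch, not in the original notebook) A quick peek at the columns the
# classifier appended to the DataFrame:
result.select("label", "rawPrediction", "probability", "prediction").show(5)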

In [ ]:
import pixiedust
display(result)

In [ ]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluation = evaluator.evaluate(result)

In [ ]:
evaluation
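
In [ ]:
# BinaryClassificationEvaluator reports areaUnderROC by default. A sketch of
# the other supported metric, the area under the precision-recall curve,
# which is often more telling when one class is rare:
evaluator_pr = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                             metricName="areaUnderPR")
evaluator_pr.evaluate(result)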

In [ ]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [3, 5]) \
    .addGrid(rf.featureSubsetStrategy, ["auto", "all"]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .addGrid(rf.maxBins, [2, 5]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .build()
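
In [ ]:
# Sanity check: the grid spans 2 * 2 * 2 * 2 * 2 = 32 parameter combinations,
# so 5-fold cross-validation below will train and evaluate 32 * 5 = 160 models.
len(paramGrid)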

In [ ]:
# Draw a 0.1% sample (without replacement) for faster experimentation
transformed_sampled = transformed.sample(False, 0.001)

In [ ]:
transformed_sampled.count()

In [ ]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross-validation. This will take a while because 32 parameter
# combinations x 5 folds = 160 models are trained and evaluated; substitute
# transformed_sampled for cheaper experimentation.
cvModel = cv.fit(transformed)
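
In [ ]:
# (Sketch, not part of the original notebook) If full cross-validation is too
# slow, TrainValidationSplit evaluates each combination on a single split:
from pyspark.ml.tuning import TrainValidationSplit

tvs = TrainValidationSplit(estimator=rf, estimatorParamMaps=paramGrid,
                           evaluator=evaluator, trainRatio=0.8)
tvsModel = tvs.fit(transformed_sampled)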

In [ ]:
# Python translation of the Scala cell originally pasted here. The larger
# grid is kept commented out, as in the original, because it is far more
# expensive to search:
# paramGrid = ParamGridBuilder() \
#     .addGrid(rf.numTrees, [3, 5, 10, 30, 50, 70, 100, 150]) \
#     .addGrid(rf.featureSubsetStrategy, ["auto", "all", "sqrt", "log2", "onethird"]) \
#     .addGrid(rf.impurity, ["gini", "entropy"]) \
#     .addGrid(rf.maxBins, [2, 5, 10, 15, 20, 25, 30]) \
#     .addGrid(rf.maxDepth, [3, 5, 10, 15, 20, 25, 30]) \
#     .build()

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [3, 5, 10]) \
    .addGrid(rf.featureSubsetStrategy, ["auto", "all"]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .addGrid(rf.maxBins, [2, 5]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .build()

In [ ]:
# Python translation of the Scala cell originally pasted here. The fitted
# CrossValidatorModel is used to predict, the predictions are evaluated, and
# the best model found during the search is extracted:
newPredictions = cvModel.transform(transformed)

newAucTest = evaluator.evaluate(newPredictions)
print("new AUC (with Cross Validation): " + str(newAucTest))

# Understand the model selected
print()
print("Parameters for Best Model:")
bestModel = cvModel.bestModel

In [ ]:
# In PySpark the best model here is the RandomForestClassificationModel
# itself (no PipelineModel cast needed). Read the winning parameters off it;
# on Spark 2.x the Python wrapper does not expose all param getters, so we
# go through the underlying Java object:
print(bestModel.getNumTrees)
print(bestModel._java_obj.getFeatureSubsetStrategy())
print(bestModel._java_obj.getImpurity())
print(bestModel._java_obj.getMaxBins())
print(bestModel._java_obj.getMaxDepth())
