In [ ]:
import ibmos2spark
# @hidden_cell
credentials = {
    'auth_url': 'https://identity.open.softlayer.com',
    'project_id': '6aaf54352357483486ee2d4981f8ef15',
    'region': 'dallas',
    'user_id': 'b160340071b3407ca50c6b9a46b0bb25',
    'username': 'member_b092a5c6f5c11f819059a83dfbd5d922b8a2299b',
    'password': 'qwN4Y5EM*0KuZck['
}
configuration_name = 'os_d3bd5b94a9334de59a55a7fed2bedeaa_configs'
bmos = ibmos2spark.bluemix(sc, credentials, configuration_name)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Please read the PySpark documentation to learn more about the different ways to load data files.
# PySpark documentation: https://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.SparkSession
# The SparkSession object is already initialized for you.
# The following variable contains the path to your file on your Object Storage.
path_1 = bmos.url('dwlive', 'part-00000-ddf48c42-d9ea-4d72-aa4e-0b879587b372.snappy.parquet')
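As noted in the comment above, Parquet is only one of several formats PySpark can read. A minimal sketch of loading a CSV file from the same Object Storage container is shown below; the file name is a placeholder used only for illustration, not part of this notebook's data set.
In [ ]:
# Hypothetical example: 'my_data.csv' is a placeholder file name.
# header=True uses the first row as column names, inferSchema=True lets Spark guess column types.
path_csv = bmos.url('dwlive', 'my_data.csv')
df_csv = spark.read.csv(path_csv, header=True, inferSchema=True)
df_csv.printSchema()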
In [ ]:
path_2 = bmos.url('dwlive', 'part-00000-abf604d6-bc0a-4ea3-8ffa-7838c5912fa2.snappy.parquet')
df_categorical = spark.read.parquet(path_2)
In [ ]:
df_numeric = spark.read.parquet(path_1)
In [ ]:
df_categorical.printSchema()
In [ ]:
df_numeric.printSchema()
In [ ]:
df_numeric.first()
In [ ]:
df_categorical.createOrReplaceTempView("dfcat")
dfcat = spark.sql("select Id, L0_S22_F545 from dfcat")
In [ ]:
df_numeric.createOrReplaceTempView("dfnum")
dfnum = spark.sql("select Id, L0_S0_F0, L0_S0_F2, L0_S0_F4, Response from dfnum")
In [ ]:
df = dfcat.join(dfnum,"Id")
df.createOrReplaceTempView("df")
df_notnull = spark.sql("""
    select
        Response as label,
        case
            when L0_S22_F545 is null then 'NA'
            when L0_S22_F545 = '' then 'NA'
            else L0_S22_F545
        end as L0_S22_F545,
        case
            when L0_S0_F0 is null then 0.0
            else L0_S0_F0
        end as L0_S0_F0,
        case
            when L0_S0_F2 is null then 0.0
            else L0_S0_F2
        end as L0_S0_F2,
        case
            when L0_S0_F4 is null then 0.0
            else L0_S0_F4
        end as L0_S0_F4
    from df
""")
In [ ]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer() \
    .setInputCol("L0_S22_F545") \
    .setOutputCol("L0_S22_F545Index")
indexed = indexer.setHandleInvalid("skip").fit(df_notnull).transform(df_notnull)
In [ ]:
indexed.printSchema()
In [ ]:
indexed.first()
In [ ]:
indexed.select("L0_S22_F545Index").show()
In [ ]:
indexed.select("L0_S22_F545Index").distinct().show()
In [ ]:
encoder = OneHotEncoder().setInputCol("L0_S22_F545Index").setOutputCol("L0_S22_F545Vec")
encoded = encoder.transform(indexed)
In [ ]:
encoded.printSchema()
In [ ]:
encoded.first().L0_S22_F545Vec
In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
transformers = [
    indexer,
    encoder,
    VectorAssembler()
        .setInputCols(["L0_S22_F545Vec", "L0_S0_F0", "L0_S0_F2", "L0_S0_F4"])
        .setOutputCol("features")
]
In [ ]:
pipeline = Pipeline().setStages(transformers).fit(df_notnull)
transformed = pipeline.transform(df_notnull)
In [ ]:
transformed.printSchema()
In [ ]:
transformed.first().features
In [ ]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier() \
    .setLabelCol("label") \
    .setFeaturesCol("features")
model = Pipeline().setStages([rf]).fit(transformed)
result = model.transform(transformed)
In [ ]:
import pixiedust
display(result)
In [ ]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate the model; BinaryClassificationEvaluator's default metric is area under the ROC curve
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluation = evaluator.evaluate(result)
In [ ]:
evaluation
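Note that this AUC is computed on the same rows the forest was trained on, so it is an optimistic estimate. A minimal sketch of evaluating on a held-out split instead is shown below; the 80/20 ratio and the seed are illustrative choices, not part of the original notebook.
In [ ]:
# Hedged sketch: hold out a test set so the AUC is measured on unseen rows.
train_df, test_df = transformed.randomSplit([0.8, 0.2], seed=42)
holdout_model = rf.fit(train_df)
holdout_result = holdout_model.transform(test_df)
evaluator.evaluate(holdout_result)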
In [ ]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [3, 5]) \
    .addGrid(rf.featureSubsetStrategy, ["auto", "all"]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .addGrid(rf.maxBins, [2, 5]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .build()
In [ ]:
transformed_sampled = transformed.sample(False,0.001)
In [ ]:
transformed_sampled.count()
In [ ]:
# Create a 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run the cross-validation on the full transformed data set.
# Note: the sampled DataFrame created above is not used here, so this will likely take
# a fair amount of time because of the number of models being trained and tested.
cvModel = cv.fit(transformed)
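If the full run above is too slow, the 0.1% sample created earlier can be used instead to get a quick, rough result. A minimal sketch follows; metrics computed on such a small sample will be noisy.
In [ ]:
# Optional quick check: fit the same cross-validator on the sampled data
# and look at the average metric per grid point.
cvModel_sampled = cv.fit(transformed_sampled)
cvModel_sampled.avgMetrics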
In [ ]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# A wider grid that could be searched given enough time (kept commented out):
# paramGrid = ParamGridBuilder() \
#     .addGrid(rf.numTrees, [3, 5, 10, 30, 50, 70, 100, 150]) \
#     .addGrid(rf.featureSubsetStrategy, ["auto", "all", "sqrt", "log2", "onethird"]) \
#     .addGrid(rf.impurity, ["gini", "entropy"]) \
#     .addGrid(rf.maxBins, [2, 5, 10, 15, 20, 25, 30]) \
#     .addGrid(rf.maxDepth, [3, 5, 10, 15, 20, 25, 30]) \
#     .build()
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [3, 5, 10]) \
    .addGrid(rf.featureSubsetStrategy, ["auto", "all"]) \
    .addGrid(rf.impurity, ["gini", "entropy"]) \
    .addGrid(rf.maxBins, [2, 5]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .build()
In [ ]:
# Build a cross-validator over the redefined grid and fit it
crossValidator = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid,
                                evaluator=evaluator, numFolds=5)
crossValidatorModel = crossValidator.fit(transformed)
# Use the fitted model to predict
newPredictions = crossValidatorModel.transform(transformed)
newAucTest = evaluator.evaluate(newPredictions)
print("new AUC (with cross-validation): " + str(newAucTest))
# Understand the model selected
bestModel = crossValidatorModel.bestModel
print()
print("Parameters for best model:")
In [ ]:
# Because the cross-validator's estimator is the classifier itself (not a pipeline),
# bestModel is already a RandomForestClassificationModel; its winning hyper-parameters
# (numTrees, featureSubsetStrategy, impurity, maxBins, maxDepth) correspond to the
# grid point with the best average cross-validation metric.
best_index = crossValidatorModel.avgMetrics.index(max(crossValidatorModel.avgMetrics))
for param, value in crossValidator.getEstimatorParamMaps()[best_index].items():
    print(param.name, "=", value)
In [ ]: