In [1]:
from pyspark.sql import SparkSession

# SparkSession.builder creates (or reuses) the underlying SparkContext,
# so there is no need to construct one separately.
spark = SparkSession.builder \
    .master('local') \
    .appName('Python Spark SQL basic example') \
    .getOrCreate()
In [2]:
cuse = spark.read.csv('data/cuse_binary.csv', header=True, inferSchema=True)
cuse.show(5)
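A quick sanity check (not part of the original flow) is to confirm that inferSchema typed each column as expected:

cuse.printSchema()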
In [3]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
categorical_columns = cuse.columns[:-1]
categorical_columns
Out[3]:
In [4]:
# Index each categorical feature column as a numeric column
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='stringindexed_' + c) for c in categorical_columns]
# Encode the label column 'y' and append it to the StringIndexer stages
stringindexer_stages += [StringIndexer(inputCol='y', outputCol='label')]
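As a standalone sketch of what a single StringIndexer stage does, assuming cuse_binary.csv has an 'age' column as in the classic cuse data (indices are assigned by descending category frequency):

# 'age' is an assumed column name; substitute any of the categorical columns
StringIndexer(inputCol='age', outputCol='stringindexed_age') \
    .fit(cuse).transform(cuse) \
    .select('age', 'stringindexed_age').distinct().show()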
In [5]:
# One-hot encode each indexed categorical column (from Spark 3.0 on, OneHotEncoder
# is an estimator; the Pipeline below takes care of fitting it)
onehotencoder_stages = [OneHotEncoder(inputCol='stringindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]
In [6]:
# Assemble all one-hot columns into a single 'features' vector column
feature_columns = ['onehot_' + c for c in categorical_columns]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')
In [7]:
# Stage order matters: indexers first, then the encoders that consume their
# outputs, then the assembler
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)
In [8]:
pipeline_model = pipeline.fit(cuse)
In [9]:
final_columns = feature_columns + ['features', 'label']
cuse_df = pipeline_model.transform(cuse).select(final_columns)
cuse_df.show(5)
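Each row of 'features' is the concatenation of the one-hot blocks, stored as a sparse vector; a single row can be inspected directly as a quick check:

cuse_df.select('features').first()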
In [10]:
train, test = cuse_df.randomSplit([0.8, 0.2], seed=1234)
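randomSplit samples each row independently, so 0.8/0.2 are expected proportions rather than exact ones; the actual split sizes can be verified:

print('train rows:', train.count(), ' test rows:', test.count())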
In [11]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(featuresCol='features', labelCol='label')
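Before building a tuning grid, the full list of GBTClassifier parameters and their defaults can be printed:

print(gbt.explainParams())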
In [17]:
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2, 3, 4]) \
    .addGrid(gbt.minInfoGain, [0.0, 0.1, 0.2, 0.3]) \
    .addGrid(gbt.stepSize, [0.05, 0.1, 0.2, 0.4]) \
    .build()
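The grid crosses 3 x 4 x 4 = 48 parameter combinations; cross-validation refits the model once per combination per fold:

len(param_grid)  # 48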
In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Older Spark releases (before 2.2) do not give GBTClassifier a 'rawPrediction'
# column, so the hard 0/1 'prediction' column is scored here; where raw scores
# are available, the evaluator's default rawPredictionCol='rawPrediction' is
# preferable.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction')
In [22]:
from pyspark.ml.tuning import CrossValidator
crossvalidation = CrossValidator(estimator=gbt, estimatorParamMaps=param_grid, evaluator=evaluator)  # numFolds defaults to 3
In [23]:
# Fit on the training split only; fitting on the full cuse_df would let the
# test rows influence model selection.
crossvalidation_mod = crossvalidation.fit(train)
In [24]:
pred_train = crossvalidation_mod.transform(train)
pred_train.show(5)
In [25]:
pred_test = crossvalidation_mod.transform(test)
pred_test.show(5)
In [26]:
print('Area under ROC on training data: ', evaluator.setMetricName('areaUnderROC').evaluate(pred_train), "\n"
      'Area under ROC on test data: ', evaluator.setMetricName('areaUnderROC').evaluate(pred_test))
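The same evaluator can also report area under the precision-recall curve, often more informative when the label classes are imbalanced:

print('Area under PR on test data: ', evaluator.setMetricName('areaUnderPR').evaluate(pred_test))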
In [27]:
# Tally (label, prediction) pairs: zipWithIndex keys each Row, so countByKey
# returns the count of every confusion-matrix cell
label_pred_train = pred_train.select('label', 'prediction')
label_pred_train.rdd.zipWithIndex().countByKey()
Out[27]:
In [28]:
label_pred_test = pred_test.select('label', 'prediction')
label_pred_test.rdd.zipWithIndex().countByKey()
Out[28]:
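The same confusion-matrix counts can be read more directly with a DataFrame crosstab, avoiding the RDD round-trip above:

label_pred_test.crosstab('label', 'prediction').show()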
In [30]:
# In older PySpark versions the tuned values are only exposed on the underlying
# Java model, hence the _java_obj access
print('max depth: ', crossvalidation_mod.bestModel._java_obj.getMaxDepth(), "\n",
      'min information gain: ', crossvalidation_mod.bestModel._java_obj.getMinInfoGain(), "\n",
      'step size: ', crossvalidation_mod.bestModel._java_obj.getStepSize())
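The cross-validated mean score for every grid point is also kept in avgMetrics, so the winning parameter map can be recovered without reaching into the Java object; a sketch using the param_grid built above:

best_idx = max(range(len(param_grid)), key=lambda i: crossvalidation_mod.avgMetrics[i])
{param.name: value for param, value in param_grid[best_idx].items()}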