Excercise 5 Spark


In [3]:
# Load libraries
import pandas as pd
import matplotlib.pyplot as plt
import time

In [4]:
# Load dataset
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data"
dataset = pd.read_csv(url, header=None)

In [5]:
# create Spark Dataframe from Pandas Dataframe
cancer_df = sqlContext.createDataFrame(dataset, schema=None)

In [6]:
print 'After parsing, number of training lines: {}'.format(cancer_df.count())
cancer_df.printSchema()


After parsing, number of training lines: 569
root
 |-- 0: long (nullable = true)
 |-- 1: string (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)
 |-- 10: double (nullable = true)
 |-- 11: double (nullable = true)
 |-- 12: double (nullable = true)
 |-- 13: double (nullable = true)
 |-- 14: double (nullable = true)
 |-- 15: double (nullable = true)
 |-- 16: double (nullable = true)
 |-- 17: double (nullable = true)
 |-- 18: double (nullable = true)
 |-- 19: double (nullable = true)
 |-- 20: double (nullable = true)
 |-- 21: double (nullable = true)
 |-- 22: double (nullable = true)
 |-- 23: double (nullable = true)
 |-- 24: double (nullable = true)
 |-- 25: double (nullable = true)
 |-- 26: double (nullable = true)
 |-- 27: double (nullable = true)
 |-- 28: double (nullable = true)
 |-- 29: double (nullable = true)
 |-- 30: double (nullable = true)
 |-- 31: double (nullable = true)


In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Pipeline stages
stages = []


# Convert label into label indices using StringIndexer
label_stringIdx = StringIndexer(inputCol="1", outputCol = "label")
stages += [label_stringIdx]

In [8]:
# Transform all numerical features into a vector using VectorAssembler

# numeric cols 2:31
numeric_cols = ["{}".format(x) for x in range(2,32)]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
stages += [assembler]

In [9]:
# Create a pipeline
pipeline = Pipeline(stages=stages)

pipeline_model = pipeline.fit(cancer_df)
cancer_transformed_df = pipeline_model.transform(cancer_df)



# Keep relevant columns
selected_cols = ["label", "features"]
cancer_final_df = cancer_transformed_df.select(selected_cols)
cancer_final_df.printSchema()
cancer_final_df.show(5)


root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|[17.99,10.38,122....|
|  1.0|[20.57,17.77,132....|
|  1.0|[19.69,21.25,130....|
|  1.0|[11.42,20.38,77.5...|
|  1.0|[20.29,14.34,135....|
+-----+--------------------+


In [10]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

scalerModel = scaler.fit(cancer_final_df)

#normalize each feature to have unit stdev
scaled_cancer_final_df = scalerModel.transform(cancer_final_df)
scaled_cancer_final_df.printSchema()
scaled_cancer_final_df.show(5)


root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  1.0|[17.99,10.38,122....|[5.10492359418784...|
|  1.0|[20.57,17.77,132....|[5.83703603849048...|
|  1.0|[19.69,21.25,130....|[5.58732326679036...|
|  1.0|[11.42,20.38,77.5...|[3.24059074183575...|
|  1.0|[20.29,14.34,135....|[5.75758197476772...|
+-----+--------------------+--------------------+


In [11]:
# Substitute fetures with newly created scaledFeatures 
# (Spark Dataframe columns cannot be modified but a copy can be created and in next step a name can be changed)
scaled_cancer_final_df = scaled_cancer_final_df.selectExpr("label", "scaledFeatures as features")
scaled_cancer_final_df.printSchema()
scaled_cancer_final_df.show(5)


root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|[5.10492359418784...|
|  1.0|[5.83703603849048...|
|  1.0|[5.58732326679036...|
|  1.0|[3.24059074183575...|
|  1.0|[5.75758197476772...|
+-----+--------------------+


In [12]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = cancer_final_df.randomSplit([0.7, 0.3], seed = 1)

print trainingData.count()
print testData.count()


420
149

In [13]:
# Train a LogisticRegression model.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
# Train model
lr_model = lr.fit(trainingData)
# make predictions
predictions = lr_model.transform(testData)
# select example rows to display
predictions.show(5)
predictions.printSchema()


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  1.0|[20.57,17.77,132....|[-1.9032279000039...|[0.12974357579949...|       1.0|
|  1.0|[20.29,14.34,135....|[-2.6720690064567...|[0.06464175711414...|       1.0|
|  1.0|[18.25,19.98,119....|[-2.1792208875297...|[0.10163204110436...|       1.0|
|  1.0|[16.02,23.24,102....|[0.29558400444961...|[0.57336263675245...|       0.0|
|  1.0|[15.78,17.89,103....|[-1.9584906497444...|[0.12363048673566...|       1.0|
+-----+--------------------+--------------------+--------------------+----------+

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)


In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

#Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(str(evaluator.getMetricName()) + " : " + str(evaluator.evaluate(predictions)))


areaUnderROC : 0.987947269303

In [15]:
print(lr.explainParams())


elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
featuresCol: features column name (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name (default: label, current: label)
maxIter: max number of iterations (>= 0) (default: 100, current: 10)
predictionCol: prediction column name (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)
regParam: regularization parameter (>= 0) (default: 0.1)
threshold: threshold in binary classification prediction, in range [0, 1]. (default: 0.5)
tol: the convergence tolerance for iterative algorithms (default: 1e-06)

Now lets test the Cross Validator with a parameter grid.


In [28]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
                .addGrid(lr.regParam, [0.01, 0.1, 0.5, 2.0])
                .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                .addGrid(lr.maxIter, [1, 10, 100])
                .build())

In [29]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cv
cvModel = cv.fit(trainingData)

In [30]:
predictions2 = cvModel.transform(testData)
predictions2.show(5)
predictions2.printSchema()


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  1.0|[20.57,17.77,132....|[-3.7099755446092...|[0.02389325976166...|       1.0|
|  1.0|[20.29,14.34,135....|[-5.2965166269030...|[0.00498404666116...|       1.0|
|  1.0|[18.25,19.98,119....|[-4.8198505720329...|[0.00800342109037...|       1.0|
|  1.0|[16.02,23.24,102....|[-0.0350601111901...|[0.49123586993146...|       1.0|
|  1.0|[15.78,17.89,103....|[-4.3766108829897...|[0.01241188939894...|       1.0|
+-----+--------------------+--------------------+--------------------+----------+

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)


In [31]:
print(str(evaluator.getMetricName()) + " : " + str(evaluator.evaluate(predictions)))


areaUnderROC : 0.987947269303

We can see that of of the parameter grid no parameter combination was better than the default one. Unfortunately because of very long running time of CrossValidator and issues with the PySpark Kernel no finer parameter grid has been tested. For those parameter that were the following best combination can be extracted:


In [32]:
bestModel = cvModel.bestModel
print("Best Param <regParam>: {}".format(bestModel._java_obj.getRegParam()))
print("Best Param <elasticNetParam>: {}".format(bestModel._java_obj.getElasticNetParam()))
print("Best Param <maxIter>: {}".format(bestModel._java_obj.getMaxIter()))


Best Param <regParam>: 0.01
Best Param <elasticNetParam>: 0.0
Best Param <maxIter>: 10