Exercise 1, Task 5

Pipelines with Apache Spark and the pyspark library


In [1]:
# Load libraries
import pandas as pd
import matplotlib.pyplot as plt
import time

In [2]:
# Load dataset
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data"
dataset = pd.read_csv(url, header=None)

In [3]:
# Create a Spark DataFrame from the pandas DataFrame
cancer_df = sqlContext.createDataFrame(dataset, schema=None)
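
This cell relies on a pre-created sqlContext, which older Spark notebook environments provide automatically. On Spark 2.x and later the SparkSession entry point plays that role; a minimal sketch of the equivalent setup (the application name is illustrative):

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession and create the DataFrame from pandas
spark = SparkSession.builder.appName("wdbc-pipeline").getOrCreate()
cancer_df = spark.createDataFrame(dataset)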

In [4]:
print('After parsing, number of training lines: {}'.format(cancer_df.count()))
cancer_df.printSchema()


After parsing, number of training lines: 569
root
 |-- 0: long (nullable = true)
 |-- 1: string (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)
 |-- 10: double (nullable = true)
 |-- 11: double (nullable = true)
 |-- 12: double (nullable = true)
 |-- 13: double (nullable = true)
 |-- 14: double (nullable = true)
 |-- 15: double (nullable = true)
 |-- 16: double (nullable = true)
 |-- 17: double (nullable = true)
 |-- 18: double (nullable = true)
 |-- 19: double (nullable = true)
 |-- 20: double (nullable = true)
 |-- 21: double (nullable = true)
 |-- 22: double (nullable = true)
 |-- 23: double (nullable = true)
 |-- 24: double (nullable = true)
 |-- 25: double (nullable = true)
 |-- 26: double (nullable = true)
 |-- 27: double (nullable = true)
 |-- 28: double (nullable = true)
 |-- 29: double (nullable = true)
 |-- 30: double (nullable = true)
 |-- 31: double (nullable = true)


In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Pipeline stages
stages = []


# Convert the diagnosis label (column "1", values B/M) into numeric label indices using StringIndexer
label_stringIdx = StringIndexer(inputCol="1", outputCol="label")
stages += [label_stringIdx]
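
StringIndexer assigns indices by descending label frequency, so the more common "B" (benign, 357 rows) maps to 0.0 and "M" (malignant, 212 rows) maps to 1.0, which matches the label values shown further down. A quick sketch to inspect the mapping directly (the labels attribute is available on the fitted StringIndexerModel in most Spark versions):

# Fit the indexer on its own just to look at the learned label order
label_model = label_stringIdx.fit(cancer_df)
print(label_model.labels)   # expected: ['B', 'M'], most frequent label first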

In [6]:
# Transform all numerical features into a vector using VectorAssembler

# Numeric feature columns are named "2" through "31"
numeric_cols = [str(x) for x in range(2, 32)]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
stages += [assembler]
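
VectorAssembler concatenates the 30 numeric columns into a single "features" vector column, which is the input format Spark ML estimators expect. A quick sanity check of the assembled vector width (a sketch; the assembler is a transformer, so no fitting is needed):

# Assemble once outside the pipeline and inspect the first feature vector
assembled = assembler.transform(cancer_df)
print(assembled.select("features").first()[0].size)   # expected: 30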

In [7]:
# Create a pipeline
pipeline = Pipeline(stages=stages)

pipeline_model = pipeline.fit(cancer_df)
cancer_transformed_df = pipeline_model.transform(cancer_df)



# Keep relevant columns
selected_cols = ["label", "features"]
cancer_final_df = cancer_transformed_df.select(selected_cols)
cancer_final_df.printSchema()
cancer_final_df.show(5)


root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|[17.99,10.38,122....|
|  1.0|[20.57,17.77,132....|
|  1.0|[19.69,21.25,130....|
|  1.0|[11.42,20.38,77.5...|
|  1.0|[20.29,14.34,135....|
+-----+--------------------+


In [8]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

scalerModel = scaler.fit(cancer_final_df)

# Normalize each feature to have unit standard deviation
scaled_cancer_final_df = scalerModel.transform(cancer_final_df)
scaled_cancer_final_df.printSchema()
scaled_cancer_final_df.show(5)


root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  1.0|[17.99,10.38,122....|[5.10492359418784...|
|  1.0|[20.57,17.77,132....|[5.83703603849048...|
|  1.0|[19.69,21.25,130....|[5.58732326679036...|
|  1.0|[11.42,20.38,77.5...|[3.24059074183575...|
|  1.0|[20.29,14.34,135....|[5.75758197476772...|
+-----+--------------------+--------------------+
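
With withMean=False the scaler divides each feature by its standard deviation without centering it, which preserves sparsity; since these vectors are dense, centering is also a reasonable option. A sketch of the z-scoring variant, should centered features be preferred:

# Scale to unit standard deviation and also subtract the mean of each feature
scaler_z = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                          withStd=True, withMean=True)
scaled_z_df = scaler_z.fit(cancer_final_df).transform(cancer_final_df)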


In [10]:
scaled_cancer_final_df = scaled_cancer_final_df.selectExpr("label", "scaledFeatures as features")
scaled_cancer_final_df.printSchema()
scaled_cancer_final_df.show(5)

In [11]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = cancer_final_df.randomSplit([0.7, 0.3], seed=1)

print(trainingData.count())
print(testData.count())


421
148

In [12]:
# Configure a LogisticRegression model
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
# Fit the model on the training data
lr_model = lr.fit(trainingData)
# Make predictions on the test data
predictions = lr_model.transform(testData)
# Select example rows to display
predictions.show(5)
predictions.printSchema()


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  1.0|[20.57,17.77,132....|[-2.0316470979350...|[0.11592001643133...|       1.0|
|  1.0|[20.29,14.34,135....|[-2.5181959189331...|[0.07459238238578...|       1.0|
|  1.0|[18.25,19.98,119....|[-2.3243425285033...|[0.08912688901575...|       1.0|
|  1.0|[16.02,23.24,102....|[0.08587805337901...|[0.52145632819203...|       0.0|
|  1.0|[15.78,17.89,103....|[-1.9185235212382...|[0.12802630336040...|       1.0|
+-----+--------------------+--------------------+--------------------+----------+

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
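
Since logistic regression is a linear model, the fitted weights can be inspected directly. On recent PySpark versions the model exposes coefficients and intercept (older releases expose weights instead), so treat the attribute names below as version-dependent:

print(lr_model.coefficients)   # one weight per assembled feature column
print(lr_model.intercept)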


In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the model using area under the ROC curve (the evaluator's default metric)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(str(evaluator.getMetricName()) + " : " + str(evaluator.evaluate(predictions)))


areaUnderROC : 0.999057848125
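
The same evaluator can report area under the precision-recall curve instead, which is often more informative when the classes are imbalanced. A sketch using the evaluator's metricName parameter:

pr_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                             metricName="areaUnderPR")
print("areaUnderPR : " + str(pr_evaluator.evaluate(predictions)))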

In [14]:
print(lr.explainParams())


elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
featuresCol: features column name (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name (default: label, current: label)
maxIter: max number of iterations (>= 0) (default: 100, current: 10)
predictionCol: prediction column name (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)
regParam: regularization parameter (>= 0) (default: 0.1)
threshold: threshold in binary classification prediction, in range [0, 1]. (default: 0.5)
tol: the convergence tolerance for iterative algorithms (default: 1e-06)

In [15]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
                .addGrid(lr.regParam, [0.01, 0.5, 2.0])
                .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                .addGrid(lr.maxIter, [1, 10, 100])
                .build())
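
The grid covers 3 * 3 * 3 = 27 parameter combinations, and with the 5-fold cross-validation below that means 27 * 5 = 135 model fits, so this step is considerably more expensive than a single training run. A quick check of the grid size:

print(len(paramGrid))   # expected: 27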

In [ ]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross-validation (fits one model per fold for every parameter combination, then refits the best one)
cvModel = cv.fit(trainingData)
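
Once fitting completes, the cross-validator exposes the average metric for each grid point and the best model refit on the full training set; on recent PySpark versions the attributes are avgMetrics and bestModel (a sketch):

print(cvModel.avgMetrics)    # mean areaUnderROC for each of the 27 parameter maps
best_lr = cvModel.bestModel  # LogisticRegressionModel refit with the best parameters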

In [ ]:
predictions2 = cvModel.transform(testData)
predictions2.show(5)
predictions2.printSchema()

In [ ]:
print(str(evaluator.getMetricName()) + " : " + str(evaluator.evaluate(predictions2)))
