In [3]:
# Load libraries
import pandas as pd
import matplotlib.pyplot as plt
import time
In [4]:
# Load dataset
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data"
dataset = pd.read_csv(url, header=None)
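A quick look at the raw data explains the column indices used below: with header=None, pandas names the columns 0 through 31, where column 0 is the sample ID, column 1 the diagnosis (M = malignant, B = benign), and columns 2 through 31 the 30 numeric features. A minimal sanity check:
In [ ]:
# Sanity-check the shape and first rows: WDBC has 569 samples x 32 columns
print(dataset.shape)
print(dataset.head())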
In [5]:
# Create a Spark DataFrame from the pandas DataFrame
# (sqlContext is assumed to be provided by the notebook environment)
cancer_df = sqlContext.createDataFrame(dataset, schema=None)
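The cell above assumes an environment (for example Databricks or a Spark-enabled Jupyter kernel) that already provides sqlContext. If you run this standalone, a minimal bootstrap sketch looks like this; SparkSession is the modern entry point, and the SQLContext wrapper is created only for compatibility with the code above:
In [ ]:
# Hypothetical bootstrap for a standalone run; skip where sqlContext already exists
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.appName("wdbc-logreg").getOrCreate()
sqlContext = SQLContext(spark.sparkContext)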
In [6]:
print('After parsing, number of training lines: {}'.format(cancer_df.count()))
cancer_df.printSchema()
In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
# Pipeline stages
stages = []
# Convert the diagnosis label (column "1", values M/B) into label indices using StringIndexer
label_stringIdx = StringIndexer(inputCol="1", outputCol="label")
stages += [label_stringIdx]
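StringIndexer assigns indices by descending label frequency, so the more common class maps to 0.0. In WDBC, benign (B) is the majority class, so the expected mapping is B → 0.0 and M → 1.0. A quick check of that assumption, fitting the indexer on its own:
In [ ]:
# Inspect the learned label order (index 0 = most frequent label)
print(label_stringIdx.fit(cancer_df).labels)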
In [8]:
# Transform all numerical features into a vector using VectorAssembler
# Columns 2 through 31 hold the 30 numeric features
numeric_cols = ["{}".format(x) for x in range(2, 32)]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
stages += [assembler]
In [9]:
# Create a pipeline
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(cancer_df)
cancer_transformed_df = pipeline_model.transform(cancer_df)
# Keep relevant columns
selected_cols = ["label", "features"]
cancer_final_df = cancer_transformed_df.select(selected_cols)
cancer_final_df.printSchema()
cancer_final_df.show(5)
In [10]:
from pyspark.ml.feature import StandardScaler
# Normalize each feature to unit standard deviation (withMean=False avoids densifying sparse vectors)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scalerModel = scaler.fit(cancer_final_df)
scaled_cancer_final_df = scalerModel.transform(cancer_final_df)
scaled_cancer_final_df.printSchema()
scaled_cancer_final_df.show(5)
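To verify the scaling, Spark 2.4+ offers pyspark.ml.stat.Summarizer for vector-column statistics; after StandardScaler with withStd=True, each component of scaledFeatures should have variance close to 1. A sketch, assuming Spark 2.4 or later:
In [ ]:
# Each component of the scaled vector should have variance ~1.0
from pyspark.ml.stat import Summarizer
scaled_cancer_final_df.select(Summarizer.variance(scaled_cancer_final_df.scaledFeatures)).show(truncate=False)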
In [11]:
# Replace features with the newly created scaledFeatures
# (Spark DataFrame columns cannot be modified in place, but selectExpr creates a copy under a new name)
scaled_cancer_final_df = scaled_cancer_final_df.selectExpr("label", "scaledFeatures as features")
scaled_cancer_final_df.printSchema()
scaled_cancer_final_df.show(5)
In [12]:
# Split the scaled data into training and test sets (30% held out for testing)
(trainingData, testData) = scaled_cancer_final_df.randomSplit([0.7, 0.3], seed=1)
print(trainingData.count())
print(testData.count())
In [13]:
# Train a LogisticRegression model
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
lr_model = lr.fit(trainingData)
# Make predictions on the held-out test set
predictions = lr_model.transform(testData)
# Select example rows to display
predictions.show(5)
predictions.printSchema()
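The full prediction DataFrame carries the feature vector along, which makes show() hard to read. Selecting just the label, prediction, and probability columns gives a cleaner view:
In [ ]:
# A more readable view of the predictions
predictions.select("label", "prediction", "probability").show(5, truncate=False)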
In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate the model (the default metric is areaUnderROC)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(str(evaluator.getMetricName()) + " : " + str(evaluator.evaluate(predictions)))
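BinaryClassificationEvaluator defaults to areaUnderROC; since the classes here are somewhat imbalanced (212 malignant vs. 357 benign), area under the precision-recall curve is a useful second look:
In [ ]:
# Evaluate the same predictions under the precision-recall curve
pr_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderPR")
print("areaUnderPR : " + str(pr_evaluator.evaluate(predictions)))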
In [15]:
print(lr.explainParams())
In [28]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 10, 100])
             .build())
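The grid above enumerates 4 × 3 × 3 = 36 parameter combinations; with the 5-fold cross-validation in the next cell, that means 180 model fits. Since the built grid is a plain Python list, its size is easy to confirm:
In [ ]:
# 4 regParam values x 3 elasticNetParam values x 3 maxIter values = 36 combinations
print(len(paramGrid))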
In [29]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross-validation; this fits one model per fold for every parameter combination
cvModel = cv.fit(trainingData)
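cvModel.avgMetrics holds the evaluator metric for each parameter combination, averaged over the folds, in the same order as paramGrid; pairing the two shows how each setting fared. A minimal sketch:
In [ ]:
# Average cross-validated metric per parameter combination (same order as paramGrid)
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, metric)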
In [30]:
predictions2 = cvModel.transform(testData)
predictions2.show(5)
predictions2.printSchema()
In [31]:
print(str(evaluator.getMetricName()) + " : " + str(evaluator.evaluate(predictions2)))
In [32]:
bestModel = cvModel.bestModel
# Read the winning hyperparameters from the underlying Java model
# (older Spark versions do not expose getRegParam() etc. on the Python wrapper)
print("Best Param <regParam>: {}".format(bestModel._java_obj.getRegParam()))
print("Best Param <elasticNetParam>: {}".format(bestModel._java_obj.getElasticNetParam()))
print("Best Param <maxIter>: {}".format(bestModel._java_obj.getMaxIter()))