This notebook contains basic materials and examples/exercises on using PySpark for machine learning via Spark's MLlib (Spark version 1.4.1). MLlib supports building machine learning pipelines on Spark DataFrames.
https://spark.apache.org/docs/1.4.1/mllib-guide.html
Spark, as a fast cluster computing platform, provides scalability, fault tolerance, and seamless integration with existing big data pipelines. Spark can scale up to hundreds of machines and distribute the computation across them, whereas other machine learning tools such as R, Matlab and SciPy run on a single machine.
Here is a comparison by Databricks (founded by the creators of Spark) of the running times of R vs MLlib for computing Pearson’s correlation on a 32-node cluster:
https://databricks.com/blog/2014/08/27/statistics-functionality-in-spark.html
The figure below shows the performance of PySpark DataFrames, which appears comparable to that of Scala DataFrames.
Python also has plenty of machine learning libraries, including scikit-learn:
https://wiki.python.org/moin/PythonForArtificialIntelligence
It also has visualization libraries such as matplotlib, ggplot and seaborn.
For first-time users of IPython Notebook: the code cells below can be run directly from this notebook, either by pressing the "play" icon at the top or by pressing Ctrl+Enter. The profile set up for this IPython notebook allows the PySpark API to be called directly from the code cells below. In addition, the Anaconda Python distribution has been installed, which includes common libraries such as numpy, scikit-learn, scipy and pandas.
This exercise walks through building a machine learning pipeline with MLlib for a classification task. The exercise includes:
The following PySpark exercises are applied to a classification problem on a dataset obtained from the UCI Machine Learning Repository.
http://archive.ics.uci.edu/ml/datasets/Bank+Marketing#
This data has already been loaded into your VM and is available in the data/ folder, which you can view from Jupyter.
The data relates to direct marketing campaigns (phone calls) of a Portuguese banking institution. The classification goal is to predict whether the client will subscribe to a term deposit (variable y, renamed has_subscribed in the table below). This is a binary classification problem, where the predicted class is either yes or no. Some of the headings have been renamed for clarity.
Position | Heading | Description | Values |
---|---|---|---|
1 | age | age (numeric) | |
2 | job | type of job | 'admin.','blue-collar','entrepreneur','housemaid', 'management','retired','self-employed','services','student','technician','unemployed','unknown' |
3 | marital | marital status | 'divorced','married','single','unknown' |
4 | education | education level | 'basic.4y','basic.6y','basic.9y','high.school', 'illiterate','professional.course','university.degree','unknown' |
5 | default | has credit in default? | 'no','yes','unknown' |
6 | housing | has housing loan? | 'no','yes','unknown' |
7 | loan | has personal loan? | 'no','yes','unknown' |
8 | contact | contact communication type | 'cellular','telephone' |
9 | month | last contact month of year | 'jan', 'feb', 'mar', ..., 'nov', 'dec' |
10 | day_of_week | last contact day of the week | 'mon','tue','wed','thu','fri' |
11 | duration | last contact duration, in seconds (numeric). Important note: this attribute highly affects the output target (e.g., if duration=0 then y='no'). Yet, the duration is not known before a call is performed. Also, after the end of the call y is obviously known. Thus, this input should only be included for benchmark purposes and should be discarded if the intention is to have a realistic predictive model. | |
12 | campaign | number of contacts performed during this campaign and for this client (numeric, includes last contact) | |
13 | pdays | number of days that passed by after the client was last contacted from a previous campaign (numeric; 999 means client was not previously contacted) | |
14 | previous | number of contacts performed before this campaign and for this client (numeric) | |
15 | outcome_of_prev | outcome of the previous marketing campaign | 'failure','nonexistent','success' |
16 | emp_variation_rate | employment variation rate - quarterly indicator (numeric) | |
17 | consumer_price_idx | consumer price index - monthly indicator (numeric) | |
18 | consumer_conf_idx | consumer confidence index - monthly indicator (numeric) | |
19 | euribor3m | euribor 3 month rate - daily indicator (numeric) | |
20 | num_of_employees | number of employees - quarterly indicator (numeric) | |
21 | has_subscribed | has the client subscribed to a term deposit? Note: this is not an attribute but the label of each observation | 'yes','no' |
Spark SQL natively supports data sources such as Parquet and JSON. CSV (like Avro) requires an additional Spark package. There are currently two options: spark-csv by Databricks, and pyspark_csv:
https://github.com/seahboonsiew/pyspark-csv
https://github.com/databricks/spark-csv
pyspark_csv was selected because it can infer the schema, whereas spark-csv defaults all columns to string type.
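To illustrate what "inferring the schema" involves, here is a minimal pure-Python sketch. It is not pyspark_csv's actual implementation. It assumes a simple rule: each column gets the narrowest type (int, then double, else string) that parses every value in that column. The helper names and toy rows are hypothetical.

```python
def parse_as(value, cast):
    """Return True if `value` can be converted with `cast` (e.g. int or float)."""
    try:
        cast(value)
        return True
    except ValueError:
        return False


def infer_column_types(header, rows):
    """Infer a type name per column: 'int', 'double' or 'string'.

    A column gets the narrowest type that parses *every* value in it.
    """
    types = {}
    for i, name in enumerate(header):
        values = [row[i] for row in rows]
        if all(parse_as(v, int) for v in values):
            types[name] = "int"
        elif all(parse_as(v, float) for v in values):
            types[name] = "double"
        else:
            types[name] = "string"
    return types


# Made-up rows shaped like the bank marketing data, for illustration only
header = ["age", "job", "euribor3m", "has_subscribed"]
rows = [line.split(",") for line in ["56,housemaid,4.857,no", "37,services,4.857,yes"]]
print(infer_column_types(header, rows))
```

Without such inference, every column arrives as a string and must be cast before numeric features can be built.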
In [1]:
import os.path
#ship the pyspark_csv module to the Spark executors so they can use it to parse the csv
sc.addPyFile('pyspark_csv.py')
import pyspark_csv as pycsv
#read in the csv data file
fileName = os.path.join('data', 'bank-additional-full-data.csv')
plainTextRdd = sc.textFile(fileName)
rawDataFrame = pycsv.csvToDataFrame(sqlContext, plainTextRdd, parseDate=False)
#Have a look at the schema of the data frame created
rawDataFrame.printSchema()
Preprocess the data if required.
As noted above, the "duration" column should not be used as a feature, since it is not known until the phone call has been made. So the next step removes the duration column.
There are no missing values in the dataset. If missing values needed to be replaced, we could use the DataFrame.fillna function (similar to pandas).
In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import UserDefinedFunction
# Number of records
print "Number of observations originally: %s" % rawDataFrame.count()
print "Number of columns originally: %s" % len(rawDataFrame.columns)
#Remove the duration column as it's only for benchmark purposes. We won't know the duration until after we know the outcome of
#whether the customer signed up for the term deposit.
filteredDf = rawDataFrame.drop("duration")
print "Number of filtered observations: %s" % filteredDf.count()
print "Number of columns after drop: %s" % len(filteredDf.columns)
#view some summary of the columns
filteredDf.describe("age").show()
filteredDf.describe("pdays").show()
filteredDf.describe("emp_variation_rate").show()
The label column is currently in string format. However, MLlib classifiers such as logistic regression and decision trees expect the DataFrame to contain the following columns for training:
- "features" : single vector column representing the feature vectors for the observation
- "label": double type, representing the label
In [3]:
#Add the label column, which corresponds to the has_subscribed column ("yes" -> 1.0, anything else -> 0.0)
toDouble = UserDefinedFunction(lambda x: 1. if x == "yes" else 0., DoubleType())
filteredDf = filteredDf.withColumn("label", toDouble(filteredDf["has_subscribed"]))
filteredDf.show(3)
In [4]:
import matplotlib.pyplot as plt
#Show number of customers that have signed up for a term deposit vs those that did not
numOfSignUps = filteredDf.groupBy("has_subscribed").count()
numOfSignUps.show()
#plot labels: collect once and read the fields by position, because a Row is a tuple
#and row.count would return the tuple method count() instead of the "count" column
signUpRows = numOfSignUps.collect()
labels = [row[0] for row in signUpRows]
sizes = [row[1] for row in signUpRows]
colors = ['pink', 'yellowgreen']
plt.pie(sizes, labels=labels, colors=colors, shadow=True, autopct='%1.1f%%')
# Set aspect ratio to be equal so that pie is drawn as a circle.
plt.axis('equal')
plt.title('Customers who have subscribed to a term deposit')
plt.show()
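This notebook uses accuracy as its metric throughout. It therefore helps to know the accuracy of a trivial model that always predicts the most frequent class. If the pie chart shows one class dominating, a classifier must beat this baseline to be useful. Below is a minimal sketch with made-up counts (not the real counts of this dataset). The helper `majority_baseline_accuracy` is hypothetical.

```python
def majority_baseline_accuracy(class_counts):
    """Accuracy (%) of a model that always predicts the most frequent class.

    class_counts: dict mapping class label -> number of observations.
    """
    total = sum(class_counts.values())
    return 100.0 * max(class_counts.values()) / total


# Made-up counts, for illustration only
counts = {"no": 900, "yes": 100}
print(majority_baseline_accuracy(counts))  # 90.0
```

In the notebook, the counts could be obtained as `dict(numOfSignUps.collect())`, since each collected Row is a (has_subscribed, count) pair.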
In [5]:
# Register this DataFrame as a table.
filteredDf.registerTempTable("campaign")
#run sql queries
queryResult = sqlContext.sql("SELECT age, job, marital FROM campaign WHERE has_subscribed = 'yes'")
queryResult.show()
Training a model and testing it on the same data is a problem: a model that simply repeated the labels of the observations it had seen would get a perfect score, but would fail to predict anything useful on unseen data. This situation is called overfitting.
To avoid overfitting, it is common practice when training a (supervised) machine learning model to split the available data into training, test and validation sets.
We will create 3 sets of data: a training set (60%) for fitting the models, a validation set (20%) for choosing hyperparameters, and a test set (20%) for the final evaluation.
Our goal is to create a model that generalizes well to unseen data and avoids overfitting.
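`randomSplit([6, 2, 2])` normalizes the weights to 0.6/0.2/0.2 and assigns every row to exactly one split. Because assignment is random per row, the split sizes are only approximately 60/20/20. The following pure-Python sketch shows the idea, assuming one uniform draw per row compared against cumulative weight boundaries. Spark's real implementation samples per partition, so this illustrates the principle rather than reproducing Spark's output. The function `random_split` is hypothetical.

```python
import random


def random_split(rows, weights, seed=0):
    """Assign each row to one split using normalized cumulative weights."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [6, 2, 2] -> [0.6, 0.8, 1.0]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0, 1)
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point round-off
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, validation, test = random_split(range(10000), [6, 2, 2], seed=0)
print("%d %d %d" % (len(train), len(validation), len(test)))
```

Fixing the seed (as `seed=0L` does in the cell below) makes the split reproducible across runs.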
In [6]:
# split into training(60%), validation(20%) and test(20%) datasets
trainingRdd, validationRdd, testRdd = filteredDf.rdd.randomSplit([6, 2, 2], seed=0L)
trainingDf = trainingRdd.toDF()
validationDf = validationRdd.toDF()
testDf = testRdd.toDF()
print trainingDf.take(1)
#let's cache these datasets
trainingDf.cache()
validationDf.cache()
testDf.cache()
print "Num of training observations : %s" % trainingDf.count()
print "Num of validation observations : %s" % validationDf.count()
print "Num of test observations : %s" % testDf.count()
For the categorical attributes, we need to convert those text-based categories into numeric features before attempting to train/build a classification model with these data.
For example, the attribute "marital" is a categorical feature with 4 possible values: 'divorced', 'married', 'single' and 'unknown'.
We will use the out-of-the-box MLlib featurization technique called one-hot encoding to transform such categorical features into binary (0/1) feature vectors. Refer to the MLlib documentation for more information about one-hot encoding: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html
VectorAssembler is then used to combine all the feature columns into a single feature vector.
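The three featurization steps can be sketched in pure Python on toy data:

- StringIndexer gives the most frequent label index 0.0.
- OneHotEncoder, in the Spark versions we are aware of, drops the last category by default (`dropLast=True`), so k categories become a vector of length k - 1.
- VectorAssembler concatenates numeric values and encoded vectors.

This is a simplified sketch, not Spark's code. The helper names are hypothetical, and the alphabetical tie-breaking is our assumption (Spark may order tied labels differently).

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (an assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return dict((label, float(i)) for i, label in enumerate(ordered))


def one_hot(index, num_categories, drop_last=True):
    """Dense 0/1 vector for a category index; drops the last slot by default."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:  # with drop_last, the last category maps to all zeros
        vec[int(index)] = 1.0
    return vec


def assemble(numeric_values, *vectors):
    """Concatenate numeric columns and encoded vectors into one feature list."""
    features = list(numeric_values)
    for vec in vectors:
        features.extend(vec)
    return features


marital = ["married", "single", "married", "divorced", "married", "single"]
indexer = fit_string_indexer(marital)  # married -> 0.0, single -> 1.0, divorced -> 2.0

# One observation: age=41, marital='single'
maritalVec = one_hot(indexer["single"], len(indexer))
print(assemble([41.0], maritalVec))  # [41.0, 0.0, 1.0]
```

Dropping one category avoids a redundant column: if all the other slots are zero, the observation must belong to the dropped category.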
If the variables in the feature vectors differ greatly in scale, you may want to normalize them with feature scaling; MLlib offers StandardScaler as part of pyspark.mllib.feature. MLlib also provides dimensionality reduction techniques such as PCA and SVD.
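Standardization rescales each feature as x' = (x - mean) / std. As we understand it, MLlib's StandardScaler by default only divides by the standard deviation (`withMean=False`, `withStd=True`), which keeps sparse vectors sparse. The pure-Python sketch below mirrors those two switches for a single column. It assumes the unbiased (n - 1) sample standard deviation, and `standardize_column` is a hypothetical helper.

```python
import math


def standardize_column(values, with_mean=False, with_std=True):
    """Scale one numeric column, mirroring the with_mean / with_std switches."""
    n = len(values)
    mean = sum(values) / float(n)
    # Unbiased sample standard deviation (divides by n - 1); 0 for a single value
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1)) if n > 1 else 0.0
    scaled = []
    for v in values:
        x = v - mean if with_mean else v
        if with_std and std > 0:
            x = x / std
        scaled.append(x)
    return scaled


ages = [30.0, 40.0, 50.0]
print(standardize_column(ages, with_mean=True))  # [-1.0, 0.0, 1.0]
print(standardize_column(ages))                  # [3.0, 4.0, 5.0]
```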
In [7]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
#convert the categorical attributes to binary features
categoricalAttributes = ['job', 'marital', 'education', 'default',
'housing', 'loan', 'contact',
'month', 'day_of_week', 'outcome_of_prev']
#Build a list of pipeline stages for the machine learning pipeline.
#start with the StringIndexer and OneHotEncoder feature transformers for the categorical features
pipelineStages = []
for columnName in categoricalAttributes:
    #map each category string to a numeric index
    stringIndexer = StringIndexer(inputCol=columnName, outputCol=columnName + "Index")
    pipelineStages.append(stringIndexer)
    #encode the index as a binary vector
    oneHotEncoder = OneHotEncoder(inputCol=columnName + "Index", outputCol=columnName + "Vec")
    pipelineStages.append(oneHotEncoder)
print "%s string indexer and one hot encoder transformers" % len(pipelineStages)
assert len(pipelineStages) == len(categoricalAttributes)*2
# Combine all the feature columns into a single column in the dataframe
numericColumns = ['age', 'campaign', 'pdays', 'previous',
'emp_variation_rate', 'consumer_price_idx', 'consumer_conf_idx',
'euribor3m', 'num_employees']
categoricalCols = [s + "Vec" for s in categoricalAttributes]
allFeatureCols = numericColumns + categoricalCols
vectorAssembler = VectorAssembler(
inputCols=allFeatureCols,
outputCol="features")
pipelineStages.append(vectorAssembler)
print "%s feature columns: %s" % (len(allFeatureCols),allFeatureCols)
#Build pipeline for feature extraction
featurePipeline = Pipeline(stages=pipelineStages)
featureOnlyModel = featurePipeline.fit(trainingDf)
In [9]:
#create list of Dataframes with features
trainingFeaturesDf = featureOnlyModel.transform(trainingDf)
validationFeaturesDf = featureOnlyModel.transform(validationDf)
testFeaturesDf = featureOnlyModel.transform(testDf)
#peek
trainingFeaturesDf.select("features", "label").rdd.take(2)
MLlib offers several clustering methods. These are useful when dealing with unlabeled data, where supervised learning algorithms cannot be applied.
The clustering techniques currently supported out of the box by Spark are:
Let's train a Gaussian mixture model and see how it performs with our current feature set.
In [11]:
from pyspark.mllib.clustering import KMeans, KMeansModel, GaussianMixture
from numpy import array
from math import sqrt
# Extract the "features" column of a features DataFrame into an RDD of vectors
def extractVectorFeatures(featuresDf):
    return featuresDf.select("features").rdd.map(lambda row: row.features)
#function to calculate the accuracy
def calculateAccuracy(labelsAndPredictionsRdd):
    """
    Calculate accuracy for a given label and prediction RDD
    input:
    labelsAndPredictionsRdd : RDD consisting of tuples (label, prediction)
    return: float accuracy in percentage
    """
    return (labelsAndPredictionsRdd.filter(lambda (l, p): l == p).count()
            / float(labelsAndPredictionsRdd.count()) * 100)
# Build the model (cluster the data)
trainingRdd = extractVectorFeatures(trainingFeaturesDf)
numOfClusters = 2
gmm = GaussianMixture.train(trainingRdd, numOfClusters)
def getGmmPredictedAccuracy(featuresDf, featuresRdd, model):
    #run the prediction on the featuresRdd
    #note: cluster indices are arbitrary, so cluster 0 need not correspond to label 0.0
    predictionList = model.predict(featuresRdd).collect()
    #map the labels of the features data frame to the predicted cluster list by index
    actualAndPred = (featuresDf.select("label").rdd
                     .map(lambda row: row.label)
                     .zipWithIndex()
                     .map(lambda (l, i): (l, predictionList[i])))
    return calculateAccuracy(actualAndPred)
# Predict training set with GMM cluster model
trainGmmAccuracy = getGmmPredictedAccuracy(trainingFeaturesDf, trainingRdd, gmm)
#repeat with test set
testRdd = extractVectorFeatures(testFeaturesDf)
testGmmAccuracy = getGmmPredictedAccuracy(testFeaturesDf, testRdd, gmm)
#repeat with validation set
validationRdd = extractVectorFeatures(validationFeaturesDf)
validationGmmAccuracy = getGmmPredictedAccuracy(validationFeaturesDf, validationRdd, gmm)
print "=========================================="
print("GMM accuracy against training set(%) = " + str(trainGmmAccuracy))
print("GMM accuracy against test set(%) = " + str(testGmmAccuracy))
print("GMM accuracy against validation set(%) = " + str(validationGmmAccuracy))
print "=========================================="
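The cluster indices returned by a clustering model are arbitrary labels. Comparing them directly with the 0.0/1.0 labels, as the cell above does, can make a good separation look bad simply because cluster 0 happens to contain the "yes" customers. A fairer score takes the best one-to-one mapping of clusters to labels. Below is a minimal sketch with made-up data; `best_mapping_accuracy` is hypothetical and assumes there are no more clusters than distinct labels.

```python
from itertools import permutations


def best_mapping_accuracy(labels, clusters):
    """Accuracy (%) under the best one-to-one mapping of cluster ids to labels.

    Assumes the number of clusters does not exceed the number of distinct labels.
    """
    label_values = sorted(set(labels))
    cluster_ids = sorted(set(clusters))
    best = 0.0
    for perm in permutations(label_values, len(cluster_ids)):
        mapping = dict(zip(cluster_ids, perm))
        correct = sum(1 for l, c in zip(labels, clusters) if mapping[c] == l)
        best = max(best, 100.0 * correct / len(labels))
    return best


# Made-up labels and cluster assignments, for illustration only
labels = [0.0, 0.0, 1.0, 1.0, 0.0]
clusters = [1, 1, 0, 0, 0]
naive = 100.0 * sum(1 for l, c in zip(labels, clusters) if l == c) / len(labels)
print(naive)                                   # 20.0
print(best_mapping_accuracy(labels, clusters))  # 80.0
```

With two clusters and a binary label, this reduces to taking the larger of the naive accuracy and 100 minus it.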
In the previous steps, we chained together a list of feature encoders to encode our categorical features. Next, we will use the MLlib Pipeline API to build the ML pipeline. The advantage of the Pipeline API is that it bundles and chains transformers (feature encoders, feature selectors, etc.) and estimators (learning algorithms that produce a trained model) together, making them easier to reuse.
https://spark.apache.org/docs/latest/ml-guide.html#migration-guide
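The transformer/estimator split can be illustrated with a toy pure-Python model of a pipeline. This is a simplified conceptual sketch with hypothetical class names, not Spark's implementation:

- A transformer maps data to data.
- An estimator's `fit` returns a fitted transformer (a model).
- Fitting a pipeline fits each estimator on the output of the stages before it.

Because the pipeline is itself an estimator, a pipeline can also be a stage of another pipeline, as in the `combinedPipeline` cell later on. Unlike Spark, this sketch also transforms the data after the final stage during fitting, which is harmless but unnecessary.

```python
class Transformer(object):
    """Something with transform(rows) -> rows."""
    def transform(self, rows):
        raise NotImplementedError


class Estimator(object):
    """Something with fit(rows) -> Transformer (a fitted model)."""
    def fit(self, rows):
        raise NotImplementedError


class MiniPipeline(Estimator):
    """Fit stages in order, feeding each stage the output of the previous one."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)  # an estimator turns into a fitted transformer
            rows = stage.transform(rows)
            fitted.append(stage)
        return MiniPipelineModel(fitted)


class MiniPipelineModel(Transformer):
    """Apply every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class Doubler(Transformer):
    """Toy transformer: no fitting needed."""
    def transform(self, rows):
        return [2 * x for x in rows]


class MeanCentererModel(Transformer):
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [x - self.mean for x in rows]


class MeanCenterer(Estimator):
    """Toy estimator: learns the mean of its input during fit."""
    def fit(self, rows):
        return MeanCentererModel(sum(rows) / float(len(rows)))


model = MiniPipeline([Doubler(), MeanCenterer()]).fit([1.0, 2.0, 3.0])
print(model.transform([1.0, 2.0, 3.0]))  # [-2.0, 0.0, 2.0]
print(model.transform([4.0]))            # [4.0]
```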
Pyspark API documentation on classification https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/classification.html
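We will create a logistic regression model, which makes predictions by applying the logistic function. The hyperparameters of a logistic regression model include the regularization parameter (regParam) and the maximum number of iterations (maxIter), both of which are tuned later in this notebook.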
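Logistic regression computes p = 1 / (1 + exp(-(w·x + b))) and predicts 1.0 when p exceeds a threshold (0.5 by default). To our understanding, with the default elasticNetParam = 0, Spark's LogisticRegression minimizes the average log loss plus the L2 penalty regParam / 2 · ||w||². Larger regParam values therefore shrink the weights more. Below is a pure-Python sketch of these formulas with made-up weights; the function names are hypothetical.

```python
import math


def sigmoid(z):
    """Logistic function 1 / (1 + exp(-z)), written to avoid overflow for large |z|."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def predict(weights, intercept, features, threshold=0.5):
    """Return (probability of label 1.0, predicted label) for one observation."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    p = sigmoid(margin)
    return p, (1.0 if p > threshold else 0.0)


def regularized_log_loss(weights, intercept, data, reg_param):
    """Average log loss plus the L2 penalty reg_param / 2 * ||w||^2.

    data: list of (label, features) pairs with labels 0.0 or 1.0.
    The intercept is not penalized.
    """
    total = 0.0
    for label, features in data:
        p, _ = predict(weights, intercept, features)
        total -= label * math.log(p) + (1.0 - label) * math.log(1.0 - p)
    return total / len(data) + 0.5 * reg_param * sum(w * w for w in weights)


# Made-up weights for a single feature, for illustration only
print(predict([2.0], 0.0, [0.0]))  # (0.5, 0.0): p is not above the threshold
print(predict([2.0], 0.0, [1.0]))
```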
In [12]:
from pyspark.ml.classification import LogisticRegression
# Configure a machine learning pipeline, which consists of
# a single estimator (classifier): logistic regression
lr = LogisticRegression(maxIter=10, regParam=0.01)
lrPipeline = Pipeline(stages=[lr])
# Fit the pipeline to create a model from the training data
lrPipelineModel = lrPipeline.fit(trainingFeaturesDf)
def getAccuracyForPipelineModel(featuresDf, model):
    #perform prediction using the featuresDf and the fitted pipeline model
    #return the accuracy as a percentage (float) together with the result DataFrame
    results = model.transform(featuresDf)
    labelsAndPreds = results.map(lambda p: (p.label, p.prediction))
    return (calculateAccuracy(labelsAndPreds), results)
# Evaluating the model on training data
lrTrainAccuracy, lrTrainResultDf = getAccuracyForPipelineModel(trainingFeaturesDf, lrPipelineModel)
# Repeat on test data
lrTestAccuracy, lrTestResultDf = getAccuracyForPipelineModel(testFeaturesDf, lrPipelineModel)
# Repeat on validation data
lrValidationAccuracy, lrValidationResultDf = getAccuracyForPipelineModel(validationFeaturesDf, lrPipelineModel)
print "=========================================="
print("LogisticRegression Model training accuracy (%) = " + str(lrTrainAccuracy))
print("LogisticRegression Model test accuracy (%) = " + str(lrTestAccuracy))
print("LogisticRegression Model validation accuracy (%) = " + str(lrValidationAccuracy))
print "=========================================="
In [14]:
#you can create a pipeline combining multiple pipelines
#(e.g feature extraction pipeline, and classification pipeline)
combinedPipeline = Pipeline(stages= [featurePipeline, lrPipeline])
In [13]:
# Inspect the predictions our trained model made on the test data (which was not used in training)
selected = lrTestResultDf.select("has_subscribed", "label", "prediction")
#Show number of predicted results
predictedLabel = selected.groupBy("prediction").count()
predictedLabel.show()
selected.show(20)
One of the important tasks in machine learning is to use data to find the optimal hyperparameters for our model.
In the following section, we train the logistic regression model with different values of the regularization parameter (regParam) and the maximum number of iterations (maxIter). The accuracy of each model is measured on the validation dataset we created earlier.
The best parameters are selected based on this validation accuracy.
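The search is an exhaustive loop over the Cartesian product of the maxIter and regParam ranges. It keeps the setting with the highest validation score, and the essential detail is comparing each candidate against the best score found so far. In the pure-Python sketch below, a made-up `fake_validation_accuracy` function stands in for "fit on the training set, score on the validation set". Both function names are hypothetical.

```python
from itertools import product


def grid_search(max_iter_range, reg_param_range, score_fn):
    """Return (best_max_iter, best_reg_param, best_score) over the full grid."""
    best = None
    for max_iter, reg_param in product(max_iter_range, reg_param_range):
        score = score_fn(max_iter, reg_param)
        # Compare with the best score so far, not with a fixed baseline
        if best is None or score > best[2]:
            best = (max_iter, reg_param, score)
    return best


def fake_validation_accuracy(max_iter, reg_param):
    """Made-up scoring function, for illustration only."""
    return 80.0 + min(max_iter, 30) / 10.0 - 10.0 * reg_param


print(grid_search([5, 10, 30, 50, 100], [1e-10, 1e-5, 1e-1], fake_validation_accuracy))
```

Using a strict `>` means ties keep the first candidate encountered, here the smallest maxIter, which is also the cheapest model to train.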
In [76]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from math import log
maxIterRange = [5, 10, 30, 50, 100]
regParamRange = [1e-10, 1e-5, 1e-1]
#baseline values from previous section (bestModel must be a fitted model, not the lr estimator)
bestIter = 10
bestRegParam = 0.01
bestModel = lrPipelineModel
bestAccuracy = lrValidationAccuracy
#for plotting purpose
iterations = []
regParams = []
accuracies = []
for maxIter in maxIterRange:
    for rp in regParamRange:
        currentLr = LogisticRegression(maxIter=maxIter, regParam=rp)
        pipeline = Pipeline(stages=[currentLr])
        model = pipeline.fit(trainingFeaturesDf)
        #use the validation dataset to measure accuracy
        accuracy, resultDf = getAccuracyForPipelineModel(validationFeaturesDf, model)
        print "maxIter: %s, regParam: %s, accuracy: %s " % (maxIter, rp, accuracy)
        accuracies.append(accuracy)
        regParams.append(log(rp))
        iterations.append(maxIter)
        #compare against the best accuracy so far, not against the fixed baseline
        if accuracy > bestAccuracy:
            bestIter = maxIter
            bestRegParam = rp
            bestModel = model
            bestAccuracy = accuracy
print "Best parameters: maxIter %s, regParam %s, accuracy : %s" % (bestIter, bestRegParam, bestAccuracy)
# Repeat on test data
gridTestAccuracy, gridTestResultDf = getAccuracyForPipelineModel(testFeaturesDf, bestModel)
print "=========================================="
print("Grid search Model test accuracy (%) = " + str(gridTestAccuracy))
print "=========================================="
In [81]:
#visualize the results
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
fig = plt.figure(figsize=(14, 14), facecolor='white', edgecolor='white')
ax = fig.add_subplot(111, projection='3d')
ax.scatter(iterations, regParams, accuracies,
           s=18**2, c='r', marker='^', label='validation accuracy')
ax.legend()
ax.set_xlabel('Max Iterations')
ax.set_ylabel(r'$\ln$(regParam)')
ax.set_zlabel('Accuracies (%)')
plt.show()
In [17]:
# We use a ParamGridBuilder to construct a grid of parameters to search over.
grid = (ParamGridBuilder()
        .addGrid(lr.maxIter, maxIterRange)
        .addGrid(lr.regParam, regParamRange)
        .build())
# BinaryClassificationEvaluator ranks models by areaUnderROC by default, not by accuracy
evaluator = BinaryClassificationEvaluator()
# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
crossValidator = CrossValidator(estimator=lrPipeline,
estimatorParamMaps=grid,
numFolds=5,
evaluator=evaluator)
# Run cross-validation, and choose the best model
bestCvModel = crossValidator.fit(trainingFeaturesDf)
# verify results on training dataset
cvTrainAccuracy, cvTrainResultDf = getAccuracyForPipelineModel(trainingFeaturesDf, bestCvModel)
# Repeat on test data
cvTestAccuracy, cvTestResultDf = getAccuracyForPipelineModel(testFeaturesDf, bestCvModel)
print "=========================================="
print("CV Model training accuracy (%) = " + str(cvTrainAccuracy))
print("CV Model test accuracy (%) = " + str(cvTestAccuracy))
print "=========================================="
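With numFolds=5, CrossValidator proceeds as follows:

1. It splits the training data into 5 folds.
2. For each of the 15 parameter combinations, it trains on 4 folds and evaluates on the held-out fold, 5 times over.
3. It averages the 5 metric values for each combination.
4. It refits the best combination on the whole training set.

The pure-Python sketch below covers fold assignment and averaging. It assumes a seeded shuffle; Spark assigns rows to folds by random sampling, so the folds will differ. `k_fold_indices`, `cross_validate` and the `train_and_score` callback are hypothetical names.

```python
import random


def k_fold_indices(n, num_folds, seed=0):
    """Shuffle row indices and deal them into num_folds roughly equal folds."""
    indices = list(range(n))
    random.Random(seed).shuffle(indices)
    return [indices[i::num_folds] for i in range(num_folds)]


def cross_validate(rows, num_folds, train_and_score, seed=0):
    """Average the held-out score over all folds.

    train_and_score(train_rows, held_out_rows) -> float is a hypothetical
    callback that fits a model on train_rows and scores it on held_out_rows.
    """
    folds = k_fold_indices(len(rows), num_folds, seed)
    scores = []
    for held_out in folds:
        held_out_set = set(held_out)
        train_rows = [rows[i] for i in range(len(rows)) if i not in held_out_set]
        held_out_rows = [rows[i] for i in held_out]
        scores.append(train_and_score(train_rows, held_out_rows))
    return sum(scores) / float(len(scores))


def mean_predictor_score(train_rows, held_out_rows):
    """Toy 'model': predict the training mean; score = negative mean squared error."""
    mean = sum(train_rows) / float(len(train_rows))
    return -sum((y - mean) ** 2 for y in held_out_rows) / len(held_out_rows)


print(cross_validate([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 3, mean_predictor_score))
```

Because every row is held out exactly once, the averaged score uses all of the training data for evaluation without ever scoring a model on rows it was trained on.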