Spark Machine Learning Pipeline

This coursework is about implementing and applying Spark Machine Learning Pipelines, and evaluating them with respect to preprocessing, parametrisation, and scaling.

1. Data set initial analysis and summary of pipeline task. (20%)

1.1. Summary of machine learning pipeline

Step 1.
Step 2.
Step 3.
Step 4.

1.2. Loading data into an RDD


In [ ]:
# import dependencies for creating a data frame
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import csv


# Create SparkSession and get the SparkContext (used by the textFile alternatives below)
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext




# create DataFrame from csv file (spark.read returns a DataFrame rather than an RDD);
# note: 'schema' must be defined beforehand (see the programmatic schema example below)
trainRDD = spark.read.csv("hdfs://saltdean/data/data/santander-products/train_ver2.csv", 
                          header=True, mode="DROPMALFORMED", schema=schema)






# alternatively...
# create RDD from csv files
trainRDD = sc.textFile("hdfs://saltdean/data/data/santander-products/train_ver2.csv")
trainRDD = trainRDD.mapPartitions(lambda x: csv.reader(x))





# alternatively... from https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
# create RDD from csv files
lines = sc.textFile("hdfs://saltdean/data/data/santander-products/train_ver2.csv")
elements = lines.map(lambda l: l.split(","))

# Each line is converted to a tuple.
clients = elements.map(lambda p: (p[0], p[1].strip(),p[2],...))

# The schema is encoded in a string.
schemaString = "name age ..."
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD and register the DataFrame to be used with Spark SQL.
trainRDD = spark.createDataFrame(clients, schema)
trainRDD.createOrReplaceTempView('trainingset')






# alternatively, as seen in tutorial 8 (the column names below come from the tutorial's ratings data and would need adapting to the Santander columns):
lines = sc.textFile("hdfs://saltdean/data/data/santander-products/train_ver2.csv")
parts = lines.map(lambda l: l.split(","))
trainRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))

# Create DataFrame and register it to be used with Spark SQL.
trainData = spark.createDataFrame(trainRDD)
trainData.createOrReplaceTempView('Clients')

# For testing
trainData.describe().show()     # summary statistics per column
print(trainData.count())        # number of instances

1.3. Descriptive Statistics


In [ ]:
# Code modified from 
# https://www.kaggle.com/apryor6/santander-product-recommendation/detailed-cleaning-visualization-python/notebook

# import dependencies
import numpy as np
import pandas as pd

# create dataframe 'df'
limit_rows = 7000000
# note: pandas cannot normally read hdfs:// paths directly; the file may need to be
# copied locally or loaded via Spark and converted with toPandas()
df = pd.read_csv("hdfs://saltdean/data/data/santander-products/train_ver2.csv",
                 dtype={"sexo":str, "ult_fec_cli_1t":str, "indext":str}, nrows=limit_rows)

unique_ids = pd.Series(df["ncodpers"].unique())
limit_people = 12000
unique_id = unique_ids.sample(n=limit_people)   # sample() requires an integer n
df = df[df.ncodpers.isin(unique_id)]
print(df.count())   # number of non-null values per column
df.describe()

1.4. Data Cleaning


In [ ]:
# find missing values
df.isnull().any()

In [ ]:
# Remove age outliers and nan from dataframe
df.loc[df.age < 18,"age"]  = df.loc[(df.age >= 18) & (df.age <= 30),"age"].mean(skipna=True)   # replace low outliers with the 18-30 mean
df.loc[df.age > 100,"age"] = df.loc[(df.age >= 30) & (df.age <= 100),"age"].mean(skipna=True)  # replace high outliers with the 30-100 mean
df["age"].fillna(df["age"].mean(),inplace=True) # replace nan with mean
df["age"] = df["age"].astype(int)

# Replace missing values
df.loc[df["ind_nuevo"].isnull(),"ind_nuevo"] = 1                   # new customers id '1'
df.loc[df.antiguedad.isnull(),"antiguedad"] = df.antiguedad.min()
df.loc[df.antiguedad <0, "antiguedad"] = 0                         # new customer antiguedad '0'
df.loc[df.indrel.isnull(),"indrel"] = 1 

dates=df.loc[:,"fecha_alta"].sort_values().reset_index()
median_date = int(np.median(dates.index.values))
df.loc[df.fecha_alta.isnull(),"fecha_alta"] = dates.loc[median_date,"fecha_alta"] # fill join date missing values

df.loc[df.ind_actividad_cliente.isnull(),"ind_actividad_cliente"] = \
df["ind_actividad_cliente"].median()                   # fill in customer activity missing

df.loc[df.nomprov.isnull(),"nomprov"] = "UNKNOWN"      # missing city of residence set to "UNKNOWN"

df.loc[df.indfall.isnull(),"indfall"] = "N"            # missing deceased index set to N
df.loc[df.tiprel_1mes.isnull(),"tiprel_1mes"] = "A"    # customer status, if missing = active 
df.tiprel_1mes = df.tiprel_1mes.astype("category")     # customer status as categorical

# Customer type normalization as categorical variable 
map_dict = { 1.0:"1", "1.0":"1", "1":"1", "3.0":"3", "P":"P", 3.0:"3", 2.0:"2", "3":"3", "2.0":"2", "4.0":"4", "4":"4", "2":"2"}
df.indrel_1mes.fillna("P",inplace=True)
df.indrel_1mes = df.indrel_1mes.apply(lambda x: map_dict.get(x,x))
df.indrel_1mes = df.indrel_1mes.astype("category")

# Replace missing values in target features with 0
# target features = boolean indicator as to whether or not that product was owned that month
df.loc[df.ind_nomina_ult1.isnull(), "ind_nomina_ult1"] = 0
df.loc[df.ind_nom_pens_ult1.isnull(), "ind_nom_pens_ult1"] = 0

# Eliminate entries with nan values in the given variables, e.g.:
print("Total number of entries before removing nan = ", len(df))
print(df.renta.isnull().sum())
df = df.dropna(subset=["renta","indfall","tiprel_1mes","indrel_1mes"])  # drops only rows with nan in these columns
print(df.renta.isnull().sum())
print("Total number of entries after removing nan = ", len(df))

In [ ]:
# Eliminate redundant variables
df.drop(["tipodom","cod_prov"],axis=1,inplace=True)

# check all missing values are gone
df.isnull().any()

In [ ]:
# Convert target features column into integers
feature_cols = df.iloc[:1,].filter(regex="ind_+.*ult.*").columns.values
for col in feature_cols:
    df[col] = df[col].astype(int)
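
The cleaning above is done with pandas, while the pipeline in section 2 expects a Spark DataFrame. A minimal hand-off sketch, assuming the cleaned frame df fits in driver memory (for the full data set the cleaning would have to be re-expressed with Spark DataFrame operations):


In [ ]:
# Convert the cleaned pandas DataFrame into a Spark DataFrame for the ML pipeline.
# Assumption: 'df' fits in driver memory and 'spark' is the SparkSession created in 1.2.
trainData = spark.createDataFrame(df)
trainData.createOrReplaceTempView('Clients')
trainData.printSchema()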

2. Implementation of machine learning pipeline. (25%)

Implement a machine learning pipeline in Spark, including feature extractors, transformers, and/or selectors. Test that your pipeline is correctly implemented and explain your choice of processing steps, learning algorithms, and parameter settings.


In [ ]:
# code modified from Spark documentation at:
# https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#random-forest-classifier
# and DataBricks at:
# https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html

# imports dependencies for Random Forest pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler


# stages in the Pipeline
stages = []

# One-Hot Encoding
categoricalColumns = ["a", "b", "c", "d", "e", "f", "g", "j"]
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index") # Category Indexing with StringIndexer
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec") # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    stages += [stringIndexer, encoder]  # Add stages to the pipeline
    
# Convert labels into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = "add here target column in csv file", outputCol = "labels")
stages += [label_stringIdx]  # Add stage to the pipeline

# Transform all features into a vector using VectorAssembler
numericCols = ["m", "n", "o", "p", "q", "r"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]  # Add stage to the pipeline

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="labels", 
                            featuresCol="features", 
                            numTrees=100,                  # number of trees in the random forest
                            impurity='entropy',            # criterion used for information gain calculation
                            featureSubsetStrategy="auto",
                            predictionCol="prediction",
                            maxDepth=5, 
                            maxBins=32, 
                            minInstancesPerNode=1, 
                            minInfoGain=0.0, 
                            subsamplingRate=1.0)
stages += [rf]  # Add stage to the pipeline

# Machine Learning Pipeline
pipeline = Pipeline(stages=stages)
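
The assignment also asks to test that the pipeline is correctly implemented. A minimal sanity check, sketched under the assumption that trainData is the cleaned Spark DataFrame from section 1: fit the pipeline on a small sample and verify that the expected output columns appear.


In [ ]:
# Quick sanity check of the pipeline wiring: fit it on a small sample of the data
# and inspect the transformed columns ('features', 'labels', 'prediction').
sampleData = trainData.sample(fraction=0.01, seed=100)
sampleModel = pipeline.fit(sampleData)
sampleModel.transform(sampleData).select("features", "labels", "prediction").show(5)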

3. Evaluation and test of model. (20%)

Evaluate the performance of your pipeline using a training and a test set (do not use cross-validation; use pyspark.ml.tuning.TrainValidationSplit).

3.1. Evaluate performance of machine learning pipeline on training data and test data.


In [ ]:
# imports dependencies
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Split data into training set and testing set
[trainData, testData] = trainData.randomSplit([0.8, 0.2], seed = 100)

# Train model in pipeline
rfModel = pipeline.fit(trainData)

# Make predictions for training set and compute training set accuracy
predictions = rfModel.transform(trainData)
evaluator = MulticlassClassificationEvaluator(labelCol="labels", 
                                              predictionCol="prediction", 
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print(train_pipeline.stages[0])  # summary


# The fitted pipeline model already contains the feature transformation stages,
# so it can be applied directly to the held-out test set from randomSplit above.

# Make predictions for test set and compute test error
test_predictions = rfModel.transform(testData)
test_accuracy = evaluator.evaluate(test_predictions)
print("Test Error = %g" % (1.0 - test_accuracy))

4. Model fine-tuning. (35%)

Implement a parameter grid (using pyspark.ml.tuning.ParamGridBuilder), varying at least one feature preprocessing step, one machine learning parameter, and the training set size. Document the training and test performance and the time taken for training and testing. Comment on your findings.

4.1. Training set size evaluation


In [ ]:
print('Training set size evaluation')

# relative sizes of the training subsets to be evaluated (randomSplit normalises these weights)
sizes = [0.5, 0.1, 0.05, 0.01, 0.001]
data = trainData.randomSplit(sizes, seed=100)

print('\n=== training set of size 100%')
# Train model in pipeline
tempModel = pipeline.fit(trainData)
# Make predictions for training set and compute training set accuracy
tempPredictions = tempModel.transform(trainData)
tempAccuracy = evaluator.evaluate(tempPredictions)
print("Classification Error = %g" % (1.0 - tempAccuracy))

for i, subset in enumerate(data):
    print('\n=== training set reduced to a relative size of %g' % sizes[i])
    # Train model in pipeline
    tempModel = pipeline.fit(subset)
    # Make predictions for training set and compute training set accuracy
    tempPredictions = tempModel.transform(subset)
    tempAccuracy = evaluator.evaluate(tempPredictions)
    print("Classification Error = %g" % (1.0 - tempAccuracy))

4.2. Hyperparameter grid search


In [ ]:
# Define hyperparameters and their values to search and evaluate
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10,20,50,100,200,500,1000,5000]) \
    .addGrid(rf.minInstancesPerNode, [0,1,2,4,6,8,10]) \
    .addGrid(rf.maxDepth, [2,5,10,20,50]).build()

# Grid search with cross-validation over the full pipeline
# (the estimator must be the pipeline so that the raw columns are transformed before the Random Forest)
crossVal = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator)
print('starting Hyperparameter Grid Search with cross-validation')
rfCrosVal = crossVal.fit(trainData)
print('Grid Search has finished')

print(rfCrosVal.bestModel.stages[-1])   # best Random Forest model found by the grid search
paramMap = list(zip(rfCrosVal.getEstimatorParamMaps(),rfCrosVal.avgMetrics))
paramMax = max(paramMap, key=lambda x: x[1])
print(paramMax)

# Evaluate the model with test data
cvtest_predictions = rfCrosVal.transform(testData)
cvtest_accuracy = evaluator.evaluate(cvtest_predictions)
print("Test Error = %g" % (1.0 - cvtest_accuracy))

4.3. Evaluate model performance using a subset of variables (predictors)


In [ ]:
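# Sketch only (assumes the column placeholders and stage variables from section 2):
# rebuild the feature assembly with a reduced subset of predictors and compare
# the resulting test error against the model trained on all variables.
subsetCategorical = ["a", "b"]   # placeholder subset of the categorical columns used above
subsetNumeric = ["m", "n"]       # placeholder subset of the numeric columns used above

subsetStages = []
for c in subsetCategorical:
    subsetStages += [StringIndexer(inputCol=c, outputCol=c+"Index"),
                     OneHotEncoder(inputCol=c+"Index", outputCol=c+"classVec")]
subsetStages += [label_stringIdx]
subsetStages += [VectorAssembler(inputCols=[c+"classVec" for c in subsetCategorical] + subsetNumeric,
                                 outputCol="features"),
                 rf]

subsetPipeline = Pipeline(stages=subsetStages)
subsetModel = subsetPipeline.fit(trainData)
subsetPredictions = subsetModel.transform(testData)
print("Test Error (subset of predictors) = %g" % (1.0 - evaluator.evaluate(subsetPredictions)))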