The dataset used is "Santander Products", from the Kaggle competition of the same name. It was chosen for the large number of predictors and responses it contains: the many possible variable combinations increase the complexity of the problem, so the selected predictive model needs a correspondingly deep level of abstraction. The example also showcases the advantages of Spark parallel computing for the analysis of large datasets.
The chosen model, Random Forest (RF), can accommodate large numbers of features of diverse types. The method has a strong track record across many machine-learning fields, including computer vision, and is expected to perform well in both classification and regression settings.
The goal of the pipeline is to predict whether a financial product (a mortgage) will be purchased by a customer, given the personal and financial data available for each customer.
In [17]:
# load dependencies
import numpy as np
import pandas as pd
type_dict = {'ncodpers':np.int32,
'ind_ahor_fin_ult1':np.uint8, 'ind_aval_fin_ult1':np.uint8,
'ind_cco_fin_ult1':np.uint8,'ind_cder_fin_ult1':np.uint8,
'ind_cno_fin_ult1':np.uint8,'ind_ctju_fin_ult1':np.uint8,'ind_ctma_fin_ult1':np.uint8,
'ind_ctop_fin_ult1':np.uint8,'ind_ctpp_fin_ult1':np.uint8,'ind_deco_fin_ult1':np.uint8,
'ind_deme_fin_ult1':np.uint8,'ind_dela_fin_ult1':np.uint8,'ind_ecue_fin_ult1':np.uint8,
'ind_fond_fin_ult1':np.uint8,'ind_hip_fin_ult1':np.uint8,'ind_plan_fin_ult1':np.uint8,
'ind_pres_fin_ult1':np.uint8,'ind_reca_fin_ult1':np.uint8,'ind_tjcr_fin_ult1':np.uint8,
'ind_valo_fin_ult1':np.uint8,'ind_viv_fin_ult1':np.uint8, 'ind_recibo_ult1':np.uint8 }
# load data from the server into a dataframe (only the first 1,000,000 rows, for demonstration purposes)
df = pd.read_csv("/data/tempstore/santander-products/train_ver2.csv",
nrows = 1000000,
dtype = type_dict)
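Before any modelling, it is worth checking how common the target product actually is; if mortgage ownership turns out to be rare, the plain accuracy figures reported later will look optimistic. A minimal check (not part of the original notebook, assuming the column loaded as expected):
# share of customers holding the mortgage product in the loaded rows
df["ind_hip_fin_ult1"].value_counts(normalize=True)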
In [18]:
df.describe()
Out[18]:
In [19]:
# collect the unique customer ids and keep the rows whose id appears in that set
# (note: this keeps every row; to keep one row per customer, drop_duplicates("ncodpers") would be needed)
unique_ids = pd.Series(df["ncodpers"].unique())
df = df[df.ncodpers.isin(unique_ids)]
df.count() # number of instances
Out[19]:
In [20]:
# eliminate mostly empty columns and redundant variables
df.drop(["tipodom","cod_prov", "ult_fec_cli_1t","conyuemp"],axis=1,inplace=True)
In [21]:
# transform to numeric and set missing values to nan
df['age']=pd.to_numeric(df.age, errors='coerce')
df['ind_nuevo']=pd.to_numeric(df.ind_nuevo, errors='coerce')
df['antiguedad']=pd.to_numeric(df.antiguedad, errors='coerce')
df['indrel']=pd.to_numeric(df.indrel, errors='coerce')
df['renta']=pd.to_numeric(df.renta, errors='coerce')
df['indrel_1mes']=pd.to_numeric(df.indrel_1mes, errors='coerce')
In [22]:
# Remove age outliers and NaN from the age variable
df.loc[df.age < 18,"age"] = df.loc[(df.age >= 18) & (df.age <= 30),"age"].mean(skipna=True) # replace low outliers with the 18-30 mean
df.loc[df.age > 100,"age"] = df.loc[(df.age >= 30) & (df.age <= 100),"age"].mean(skipna=True) # replace high outliers with the 30-100 mean
df["age"].fillna(df["age"].mean(),inplace=True) # replace NaN with the mean
df["age"] = df["age"].astype(int)
In [23]:
# transform dates to datetime datatype
df["fecha_dato"] = pd.to_datetime(df["fecha_dato"],format="%Y-%m-%d")
df["fecha_alta"] = pd.to_datetime(df["fecha_alta"],format="%Y-%m-%d")
df["fecha_dato"].unique()
Out[23]:
In [24]:
# fill datetime missing values
dates=df.loc[:,"fecha_alta"].sort_values().reset_index()
median_date = int(np.median(dates.index.values))
df.loc[df.fecha_alta.isnull(),"fecha_alta"] = dates.loc[median_date,"fecha_alta"]
In [25]:
# check all missing values
df.isnull().any()
Out[25]:
In [26]:
# Replace missing values in target features with 0
# target features = boolean indicator as to whether or not that product was owned that month
df.loc[df.ind_nomina_ult1.isnull(), "ind_nomina_ult1"] = 0
df.loc[df.ind_nom_pens_ult1.isnull(), "ind_nom_pens_ult1"] = 0
In [27]:
# Replace other missing values
df.loc[df["ind_nuevo"].isnull(),"ind_nuevo"] = 1 # new customers id '1'
df.loc[df.antiguedad.isnull(),"antiguedad"] = df.antiguedad.min()
df.loc[df.antiguedad <0, "antiguedad"] = 0 # new customer antiguedad '0'
df.loc[df.indrel.isnull(),"indrel"] = 1
df.loc[df.ind_actividad_cliente.isnull(),"ind_actividad_cliente"] = \
df["ind_actividad_cliente"].median() # fill in customer activity missing
df.loc[df.nomprov.isnull(),"nomprov"] = "UNKNOWN" # missing province of residence set to UNKNOWN
df.loc[df.indfall.isnull(),"indfall"] = "N" # missing deceased index set to N
df.loc[df.tiprel_1mes.isnull(),"tiprel_1mes"] = "A" # customer status, if missing = active
df.tiprel_1mes = df.tiprel_1mes.astype("category") # customer status as categorical
In [28]:
# Customer type normalization as categorical variable
map_dict = { 1.0:"1", "1.0":"1", "1":"1", "3.0":"3", "P":"P", 3.0:"3", 2.0:"2", "3":"3", "2.0":"2", "4.0":"4", "4":"4", "2":"2"}
df.indrel_1mes.fillna("P",inplace=True)
df.indrel_1mes = df.indrel_1mes.apply(lambda x: map_dict.get(x,x))
df.indrel_1mes = df.indrel_1mes.astype("category")
In [29]:
# remove rows with any nan value left
df = df.dropna(subset=['renta', 'segmento', 'canal_entrada', 'ind_empleado',
                       'pais_residencia', 'indresi', 'sexo'], how='any')
In [30]:
# check all missing values are gone
df.isnull().any()
Out[30]:
In [31]:
df.count() # number of instances
Out[31]:
In [32]:
# stop any previous Spark session and check the dataframe type (assumes the kernel provides a running `spark` session)
spark.stop()
type(df)
Out[32]:
In [33]:
# Create Spark SQL dataframe
## IMPORTANT!! - this cell usually takes time due to data volume!!!
## IMPORTANT!! - Only run this cell once! (to run it again, you need to restart the kernel)
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlCtx = SQLContext(sc)
df_spark = sqlCtx.createDataFrame(df)
type(df_spark)
Out[33]:
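SQLContext is the legacy entry point; on Spark 2.x and later the same conversion can be done through a SparkSession. A sketch of the equivalent setup (the application name is arbitrary):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("santander-rf").getOrCreate()
df_spark = spark.createDataFrame(df)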
In [34]:
# define datatypes in dataframe
df_spark = df_spark.select(df_spark.fecha_dato.cast("date"),
df_spark.ncodpers.cast("float"),
df_spark.ind_empleado.cast("string"),
df_spark.pais_residencia.cast("string"),
df_spark.sexo.cast("string"),
df_spark.age.cast("float"),
df_spark.fecha_alta.cast("date"),
df_spark.ind_nuevo.cast("float"),
df_spark.antiguedad.cast("float"),
df_spark.indrel.cast("float"),
df_spark.indrel_1mes.cast("float"),
df_spark.tiprel_1mes.cast("string"),
df_spark.indresi.cast("string"),
df_spark.indext.cast("string"),
df_spark.canal_entrada.cast("string"),
df_spark.indfall.cast("string"),
df_spark.nomprov.cast("string"),
df_spark.ind_actividad_cliente.cast("float"),
df_spark.renta.cast("float"),
df_spark.segmento.cast("string"),
df_spark.ind_ahor_fin_ult1.cast("float"),
df_spark.ind_aval_fin_ult1.cast("float"),
df_spark.ind_cco_fin_ult1.cast("float"),
df_spark.ind_cder_fin_ult1.cast("float"),
df_spark.ind_cno_fin_ult1.cast("float"),
df_spark.ind_ctju_fin_ult1.cast("float"),
df_spark.ind_ctma_fin_ult1.cast("float"),
df_spark.ind_ctop_fin_ult1.cast("float"),
df_spark.ind_ctpp_fin_ult1.cast("float"),
df_spark.ind_deco_fin_ult1.cast("float"),
df_spark.ind_deme_fin_ult1.cast("float"),
df_spark.ind_dela_fin_ult1.cast("float"),
df_spark.ind_ecue_fin_ult1.cast("float"),
df_spark.ind_fond_fin_ult1.cast("float"),
df_spark.ind_hip_fin_ult1.cast("float"),
df_spark.ind_plan_fin_ult1.cast("float"),
df_spark.ind_pres_fin_ult1.cast("float"),
df_spark.ind_reca_fin_ult1.cast("float"),
df_spark.ind_tjcr_fin_ult1.cast("float"),
df_spark.ind_valo_fin_ult1.cast("float"),
df_spark.ind_viv_fin_ult1.cast("float"),
df_spark.ind_nomina_ult1.cast("float"),
df_spark.ind_nom_pens_ult1.cast("float"),
df_spark.ind_recibo_ult1.cast("float"))
In [35]:
df_spark.printSchema()
In [36]:
# code modified from Spark documentation at:
# https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#random-forest-classifier
# and DataBricks at:
# https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
# import dependencies for the Random Forest pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
# IMPORTANT - define the target label (for prediction) from the target features; the selected target is the mortgage product
labels = "ind_hip_fin_ult1"
# stages in the Pipeline
stages = []
# define variables: categorical, continuous and target features
numericCols = ["age","antiguedad","renta"]
categoricalColumns = ["ind_empleado","pais_residencia","sexo","ind_nuevo","indrel",
"indrel_1mes","tiprel_1mes", "indresi", "indext", "canal_entrada","nomprov",
"ind_actividad_cliente","segmento"]
targetsColumns = ["ind_ahor_fin_ult1", "ind_aval_fin_ult1",
"ind_cco_fin_ult1", "ind_cder_fin_ult1", "ind_cno_fin_ult1",
"ind_ctma_fin_ult1", "ind_ctop_fin_ult1",
"ind_ctpp_fin_ult1", "ind_deco_fin_ult1", "ind_deme_fin_ult1",
"ind_dela_fin_ult1", "ind_ecue_fin_ult1", "ind_fond_fin_ult1",
"ind_ctju_fin_ult1", "ind_plan_fin_ult1", "ind_pres_fin_ult1",
"ind_reca_fin_ult1", "ind_tjcr_fin_ult1", "ind_valo_fin_ult1",
"ind_viv_fin_ult1", "ind_nomina_ult1", "ind_nom_pens_ult1","ind_recibo_ult1"]
In [37]:
# Index categorical variables with StringIndexer
# (OneHotEncoder is imported above but not applied here; the raw category indices are used directly)
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + "Index") # category indexing with StringIndexer
    stages += [stringIndexer] # add the stage to the pipeline
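If binary SparseVectors were preferred over the raw category indices, an encoder stage could be appended per column, roughly as sketched below (Spark 2.x single-column OneHotEncoder API; not run in the original pipeline):
# one stage per categorical column, consuming the "...Index" output of the StringIndexer
ohe_stages = [OneHotEncoder(inputCol = c + "Index", outputCol = c + "Vec")
              for c in categoricalColumns]
# if used: stages += ohe_stages, and the VectorAssembler below would take the "...Vec" columns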
In [38]:
# define categorical index columns
categoricalColumnsIDX = ["ind_empleadoIndex","pais_residenciaIndex","sexoIndex",
"ind_nuevoIndex","indrelIndex","indrel_1mesIndex",
"tiprel_1mesIndex","indresiIndex","indextIndex",
"canal_entradaIndex","nomprovIndex","ind_actividad_clienteIndex","segmentoIndex"]
In [39]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = labels,
outputCol = "label")
stages += [label_stringIdx]
In [40]:
# Transform all features into a vector using VectorAssembler
assemblerInputs = categoricalColumnsIDX + numericCols + targetsColumns
assembler = VectorAssembler(inputCols = assemblerInputs,
outputCol = "features")
stages += [assembler] # Add stage to the pipeline
In [41]:
prePipeline = Pipeline(stages = stages)
pipelineModel = prePipeline.fit(df_spark)
dataset = pipelineModel.transform(df_spark)
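A quick look at the assembled vectors confirms that the preprocessing stages produced what was intended (purely an optional inspection):
dataset.select("features", "label").show(5, truncate = False)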
In [42]:
# Define the RandomForest model (training happens in the validation step below).
rf = RandomForestClassifier(labelCol = "label",
featuresCol = "features",
numTrees = 100, # Number of trees in the random forest
impurity = 'entropy', # Criterion used for information gain calculation
featureSubsetStrategy = "auto",
predictionCol = "prediction",
maxDepth = 5,
maxBins = 160,
minInstancesPerNode = 2)
#minInfoGain=0.0,
#subsamplingRate=1.0)
In [43]:
# imports dependencies
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
In [44]:
# Split data into training set and testing set
[trainData, testData] = dataset.randomSplit([0.8, 0.2], seed = 100)
In [45]:
# evaluation of model performance
evaluator = MulticlassClassificationEvaluator(labelCol = "label",
predictionCol = "prediction",
metricName = "accuracy")
# random forest parameters
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [100]).build()
# hold-out validation of model performance during the grid search
# Method: pyspark.ml.tuning.TrainValidationSplit (a single train/validation split, not k-fold cross-validation)
crossval = TrainValidationSplit(estimator = rf,
                                estimatorParamMaps = paramGrid,
                                evaluator = evaluator,
                                trainRatio = 0.9)
# Run the validation and choose the best set of parameters.
print('starting train/validation split')
cvModel = crossval.fit(trainData) # This takes time!
print('finished train/validation split')
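The TrainValidationSplit above uses a single hold-out split; if a genuine k-fold estimate were wanted, the already imported CrossValidator could be swapped in, roughly as follows (numFolds = 3 is an arbitrary choice and multiplies training time accordingly):
crossval_kfold = CrossValidator(estimator = rf,
                                estimatorParamMaps = paramGrid,
                                evaluator = evaluator,
                                numFolds = 3)
# cvModel = crossval_kfold.fit(trainData)  # trains numFolds models per grid point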
In [46]:
# Make predictions on the training set and compute the training error
predictions = cvModel.transform(trainData)
train_accuracy = evaluator.evaluate(predictions)
print("Training Accuracy = %g" % (train_accuracy))
print("Training Error = %g" % (1.0 - train_accuracy))
In [47]:
# Make predictions for test set and compute test error
predictions = cvModel.transform(testData)
test_accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % (test_accuracy))
print("Test Error = %g" % (1.0 - test_accuracy))
Implement a parameter grid (using pyspark.ml.tuning.ParamGridBuilder), varying at least one feature-preprocessing step, one machine-learning parameter, and the training-set size. Document the training and test performance and the time taken for training and testing. Comment on your findings.
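The time taken for training and testing can be recorded by wrapping the fit and transform calls with the standard library, as in the sketch below (an alternative to the cell-level %%time magics used further down, when per-step timings are wanted):
import time

t0 = time.time()
model = crossval.fit(trainData)            # training time
train_time = time.time() - t0

t0 = time.time()
test_preds = model.transform(testData)
test_acc = evaluator.evaluate(test_preds)  # evaluation forces the (lazy) transform to run
test_time = time.time() - t0

print("training: %.1f s, testing: %.1f s" % (train_time, test_time))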
In [48]:
# New preprocessing stages, without numeric predictors
new_stages = []
# remove the numeric predictors from preprocessing by using an empty list
New_numericCols = [] # empty numeric predictors
# Add the new stages to the pipeline
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + "Index")
    new_stages += [stringIndexer] # add the stage to the pipeline
new_stages += [label_stringIdx]
# the empty list is inserted here
new_assemblerInputs = categoricalColumnsIDX + New_numericCols + targetsColumns
new_assembler = VectorAssembler(inputCols = new_assemblerInputs, outputCol = "features")
new_stages += [new_assembler]
In [49]:
# Creating new pipeline
from pyspark.ml import Pipeline
new_prePipeline = Pipeline(stages = new_stages)
new_pipelineModel = new_prePipeline.fit(df_spark)
new_dataset = new_pipelineModel.transform(df_spark)
In [50]:
[new_trainData, new_testData] = new_dataset.randomSplit([0.8, 0.2], seed = 100)
new_cvModel = crossval.fit(new_trainData) # This takes time!
# Results:
print('Numerical predictors not used for training')
new_predictions = new_cvModel.transform(new_trainData)
new_train_accuracy = evaluator.evaluate(new_predictions)
print("New Training Accuracy = %g" % (new_train_accuracy))
print("New Training Error = %g" % (1.0 - new_train_accuracy))
new_test_predictions = new_cvModel.transform(new_testData)
new_test_accuracy = evaluator.evaluate(new_test_predictions)
print("New Test Accuracy = %g" % (new_test_accuracy))
print("New Test Error = %g" % (1.0 - new_test_accuracy))
In [51]:
print('Financial products not used for training (only personal data)')
# Add the new stages to the pipeline
stages2 = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + "Index")
    stages2 += [stringIndexer] # add the stage to the pipeline
stages2 += [label_stringIdx]
# the financial-product indicator columns are excluded here
new_assemblerInputs = categoricalColumnsIDX + numericCols
new_assembler = VectorAssembler(inputCols = new_assemblerInputs, outputCol = "features")
stages2 += [new_assembler]
# Creating new pipeline
from pyspark.ml import Pipeline
prePipeline2 = Pipeline(stages = stages2)
pipelineModel2 = prePipeline2.fit(df_spark)
dataset2 = pipelineModel2.transform(df_spark)
[trainData2, testData2] = dataset2.randomSplit([0.8, 0.2], seed = 100)
cvModel2 = crossval.fit(trainData2) # This takes time!
# Results:
predictions2 = cvModel2.transform(trainData2)
train_accuracy2 = evaluator.evaluate(predictions2)
print("Training Accuracy = %g" % (train_accuracy2))
print("Training Error = %g" % (1.0 - train_accuracy2))
test_predictions2 = cvModel2.transform(testData2)
test_accuracy2 = evaluator.evaluate(test_predictions2)
print("Test Accuracy = %g" % (test_accuracy2))
print("Test Error = %g" % (1.0 - test_accuracy2))
In [ ]:
%%time
print('Training set size evaluation')
# relative sizes of the reduced training sets to evaluate
# (note: randomSplit normalises these weights, so each split's size is its weight relative to their sum)
sizes = [0.1, 0.001, 0.00001]
data = trainData.randomSplit(sizes, seed = 100)
# model performance with the full dataset, from the previous experiment
print('\n\n=== training set of size 100%:')
print("Classification Error = %g" % (1.0 - new_train_accuracy))
i = 0
for split in data:
    print('\n\n=== training set of size reduced to {}%, wait please'.format(sizes[i]*100))
    cvModel = crossval.fit(split)
    predictions = cvModel.transform(split)
    accuracy = evaluator.evaluate(predictions)
    print("Classification Error = %g" % (1.0 - accuracy))
    i += 1
In [ ]:
%%time
# Define hyperparameters and their values to search and evaluate
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10,100,500]) \
    .addGrid(rf.maxDepth, [2,10]).build()
# hold-out validation of model performance during the grid search
crossval = TrainValidationSplit(estimator = rf,
                                estimatorParamMaps = paramGrid,
                                evaluator = evaluator,
                                trainRatio = 0.9)
# Run the grid search and choose the best set of parameters.
print('starting hyperparameter grid search with train/validation split')
cvModel = crossval.fit(trainData)
print('grid search with train/validation split has finished')
# pick the best model
rfModel = cvModel.bestModel
print(rfModel)
In [ ]:
# Make predictions for test set and compute test error
predictions = rfModel.transform(testData)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
As expected, Random Forest achieves good generalisation accuracy (validated on the test data). The average classification accuracy on the test data across the different models tested is above 98%. The level of abstraction and flexibility offered by the RF model allows excellent accuracy even when the number of predictors is reduced, i.e. when the existing financial products are not used as predictors, or when the numerical predictors are removed from the training set (as seen in section 4.1). This results in a simpler model that is equally powerful.
As expected, the model is sensitive to reductions in the size of the training set: the more data, the better the model. More training data reduces the effect of outliers and improves the generalisation accuracy of the final model; it also tends to reduce the effect of bias in the data. As seen in section 4.2, the accuracy can vary widely depending on the randomness of the training/validation split. With less data it becomes more likely that an unrepresentative sample is selected for training or for evaluation.
The grid search performed as part of this analysis shows that an RF with a larger number of shallow trees (low depth) provided better results at a lower computational cost than a smaller number of deeper trees. Deeper trees increase the computational cost steeply (the number of possible nodes doubles with each additional level) and carry a greater risk of overfitting, without a significant gain in performance (section 4.3).
In summary, Random Forest is a valid approach for classification predictions given the nature of the data: large, complex, non-linear and non-uniform.
In [ ]: