Spark Machine Learning Pipeline - Task 2 (Big Data Module)

Miguel Esteras & Alberto Ruiz Benitez de Lugo

This coursework contains an implementation and application of a Spark Machine Learning Pipeline. The pipeline is evaluated with respect to preprocessing, parametrisation, and scaling.

Section A) Choice of dataset and task

Dataset: Santander Products

The dataset used is called "Santander Products" and comes from the Kaggle competition of the same name. It was chosen for the large number of predictors and responses it contains: the many possible variable combinations increase the complexity of the problem, so the selected predictive model needs a deep level of abstraction. Furthermore, this example showcases the advantages of Spark's parallel computing for the analysis of large datasets.

The chosen model, "Random Forest" (RF), is able to accommodate large numbers of features of diverse types. The method performs competitively across many different Machine Learning fields and is expected to reach good performance in both classification and regression settings.

Task

The goal of the pipeline is to predict whether a customer will purchase a given financial product (a mortgage), based on the personal and financial data available for each customer.

Section B) Machine Learning Pipeline in Spark

1. Data set initial analysis and summary of pipeline task

1.1 Summary of Pipeline

  • Load Data and first preprocessing (1.2)
  • Descriptive statistics (1.3)
  • Data Cleaning (1.4)
  • Machine learning pipeline Implementation using Random Forest (2.)

1.2. Loading data to RDD and first preprocessing


In [17]:
# load dependencies
import numpy as np
import pandas as pd

type_dict = {'ncodpers':np.int32,
            'ind_ahor_fin_ult1':np.uint8, 'ind_aval_fin_ult1':np.uint8, 
            'ind_cco_fin_ult1':np.uint8,'ind_cder_fin_ult1':np.uint8,
            'ind_cno_fin_ult1':np.uint8,'ind_ctju_fin_ult1':np.uint8,'ind_ctma_fin_ult1':np.uint8,
            'ind_ctop_fin_ult1':np.uint8,'ind_ctpp_fin_ult1':np.uint8,'ind_deco_fin_ult1':np.uint8,
            'ind_deme_fin_ult1':np.uint8,'ind_dela_fin_ult1':np.uint8,'ind_ecue_fin_ult1':np.uint8,
            'ind_fond_fin_ult1':np.uint8,'ind_hip_fin_ult1':np.uint8,'ind_plan_fin_ult1':np.uint8,
            'ind_pres_fin_ult1':np.uint8,'ind_reca_fin_ult1':np.uint8,'ind_tjcr_fin_ult1':np.uint8,
            'ind_valo_fin_ult1':np.uint8,'ind_viv_fin_ult1':np.uint8, 'ind_recibo_ult1':np.uint8 }

# load data from the server into a dataframe (only the first 1,000,000 rows are loaded for demonstration purposes)
df = pd.read_csv("/data/tempstore/santander-products/train_ver2.csv",
                 nrows = 1000000,
                 dtype = type_dict)


/usr/local/lib/python3.5/dist-packages/IPython/core/interactiveshell.py:2717: DtypeWarning: Columns (15) have mixed types. Specify dtype option on import or set low_memory=False.
  interactivity=interactivity, compiler=compiler, result=result)
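
The DtypeWarning simply reports that one column was read with mixed types while pandas parsed the file in chunks. An optional way to avoid it, as a sketch, is to read the file in a single pass (or to pin the column named in the warning to a string type in the dtype dictionary):

# read in one pass so pandas infers a single type per column
df = pd.read_csv("/data/tempstore/santander-products/train_ver2.csv",
                 nrows = 1000000,
                 dtype = type_dict,
                 low_memory = False)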

1.3. Descriptive Statistics


In [18]:
df.describe()


Out[18]:
ncodpers ind_nuevo indrel indrel_1mes tipodom cod_prov ind_actividad_cliente renta ind_ahor_fin_ult1 ind_aval_fin_ult1 ... ind_hip_fin_ult1 ind_plan_fin_ult1 ind_pres_fin_ult1 ind_reca_fin_ult1 ind_tjcr_fin_ult1 ind_valo_fin_ult1 ind_viv_fin_ult1 ind_nomina_ult1 ind_nom_pens_ult1 ind_recibo_ult1
count 1.000000e+06 989218.000000 989218.000000 989218.000000 989218.0 982266.000000 989218.000000 8.248170e+05 1000000.000000 1000000.000000 ... 1000000.000000 1000000.000000 1000000.000000 1000000.000000 1000000.000000 1000000.000000 1000000.000000 994598.000000 994598.000000 1000000.000000
mean 6.905967e+05 0.000489 1.109074 1.000085 1.0 26.852131 0.564971 1.396462e+05 0.000177 0.000039 ... 0.009982 0.014553 0.004661 0.072581 0.066084 0.039378 0.006442 0.071629 0.079543 0.166275
std 4.044084e+05 0.022114 3.267624 0.012954 0.0 12.422924 0.495761 2.389858e+05 0.013303 0.006245 ... 0.099410 0.119755 0.068112 0.259448 0.248429 0.194493 0.080003 0.257873 0.270584 0.372327
min 1.588900e+04 0.000000 1.000000 1.000000 1.0 1.000000 0.000000 1.202730e+03 0.000000 0.000000 ... 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000
25% 3.364110e+05 0.000000 1.000000 1.000000 1.0 18.000000 0.000000 7.157184e+04 0.000000 0.000000 ... 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000
50% 6.644760e+05 0.000000 1.000000 1.000000 1.0 28.000000 1.000000 1.066519e+05 0.000000 0.000000 ... 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000
75% 1.074511e+06 0.000000 1.000000 1.000000 1.0 33.000000 1.000000 1.634325e+05 0.000000 0.000000 ... 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000
max 1.379131e+06 1.000000 99.000000 3.000000 1.0 52.000000 1.000000 2.889440e+07 1.000000 1.000000 ... 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000 1.000000

8 rows × 32 columns

1.4. Data Cleaning


In [19]:
# collect the unique customer ids and keep rows whose id appears in that set
# (ids repeat across monthly snapshots, so this keeps all rows)
unique_ids = pd.Series(df["ncodpers"].unique())
df = df[df.ncodpers.isin(unique_ids)]
df.count() # number of instances


Out[19]:
fecha_dato               1000000
ncodpers                 1000000
ind_empleado              989218
pais_residencia           989218
sexo                      989214
age                      1000000
fecha_alta                989218
ind_nuevo                 989218
antiguedad               1000000
indrel                    989218
ult_fec_cli_1t              1101
indrel_1mes               989218
tiprel_1mes               989218
indresi                   989218
indext                    989218
conyuemp                     178
canal_entrada             989139
indfall                   989218
tipodom                   989218
cod_prov                  982266
nomprov                   982266
ind_actividad_cliente     989218
renta                     824817
segmento                  989105
ind_ahor_fin_ult1        1000000
ind_aval_fin_ult1        1000000
ind_cco_fin_ult1         1000000
ind_cder_fin_ult1        1000000
ind_cno_fin_ult1         1000000
ind_ctju_fin_ult1        1000000
ind_ctma_fin_ult1        1000000
ind_ctop_fin_ult1        1000000
ind_ctpp_fin_ult1        1000000
ind_deco_fin_ult1        1000000
ind_deme_fin_ult1        1000000
ind_dela_fin_ult1        1000000
ind_ecue_fin_ult1        1000000
ind_fond_fin_ult1        1000000
ind_hip_fin_ult1         1000000
ind_plan_fin_ult1        1000000
ind_pres_fin_ult1        1000000
ind_reca_fin_ult1        1000000
ind_tjcr_fin_ult1        1000000
ind_valo_fin_ult1        1000000
ind_viv_fin_ult1         1000000
ind_nomina_ult1           994598
ind_nom_pens_ult1         994598
ind_recibo_ult1          1000000
dtype: int64

In [20]:
# eliminate mostly empty columns and redundant variables
df.drop(["tipodom","cod_prov", "ult_fec_cli_1t","conyuemp"],axis=1,inplace=True)

In [21]:
# transform to numeric and set missing values to nan
df['age']=pd.to_numeric(df.age, errors='coerce')
df['ind_nuevo']=pd.to_numeric(df.ind_nuevo, errors='coerce')
df['antiguedad']=pd.to_numeric(df.antiguedad, errors='coerce')
df['indrel']=pd.to_numeric(df.indrel, errors='coerce')
df['renta']=pd.to_numeric(df.renta, errors='coerce')
df['indrel_1mes']=pd.to_numeric(df.indrel_1mes, errors='coerce')

In [22]:
# Remove age outliers and nan from age variable
df.loc[df.age < 18,"age"]  = df.loc[(df.age >= 18) & (df.age <= 30),"age"].mean(skipna=True) # replace low outliers with the 18-30 mean
df.loc[df.age > 100,"age"] = df.loc[(df.age >= 30) & (df.age <= 100),"age"].mean(skipna=True) # replace high outliers with the 30-100 mean
df["age"].fillna(df["age"].mean(),inplace=True) # replace nan with mean
df["age"] = df["age"].astype(int)

In [23]:
# transform dates to datetime datatype
df["fecha_dato"] = pd.to_datetime(df["fecha_dato"],format="%Y-%m-%d")
df["fecha_alta"] = pd.to_datetime(df["fecha_alta"],format="%Y-%m-%d")
df["fecha_dato"].unique()


Out[23]:
array(['2015-01-28T00:00:00.000000000', '2015-02-28T00:00:00.000000000'], dtype='datetime64[ns]')

In [24]:
# fill datetime missing values
dates=df.loc[:,"fecha_alta"].sort_values().reset_index()
median_date = int(np.median(dates.index.values))
df.loc[df.fecha_alta.isnull(),"fecha_alta"] = dates.loc[median_date,"fecha_alta"]

In [25]:
# check all missing values
df.isnull().any()


Out[25]:
fecha_dato               False
ncodpers                 False
ind_empleado              True
pais_residencia           True
sexo                      True
age                      False
fecha_alta               False
ind_nuevo                 True
antiguedad                True
indrel                    True
indrel_1mes               True
tiprel_1mes               True
indresi                   True
indext                    True
canal_entrada             True
indfall                   True
nomprov                   True
ind_actividad_cliente     True
renta                     True
segmento                  True
ind_ahor_fin_ult1        False
ind_aval_fin_ult1        False
ind_cco_fin_ult1         False
ind_cder_fin_ult1        False
ind_cno_fin_ult1         False
ind_ctju_fin_ult1        False
ind_ctma_fin_ult1        False
ind_ctop_fin_ult1        False
ind_ctpp_fin_ult1        False
ind_deco_fin_ult1        False
ind_deme_fin_ult1        False
ind_dela_fin_ult1        False
ind_ecue_fin_ult1        False
ind_fond_fin_ult1        False
ind_hip_fin_ult1         False
ind_plan_fin_ult1        False
ind_pres_fin_ult1        False
ind_reca_fin_ult1        False
ind_tjcr_fin_ult1        False
ind_valo_fin_ult1        False
ind_viv_fin_ult1         False
ind_nomina_ult1           True
ind_nom_pens_ult1         True
ind_recibo_ult1          False
dtype: bool

In [26]:
# Replace missing values in target features with 0
# target features = boolean indicator as to whether or not that product was owned that month
df.loc[df.ind_nomina_ult1.isnull(), "ind_nomina_ult1"] = 0
df.loc[df.ind_nom_pens_ult1.isnull(), "ind_nom_pens_ult1"] = 0

In [27]:
# Replace other missing values
df.loc[df["ind_nuevo"].isnull(),"ind_nuevo"] = 1                   # new customers id '1'
df.loc[df.antiguedad.isnull(),"antiguedad"] = df.antiguedad.min()
df.loc[df.antiguedad <0, "antiguedad"] = 0                         # new customer antiguedad '0'
df.loc[df.indrel.isnull(),"indrel"] = 1 
df.loc[df.ind_actividad_cliente.isnull(),"ind_actividad_cliente"] = \
df["ind_actividad_cliente"].median()                   # fill in customer activity missing
df.loc[df.nomprov.isnull(),"nomprov"] = "UNKNOWN"      # missing province of residence set to UNKNOWN
df.loc[df.indfall.isnull(),"indfall"] = "N"            # missing deceased index set to N
df.loc[df.tiprel_1mes.isnull(),"tiprel_1mes"] = "A"    # customer status, if missing = active 
df.tiprel_1mes = df.tiprel_1mes.astype("category")     # customer status as categorical

In [28]:
# Normalise the customer type codes and store them as a categorical variable
map_dict = { 1.0:"1", "1.0":"1", "1":"1", "3.0":"3", "P":"P", 3.0:"3", 2.0:"2", "3":"3", "2.0":"2", "4.0":"4", "4":"4", "2":"2"}
df.indrel_1mes.fillna("P",inplace=True)
df.indrel_1mes = df.indrel_1mes.apply(lambda x: map_dict.get(x,x))
df.indrel_1mes = df.indrel_1mes.astype("category")
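
An optional sanity check that the mapping collapsed the mixed codes into the expected categories:

# frequency of each normalised customer type code
df.indrel_1mes.value_counts(dropna = False)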

In [29]:
# remove rows with any nan value left
df = df.dropna(subset=['renta', 'segmento', 'canal_entrada', 'ind_empleado', 
                       'pais_residencia', 'indresi', 'indext', 'sexo'], how='any')

In [30]:
# check all missing values are gone
df.isnull().any()


Out[30]:
fecha_dato               False
ncodpers                 False
ind_empleado             False
pais_residencia          False
sexo                     False
age                      False
fecha_alta               False
ind_nuevo                False
antiguedad               False
indrel                   False
indrel_1mes              False
tiprel_1mes              False
indresi                  False
indext                   False
canal_entrada            False
indfall                  False
nomprov                  False
ind_actividad_cliente    False
renta                    False
segmento                 False
ind_ahor_fin_ult1        False
ind_aval_fin_ult1        False
ind_cco_fin_ult1         False
ind_cder_fin_ult1        False
ind_cno_fin_ult1         False
ind_ctju_fin_ult1        False
ind_ctma_fin_ult1        False
ind_ctop_fin_ult1        False
ind_ctpp_fin_ult1        False
ind_deco_fin_ult1        False
ind_deme_fin_ult1        False
ind_dela_fin_ult1        False
ind_ecue_fin_ult1        False
ind_fond_fin_ult1        False
ind_hip_fin_ult1         False
ind_plan_fin_ult1        False
ind_pres_fin_ult1        False
ind_reca_fin_ult1        False
ind_tjcr_fin_ult1        False
ind_valo_fin_ult1        False
ind_viv_fin_ult1         False
ind_nomina_ult1          False
ind_nom_pens_ult1        False
ind_recibo_ult1          False
dtype: bool

In [31]:
df.count() # number of instances


Out[31]:
fecha_dato               824742
ncodpers                 824742
ind_empleado             824742
pais_residencia          824742
sexo                     824742
age                      824742
fecha_alta               824742
ind_nuevo                824742
antiguedad               824742
indrel                   824742
indrel_1mes              824742
tiprel_1mes              824742
indresi                  824742
indext                   824742
canal_entrada            824742
indfall                  824742
nomprov                  824742
ind_actividad_cliente    824742
renta                    824742
segmento                 824742
ind_ahor_fin_ult1        824742
ind_aval_fin_ult1        824742
ind_cco_fin_ult1         824742
ind_cder_fin_ult1        824742
ind_cno_fin_ult1         824742
ind_ctju_fin_ult1        824742
ind_ctma_fin_ult1        824742
ind_ctop_fin_ult1        824742
ind_ctpp_fin_ult1        824742
ind_deco_fin_ult1        824742
ind_deme_fin_ult1        824742
ind_dela_fin_ult1        824742
ind_ecue_fin_ult1        824742
ind_fond_fin_ult1        824742
ind_hip_fin_ult1         824742
ind_plan_fin_ult1        824742
ind_pres_fin_ult1        824742
ind_reca_fin_ult1        824742
ind_tjcr_fin_ult1        824742
ind_valo_fin_ult1        824742
ind_viv_fin_ult1         824742
ind_nomina_ult1          824742
ind_nom_pens_ult1        824742
ind_recibo_ult1          824742
dtype: int64

2. Machine learning pipeline Implementation

Implement a machine learning pipeline in Spark, including feature extractors, transformers, and/or selectors. Test that your pipeline is correctly implemented and explain your choice of processing steps, learning algorithms, and parameter settings.


In [32]:
# remove any previous spark session and check df file type
spark.stop()
type(df)


Out[32]:
pandas.core.frame.DataFrame

In [33]:
# Create Spark SQL dataframe 
## IMPORTANT!! - this cell usually takes time due to data volume!!!
## IMPORTANT!! - Only run this cell once! (to run it again, you need to restart the kernel)

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlCtx = SQLContext(sc)
df_spark = sqlCtx.createDataFrame(df)
type(df_spark)


Out[33]:
pyspark.sql.dataframe.DataFrame
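
Converting a large pandas DataFrame with createDataFrame serialises every row through the driver, which is why this cell is slow. An alternative sketch, assuming the CSV path is reachable from the Spark workers, would be to let Spark read the raw file directly and keep pandas only for exploration:

# hypothetical alternative: read the CSV straight into a Spark DataFrame
raw_spark = sqlCtx.read.csv("/data/tempstore/santander-products/train_ver2.csv",
                            header = True,
                            inferSchema = True)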

In [34]:
# define datatypes in dataframe

df_spark = df_spark.select(df_spark.fecha_dato.cast("date"),
                                   df_spark.ncodpers.cast("float"),
                                   df_spark.ind_empleado.cast("string"),
                                   df_spark.pais_residencia.cast("string"),
                                   df_spark.sexo.cast("string"),
                                   df_spark.age.cast("float"),
                                   df_spark.fecha_alta.cast("date"),
                                   df_spark.ind_nuevo.cast("float"),
                                   df_spark.antiguedad.cast("float"),
                                   df_spark.indrel.cast("float"),
                                   df_spark.indrel_1mes.cast("float"),
                                   df_spark.tiprel_1mes.cast("string"),
                                   df_spark.indresi.cast("string"),
                                   df_spark.indext.cast("string"),
                                   df_spark.canal_entrada.cast("string"),
                                   df_spark.indfall.cast("string"),
                                   df_spark.nomprov.cast("string"),
                                   df_spark.ind_actividad_cliente.cast("float"),
                                   df_spark.renta.cast("float"),
                                   df_spark.segmento.cast("string"),
                                   df_spark.ind_ahor_fin_ult1.cast("float"),
                                   df_spark.ind_aval_fin_ult1.cast("float"),
                                   df_spark.ind_cco_fin_ult1.cast("float"),
                                   df_spark.ind_cder_fin_ult1.cast("float"),
                                   df_spark.ind_cno_fin_ult1.cast("float"),
                                   df_spark.ind_ctju_fin_ult1.cast("float"),
                                   df_spark.ind_ctma_fin_ult1.cast("float"),
                                   df_spark.ind_ctop_fin_ult1.cast("float"),
                                   df_spark.ind_ctpp_fin_ult1.cast("float"),
                                   df_spark.ind_deco_fin_ult1.cast("float"),
                                   df_spark.ind_deme_fin_ult1.cast("float"),
                                   df_spark.ind_dela_fin_ult1.cast("float"),
                                   df_spark.ind_ecue_fin_ult1.cast("float"),
                                   df_spark.ind_fond_fin_ult1.cast("float"),
                                   df_spark.ind_hip_fin_ult1.cast("float"),
                                   df_spark.ind_plan_fin_ult1.cast("float"),
                                   df_spark.ind_pres_fin_ult1.cast("float"),
                                   df_spark.ind_reca_fin_ult1.cast("float"),
                                   df_spark.ind_tjcr_fin_ult1.cast("float"),
                                   df_spark.ind_valo_fin_ult1.cast("float"),
                                   df_spark.ind_viv_fin_ult1.cast("float"),
                                   df_spark.ind_nomina_ult1.cast("float"),
                                   df_spark.ind_nom_pens_ult1.cast("float"),
                                   df_spark.ind_recibo_ult1.cast("float"))
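
The long select above can be written more compactly by looping over the column names and casting by group; a sketch with the same behaviour (date and string columns listed explicitly, everything else cast to float):

from pyspark.sql.functions import col

date_cols   = ["fecha_dato", "fecha_alta"]
string_cols = ["ind_empleado", "pais_residencia", "sexo", "tiprel_1mes", "indresi",
               "indext", "canal_entrada", "indfall", "nomprov", "segmento"]

df_spark = df_spark.select(
    *[col(c).cast("date")   if c in date_cols   else
      col(c).cast("string") if c in string_cols else
      col(c).cast("float")
      for c in df_spark.columns])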

In [35]:
df_spark.printSchema()


root
 |-- fecha_dato: date (nullable = true)
 |-- ncodpers: float (nullable = true)
 |-- ind_empleado: string (nullable = true)
 |-- pais_residencia: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- age: float (nullable = true)
 |-- fecha_alta: date (nullable = true)
 |-- ind_nuevo: float (nullable = true)
 |-- antiguedad: float (nullable = true)
 |-- indrel: float (nullable = true)
 |-- indrel_1mes: float (nullable = true)
 |-- tiprel_1mes: string (nullable = true)
 |-- indresi: string (nullable = true)
 |-- indext: string (nullable = true)
 |-- canal_entrada: string (nullable = true)
 |-- indfall: string (nullable = true)
 |-- nomprov: string (nullable = true)
 |-- ind_actividad_cliente: float (nullable = true)
 |-- renta: float (nullable = true)
 |-- segmento: string (nullable = true)
 |-- ind_ahor_fin_ult1: float (nullable = true)
 |-- ind_aval_fin_ult1: float (nullable = true)
 |-- ind_cco_fin_ult1: float (nullable = true)
 |-- ind_cder_fin_ult1: float (nullable = true)
 |-- ind_cno_fin_ult1: float (nullable = true)
 |-- ind_ctju_fin_ult1: float (nullable = true)
 |-- ind_ctma_fin_ult1: float (nullable = true)
 |-- ind_ctop_fin_ult1: float (nullable = true)
 |-- ind_ctpp_fin_ult1: float (nullable = true)
 |-- ind_deco_fin_ult1: float (nullable = true)
 |-- ind_deme_fin_ult1: float (nullable = true)
 |-- ind_dela_fin_ult1: float (nullable = true)
 |-- ind_ecue_fin_ult1: float (nullable = true)
 |-- ind_fond_fin_ult1: float (nullable = true)
 |-- ind_hip_fin_ult1: float (nullable = true)
 |-- ind_plan_fin_ult1: float (nullable = true)
 |-- ind_pres_fin_ult1: float (nullable = true)
 |-- ind_reca_fin_ult1: float (nullable = true)
 |-- ind_tjcr_fin_ult1: float (nullable = true)
 |-- ind_valo_fin_ult1: float (nullable = true)
 |-- ind_viv_fin_ult1: float (nullable = true)
 |-- ind_nomina_ult1: float (nullable = true)
 |-- ind_nom_pens_ult1: float (nullable = true)
 |-- ind_recibo_ult1: float (nullable = true)


In [36]:
# code modified from Spark documentation at:
# https://spark.apache.org/docs/2.1.0/ml-classification-regression.html#random-forest-classifier
# and DataBricks at:
# https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html

# import dependencies for the Random Forest pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler


# IMPORTANT - define the target label (to be predicted) from the product indicator columns.
# Selected target = mortgage product
labels = "ind_hip_fin_ult1"

# stages in the Pipeline
stages = []
    
# define variable groups: categorical, continuous and target features

numericCols = ["age","antiguedad","renta"]

categoricalColumns = ["ind_empleado","pais_residencia","sexo","ind_nuevo","indrel", 
                      "indrel_1mes","tiprel_1mes", "indresi", "indext", "canal_entrada","nomprov", 
                      "ind_actividad_cliente","segmento"]

targetsColumns = ["ind_ahor_fin_ult1", "ind_aval_fin_ult1",
                        "ind_cco_fin_ult1", "ind_cder_fin_ult1", "ind_cno_fin_ult1",
                        "ind_ctma_fin_ult1", "ind_ctop_fin_ult1",
                        "ind_ctpp_fin_ult1", "ind_deco_fin_ult1", "ind_deme_fin_ult1", 
                        "ind_dela_fin_ult1", "ind_ecue_fin_ult1", "ind_fond_fin_ult1",
                        "ind_ctju_fin_ult1", "ind_plan_fin_ult1", "ind_pres_fin_ult1",
                        "ind_reca_fin_ult1", "ind_tjcr_fin_ult1", "ind_valo_fin_ult1", 
                        "ind_viv_fin_ult1", "ind_nomina_ult1", "ind_nom_pens_ult1","ind_recibo_ult1"]

In [37]:
# Index categorical variables with StringIndexer (the tree model can consume the indices directly)
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + "Index") # Category Indexing with StringIndexer
    stages += [stringIndexer]  # Add stages to the pipeline
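
OneHotEncoder is imported above but not used here, since tree ensembles such as Random Forest can work with the raw string indices. If one-hot encoding were preferred (for example to avoid implying an ordering when trying a linear model), the loop could instead be written as the following sketch, with the assembler later fed the "...Vec" columns rather than the "...Index" columns:

# alternative loop: index each categorical column, then one-hot encode the index
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + "Index")
    encoder = OneHotEncoder(inputCol = categoricalCol + "Index",
                            outputCol = categoricalCol + "Vec")
    stages += [stringIndexer, encoder]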

In [38]:
# define categorical index columns 
categoricalColumnsIDX = ["ind_empleadoIndex","pais_residenciaIndex","sexoIndex",
                         "ind_nuevoIndex","indrelIndex","indrel_1mesIndex",
                         "tiprel_1mesIndex","indresiIndex","indextIndex", 
                         "canal_entradaIndex","nomprovIndex","ind_actividad_clienteIndex","segmentoIndex"]

In [39]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = labels,
                                outputCol = "label")
stages += [label_stringIdx]

In [40]:
# Transform all features into a vector using VectorAssembler
assemblerInputs = categoricalColumnsIDX + numericCols + targetsColumns
assembler = VectorAssembler(inputCols = assemblerInputs,
                            outputCol = "features")
stages += [assembler]  # Add stage to the pipeline

In [41]:
prePipeline = Pipeline(stages = stages)
pipelineModel = prePipeline.fit(df_spark)

dataset = pipelineModel.transform(df_spark)
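
Before training, a quick optional check that the preprocessing pipeline produced the expected label and feature vector columns:

# inspect a few assembled rows to verify the label and feature vector
dataset.select("label", "features").show(5, truncate = False)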

In [42]:
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol = "label", 
                            featuresCol = "features", 
                            numTrees = 100,                 #  Number of trees in the random forest
                            impurity = 'entropy',            # Criterion used for information gain calculation
                            featureSubsetStrategy = "auto",
                            predictionCol = "prediction",
                            maxDepth = 5, 
                            maxBins = 160, 
                            minInstancesPerNode = 2) 
                            #minInfoGain=0.0, 
                            #subsamplingRate=1.0)

Section C) Evaluation of Performance and testing

Evaluate the performance of your pipeline using training and test set (don’t use CV but pyspark.ml.tuning.TrainValidationSplit).

3.1. Evaluate performance of machine learning pipeline on training data and test data.


In [43]:
# import dependencies
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [44]:
# Split data into training set and testing set
[trainData, testData] = dataset.randomSplit([0.8, 0.2], seed = 100)

In [45]:
# evaluation of model performance
evaluator = MulticlassClassificationEvaluator(labelCol = "label", 
                                              predictionCol = "prediction", 
                                              metricName = "accuracy")
# random forest parameters
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [100]).build()

# evaluation of model performance via train/validation split during the grid search
# Method: pyspark.ml.tuning.TrainValidationSplit (used instead of cross-validation, as required)
crossval = TrainValidationSplit(estimator = rf,
                                estimatorParamMaps = paramGrid,
                                evaluator = evaluator,
                                trainRatio = 0.9)

# Run the train/validation split evaluation and choose the best set of parameters.
print('starting cross-validation')
cvModel = crossval.fit(trainData)  # This takes time!
print('finished cross-validation')


starting cross-validation
finished cross-validation

In [46]:
# Make predictions for the training set and compute the training error
predictions = cvModel.transform(trainData)
train_accuracy = evaluator.evaluate(predictions)
print("Training Accuracy = %g" % (train_accuracy))
print("Training Error = %g" % (1.0 - train_accuracy))


Training Accuracy = 0.989438
Training Error = 0.0105624

In [47]:
# Make predictions for test set and compute test error
predictions = cvModel.transform(testData)
test_accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % (test_accuracy))
print("Test Error = %g" % (1.0 - test_accuracy))


Test Accuracy = 0.989246
Test Error = 0.0107541
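
Note that the mortgage indicator is a rare class (its mean in the descriptive statistics above is roughly 0.01), so an accuracy near 0.99 is close to what always predicting the majority class would achieve. An optional complementary check, using the label distribution and the F1 metric of the same evaluator class:

# class balance of the test predictions
predictions.groupBy("label").count().show()

# F1 score as an imbalance-aware complement to accuracy
f1_evaluator = MulticlassClassificationEvaluator(labelCol = "label",
                                                 predictionCol = "prediction",
                                                 metricName = "f1")
print("Test F1 = %g" % f1_evaluator.evaluate(predictions))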

Section D) Implement a parameter grid - Model fine-tuning

Note: this section takes a long time to compute!

Implement a parameter grid (using pyspark.ml.tuning.ParamGridBuilder), varying at least one feature preprocessing step, one machine learning parameter, and the training set size. Document the training and test performance and the time taken for training and testing. Comment on your findings.

4.1. Evaluate model performance using a subset of preprocessing variables

The numeric predictors are removed, and the pipeline is relaunched with this new preprocessing structure.


In [48]:
# New preprocessing stage, without numeric predictors
new_stages = []

# remove the numeric predictors by passing an empty list
New_numericCols = [] # empty list of numeric predictors

# Add the new stages to the pipeline
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + "Index")
    new_stages += [stringIndexer]  # Add stages to the pipeline

new_stages += [label_stringIdx]

# the empty list of numeric predictors is used here
new_assemblerInputs = categoricalColumnsIDX + New_numericCols + targetsColumns
new_assembler = VectorAssembler(inputCols = new_assemblerInputs, outputCol = "features")

new_stages += [new_assembler]

In [49]:
# Creating new pipeline
from pyspark.ml import Pipeline

new_prePipeline = Pipeline(stages = new_stages)
new_pipelineModel = new_prePipeline.fit(df_spark)

new_dataset = new_pipelineModel.transform(df_spark)

In [50]:
[new_trainData, new_testData] = new_dataset.randomSplit([0.8, 0.2], seed = 100)

new_cvModel = crossval.fit(new_trainData)  # This takes time!

# Results:
print('Numerical predictors not used for training')

new_predictions = new_cvModel.transform(new_trainData)
new_train_accuracy = evaluator.evaluate(new_predictions)
print("New Training Accuracy = %g" % (new_train_accuracy))
print("New Training Error = %g" % (1.0 - new_train_accuracy))

new_test_predictions = new_cvModel.transform(new_testData)
new_test_accuracy = evaluator.evaluate(new_test_predictions)
print("New Test Accuracy = %g" % (new_test_accuracy))
print("New Test Error = %g" % (1.0 - new_test_accuracy))


Numerical predictors not used for training
New Training Accuracy = 0.989438
New Training Error = 0.0105624
New Test Accuracy = 0.989246
New Test Error = 0.0107541

In [51]:
print('Financial products not used for training (only personal data)')

# Add the new stages to the pipeline
stages2 = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + "Index")
    stages2 += [stringIndexer]  # Add stages to the pipeline

stages2 += [label_stringIdx]

# the product indicator columns are excluded here (only personal data is used as predictors)
new_assemblerInputs = categoricalColumnsIDX + numericCols
new_assembler = VectorAssembler(inputCols = new_assemblerInputs, outputCol = "features")

stages2 += [new_assembler]

# Creating new pipeline
from pyspark.ml import Pipeline

prePipeline2 = Pipeline(stages = stages2)
pipelineModel2 = prePipeline2.fit(df_spark)

dataset2 = pipelineModel2.transform(df_spark)

[trainData2, testData2] = dataset2.randomSplit([0.8, 0.2], seed = 100)

cvModel2 = crossval.fit(trainData2)  # This takes time!

# Results:
predictions2 = cvModel2.transform(trainData2)
train_accuracy2 = evaluator.evaluate(predictions2)
print("Training Accuracy = %g" % (train_accuracy2))
print("Training Error = %g" % (1.0 - train_accuracy2))

test_predictions2 = cvModel2.transform(testData2)
test_accuracy2 = evaluator.evaluate(test_predictions2)
print("Test Accuracy = %g" % (test_accuracy2))
print("Test Error = %g" % (1.0 - test_accuracy2))


Financial products not used for training (only personal data)
Training Accuracy = 0.989438
Training Error = 0.0105624
Test Accuracy = 0.989246
Test Error = 0.0107541

4.2. Training set size evaluation


In [ ]:
print('Training set size evaluation')

%time

# relative sizes of the training subsets to evaluate (randomSplit normalises these weights)
sizes = [0.1, 0.001, 0.00001]
data = trainData.randomSplit(sizes, seed = 100)

# model performance with full dataset, from previous experiment
print('\n\n=== training set of size 100%:')
print("Classification Error = %g" % (1.0 - new_train_accuracy))

i = 0
for split in data:
    print('\n\n=== training set of size reduced to {}%, wait please'.format(sizes[i]*100))
    cvModel = crossval.fit(split)
    predictions = cvModel.transform(split)
    accuracy = evaluator.evaluate(predictions)
    print("Classification Error = %g" % (1.0 - accuracy))
    i+=1
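
Section D also asks for the time taken for training and testing; %time on a line of its own only times that empty statement, so a simple sketch with time.time() around the fit and transform calls could be used to record both durations:

import time

start = time.time()
timed_model = crossval.fit(trainData)                           # training time
train_time = time.time() - start

start = time.time()
test_acc = evaluator.evaluate(timed_model.transform(testData))  # testing time
test_time = time.time() - start

print("Training took %.1f s, testing took %.1f s" % (train_time, test_time))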

In [ ]:
# Define hyperparameters and their values to search and evaluate
%time

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10,100,500]) \
    .addGrid(rf.maxDepth, [2,10]).build()

# train/validation split evaluation of model performance during the grid search
crossval = TrainValidationSplit(estimator = rf,
                                estimatorParamMaps = paramGrid,
                                evaluator = evaluator,
                                trainRatio = 0.9)

# Run the grid search with train/validation split, and choose the best set of parameters.
print('starting Hyperparameter Grid Search with cross-validation')
cvModel = crossval.fit(trainData)
print('Grid Search with cross-validation has finished')

# pick best model
rfModel = cvModel.bestModel
print (rfModel)
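
To document which parameter combination was selected, the validation metric for each grid point can be listed from the fitted TrainValidationSplitModel, and the winning forest's feature importances inspected; a sketch assuming cvModel and rfModel from the cell above:

# validation metric for each parameter combination in the grid
for params, metric in zip(paramGrid, cvModel.validationMetrics):
    print({p.name: v for p, v in params.items()}, "->", metric)

# relative importance of each assembled feature in the best forest
print(rfModel.featureImportances)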

In [ ]:
# Make predictions for test set and compute test error
predictions = rfModel.transform(testData)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Findings and conclusions:

As expected, Random Forest achieves good generalisation accuracy (validated on the test data). The average classification accuracy on the test data across the models tested is higher than 98%. The level of abstraction and flexibility offered by the RF model allows for excellent accuracy even when the number of predictors is reduced, i.e. when existing financial products are not used as predictors, or when numerical predictors are removed from the training set (as seen in section 4.1). This results in a simpler yet equally powerful model.

As expected, the model is sensitive to a reduction in the size of the training set: the more data, the better the model. More training data reduces the effect of outliers and increases the generalisation accuracy of the final model, and it is also likely to reduce the effect of bias in the data. As seen in section 4.2, accuracy can vary widely depending on the randomness of the training/validation split; with less data it becomes more likely that a non-representative sample is selected for training or for evaluation.

The grid search performed as part of this analysis shows that an RF containing many shallow trees (low depth) provided better results at lower computational cost than a smaller number of deeper trees. Deeper trees increase the computational cost sharply and carry a greater risk of overfitting, without a significant gain in performance (see the grid search above).

In summary, Random Forest is a valid approach for classification predictions given the nature of the data: large, complex, non-linear and non-uniform.

