Imports And Setup

You may see a lot of posts online telling you to set up an individual SparkContext variable. Note that those are from versions ~1.6 and lower and are no longer relevant in 2.0. Now you should create only one SparkSession and access the Spark context via spark.sparkContext.


In [16]:
# imports
import pandas as pd
import numpy as np
import time
import os
from tabulate import tabulate

import sys
from operator import add
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F #https://stackoverflow.com/questions/39504950/python-pyspark-get-sum-of-a-pyspark-dataframe-column-values
from pyspark.sql.functions import monotonically_increasing_id

from DataPreperation import DataPreperation


# Optional settings (commented out):
#   .master("local[*]") \
#   .master("local") \
#   .config('spark.executor.cores', '6') \
#   .config('spark.cores.max', '16') \
#   .config("spark.some.config.option", "some-value") \
spark = SparkSession.builder \
        .appName("App") \
        .getOrCreate()

spark.sparkContext.setLogLevel('WARN') #Get rid of all the junk in output

Y            = 'SalePrice'
ID_VAR       = 'Id'
DROPS        = [ID_VAR]

original_train = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/train.csv')
original_test = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/test.csv')


#add an id column for row reference
# original_train.withColumn("id", monotonically_increasing_id())
# original_test.withColumn("id", monotonically_increasing_id())


#this needs to be done for h2o glm.predict() bug (which needs same number of columns)
# test = test.withColumn(Y,test[ID_VAR])


# (train,valid) = original_train.randomSplit([0.7,0.3], seed=123)

# train.describe().show()

Data types

Let's see which variables are categorical and which are numeric. We will need to handle the numeric data later.
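get_type_lists below comes from the accompanying DataPreperation module. As a rough sketch of the same idea (the helper's exact rules may differ), you could split columns by their Spark dtypes directly:

# Sketch only: split columns into numeric vs. categorical based on Spark dtypes.
numeric_dtypes = ('int', 'bigint', 'smallint', 'tinyint', 'float', 'double', 'decimal')
rejects = [ID_VAR, Y]
numeric_cols = [c for c, t in original_train.dtypes
                if t.startswith(numeric_dtypes) and c not in rejects]
categorical_cols = [c for c, t in original_train.dtypes
                    if not t.startswith(numeric_dtypes) and c not in rejects]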


In [17]:
numerics, categoricals = DataPreperation.get_type_lists(frame=original_train,rejects=[ID_VAR,Y],frame_type='spark')


Numeric = ['MSSubClass', 'LotArea', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'TotRmsAbvGrd', 'Fireplaces', 'GarageCars', 'GarageArea', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 'MiscVal', 'MoSold', 'YrSold']

Categorical = ['MSZoning', 'LotFrontage', 'Street', 'Alley', 'LotShape', 'LandContour', 'Utilities', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType', 'HouseStyle', 'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'MasVnrArea', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2', 'Heating', 'HeatingQC', 'CentralAir', 'Electrical', 'KitchenQual', 'Functional', 'FireplaceQu', 'GarageType', 'GarageYrBlt', 'GarageFinish', 'GarageQual', 'GarageCond', 'PavedDrive', 'PoolQC', 'Fence', 'MiscFeature', 'SaleType', 'SaleCondition']

Dealing with Outliers

Let's look at a possible outlier. It may not actually be an outlier, and it may be best to keep the column as is, but let's pretend it is one for this example.


In [18]:
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)

original_train.select('TotalBsmtSF',Y).toPandas().head()
trace = go.Scatter(
    x = original_train.select('TotalBsmtSF').rdd.flatMap(list).collect(),
    y = original_train.select(Y).rdd.flatMap(list).collect(),
    mode = 'markers'
)
data = [trace]

# Plot and embed in ipython notebook!
iplot(data)#, filename='basic-scatter')


Winsorize for Outliers


In [19]:
original_train = DataPreperation.winsorize_columns(original_train,['TotalBsmtSF'],\
                                                   winzerize_type='percentile',limits =0.1)


For TotalBsmtSF the lower limit is 0.0
For TotalBsmtSF the upper limit is 3206.0

New Chart

After winsorizing, the new chart shows that all values above the upper limit (3206) are now capped at 3206.
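winsorize_columns above is a DataPreperation helper. As a hedged sketch of the general technique (not the helper's actual implementation, and its interpretation of limits=0.1 may differ from a plain 10th/90th percentile clip), you could winsorize a column in plain PySpark with approxQuantile plus clipping:

# Sketch only: clip a column at approximate lower/upper percentiles.
lower, upper = original_train.approxQuantile('TotalBsmtSF', [0.1, 0.9], 0.0)
winsorized = original_train.withColumn(
    'TotalBsmtSF',
    F.when(F.col('TotalBsmtSF') > upper, upper)
     .when(F.col('TotalBsmtSF') < lower, lower)
     .otherwise(F.col('TotalBsmtSF')))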


In [20]:
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)

original_train.select('TotalBsmtSF',Y).toPandas().head()
trace = go.Scatter(
    x = original_train.select('TotalBsmtSF').rdd.flatMap(list).collect(),
    y = original_train.select(Y).rdd.flatMap(list).collect(),
    mode = 'markers'
)
data = [trace]

# Plot and embed in ipython notebook!
iplot(data)#, filename='basic-scatter')


Label Encoding

Label encoding is a good choice when you have an algorithm, like an SVM or decision tree, that doesn't treat numeric values as strictly greater than one another, or when you have an ordinal variable.

(For example, XGBoost requires you to do this.)

For example, convert 'not likely', 'likely', 'very likely' into, say, 1, 2, 3.

Note: unlike other data prep techniques, this must be done before you split the data.
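label_encoder below is a DataPreperation helper. A minimal sketch of the same idea with Spark ML's StringIndexer (an assumption about the approach; the frequency-ordered 0.0/1.0/... codes it produces look similar to the output below, but the helper may differ):

from pyspark.ml.feature import StringIndexer

# Sketch only: map RoofStyle strings to numeric indices (most frequent level -> 0.0).
indexer = StringIndexer(inputCol='RoofStyle', outputCol='RoofStyle_encoded')
encoded = indexer.fit(original_train).transform(original_train)
encoded.select('RoofStyle', 'RoofStyle_encoded').show(5)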


In [21]:
print('Column before encoding...')
print(original_train.select('RoofStyle').rdd.flatMap(list).collect()[0:49])
print()
original_train = DataPreperation.label_encoder(original_train,['RoofStyle'])
print()
numerics, categoricals = DataPreperation.get_type_lists(frame=original_train,rejects=[ID_VAR,Y],frame_type='spark')
print()
print('Column after encoding...')
print(original_train.select('RoofStyle_encoded').rdd.flatMap(list).collect()[0:49])


Column before encoding...
['Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Hip', 'Hip', 'Hip', 'Gable', 'Hip', 'Gable', 'Gable', 'Gable', 'Gable', 'Hip', 'Gable', 'Gable', 'Hip', 'Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Gambrel', 'Gable', 'Gable', 'Hip', 'Hip', 'Gable', 'Gable', 'Hip', 'Gable', 'Gable', 'Gable', 'Gable', 'Gable', 'Hip', 'Gable', 'Hip', 'Gable', 'Gable', 'Gable']


Numeric = ['MSSubClass', 'LotArea', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath', 'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'TotRmsAbvGrd', 'Fireplaces', 'GarageCars', 'GarageArea', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 'MiscVal', 'MoSold', 'YrSold', 'RoofStyle_encoded']

Categorical = ['MSZoning', 'LotFrontage', 'Street', 'Alley', 'LotShape', 'LandContour', 'Utilities', 'LotConfig', 'LandSlope', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType', 'HouseStyle', 'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'MasVnrArea', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2', 'Heating', 'HeatingQC', 'CentralAir', 'Electrical', 'KitchenQual', 'Functional', 'FireplaceQu', 'GarageType', 'GarageYrBlt', 'GarageFinish', 'GarageQual', 'GarageCond', 'PavedDrive', 'PoolQC', 'Fence', 'MiscFeature', 'SaleType', 'SaleCondition']

Column after encoding...
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0]

Feature interaction

Feature interaction means multiplying two variables together to form a new feature (for example, columns $x$ and $y$ become $xy$).
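feature_combiner below is a DataPreperation helper. The core idea is a one-liner in plain PySpark (sketch only; the '|'-joined column name mirrors the 'GarageArea|GarageCars' column shown later, but the helper may do more):

# Sketch only: create an interaction term by multiplying two columns.
cols = ['GarageArea', 'GarageCars']
with_interaction = original_train.withColumn(cols[0] + '|' + cols[1],
                                             F.col(cols[0]) * F.col(cols[1]))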


In [22]:
# Find the most highly correlated pairs of numeric columns (candidates for feature interactions)
train_corr = DataPreperation.get_top_correlations(original_train,numerics)

In [23]:
# https://plot.ly/python/figure-factory/table/
import plotly.figure_factory as ff

corr_df = pd.DataFrame(columns=['columns', 'correlation', 'correlation_abs'])
for idx, d in enumerate(train_corr):
    corr_df.loc[idx] = [d['columns'],d['correlation'],d['correlation_abs']]
    
table = ff.create_table(corr_df)
iplot(table, filename='pandas_table')



In [24]:
for idx, row in corr_df.iterrows():
    if(corr_df.loc[idx]['correlation_abs'] > 0.5 and corr_df.loc[idx]['correlation_abs'] != 1): # Only combine pairs with absolute correlation above 0.5 (and below 1)
        original_train = DataPreperation.feature_combiner(original_train,columns=corr_df.loc[idx]['columns'])
        original_test = DataPreperation.feature_combiner(original_test,columns=corr_df.loc[idx]['columns'])
#show the results 
table = ff.create_table(original_train.select('GarageArea','GarageCars','GarageArea|GarageCars').toPandas().sample(10))
table.layout.width=1000
iplot(table, filename='pandas_table')


Combining: GarageArea & GarageCars (1/1)...
DONE combining features.
Combining: GarageArea & GarageCars (1/1)...
DONE combining features.
Combining: 1stFlrSF & TotalBsmtSF (1/1)...
DONE combining features.
Combining: 1stFlrSF & TotalBsmtSF (1/1)...
DONE combining features.
Combining: YearRemodAdd & YearBuilt (1/1)...
DONE combining features.
Combining: YearRemodAdd & YearBuilt (1/1)...
DONE combining features.

Polynomial Expansion

Polynomial expansion takes a variable and adds polynomial terms such as $x^2$, $x^3$, etc. This can be very helpful, especially in regression-based models.
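polynomial_expansion below is a DataPreperation helper. A sketch of the same idea in plain PySpark, using the '_^k' column naming seen in the output (an assumption, not the helper's actual code):

# Sketch only: add squared and cubed terms for a column.
expanded = original_train
for degree in (2, 3):
    expanded = expanded.withColumn('1stFlrSF_^' + str(degree),
                                   F.pow(F.col('1stFlrSF'), degree))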


In [25]:
original_train = DataPreperation.polynomial_expansion(original_train,['1stFlrSF'],degree=3)
original_test = DataPreperation.polynomial_expansion(original_test,['1stFlrSF'],degree=3)

#show the results 
print(original_train.select(ID_VAR,'1stFlrSF','1stFlrSF_^2','1stFlrSF_^3').toPandas().sample(2))
table = ff.create_table(original_train.select(ID_VAR,'1stFlrSF','1stFlrSF_^2','1stFlrSF_^3').toPandas().sample(10))
table.layout.width=1000
iplot(table, filename='pandas_table')


      Id  1stFlrSF  1stFlrSF_^2   1stFlrSF_^3
924  925      1686    2842596.0  4.792617e+09
649  650       630     396900.0  2.500470e+08

Perturbed Rate-by-Level with Shrunken Averages

This technique is good for handling any kind of categorical column when the algorithm needs a continuous column. You MUST split the data BEFORE applying it, otherwise you will have feature leakage and will overfit very badly. You also want to perturb the encoded values with random noise to further prevent overfitting.

Formula: $$(1 - \lambda) \cdot \text{level mean} + \lambda \cdot \text{overall mean} \cdot \text{perturbation amount}$$
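shrunken_averages_encoder below is a DataPreperation helper. A hedged sketch of the core idea (blend each level's mean of the target with the overall mean, using training-split statistics only, then perturb with a little multiplicative noise); the parameter handling and the exact placement of the noise term are assumptions, not the helper's actual code:

# Sketch only: shrunken-average (target) encoding of one categorical column.
(train_s, valid_s) = original_train.randomSplit([0.7, 0.3], seed=123)
lambda_ = 0.15
overall_mean = train_s.agg(F.mean(Y)).first()[0]

level_means = (train_s.groupBy('MSZoning')
                      .agg(F.mean(Y).alias('level_mean')))

encoding = level_means.withColumn(
    'MSZoning_Tencode',
    (1 - lambda_) * F.col('level_mean') + lambda_ * overall_mean).drop('level_mean')

# Join the training-split statistics onto each frame, perturbing the encoded value by roughly +/-5%.
train_sketch = (train_s.join(encoding, on='MSZoning', how='left')
                       .withColumn('MSZoning_Tencode',
                                   F.col('MSZoning_Tencode') * (1 + (F.rand(seed=123) - 0.5) * 0.1)))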


In [26]:
(train,valid) = original_train.randomSplit([0.7,0.3], seed=123)

print("Encoding numberic variables...")
for i, var in enumerate(['MSZoning']):
    total = len(categoricals)

    print('Encoding: ' + var + ' (' + str(i+1) + '/' + str(total) + ') ...')
    train,valid, original_test = DataPreperation.shrunken_averages_encoder(train, valid_frame = valid,test_frame=original_test,\
                                                     x=var, y=Y, lambda_=0.15, perturb_range=0.05,threshold=150,\
                                                     test=False, frame_type='spark',test_does_have_y=False,id_col=ID_VAR)        
table = ff.create_table(train.select('MSZoning','MSZoning_Tencode').toPandas().sample(15))
iplot(table, filename='pandas_table')


Encoding categorical variables...
Encoding: MSZoning (1/46) ...

Dimensionality Reduction PCA

This is a way to make your feature set less wide by building a smaller number of features out of a (hopefully) large number of features. The most common and longest-established algorithm for this is Principal Component Analysis (PCA).

Note: n_comp sets the number of eigenvectors to return. If it's 1, only the top eigenvector is kept. Below we can use an n_comp value of 1 or 2 because we are feeding in two features.
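dimensionality_reduction below is a DataPreperation helper. A minimal sketch of the PCA step using Spark ML directly (VectorAssembler plus pyspark.ml.feature.PCA); the column handling here is an assumption, not the helper's actual code:

from pyspark.ml.feature import VectorAssembler, PCA

# Sketch only: project two correlated columns onto their top 2 principal components.
cols = ['GarageArea', 'GarageCars']
assembled = (VectorAssembler(inputCols=cols, outputCol='features')
             .transform(train.na.drop(subset=cols)))
pca_model = PCA(k=2, inputCol='features', outputCol='pca_features').fit(assembled)
pca_model.transform(assembled).select('pca_features').show(5, truncate=False)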


In [27]:
# original_train = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/train.csv')
# original_test = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/test.csv')
# (train,valid) = original_train.randomSplit([0.7,0.3], seed=123)

#PCA does not handle null values and there were some in the test set
# train.na.drop()
# valid.na.drop()
# original_test.na.drop()
# original_test.GarageArea.cast('float')
# original_test.GarageCars.cast('float')

for idx, row in corr_df.iterrows():
    if(corr_df.loc[idx]['correlation_abs'] > .7 and corr_df.loc[idx]['correlation_abs'] != 1): # Only run PCA for pairs with absolute correlation above 0.7 (and below 1)
        print('Doing PCA for', corr_df.loc[idx]['columns'])
        #The test data was messy so I couldn't include it here; it has 'NA' values which caused errors
        train,valid = DataPreperation.dimensionality_reduction(train, valid_frame = valid,test_frame=None,\
                                                                     columns=corr_df.loc[idx]['columns'],n_comp=2,\
                                                                    random_seed=420,decompositions_to_run=['PCA'],\
                                                                      frame_type='spark',test_does_have_y=False,\
                                                                      only_return_decompositions=False,id_col=ID_VAR,\
                                                                      column_name=corr_df.loc[idx]['columns'][0]+'&'+corr_df.loc[idx]['columns'][1])#show the results 

        
        
table = ff.create_table(train.select('GarageArea','GarageCars','GarageArea&GarageCars_pca_1','1stFlrSF&TotalBsmtSF_pca_2').toPandas()[0:10])
# table = ff.create_table(train.select('1stFlrSF','TotalBsmtSF','1stFlrSF&TotalBsmtSF_pca_1','1stFlrSF&TotalBsmtSF_pca_2').toPandas()[0:10])
table.layout.width=1000
iplot(table, filename='pandas_table')


Doing PCA for ['GarageArea', 'GarageCars']
Doing PCA for ['1stFlrSF', 'TotalBsmtSF']

Dimensionality Reduction SVD (cont.)

SVDs are another type of decomposition. Many people claim they work better on large datasets compared to PCA.

"Singular value decomposition is often preferred over eigendecomposition of the covariance matrix because the calculation of the covariance matrix is a source of error. In singular value decomposition, with such a large dataset, we are much more robust to errors due to dynamic range of numbers or computational error."


In [28]:
# original_train = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/train.csv')
# original_test = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/test.csv')
# (train,valid) = original_train.randomSplit([0.7,0.3], seed=123)

#PCA does not handle null values and there were some in the test set
# train.na.drop()
# valid.na.drop()
# original_test.na.drop()
# original_test.GarageArea.cast('float')
# original_test.GarageCars.cast('float')

for idx, row in corr_df.iterrows():
    if(corr_df.loc[idx]['correlation_abs'] > .7 and corr_df.loc[idx]['correlation_abs'] != 1): # Only run SVD for pairs with absolute correlation above 0.7 (and below 1)
        print('Doing SVD for', corr_df.loc[idx]['columns'])
        #The test data was messy so I couldn't include it here; it has 'NA' values which caused errors
        train,valid = DataPreperation.dimensionality_reduction(train, valid_frame = valid,test_frame=None,\
                                                                     columns=corr_df.loc[idx]['columns'],n_comp=2,\
                                                                    random_seed=420,decompositions_to_run=['SVD'],\
                                                                      frame_type='spark',test_does_have_y=False,\
                                                                      only_return_decompositions=False,id_col=ID_VAR,\
                                                                      column_name=corr_df.loc[idx]['columns'][0]+'&'+corr_df.loc[idx]['columns'][1])#show the results 

        
table = ff.create_table(train.select('GarageArea','GarageCars','GarageArea&GarageCars_svd_1','1stFlrSF&TotalBsmtSF_svd_2').toPandas()[0:10])
# table = ff.create_table(train.select('1stFlrSF','TotalBsmtSF','1stFlrSF&TotalBsmtSF_pca_1','1stFlrSF&TotalBsmtSF_pca_2').toPandas()[0:10])
table.layout.width=1000
iplot(table, filename='pandas_table')


Doing SVD for ['GarageArea', 'GarageCars']
Doing SVD for ['1stFlrSF', 'TotalBsmtSF']