In [16]:
# imports
import pandas as pd
import numpy as np
import time
import os
from tabulate import tabulate
import sys
from operator import add
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F #https://stackoverflow.com/questions/39504950/python-pyspark-get-sum-of-a-pyspark-dataframe-column-values
from pyspark.sql.functions import monotonically_increasing_id
from DataPreperation import DataPreperation
spark = SparkSession.builder \
    .appName("App") \
    .getOrCreate()
# Optional builder settings kept here for reference:
# .master("local[*]") \
# .master("local") \
# .config('spark.executor.cores', '6') \
# .config('spark.cores.max', '16') \
# .config("spark.some.config.option", "some-value") \
spark.sparkContext.setLogLevel('WARN')  # Get rid of all the junk in output
Y = 'SalePrice'
ID_VAR = 'Id'
DROPS = [ID_VAR]
original_train = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/train.csv')
original_test = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/test.csv')
#add an id column for row reference
# original_train.withColumn("id", monotonically_increasing_id())
# original_test.withColumn("id", monotonically_increasing_id())
#this needs to be done for h2o glm.predict() bug (which needs same number of columns)
# test = test.withColumn(Y,test[ID_VAR])
# (train,valid) = original_train.randomSplit([0.7,0.3], seed=123)
# train.describe().show()
In [17]:
numerics, categoricals = DataPreperation.get_type_lists(frame=original_train,rejects=[ID_VAR,Y],frame_type='spark')
In [18]:
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)
original_train.select('TotalBsmtSF',Y).toPandas().head()
trace = go.Scatter(
x = original_train.select('TotalBsmtSF').rdd.flatMap(list).collect(),
y = original_train.select(Y).rdd.flatMap(list).collect(),
mode = 'markers'
)
data = [trace]
# Plot and embed in ipython notebook!
iplot(data)#, filename='basic-scatter')
In [19]:
original_train = DataPreperation.winsorize_columns(original_train, ['TotalBsmtSF'],
                                                   winzerize_type='percentile', limits=0.1)
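(For reference, here is a minimal sketch of what percentile winsorizing does in plain PySpark, assuming a symmetric clip at the 10th and 90th percentiles; DataPreperation.winsorize_columns is the helper actually used above, and its exact handling of limits may differ.)

# Hypothetical stand-alone sketch (not the DataPreperation implementation): clip a column
# at its 10th and 90th percentiles and store the result in a new, illustrative column name.
lower, upper = original_train.approxQuantile('TotalBsmtSF', [0.1, 0.9], 0.0)
winsorized_sketch = original_train.withColumn(
    'TotalBsmtSF_clipped',
    F.when(F.col('TotalBsmtSF') < lower, lower)
     .when(F.col('TotalBsmtSF') > upper, upper)
     .otherwise(F.col('TotalBsmtSF')))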
In [20]:
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)
original_train.select('TotalBsmtSF',Y).toPandas().head()
trace = go.Scatter(
x = original_train.select('TotalBsmtSF').rdd.flatMap(list).collect(),
y = original_train.select(Y).rdd.flatMap(list).collect(),
mode = 'markers'
)
data = [trace]
# Plot and embed in ipython notebook!
iplot(data)#, filename='basic-scatter')
Label encoding is a good choice when you have an algorithm like an SVM or decision tree that doesn't necessarily treat numeric values as ordered relative to one another, or when you have an ordinal variable (XGBoost, for example, requires categoricals to be encoded this way).
It converts levels like not likely, likely, very likely into, say, 1, 2, 3.
Note: unlike other data prep techniques, this must be done before you split the data.
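For comparison, Spark ML's built-in StringIndexer performs the same kind of label encoding as the DataPreperation.label_encoder call used below; a minimal sketch (the output column name is illustrative only):

# Minimal label-encoding sketch with Spark ML's StringIndexer (illustrative; the notebook
# uses the custom DataPreperation.label_encoder below). Note that StringIndexer assigns
# indices by frequency, so a true ordinal variable may need an explicit mapping instead.
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='RoofStyle', outputCol='RoofStyle_indexed')
indexed = indexer.fit(original_train).transform(original_train)
indexed.select('RoofStyle', 'RoofStyle_indexed').show(5)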
In [21]:
print('Column before encoding...')
print(original_train.select('RoofStyle').rdd.flatMap(list).collect()[0:49])
print()
original_train = DataPreperation.label_encoder(original_train,['RoofStyle'])
print()
numerics, categoricals = DataPreperation.get_type_lists(frame=original_train,rejects=[ID_VAR,Y],frame_type='spark')
print()
print('Column after encoding...')
print(original_train.select('RoofStyle_encoded').rdd.flatMap(list).collect()[0:49])
In [22]:
# Find the strongest pairwise correlations among the numeric columns
train_corr = DataPreperation.get_top_correlations(original_train,numerics)
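(DataPreperation.get_top_correlations is a custom helper; a rough sketch of the same idea in plain PySpark, assuming it returns one dict per column pair in the shape the loop below expects, could look like this.)

# Illustrative sketch: pairwise Pearson correlations over the numeric columns.
# Note this launches one Spark job per pair, so it is slow for wide frames.
from itertools import combinations

pairs = []
for a, b in combinations(numerics, 2):
    c = original_train.stat.corr(a, b)
    pairs.append({'columns': [a, b], 'correlation': c, 'correlation_abs': abs(c)})
pairs.sort(key=lambda d: d['correlation_abs'], reverse=True)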
In [23]:
# https://plot.ly/python/figure-factory/table/
import plotly.figure_factory as ff
corr_df = pd.DataFrame(columns=['columns', 'correlation', 'correlation_abs'])
for idx, d in enumerate(train_corr):
    corr_df.loc[idx] = [d['columns'], d['correlation'], d['correlation_abs']]
table = ff.create_table(corr_df)
iplot(table, filename='pandas_table')
In [24]:
for idx, row in corr_df.iterrows():
    if corr_df.loc[idx]['correlation_abs'] > 0.5 and corr_df.loc[idx]['correlation_abs'] != 1:  # Only combine columns whose absolute correlation is above 0.5 (and not a self-correlation)
        original_train = DataPreperation.feature_combiner(original_train, columns=corr_df.loc[idx]['columns'])
        original_test = DataPreperation.feature_combiner(original_test, columns=corr_df.loc[idx]['columns'])
#show the results
table = ff.create_table(original_train.select('GarageArea','GarageCars','GarageArea|GarageCars').toPandas().sample(10))
table.layout.width=1000
iplot(table, filename='pandas_table')
In [25]:
original_train = DataPreperation.polynomial_expansion(original_train,['1stFlrSF'],degree=3)
original_test = DataPreperation.polynomial_expansion(original_test,['1stFlrSF'],degree=3)
#show the results
print(original_train.select(ID_VAR,'1stFlrSF','1stFlrSF_^2','1stFlrSF_^3').toPandas().sample(2))
table = ff.create_table(original_train.select(ID_VAR,'1stFlrSF','1stFlrSF_^2','1stFlrSF_^3').toPandas().sample(10))
table.layout.width=1000
iplot(table, filename='pandas_table')
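(The same degree-3 expansion can be written directly with pyspark.sql.functions.pow; the column names in this sketch are illustrative and differ from the helper's output names.)

# Illustrative plain-PySpark equivalent of the polynomial expansion above.
expanded_sketch = (original_train
                   .withColumn('1stFlrSF_pow2', F.pow(F.col('1stFlrSF'), 2))
                   .withColumn('1stFlrSF_pow3', F.pow(F.col('1stFlrSF'), 3)))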
This algorithm is good for handling any kind of categorical column when the algorithm needs a continuous column. For this technique you MUST split the data BEFORE applying it; otherwise you will have feature leakage and will overfit very, very badly. You also want to perturb the data (i.e., inject random noise) to further prevent overfitting.
Formula: $$(1 - \lambda) \cdot \text{level mean} + \lambda \cdot \text{overall mean} \cdot \text{perturbed amount}$$
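A rough sketch of that formula for a single categorical column is below (illustrative only; DataPreperation.shrunken_averages_encoder, used in the next cell, is the real implementation and also handles the threshold, the valid/test frames, and the exact placement of the perturbation):

# Hedged sketch of shrunken-averages (target) encoding for one categorical column x against
# target y. The multiplicative noise here is one reasonable reading of "perturbed amount";
# the DataPreperation implementation may apply it differently.
def shrunken_averages_sketch(train_df, x, y, lambda_=0.15, perturb_range=0.05):
    overall_mean = train_df.agg(F.mean(y).alias('m')).collect()[0]['m']
    level_means = train_df.groupBy(x).agg(F.mean(y).alias('level_mean'))
    blended = (1 - lambda_) * F.col('level_mean') + lambda_ * F.lit(overall_mean)
    noise = 1 + (F.rand(seed=123) * 2 - 1) * perturb_range   # uniform in [1 - range, 1 + range)
    return (train_df.join(level_means, on=x, how='left')
                    .withColumn(x + '_Tencode', blended * noise)
                    .drop('level_mean'))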
In [26]:
(train,valid) = original_train.randomSplit([0.7,0.3], seed=123)
print("Encoding numberic variables...")
for i, var in enumerate(['MSZoning']):
total = len(categoricals)
print('Encoding: ' + var + ' (' + str(i+1) + '/' + str(total) + ') ...')
train,valid, original_test = DataPreperation.shrunken_averages_encoder(train, valid_frame = valid,test_frame=original_test,\
x=var, y=Y, lambda_=0.15, perturb_range=0.05,threshold=150,\
test=False, frame_type='spark',test_does_have_y=False,id_col=ID_VAR)
table = ff.create_table(train.select('MSZoning','MSZoning_Tencode').toPandas().sample(15))
iplot(table, filename='pandas_table')
This is a way to make your feature set less wide: it builds a smaller number of features out of a (hopefully large) number of original features. The most common and historic algorithm for this is Principal Component Analysis (PCA).
Note: n_comp sets the number of eigenvectors to return; if it's 1, only the top eigenvector is kept. Below we can use an n_comp value of 1 or 2 because we are feeding in two features.
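As a point of reference, Spark ML ships its own PCA transformer; a minimal sketch of reducing two correlated columns follows (the column and output names are illustrative, and the DataPreperation.dimensionality_reduction wrapper used below handles the renaming and bookkeeping):

# Illustrative PCA sketch with Spark ML; rows with nulls in the chosen columns are dropped
# because the assembler and PCA cannot handle them.
from pyspark.ml.feature import VectorAssembler, PCA

cols = ['GarageArea', 'GarageCars']
assembler = VectorAssembler(inputCols=cols, outputCol='pca_input')
assembled = assembler.transform(train.na.drop(subset=cols))
pca_model = PCA(k=2, inputCol='pca_input', outputCol='pca_components').fit(assembled)
reduced = pca_model.transform(assembled)
reduced.select('pca_components').show(3, truncate=False)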
In [27]:
# original_train = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/train.csv')
# original_test = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/test.csv')
# (train,valid) = original_train.randomSplit([0.7,0.3], seed=123)
#PCA does not handle null values, and there were some in the test set
# train.na.drop()
# valid.na.drop()
# original_test.na.drop()
# original_test.GarageArea.cast('float')
# original_test.GarageCars.cast('float')
for idx, row in corr_df.iterrows():
    if corr_df.loc[idx]['correlation_abs'] > .7 and corr_df.loc[idx]['correlation_abs'] != 1:  # Only reduce column pairs whose absolute correlation is above .7 (and not a self-correlation)
        print('Doing PCA for', corr_df.loc[idx]['columns'])
        # The test data was messy (it contains 'NA' strings that caused errors), so it isn't included here
        train, valid = DataPreperation.dimensionality_reduction(train, valid_frame=valid, test_frame=None,
                                                                columns=corr_df.loc[idx]['columns'], n_comp=2,
                                                                random_seed=420, decompositions_to_run=['PCA'],
                                                                frame_type='spark', test_does_have_y=False,
                                                                only_return_decompositions=False, id_col=ID_VAR,
                                                                column_name=corr_df.loc[idx]['columns'][0] + '&' + corr_df.loc[idx]['columns'][1])
# Show the results
table = ff.create_table(train.select('GarageArea','GarageCars','GarageArea&GarageCars_pca_1','1stFlrSF&TotalBsmtSF_pca_2').toPandas()[0:10])
# table = ff.create_table(train.select('1stFlrSF','TotalBsmtSF','1stFlrSF&TotalBsmtSF_pca_1','1stFlrSF&TotalBsmtSF_pca_2').toPandas()[0:10])
table.layout.width=1000
iplot(table, filename='pandas_table')
SVDs are another type of decomposition. Many people claim they work better on large datasets compared to PCA.
"Singular value decomposition is often preferred over eigendecomposition of the covariance matrix because the calculation of the covariance matrix is a source of error. In singular value decomposition, with such a large dataset, we are much more robust to errors due to dynamic range of numbers or computational error."
In [28]:
# original_train = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/train.csv')
# original_test = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data_sets/kaggle_house/test.csv')
# (train,valid) = original_train.randomSplit([0.7,0.3], seed=123)
#PCA does not handle null values, and there were some in the test set
# train.na.drop()
# valid.na.drop()
# original_test.na.drop()
# original_test.GarageArea.cast('float')
# original_test.GarageCars.cast('float')
for idx, row in corr_df.iterrows():
    if corr_df.loc[idx]['correlation_abs'] > .7 and corr_df.loc[idx]['correlation_abs'] != 1:  # Only reduce column pairs whose absolute correlation is above .7 (and not a self-correlation)
        print('Doing SVD for', corr_df.loc[idx]['columns'])
        # The test data was messy (it contains 'NA' strings that caused errors), so it isn't included here
        train, valid = DataPreperation.dimensionality_reduction(train, valid_frame=valid, test_frame=None,
                                                                columns=corr_df.loc[idx]['columns'], n_comp=2,
                                                                random_seed=420, decompositions_to_run=['SVD'],
                                                                frame_type='spark', test_does_have_y=False,
                                                                only_return_decompositions=False, id_col=ID_VAR,
                                                                column_name=corr_df.loc[idx]['columns'][0] + '&' + corr_df.loc[idx]['columns'][1])
# Show the results
table = ff.create_table(train.select('GarageArea','GarageCars','GarageArea&GarageCars_svd_1','1stFlrSF&TotalBsmtSF_svd_2').toPandas()[0:10])
# table = ff.create_table(train.select('1stFlrSF','TotalBsmtSF','1stFlrSF&TotalBsmtSF_pca_1','1stFlrSF&TotalBsmtSF_pca_2').toPandas()[0:10])
table.layout.width=1000
iplot(table, filename='pandas_table')