In [176]:
cuse = spark.read.csv('data/cuse_binary.csv', header=True, inferSchema=True)
cuse.show(5)
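In [ ]:
# Optional sanity check: confirm that inferSchema parsed the columns with the
# expected types before indexing/encoding them.
cuse.printSchema()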
In [254]:
cuse.columns[0:3]
# cuse.select('age').distinct().show()
cuse.select('age').rdd.countByValue()
# cuse.select('education').rdd.countByValue()
Out[254]:
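In [ ]:
# Alternative sketch: the same frequency counts without dropping down to the
# RDD API, using a plain DataFrame aggregation.
cuse.groupBy('age').count().orderBy('count', ascending=False).show()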
In [256]:
# string-index each of the categorical string columns
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
indexers = [StringIndexer(inputCol=column, outputCol="indexed_"+column) for column in ('age', 'education', 'wantsMore')]
pipeline = Pipeline(stages=indexers)
indexed_cuse = pipeline.fit(cuse).transform(cuse)
indexed_cuse.select('age', 'indexed_age').distinct().show(5)
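In [ ]:
# Sketch: map the numeric indices back to the original labels. IndexToString
# reads the label metadata that StringIndexer attached to 'indexed_age', so no
# explicit labels list is needed here.
from pyspark.ml.feature import IndexToString
converter = IndexToString(inputCol='indexed_age', outputCol='original_age')
converter.transform(indexed_cuse).select('indexed_age', 'original_age').distinct().show()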
In [283]:
# one-hot encode each indexed categorical column
from pyspark.ml.feature import OneHotEncoder
columns = indexed_cuse.columns[0:3]
onehotencoders = [OneHotEncoder(inputCol="indexed_"+column, outputCol="onehotencode_"+column) for column in columns]
pipeline = Pipeline(stages=onehotencoders)
onehotencode_columns = ['onehotencode_age', 'onehotencode_education', 'onehotencode_wantsMore', 'y']
onehotencode_cuse = pipeline.fit(indexed_cuse).transform(indexed_cuse).select(onehotencode_columns)
onehotencode_cuse.distinct().show(5)
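In [ ]:
# Sketch: OneHotEncoder drops the last category by default (dropLast=True), so
# a column with k levels becomes a sparse vector of length k-1, which avoids
# collinearity with the intercept. Setting dropLast=False keeps all k slots for
# comparison. (Assumes the same Spark version as the cells above, where
# OneHotEncoder takes inputCol/outputCol.)
full_encoder = OneHotEncoder(inputCol='indexed_age', outputCol='onehot_age_full', dropLast=False)
Pipeline(stages=[full_encoder]).fit(indexed_cuse).transform(indexed_cuse).\
select('indexed_age', 'onehot_age_full').distinct().show()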
In [206]:
# assemble all feature columns into a single vector column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['onehotencode_age', 'onehotencode_education', 'onehotencode_wantsMore'], outputCol='features')
cuse_df_2 = assembler.transform(onehotencode_cuse).withColumnRenamed('y', 'label')
cuse_df_2.show(5)
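In [ ]:
# Sketch: recover which original category each slot of 'features' refers to,
# using the ML attribute metadata that VectorAssembler propagates. The exact
# metadata layout ('ml_attr' -> 'attrs') can differ across Spark versions.
attrs = cuse_df_2.schema['features'].metadata['ml_attr']['attrs']
for attr_group in attrs.values():
    for attr in attr_group:
        print(attr['idx'], attr['name'])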
In [284]:
# split data into training and test datasets
training, test = cuse_df_2.randomSplit([0.8, 0.2], seed=1234)
training.show(5)
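In [ ]:
# Optional sanity check: randomSplit weights are proportions, not exact counts,
# so the realized split can deviate a bit from 80/20.
print(training.count(), test.count())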
In [285]:
## ======= build cross validation model ===========
# estimator
from pyspark.ml.regression import GeneralizedLinearRegression
glm = GeneralizedLinearRegression(featuresCol='features', labelCol='label', family='binomial')
# parameter grid
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
addGrid(glm.regParam, [0, 0.5, 1, 2, 4]).\
build()
# evaluator
# GeneralizedLinearRegression outputs a 'prediction' column (the predicted
# probability under the binomial family) rather than 'rawPrediction', so the
# evaluator is pointed at 'prediction'
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction')
# build cross-validation model
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=glm, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)
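In [ ]:
# Optional check: BinaryClassificationEvaluator scores areaUnderROC unless
# metricName is set otherwise, so that is the metric CrossValidator maximizes.
print(evaluator.getMetricName())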
In [294]:
# fit model
# NOTE: fitting on the full cuse_df_2 means the test rows are also seen during
# cross-validation, so the test metric below is optimistic; fitting on
# `training` (commented out) would give an honest held-out evaluation
# cv_model = cv.fit(training)
cv_model = cv.fit(cuse_df_2)
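In [ ]:
# Sketch: inspect the cross-validated metric for each regParam in the grid;
# avgMetrics lines up with the order of param_grid.
for params, metric in zip(param_grid, cv_model.avgMetrics):
    print({p.name: v for p, v in params.items()}, metric)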
In [302]:
# prediction
pred_training_cv = cv_model.transform(training)
pred_test_cv = cv_model.transform(test)
pred_training_cv.show(5)
pred_test_cv.show(5, truncate=False)
In [303]:
cv_model.bestModel.coefficients
Out[303]:
In [304]:
cv_model.bestModel.intercept
Out[304]:
In [305]:
cv_model.bestModel
Out[305]:
In [306]:
evaluator.evaluate(pred_training_cv)
Out[306]:
In [307]:
evaluator.evaluate(pred_test_cv)
Out[307]:
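In [ ]:
# Optional sketch: area under the precision-recall curve as a second view on
# test performance, using the same predicted-probability column.
evaluator_pr = BinaryClassificationEvaluator(rawPredictionCol='prediction', metricName='areaUnderPR')
evaluator_pr.evaluate(pred_test_cv)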
In [320]:
import pandas as pd
pdf = pd.DataFrame({
'x1': ['a','a','b','c'],
'x2': ['apple', 'orange', 'orange', 'peach'],
'x3': [1, 1, 2, 4],
'x4': [2.4, 2.5, 3.5, 1.4],
'y1': [1, 0, 0, 1],
'y2': ['yes', 'no', 'no', 'yes']
})
df = spark.createDataFrame(pdf)
In [321]:
df.show()
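In [ ]:
# Sketch: the same string-indexing pattern from earlier in this notebook,
# applied to the string columns of the toy frame ('x1', 'x2', 'y2').
toy_indexers = [StringIndexer(inputCol=c, outputCol='indexed_'+c) for c in ('x1', 'x2', 'y2')]
Pipeline(stages=toy_indexers).fit(df).transform(df).show()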