In [176]:
# read the data, letting Spark infer each column's type
cuse = spark.read.csv('data/cuse_binary.csv', header=True, inferSchema=True)
cuse.show(5)


+---+---------+---------+---+
|age|education|wantsMore|  y|
+---+---------+---------+---+
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
+---+---------+---------+---+
only showing top 5 rows

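In [ ]:
# quick sanity check (sketch, not executed above): confirm the types
# inferSchema picked up -- y in particular should come back as an int
cuse.printSchema()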

In [254]:
# count rows for each distinct value of 'age'; only the last expression
# in a cell is echoed, so the alternatives stay commented out
# cuse.columns[0:3]
# cuse.select('age').distinct().show()
cuse.select('age').rdd.countByValue()
# cuse.select('education').rdd.countByValue()


Out[254]:
defaultdict(int,
            {Row(age=u'25-29'): 404,
             Row(age=u'30-39'): 612,
             Row(age=u'40-49'): 194,
             Row(age=u'<25'): 397})

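In [ ]:
# the same counts without dropping to the RDD API -- a more idiomatic
# DataFrame alternative to countByValue (sketch, not executed above)
cuse.groupBy('age').count().show()
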
In [256]:
# string-index each of the categorical string columns
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
indexers = [StringIndexer(inputCol=column, outputCol="indexed_"+column) for column in ('age', 'education', 'wantsMore')]
pipeline = Pipeline(stages=indexers)
indexed_cuse = pipeline.fit(cuse).transform(cuse)
indexed_cuse.select('age', 'indexed_age').distinct().show(5)


+-----+-----------+
|  age|indexed_age|
+-----+-----------+
|30-39|        0.0|
|  <25|        2.0|
|25-29|        1.0|
|40-49|        3.0|
+-----+-----------+

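In [ ]:
# StringIndexer orders labels by descending frequency, which is why
# 30-39 (612 rows) gets 0.0 and 40-49 (194 rows) gets 3.0; recent
# PySpark versions expose the ordering on the fitted model (sketch):
indexers[0].fit(cuse).labels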

In [283]:
# one-hot encode each indexed categorical column; with the default
# dropLast=True, the last category becomes the all-zeros vector
from pyspark.ml.feature import OneHotEncoder
columns = indexed_cuse.columns[0:3]
onehotencoders = [OneHotEncoder(inputCol="indexed_"+column, outputCol="onehotencode_"+column) for column in columns]
pipeline = Pipeline(stages=onehotencoders)
onehotencode_columns = ['onehotencode_age', 'onehotencode_education', 'onehotencode_wantsMore', 'y']
onehotencode_cuse = pipeline.fit(indexed_cuse).transform(indexed_cuse).select(onehotencode_columns)
onehotencode_cuse.distinct().show(5)


+----------------+----------------------+----------------------+---+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|  y|
+----------------+----------------------+----------------------+---+
|   (3,[1],[1.0])|             (1,[],[])|         (1,[0],[1.0])|  0|
|   (3,[2],[1.0])|         (1,[0],[1.0])|             (1,[],[])|  1|
|   (3,[0],[1.0])|         (1,[0],[1.0])|         (1,[0],[1.0])|  0|
|       (3,[],[])|         (1,[0],[1.0])|         (1,[0],[1.0])|  1|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|  0|
+----------------+----------------------+----------------------+---+
only showing top 5 rows

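In [ ]:
# how to read the output above: (3,[1],[1.0]) is a sparse vector of
# size 3 with a 1.0 at index 1, and because dropLast defaults to True
# the last category appears as the all-zeros vector (3,[],[]).
# a sketch, assuming Spark 2.x (on 1.x use pyspark.mllib.linalg):
from pyspark.ml.linalg import Vectors
Vectors.sparse(3, [1], [1.0])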

In [206]:
# assemble all feature columns into a single vector column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['onehotencode_age', 'onehotencode_education', 'onehotencode_wantsMore'], outputCol='features')
cuse_df_2 = assembler.transform(onehotencode_cuse).withColumnRenamed('y', 'label')
cuse_df_2.show(5)


+----------------+----------------------+----------------------+-----+-------------------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label|           features|
+----------------+----------------------+----------------------+-----+-------------------+
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
+----------------+----------------------+----------------------+-----+-------------------+
only showing top 5 rows

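In [ ]:
# the assembled vector has 3 + 1 + 1 = 5 slots, one per one-hot column
# above; a quick check of the width (sketch, not executed above):
cuse_df_2.first()['features'].size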

In [284]:
# split data into training and test datasets
training, test = cuse_df_2.randomSplit([0.8, 0.2], seed=1234)
training.show(5)


+----------------+----------------------+----------------------+-----+---------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label| features|
+----------------+----------------------+----------------------+-----+---------+
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
+----------------+----------------------+----------------------+-----+---------+
only showing top 5 rows

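In [ ]:
# randomSplit weights are proportions, so expect a roughly 80/20 row
# split (sketch, not executed above):
training.count(), test.count()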

In [285]:
## ======= build cross-validation model ===========

# estimator: logistic regression fit as a binomial GLM
from pyspark.ml.regression import GeneralizedLinearRegression
glm = GeneralizedLinearRegression(featuresCol='features', labelCol='label', family='binomial')

# parameter grid
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(glm.regParam, [0, 0.5, 1, 2, 4]).\
    build()
    
# evaluator: defaults to areaUnderROC; the GLM writes its fitted
# probabilities to 'prediction', which works as a ranking score
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction')

# build cross-validation model
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=glm, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)

In [294]:
# fit model
# NOTE: fitting on the full dataset rather than `training` lets the test
# rows influence model selection, so the test metric below is optimistic
# cv_model = cv.fit(training)
cv_model = cv.fit(cuse_df_2)

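In [ ]:
# avgMetrics lines up with param_grid, one mean CV metric per grid
# point, so pairing them shows which regParam value won (sketch):
list(zip([p[glm.regParam] for p in param_grid], cv_model.avgMetrics))
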
In [302]:
# predict on both the training and test sets
pred_training_cv = cv_model.transform(training)
pred_test_cv = cv_model.transform(test)

pred_training_cv.show(5)
pred_test_cv.show(5, truncate=False)


+----------------+----------------------+----------------------+-----+---------+------------------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label| features|        prediction|
+----------------+----------------------+----------------------+-----+---------+------------------+
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
+----------------+----------------------+----------------------+-----+---------+------------------+
only showing top 5 rows

+----------------+----------------------+----------------------+-----+---------+------------------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label|features |prediction        |
+----------------+----------------------+----------------------+-----+---------+------------------+
|(3,[],[])       |(1,[],[])             |(1,[],[])             |0    |(5,[],[])|0.5140024065151293|
|(3,[],[])       |(1,[],[])             |(1,[],[])             |0    |(5,[],[])|0.5140024065151293|
|(3,[],[])       |(1,[],[])             |(1,[],[])             |0    |(5,[],[])|0.5140024065151293|
|(3,[],[])       |(1,[],[])             |(1,[],[])             |0    |(5,[],[])|0.5140024065151293|
|(3,[],[])       |(1,[],[])             |(1,[],[])             |0    |(5,[],[])|0.5140024065151293|
+----------------+----------------------+----------------------+-----+---------+------------------+
only showing top 5 rows

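In [ ]:
# the GLM 'prediction' column holds fitted probabilities, not 0/1
# classes; to get hard labels, threshold it, e.g. at 0.5 (sketch):
from pyspark.sql import functions as F
pred_test_cv.withColumn('pred_label', (F.col('prediction') > 0.5).cast('integer')) \
    .select('label', 'prediction', 'pred_label').show(5)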

In [303]:
cv_model.bestModel.coefficients


Out[303]:
DenseVector([-0.2806, -0.7999, -1.1892, 0.325, -0.833])

In [304]:
cv_model.bestModel.intercept


Out[304]:
0.056024275169240606

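In [ ]:
# on the log-odds scale, exp(coefficient) is an odds ratio; e.g. the
# last slot (wantsMore) gives exp(-0.833) ~ 0.43 (sketch):
from math import exp
[exp(c) for c in cv_model.bestModel.coefficients.toArray()]
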
In [305]:
cv_model.bestModel


Out[305]:
GeneralizedLinearRegression_46189bdb9cb6a9a002f6

In [306]:
evaluator.evaluate(pred_training_cv)


Out[306]:
0.6722119042705738

In [307]:
evaluator.evaluate(pred_test_cv)


Out[307]:
0.6769880972917027

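In [ ]:
# both numbers above are areaUnderROC, the evaluator's default metric
# (sketch, not executed above):
evaluator.getMetricName()
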
In [320]:
# toy example: build a Spark DataFrame from a pandas DataFrame
import pandas as pd
pdf = pd.DataFrame({
        'x1': ['a','a','b','c'],
        'x2': ['apple', 'orange', 'orange', 'peach'],
        'x3': [1, 1, 2, 4],
        'x4': [2.4, 2.5, 3.5, 1.4],
        'y1': [1, 0, 0, 1],
        'y2': ['yes', 'no', 'no', 'yes']
    })

df = spark.createDataFrame(pdf)

In [321]:
df.show()


+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  0| no|
|  c| peach|  4|1.4|  1|yes|
+---+------+---+---+---+---+

