In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)

Logistic regression with pyspark

Import data

In [4]:
cuse ='data/cuse_binary.csv', header=True, inferSchema=True)

|age|education|wantsMore|  y|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
only showing top 5 rows

Process categorical columns

The following code does three things with pipeline:

  • StringIndexer all categorical columns
  • OneHotEncoder all categorical index columns
  • VectorAssembler all feature columns into one vector column

Categorical columns

In [5]:
from import StringIndexer, OneHotEncoder, VectorAssembler
from import Pipeline

# categorical columns
categorical_columns = cuse.columns[0:3]

Build StringIndexer stages

In [6]:
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]
# encode label column and add it to stringindexer_stages
stringindexer_stages += [StringIndexer(inputCol='y', outputCol='label')]

Build OneHotEncoder stages

In [7]:
onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]

Build VectorAssembler stage

In [8]:
feature_columns = ['onehot_' + c for c in categorical_columns]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')

Build pipeline model

In [9]:
# all stages
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)

Fit pipeline model

In [10]:
pipeline_model =

Transform data

In [11]:
final_columns = feature_columns + ['features', 'label']
cuse_df = pipeline_model.transform(cuse).\

|   onehot_age|onehot_education|onehot_wantsMore|           features|label|
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
only showing top 5 rows

Split data into training and test datasets

In [12]:
training, test = cuse_df.randomSplit([0.8, 0.2], seed=1234)

Build cross-validation model


In [13]:
from import LogisticRegression
logr = LogisticRegression(featuresCol='features', labelCol='label')

Parameter grid

In [14]:
from import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(logr.regParam, [0, 0.5, 1, 2]).\
    addGrid(logr.elasticNetParam, [0, 0.5, 1]).\


In [15]:
from import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

Cross-validation model

In [16]:
from import CrossValidator
cv = CrossValidator(estimator=logr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)

Fit cross-validation model

In [17]:
cv_model =


In [18]:
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']

Prediction on training data

In [19]:
pred_training_cv = cv_model.transform(training), truncate=False)

|features |label|prediction|rawPrediction                             |probability                            |
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
only showing top 5 rows

Prediction on test data

In [20]:
pred_test_cv = cv_model.transform(test), truncate=False)

|features |label|prediction|rawPrediction                             |probability                            |
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
|(5,[],[])|0.0  |1.0       |[-0.05602431718564116,0.05602431718564116]|[0.4859975829890087,0.5140024170109914]|
only showing top 5 rows

Intercept and coefficients of the regression model

In [21]:
print('Intercept: ' + str(cv_model.bestModel.intercept) + "\n"
     'coefficients: ' + str(cv_model.bestModel.coefficients))

Intercept: 0.05602431718564116
coefficients: [-0.280625539774,-0.799857435517,-1.18923909827,0.324994746147,-0.832954766261]

Parameters from the best model

In [22]:
print('The best RegParam is: ', cv_model.bestModel._java_obj.getRegParam(), "\n",
     'The best ElasticNetParam is: cv_model.bestModel._java_obj.getElasticNetParam()')

The best RegParam is:  0.0 
 The best ElasticNetParam is: cv_model.bestModel._java_obj.getElasticNetParam()

Linear regression with R

Import Data (below is R code!)

#====== This is R code! =========
cuse = read.table('', header = T)

# convert count data to binary data
not_using = rep(1:nrow(cuse), times=cuse$notUsing)
using = rep(1:nrow(cuse), times=cuse$using)
cuse_binary = cuse[c(not_using, using), 1:3]
cuse_binary$y = c(rep(0, length(not_using)), rep(1, length(using)))

# write data into a file
write.csv(cuse_binary, file='data/cuse_binary.csv', row.names = FALSE)

Process categorical variables

Process categorical variables so they have the same pattern as in pyspar. Element levels are in the descending order of element frequencies.

#====== This is R code! =========
cuse_binary$age = factor(cuse_binary$age, 
                         levels = names(sort(table(cuse_binary$age), decreasing = TRUE)))
cuse_binary$education = factor(cuse_binary$education,
                               levels = names(sort(table(cuse_binary$education), decreasing = TRUE)))
cuse_binary$wantsMore = factor(cuse_binary$wantsMore,
                               levels = names(sort(table(cuse_binary$wantsMore), decreasing = TRUE)))

# encode label column
cuse_binary$y = factor(cuse_binary$y,
                               levels = names(sort(table(cuse_binary$y))))
glm_cuse = glm(y~age + education + wantsMore, data = cuse_binary, family = binomial(link = "logit"))


#====== This is R code! =========

 (Intercept)     age25-29       age<25     age40-49 educationlow  wantsMoreno 
   0.7325613    0.5192319    0.9086135   -0.2806254    0.3249947   -0.8329548

In [ ]: