In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
In [3]:
# Create (or reuse) the SparkSession via the builder API — the idiomatic
# entry point since Spark 2.x — instead of hand-constructing a SparkContext
# and wrapping it. Expose the underlying SparkContext as `sc` so any later
# cell that expects either name still works.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
In [4]:
# Load the binary-outcome contraceptive-use dataset (produced by the R
# script at the bottom of this file). header=True takes column names from
# the first row; inferSchema=True lets Spark guess the column types.
cuse = spark.read.csv('data/cuse_binary.csv', header=True, inferSchema=True)
# Preview the first 5 rows.
cuse.show(5)
The following code does three things with a Pipeline:
- StringIndexer: index all categorical columns
- OneHotEncoder: one-hot encode all categorical index columns
- VectorAssembler: assemble all feature columns into one vector column
In [5]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
# categorical columns: the first three columns of the CSV are the
# categorical features (presumably age, education, wantsMore — see the R
# preprocessing below; TODO confirm the CSV column order). The remaining
# column 'y' is the binary label.
categorical_columns = cuse.columns[0:3]
In [6]:
# Build one StringIndexer per categorical feature column; each maps string
# categories to numeric indices in column 'strindexed_<name>'.
stringindexer_stages = []
for column in categorical_columns:
    stringindexer_stages.append(
        StringIndexer(inputCol=column, outputCol='strindexed_' + column)
    )
# encode label column and add it to stringindexer_stages
stringindexer_stages.append(StringIndexer(inputCol='y', outputCol='label'))
In [7]:
onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]
In [8]:
# Names of the one-hot columns produced above; these are the model features.
feature_columns = ['onehot_' + name for name in categorical_columns]
# Merge all feature columns into a single vector column 'features'.
vectorassembler_stage = VectorAssembler(
    inputCols=feature_columns,
    outputCol='features',
)
In [9]:
# all stages
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)
In [10]:
pipeline_model = pipeline.fit(cuse)
In [11]:
# Keep the one-hot columns plus the assembled feature vector and the label.
final_columns = feature_columns + ['features', 'label']
cuse_df = (
    pipeline_model
    .transform(cuse)
    .select(final_columns)
)
cuse_df.show(5)
In [12]:
training, test = cuse_df.randomSplit([0.8, 0.2], seed=1234)
In [13]:
from pyspark.ml.classification import LogisticRegression
logr = LogisticRegression(featuresCol='features', labelCol='label')
In [14]:
from pyspark.ml.tuning import ParamGridBuilder
# Grid of 4 x 3 = 12 hyperparameter combinations:
#   regParam        — regularization strength
#   elasticNetParam — 0 = L2 (ridge), 1 = L1 (lasso), between = elastic net
param_grid = (
    ParamGridBuilder()
    .addGrid(logr.regParam, [0, 0.5, 1, 2])
    .addGrid(logr.elasticNetParam, [0, 0.5, 1])
    .build()
)
In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
In [16]:
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=logr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)
In [17]:
cv_model = cv.fit(cuse_df)
In [18]:
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']
In [19]:
# Predictions of the best cross-validated model on the training split.
pred_training_cv = cv_model.transform(training)
pred_training_cv.select(show_columns).show(5, truncate=False)
In [20]:
# Predictions of the best cross-validated model on the held-out test split.
pred_test_cv = cv_model.transform(test)
pred_test_cv.select(show_columns).show(5, truncate=False)
In [21]:
# Report the fitted intercept and coefficient vector of the best model.
# (Adjacent string literals concatenate, so this prints two lines.)
best = cv_model.bestModel
print('Intercept: ' + str(best.intercept) + "\n"
      'coefficients: ' + str(best.coefficients))
In [22]:
# Report the hyperparameters chosen by cross-validation.
# BUG FIX: in the original, the getElasticNetParam() call was accidentally
# placed INSIDE the string literal, so the expression text was printed
# instead of its value.
print('The best RegParam is: ', cv_model.bestModel._java_obj.getRegParam(), "\n",
      'The best ElasticNetParam is: ', cv_model.bestModel._java_obj.getElasticNetParam())
#====== This is R code! =========
# Download Princeton's grouped contraceptive-use table: one row per
# age/education/wantsMore group with counts of women using / not using.
cuse = read.table('http://data.princeton.edu/wws509/datasets/cuse.dat', header = T)
# convert count data to binary data:
# expand each grouped row into one row per woman — repeat row i `notUsing`
# times for non-users and `using` times for users.
not_using = rep(1:nrow(cuse), times=cuse$notUsing)
using = rep(1:nrow(cuse), times=cuse$using)
cuse_binary = cuse[c(not_using, using), 1:3]
# Binary outcome: 0 = not using contraception, 1 = using.
cuse_binary$y = c(rep(0, length(not_using)), rep(1, length(using)))
# write data into a file (the CSV loaded by the Spark code above)
write.csv(cuse_binary, file='data/cuse_binary.csv', row.names = FALSE)
#====== This is R code! =========
# Relevel each factor so the MOST FREQUENT level comes first (the reference
# level). This mirrors Spark's StringIndexer, which assigns index 0 to the
# most frequent category, so the glm coefficients below line up with the
# Spark model's — presumably; verify against the StringIndexer docs.
cuse_binary$age = factor(cuse_binary$age,
levels = names(sort(table(cuse_binary$age), decreasing = TRUE)))
cuse_binary$education = factor(cuse_binary$education,
levels = names(sort(table(cuse_binary$education), decreasing = TRUE)))
cuse_binary$wantsMore = factor(cuse_binary$wantsMore,
levels = names(sort(table(cuse_binary$wantsMore), decreasing = TRUE)))
# encode label column (least frequent level first here — ascending sort)
cuse_binary$y = factor(cuse_binary$y,
levels = names(sort(table(cuse_binary$y))))
# Fit a logistic regression in R on the expanded binary data, as a
# reference check for the Spark LogisticRegression model above.
glm_cuse = glm(y~age + education + wantsMore, data = cuse_binary, family = binomial(link = "logit"))
#====== This is R code! ========= (inspect fitted coefficients; output pasted below)
glm_cuse$coefficients
(Intercept) age25-29 age<25 age40-49 educationlow wantsMoreno
0.7325613 0.5192319 0.9086135 -0.2806254 0.3249947 -0.8329548
In [ ]: