Create entry points to Spark


In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) a single local SparkSession and take the SparkContext from it.
# Constructing SparkContext(...) directly fails when this cell is re-run, because
# only one active SparkContext is allowed; getOrCreate() returns the existing one.
spark = (SparkSession.builder
         .master('local')
         .appName("Python Spark SQL basic example")
         # placeholder option copied from the Spark SQL docs example; not a real Spark setting
         .config("spark.some.config.option", "some-value")
         .getOrCreate())
sc = spark.sparkContext

Linear regression without cross-validation

Import data


In [2]:
# Load the Advertising data: the first row is the header, and column types are inferred
ad = spark.read.csv(path='data/Advertising.csv', inferSchema=True, header=True)
ad.show(n=5)


+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows

Transform data structure


In [3]:
from pyspark.ml.feature import VectorAssembler

# Pack the predictor columns into one 'features' vector and rename the response
# to 'label'. Columns are selected by name rather than by position (x[0:3] / x[-1]),
# so the cell stays correct if the CSV gains or reorders columns. It also avoids
# a round-trip through a Python RDD.
# NOTE(review): VectorAssembler may emit sparse vectors for rows with many zeros;
# LinearRegression accepts either representation.
assembler = VectorAssembler(inputCols=['TV', 'Radio', 'Newspaper'], outputCol='features')
ad_df = (assembler.transform(ad)
         .withColumnRenamed('Sales', 'label')
         .select('features', 'label'))
ad_df.show(5)


+-----------------+-----+
|         features|label|
+-----------------+-----+
|[230.1,37.8,69.2]| 22.1|
| [44.5,39.3,45.1]| 10.4|
| [17.2,45.9,69.3]|  9.3|
|[151.5,41.3,58.5]| 18.5|
|[180.8,10.8,58.4]| 12.9|
+-----------------+-----+
only showing top 5 rows

Build linear regression model


In [4]:
from pyspark.ml.regression import LinearRegression

# Linear regression with default settings, reading the 'features' vector and predicting 'label'
lr = LinearRegression(labelCol='label', featuresCol='features')

Fit the model


In [8]:
# Fit on the full dataset (no hold-out in this section)
lr_model = lr.fit(dataset=ad_df)

Prediction


In [11]:
# In-sample predictions: score the same data the model was fit on
pred = lr_model.transform(dataset=ad_df)
pred.show(n=5)


+-----------------+-----+------------------+
|         features|label|        prediction|
+-----------------+-----+------------------+
|[230.1,37.8,69.2]| 22.1| 20.52397440971517|
| [44.5,39.3,45.1]| 10.4|12.337854820894362|
| [17.2,45.9,69.3]|  9.3|12.307670779994238|
|[151.5,41.3,58.5]| 18.5| 17.59782951168913|
|[180.8,10.8,58.4]| 12.9|13.188671856831299|
+-----------------+-----+------------------+
only showing top 5 rows

Model evaluation


In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

# R^2 of the in-sample predictions; the metric is fixed on the evaluator itself
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='r2')
evaluator.evaluate(pred)


Out[12]:
0.897210638178952

Compare results with R

The comparison below shows that PySpark and R's `lm` produce essentially the same intercept, coefficients, and $R^2$ for this linear regression.

{r}
# intercept and coefficients from R
advertise = read.csv('data/Advertising.csv', header = TRUE)
lr_ad = lm(Sales~., data = advertise)
lr_ad$coefficients

 (Intercept)           TV        Radio    Newspaper 
 2.938889369  0.045764645  0.188530017 -0.001037493

# intercept and coefficients from pyspark
lr_model.intercept

2.9388893694594134

lr_model.coefficients

DenseVector([0.0458, 0.1885, -0.001])

# R squared from R
summary(lr_ad)$r.squared

0.8972106

# R squared from pyspark
evaluator.evaluate(pred, {evaluator.metricName: "r2"})

0.897210638178952

Linear regression with cross-validation

Training and test datasets


In [13]:
training, test = ad_df.randomSplit([0.8, 0.2], seed=123)

Build cross-validation model


In [14]:
##===== build cross-validation model =====
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# estimator: linear regression on the assembled features
lr = LinearRegression(featuresCol='features', labelCol='label')

# parameter grid: 3 x 3 combinations of regularization strength (regParam) and
# L1/L2 mix (elasticNetParam: 0 = pure L2, 1 = pure L1).
# When regParam is 0 the elasticNetParam value has no effect.
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.0, 0.5, 1.0])
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
              .build())

# evaluator: the grid point with the highest mean R^2 across folds is selected
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')

# cross-validation model. Without an explicit seed, PySpark derives the default
# from hash() of the class name, which may differ between Python sessions.
# The fold assignment, and therefore the selected model, would then not be reproducible.
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator,
                    numFolds=4, seed=123)

Fit cross-validation model


In [15]:
# Run 4-fold CV over the grid, then refit the best configuration on all training rows
cv_model = cv.fit(dataset=training)

Prediction


In [16]:
# Score both splits with the best model selected by cross-validation
pred_training_cv, pred_test_cv = (cv_model.transform(split) for split in (training, test))

Evaluation


In [17]:
# R^2 on the training split (in-sample performance)
evaluator.setMetricName('r2')
evaluator.evaluate(pred_training_cv)


Out[17]:
0.8982486958337326

In [18]:
# R^2 on the held-out test split (out-of-sample performance)
evaluator.setMetricName('r2')
evaluator.evaluate(pred_test_cv)


Out[18]:
0.8896562076565583

Intercept and coefficients


In [20]:
# Intercept and coefficients of the best model chosen by cross-validation.
# Separate print calls avoid the stray spaces that print(a, "\n", b) puts
# around the newline (the default separator is a space).
best_lr_model = cv_model.bestModel
print('Intercept:', best_lr_model.intercept)
print('coefficients:', best_lr_model.coefficients)


Intercept:  3.075068686285647 
 coefficients:  [0.0465074974309,0.180854522465,-0.00107520549074]

Get parameter values from the best model

The tuned parameter values can be read from the underlying Java model object (`bestModel._java_obj`) by calling its getter methods.


In [21]:
# Tuned hyperparameters, read through the wrapped Java model (a private PySpark attribute)
print('best regParam: {}\nbest ElasticNetParam:{}'.format(
    cv_model.bestModel._java_obj.getRegParam(),
    cv_model.bestModel._java_obj.getElasticNetParam()))


best regParam: 0.0
best ElasticNetParam:0.0

In [ ]: