In [1]:
from pyspark import SparkContext  # kept for later cells that may reference the name
from pyspark.sql import SparkSession

# Build (or reuse) one local SparkSession and take the SparkContext from it.
# Constructing SparkContext(master='local') directly raises on a second run of this
# cell ("Cannot run multiple SparkContexts at once"); getOrCreate() makes the cell
# safe to re-run. The "spark.some.config.option" placeholder copied from the Spark
# docs example configured nothing, so it has been dropped.
spark = (SparkSession.builder
         .master('local')
         .appName("Python Spark SQL basic example")
         .getOrCreate())
sc = spark.sparkContext
In [2]:
# Load the advertising data. The header row supplies the column names, and
# inferSchema lets Spark type the numeric columns instead of reading strings.
ad = spark.read.options(header=True, inferSchema=True).csv('data/Advertising.csv')
ad.show(n=5)
In [3]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Assemble the first three columns into a 'features' vector and use the last column
# as 'label'. This is the same positional split as before; presumably the columns are
# TV, Radio, Newspaper -> Sales, matching the R fit below.
# VectorAssembler does this on the JVM instead of round-tripping every row through
# Python via ad.rdd.map(...).toDF(...).
feature_cols, label_col = ad.columns[:3], ad.columns[-1]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
ad_df = (assembler.transform(ad)
         .select('features', label_col)
         .withColumnRenamed(label_col, 'label'))
ad_df.show(5)
In [4]:
from pyspark.ml.regression import LinearRegression

# Plain linear regression on the assembled vector; the column names are set
# through the estimator's setters rather than constructor keywords.
lr = LinearRegression().setFeaturesCol('features').setLabelCol('label')
In [8]:
lr_model = lr.fit(dataset=ad_df)  # fit on the full data set (no train/test split yet)
In [11]:
# Score the same rows the model was fit on; transform appends a 'prediction' column.
pred = lr_model.transform(dataset=ad_df)
pred.show(n=5)
In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

# In-sample R^2 of the full-data fit; the evaluator stays configured for 'r2'.
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='r2')
evaluator.evaluate(pred)
Out[12]:
{r}
# intercept and coefficients from R
advertise = read.csv('data/Advertising.csv', header = TRUE)
lr_ad = lm(Sales~., data = advertise)
lr_ad$coefficients
(Intercept) TV Radio Newspaper
2.938889369 0.045764645 0.188530017 -0.001037493
# intercept and coefficients from pyspark
lr_model.intercept
2.9388893694594134
lr_model.coefficients
DenseVector([0.0458, 0.1885, -0.001])
# R squared from R
summary(lr_ad)$r.squared
0.8972106
# R squared from pyspark
evaluator.evaluate(pred, {evaluator.metricName: "r2"})
0.897210638178952
In [13]:
training, test = ad_df.randomSplit(weights=[0.8, 0.2], seed=123)  # ~80/20 split, fixed seed for repeatability
In [14]:
## ===== build cross-validation model =====
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# estimator: un-tuned linear regression; each grid point below overrides its
# regularisation settings
lr = LinearRegression(featuresCol='features', labelCol='label')

# parameter grid: 3 x 3 = 9 (regParam, elasticNetParam) candidates.
# NOTE: with regParam == 0 there is no penalty, so the three elasticNetParam values in
# that row of the grid fit essentially the same model.
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0, 0.5, 1])
              .addGrid(lr.elasticNetParam, [0, 0.5, 1])
              .build())

# evaluator: candidates are ranked by R^2 on each held-out fold (larger is better)
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')

# cross-validation model. An explicit seed makes the fold assignment reproducible,
# matching the seeded randomSplit above. Without it, PySpark's default seed comes from
# hash() of the class name, which differs between Python processes when string hash
# randomisation is enabled.
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator,
                    numFolds=4, seed=123)
In [15]:
cv_model = cv.fit(dataset=training)  # 4 folds x 9 candidates, then the best params are refit on all of `training`
In [16]:
# Score both splits with the refit best model so train and test R^2 can be compared.
pred_training_cv, pred_test_cv = (cv_model.transform(split) for split in (training, test))
In [17]:
# R^2 of the best cross-validated model on the data it was trained on
evaluator.setMetricName('r2')
evaluator.evaluate(pred_training_cv)
Out[17]:
In [18]:
# R^2 of the best cross-validated model on the held-out test split
evaluator.setMetricName('r2')
evaluator.evaluate(pred_test_cv)
Out[18]:
In [20]:
# Intercept and coefficients of the best model (refit on the full training split).
# Two print calls replace print(a, b, "\n", c, d), whose default ' ' separator put
# double spaces after each label and a leading space on the second line.
print('Intercept:', cv_model.bestModel.intercept)
print('coefficients:', cv_model.bestModel.coefficients)
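The tuned hyperparameters of the best model can be read through its underlying Java object (`_java_obj`), because older PySpark versions do not expose these getters on the Python model wrapper.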
In [21]:
# Tuned hyperparameters selected by cross-validation, read via the Java model.
# NOTE(review): `_java_obj` is private API; on PySpark >= 3.0 the public
# cv_model.bestModel.getRegParam() / getElasticNetParam() should also work — confirm
# against the Spark version in use before switching.
print('best regParam: ' + str(cv_model.bestModel._java_obj.getRegParam()))
print('best elasticNetParam: ' + str(cv_model.bestModel._java_obj.getElasticNetParam()))
In [ ]: