PySpark for Linear Regression

1. Set up SparkContext and SparkSession


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
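
The SparkContext is wrapped by the session, so there is no need to create it separately; if the lower-level RDD API is needed, it is one attribute away:

In [ ]:
# the underlying SparkContext, exposed by the session
sc = spark.sparkContext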

2. Load dataset


In [2]:
df = spark.read.format('com.databricks.spark.csv') \
        .options(header='true', inferschema='true') \
        .load("./data/Advertising.csv")
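
On Spark 2.0+, the built-in CSV reader does the same job without the com.databricks.spark.csv package name; an equivalent one-liner:

In [ ]:
# built-in CSV reader with header handling and schema inference
df = spark.read.csv("./data/Advertising.csv", header=True, inferSchema=True)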

In [3]:
df.take(2)
df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)


In [4]:
df.show(6)


+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|     75.0|  7.2|
+---+-----+-----+---------+-----+
only showing top 6 rows
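
A quick look at the summary statistics is a useful sanity check on the load; for example:

In [ ]:
# count, mean, stddev, min and max for every column
df.describe().show()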

3. Convert the data to features (dense vector) and label


In [5]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

In [6]:
# convert each row to a dense feature vector plus label
# (an alternative row-wise version, kept for reference)
#def transData(row):
#    return Row(label=row["Sales"],
#               features=Vectors.dense([row["TV"],
#                                       row["Radio"],
#                                       row["Newspaper"]]))
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]), r[-1]]).toDF(['features', 'label'])

4. Transform the dataset to a DataFrame


In [12]:
data = transData(df)
data.show(6)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,230.1,37.8,6...| 22.1|
|[2.0,44.5,39.3,45.1]| 10.4|
|[3.0,17.2,45.9,69.3]|  9.3|
|[4.0,151.5,41.3,5...| 18.5|
|[5.0,180.8,10.8,5...| 12.9|
| [6.0,8.7,48.9,75.0]|  7.2|
+--------------------+-----+
only showing top 6 rows
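
Note that transData vectorizes every column except the last, so the _c0 row index is part of each feature vector above (and is why four coefficients appear later; the LASSO penalty ends up zeroing it out). If only the advertising channels are wanted as predictors, one fix is to drop the index before transforming:

In [ ]:
# drop the row-index column first, so features are TV, Radio and Newspaper only
data = transData(df.drop('_c0'))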

5. Index the features and split the data into training and test sets


In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [17]:
featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures",\
                               maxCategories=4).fit(data)
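
VectorIndexer treats a feature as categorical only if it has at most maxCategories distinct values, so with these continuous columns it acts as a pass-through here. What it detected can be inspected:

In [ ]:
# maps feature index to category dict; empty means every feature stayed continuous
featureIndexer.categoryMaps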

In [18]:
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
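
randomSplit draws a fresh random split on every run; passing a seed makes the experiment reproducible, e.g.:

In [ ]:
# fix the seed to get the same 60/40 split every run
(trainingData, testData) = data.randomSplit([0.6, 0.4], seed=42)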

6. Fit the model (elastic net: a mix of Ridge Regression and the LASSO)


In [19]:
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
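
elasticNetParam blends the two penalties: 0.0 gives pure Ridge (L2), 1.0 gives pure LASSO (L1), and the 0.8 used here is mostly L1. The pure variants, for comparison:

In [ ]:
# pure Ridge and pure LASSO with the same regularization strength
ridge = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)
lasso = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=1.0)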

In [21]:
# Chain indexer and linear regression in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, lr])

In [22]:
# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

In [30]:
# the fitted LinearRegressionModel is the second pipeline stage
lrmodel = model.stages[1]

In [31]:
lrmodel.coefficients


Out[31]:
DenseVector([0.0, 0.045, 0.1714, 0.0])

In [29]:
# or 
[stage.coefficients for stage in model.stages if hasattr(stage, "coefficients")]


Out[29]:
[DenseVector([0.0, 0.045, 0.1714, 0.0])]

In [38]:
lrmodel.summary.meanAbsoluteError


Out[38]:
1.370089789983176
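
The training summary exposes several other fit metrics as well; for example:

In [ ]:
# RMSE and R^2 on the training data
print(lrmodel.summary.rootMeanSquaredError)
print(lrmodel.summary.r2)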

7. Make predictions


In [25]:
predictions = model.transform(testData)

In [26]:
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)


+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|12.192355219183316|  7.2| [6.0,8.7,48.9,75.0]|
| 11.62882026449108| 11.8|[7.0,57.5,32.8,23.5]|
|12.187974870101684| 13.2|[8.0,120.2,19.6,1...|
|4.1655336452224265|  4.8|   [9.0,8.6,2.1,1.0]|
|12.856341613250077| 10.6|[10.0,199.8,2.6,2...|
+------------------+-----+--------------------+
only showing top 5 rows

8. Evaluation


In [27]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 1.4827
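
The same evaluator can report other metrics ("mae", "mse", "r2") by switching metricName; for example:

In [ ]:
# coefficient of determination on the same test predictions
r2 = evaluator.setMetricName("r2").evaluate(predictions)
print("R Squared (R2) on test data = %g" % r2)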

9. Save and load the model


In [ ]:
# persist the fitted pipeline (indexer + regression) to disk
temp_path = 'temp/Users/wenqiangfeng/Dropbox/Spark/Code/model'
modelPath = temp_path + "/lr_model"
model.save(modelPath)

In [ ]:
from pyspark.ml import PipelineModel

# reload the persisted pipeline; it comes back as a PipelineModel
lr2 = PipelineModel.load(modelPath)

In [ ]:
# the coefficients live on the LinearRegressionModel stage, not the pipeline itself
lr2.stages[1].coefficients
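
The reloaded pipeline scores new data exactly like the original; a quick consistency check:

In [ ]:
# predictions from the reloaded model should match the original
lr2.transform(testData).select("prediction", "label").show(5)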