In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
In [2]:
df = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').load("./data/Advertising.csv",header=True);
In [3]:
df.take(2)
df.printSchema()
In [4]:
df.show(6)
In [5]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
In [6]:
# convert the data to dense vector
#def transData(row):
# return Row(label=row["Sales"],
# features=Vectors.dense([row["TV"],
# row["Radio"],
# row["Newspaper"]]))
def transData(data):
return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])
In [ ]:
# convert the data to dense vector
#def transData(row):
# return Row(label=row["Sales"],
# features=Vectors.dense([row["TV"],
# row["Radio"],
# row["Newspaper"]]))
def transData(data):
return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])
In [12]:
#transformed = df.rdd.map(transData).toDF()
data= transData(df)
data.show(6)
In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
In [17]:
featureIndexer = VectorIndexer(inputCol="features", \
outputCol="indexedFeatures",\
maxCategories=4).fit(data)
In [18]:
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
In [19]:
# Elastic-net regularized linear regression
# (elasticNetParam=0.8 mixes L1/L2; regParam=0.3 is the overall strength).
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
In [21]:
# Chain the feature indexer and the linear regression stage in a Pipeline.
# (The original comment said "tree" — copied from a decision-tree example;
# the second stage here is LinearRegression.)
pipeline = Pipeline(stages=[featureIndexer, lr])
In [22]:
# Fit the whole pipeline on the training split; this runs the indexer and
# then trains the regression on the indexed features.
model = pipeline.fit(trainingData)
In [30]:
lrmodel= model.stages[1]
In [31]:
lrmodel.coefficients
Out[31]:
In [29]:
# or
[stage.coefficients for stage in model.stages if hasattr(stage, "coefficients")]
Out[29]:
In [38]:
lrmodel.summary.meanAbsoluteError
Out[38]:
In [25]:
# Score the held-out split: adds a 'prediction' column to testData.
predictions = model.transform(testData)
In [26]:
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
In [27]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
In [ ]:
temp_path = 'temp/Users/wenqiangfeng/Dropbox/Spark/Code/model'
modelPath = temp_path + "/lr_model"
model.save(modelPath)
In [ ]:
lr2 = model.load(modelPath)
In [ ]:
lr2.coefficients