In [4]:
#from pyspark import SparkConf, SparkContext
## set up spark context
#from pyspark.sql import SQLContext
#sc = SparkContext()
#sqlContext = SQLContext(sc)
## set up SparkSession
from pyspark.sql import SparkSession

# Build (or reuse) the notebook's singleton SparkSession.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
In [5]:
# Load the advertising data.  The original passed `header` twice — once via
# options(header='true') and again as load(..., header=True); once is enough.
# inferschema='true' lets Spark assign numeric column types instead of strings.
df = spark.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load("./data/Advertising.csv")
In [6]:
# `df.take(2)` on its own line is discarded (only a cell's last expression is
# displayed in Jupyter), so print the sampled rows explicitly, then the schema.
print(df.take(2))
df.printSchema()
In [7]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
In [8]:
# convert the data to dense vector
#def transData(row):
# return Row(label=row["Sales"],
# features=Vectors.dense([row["TV"],
# row["Radio"],
# row["Newspaper"]]))
def transData(data):
    """Pack all columns except the last into a dense `features` vector;
    the last column becomes `label`."""
    rows = data.rdd.map(lambda rec: [Vectors.dense(rec[:-1]), rec[-1]])
    return rows.toDF(['features', 'label'])
In [9]:
#transformed = df.rdd.map(transData).toDF()
# Reshape the raw frame into the (features, label) layout MLlib expects
transformed= transData(df)
transformed.show(6)
In [10]:
# Import LinearRegression class
from pyspark.ml.regression import LinearRegression

# One estimator, two fits with different regularization strengths:
# model A has the ridge penalty off, model B has it set to 1.0.
lr = LinearRegression()
modelA = lr.fit(transformed, {lr.regParam: 0.0})
modelB = lr.fit(transformed, {lr.regParam: 1.0})
In [11]:
# Slope coefficients of the unregularized fit (one per feature)
modelA.coefficients
Out[11]:
In [12]:
# Intercept term of the unregularized fit
modelA.intercept
Out[12]:
In [13]:
# Make predictions
# Score the training data with model A (adds a `prediction` column)
predictionsA = modelA.transform(transformed)
predictionsA.show()
In [14]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate model A on its own training data with root-mean-squared error.
evaluator = RegressionEvaluator(metricName="rmse")
RMSE = evaluator.evaluate(predictionsA)
print("ModelA: Root Mean Squared Error =", RMSE)
In [15]:
# Score the training data with the regularized model B
predictionsB = modelB.transform(transformed)
predictionsB.show()
In [16]:
# Re-evaluate with the same evaluator (note: RMSE is overwritten)
RMSE = evaluator.evaluate(predictionsB)
print("ModelB: Root Mean Squared Error = " + str(RMSE))
In [17]:
# Import numpy, pandas, and ggplot.  Explicit imports replace the original
# `from pandas import *` / `from ggplot import *` wildcards, which pollute
# the notebook namespace and hide where names come from.
import numpy as np
import pandas as pd
from ggplot import ggplot, aes, geom_point

# Pull the plotting data back to the driver as plain Python lists.
# pop = first feature (TV spend), sales = label, predA/predB = fitted values.
pop = transformed.rdd.map(lambda p: (p.features[0])).collect()
sales = transformed.rdd.map(lambda p: (p.label)).collect()
predA = predictionsA.select("prediction").rdd.map(lambda r: r[0]).collect()
predB = predictionsB.select("prediction").rdd.map(lambda r: r[0]).collect()

# Tag each model's predictions (0 = model A, 1 = model B) and stack them.
ny = len(predA)
type1 = pd.Series([0 for x in range(ny)])
type2 = pd.Series([1 for x in range(ny)])
pydf1 = pd.DataFrame({'pop': pop, 'sales': sales, 'pred': predA, 'type': type1})
pydf2 = pd.DataFrame({'pop': pop, 'sales': sales, 'pred': predB, 'type': type2})
result = pd.concat([pydf1, pydf2])
# Cast `type` to object so ggplot treats it as categorical, not numeric.
result['type'] = result['type'].astype(object)
result
Out[17]:
In [18]:
# Create scatter plot and two regression models (scaling exponential) using ggplot from ggplot import *
# Scatter of predictions vs. the first feature, colored by model.  The original
# passed geom_point(colors='blue'): `colors` is a misspelling of `color`, and a
# fixed color would fight the aes(color='type') mapping anyway — drop it so the
# per-model coloring from the aesthetic mapping shows through.
ggplot(result, aes(x='pop', y='pred', color='type')) + \
    geom_point()
Out[18]:
In [19]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
In [20]:
# Reload the advertising data for the second walkthrough.  As above, `header`
# was originally passed both to options() and to load(); pass it once.
df = spark.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load("./data/Advertising.csv")
In [21]:
# NOTE: this redefines transData from the first half of the notebook
# (behavior is identical).
def transData(data):
    """Return a DataFrame whose `features` column is a dense vector of every
    input column except the last, and whose `label` column is the last one."""
    to_row = lambda rec: [Vectors.dense(rec[:-1]), rec[-1]]
    return data.rdd.map(to_row).toDF(['features', 'label'])
In [22]:
# Convert to the (features, label) layout again
transformed = transData(df)
#transformed.show()
In [23]:
# Unregularized fit via the normal equations (gives a full statistical summary)
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal")
In [24]:
model = lr.fit(transformed)
In [25]:
# Slope coefficients of the normal-equation fit
model.coefficients
Out[25]:
In [26]:
# Intercept term of the normal-equation fit
model.intercept
Out[26]:
In [34]:
def modelsummary(model):
    """Print an R-style summary for a fitted linear model: one row per
    coefficient (estimate, std. error, t value, p value) with the intercept
    last, followed by MSE/RMSE, R-squared, and the iteration count."""
    import numpy as np
    print("Note: the last rows are the information for Intercept")
    print("##", "-------------------------------------------------")
    print("##", " Estimate Std.Error t Values P-value")
    # Append the intercept so it is reported as the final row.
    coef = np.append(list(model.coefficients), model.intercept)
    summary = model.summary
    for i in range(len(summary.pValues)):
        print("##",
              '{:10.6f}'.format(coef[i]),
              '{:10.6f}'.format(summary.coefficientStandardErrors[i]),
              '{:8.3f}'.format(summary.tValues[i]),
              '{:10.6f}'.format(summary.pValues[i]))
    print("##", '---')
    print("##", "Mean squared error: % .6f" % summary.meanSquaredError, ", RMSE: % .6f" % summary.rootMeanSquaredError)
    print("##", "Multiple R-squared: %f" % summary.r2, ", Total iterations: %i" % summary.totalIterations)
In [35]:
# Print the R-style coefficient table for the normal-equation fit
modelsummary(model)
In [ ]:
# NOTE(review): this path embeds a personal directory layout
# ('Users/wenqiangfeng/Dropbox/...') — consider a configurable location.
temp_path = 'temp/Users/wenqiangfeng/Dropbox/Spark/Code/model'
modelPath = temp_path + "/lr_model"
# Persist the fitted model to disk
# (NOTE(review): save() may refuse to overwrite an existing path — confirm)
model.save(modelPath)
In [ ]:
# Reload the persisted model.  `load` is a classmethod, so calling it through
# the fitted `model` instance works but is misleading (it returns a brand-new
# object rather than mutating `model`).  Call it on the model class explicitly;
# note `lr2` is a fitted LinearRegressionModel, not a LinearRegression estimator.
from pyspark.ml.regression import LinearRegressionModel
lr2 = LinearRegressionModel.load(modelPath)
In [ ]:
# Coefficients of the reloaded model — should match model.coefficients
lr2.coefficients