In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('ggplot')
%matplotlib inline
import os
import sys
try:
    # Append PySpark to PYTHONPATH / Spark 2.1.0
    sys.path.append(os.path.join(os.environ["SPARK_HOME"], "python"))
    sys.path.append(os.path.join(os.environ["SPARK_HOME"], "python", "lib",
                                 "py4j-0.10.4-src.zip"))
except KeyError as e:
    print("SPARK_HOME is not set", e)
    sys.exit(1)
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.linalg import Vectors
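As an aside, the manual sys.path surgery above can be delegated to the findspark package, assuming it is installed; a minimal sketch:

import findspark
findspark.init()  # locates SPARK_HOME and puts PySpark on sys.path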
In [2]:
spark_session = SparkSession.builder.getOrCreate()
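getOrCreate() without options picks up whatever defaults are in effect. A sketch of an explicitly configured session (the master URL and app name are illustrative choices, not requirements of this example):

spark_session = (SparkSession.builder
                 .master("local[*]")  # run locally on all available cores
                 .appName("weighted-linear-regression")  # illustrative name
                 .getOrCreate())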
In [3]:
# y = b_0 + b_1*x + error
# b_0 = 0.5, b_1 = 0.3, error ~ N(0, 1)
np.random.seed(1)  # fix the seed for reproducibility
x = np.arange(100)
error = np.random.normal(0, size=(100,))  # scale defaults to 1.0
y = 0.5 + 0.3 * x + error
In [4]:
sum(x), sum(y) # for testing
Out[4]:
In [5]:
plt.scatter(x, y)
pass
In [6]:
data = pd.DataFrame({"x": x, "y": y})
In [7]:
data_spark = spark_session.createDataFrame(data)
In [8]:
data_spark.show()
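show() prints only the first 20 rows by default; show(n) and printSchema() are handy for a quick look at the inferred types:

data_spark.show(5)        # just the first 5 rows
data_spark.printSchema()  # x comes back as long, y as double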
In [9]:
# rows become (label, weight, features); every point gets a constant weight of 0.5
df = spark_session.createDataFrame(
    data_spark.rdd.map(lambda row: (row[1], 0.5, Vectors.dense(row[0]))),
    ["label", "weight", "features"])
In [10]:
df.columns, df.count() # for testing
Out[10]:
In [11]:
# regParam=0.0 with the "normal" solver gives a plain weighted least-squares fit
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight")
In [12]:
model = lr.fit(df)
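With the normal solver, the fitted model exposes a training summary; a quick look at the fit quality:

model.summary.r2                    # coefficient of determination
model.summary.rootMeanSquaredError  # RMSE on the training data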
In [13]:
model.coefficients.values[0] # for testing
Out[13]:
In [14]:
model.intercept # for testing
Out[14]:
In [15]:
b_0_hat = model.intercept
b_1_hat = model.coefficients.values[0]
y_hat = b_0_hat + b_1_hat * x
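Since the weight column is constant, the weighted fit coincides with plain OLS, so numpy's closed-form fit should land on essentially the same estimates (np.polyfit returns the slope first for degree 1):

b_1_np, b_0_np = np.polyfit(x, y, 1)
b_0_hat - b_0_np, b_1_hat - b_1_np  # both differences should be ~0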
In [16]:
plt.scatter(x, y)
plt.plot(x, y_hat)
pass
In [17]:
model.transform(df).show()
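transform() appends a prediction column; an overall RMSE on those predictions can be computed with RegressionEvaluator (its default labelCol/predictionCol match the column names used here):

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse")
evaluator.evaluate(model.transform(df))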
In [18]:
# save the fitted model; Spark writes it out as a directory named "model"
model.write().overwrite().save("model")
In [19]:
model_load = LinearRegressionModel.load("model")
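A quick round-trip check: the reloaded parameters should match the originals exactly:

model_load.intercept, model_load.coefficients  # should match model.intercept, model.coefficients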
In [20]:
# the index column is just a dummy; only "features" is used at predict time
predict_df = spark_session.createDataFrame([(1, Vectors.dense(0))], ["index", "features"])
In [21]:
model_load.transform(predict_df).show()
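Because the features vector is all zeros, the prediction is just the intercept; pulling the scalar out of the result and shutting the session down:

pred = model_load.transform(predict_df).collect()[0]["prediction"]
pred - model_load.intercept  # should be 0.0: x = 0 predicts the intercept

spark_session.stop()  # release local Spark resources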