In [1]:
    
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('ggplot')
%matplotlib inline
import os
import sys
try:
    # Make PySpark importable: SPARK_HOME/python holds pyspark, and the py4j
    # bridge ships as a zip under SPARK_HOME/python/lib.
    spark_python = os.path.join(os.environ["SPARK_HOME"], "python")
    sys.path.append(spark_python)
    # Locate the bundled py4j zip by pattern instead of hard-coding its
    # version, so the notebook survives Spark upgrades (was py4j-0.10.4).
    import glob
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib",
                                              "py4j-*-src.zip")))
    if py4j_zips:
        sys.path.append(py4j_zips[-1])  # newest version if several are present
    else:
        # Fall back to the Spark 2.1.0 layout if the glob finds nothing.
        sys.path.append(os.path.join(spark_python, "lib",
                                     "py4j-0.10.4-src.zip"))
except KeyError as e:
    # os.environ["SPARK_HOME"] raises KeyError when the variable is unset.
    print("SPARK_HOME is not set", e)
    # NOTE(review): sys.exit raises SystemExit, which in a notebook surfaces
    # as a traceback rather than cleanly stopping the kernel — confirm intent.
    sys.exit(1)
    
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.linalg import Vectors
    
In [2]:
    
# Reuse an already-running Spark session if one exists, otherwise start one.
builder = SparkSession.builder
spark_session = builder.getOrCreate()
    
In [3]:
    
# Simulate data from the linear model y = b_0 + b_1*x + error,
# with the errors drawn i.i.d. from a standard normal distribution.
N_SAMPLES = 100  # number of observations
B_0 = 0.5        # true intercept
B_1 = 0.3        # true slope
np.random.seed(1)  # fix the seed so the draw (and the downstream fit) is reproducible
x = np.arange(N_SAMPLES)
error = np.random.normal(0, size=(N_SAMPLES,))
y = B_0 + B_1 * x + error
    
In [4]:
    
# Quick sanity check on the simulated arrays.
x.sum(), y.sum()  # for testing
    
    Out[4]:
In [5]:
    
# Visualize the simulated data; the title and axis labels let the figure
# stand alone when the notebook is skimmed.
fig, ax = plt.subplots()
ax.scatter(x, y)
ax.set(title="Simulated linear data", xlabel="x", ylabel="y");
    
    
In [6]:
    
# Assemble the simulated arrays into a two-column frame. Column-wise
# construction from a dict replaces the redundant row-by-row tuple build
# and yields the same values and dtypes (int64 x, float64 y).
data = pd.DataFrame({"x": x, "y": y})
    
In [7]:
    
# Convert the pandas frame into a Spark DataFrame (schema inferred from the dtypes).
data_spark = spark_session.createDataFrame(data)
    
In [8]:
    
# Inspect the Spark DataFrame (show() prints the first rows as a text table).
data_spark.show()
    
    
In [9]:
    
# Reshape into the (label, weight, features) layout expected by Spark ML.
# Named field access replaces the positional row[0]/row[1] indexing; every
# row carries the same constant weight of 0.5, i.e. uniform weighting.
df = spark_session.createDataFrame(
    data_spark.rdd.map(
        lambda row: (row["y"], 0.5, Vectors.dense(row["x"]))
    ),
    ["label", "weight", "features"],
)
    
In [10]:
    
# Sanity check: column names and row count of the ML-ready frame.
df.columns, df.count() # for testing
    
    Out[10]:
In [11]:
    
# Weighted least squares with no regularization (regParam=0.0), solved via the
# closed-form "normal" equations.
# NOTE(review): with solver="normal" the fit is non-iterative, so maxIter
# presumably has no effect here — confirm against the Spark ML docs.
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight")
    
In [12]:
    
# Fit the regression on the (label, weight, features) frame built above.
model = lr.fit(df)
    
In [13]:
    
# Estimated slope — there is a single feature, so the coefficient vector has one entry.
model.coefficients.values[0] # for testing
    
    Out[13]:
In [14]:
    
# Estimated intercept.
model.intercept # for testing
    
    Out[14]:
In [15]:
    
# Pull the fitted parameters out of the model and evaluate the fitted
# line over the original x grid.
b_0_hat, b_1_hat = model.intercept, model.coefficients.values[0]
y_hat = b_0_hat + b_1_hat * x
    
In [16]:
    
# Overlay the fitted regression line on the simulated data; title, labels and
# a legend make the figure self-explanatory.
fig, ax = plt.subplots()
ax.scatter(x, y, label="data")
ax.plot(x, y_hat, label="fitted line")
ax.set(title="Weighted OLS fit", xlabel="x", ylabel="y")
ax.legend();
    
    
In [17]:
    
# Score the training frame with the fitted model and display the result
# (transform appends a prediction column to the input frame).
model.transform(df).show()
    
    
In [18]:
    
# Persist the fitted model to the local directory "model",
# overwriting any previous save at that path.
model.write().overwrite().save("model")
    
In [19]:
    
# Reload the persisted model to verify the save/load round trip.
model_load = LinearRegressionModel.load("model")
    
In [20]:
    
# Build a one-row frame to score: a dummy "index" column plus the features vector.
# NOTE(review): the extra column presumably works around schema inference for a
# lone Vector column in this Spark version — confirm before removing it.
predict_df = spark_session.createDataFrame([(1, Vectors.dense(0))], ["index", "features"])
    
In [21]:
    
# Score the single new observation (x = 0) with the reloaded model.
model_load.transform(predict_df).show()