In [19]:
# Import SparkSession
from pyspark.sql import SparkSession
# Build the SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Linear Regression Model") \
    .config("spark.executor.memory", "1gb") \
    .getOrCreate()
sc = spark.sparkContext
The California Housing dataset contains 20,640 records, one per census block group, with nine variables: longitude, latitude, housing median age, total rooms, total bedrooms, population, households, median income, and median house value.
In [4]:
!ls ../datasets/CaliforniaHousing/
In [2]:
# load data file
rdd = sc.textFile('../datasets/CaliforniaHousing/cal_housing.data')
# load header
header = sc.textFile('../datasets/CaliforniaHousing/cal_housing.domain')
In [8]:
len(rdd.collect())
Out[8]:
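As an aside, `len(rdd.collect())` pulls every record to the driver just to count it; `count()` returns the same number without materializing the data locally. A minimal sketch:
In [ ]:
# count records on the cluster instead of collecting them to the driver
rdd.count()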
In [10]:
len(rdd.take(5))
Out[10]:
In [11]:
rdd.take(5)
Out[11]:
In [3]:
# split each line on commas
rdd = rdd.map(lambda line: line.split(','))
# inspect the first record
rdd.first()
Out[3]:
In [4]:
# convert RDD to a dataframe
from pyspark.sql import Row
# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0],
                              latitude=line[1],
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5],
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()
# show the top few DF rows
df.show(5)
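As an aside, the DataFrame reader can collapse the load, split, and cast steps into one. A minimal sketch, assuming the same file path as above (the `df_alt` name is just for illustration):
In [ ]:
# load the CSV directly; inferSchema=True casts the numeric columns automatically,
# which would make the manual float cast below unnecessary
df_alt = spark.read.csv('../datasets/CaliforniaHousing/cal_housing.data',
                        inferSchema=True).toDF(
    'longitude', 'latitude', 'housingMedianAge', 'totalRooms',
    'totalBedRooms', 'population', 'households', 'medianIncome',
    'medianHouseValue')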
In [5]:
df.printSchema()
In [6]:
# cast every column from string to float with a small helper function
from pyspark.sql.types import *

def cast_columns(df):
    for column in df.columns:
        df = df.withColumn(column, df[column].cast(FloatType()))
    return df

new_df = cast_columns(df)
In [7]:
new_df.show(2)
In [8]:
new_df.printSchema()
In [28]:
new_df.describe().show()
In [ ]:
from pyspark.sql.functions import col
# continue from the float-typed frame and express the target in units of $100,000
df = new_df.withColumn('medianHouseValue', col('medianHouseValue')/100000)
In [10]:
df.first()
Out[10]:
In [11]:
# add rooms per household
df = df.withColumn('roomsPerHousehold', col('totalRooms')/col('households'))
# add population per household (average number of people per home)
df = df.withColumn('popPerHousehold', col('population')/col('households'))
# add bedrooms per room
df = df.withColumn('bedroomsPerRoom', col('totalBedRooms')/col('totalRooms'))
In [12]:
df.first()
Out[12]:
In [13]:
df.columns
Out[13]:
In [14]:
df = df.select('medianHouseValue',
               'households',
               'housingMedianAge',
               'latitude',
               'longitude',
               'medianIncome',
               'population',
               'totalBedRooms',
               'totalRooms',
               'roomsPerHousehold',
               'popPerHousehold',
               'bedroomsPerRoom')
Create a new DataFrame that splits each row into a label (the target, medianHouseValue) and a features vector (everything else). DenseVector packs the remaining columns into a single vector so each pair can be regrouped into a two-column DataFrame.
In [15]:
from pyspark.ml.linalg import DenseVector
# map each row to a tuple of the label and a DenseVector of the remaining columns
temp_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
# rebuild a DataFrame with named columns
df2 = spark.createDataFrame(temp_data, ['label', 'features'])
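As an aside, `pyspark.ml.feature.VectorAssembler` does the same packing without dropping to the RDD API. A minimal sketch under the column ordering used above (`df2_alt` is just an illustrative name):
In [ ]:
# assemble the predictor columns into a single vector column, then rename the target
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df.columns[1:], outputCol='features')
df2_alt = assembler.transform(df).select(col('medianHouseValue').alias('label'),
                                         'features')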
In [16]:
df2.take(2)
Out[16]:
In [17]:
# use StandardScaler to scale each feature to unit standard deviation
# (by default it does not centre the data: withMean=False, withStd=True)
from pyspark.ml.feature import StandardScaler
s_scaler = StandardScaler(inputCol='features', outputCol='features_scaled')
scaler_model = s_scaler.fit(df2)
scaled_df = scaler_model.transform(df2)
scaled_df.take(2)
Out[17]:
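To fully standardize (zero mean, unit variance), the centering has to be requested explicitly, as the comment above notes. A minimal sketch (`s_scaler_centered` is an illustrative name):
In [ ]:
# withMean=True also subtracts the mean (produces dense output)
s_scaler_centered = StandardScaler(inputCol='features', outputCol='features_scaled',
                                   withMean=True, withStd=True)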
In [18]:
train_data, test_data = scaled_df.randomSplit([.8,.2], seed=101)
In [19]:
type(train_data)
Out[19]:
In [21]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(labelCol='label', maxIter=20)
linear_model = lr.fit(train_data)
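Note that `featuresCol` defaults to `'features'`, so the fit above trains on the unscaled features and the scaling step has no effect on it. To train on the standardized column instead, a sketch (`lr_scaled` is an illustrative name; the coefficients would then be on the scaled units):
In [ ]:
# point the regression at the scaled column
lr_scaled = LinearRegression(featuresCol='features_scaled', labelCol='label', maxIter=20)
linear_model_scaled = lr_scaled.fit(train_data)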
In [22]:
type(linear_model)
Out[22]:
In [23]:
linear_model.coefficients
Out[23]:
Pair each feature column with its fitted coefficient:
In [39]:
list(zip(df.columns[1:], linear_model.coefficients))
Out[39]:
In [28]:
linear_model.intercept
Out[28]:
In [31]:
linear_model.summary.numInstances
Out[31]:
Mean absolute error (MAE) on the training data:
In [35]:
linear_model.summary.meanAbsoluteError * 100000
Out[35]:
Thus the MAE shows that, on average, the fit to the training data is off by about $50,000 (the label was scaled to units of $100,000, hence the multiplication back).
In [34]:
linear_model.summary.meanSquaredError
Out[34]:
In [45]:
linear_model.summary.rootMeanSquaredError * 100000
Out[45]:
The RMSE puts the fit to the training data off by about $68,392; it exceeds the MAE because RMSE weights large errors more heavily.
In [40]:
list(zip(df.columns[1:], linear_model.summary.pValues))
Out[40]:
In [41]:
predicted = linear_model.transform(test_data)
predicted.columns
Out[41]:
In [43]:
type(predicted)
Out[43]:
In [47]:
test_predictions = predicted.select('prediction').rdd.map(lambda x: x[0])
test_labels = predicted.select('label').rdd.map(lambda x: x[0])
test_predictions_labels = test_predictions.zip(test_labels)
test_predictions_labels_df = spark.createDataFrame(test_predictions_labels,
                                                   ['predictions', 'labels'])
test_predictions_labels_df.take(2)
Out[47]:
In [49]:
from pyspark.ml.evaluation import RegressionEvaluator
linear_reg_eval = RegressionEvaluator(predictionCol='predictions', labelCol='labels')
In [50]:
linear_reg_eval.evaluate(test_predictions_labels_df)
Out[50]:
In [58]:
# mean absolute error
prediction_mae = linear_reg_eval.evaluate(test_predictions_labels_df,
                                          {linear_reg_eval.metricName: 'mae'}) * 100000
prediction_mae
Out[58]:
In [59]:
# RMSE
prediction_rmse = linear_reg_eval.evaluate(test_predictions_labels_df,
                                           {linear_reg_eval.metricName: 'rmse'}) * 100000
prediction_rmse
Out[59]:
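As an aside, the zip-and-rebuild step above can be skipped entirely: `predicted` already carries `prediction` and `label` columns, so the evaluator can read it directly. A minimal sketch (`direct_eval` is an illustrative name):
In [ ]:
# evaluate straight off the transformed test DataFrame
direct_eval = RegressionEvaluator(predictionCol='prediction', labelCol='label')
direct_eval.evaluate(predicted, {direct_eval.metricName: 'rmse'}) * 100000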
In [60]:
print('(training error, prediction error)')
print((linear_model.summary.rootMeanSquaredError * 100000, prediction_rmse))
print((linear_model.summary.meanAbsoluteError * 100000, prediction_mae))
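A scale-free complement to these dollar figures is R², which the same evaluator exposes. A minimal sketch:
In [ ]:
# R² of the test predictions (no unit rescaling needed)
linear_reg_eval.evaluate(test_predictions_labels_df, {linear_reg_eval.metricName: 'r2'})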
In [61]:
predicted_pandas_df = predicted.select('features','prediction').toPandas()
predicted_pandas_df.head()
Out[61]:
In [62]:
predicted_pandas_df.columns
Out[62]:
In [65]:
import pandas as pd
# expand each feature vector into one pandas column per feature
predicted_pandas_df2 = pd.DataFrame(predicted_pandas_df['features'].values.tolist(),
                                    columns=df.columns[1:])
predicted_pandas_df2.head()
Out[65]:
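If the `values.tolist()` expansion ever balks at the DenseVector objects, converting each vector to a plain list first is a safer route. A minimal sketch (`features_as_lists` is an illustrative name):
In [ ]:
# convert each DenseVector to a plain Python list before building the pandas frame
features_as_lists = predicted_pandas_df['features'].apply(lambda v: v.toArray().tolist())
predicted_pandas_df2 = pd.DataFrame(features_as_lists.tolist(), columns=df.columns[1:])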
In [66]:
predicted_pandas_df2['predictedHouseValue'] = predicted_pandas_df['prediction']
In [67]:
predicted_pandas_df2.to_csv('CA_house_prices_predicted.csv')
In [68]:
!ls
In [69]:
predicted_pandas_df2.shape
Out[69]:
In [9]:
import pandas as pd
from arcgis.gis import GIS
gis = GIS("https://www.arcgis.com","arcgis_python")
In [3]:
from arcgis.features import SpatialDataFrame
In [4]:
sdf = SpatialDataFrame.from_csv('CA_house_prices_predicted.csv')
sdf.head(5)
Out[4]:
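`SpatialDataFrame` is the older interface; recent releases of the `arcgis` package expose the same step through the spatial accessor on a plain pandas DataFrame. A sketch, assuming the latitude/longitude columns written to the CSV above (`sdf_alt` is an illustrative name):
In [ ]:
# build a spatially enabled DataFrame from the x/y columns (newer arcgis API)
from arcgis.features import GeoAccessor  # registers the .spatial accessor
sdf_alt = pd.DataFrame.spatial.from_xy(pd.read_csv('CA_house_prices_predicted.csv'),
                                       x_column='longitude', y_column='latitude')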
In [11]:
# import only the first 999 rows (import_data caps the number of records it will accept)
houses_predicted_fc = gis.content.import_data(sdf[:999])
houses_predicted_fc
Out[11]:
In [14]:
ca_map = gis.map('California')
ca_map
In [18]:
ca_map.add_layer(houses_predicted_fc, {'renderer': 'ClassedColorRenderer',
                                       'field_name': 'predictedHouseValue'})