spark-ml-CA-housing


Predicting CA housing prices using SparkMLib

Boiler plate - initialize SparkSession & Context


In [19]:
# Import SparkSession
from pyspark.sql import SparkSession

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

About CA housing dataset

Number of records: 20640

variables: Lat, Long, Median Age, #rooms, #bedrooms, population in block, households, med income, med house value

Preprocess data


In [4]:
!ls ../datasets/CaliforniaHousing/


cal_housing.data  cal_housing.domain

In [2]:
# load data file
rdd = sc.textFile('../datasets/CaliforniaHousing/cal_housing.data')

# load header
header = sc.textFile('../datasets/CaliforniaHousing/cal_housing.domain')

In [8]:
len(rdd.collect())


Out[8]:
20640

In [10]:
len(rdd.take(5))


Out[10]:
5

In [11]:
rdd.take(5)


Out[11]:
['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000',
 '-122.240000,37.850000,52.000000,1467.000000,190.000000,496.000000,177.000000,7.257400,352100.000000',
 '-122.250000,37.850000,52.000000,1274.000000,235.000000,558.000000,219.000000,5.643100,341300.000000',
 '-122.250000,37.850000,52.000000,1627.000000,280.000000,565.000000,259.000000,3.846200,342200.000000']

In [3]:
# split by comma
rdd = rdd.map(lambda line : line.split(','))

# get the first two lines
rdd.first()


Out[3]:
['-122.230000',
 '37.880000',
 '41.000000',
 '880.000000',
 '129.000000',
 '322.000000',
 '126.000000',
 '8.325200',
 '452600.000000']

Convert RDD to Spark DataFrame


In [4]:
# convert RDD to a dataframe
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

# show the top few DF rows
df.show(5)


+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
only showing top 5 rows


In [5]:
df.printSchema()


root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)


In [6]:
# convert all strings to float using a User Defined Function

from pyspark.sql.types import *

def cast_columns(df):
    for column in df.columns:
        df = df.withColumn(column, df[column].cast(FloatType()))
    return df

new_df = cast_columns(df)

In [7]:
new_df.show(2)


+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|        452600.0|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|        358500.0|      8.3014|    2401.0|       1106.0|    7099.0|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
only showing top 2 rows


In [8]:
new_df.printSchema()


root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)

Exploratory data analysis

Print the summary stats of the table


In [28]:
new_df.describe().show()


+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|         latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|            20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean|499.5396802325581|28.639486434108527|35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.3297528316098| 12.58555761211163|2.135952380602968|  2.003531742932898|115395.61587441359|1.8998217183639696|  1132.46212176534| 421.247905943133|2181.6152515827944|
|    min|              1.0|               1.0|            32.54|            -124.35|           14999.0|            0.4999|               3.0|              1.0|               2.0|
|    max|           6082.0|              52.0|            41.95|            -114.31|          500001.0|           15.0001|           35682.0|           6445.0|           39320.0|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+

Feature engineering

Add more columns such as 'number of bedrooms per room', 'rooms per household'. Also scale the 'medianHouseValue' by 1000 so it falls within range of other numbers.


In [ ]:
from pyspark.sql.functions import col

df = df.withColumn('medianHouseValue', col('medianHouseValue')/100000)

In [10]:
df.first()


Out[10]:
Row(households='126.000000', housingMedianAge='41.000000', latitude='37.880000', longitude='-122.230000', medianHouseValue=4.526, medianIncome='8.325200', population='322.000000', totalBedRooms='129.000000', totalRooms='880.000000')

In [11]:
# add rooms per household
df = df.withColumn('roomsPerHousehold', col('totalRooms')/col('households'))

# add population per household (num people in the home)
df = df.withColumn('popPerHousehold', col('population')/col('households'))

# add bedrooms per room
df = df.withColumn('bedroomsPerRoom', col('totalBedRooms')/col('totalRooms'))

In [12]:
df.first()


Out[12]:
Row(households='126.000000', housingMedianAge='41.000000', latitude='37.880000', longitude='-122.230000', medianHouseValue=4.526, medianIncome='8.325200', population='322.000000', totalBedRooms='129.000000', totalRooms='880.000000', roomsPerHousehold=6.984126984126984, popPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

Re-order columns and split table into label and features


In [13]:
df.columns


Out[13]:
['households',
 'housingMedianAge',
 'latitude',
 'longitude',
 'medianHouseValue',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms',
 'roomsPerHousehold',
 'popPerHousehold',
 'bedroomsPerRoom']

In [14]:
df = df.select('medianHouseValue','households',
 'housingMedianAge',
 'latitude',
 'longitude',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms',
 'roomsPerHousehold',
 'popPerHousehold',
 'bedroomsPerRoom')

Create a new DataFrame that explicitly labels the columns as labels and features. DenseVector is used to temporarily convert the data into numpy array and regroup into a named column DataFrame


In [15]:
from pyspark.ml.linalg import DenseVector

# return a tuple of first column and all other columns
temp_data = df.rdd.map(lambda x:(x[0], DenseVector(x[1:])))

#construct back a new DataFrame
df2 = spark.createDataFrame(temp_data, ['label','features'])

In [16]:
df2.take(2)


Out[16]:
[Row(label=4.526, features=DenseVector([126.0, 41.0, 37.88, -122.23, 8.3252, 322.0, 129.0, 880.0, 6.9841, 2.5556, 0.1466])),
 Row(label=3.585, features=DenseVector([1138.0, 21.0, 37.86, -122.22, 8.3014, 2401.0, 1106.0, 7099.0, 6.2381, 2.1098, 0.1558]))]

Scale data by shifting mean to 0 and making SD = 1

This ensures all columns have similar levels of variability


In [17]:
# use StandardScaler to scale the features to std normal distribution
from pyspark.ml.feature import StandardScaler

s_scaler_model = StandardScaler(inputCol='features', outputCol='features_scaled')
scaler_fn = s_scaler_model.fit(df2)
scaled_df = scaler_fn.transform(df2)

scaled_df.take(2)


Out[17]:
[Row(label=4.526, features=DenseVector([126.0, 41.0, 37.88, -122.23, 8.3252, 322.0, 129.0, 880.0, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3296, 3.2577, 17.7345, -61.0073, 4.3821, 0.2843, 0.3062, 0.4034, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1138.0, 21.0, 37.86, -122.22, 8.3014, 2401.0, 1106.0, 7099.0, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.9765, 1.6686, 17.7251, -61.0023, 4.3696, 2.1202, 2.6255, 3.254, 2.5213, 0.2031, 2.6851]))]

Split data into training and test sets


In [18]:
train_data, test_data = scaled_df.randomSplit([.8,.2], seed=101)

In [19]:
type(train_data)


Out[19]:
pyspark.sql.dataframe.DataFrame

Perform Multiple Regression

Train the model


In [21]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol='label', maxIter=20)

linear_model = lr.fit(train_data)

Inspect model properties


In [22]:
type(linear_model)


Out[22]:
pyspark.ml.regression.LinearRegressionModel

In [23]:
linear_model.coefficients


Out[23]:
DenseVector([0.0011, 0.0109, -0.4173, -0.4236, 0.4188, -0.0005, 0.0001, 0.0, 0.0275, 0.0012, 3.2844])

Print columns and their coefficients


In [39]:
list(zip(df.columns[1:], linear_model.coefficients))


Out[39]:
[('households', 0.0011435392550412861),
 ('housingMedianAge', 0.010914556934928758),
 ('latitude', -0.41728655702892636),
 ('longitude', -0.42357898833074664),
 ('medianIncome', 0.41879550542755656),
 ('population', -0.00047200983464106163),
 ('totalBedRooms', 0.00011060741530102377),
 ('totalRooms', 4.099208155268924e-05),
 ('roomsPerHousehold', 0.027483252262545631),
 ('popPerHousehold', 0.0011993665224223444),
 ('bedroomsPerRoom', 3.2844476401153044)]

In [28]:
linear_model.intercept


Out[28]:
-36.56273436779799

In [31]:
linear_model.summary.numInstances


Out[31]:
16535

MAE from training data


In [35]:
linear_model.summary.meanAbsoluteError * 100000


Out[35]:
49805.60256405839

Thus, MAE on training data is off by $50,000


In [34]:
linear_model.summary.meanSquaredError


Out[34]:
0.46775402314782377

In [45]:
linear_model.summary.rootMeanSquaredError * 100000


Out[45]:
68392.54514549255

Thus, RMSE shows fitting on training data is off by $68,392


In [40]:
list(zip(df.columns[1:], linear_model.summary.pValues))


Out[40]:
[('households', 0.0),
 ('housingMedianAge', 0.0),
 ('latitude', 0.0),
 ('longitude', 0.0),
 ('medianIncome', 0.0),
 ('population', 0.0),
 ('totalBedRooms', 0.2242631044109853),
 ('totalRooms', 0.00010585023878628697),
 ('roomsPerHousehold', 0.0),
 ('popPerHousehold', 0.011952235555041435),
 ('bedroomsPerRoom', 0.0)]

Perform predictions


In [41]:
predicted = linear_model.transform(test_data)
predicted.columns


Out[41]:
['label', 'features', 'features_scaled', 'prediction']

In [43]:
type(predicted)


Out[43]:
pyspark.sql.dataframe.DataFrame

In [47]:
test_predictions = predicted.select('prediction').rdd.map(lambda x:x[0])
test_labels = predicted.select('label').rdd.map(lambda x:x[0])

test_predictions_labels = test_predictions.zip(test_labels)
test_predictions_labels_df = spark.createDataFrame(test_predictions_labels, 
                                                   ['predictions','labels'])

test_predictions_labels_df.take(2)


Out[47]:
[Row(predictions=1.8357791571765532, labels=0.225),
 Row(predictions=-0.9555783395577535, labels=0.225)]

Regression evaluator


In [49]:
from pyspark.ml.evaluation import RegressionEvaluator

linear_reg_eval = RegressionEvaluator(predictionCol='predictions', labelCol='labels')

In [50]:
linear_reg_eval.evaluate(test_predictions_labels_df)


Out[50]:
0.6962295496358668

Errors - MAE, RMSE


In [58]:
# mean absolute error
prediction_mae = linear_reg_eval.evaluate(test_predictions_labels_df, 
                                          {linear_reg_eval.metricName:'mae'}) * 100000
prediction_mae


Out[58]:
49690.440586665725

In [59]:
# RMSE
prediction_rmse = linear_reg_eval.evaluate(test_predictions_labels_df, 
                                           {linear_reg_eval.metricName:'rmse'}) * 100000

prediction_rmse


Out[59]:
69622.95496358669

Compare training vs prediction errors


In [60]:
print('(training error, prediction error)')
print((linear_model.summary.rootMeanSquaredError * 100000, prediction_rmse))
print((linear_model.summary.meanAbsoluteError * 100000, prediction_mae))


(training error, prediction error)
(68392.54514549255, 69622.95496358669)
(49805.60256405839, 49690.440586665725)

Export data as a Pandas DataFrame


In [61]:
predicted_pandas_df = predicted.select('features','prediction').toPandas()
predicted_pandas_df.head()


Out[61]:
features prediction
0 [63.0, 33.0, 37.93, -122.32, 2.675, 216.0, 73.... 1.835779
1 [1439.0, 8.0, 35.43, -116.57, 2.7138, 6835.0, ... -0.955578
2 [15.0, 17.0, 33.92, -114.67, 1.2656, 29.0, 24.... -0.426930
3 [288.0, 20.0, 38.56, -121.36, 1.8288, 667.0, 3... 0.843599
4 [382.0, 52.0, 37.78, -122.41, 1.8519, 1055.0, ... 2.335877

In [62]:
predicted_pandas_df.columns


Out[62]:
Index(['features', 'prediction'], dtype='object')

In [65]:
import pandas as pd
predicted_pandas_df2 = pd.DataFrame(predicted_pandas_df['features'].values.tolist(), 
                                   columns=df.columns[1:])

predicted_pandas_df2.head()


Out[65]:
households housingMedianAge latitude longitude medianIncome population totalBedRooms totalRooms roomsPerHousehold popPerHousehold bedroomsPerRoom
0 63.0 33.0 37.93 -122.32 2.6750 216.0 73.0 296.0 4.698413 3.428571 0.246622
1 1439.0 8.0 35.43 -116.57 2.7138 6835.0 1743.0 9975.0 6.931897 4.749826 0.174737
2 15.0 17.0 33.92 -114.67 1.2656 29.0 24.0 97.0 6.466667 1.933333 0.247423
3 288.0 20.0 38.56 -121.36 1.8288 667.0 332.0 1232.0 4.277778 2.315972 0.269481
4 382.0 52.0 37.78 -122.41 1.8519 1055.0 422.0 1014.0 2.654450 2.761780 0.416174

In [66]:
predicted_pandas_df2['predictedHouseValue'] = predicted_pandas_df['prediction']

Write to disk as CSV


In [67]:
predicted_pandas_df2.to_csv('CA_house_prices_predicted.csv')

In [68]:
!ls


CA_house_prices_predicted.csv  spark_sanity.ipynb
ensure_machine.ipynb	       spark-warehouse
hello-world.ipynb	       Untitled.ipynb
spark-ml-CA-housing.ipynb      using-spark-dataframes.ipynb

In [69]:
predicted_pandas_df2.shape


Out[69]:
(4105, 12)

Publish to GIS


In [9]:
import pandas as pd
from arcgis.gis import GIS
gis = GIS("https://www.arcgis.com","arcgis_python")


Enter password: ········

In [3]:
from arcgis.features import SpatialDataFrame

In [4]:
sdf = SpatialDataFrame.from_csv('CA_house_prices_predicted.csv')
sdf.head(5)


Out[4]:
households housingMedianAge latitude longitude medianIncome population totalBedRooms totalRooms roomsPerHousehold popPerHousehold bedroomsPerRoom predictedHouseValue
0 63.0 33.0 37.93 -122.32 2.6750 216.0 73.0 296.0 4.698413 3.428571 0.246622 1.835779
1 1439.0 8.0 35.43 -116.57 2.7138 6835.0 1743.0 9975.0 6.931897 4.749826 0.174737 -0.955578
2 15.0 17.0 33.92 -114.67 1.2656 29.0 24.0 97.0 6.466667 1.933333 0.247423 -0.426930
3 288.0 20.0 38.56 -121.36 1.8288 667.0 332.0 1232.0 4.277778 2.315972 0.269481 0.843599
4 382.0 52.0 37.78 -122.41 1.8519 1055.0 422.0 1014.0 2.654450 2.761780 0.416174 2.335877

In [11]:
houses_predicted_fc = gis.content.import_data(sdf[:999])
houses_predicted_fc


Out[11]:
<FeatureCollection>

In [14]:
ca_map = gis.map('California')
ca_map



In [18]:
ca_map.add_layer(houses_predicted_fc, {'renderer':'ClassedColorRenderer',
                                      'field_name':'predictedHouseValue'})


 

Spark jobs


In [ ]: