In [ ]:
# http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [2]:
from pyspark.sql import Row

In [3]:
sc = SparkContext(appName="real_estate_example")
rdd = sc.textFile('realEstate.csv')

In [4]:
rdd.take(5)


Out[4]:
['street,city,zip,state,beds,baths,sq__ft,type,sale_date,price,latitude,longitude',
 '3526 HIGH ST,SACRAMENTO,95838,CA,2,1,836,Residential,Wed May 21 00:00:00 EDT 2008,59222,38.631913,-121.434879',
 '51 OMAHA CT,SACRAMENTO,95823,CA,3,1,1167,Residential,Wed May 21 00:00:00 EDT 2008,68212,38.478902,-121.431028',
 '2796 BRANCH ST,SACRAMENTO,95815,CA,2,1,796,Residential,Wed May 21 00:00:00 EDT 2008,68880,38.618305,-121.443839',
 '2805 JANETTE WAY,SACRAMENTO,95815,CA,2,1,852,Residential,Wed May 21 00:00:00 EDT 2008,69307,38.616835,-121.439146']
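
Note: this walkthrough deliberately builds the DataFrame by hand from a text RDD to show the mechanics. On Spark 2.x+, once a SparkSession exists (one is created in In [12] below), the CSV reader can do the whole job in one call; a minimal sketch, assuming the same realEstate.csv with its header row:

# Sketch: read the CSV straight into a DataFrame; inferSchema=True makes Spark
# guess numeric column types instead of leaving every column as a string.
df_direct = spark.read.csv('realEstate.csv', header=True, inferSchema=True)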

In [5]:
rdd = rdd.map(lambda line: line.split(","))

In [6]:
rdd.take(2)


Out[6]:
[['street',
  'city',
  'zip',
  'state',
  'beds',
  'baths',
  'sq__ft',
  'type',
  'sale_date',
  'price',
  'latitude',
  'longitude'],
 ['3526 HIGH ST',
  'SACRAMENTO',
  '95838',
  'CA',
  '2',
  '1',
  '836',
  'Residential',
  'Wed May 21 00:00:00 EDT 2008',
  '59222',
  '38.631913',
  '-121.434879']]

In [7]:
header = rdd.first()
rdd = rdd.filter(lambda line: line != header)  # keep every row that is not the header

In [8]:
rdd.take(2)


Out[8]:
[['3526 HIGH ST',
  'SACRAMENTO',
  '95838',
  'CA',
  '2',
  '1',
  '836',
  'Residential',
  'Wed May 21 00:00:00 EDT 2008',
  '59222',
  '38.631913',
  '-121.434879'],
 ['51 OMAHA CT',
  'SACRAMENTO',
  '95823',
  'CA',
  '3',
  '1',
  '1167',
  'Residential',
  'Wed May 21 00:00:00 EDT 2008',
  '68212',
  '38.478902',
  '-121.431028']]
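
Filtering on line != header removes every row equal to the header, which works here because no data row repeats the header text. An alternative that drops only the first element by position, regardless of content, as a sketch (rdd_no_header is a hypothetical name):

# Sketch: drop row 0 by position using zipWithIndex.
rdd_no_header = rdd.zipWithIndex().filter(lambda pair: pair[1] > 0).map(lambda pair: pair[0])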

In [9]:
# Keep only the columns we need; every value is still a string at this point.
newrdd = rdd.map(lambda line: Row(street=line[0], city=line[1], zip=line[2],
                                  beds=line[4], baths=line[5], sqft=line[6],
                                  price=line[9]))
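
Because the fields stay strings, every column of the resulting DataFrame is a string too, which matters for describe() and the MLlib steps later. A hedged variant that casts as it goes (assuming no malformed rows; newrdd_typed is a hypothetical name):

newrdd_typed = rdd.map(lambda line: Row(street=line[0], city=line[1], zip=line[2],
                                        beds=int(line[4]), baths=int(line[5]),
                                        sqft=int(line[6]), price=int(line[9])))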

In [10]:
hasattr(newrdd, "toDF")


Out[10]:
False

In [12]:
spark = SparkSession(sc)
hasattr(newrdd, "toDF")


Out[12]:
True
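
Plain RDDs have no toDF method; creating a SparkSession monkey-patches toDF onto the RDD class, which is why hasattr flips from False to True. Passing sc to the SparkSession constructor works, but the more common idiom is the builder, as a sketch:

spark = SparkSession.builder.appName('real_estate_example').getOrCreate()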

In [13]:
df = newrdd.toDF()

In [14]:
df.take(5)


Out[14]:
[Row(baths='1', beds='2', city='SACRAMENTO', price='59222', sqft='836', street='3526 HIGH ST', zip='95838'),
 Row(baths='1', beds='3', city='SACRAMENTO', price='68212', sqft='1167', street='51 OMAHA CT', zip='95823'),
 Row(baths='1', beds='2', city='SACRAMENTO', price='68880', sqft='796', street='2796 BRANCH ST', zip='95815'),
 Row(baths='1', beds='2', city='SACRAMENTO', price='69307', sqft='852', street='2805 JANETTE WAY', zip='95815'),
 Row(baths='1', beds='2', city='SACRAMENTO', price='81900', sqft='797', street='6001 MCMAHON DR', zip='95824')]

In [15]:
df.show(5)


+-----+----+----------+-----+----+----------------+-----+
|baths|beds|      city|price|sqft|          street|  zip|
+-----+----+----------+-----+----+----------------+-----+
|    1|   2|SACRAMENTO|59222| 836|    3526 HIGH ST|95838|
|    1|   3|SACRAMENTO|68212|1167|     51 OMAHA CT|95823|
|    1|   2|SACRAMENTO|68880| 796|  2796 BRANCH ST|95815|
|    1|   2|SACRAMENTO|69307| 852|2805 JANETTE WAY|95815|
|    1|   2|SACRAMENTO|81900| 797| 6001 MCMAHON DR|95824|
+-----+----+----------+-----+----+----------------+-----+
only showing top 5 rows


In [16]:
df.toPandas().head()


Out[16]:
  baths beds        city  price  sqft            street    zip
0     1    2  SACRAMENTO  59222   836      3526 HIGH ST  95838
1     1    3  SACRAMENTO  68212  1167       51 OMAHA CT  95823
2     1    2  SACRAMENTO  68880   796    2796 BRANCH ST  95815
3     1    2  SACRAMENTO  69307   852  2805 JANETTE WAY  95815
4     1    2  SACRAMENTO  81900   797   6001 MCMAHON DR  95824
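
Keep in mind that toPandas() collects the entire distributed DataFrame into driver memory; that is harmless for these 985 rows but will not scale to genuinely large data.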

In [17]:
favorite_zip = df[df.zip == 95815]

In [18]:
favorite_zip.show(5)


+-----+----+----------+------+----+----------------+-----+
|baths|beds|      city| price|sqft|          street|  zip|
+-----+----+----------+------+----+----------------+-----+
|    1|   2|SACRAMENTO| 68880| 796|  2796 BRANCH ST|95815|
|    1|   2|SACRAMENTO| 69307| 852|2805 JANETTE WAY|95815|
|    1|   1|SACRAMENTO|106852| 871| 2930 LA ROSA RD|95815|
|    1|   2|SACRAMENTO| 78000| 800|    3132 CLAY ST|95815|
|    2|   4|SACRAMENTO| 89000|1316| 483 ARCADE BLVD|95815|
+-----+----+----------+------+----+----------------+-----+
only showing top 5 rows


In [19]:
df.select('city','beds').show(10)


+--------------+----+
|          city|beds|
+--------------+----+
|    SACRAMENTO|   2|
|    SACRAMENTO|   3|
|    SACRAMENTO|   2|
|    SACRAMENTO|   2|
|    SACRAMENTO|   2|
|    SACRAMENTO|   3|
|    SACRAMENTO|   3|
|    SACRAMENTO|   3|
|RANCHO CORDOVA|   2|
|     RIO LINDA|   3|
+--------------+----+
only showing top 10 rows


In [20]:
df.groupBy("beds").count().show()


+----+-----+
|beds|count|
+----+-----+
|   3|  413|
|   8|    1|
|   0|  108|
|   5|   59|
|   6|    3|
|   1|   10|
|   4|  258|
|   2|  133|
+----+-----+


In [21]:
df.describe(['baths', 'beds', 'price', 'sqft']).show()


+-------+------------------+------------------+------------------+------------------+
|summary|             baths|              beds|             price|              sqft|
+-------+------------------+------------------+------------------+------------------+
|  count|               985|               985|               985|               985|
|   mean|1.7766497461928934|2.9116751269035532|234144.26395939087|1314.9167512690356|
| stddev| 0.895371422318646|1.3079322320435811|  138365.839084928| 853.0482425034447|
|    min|                 0|                 0|            100000|                 0|
|    max|                 5|                 8|             99000|               998|
+-------+------------------+------------------+------------------+------------------+
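
The min and max rows look wrong (min price 100000 vs. max 99000, max sqft 998) because every column is still a string: describe() computes mean and stddev after a numeric cast, but min and max compare lexicographically. The same artifact reappears in In [24] below. A sketch that casts the columns so the ordering is numeric (df_num is a hypothetical name):

from pyspark.sql.functions import col
df_num = df.select(col('price').cast('integer'), col('baths').cast('integer'),
                   col('beds').cast('integer'), col('sqft').cast('integer'))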


In [22]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import *

In [23]:
df = df.select('price','baths','beds','sqft')

In [24]:
# Drop rows with zero baths, beds, or square footage (placeholder values).
df = df[df.baths > 0]
df = df[df.beds > 0]
df = df[df.sqft > 0]
df.describe(['baths', 'beds', 'price', 'sqft']).show()


+-------+------------------+------------------+------------------+------------------+
|summary|             baths|              beds|             price|              sqft|
+-------+------------------+------------------+------------------+------------------+
|  count|               814|               814|               814|               814|
|   mean|1.9606879606879606|3.2444717444717446| 229448.3697788698|1591.1461916461917|
| stddev|0.6698038253879438|0.8521372615281976|119825.57606009026| 663.8419297942894|
|    min|                 1|                 1|            100000|              1000|
|    max|                 5|                 8|             99000|               998|
+-------+------------------+------------------+------------------+------------------+


In [28]:
temp = df.rdd.map(lambda line: LabeledPoint(line[0], [line[1:]]))
temp.take(5)


Out[28]:
[LabeledPoint(59222.0, [1.0,2.0,836.0]),
 LabeledPoint(68212.0, [1.0,3.0,1167.0]),
 LabeledPoint(68880.0, [1.0,2.0,796.0]),
 LabeledPoint(69307.0, [1.0,2.0,852.0]),
 LabeledPoint(81900.0, [1.0,2.0,797.0])]
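
This temp RDD is a first, unscaled version of the training data; the scaled pipeline below supersedes it. Note also that [line[1:]] wraps the features in an extra list; it still flattens to the dense vectors shown above, but a more explicit variant avoids relying on that (temp_explicit is a hypothetical name):

temp_explicit = df.rdd.map(lambda line: LabeledPoint(float(line[0]),
                                                     [float(x) for x in line[1:]]))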

In [29]:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

In [30]:
features = df.rdd.map(lambda row: row[1:])
features.take(5)


Out[30]:
[('1', '2', '836'),
 ('1', '3', '1167'),
 ('1', '2', '796'),
 ('1', '2', '852'),
 ('1', '2', '797')]

In [31]:
standardizer = StandardScaler()
model = standardizer.fit(features)
features_transform = model.transform(features)

In [32]:
features_transform.take(5)


Out[32]:
[DenseVector([1.493, 2.347, 1.2593]),
 DenseVector([1.493, 3.5206, 1.7579]),
 DenseVector([1.493, 2.347, 1.1991]),
 DenseVector([1.493, 2.347, 1.2834]),
 DenseVector([1.493, 2.347, 1.2006])]
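
StandardScaler defaults to withMean=False, withStd=True: each feature is divided by its column standard deviation but not centered, which is why every scaled value stays positive. A sketch of a fully standardized (zero-mean, unit-variance) version:

# Sketch: center and scale; withMean=True requires dense inputs,
# which these rows convert to.
standardizer_centered = StandardScaler(withMean=True, withStd=True)
model_centered = standardizer_centered.fit(features)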

In [33]:
lab = df.rdd.map(lambda row: row[0])
lab.take(5)


Out[33]:
['59222', '68212', '68880', '69307', '81900']

In [34]:
transformedData = lab.zip(features_transform)
transformedData.take(5)


Out[34]:
[('59222', DenseVector([1.493, 2.347, 1.2593])),
 ('68212', DenseVector([1.493, 3.5206, 1.7579])),
 ('68880', DenseVector([1.493, 2.347, 1.1991])),
 ('69307', DenseVector([1.493, 2.347, 1.2834])),
 ('81900', DenseVector([1.493, 2.347, 1.2006]))]
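
RDD.zip only works when the two RDDs have the same number of partitions and the same number of elements per partition; that holds here because lab and features_transform both derive from df.rdd without any shuffle. A cheap partial check, as a sketch:

# Sketch: partition counts must match (element counts per partition must too).
assert lab.getNumPartitions() == features_transform.getNumPartitions()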

In [35]:
transformedData = transformedData.map(lambda row: LabeledPoint(row[0], [row[1]]))
transformedData.take(5)


Out[35]:
[LabeledPoint(59222.0, [1.49297445326,2.34703972035,1.25933593899]),
 LabeledPoint(68212.0, [1.49297445326,3.52055958053,1.7579486134]),
 LabeledPoint(68880.0, [1.49297445326,2.34703972035,1.19908063091]),
 LabeledPoint(69307.0, [1.49297445326,2.34703972035,1.28343806223]),
 LabeledPoint(81900.0, [1.49297445326,2.34703972035,1.20058701361])]

In [43]:
trainingData, testingData = transformedData.randomSplit([0.8, 0.2], seed=1234)

In [44]:
from pyspark.mllib.regression import LinearRegressionWithSGD
linearModel = LinearRegressionWithSGD.train(trainingData, iterations=1000, step=0.2)


/spark/python/pyspark/mllib/regression.py:281: UserWarning: Deprecated in 2.0.0. Use ml.regression.LinearRegression.
  warnings.warn("Deprecated in 2.0.0. Use ml.regression.LinearRegression.")
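
The warning is accurate: the RDD-based LinearRegressionWithSGD has been deprecated since Spark 2.0 in favor of the DataFrame API. A minimal sketch of the equivalent pyspark.ml pipeline, assuming numeric columns as in the df_num cast sketch above:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Assemble the three predictors into a single vector column, then fit.
assembler = VectorAssembler(inputCols=['baths', 'beds', 'sqft'], outputCol='features')
lr = LinearRegression(labelCol='price', featuresCol='features')
lrModel = lr.fit(assembler.transform(df_num))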

In [45]:
linearModel.weights


Out[45]:
DenseVector([15911.6446, 4526.9663, 68332.1903])

In [46]:
testingData.take(10)


Out[46]:
[LabeledPoint(100309.0, [2.98594890652,3.52055958053,1.36930187625]),
 LabeledPoint(124100.0, [2.98594890652,3.52055958053,2.41171870613]),
 LabeledPoint(148750.0, [2.98594890652,4.69407944071,2.21739533756]),
 LabeledPoint(150000.0, [1.49297445326,1.17351986018,1.14485085363]),
 LabeledPoint(161500.0, [2.98594890652,4.69407944071,2.3906293483]),
 LabeledPoint(166357.0, [1.49297445326,4.69407944071,2.94497818269]),
 LabeledPoint(168000.0, [2.98594890652,3.52055958053,2.22492725107]),
 LabeledPoint(178480.0, [2.98594890652,3.52055958053,1.78506350204]),
 LabeledPoint(181872.0, [1.49297445326,3.52055958053,1.73535287287]),
 LabeledPoint(182587.0, [4.47892335978,4.69407944071,2.78831438167])]

In [47]:
linearModel.predict([1.49297445326,3.52055958053,1.73535287287])


Out[47]:
158273.59605366364
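
That feature vector is the ninth test example above (label 181872.0), so the model underestimates this particular house by roughly $23,600.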

In [48]:
from pyspark.mllib.evaluation import RegressionMetrics

In [49]:
# (prediction, observation) pairs on the training set; features[0] unwraps
# the extra list nesting introduced in In [35].
prediObserRDDin = trainingData.map(lambda row: (float(linearModel.predict(row.features[0])), row.label))

In [50]:
metrics = RegressionMetrics(prediObserRDDin)

In [51]:
metrics.r2


Out[51]:
0.466721671313607
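
An R² of about 0.47 on the training data means baths, beds, and sqft together explain just under half of the variance in sale price.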

In [52]:
# (prediction, observation) pairs on the held-out test set.
prediObserRDDout = testingData.map(lambda row: (float(linearModel.predict(row.features[0])), row.label))
metrics = RegressionMetrics(prediObserRDDout)

In [53]:
metrics.rootMeanSquaredError


Out[53]:
85023.27994408192
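
A test-set RMSE of about $85,000 against a mean price near $229,000 confirms this is a rough model. When finished, release the cluster resources; a closing sketch:

# Shut down the SparkContext when done.
sc.stop()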