In [ ]:
# http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/
In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
In [2]:
from pyspark.sql import Row
In [3]:
sc = SparkContext(appName="real_estate_example")
rdd = sc.textFile('realEstate.csv')  # an RDD of raw text lines, one per CSV row
In [4]:
rdd.take(5)
Out[4]:
In [5]:
rdd = rdd.map(lambda line: line.split(","))  # naive split; see the caveat below
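A plain split(",") is good enough for this file, but it silently mangles any field that contains an embedded comma. A more defensive variant (a sketch, not part of the original tutorial) parses each line with Python's csv module:

import csv
# parse each line with a real CSV parser instead of a naive split
rdd_safe = sc.textFile('realEstate.csv').map(lambda line: next(csv.reader([line])))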
In [6]:
rdd.take(2)
Out[6]:
In [7]:
header = rdd.first()                           # grab the header row
rdd = rdd.filter(lambda line: line != header)  # and drop it from the data
In [8]:
rdd.take(2)
Out[8]:
In [9]:
# keep street, city, zip, beds, baths, sqft, and price
# (columns 0, 1, 2, 4, 5, 6, and 9 of the CSV)
newrdd = rdd.map(lambda line: Row(street=line[0], city=line[1], zip=line[2],
                                  beds=line[4], baths=line[5],
                                  sqft=line[6], price=line[9]))
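Everything split() produces is a string, so every column of the eventual DataFrame will be a string as well. If you want numeric columns from the start, a hedged variant of the same map (assuming the Sacramento real-estate CSV layout used in the linked tutorial, with beds, baths, square footage, and price in columns 4, 5, 6, and 9) would cast as it goes:

# hypothetical typed variant; int() assumes the Sacramento CSV's whole-number fields
newrdd_typed = rdd.map(lambda line: Row(street=line[0], city=line[1], zip=line[2],
                                        beds=int(line[4]), baths=int(line[5]),
                                        sqft=int(line[6]), price=int(line[9])))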
In [10]:
hasattr(newrdd, "toDF")
Out[10]:
In [12]:
spark = SparkSession(sc)
hasattr(newrdd, "toDF")
Out[12]:
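Creating the session is what attaches toDF to RDDs of Rows, which is why the hasattr check only succeeds after SparkSession(sc) runs. The more common idiom (equivalent here, shown as a sketch) builds or reuses a session in one call:

spark = SparkSession.builder.appName("real_estate_example").getOrCreate()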
In [13]:
df = newrdd.toDF()
In [14]:
df.take(5)
Out[14]:
In [15]:
df.show(5)
In [16]:
df.toPandas().head()
Out[16]:
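For reference, the whole read-split-filter-Row pipeline above can be collapsed into a single call with the DataFrame CSV reader. This is a sketch of an alternative route, not what the tutorial does; with inferSchema the columns also come back numeric instead of string:

df_direct = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv('realEstate.csv'))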
In [17]:
favorite_zip = df[df.zip == 95815]  # zip is a string column; Spark casts it for the comparison
In [18]:
favorite_zip.show(5)
In [19]:
df.select('city','beds').show(10)
In [20]:
df.groupBy("beds").count().show()
In [21]:
df.describe(['baths', 'beds','price','sqft']).show()
In [22]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import *  # note: the wildcard shadows Python builtins such as sum and abs
In [23]:
df = df.select('price','baths','beds','sqft')
In [24]:
# drop listings where any of these fields was recorded as zero (i.e., missing)
df = df[df.baths > 0]
df = df[df.beds > 0]
df = df[df.sqft > 0]
df.describe(['baths','beds','price','sqft']).show()
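These comparisons against 0 work only because Spark implicitly casts the string columns to numbers for the comparison; the schema itself is unchanged, which you can confirm with:

df.printSchema()  # every field still reports as string unless explicitly cast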
In [28]:
# label = price (column 0); features = (baths, beds, sqft)
temp = df.rdd.map(lambda line: LabeledPoint(line[0], line[1:]))
temp.take(5)
Out[28]:
In [29]:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler
In [30]:
features = df.rdd.map(lambda row: row[1:])
features.take(5)
Out[30]:
In [31]:
# defaults: withMean=False, withStd=True -- scale each feature to unit
# standard deviation without centering it
standardizer = StandardScaler()
model = standardizer.fit(features)
features_transform = model.transform(features)
In [32]:
features_transform.take(5)
Out[32]:
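With no arguments, StandardScaler scales each feature to unit standard deviation but does not center it. A quick sanity check (a sketch) that the transformed columns really have unit variance:

from pyspark.mllib.stat import Statistics
summary = Statistics.colStats(features_transform)
summary.variance()  # each component should come out close to 1.0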
In [33]:
lab = df.rdd.map(lambda row: row[0])  # the unscaled price labels
lab.take(5)
Out[33]:
In [34]:
transformedData = lab.zip(features_transform)  # pair each label with its scaled feature vector
transformedData.take(5)
Out[34]:
In [35]:
transformedData = transformedData.map(lambda row: LabeledPoint(row[0], row[1]))
transformedData.take(5)
Out[35]:
In [43]:
trainingData, testingData = transformedData.randomSplit([.8,.2],seed=1234)
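randomSplit gives approximate 80/20 proportions, and the fixed seed makes the split reproducible. To see the actual sizes (sketch):

trainingData.count(), testingData.count()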
In [44]:
from pyspark.mllib.regression import LinearRegressionWithSGD
linearModel = LinearRegressionWithSGD.train(trainingData, iterations=1000, step=0.2)
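Note that LinearRegressionWithSGD fits a model through the origin unless told otherwise. A hedged variant that also fits an intercept term (linearModelIntercept is a hypothetical name):

linearModelIntercept = LinearRegressionWithSGD.train(
    trainingData, iterations=1000, step=0.2, intercept=True)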
In [45]:
linearModel.weights
Out[45]:
In [46]:
testingData.take(10)
Out[46]:
In [47]:
linearModel.predict([1.49297445326,3.52055958053,1.73535287287])
Out[47]:
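The three numbers passed to predict above are already on the standardized scale, so they look nothing like real bath/bed/sqft counts. To score a raw listing, push it through the fitted scaler first; a sketch, using a hypothetical 2-bath, 3-bed, 1204 sq ft home in the (baths, beds, sqft) feature order used above:

raw = Vectors.dense([2.0, 3.0, 1204.0])    # baths, beds, sqft (hypothetical listing)
linearModel.predict(model.transform(raw))  # scale with the fitted StandardScaler first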
In [48]:
from pyspark.mllib.evaluation import RegressionMetrics
In [49]:
prediObserRDDin = trainingData.map(lambda row: (float(linearModel.predict(row.features)), row.label))
In [50]:
metrics = RegressionMetrics(prediObserRDDin)
In [51]:
metrics.r2
Out[51]:
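r2 here is the in-sample coefficient of determination, R^2 = 1 - SS_res/SS_tot: the fraction of variance in the training-set prices that the model explains, with 1.0 a perfect fit. The same object exposes a few companion metrics worth a glance (sketch):

metrics.explainedVariance
metrics.meanAbsoluteError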
In [52]:
prediObserRDDout = testingData.map(lambda row: (float(linearModel.predict(row.features)), row.label))
metrics = RegressionMetrics(prediObserRDDout)
In [53]:
metrics.rootMeanSquaredError
Out[53]:
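Because the labels were never scaled, this RMSE is in the same units as price (dollars), so it reads as the typical size of an out-of-sample prediction error. The held-out R^2 is available from the same object for comparison with the training figure above:

metrics.r2  # R^2 computed on testingData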