In [1]:
# create entry points to spark, stopping any existing SparkContext first
try:
    sc.stop()
except:
    pass

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sparkContext=sc)
Two aggregate functions:

aggregate()
aggregateByKey()

Both take the same three arguments: aggregate(zeroValue, seqOp, combOp).

The code below calculates the total sum of squares (TSS) for mpg and disp in the mtcars data set. The TSS of a variable is the sum of squared deviations of its values from their mean.
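To make the mechanics concrete before the mtcars example, here is a minimal sketch on a toy RDD (the numbers are made up for illustration). zeroValue is the starting accumulator, seqOp folds one element at a time into the accumulator within each partition, and combOp merges the per-partition accumulators.

rdd = sc.parallelize([1, 2, 3, 4], 2)        # toy data split across two partitions
rdd.aggregate(
    (0, 0),                                  # zeroValue: (running sum, running count)
    lambda z, x: (z[0] + x, z[1] + 1),       # seqOp: fold one element into the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1])  # combOp: merge two partition accumulators
)                                            # returns (10, 4)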
Step 1: get some data.
In [2]:
mtcars_df = spark.read.csv('../../data/mtcars.csv', inferSchema=True, header=True).select(['mpg', 'disp'])
mtcars_df.take(5)
Out[2]:
Step 2: calculate averages of mpg and disp
In [3]:
mpg_mean = mtcars_df.select('mpg').rdd.map(lambda x: x[0]).mean()
disp_mean = mtcars_df.select('disp').rdd.map(lambda x: x[0]).mean()
print('mpg mean =', mpg_mean, '; disp mean =', disp_mean)
Step 3: build zeroValue, seqOp and combOp
We are calculating two TSS values at once, one for mpg and one for disp, so the zero value is a tuple that holds two running sums.
In [4]:
zeroValue = (0, 0)
In the seqOp below, z is the accumulator, which starts from zeroValue and is updated as each element is processed; x is one element of an RDD partition, here a row containing mpg and disp. Both z and x therefore hold two values.
In [5]:
seqOp = lambda z, x: (z[0] + (x[0] - mpg_mean)**2, z[1] + (x[1] - disp_mean)**2)
The combOp function merges the per-partition accumulators into a single result.
In [6]:
combOp = lambda px, py: (px[0] + py[0], px[1] + py[1])
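To see what Spark does with these three pieces, here is a plain-Python simulation of the same logic over two mock partitions (the row values are made up for illustration; seqOp, combOp, and zeroValue come from the cells above).

from functools import reduce

part1 = [(21.0, 160.0), (22.8, 108.0)]  # hypothetical (mpg, disp) rows in partition 1
part2 = [(18.7, 360.0)]                 # hypothetical (mpg, disp) rows in partition 2

acc1 = reduce(seqOp, part1, zeroValue)  # fold seqOp over each partition,
acc2 = reduce(seqOp, part2, zeroValue)  # starting from zeroValue each time
combOp(acc1, acc2)                      # merge the per-partition accumulators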
Step 4: run aggregate() with the three pieces built above.
In [7]:
mtcars_df.rdd.aggregate(zeroValue, seqOp, combOp)
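As a sanity check, the same two TSS values can be computed with the DataFrame API. A sketch, assuming mtcars_df and the means from the earlier cells:

from pyspark.sql import functions as F

mtcars_df.select(
    F.sum(F.pow(F.col('mpg') - mpg_mean, 2)).alias('tss_mpg'),
    F.sum(F.pow(F.col('disp') - disp_mean, 2)).alias('tss_disp')
).show()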
The second function, aggregateByKey(), takes the same three arguments but operates on a pair RDD, aggregating the values within each key separately. The example below groups the iris data set by species.
In [2]:
iris_rdd = sc.textFile('../../data/iris.csv', use_unicode=True)
iris_rdd.take(2)
Out[2]:
In [3]:
# split each CSV line, drop the header row, then build (species, [four measurements]) pairs
iris_rdd_2 = iris_rdd.map(lambda x: x.split(',')).\
    filter(lambda x: x[0] != 'sepal_length').\
    map(lambda x: (x[-1], [*map(float, x[:-1])]))
iris_rdd_2.take(5)
Out[3]:
In [4]:
zero_value = (0, 0)
# y is the list of four measurements; accumulate uncentered sums of squares
# for the first two, sepal_length and sepal_width
seqOp = (lambda x, y: (x[0] + y[0]**2, x[1] + y[1]**2))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
In [5]:
iris_rdd_2.aggregateByKey(zero_value, seqOp, combOp).collect()
Out[5]:
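The accumulator is not limited to two slots. A sketch extending the same pattern to all four measurements, assuming iris_rdd_2 from above:

zero4 = (0.0, 0.0, 0.0, 0.0)  # one running sum of squares per measurement
seq4 = lambda z, v: tuple(zi + vi**2 for zi, vi in zip(z, v))
comb4 = lambda a, b: tuple(ai + bi for ai, bi in zip(a, b))
iris_rdd_2.aggregateByKey(zero4, seq4, comb4).collect()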