In [1]:
# create entry points to Spark: stop any existing SparkContext before building fresh ones
try:
    sc.stop()
except NameError:
    pass  # no SparkContext defined yet
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sparkContext=sc)

Aggregate functions

Two aggregate functions:

  • aggregate()
  • aggregateByKey()

aggregate(zeroValue, seqOp, combOp)

  • zeroValue is like a data container: the initial value of the running result. Its structure should match the structure of the value returned by the seqOp function.
  • seqOp is a function that takes two arguments: the first argument is the running result (which starts as zeroValue) and the second argument is an element from the RDD. The running result is replaced by seqOp's return value after every element.
  • combOp is a function that takes two arguments: the final running result from one partition and the final running result from another partition, and merges them into one. A minimal sketch of the pattern follows this list.
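
As a minimal sketch (illustrative values, not part of the mtcars example below), aggregate() can compute the sum and count of an RDD in one pass:


rdd = sc.parallelize([1.0, 2.0, 3.0, 4.0])
sum_count = rdd.aggregate(
    (0.0, 0),                                 # zeroValue: (running sum, running count)
    lambda z, x: (z[0] + x, z[1] + 1),        # seqOp: fold one element into the container
    lambda a, b: (a[0] + b[0], a[1] + b[1]))  # combOp: merge results across partitions
sum_count[0] / sum_count[1]                   # the mean, 2.5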

The code below calculates the total sum of squares (TSS) for mpg and disp in the mtcars data set.

Step 1: get some data.


In [2]:
mtcars_df = spark.read.csv('../../data/mtcars.csv', inferSchema=True, header=True).select(['mpg', 'disp'])
mtcars_df.take(5)


Out[2]:
[Row(mpg=21.0, disp=160.0),
 Row(mpg=21.0, disp=160.0),
 Row(mpg=22.8, disp=108.0),
 Row(mpg=21.4, disp=258.0),
 Row(mpg=18.7, disp=360.0)]

Step 2: calculate averages of mpg and disp


In [3]:
mpg_mean = mtcars_df.select('mpg').rdd.map(lambda x: x[0]).mean()
disp_mean = mtcars_df.select('disp').rdd.map(lambda x: x[0]).mean()
print('mpg mean = ', mpg_mean, '; ' 
      'disp mean = ', disp_mean)


mpg mean =  20.090625000000003 ; disp mean =  230.721875
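
As an aside, both means could also be computed in a single aggregate() pass; the sketch below (an illustration, not part of the original) carries the two sums and the row count in one container.


sums = mtcars_df.rdd.aggregate(
    (0.0, 0.0, 0),                                            # (sum of mpg, sum of disp, count)
    lambda z, row: (z[0] + row[0], z[1] + row[1], z[2] + 1),  # fold one Row in
    lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))     # merge partitions
mpg_mean_alt, disp_mean_alt = sums[0] / sums[2], sums[1] / sums[2]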

Step 3: build zeroValue, seqOp and combOp

We are calculating two TSS values, one for mpg and one for disp, so we create a tuple that stores two running sums.


In [4]:
zeroValue = (0, 0)

The z below refers to the zeroValue, whose values get updated after every element. The x refers to an element (a Row) in an RDD partition. In this case, both z and x hold two values.


In [5]:
seqOp = lambda z, x: (z[0] + (x[0] - mpg_mean)**2, z[1] + (x[1] - disp_mean)**2)

The combOp function simply merges all the per-partition zeroValues into one.


In [6]:
combOp = lambda px, py: (px[0] + py[0], px[1] + py[1])

Step 4: run the aggregate() function.

mtcars_df.rdd.aggregate(zeroValue, seqOp, combOp)
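
The result can be cross-checked with the DataFrame API; this sketch is a verification aid added here, not part of the original notebook.


from pyspark.sql import functions as F
mtcars_df.select(
    F.sum(F.pow(F.col('mpg') - F.lit(mpg_mean), 2)).alias('mpg_tss'),
    F.sum(F.pow(F.col('disp') - F.lit(disp_mean), 2)).alias('disp_tss')
).show()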

aggregateByKey(zeroValue, seqOp, combOp)

This function does a similar job to aggregate(). While aggregate() merges everything into a single result at the very end, aggregateByKey() merges results separately for each key, so it operates on a pair RDD of (key, value) tuples. A short sketch follows.
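
One detail worth noting: the seqOp passed to aggregateByKey() receives only the value of each pair, never the key. The sketch below (illustrative data, not from the original) computes a per-key sum and count.


pairs = sc.parallelize([('a', 1.0), ('a', 3.0), ('b', 2.0)])
pairs.aggregateByKey(
    (0.0, 0),                                 # zeroValue: one accumulator per key
    lambda z, v: (z[0] + v, z[1] + 1),        # seqOp sees the value only, not the key
    lambda a, b: (a[0] + b[0], a[1] + b[1])   # combOp merges per-partition results
).collect()
# e.g. [('a', (4.0, 2)), ('b', (2.0, 1))] (ordering may vary)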

Import data


In [2]:
iris_rdd = sc.textFile('../../data/iris.csv', use_unicode=True)
iris_rdd.take(2)


Out[2]:
['sepal_length,sepal_width,petal_length,petal_width,species',
 '5.1,3.5,1.4,0.2,setosa']

Transform the data into an RDD of (key, value) tuples, keyed by species


In [3]:
iris_rdd_2 = iris_rdd.map(lambda x: x.split(',')).\
    filter(lambda x: x[0] != 'sepal_length').\
    map(lambda x: (x[-1], [*map(float, x[:-1])]))  # (species, [numeric features]), header dropped
iris_rdd_2.take(5)


Out[3]:
[('setosa', [5.1, 3.5, 1.4, 0.2]),
 ('setosa', [4.9, 3.0, 1.4, 0.2]),
 ('setosa', [4.7, 3.2, 1.3, 0.2]),
 ('setosa', [4.6, 3.1, 1.5, 0.2]),
 ('setosa', [5.0, 3.6, 1.4, 0.2])]

Define zeroValue, seqOp and combOp

The seqOp below accumulates the sums of squares of the first two features (sepal_length and sepal_width); the species key itself is handled by aggregateByKey().


In [4]:
zero_value = (0, 0)
seqOp = lambda x, y: (x[0] + y[0]**2, x[1] + y[1]**2)  # y is the feature list of one row
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])

Implement aggregateByKey()


In [5]:
iris_rdd_2.aggregateByKey(zero_value, seqOp, combOp).collect()


Out[5]:
[('setosa', (1259.0899999999997, 591.2500000000002)),
 ('versicolor', (1774.8600000000001, 388.47)),
 ('virginica', (2189.9000000000005, 447.33))]
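
To turn these per-species sums of squares into per-species means, the zeroValue can also carry a count; a sketch under that assumption (not in the original):


iris_rdd_2.aggregateByKey(
    (0.0, 0.0, 0),                                            # two running sums plus a count
    lambda z, v: (z[0] + v[0]**2, z[1] + v[1]**2, z[2] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2])).\
    mapValues(lambda t: (t[0] / t[2], t[1] / t[2])).\
    collect()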
