In [1]:
# create entry points to spark
try:
    sc.stop()
except:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sparkContext=sc)

Map functions

These are probably the most commonly used functions when working with an RDD object.

  • map()
  • mapValues()
  • flatMap()
  • flatMapValues()

map

The map() method applies a function to each element of the RDD. Each element has to be a valid input to the function, and the function outputs become the elements of the returned RDD.
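
As a quick, minimal sketch (with a small made-up RDD, separate from the mtcars walkthrough below; the name nums_rdd is only illustrative), map() can double every number in an RDD.

# minimal sketch: apply a function to every element of a small RDD
nums_rdd = sc.parallelize([1, 2, 3])
nums_rdd.map(lambda v: v * 2).collect()
# expected result: [2, 4, 6]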

Elements in the RDD object map_exp_rdd below are rows of the mtcars dataset in string format. We are going to apply the map() function multiple times to convert each string element into a key/value element. Each resulting element has two parts: the first is the auto model as a string; the second is a list of numeric values.


In [2]:
# create an example RDD
map_exp_rdd = sc.textFile('../../data/mtcars.csv')
map_exp_rdd.take(4)


Out[2]:
[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1']

In [3]:
# split auto model from other feature values
map_exp_rdd_1 = map_exp_rdd.map(lambda x: x.split(',')).map(lambda x: (x[0], x[1:]))
map_exp_rdd_1.take(4)


Out[3]:
[('',
  ['mpg',
   'cyl',
   'disp',
   'hp',
   'drat',
   'wt',
   'qsec',
   'vs',
   'am',
   'gear',
   'carb']),
 ('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),
 ('Datsun 710',
  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1'])]

In [4]:
# remove the header row
header = map_exp_rdd_1.first()
# the filter() method applies a function to each element. The function output is a boolean value (True or False);
# elements for which the function returns True are kept.
map_exp_rdd_2 = map_exp_rdd_1.filter(lambda x: x != header)
map_exp_rdd_2.take(4)


Out[4]:
[('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),
 ('Datsun 710',
  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1']),
 ('Hornet 4 Drive',
  ['21.4', '6', '258', '110', '3.08', '3.215', '19.44', '1', '0', '3', '1'])]

In [5]:
# convert string values to numeric values
map_exp_rdd_3 = map_exp_rdd_2.map(lambda x: (x[0], list(map(float, x[1]))))
map_exp_rdd_3.take(4)


Out[5]:
[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 ('Datsun 710',
  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 ('Hornet 4 Drive',
  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]

mapValues

The mapValues() function requires that each element in the RDD has a key/value pair structure, for example, a tuple of 2 items or a list of 2 items. It applies a function to each element value; the element keys remain unchanged.
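
As a minimal sketch (with a small made-up pair RDD; the name pairs_rdd is only illustrative), mapValues() transforms the values while leaving the keys untouched.

# minimal sketch: keys are kept as-is, only the values are transformed
pairs_rdd = sc.parallelize([('a', [1, 2, 3]), ('b', [4, 5, 6])])
pairs_rdd.mapValues(sum).collect()
# expected result: [('a', 6), ('b', 15)]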

We can apply the mapValues function to the RDD object mapValues_exp_rdd below.


In [6]:
mapValues_exp_rdd = map_exp_rdd_3
mapValues_exp_rdd.take(4)


Out[6]:
[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 ('Datsun 710',
  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 ('Hornet 4 Drive',
  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]

In [7]:
import numpy as np
mapValues_exp_rdd_1 = mapValues_exp_rdd.mapValues(lambda x: np.mean(x))
mapValues_exp_rdd_1.take(4)


Out[7]:
[('Mazda RX4', 29.90727272727273),
 ('Mazda RX4 Wag', 29.98136363636364),
 ('Datsun 710', 23.59818181818182),
 ('Hornet 4 Drive', 38.73954545454546)]

When using mapValues(), the x in the lambda function above refers only to the element value, not the element key.

flatMap

This function first applies a function to each element of an RDD and then flattens the results. We can also use it simply to flatten the elements of an RDD, without any extra operation on each element, by passing an identity function as shown below.


In [8]:
x = [('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]
flatMap_exp_rdd = sc.parallelize(x)
flatMap_exp_rdd.collect()


Out[8]:
[('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]

In [9]:
flatMap_exp_rdd_1 = flatMap_exp_rdd.flatMap(lambda x: x)
flatMap_exp_rdd_1.collect()


Out[9]:
['a', 'b', 'c', 'a', 'a', 'c', 'c', 'c', 'd']
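
flatMap() can also apply a non-trivial function before flattening. Below is a minimal sketch (with a made-up RDD of sentences; the name sentences_rdd is only illustrative) that splits each string into words and returns one flat RDD of words.

# minimal sketch: transform each element into a list of words, then flatten
sentences_rdd = sc.parallelize(['hello world', 'spark rdd'])
sentences_rdd.flatMap(lambda s: s.split(' ')).collect()
# expected result: ['hello', 'world', 'spark', 'rdd']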

flatMapValues

The flatMapValues() function requires that each element in the RDD has a key/value pair structure. It applies a function to each element value of the RDD object and then flattens the results; each item in the flattened result is paired with the original key.
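
In its simplest form (a minimal sketch with made-up data; the name kv_rdd is only illustrative), flatMapValues() can pair a key with every item of its value:

# minimal sketch: each item of a value becomes its own (key, item) pair
kv_rdd = sc.parallelize([('a', [1, 2]), ('b', [3])])
kv_rdd.flatMapValues(lambda v: v).collect()
# expected result: [('a', 1), ('a', 2), ('b', 3)]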

For example, suppose the raw data looks like the table below, but we would like to transform it so that it has three columns: the first column is the sample id; the second column is the type (A, B, or C); the third column is the value.

sample id   A   B   C
1          23  28  32
2          18  29  31
3          34  21  18

In [10]:
# example data
my_data = [
    [1, (23, 28, 32)],
    [2, (18, 29, 31)],
    [3, (34, 21, 18)]
]
flatMapValues_exp_rdd = sc.parallelize(my_data)
flatMapValues_exp_rdd.collect()


Out[10]:
[[1, (23, 28, 32)], [2, (18, 29, 31)], [3, (34, 21, 18)]]

In [11]:
# merge the A, B, and C columns into one column and add a type column
flatMapValues_exp_rdd_1 = flatMapValues_exp_rdd.flatMapValues(lambda x: list(zip(list('ABC'), x)))
flatMapValues_exp_rdd_1.collect()


Out[11]:
[(1, ('A', 23)),
 (1, ('B', 28)),
 (1, ('C', 32)),
 (2, ('A', 18)),
 (2, ('B', 29)),
 (2, ('C', 31)),
 (3, ('A', 34)),
 (3, ('B', 21)),
 (3, ('C', 18))]

In [12]:
# unpack the element values
flatMapValues_exp_rdd_2 = flatMapValues_exp_rdd_1.map(lambda x: [x[0]] + list(x[1]) )
flatMapValues_exp_rdd_2.collect()


Out[12]:
[[1, 'A', 23],
 [1, 'B', 28],
 [1, 'C', 32],
 [2, 'A', 18],
 [2, 'B', 29],
 [2, 'C', 31],
 [3, 'A', 34],
 [3, 'B', 21],
 [3, 'C', 18]]
