In [1]:
# create entry points to Spark
try:
    sc.stop()
except:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sparkContext=sc)
These four functions are probably the most commonly used when working with an RDD object:
map()
mapValues()
flatMap()
flatMapValues()
map
The map() method applies a function to each element of the RDD. Each element has to be a valid input to the function, and the returned RDD has the function outputs as its new elements.
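As a minimal sketch of the idea (using a toy RDD invented here, not real data), map() turns each input element into exactly one output element:
In [ ]:
# hypothetical example: double every number in a small RDD
nums_rdd = sc.parallelize([1, 2, 3])
nums_rdd.map(lambda v: v * 2).collect()  # returns [2, 4, 6]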
Elements in the RDD object map_exp_rdd below are rows of the mtcars dataset in string format. We are going to apply the map() function multiple times to convert each string element into a tuple of two values: the first value is the auto model as a string; the second value is a list of numeric feature values.
In [2]:
# create an example RDD
map_exp_rdd = sc.textFile('../../data/mtcars.csv')
map_exp_rdd.take(4)
Out[2]:
In [3]:
# split auto model from other feature values
map_exp_rdd_1 = map_exp_rdd.map(lambda x: x.split(',')).map(lambda x: (x[0], x[1:]))
map_exp_rdd_1.take(4)
Out[3]:
In [4]:
# remove the header row
header = map_exp_rdd_1.first()
# the filter method applies a function to each element. The function output is a boolean value (True or False)
# elements for which the function returns True are kept.
map_exp_rdd_2 = map_exp_rdd_1.filter(lambda x: x != header)
map_exp_rdd_2.take(4)
Out[4]:
In [5]:
# convert string values to numeric values
map_exp_rdd_3 = map_exp_rdd_2.map(lambda x: (x[0], list(map(float, x[1]))))
map_exp_rdd_3.take(4)
Out[5]:
mapValues
The mapValues function requires that each element in the RDD has a key/value pair structure, for example, a tuple of 2 items or a list of 2 items. The mapValues function applies a function to each of the element values; the element key remains unchanged.
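A minimal sketch with a toy key/value RDD (invented here for illustration): the supplied function receives only the value part of each pair.
In [ ]:
# hypothetical example: sum the value list of each key/value pair
kv_rdd = sc.parallelize([('a', [1, 2]), ('b', [3, 4])])
kv_rdd.mapValues(sum).collect()  # returns [('a', 3), ('b', 7)]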
We can apply the mapValues function to the RDD object mapValues_exp_rdd below.
In [6]:
mapValues_exp_rdd = map_exp_rdd_3
mapValues_exp_rdd.take(4)
Out[6]:
In [7]:
import numpy as np
mapValues_exp_rdd_1 = mapValues_exp_rdd.mapValues(lambda x: np.mean(x))
mapValues_exp_rdd_1.take(4)
Out[7]:
When using mapValues(), the x in the above lambda function refers to the element value, not including the element key.
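flatMap
The flatMap function applies a function to each element of the RDD and then flattens the results: each input element can produce zero, one, or many output elements. In the example below, each tuple in the RDD is flattened into its individual strings.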
In [8]:
x = [('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]
flatMap_exp_rdd = sc.parallelize(x)
flatMap_exp_rdd.collect()
Out[8]:
In [9]:
flatMap_exp_rdd_1 = flatMap_exp_rdd.flatMap(lambda x: x)
flatMap_exp_rdd_1.collect()
Out[9]:
flatMapValues
The flatMapValues function requires that each element in the RDD has a key/value pair structure. It applies a function to each element value of the RDD object and then flattens the results.
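A minimal sketch with toy data (invented here): each key/value pair expands into one output pair per item that the function yields from the value.
In [ ]:
# hypothetical example: expand each key's list into separate key/value pairs
kv_rdd = sc.parallelize([('a', [1, 2]), ('b', [3])])
kv_rdd.flatMapValues(lambda v: v).collect()  # returns [('a', 1), ('a', 2), ('b', 3)]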
For example, the raw data below has one column per type. We would like to transform the data so that it has three columns: the first column is the sample id; the second column is the type (A, B, or C); the third column is the value.
sample id | A | B | C |
---|---|---|---|
1 | 23 | 28 | 32 |
2 | 18 | 29 | 31 |
3 | 34 | 21 | 18 |
In [10]:
# example data
my_data = [
[1, (23, 28, 32)],
[2, (18, 29, 31)],
[3, (34, 21, 18)]
]
flatMapValues_exp_rdd = sc.parallelize(my_data)
flatMapValues_exp_rdd.collect()
Out[10]:
In [11]:
# merge the A, B, and C columns into one column and add the type column
flatMapValues_exp_rdd_1 = flatMapValues_exp_rdd.flatMapValues(lambda x: list(zip(list('ABC'), x)))
flatMapValues_exp_rdd_1.collect()
Out[11]:
In [12]:
# unpack the element values
flatMapValues_exp_rdd_2 = flatMapValues_exp_rdd_1.map(lambda x: [x[0]] + list(x[1]))
flatMapValues_exp_rdd_2.collect()
Out[12]: