#### The following transformations will be covered: map(), mapPartitions(), mapPartitionsWithIndex(), filter(), flatMap(), reduceByKey(), groupByKey()
#### The following actions will be covered: first(), take(), takeSample(), takeOrdered(), collect(), count(), countByValue(), reduce(), top()
#### Also covered: cache(), unpersist(), id(), setName()
#### Note that, for reference, you can look up the details of these methods in Spark's Python API.
In [ ]:
# This is a Python cell. You can run normal Python code here...
print 'The sum of 1 and 1 is {0}'.format(1+1)
In [ ]:
# Here is another Python cell, this time with a variable (x) declaration and an if statement:
x = 42
if x > 40:
    print 'The sum of 1 and 2 is {0}'.format(1+2)
In [ ]:
# This cell relies on x being defined already.
# If we didn't run the cells from part (1a) this code would fail.
print x * 2
An import statement will import the specified module. In this tutorial and future labs, we will provide any imports that are necessary.
In [ ]:
# Import the regular expression library
import re
m = re.search('(?<=abc)def', 'abcdef')
m.group(0)
In [ ]:
# Import the datetime library
import datetime
print 'This was last run on: {0}'.format(datetime.datetime.now())
When running Spark, you start a new Spark application by creating a SparkContext. When the SparkContext is created, it asks the master for some cores to use to do work. The master sets these cores aside just for you; they won't be used for other applications. When using Databricks Cloud or the virtual machine provisioned for this class, the SparkContext is created for you automatically as sc.
The Spark context (sc) is the main entry point for Spark functionality and can be used to create Resilient Distributed Datasets (RDDs) on a cluster. Try printing out sc to see its type.
In [ ]:
# Display the type of the Spark Context sc
type(sc)
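If you are working outside an environment that provides sc automatically, you would normally build the configuration and context yourself. The sketch below only constructs a SparkConf; the application name 'Tutorial' and the local[4] master URL are illustrative assumptions, and the SparkContext line is left commented out because a second context cannot be created in this notebook, where sc already exists.
In [ ]:
# Sketch: how a SparkContext is configured when one is not provided for you.
# The app name and master URL are illustrative assumptions.
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName('Tutorial').setMaster('local[4]')
# sc = SparkContext(conf=conf)  # uncomment only in an environment without an existing SparkContext
print conf.toDebugString()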
SparkContext attributes: you can use Python's dir() function to list the attributes (including methods) available on the sc object.
In [ ]:
# List sc's attributes
dir(sc)
You can also use Python's help() function to get more detailed information about the attributes that the sc object has.
In [ ]:
# Use help to obtain more detailed information
help(sc)
In [ ]:
# After reading the help we've decided we want to use sc.version to see what version of Spark we are running
sc.version
In [ ]:
# Help can be used on any Python object
help(map)
In this part, we will subtract one from each value using map, use collect to view the results, use count to view counts, and apply the transformation filter and view the results with collect.
A note on xrange(): xrange() only generates values as they are needed. This is different from the behavior of range(), which generates the complete list upon execution. Because of this, xrange() is more memory efficient than range(), especially for large ranges.
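A quick, rough way to see this difference is to compare the in-memory footprint of the two objects with Python's sys.getsizeof; note that for the list it reports only the list object itself (not the integers it points to), and the exact numbers are implementation-dependent.
In [ ]:
# Rough size comparison of xrange vs. range (sizes vary by Python implementation)
import sys
print 'xrange object size: {0} bytes'.format(sys.getsizeof(xrange(1, 10001)))
print 'range list size: {0} bytes'.format(sys.getsizeof(range(1, 10001)))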
In [ ]:
data = xrange(1, 10001)
In [ ]:
# Data is an xrange object, which behaves like a normal Python sequence
# Obtain data's first element
data[0]
In [ ]:
# We can check the size of the list using the len() function
len(data)
To create an RDD, we use sc.parallelize(), which tells Spark to create a new set of input data based on the data that is passed in. In this example, we will provide an xrange. The second argument to the sc.parallelize() method tells Spark how many partitions to break the data into when it stores the data in memory (we'll talk more about this later in this tutorial). Note that for better performance when using parallelize(), xrange() is recommended if the input represents a range; this is why we used xrange() in 3a.
There are many different types of RDDs; the base class is pyspark.RDD. Since the other RDD types inherit from pyspark.RDD, they have the same APIs and are functionally identical. We'll see that sc.parallelize() generates a pyspark.rdd.PipelinedRDD when its input is an xrange, and a pyspark.RDD when its input is a range.
In [ ]:
# Parallelize data using 8 partitions
# This operation is a transformation of data into an RDD
# Spark uses lazy evaluation, so no Spark jobs are run at this point
xrangeRDD = sc.parallelize(data, 8)
In [ ]:
# Let's view help on parallelize
help(sc.parallelize)
In [ ]:
# Let's see what type sc.parallelize() returned
print 'type of xrangeRDD: {0}'.format(type(xrangeRDD))
# How about if we use a range
dataRange = range(1, 10001)
rangeRDD = sc.parallelize(dataRange, 8)
print 'type of rangeRDD: {0}'.format(type(rangeRDD))
In [ ]:
# Each RDD gets a unique ID
print 'xrangeRDD id: {0}'.format(xrangeRDD.id())
print 'rangeRDD id: {0}'.format(rangeRDD.id())
In [ ]:
# We can name each newly created RDD using the setName() method
xrangeRDD.setName('My first RDD')
In [ ]:
# Let's view the lineage (the set of transformations) of the RDD using toDebugString()
print xrangeRDD.toDebugString()
In [ ]:
# Let's use help to see what methods we can call on this RDD
help(xrangeRDD)
In [ ]:
# Let's see how many partitions the RDD will be split into by using the getNumPartitions() method
xrangeRDD.getNumPartitions()
The map(f) transformation, the most common Spark transformation, applies a function f to each item in the dataset and outputs the resulting dataset. When you run map() on a dataset, a single stage of tasks is launched. A stage is a group of tasks that all perform the same computation, but on different input data. One task is launched for each partition, as shown in the example below. A task is a unit of execution that runs on a single machine. When we run map(f) within a partition, a new task applies f to all of the entries in that particular partition and outputs a new partition. In this example figure, the dataset is broken into four partitions, so four map() tasks are launched.
When applying the map() transformation, each item in the parent RDD maps to one element in the new RDD. So, if the parent RDD has twenty elements, the new RDD will also have twenty items.
Now we will use map() to subtract one from each value in the base RDD we just created. First, we define a Python function called sub() that will subtract one from the input integer. Second, we will pass each item in the base RDD into a map() transformation that applies the sub() function to each element. And finally, we print out the RDD transformation hierarchy using toDebugString().
In [ ]:
# Create sub function to subtract 1
def sub(value):
    """Subtracts one from `value`.

    Args:
        value (int): A number.

    Returns:
        int: `value` minus one.
    """
    return (value - 1)
# Transform xrangeRDD through map transformation using sub function
# Because map is a transformation and Spark uses lazy evaluation, no jobs, stages,
# or tasks will be launched when we run this code.
subRDD = xrangeRDD.map(sub)
# Let's see the RDD transformation hierarchy
print subRDD.toDebugString()
To view the results, we use the collect() method on our RDD. collect() is often used after a filter or other operation to ensure that we are only returning a small amount of data to the driver. This is done because the data returned to the driver must fit into the driver's available memory; if not, the driver will crash.
The collect() method is the first action operation that we have encountered. Action operations cause Spark to perform the (lazy) transformation operations that are required to compute the RDD returned by the action. In our example, this means that tasks will now be launched to perform the parallelize, map, and collect operations.
When collect() is run, one task is launched for each partition. Each task collects the entries in its partition and sends the result to the SparkContext, which creates a list of the values, as shown in the figure below for a small example dataset with just four partitions.
Now let's run collect() on subRDD.
In [ ]:
# Let's collect the data
print subRDD.collect()
Next, we run a count() job, which counts the number of elements in an RDD using the count() action. Since map() creates a new RDD with the same number of elements as the starting RDD, we expect that applying count() to each RDD will return the same result.
Note that because count() is an action operation, if we had not already performed an action with collect(), then Spark would now perform the transformation operations when we executed count(). The figure below shows count() on a small example dataset with just four partitions.
In [ ]:
print xrangeRDD.count()
print subRDD.count()
Next, we apply the transformation filter and view the results with collect. The filter(f) method is a transformation operation that creates a new RDD from the input RDD by applying the filter function f to each item in the parent RDD and only passing through those elements where the filter function returns True. Elements that do not return True are dropped. Like map(), filter can be applied individually to each entry in the dataset, so it is easily parallelized using Spark.
Below, we define a filter function called ten(), which returns True if the input is less than 10 and False otherwise. This function will be passed to the filter() transformation as the filter function f.
We then use the collect() method to return a list that contains all of the elements in this filtered RDD to the driver program.
In [ ]:
# Define a function to filter a single value
def ten(value):
    """Return whether value is below ten.

    Args:
        value (int): A number.

    Returns:
        bool: Whether `value` is less than ten.
    """
    if (value < 10):
        return True
    else:
        return False
# The ten function could also be written concisely as: def ten(value): return value < 10
# Pass the function ten to the filter transformation
# Filter is a transformation so no tasks are run
filteredRDD = subRDD.filter(ten)
# View the results using collect()
# Collect is an action and triggers the filter transformation to run
print filteredRDD.collect()
Python lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression. Remember that lambda functions are a matter of style and using them is never required; semantically, they are just syntactic sugar for a normal function definition. You can always define a separate normal function instead, but using a lambda function is an equivalent and more compact form of coding. Ideally, you should use lambda functions where you want to encapsulate non-reusable code without littering your code with one-line functions.
Here, instead of defining a separate function for the filter() transformation, we will use an inline lambda function.
In [ ]:
lambdaRDD = subRDD.filter(lambda x: x < 10)
lambdaRDD.collect()
In [ ]:
# Let's collect the even values less than 10
evenRDD = lambdaRDD.filter(lambda x: x % 2 == 0)
evenRDD.collect()
Let's look at the first(), take(), top(), and takeOrdered() actions. Note that for the first() and take() actions, the elements that are returned depend on how the RDD is partitioned.
Instead of using the collect() action, we can use the take(n) action to return the first n elements of the RDD. The first() action returns the first element of an RDD, and is equivalent to take(1).
The takeOrdered() action returns the first n elements of the RDD, using either their natural order or a custom comparator. The key advantage of using takeOrdered() instead of first() or take() is that takeOrdered() returns a deterministic result, while the other two actions may return differing results, depending on the number of partitions or the execution environment. takeOrdered() returns the list sorted in ascending order. The top() action is similar to takeOrdered() except that it returns the list in descending order.
The reduce() action reduces the elements of an RDD to a single value by applying a function that takes two parameters and returns a single value. The function should be commutative and associative, as reduce() is applied at the partition level and then again to aggregate the results from the partitions. If these rules don't hold, the results from reduce() will be inconsistent. Reducing locally at partitions makes reduce() very efficient.
In [ ]:
# Let's get the first element
print filteredRDD.first()
# The first 4
print filteredRDD.take(4)
# Note that it is ok to take more elements than the RDD has
print filteredRDD.take(12)
In [ ]:
# Retrieve the three smallest elements
print filteredRDD.takeOrdered(3)
# Retrieve the five largest elements
print filteredRDD.top(5)
In [ ]:
# Pass a lambda function to takeOrdered to reverse the order
filteredRDD.takeOrdered(4, lambda s: -s)
In [ ]:
# Obtain Python's add function
from operator import add
# Efficiently sum the RDD using reduce
print filteredRDD.reduce(add)
# Sum using reduce with a lambda function
print filteredRDD.reduce(lambda a, b: a + b)
# Note that subtraction is not both associative and commutative
print filteredRDD.reduce(lambda a, b: a - b)
print filteredRDD.repartition(4).reduce(lambda a, b: a - b)
# While addition is
print filteredRDD.repartition(4).reduce(lambda a, b: a + b)
The takeSample() action returns an array with a random sample of elements from the dataset. It takes in a withReplacement argument, which specifies whether it is okay to randomly pick the same item multiple times from the parent RDD (so when withReplacement=True, you can get the same item back multiple times). It also takes an optional seed parameter that allows you to specify a seed value for the random number generator, so that reproducible results can be obtained.
The countByValue() action returns the count of each unique value in the RDD as a dictionary that maps values to counts.
In [ ]:
# takeSample reusing elements
print filteredRDD.takeSample(withReplacement=True, num=6)
# takeSample without reuse
print filteredRDD.takeSample(withReplacement=False, num=6)
In [ ]:
# Set seed for predictability
print filteredRDD.takeSample(withReplacement=False, num=6, seed=500)
# Try rerunning this cell and the cell above -- the results from this cell will remain constant
# Use ctrl-enter to run without moving to the next cell
In [ ]:
# Create new base RDD to show countByValue
repetitiveRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6])
print repetitiveRDD.countByValue()
When performing a map() transformation using a function, sometimes the function will return more (or fewer) than one element. We would like the newly created RDD to consist of the elements outputted by the function. Simply applying a map() transformation would yield a new RDD made up of iterators, and each iterator could have zero or more elements. Instead, we often want an RDD consisting of the values contained in those iterators. The solution is the flatMap() transformation: flatMap() is similar to map(), except that with flatMap() each input item can be mapped to zero or more output elements.
To demonstrate flatMap(), we will first emit a word along with its plural, and then a range that grows in length with each subsequent operation.
In [ ]:
# Let's create a new base RDD to work from
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Use map
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
# Use flatMap
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))
# View the results
print singularAndPluralWordsRDDMap.collect()
print singularAndPluralWordsRDD.collect()
# View the number of elements in the RDD
print singularAndPluralWordsRDDMap.count()
print singularAndPluralWordsRDD.count()
In [ ]:
simpleRDD = sc.parallelize([2, 3, 4])
print simpleRDD.map(lambda x: range(1, x)).collect()
print simpleRDD.flatMap(lambda x: range(1, x)).collect()
The groupByKey() and reduceByKey() transformations operate on pair RDDs, where each element is a key/value pair. For example, sc.parallelize([('a', 1), ('a', 2), ('b', 1)]) would create a pair RDD where the keys are 'a', 'a', 'b' and the values are 1, 2, 1.
The reduceByKey() transformation gathers together pairs that have the same key and applies a function to two associated values at a time. reduceByKey() operates by applying the function first within each partition on a per-key basis and then across the partitions.
While both the groupByKey() and reduceByKey() transformations can often be used to solve the same problem and will produce the same answer, the reduceByKey() transformation works much better for large distributed datasets. This is because Spark knows it can combine output with a common key on each partition before shuffling (redistributing) the data across nodes. Only use groupByKey() if the operation would not benefit from reducing the data before the shuffle occurs.
The figure below shows how reduceByKey() works. Notice how pairs on the same machine with the same key are combined (by using the lambda function passed into reduceByKey()) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition to produce one final result.
On the other hand, when using the groupByKey() transformation, all the key-value pairs are shuffled around, causing a lot of unnecessary data to be transferred over the network. As the dataset grows, the difference in the amount of data that needs to be shuffled between the reduceByKey() and groupByKey() transformations becomes increasingly exaggerated.
Now let's go through a simple groupByKey() and reduceByKey() example.
In [ ]:
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
# mapValues only used to improve format for printing
print pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()
# Different ways to sum by key
print pairRDD.groupByKey().map(lambda (k, v): (k, sum(v))).collect()
# Using mapValues, which is recommended when the key doesn't change
print pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect()
# reduceByKey is more efficient / scalable
print pairRDD.reduceByKey(add).collect()
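As one more illustration of reduceByKey(), here is a sketch of the classic word count pattern, reusing the wordsRDD we created earlier; pairing each word with a count of 1 is the usual convention for this pattern.
In [ ]:
# Sketch: word count with reduceByKey, reusing wordsRDD from the flatMap example
from operator import add
wordCountsRDD = (wordsRDD
                 .map(lambda word: (word, 1))   # build (word, 1) pairs
                 .reduceByKey(add))             # sum counts per key, combining within partitions first
print wordCountsRDD.collect()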
The mapPartitions() transformation uses a function that takes in an iterator (to the items in that specific partition) and returns an iterator. The function is applied on a partition-by-partition basis.
The mapPartitionsWithIndex() transformation uses a function that takes in a partition index (think of this like the partition number) and an iterator (to the items in that specific partition). For every partition (index, iterator) pair, the function returns a tuple of the same partition index number and an iterator of the transformed items in that partition.
In [ ]:
# mapPartitions takes a function that takes an iterator and returns an iterator
print wordsRDD.collect()
itemsRDD = wordsRDD.mapPartitions(lambda iterator: [','.join(iterator)])
print itemsRDD.collect()
In [ ]:
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: [(index, list(iterator))])
# We can see that three of the partitions have one element each and the fourth partition has two
# elements, although things may not bode well for the rat...
print itemsByPartRDD.collect()
# Rerun without returning a list (acts more like flatMap)
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: (index, list(iterator)))
print itemsByPartRDD.collect()
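To make the partition-at-a-time behaviour a bit more concrete, here is a small sketch that uses mapPartitions() to produce one partial sum per partition of xrangeRDD (created earlier with 8 partitions); this kind of per-partition pre-aggregation is the same idea that makes reduceByKey() efficient.
In [ ]:
# Sketch: one partial sum per partition using mapPartitions
from operator import add
partitionSumsRDD = xrangeRDD.mapPartitions(lambda iterator: [sum(iterator)])
print partitionSumsRDD.collect()    # eight partial sums, one per partition
print partitionSumsRDD.reduce(add)  # combine them into the total, same as xrangeRDD.reduce(add)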
If you plan to use an RDD more than once, you can use the cache() operation to keep the RDD in memory. However, if you cache too many RDDs and Spark runs out of memory, it will delete the least recently used (LRU) RDD first; that RDD will simply be recreated automatically the next time it is accessed.
You can check whether an RDD is cached by using the is_cached attribute, and you can see your cached RDDs in the "Storage" section of the Spark web UI. If you click on an RDD's name, you can see more information about where the RDD is stored.
In [ ]:
# Name the RDD
filteredRDD.setName('My Filtered RDD')
# Cache the RDD
filteredRDD.cache()
# Is it cached
print filteredRDD.is_cached
Once you no longer need an RDD in memory, you can use the unpersist() method to inform Spark of that.
Note that toDebugString() also provides storage information, and you can directly query the current storage information for an RDD using the getStorageLevel() operation.
Advanced note: the persist() operation, optionally, takes a pySpark StorageLevel object.
In [ ]:
# Note that toDebugString also provides storage information
print filteredRDD.toDebugString()
In [ ]:
# If we are done with the RDD we can unpersist it so that its memory can be reclaimed
filteredRDD.unpersist()
# Storage level for a non cached RDD
print filteredRDD.getStorageLevel()
filteredRDD.cache()
# Storage level for a cached RDD
print filteredRDD.getStorageLevel()
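As a sketch of the persist() note above, you can pass a StorageLevel explicitly; MEMORY_AND_DISK is just an illustrative choice here, and an already-cached RDD must be unpersisted before its storage level can be changed.
In [ ]:
# Sketch: persisting with an explicit StorageLevel (MEMORY_AND_DISK is an illustrative choice)
from pyspark import StorageLevel
filteredRDD.unpersist()                            # storage level can't be changed while cached
filteredRDD.persist(StorageLevel.MEMORY_AND_DISK)
print filteredRDD.getStorageLevel()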
Next, to see how lazy evaluation affects debugging, we create an RDD by applying the filter() operation using a broken filtering function (defined below). No error will occur at this point due to Spark's use of lazy evaluation.
The filter() method will not be executed until an action operation is invoked on the RDD. We will perform an action by using the collect() method to return a list that contains all of the elements in this RDD.
In [ ]:
def brokenTen(value):
    """Incorrect implementation of the ten function.

    Note:
        The `if` statement checks an undefined variable `val` instead of `value`.

    Args:
        value (int): A number.

    Returns:
        bool: Whether `value` is less than ten.

    Raises:
        NameError: The function references `val`, which is not available in the local or global
            namespace, so a `NameError` is raised.
    """
    if (val < 10):
        return True
    else:
        return False
brokenRDD = subRDD.filter(brokenTen)
In [ ]:
# Now we'll see the error
brokenRDD.collect()
When the filter() method is executed, Spark evaluates the RDD by executing the parallelize() and filter() methods. Since our filter() method has an error in the filtering function brokenTen(), an error occurs.
Scroll through the Traceback and you will see that the line reported as generating the error is the collect() method line. There is nothing wrong with this line; however, it is an action, and that caused the other methods to be executed. Continue scrolling through the Traceback and you will see the following error line:
NameError: global name 'val' is not defined
Looking at this error line, we can see that we used the wrong variable name in brokenTen().
As you are learning Spark, it is recommended that you write your code in the form:
RDD.transformation1()
RDD.action1()
RDD.transformation2()
RDD.action2()
Using this style will make your code easier to debug. Once you become more experienced with Spark, you can write your code in the more compact, chained form:
RDD.transformation1().transformation2().action()
We can also use lambda functions instead of separately defined functions when their use improves readability and conciseness.
In [ ]:
# Cleaner code through lambda use
subRDD.filter(lambda x: x < 10).collect()
In [ ]:
# Even better: move our chain of operators onto a single line.
sc.parallelize(data).map(lambda y: y - 1).filter(lambda x: x < 10).collect()
In [ ]:
# Final version
(sc
.parallelize(data)
.map(lambda y: y - 1)
.filter(lambda x: x < 10)
.collect())