In this lecture we will begin to delve deeper into using Spark and Python. Please view the video lecture for a full explanation.
Let's quickly go over some important terms:
| Term | Definition |
|---|---|
| RDD | Resilient Distributed Dataset |
| Transformation | Spark operation that produces an RDD |
| Action | Spark operation that produces a local object |
| Spark Job | Sequence of transformations on data with a final action |
We can use transformations to create a set of instructions we want to perform on the RDD (before we call an action and actually execute them).
| Transformation Example | Result |
|---|---|
| `filter(lambda x: x % 2 == 0)` | Discard non-even elements |
| `map(lambda x: x * 2)` | Multiply each RDD element by 2 |
| `map(lambda x: x.split())` | Split each string into words |
| `flatMap(lambda x: x.split())` | Split each string into words and flatten the sequence |
| `sample(withReplacement=True, fraction=0.25)` | Create a sample of 25% of elements, with replacement |
| `union(rdd)` | Append `rdd` to the existing RDD |
| `distinct()` | Remove duplicates from the RDD |
| `sortBy(lambda x: x, ascending=False)` | Sort elements in descending order |
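Before we touch Spark, here is a rough plain-Python analogue of a few of these transformations, applied eagerly to an ordinary list (unlike Spark, which is lazy):

```python
# Plain-Python analogues of some transformations above (eager, not lazy like Spark)
nums = [3, 1, 2, 2, 4]

evens = [x for x in nums if x % 2 == 0]          # like filter(lambda x: x % 2 == 0)
doubled = [x * 2 for x in nums]                  # like map(lambda x: x * 2)
unique_sorted = sorted(set(nums), reverse=True)  # like distinct() + sortBy(..., ascending=False)

print(evens, doubled, unique_sorted)
```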
Once you have your 'recipe' of transformations ready, you execute them by calling an action. Here are some common actions:
| Action | Result |
|---|---|
| `collect()` | Convert the RDD to an in-memory list |
| `take(3)` | First 3 elements of the RDD |
| `top(3)` | Top 3 elements of the RDD |
| `takeSample(withReplacement=True, num=3)` | Create a sample of 3 elements, with replacement |
| `sum()` | Sum of elements (assumes numeric elements) |
| `mean()` | Mean of elements (assumes numeric elements) |
| `stdev()` | Standard deviation of elements (assumes numeric elements) |
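To see why the transformation/action split matters, here is a toy illustration using Python generators, which are also lazy: building the pipeline does no work, and nothing runs until you ask for results (the analogue of calling an action):

```python
# Generators are lazy, like Spark transformations: building the pipeline computes nothing
data = range(10)
pipeline = (x * 2 for x in data if x % 2 == 0)  # no work done yet

result = list(pipeline)  # the "action": forces execution
print(result)  # [0, 4, 8, 12, 16]
```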
In [1]:
%%writefile example2.txt
first
second line
the third line
then a fourth line
Now let's perform some transformations and actions on this text file:
In [2]:
from pyspark import SparkContext
In [3]:
sc = SparkContext()
In [4]:
# Show RDD
sc.textFile('example2.txt')
Out[4]:
In [5]:
# Save a reference to this RDD
text_rdd = sc.textFile('example2.txt')
In [7]:
# Map a function (or lambda expression) to each line
# Then collect the results.
text_rdd.map(lambda line: line.split()).collect()
Out[7]:
In [8]:
# Collect everything as a single flat map
text_rdd.flatMap(lambda line: line.split()).collect()
Out[8]:
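If Spark isn't handy, you can preview the difference between the `map` and `flatMap` calls above in plain Python, using the same lines from example2.txt: `map` gives one list of words per line, while `flatMap` flattens everything into a single list of words.

```python
lines = ['first', 'second line', 'the third line', 'then a fourth line']

# map + split: one list of words per line
mapped = [line.split() for line in lines]

# flatMap + split: one flat list of all words
flat = [word for line in lines for word in line.split()]

print(mapped[0], flat[:3])
```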
In [9]:
%%writefile services.txt
#EventId Timestamp Customer State ServiceID Amount
201 10/13/2017 100 NY 131 100.00
204 10/18/2017 700 TX 129 450.00
202 10/15/2017 203 CA 121 200.00
206 10/19/2017 202 CA 131 500.00
203 10/17/2017 101 NY 173 750.00
205 10/19/2017 202 TX 121 200.00
In [10]:
services = sc.textFile('services.txt')
In [11]:
services.take(2)
Out[11]:
In [12]:
services.map(lambda x: x.split())
Out[12]:
In [13]:
services.map(lambda x: x.split()).take(3)
Out[13]:
Let's remove that leading hash symbol from the header line!
In [26]:
services.map(lambda x: x[1:] if x[0]=='#' else x).collect()
Out[26]:
In [27]:
services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split()).collect()
Out[27]:
In [28]:
# From Previous
cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split())
In [29]:
cleanServ.collect()
Out[29]:
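The cleaning step is plain string work, so you can check the logic without Spark. A quick sketch of the same two chained maps on the first two lines of services.txt:

```python
raw = ['#EventId Timestamp Customer State ServiceID Amount',
       '201 10/13/2017 100 NY 131 100.00']

# strip a leading '#' if present, then split each line into fields
clean = [(line[1:] if line[0] == '#' else line).split() for line in raw]
print(clean[0][:2])
```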
In [52]:
# Let's start by practicing grabbing fields
cleanServ.map(lambda lst: (lst[3],lst[-1])).collect()
Out[52]:
In [43]:
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : amt1+amt2)\
.collect()
Out[43]:
Uh oh! Looks like we forgot that the amounts are still strings! Let's fix that:
In [42]:
# Continue with reduceByKey
# Notice how it assumes that the first item is the key!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.collect()
Out[42]:
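Under the hood, `reduceByKey` groups pairs by their first element and folds the values together per key. A rough plain-Python equivalent, using the (state, amount) pairs from services.txt:

```python
pairs = [('NY', '100.00'), ('TX', '450.00'), ('CA', '200.00'),
         ('CA', '500.00'), ('NY', '750.00'), ('TX', '200.00')]

totals = {}
for state, amt in pairs:
    # same fold as reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))
    totals[state] = totals.get(state, 0.0) + float(amt)

print(totals)  # {'NY': 850.0, 'TX': 650.0, 'CA': 700.0}
```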
We can continue our analysis by sorting this output:
In [69]:
# Grab state and amounts
# Add them
# Get rid of ('State','Amount')
# Sort them by the amount value
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.filter(lambda x: not x[0]=='State')\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()
Out[69]:
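The final `filter` and `sortBy` steps also have direct plain-Python counterparts. Starting from hypothetical per-state totals that still include the header pair:

```python
totals = [('State', 'Amount'), ('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

# drop the header pair, then sort by amount descending,
# like .filter(...) followed by .sortBy(..., ascending=False)
result = sorted((t for t in totals if t[0] != 'State'),
                key=lambda state_amount: state_amount[1], reverse=True)
print(result)  # [('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
```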
Remember to use tuple unpacking for readability. For example:
In [78]:
x = ['ID','State','Amount']
In [79]:
def func1(lst):
    return lst[-1]
In [83]:
def func2(id_st_amt):
    # Unpack Values
    (Id, st, amt) = id_st_amt
    return amt
In [84]:
func1(x)
Out[84]:
In [85]:
func2(x)
Out[85]: