In [ ]:
%%javascript
$.getScript('http://asimjalis.github.io/ipyn-ext/js/ipyn-present.js')
What is Spark?
Spark is a framework for distributed processing.
It is a streamlined alternative to Map-Reduce.
Spark applications can be written in Python, Scala, or Java.
Why learn Spark?
Spark enables you to analyze petabytes of data.
Spark skills are in high demand (see http://indeed.com/salary).
Spark is significantly faster than MapReduce.
Paradoxically, Spark's API is simpler than the MapReduce API.
By the end of this lecture, you will be able to:
Create RDDs to distribute data across a cluster
Use the Spark shell to compose and execute Spark commands
Use Spark to analyze stock market data
Date | Version | Changes |
---|---|---|
May 30, 2014 | Spark 1.0.0 | APIs stabilized |
September 11, 2014 | Spark 1.1.0 | New functions in MLlib, Spark SQL |
December 18, 2014 | Spark 1.2.0 | Python Streaming API and better streaming fault tolerance |
March 13, 2015 | Spark 1.3.0 | DataFrame API, Kafka integration in Streaming |
April 17, 2015 | Spark 1.3.1 | Bug fixes, minor changes |
What is the basic idea of Spark?
Spark takes the Map-Reduce paradigm and changes it in some critical ways.
Instead of a single map and reduce pass, a Spark job can chain together a series of map and reduce functions.
The intermediate data is kept in memory instead of being written to disk or to HDFS.
Q: How can I make Spark logging less verbose?
By default Spark logs messages at the INFO level.
Here are the steps to make it print only warnings and errors.
cd $SPARK_HOME/conf
cp log4j.properties.template log4j.properties
Edit log4j.properties and replace rootCategory=INFO with rootCategory=ERROR.
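For reference, the relevant line in log4j.properties typically changes from
log4j.rootCategory=INFO, console
to
log4j.rootCategory=ERROR, console
(the exact suffix, such as , console, may vary between Spark versions).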
Term | Meaning |
---|---|
Driver | Process that contains the Spark Context |
Executor | Process that executes one or more Spark tasks |
Master | Process that manages applications across the cluster, e.g. the Spark Master |
Worker | Process that manages executors on a particular worker node, e.g. the Spark Worker |
Q: Flip a coin one million times using Python's random() function. What fraction of the time do you get heads?
In [ ]:
from pyspark import SparkContext
sc = SparkContext()
In [ ]:
import random
flips = 1000000
# Count a flip as heads when random() < 0.5.
heads = sc.parallelize(xrange(flips)) \
.map(lambda i: random.random()) \
.filter(lambda r: r < 0.5) \
.count()
ratio = float(heads)/float(flips)
print(heads)
print(ratio)
`sc.parallelize` creates an RDD.
`map` and `filter` are transformations.
They create new RDDs from existing RDDs.
`count` is an action: it triggers the computation on the executors and brings the result back to the driver.
Term | Meaning |
---|---|
RDD | Resilient Distributed Dataset or a distributed sequence of records |
Spark Job | Sequence of transformations on data with a final action |
Spark Application | Sequence of Spark jobs and other code |
Transformation | Spark operation that produces an RDD |
Action | Spark operation that produces a local object |
A Spark job consists of a series of transformations followed by an action.
The job pushes the data out to the cluster, all computation happens on the executors, and then the result is sent back to the driver.
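For example, here is the coin-flip pipeline from above with each step labeled (a sketch that reuses the flips variable and the random import from the earlier cell):
In [ ]:
# sc.parallelize distributes the data and creates the RDD.
# map and filter are transformations: they are lazy and only describe new RDDs.
# count is the action: it runs the job on the executors and returns a number to the driver.
sc.parallelize(xrange(flips)) \
    .map(lambda i: random.random()) \
    .filter(lambda r: r < 0.5) \
    .count()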
Instead of `lambda` you can pass fully defined functions into `map`, `filter`, and other RDD transformations.
Use `lambda` for short functions.
Use `def` for more substantial functions.
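As a quick sketch of the two styles (assuming the same sc as above):
In [ ]:
# A short function: a lambda is fine.
sc.parallelize(xrange(5)).map(lambda x: x * x).collect()
In [ ]:
# A more substantial function: define it with def and pass it by name.
def classify(x):
    if x % 2 == 0:
        return (x, 'even')
    return (x, 'odd')
sc.parallelize(xrange(5)).map(classify).collect()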
Q: Find all the primes less than 100.
In [ ]:
def is_prime(number):
    # Check divisibility by every integer from 2 up to sqrt(number).
    factor_min = 2
    factor_max = int(number**0.5)+1
    for factor in xrange(factor_min,factor_max):
        if number % factor == 0:
            return False
    return True
In [ ]:
numbers = xrange(2,100)
primes = sc.parallelize(numbers)\
.filter(is_prime)\
.collect()
print primes
Expression | Meaning |
---|---|
sc.parallelize(list1) | Create RDD from the elements of list1 |
sc.textFile(path) | Create RDD of lines from file |
Expression | Meaning |
---|---|
filter(lambda x: x % 2 == 0) | Discard non-even elements |
map(lambda x: x * 2) | Multiply each RDD element by 2 |
map(lambda x: x.split()) | Split each string into words |
flatMap(lambda x: x.split()) | Split each string into words and flatten the sequence |
sample(True, 0.25) | Create a sample of 25% of the elements, with replacement |
union(rdd) | Append rdd to the existing RDD |
distinct() | Remove duplicates from the RDD |
sortBy(lambda x: x, ascending=False) | Sort elements in descending order |
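Here is a small sketch chaining a few of the transformations above (assuming the same sc; remember that nothing runs until an action such as collect is called):
In [ ]:
nums = sc.parallelize([3, 1, 2, 2, 4])
nums.filter(lambda x: x % 2 == 0) \
    .map(lambda x: x * 2) \
    .distinct() \
    .sortBy(lambda x: x, ascending=False) \
    .collect()
# Result: [8, 4]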
Expression | Meaning |
---|---|
collect() | Convert RDD to in-memory list |
take(3) | First 3 elements of RDD |
top(3) | Top 3 elements of RDD |
takeSample(True, 3) | Create a sample of 3 elements, with replacement |
sum() | Find element sum (assumes numeric elements) |
mean() | Find element mean (assumes numeric elements) |
stdev() | Find element standard deviation (assumes numeric elements) |
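And a few of the actions above applied to a small RDD (again a sketch assuming the same sc):
In [ ]:
nums = sc.parallelize([1.0, 2.0, 3.0, 4.0, 5.0])
print(nums.take(3))   # first 3 elements
print(nums.top(3))    # 3 largest elements
print(nums.sum())     # 15.0
print(nums.mean())    # 3.0
print(nums.stdev())   # population standard deviation, about 1.41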
Q: What will this output?
In [ ]:
sc.parallelize([1,3,2,2,1]).distinct().collect()
Q: What will this output?
In [ ]:
sc.parallelize([1,3,2,2,1]).sortBy(lambda x: x).collect()
Q: What will this output?
In [ ]:
%%writefile input.txt
hello world
another line
yet another line
yet another another line
In [ ]:
sc.textFile('input.txt') \
.map(lambda x: x.split()) \
.count()
In [ ]:
sc.textFile('input.txt') \
.flatMap(lambda x: x.split()) \
.count()
In [ ]:
sc.textFile('input.txt') \
.map(lambda x: x.split()) \
.collect()
In [ ]:
sc.textFile('input.txt') \
.flatMap(lambda x: x.split()) \
.collect()
In [ ]:
%%writefile sales.txt
#ID Date Store State Product Amount
101 11/13/2014 100 WA 331 300.00
104 11/18/2014 700 OR 329 450.00
102 11/15/2014 203 CA 321 200.00
106 11/19/2014 202 CA 331 330.00
103 11/17/2014 101 WA 373 750.00
105 11/19/2014 202 CA 321 200.00
In [ ]:
sc.textFile('sales.txt')\
.take(2)
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.take(2)
The header line starts with `#`. First select it to see it, then filter it out.
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: x[0].startswith('#'))\
.take(2)
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.take(2)
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: x[-1])\
.take(2)
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: float(x[-1]))\
.sum()
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: (x[-3],float(x[-1])))\
.collect()
Now use `reduceByKey` to add up the amounts for each state.
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: (x[-3],float(x[-1])))\
.reduceByKey(lambda amount1,amount2: amount1+amount2)\
.collect()
Q: Find the state with the highest total revenue.
Use the action `top` or the transformation `sortBy`.
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: (x[-3],float(x[-1])))\
.reduceByKey(lambda amount1,amount2: amount1+amount2)\
.sortBy(lambda state_amount:state_amount[1],ascending=False) \
.collect()
`reduceByKey` only works on RDDs made up of 2-tuples (key-value pairs).
`reduceByKey` works as both a reducer and a combiner, so it requires that the operation be associative.
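Any associative (and commutative) operation works, not just addition. For example, here is a sketch that uses the built-in max to find the largest single sale per state from the same sales.txt:
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: (x[-3],float(x[-1])))\
.reduceByKey(max)\
.collect()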
Q: Implement word count in Spark.
In [ ]:
%%writefile input.txt
hello world
another line
yet another line
yet another another line
In [ ]:
sc.textFile('input.txt')\
.flatMap(lambda line: line.split())\
.map(lambda word: (word,1))\
.reduceByKey(lambda count1,count2: count1+count2)\
.collect()
Here is the sales-per-state code again. Notice how it indexes into the tuples with x[0], x[-3], and x[-1].
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: (x[-3],float(x[-1])))\
.reduceByKey(lambda amount1,amount2: amount1+amount2)\
.sortBy(lambda state_amount:state_amount[1],ascending=False) \
.collect()
In [ ]:
client = ('Dmitri','Smith','SF')

def getCity1(client):
    return client[2]

def getCity2((first,last,city)):
    return city

print getCity1(client)
print getCity2(client)
What is the difference between `getCity1` and `getCity2`?
Which is more readable?
What is the essence of argument unpacking?
In [ ]:
client = ('Dmitri','Smith',('123 Eddy','SF','CA'))

def getCity((first,last,(street,city,state))):
    return city

getCity(client)
Whenever you find yourself indexing into a tuple, consider using argument unpacking to make the code more readable.
Here is what `getCity` looks like with tuple indexing.
In [ ]:
def badGetCity(client):
    return client[2][1]

badGetCity(client)
Here is the sales-per-state code with tuple indexing.
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: (x[-3],float(x[-1])))\
.reduceByKey(lambda amount1,amount2: amount1+amount2)\
.sortBy(lambda state_amount:state_amount[1],ascending=False) \
.collect()
And here it is rewritten with argument unpacking in the lambdas.
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda (id,date,store,state,product,amount): (state,float(amount)))\
.reduceByKey(lambda amount1,amount2: amount1+amount2)\
.sortBy(lambda (state,amount):amount,ascending=False) \
.collect()
`reduceByKey` lets us aggregate values using sum, max, min, and other associative operations. But what about non-associative operations like average? How can we calculate them?
There are several ways to do this.
The first approach is to change the RDD tuples so that the operation becomes associative.
Instead of (state, amount), use (state, (amount, count)).
The second approach is to use `groupByKey`, which is like `reduceByKey` except that it gathers all the values for a key into an iterator.
The iterator can then be reduced in a `map` step immediately after the `groupByKey`.
Q: Calculate the average sales per state.
Approach 1: Carry (amount, count) pairs so the reduce operation is associative, then divide to get the average.
In [ ]:
sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: (x[-3],(float(x[-1]),1)))\
.reduceByKey(lambda (amount1,count1),(amount2,count2): \
    (amount1+amount2, count1+count2))\
.map(lambda (state,(amount,count)): (state, amount/count))\
.collect()
Note the argument unpacking we are doing in `reduceByKey` and in the final `map` to name the elements of the tuples.
Approach 2: Use `groupByKey`.
In [ ]:
def mean(values):
    # Average the values in an iterable.
    total = 0.0; count = 0
    for x in values:
        total += x; count += 1
    return total/count

sc.textFile('sales.txt')\
.map(lambda x: x.split())\
.filter(lambda x: not x[0].startswith('#'))\
.map(lambda x: (x[-3],float(x[-1])))\
.groupByKey() \
.map(lambda (state,values): (state, mean(values)))\
.collect()
Q: Given a table of employees and locations find the cities that the employees live in.
This requires joining the two datasets with `join`.
In [ ]:
# Employees: emp_id, loc_id, name
employee_data = [
    (101, 14, 'Alice'),
    (102, 15, 'Bob'),
    (103, 14, 'Chad'),
    (104, 15, 'Jen'),
    (105, 13, 'Dee') ]

# Locations: loc_id, location
location_data = [
    (14, 'SF'),
    (15, 'Seattle'),
    (16, 'Portland')]

employees = sc.parallelize(employee_data)
locations = sc.parallelize(location_data)

# Re-key employee records by loc_id so we can join on it.
employees2 = employees.map(lambda (emp_id,loc_id,name): (loc_id,name))

# Now join: the result pairs each loc_id with (name, location).
employees2.join(locations).collect()
Q: How would you calculate the mean, variance, and standard deviation of a sample produced by Python's random() function?
In [ ]:
count = 1000
nums = [random.random() for _ in xrange(count)]
rdd = sc.parallelize(nums)
print rdd.mean()
print rdd.variance()
print rdd.stdev()
Spark evaluates transformations lazily; no work happens until an action is called. Compare the times of the next two cells.
In [ ]:
n = 10000000
%time sc.parallelize(xrange(n)).map(lambda x:x+1).count()
The same pipeline without the final action returns almost immediately, because no computation is triggered:
In [ ]:
%time sc.parallelize(xrange(n)).map(lambda x:x+1)
Besides reading data, Spark can also write data out to a file system.
Q: Calculate the squares of the integers from 0 to 9 and write them out to squares.txt.
First make sure squares.txt does not exist.
In [ ]:
!if [ -e squares.txt ] ; then rm -rf squares.txt ; fi
Now write the squares to squares.txt.
In [ ]:
rdd1 = sc.parallelize(xrange(10))
rdd2 = rdd1.map(lambda x: x*x)
rdd2.saveAsTextFile('squares.txt')
In [ ]:
!cat squares.txt
In [ ]:
!ls -l squares.txt
In [ ]:
!for i in squares.txt/part-*; do echo $i; cat $i; done
Q: Can we control the number of partitions/tasks that Spark uses for processing data? Solve the same problem as above but this time with 5 tasks.
First make sure squares.txt does not exist.
In [ ]:
!if [ -e squares.txt ] ; then rm -rf squares.txt ; fi
Now write the squares to squares.txt, this time using 5 partitions.
In [ ]:
partitions = 5
rdd1 = sc.parallelize(xrange(10), partitions)
rdd2 = rdd1.map(lambda x: x*x)
rdd2.saveAsTextFile('squares.txt')
In [ ]:
!ls -l squares.txt
!for i in squares.txt/part-*; do echo $i; cat $i; done
Term | Meaning |
---|---|
Task | Single thread in an executor |
Partition | Data processed by a single task |
Record | Single data item; records make up a partition, which is processed by a single task |
Every Spark application gets executors when you create a new `SparkContext`.
You can specify how many cores to assign to each executor.
A core is equivalent to a thread.
The number of cores determines how many tasks can run concurrently on an executor.
Each task corresponds to one partition.
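You can inspect how an RDD is split into partitions with getNumPartitions() and glom(), which groups each partition's records into a list; here is a small sketch assuming the same sc:
In [ ]:
rdd = sc.parallelize(xrange(10), 3)   # ask for 3 partitions
print(rdd.getNumPartitions())         # 3
print(rdd.glom().collect())           # records grouped by partition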
Q: Find the date on which AAPL's stock price was the highest.
Suppose you have stock market data from Yahoo! for AAPL from http://finance.yahoo.com/q/hp?s=AAPL+Historical+Prices. The data is in CSV format and has these values.
Date | Open | High | Low | Close | Volume | Adj Close |
---|---|---|---|---|---|---|
11-18-2014 | 113.94 | 115.69 | 113.89 | 115.47 | 44,200,300 | 115.47 |
11-17-2014 | 114.27 | 117.28 | 113.30 | 113.99 | 46,746,700 | 113.99 |
Here is what the CSV looks like:
csv = [
"#Date,Open,High,Low,Close,Volume,Adj Close\n",
"2014-11-18,113.94,115.69,113.89,115.47,44200300,115.47\n",
"2014-11-17,114.27,117.28,113.30,113.99,46746700,113.99\n",
]
Let's find the date on which the price was the highest.
In [ ]:
csv = [
"#Date,Open,High,Low,Close,Volume,Adj Close\n",
"2014-11-18,113.94,115.69,113.89,115.47,44200300,115.47\n",
"2014-11-17,114.27,117.28,113.30,113.99,46746700,113.99\n",
]
sc.parallelize(csv) \
.filter(lambda line: not line.startswith("#")) \
.map(lambda line: line.split(",")) \
.map(lambda fields: (float(fields[-1]),fields[0])) \
.sortBy(lambda (close, date): close, ascending=False) \
.take(1)
Now let's generalize this into a function that downloads the full daily price history for any symbol and finds the date of its highest adjusted close.
In [ ]:
import urllib2
import re

def get_stock_high(symbol):
    url = 'http://real-chart.finance.yahoo.com' + \
        '/table.csv?s='+symbol+'&g=d&ignore=.csv'
    csv = urllib2.urlopen(url).read()
    csv_lines = csv.split('\n')
    stock_rdd = sc.parallelize(csv_lines) \
        .filter(lambda line: re.match(r'\d', line)) \
        .map(lambda line: line.split(",")) \
        .map(lambda fields: (float(fields[-1]),fields[0])) \
        .sortBy(lambda (close, date): close, ascending=False)
    return stock_rdd.take(1)

get_stock_high('AAPL')
Next let's look at caching. First create a large RDD that takes some work to compute.
In [ ]:
import random
num_count = 500*1000
num_list = [random.random() for i in xrange(num_count)]
rdd1 = sc.parallelize(num_list)
rdd2 = rdd1.sortBy(lambda num: num)
Now time calling `count()` on `rdd2` a few times.
In [ ]:
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()
An RDD does no work until an action is called, and once the action has produced its answer the computed data is thrown away.
If you have an RDD that you are going to reuse in your computation, you can use `cache()` to make Spark cache the RDD.
Let's cache it and try again.
In [ ]:
rdd2.cache()
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()
Calling `cache()` flips a flag on the RDD.
The data is not cached until an action is called.
You can uncache an RDD using `unpersist()`.
Q: Persist an RDD to disk instead of caching it in memory.
You can cache RDDs at different levels.
Here is an example.
In [ ]:
import pyspark
rdd = sc.parallelize(xrange(100))
rdd.persist(pyspark.StorageLevel.DISK_ONLY)
Level | Meaning |
---|---|
MEMORY_ONLY | Same as cache() |
MEMORY_AND_DISK | Cache in memory, then overflow to disk |
MEMORY_AND_DISK_SER | Like MEMORY_AND_DISK, but objects in the cache are kept serialized instead of live |
DISK_ONLY | Cache to disk, not to memory |
`MEMORY_AND_DISK_SER` is a good compromise between the levels: fast, but not too expensive.
Make sure you unpersist when you don't need the RDD any more.
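For example, here is a sketch of persisting an RDD with MEMORY_AND_DISK_SER and releasing it when done (assuming the same sc):
In [ ]:
import pyspark
rdd = sc.parallelize(xrange(1000)).map(lambda x: x * x)
rdd.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
rdd.count()      # first action computes and caches the RDD
rdd.count()      # subsequent actions reuse the persisted copy
rdd.unpersist()  # release the storage when no longer needed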
A Spark transformation is narrow if each partition of the parent RDD feeds at most one partition of the child RDD.
A transformation is wide if a parent partition can feed multiple child partitions, which requires a shuffle.
Narrow transformations are map-like, while wide transformations are reduce-like.
Narrow transformations are faster because they do not move data between executors, while wide transformations are slower.
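One way to see the difference is toDebugString(), which prints an RDD's lineage; a shuffle stage appears only after a wide transformation such as reduceByKey (a sketch assuming the same sc):
In [ ]:
narrow = sc.parallelize(xrange(10)).map(lambda x: (x % 2, x)).filter(lambda kv: kv[1] > 2)
wide = narrow.reduceByKey(lambda a, b: a + b)
print(narrow.toDebugString())  # map and filter are narrow: no shuffle
print(wide.toDebugString())    # reduceByKey is wide: a shuffle stage appears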
Over time partitions can get skewed.
Or you might have less data or more data than you started with.
You can rebalance your partitions using `repartition` or `coalesce`.
`coalesce` is narrow, while `repartition` is wide.
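Here is a sketch of both (assuming the same sc); repartition shuffles data and can increase or decrease the number of partitions, while coalesce only merges existing partitions:
In [ ]:
rdd = sc.parallelize(xrange(100), 10)
print(rdd.getNumPartitions())                  # 10
print(rdd.repartition(20).getNumPartitions())  # 20, involves a shuffle
print(rdd.coalesce(2).getNumPartitions())      # 2, merges partitions without a shuffle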
"s3:" URLs break when Secret Key contains a slash, even if encoded https://issues.apache.org/jira/browse/HADOOP-3733
Spark 1.3.1 / Hadoop 2.6 prebuilt package has broken S3 filesystem access https://issues.apache.org/jira/browse/SPARK-7442