Apache Spark is "a fast and general engine for large-scale data processing." It comes from the broader Hadoop ecosystem but can be used in a near-standalone mode, which we'll use here.
This is a Jupyter notebook with PySpark enabled. To enable PySpark, you need to have Spark available, and certain environment variables set. On an Ubuntu-16.04 machine, where you've downloaded the latest Spark:
% sudo apt-get install openjdk-8-jre-headless
% tar xvzf spark-2.0.2-bin-hadoop2.7.tgz
% export PATH=`pwd`/spark-2.0.2-bin-hadoop2.7/bin:$PATH
% export PYSPARK_DRIVER_PYTHON=jupyter
% export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
% pyspark
These environment variables tell pyspark to start Jupyter and connect it with Spark and Python. Start a new Python notebook in Jupyter after it opens up, and note that you are getting a little more output from Jupyter in your shell window than normal. That's Spark - it's a little wordy.
Working with Spark, everything goes through a SparkContext object. It's available (after several seconds of startup time - look at the shell window where you started the Jupyter notebook and you'll see a lot of Spark startup messages) as the object sc:
In [ ]:
sc
Everything you do with Spark here will go through this object. PySpark defines it and makes it available in the shell environment, and the Jupyter kernel in turn makes it available in the notebook.
The key construct in Spark is a Resilient Distributed Dataset, or RDD. An RDD leverages the data- and computing resource-tracking capabilities of the Hadoop infrastructure layer to make a dataset available in RAM. This is a key enhancement over the Hadoop or broader Map/Reduce model where data for every step of computation comes from disk and returns to disk between steps. Using RAM like this makes everything go faster.
Another key concept in Spark is that it will split up your data and handle the low-level details of mapping, shuffling, and reducing it for you. Rather than the Hadoop-style code we saw previously, Spark introduces a few language constructs that are easy to learn and work with. Let's go through those basics now.
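To make these ideas concrete before we load real data, here is a minimal throwaway sketch (the numbers and the add import are just for illustration): we hand Spark an ordinary Python sequence, it splits the data into partitions and keeps the RDD in RAM when asked, and we run a map and a reduce without writing any of the shuffle plumbing ourselves.
In [ ]:
from operator import add

# Spark decides how to split this small dataset into partitions
numbers = sc.parallelize(range(1, 1001))
print(numbers.getNumPartitions())

# cache() asks Spark to keep the RDD in RAM between computations
numbers.cache()

# map each value to its square, then reduce the results by addition
print(numbers.map(lambda x: x * x).reduce(add))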
First, let's load up data. The simplest way is to use the SparkContext to access a text file. Let's visit our Bikeshare data one last time.
In [ ]:
!wget https://s3.amazonaws.com/capitalbikeshare-data/2015-Q4-cabi-trip-history-data.zip
In [ ]:
!unzip 2015-Q4-cabi-trip-history-data.zip
In [ ]:
!mv 2015-Q4-Trips-History-Data.csv q4.csv
In [ ]:
!wc -l q4.csv
This is the same trip dataset we looked at previously, with the familiar format:
In [ ]:
!csvcut -n q4.csv
In [ ]:
!head -5 q4.csv | csvlook
In [4]:
from operator import add
To prep the data for use as an RDD, we just need one line:
In [ ]:
rides = sc.textFile('q4.csv')
See how quickly it returned? That's because (as we learned from Hari, Nisha, and Mokeli) the processing of the data is deferred - we haven't actually done anything with it but prepare it for use as an RDD within Spark.
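If you want to see that deferral for yourself, here is a quick throwaway check (the upper_rides name is just illustrative): the map() call below is a transformation and returns immediately, while take() is an action that actually triggers reading the file.
In [ ]:
# transformation: returns instantly, nothing is computed yet
upper_rides = rides.map(lambda line: line.upper())

# action: now Spark actually reads the file and does the work
upper_rides.take(1)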
Let's do something simple first.
In [ ]:
rides.count()
That took a little bit longer. To see why, we can jump over to the Spark UI. On my machine right now, it's available at http://localhost:4040/jobs/ but note that that URL might not work for you - it's just local to my machine.
You can find your local Spark UI by examining the text output in the same shell window we looked at a little while ago - the one where you started Jupyter and saw all the Spark startup information. It will now have a bunch of lines about the job we just processed. Scroll back and look for something like this up near the top:
INFO SparkUI: Started SparkUI at http://localhost:4040/jobs/
Whatever that URL is on your VM, that's where you can find the Spark UI on your host.
Reviewing the resulting data from that one simple job -- counting lines -- tells us a lot about what the Hadoop environment and Spark on top of it do automatically for us. Remember that point about how these tools make it easier for us to write code that uses parallel computing without having to be experts?
Let's do something a little more interesting. This is a CSV file, so let's count ride station pairs. To do this we need to map each input line, extracting the start and end stations, then reduce that by aggregating the counts. Fortunately we can do that with the Python constructs map (which maps input data to an output), filter (which selects or filters some data from a larger set based on a test), and lambda (which defines "anonymous" functions inline). These are common functional programming constructs that date back many, many decades, and they are a natural fit here because the Map/Reduce paradigm is itself a functional model.
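If these constructs are new to you, here is a small plain-Python warm-up (no Spark involved) showing map, filter, and lambda on an ordinary list; Spark's RDD methods of the same names behave the same way, except the work is distributed and deferred.
In [ ]:
nums = [1, 2, 3, 4, 5]

# map: transform every element
doubled = list(map(lambda x: x * 2, nums))

# filter: keep only the elements that pass a test
evens = list(filter(lambda x: x % 2 == 0, nums))

print(doubled, evens)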
First, we split up the data rows, our map step.
In [ ]:
station_pairs = rides \
    .map(lambda r: r.split(",")) \
    .map(lambda r: ((r[4], r[6]), 1))
Several things to note here:

- lambda takes an input (here r) and returns a result (here the split array once, then a tuple of the two station names with a counter, 1). It's like you're defining a function right in the middle of other code and not giving it a name. That's why they're called "anonymous" or "inline" functions.
- The backslash line continuations let us chain map commands together. This should look familiar - it's just like piping.

This leaves us with a mapped data structure that needs to be reduced.
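If you want to peek at what that mapped structure looks like before we reduce it, take() a few elements (this runs a small job, so expect another entry in the Spark UI; note that the header row is still in there, since we haven't filtered it out yet):
In [ ]:
station_pairs.take(3)
Now let's reduce it.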
In [ ]:
key_func = lambda r: -r[1]
station_counts = station_pairs \
    .reduceByKey(add) \
    .takeOrdered(10, key=key_func)
station_counts
(look at the Spark UI again for this job!)
There we go, the counts of station pairs. Lots of people riding around the mall, and from Eastern Market to Lincoln Park (my neighborhood!).
What just happened?

- We used the add operation (imported earlier from the operator module) as a parameter to reduceByKey.
- reduceByKey is a Spark verb that lets us reduce mapped data - in this case, something like a GROUP BY operation, where we operate on the RDD using the function passed in as a parameter: add.
- takeOrdered invokes the full computation and takes the top 10 results.
- Its key function, key_func (lambda r: -r[1]), returns a descending sort of the added-up key/count pairs by their counts.
- Finally, we displayed station_counts.
There we go! We just computed something using Hadoop and Spark.
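As an aside, the add we imported from the operator module is just a named version of two-argument addition; if we had not imported it, this equivalent sketch with an explicit lambda would produce the same result:
In [ ]:
# same reduction, spelling out the addition as a lambda instead of operator.add
station_pairs \
    .reduceByKey(lambda a, b: a + b) \
    .takeOrdered(10, key=lambda r: -r[1])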
Of course we can do the same thing with csvcut | sort | uniq -c | sort -rn | head, right?
In [ ]:
!csvcut -c5,7 q4.csv | sort | uniq -c | sort -rn | head
Which took longer?
Why?
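If you would rather measure than eyeball it, Jupyter's %time magic reports the wall-clock time of the Spark job, which you can compare against how long the shell pipeline above took:
In [ ]:
# time the Spark version of the station-pair count
%time station_pairs.reduceByKey(add).takeOrdered(10, key=lambda r: -r[1])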
Great. Now let's compute each trip's duration in minutes. The source data gives duration in milliseconds, so we divide by 60 * 1000.
In [ ]:
def get_duration_min(ms):
    return int(ms) / (60 * 1000)

get_duration_min('696038')
We'll use this function in our pipeline in a moment.
Next we need to reach for Spark Statistics:
In [ ]:
import numpy as np
from pyspark.mllib.stat import Statistics
To generate summary statistics, we first need to create a vector of data, in this case our trip durations.
First we need a little trick to skip the first line, our header row.
In [ ]:
header = rides.first()
rides = rides.filter(lambda x: x != header)
Next we extract our durations from the source data, wrapping each one in a NumPy array so the Statistics module can work with them.
In [ ]:
durations = rides \
    .map(lambda r: r.split(",")) \
    .map(lambda r: np.array([get_duration_min(r[0])]))
durations.take(5)
Now we just feed that to a Summarizer:
In [ ]:
summary = Statistics.colStats(durations)
print(summary.mean())
print(summary.variance())
print(summary.numNonzeros())
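The summary object exposes a few more statistics beyond mean and variance; for example:
In [ ]:
print(summary.count())  # number of rides
print(summary.max())    # longest duration, in minutes
print(summary.min())    # shortest duration, in minutes
Next, let's switch from numbers to text with the classic word count example, using the Project Gutenberg text of Siddhartha.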
In [ ]:
!wget https://github.com/gwsb-istm-6212-fall-2016/syllabus-and-schedule/raw/master/exercises/pg2500.txt
In [ ]:
!mv pg2500.txt siddhartha.txt
In [ ]:
siddhartha_text = sc.textFile('siddhartha.txt')
In [ ]:
word_counts = siddhartha_text \
    .flatMap(lambda line: line.lower().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add)
In [ ]:
word_counts.takeOrdered(25, lambda w: -w[1])
What do you see that we could improve in this word count?
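One likely answer: splitting on spaces leaves punctuation stuck to words and produces empty tokens, and the top results are dominated by common words like "the" and "and". Here is one possible cleanup sketch (splitting on non-word characters and dropping empty strings; filtering out a stopword list would be a further step):
In [ ]:
import re

better_counts = siddhartha_text \
    .flatMap(lambda line: re.split(r'\W+', line.lower())) \
    .filter(lambda word: word != '') \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add)

better_counts.takeOrdered(25, lambda w: -w[1])
Note that sc.textFile() also accepts glob patterns, which is what the next cells use to run the same word count over a whole directory of texts.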
In [1]:
many_texts = sc.textFile('texts/*.txt')
In [5]:
word_counts = many_texts \
    .flatMap(lambda line: line.lower().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add)
In [ ]:
word_counts.takeOrdered(25, lambda w: -w[1])