A brief tour of Spark

Apache Spark is "a fast and general engine for large-scale data processing." It comes from the broader Hadoop ecosystem but can be used in a near-standalone mode, which we'll use here.

This is a Jupyter notebook with PySpark enabled. To enable PySpark, you need to have Spark available, and certain environment variables set. On an Ubuntu-16.04 machine, where you've downloaded the latest Spark:

% sudo apt-get install openjdk-9-jre-headless
% tar xvzf spark-2.0.2-bin-hadoop2.7.tgz
% export PATH=`pwd`/spark-2.0.2-bin-hadoop2.7/bin:$PATH
% export PYSPARK_DRIVER_PYTHON=jupyter
% export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
% pyspark

These environment variables tell PySpark to launch Jupyter as its driver, which is how Jupyter connects with Spark and Python. Start a new Python notebook in Jupyter after it opens up, and note that you are getting a little more output from Jupyter in your shell window than normal. That's Spark - it's a little wordy.

Getting started with SparkContext and RDDs

When working with Spark, everything goes through a SparkContext object. It's available (after several seconds of startup time - look at the shell window where you started the Jupyter notebook and you'll see a lot of Spark startup messages) as the object sc:


In [ ]:
sc

Everything you do with Spark here will go through this object. PySpark defines sc and makes it available in the shell environment, and the Jupyter kernel makes it available through the notebook.
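
If you want to confirm that the context is wired up, a quick sanity check looks like this (a minimal sketch - the exact values will differ on your machine):


In [ ]:
# Basic facts about the running SparkContext.
print(sc.version)              # the Spark version, e.g. '2.0.2'
print(sc.master)               # where work runs, e.g. 'local[*]'
print(sc.defaultParallelism)   # default number of partitions Spark will use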

The key construct in Spark is a Resilient Distributed Dataset, or RDD. An RDD leverages the data- and computing resource-tracking capabilities of the Hadoop infrastructure layer to make a dataset available in RAM. This is a key enhancement over the Hadoop or broader Map/Reduce model where data for every step of computation comes from disk and returns to disk between steps. Using RAM like this makes everything go faster.
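
To make the in-memory idea concrete, here is a tiny sketch on made-up numbers (nothing to do with our dataset yet): calling cache() on an RDD asks Spark to keep it in RAM once it has been computed, so later operations reuse the in-memory copy.


In [ ]:
# A throwaway RDD of numbers, built from a plain Python range.
nums = sc.parallelize(range(1000000))
nums.cache()   # mark it to be kept in memory
nums.sum()     # first action computes the RDD and caches it
nums.sum()     # second action is served from RAM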

Another key concept in Spark is that it splits up your data and handles the low-level details of mapping, shuffling, and reducing it for you. Rather than the Hadoop-style code we saw previously, Spark introduces a few language constructs that are easy to learn and work with. Let's go through those basics now.

First, let's load up data. The simplest way is to use the SparkContext to access a text file. Let's visit our Bikeshare data one last time.


In [ ]:
!wget https://s3.amazonaws.com/capitalbikeshare-data/2015-Q4-cabi-trip-history-data.zip

In [ ]:
!unzip 2015-Q4-cabi-trip-history-data.zip

In [ ]:
!mv 2015-Q4-Trips-History-Data.csv q4.csv

In [ ]:
!wc -l q4.csv

This is the same trip dataset we looked at previously, with the familiar format:


In [ ]:
!csvcut -n q4.csv

In [ ]:
!head -5 q4.csv | csvlook

Python modules

Always good to bring your imports together in one place.


In [4]:
from operator import add

To prep the data for use as an RDD, we just need one line:


In [ ]:
rides = sc.textFile('q4.csv')

See how quickly it returned? That's because (as we learned from Hari, Nisha, and Mokeli) the processing of the data is deferred - we haven't actually done anything with it but prepare it for use as an RDD within Spark.
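
If you want to see that laziness for yourself, here's a quick sketch (the upper RDD is just a throwaway name for illustration):


In [ ]:
# Transformations like map() return immediately - no data is read here.
upper = rides.map(lambda line: line.upper())
# Only an action (count(), first(), take(), ...) forces Spark to read the file,
# which is exactly what we'll do next.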

Let's do something simple first.


In [ ]:
rides.count()

That took a little bit longer. To see why, we can jump over to the Spark UI. On my machine right now, it's available at http://localhost:4040/jobs/ but note that that URL might not work for you - it's just local to my machine.

(Explore the Spark UI a little bit)

You can find your local Spark UI by examining the text output in the same shell window we looked at a little while ago - the one where you started Jupyter and saw all the Spark startup information. It will now have a bunch of lines about the job we just processed. Scroll back and look for something like this up near the top:

INFO SparkUI: Started SparkUI at http://localhost:4040/jobs/

Whatever that URL is on your VM, that's where you can find the Spark UI on your host.

Reviewing the resulting data from that one simple job -- counting lines -- tells us a lot about what the Hadoop environment and Spark on top of it do automatically for us. Remember that point about how these tools make it easier for us to write code that uses parallel computing without having to be experts?

Let's do something a little more interesting. This is a CSV file, so let's count ride station pairs. To do this we need to map each input line, extracting the start and end stations, and then reduce by aggregating the counts. Fortunately we can do that with the Python constructs map (which maps input data to an output), filter (which selects some data from a larger set based on a test), and lambda (which defines "anonymous" functions inline). These are common functional programming constructs that date back many decades, and they are a natural fit here because the Map/Reduce paradigm is itself a functional model.
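
If these constructs are new to you, here is a plain-Python warm-up on a small list (nothing Spark-specific; the names are made up for illustration):


In [ ]:
nums = [1, 2, 3, 4, 5]

# map transforms every element; lambda defines the transformation inline.
doubled = list(map(lambda x: x * 2, nums))        # [2, 4, 6, 8, 10]

# filter keeps only the elements that pass a test.
evens = list(filter(lambda x: x % 2 == 0, nums))  # [2, 4]

doubled, evens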

First, we split up the data rows, our map step.


In [ ]:
station_pairs = rides \
    .map(lambda r: r.split(",")) \
    .map(lambda r: ((r[4], r[6]), 1))

Several things to note here:

  • That was instantaneous. We haven't computed anything yet - this is "lazy evaluation".
  • lambda takes an input (here r) and returns a result (here the split array first, then a tuple of the two station names with a counter, 1). It's like defining a function right in the middle of other code without giving it a name - that's why these are called "anonymous" or "inline" functions. (See the sketch after this list for a named-function equivalent.)
  • We are chaining two map commands together. This should look familiar - it's just like piping.
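
For comparison, here is a more verbose equivalent of that cell using named functions instead of lambdas (a sketch - the helper names are made up):


In [ ]:
def split_row(line):
    """Split one CSV line into its fields."""
    return line.split(",")

def to_station_pair(fields):
    """Key each ride by its (start station, end station) pair, with a count of 1."""
    return ((fields[4], fields[6]), 1)

station_pairs_verbose = rides.map(split_row).map(to_station_pair)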

This leaves us with a mapped data structure that needs to be reduced.


In [ ]:
station_counts = station_pairs \
    .reduceByKey(add) \
    .takeOrdered(10, key=lambda r: -r[1])
station_counts

(look at the Spark UI again for this job!)

There we go, the counts of station pairs. Lots of people riding around the mall, and from Eastern Market to Lincoln Park (my neighborhood!).

What just happened?

  • We imported the add operation for use as a parameter to reduceByKey
  • reduceByKey is a Spark verb that lets us reduce mapped data - here it acts something like a GROUP BY operation, combining the values for each key in the RDD using the function we passed in as a parameter: add (a tiny worked example follows this list)
  • We finally execute the whole operation with takeOrdered, which invokes the full computation and takes the top 10 results.
  • We calculate the top 10 with the anonymous sort key function lambda r: -r[1], which sorts the summed station-pair/count tuples by their counts in descending order.
  • We show the result, station_counts
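
To see reduceByKey(add) in isolation, here is a tiny worked example on made-up pairs (a sketch, unrelated to the ride data):


In [ ]:
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)])
pairs.reduceByKey(add).collect()   # [('a', 3), ('b', 2)] - order may vary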

There we go! We just computed something using Hadoop and Spark.

Of course we can do the same thing with csvcut | sort | uniq -c | sort -rn | head, right?


In [ ]:
!csvcut -c5,7 q4.csv | sort | uniq -c | sort -rn | head

Which took longer?

Why?

Computing some numbers

Counting is well and good, but let's do a little math and get some basic descriptive statistics. Let's reimplement our milliseconds-to-minutes conversion, then find what the central tendencies of the duration might be.

Now to compute the duration in minutes: the source data gives us each ride's duration in milliseconds, so we divide by 60 * 1000.


In [ ]:
def get_duration_min(ms):
    """Convert a duration in milliseconds (as a string) to minutes."""
    return int(ms) / (60 * 1000)

get_duration_min('696038')

We'll use this function in our pipeline in a moment.

Next we need to reach for Spark Statistics:


In [ ]:
import numpy as np
from pyspark.mllib.stat import Statistics

To generate summary statistics, we first need to create a vector of data - in this case, our ride durations.

First we need a little trick to skip the first line, our header row.


In [ ]:
header = rides.first()
rides = rides.filter(lambda x: x != header)

Next we have to get our durations from the source data. Note that we wrap each one in a NumPy array, since Statistics.colStats expects an RDD of vectors.


In [ ]:
durations = rides \
    .map(lambda r: r.split(",")) \
    .map(lambda r: np.array([get_duration_min(r[0])]))
durations.take(5)

Now we just feed that to Statistics.colStats to get a summary:


In [ ]:
summary = Statistics.colStats(durations)
print(summary.mean())
print(summary.variance())
print(summary.numNonzeros())
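
The summary object exposes a few more aggregates beyond mean and variance; for a quick look:


In [ ]:
print(summary.count())   # number of rows summarized
print(summary.min())
print(summary.max())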

Counting words

We can revisit our word count example as well. Let's reach back for another familiar dataset:


In [ ]:
!wget https://github.com/gwsb-istm-6212-fall-2016/syllabus-and-schedule/raw/master/exercises/pg2500.txt

In [ ]:
!mv pg2500.txt siddhartha.txt

In [ ]:
siddhartha_text = sc.textFile('siddhartha.txt')

In [ ]:
word_counts = siddhartha_text \
    .flatMap(lambda line: line.lower().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add)

In [ ]:
word_counts.takeOrdered(25, lambda w: -w[1])

What do you see that we could improve in this word count?
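
One possible improvement, as a sketch: lowercasing alone leaves punctuation stuck to words and counts empty strings, so we could strip punctuation and drop empty tokens before counting. The tokenize helper below is made up for illustration.


In [ ]:
import string

def tokenize(line):
    """Lowercase a line, strip punctuation, and return its non-empty words."""
    table = str.maketrans("", "", string.punctuation)
    return [w for w in line.lower().translate(table).split() if w]

cleaned_counts = siddhartha_text \
    .flatMap(tokenize) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add)

cleaned_counts.takeOrdered(25, lambda w: -w[1])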


In [1]:
many_texts = sc.textFile('texts/*.txt')

In [5]:
word_counts = many_texts \
    .flatMap(lambda line: line.lower().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add)

In [ ]:
word_counts.takeOrdered(25, lambda w: -w[1])