A brief tour of Spark

Apache Spark is "a fast and general engine for large-scale data processing." It comes from the broader Hadoop ecosystem but can be used in a near-standalone mode, which we'll use here.

This is a Jupyter notebook with pySpark enabled. To enable pySpark, you need to have Spark available and a kernel file containing something like the following at a path like /home/dchud/.ipython/kernels/pyspark/kernel.json:

{
 "display_name": "pySpark (Spark 1.5.2)",
 "language": "python",
 "argv": [
  "/Users/dchud/anaconda/bin/python2",
  "-m",
  "IPython.kernel",
  "-f",
  "{connection_file}"
 ],
 "env": {
  "SPARK_HOME": "/usr/local/Cellar/apache-spark/1.5.2/libexec",
  "PYTHONPATH": "/usr/local/Cellar/apache-spark/1.5.2/libexec/python/:/usr/local/Cellar/apache-spark/1.5.2/libexec/python/lib/py4j-0.8.2.1-src.zip",
  "PYTHONSTARTUP": "/usr/local/Cellar/apache-spark/1.5.2/libexec/python/pyspark/shell.py",
  "PYSPARK_SUBMIT_ARGS": "pyspark-shell"
 }
}

This file tells Jupyter exactly how to connect with Spark and Python, in this case using homebrew-installed Spark and Anaconda-installed Python.

Note that this is set up for use on my Mac laptop, which has more cores available to it than our class VM. Our class VM already has a file like this under /home/vagrant and a pySpark notebook available through Jupyter. Try it; it should work!

Getting started with SparkContext and RDDs

When working with Spark, everything goes through a SparkContext object. It's available (after several seconds of startup time - look at the shell window where you started the Jupyter notebook and you'll see a lot of Spark startup messages) as the object sc:


In [ ]:
sc

Everything you do with Spark here will go through this object. pySpark defines sc and makes it available in its shell environment, and the Jupyter kernel passes it through to the notebook.
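
If you want a quick look at what you're connected to, the SparkContext exposes a few handy attributes. A minimal sketch (not an executed cell here):

# Inspect the SparkContext pySpark set up for us
print(sc.version)             # the Spark version, e.g. 1.5.2
print(sc.master)              # where jobs run, e.g. local[*]
print(sc.defaultParallelism)  # default number of partitions Spark will use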

The key construct in Spark is a Resilient Distributed Dataset, or RDD. An RDD leverages the data- and computing resource-tracking capabilities of the Hadoop infrastructure layer to make a dataset available in RAM. This is a key enhancement over the Hadoop or broader Map/Reduce model where data for every step of computation comes from disk and returns to disk between steps. Using RAM like this makes everything go faster.

Another key concept in Spark is that it will split up your data and handle the low-level details of mapping, shuffling, and reducing it for you. Rather than the Hadoop-style code we saw previously, Spark introduces a few language constructs that are easy to learn and work with. Let's go through those basics now.
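
To make those constructs concrete before we touch the real data, here is a tiny sketch using sc.parallelize on a throwaway in-memory list (the data is made up, purely for illustration):

# Build a toy RDD from a small Python list
words = sc.parallelize(["bike", "bus", "bike", "train", "bike"])

# cache() asks Spark to keep this RDD in RAM across operations
words.cache()

# map each word to a (word, 1) pair, then reduce by key to add up the counts
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
counts.collect()

We'll follow exactly this pattern, at scale, with the Bikeshare data below.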

First, let's load up data. The simplest way is to use the SparkContext to access a text file. Let's visit our Bikeshare data one last time.


In [1]:
!ls -lh /Users/dchud/projects/dbplus/bikeshare.csv


-rw-r--r--  1 dchud  EAD\Domain Users   246M Nov 17 20:55 /Users/dchud/projects/dbplus/bikeshare.csv

In [2]:
!wc /Users/dchud/projects/dbplus/bikeshare.csv


 2583370 33663203 257797920 /Users/dchud/projects/dbplus/bikeshare.csv

This is the combined 2.5M records from 2013 we looked at previously, with the familiar format:


In [3]:
!head -5 /Users/dchud/projects/dbplus/bikeshare.csv | csvlook


|------------------+---------------+--------------------------------------+-------------------------------------------------------+---------+-------------|
|  start_date      | end_date      | start_station                        | end_station                                           | bike_id | sub_type    |
|------------------+---------------+--------------------------------------+-------------------------------------------------------+---------+-------------|
|  3/31/2013 23:59 | 4/1/2013 0:04 | Massachusetts Ave & Dupont Circle NW | New Hampshire Ave & T St NW [formerly 16th & U St NW] | W00347  | Casual      |
|  3/31/2013 23:58 | 4/1/2013 7:44 | Massachusetts Ave & Dupont Circle NW | New Hampshire Ave & T St NW [formerly 16th & U St NW] | W00550  | Casual      |
|  3/31/2013 23:57 | 4/1/2013 0:01 | Crystal City Metro / 18th & Bell St  | 27th & Crystal Dr                                     | W20193  | Casual      |
|  3/31/2013 23:52 | 4/1/2013 0:08 | Georgia & New Hampshire Ave NW       | 1st & Rhode Island Ave NW                             | W00209  | Subscriber  |
|------------------+---------------+--------------------------------------+-------------------------------------------------------+---------+-------------|

To prep the data for use as an RDD, we just need one line:


In [4]:
rides = sc.textFile('/Users/dchud/projects/dbplus/bikeshare.csv')

See how quickly it returned? That's because (as we learned from Hari, Nisha, and Mokeli) the processing of the data is deferred - we haven't actually done anything with it but prepare it for use as an RDD within Spark.
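
If you want to confirm Spark can actually read the file without triggering a full pass over it, a cheap action like take() only pulls back as many lines as you ask for. A quick sketch (not an executed cell):

# take() is an action, but it reads only as much data as it needs
rides.take(1)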

Let's do something simple first.


In [5]:
rides.count()


Out[5]:
2583370

That took a little bit longer. To see why, we can jump over to the Spark UI. On my machine right now, it's available at http://161.253.117.35:4041/jobs/ but note that that URL will probably not work for you - it's just local to my machine.

(Explore the Spark UI a little bit)

You can find your local Spark UI by examining the text output in the same shell window we looked at a little while ago. The one where you started Jupyter and saw all the Spark startup messages will now have a bunch of lines about the job we just processed. Scroll back and look for something like this up near the top:

INFO SparkUI: Started SparkUI at http://161.253.117.35:4041/jobs/

Whatever that URL is on your VM, that's where you can find the Spark UI on your host.

Reviewing the resulting data from that one simple job -- counting lines -- tells us a lot about what the Hadoop environment and Spark on top of it do automatically for us. Remember that point about how these tools make it easier for us to write code that uses parallel computing without having to be experts?

Let's do something a little more interesting. This is a CSV file, so let's count ride station pairs. To do this we need to map each input line to extract the start and end stations, then reduce that by aggregating the counts. Fortunately we can do that with map (which transforms each input element into an output), filter (which selects elements from a larger set based on a test), and lambda (Python's keyword for defining "anonymous" functions inline); Spark's RDDs provide their own map and filter operations that mirror Python's built-ins of the same names. These are common functional programming constructs and date back many, many decades, so they are a natural fit here because the Map/Reduce paradigm itself is a functional model.
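
If the plain-Python versions of these are rusty, here's a quick refresher (illustrative numbers only, no Spark involved):

# map applies a function (here an anonymous lambda) to every element
list(map(lambda x: x * 2, [1, 2, 3]))             # [2, 4, 6]

# filter keeps only the elements that pass a test
list(filter(lambda x: x % 2 == 0, [1, 2, 3, 4]))  # [2, 4]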

First, we split up the data rows, our map step.


In [7]:
station_pairs = rides.map(lambda l: l.split(",")).map(lambda l: ((l[2], l[3]), 1))

Several things to note here:

  • That was instantaneous. We haven't computed anything yet - this is "lazy evaluation".
  • lambda takes an input (here l) and returns a result (here the split list in the first map, then a tuple of the two station names paired with a counter, 1). It's like you're defining a function right in the middle of other code and not giving it a name. That's why they're called "anonymous" or "inline" functions.
  • We are chaining two map commands together. This should look familiar - it's just like piping.

This leaves us with a mapped data structure that needs to be reduced.


In [8]:
from operator import add
station_counts = station_pairs.reduceByKey(add).takeOrdered(10, key=lambda (k, v): -v)
station_counts


Out[8]:
[((u'Smithsonian / Jefferson Dr & 12th St SW',
   u'Smithsonian / Jefferson Dr & 12th St SW'),
  5138),
 ((u'Lincoln Memorial', u'Jefferson Memorial'), 5012),
 ((u'Eastern Market Metro / Pennsylvania Ave & 7th St SE',
   u'Lincoln Park / 13th & East Capitol St NE '),
  4962),
 ((u'Lincoln Park / 13th & East Capitol St NE ',
   u'Eastern Market Metro / Pennsylvania Ave & 7th St SE'),
  4740),
 ((u'Jefferson Dr & 14th St SW', u'Jefferson Dr & 14th St SW'), 4709),
 ((u'Lincoln Memorial', u'Lincoln Memorial'), 4130),
 ((u'Lincoln Memorial', u'Jefferson Dr & 14th St SW'), 3973),
 ((u'Jefferson Dr & 14th St SW', u'Lincoln Memorial'), 3683),
 ((u'15th & P St NW', u'Massachusetts Ave & Dupont Circle NW'), 3675),
 ((u'Massachusetts Ave & Dupont Circle NW', u'15th & P St NW'), 3538)]

(look at the Spark UI again for this job!)

There we go, the counts of station pairs. Lots of people riding around the Mall, and from Eastern Market to Lincoln Park (my neighborhood!).

What just happened?

  • We imported the add operation for use as a parameter to reduceByKey.
  • reduceByKey is a Spark verb that lets us reduce mapped data - something like a GROUP BY operation - by combining the values that share a key with the function we pass in, here add (see the short sketch after this list for an equivalent written with an explicit lambda).
  • We finally execute the whole operation with takeOrdered, which invokes the full computation and takes the top 10 results.
  • We pick the top 10 with the anonymous sort key function lambda (k, v): -v, which sorts the summed key/count pairs in descending order by count.
  • We show the result, station_counts.
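
If operator.add feels a little magical, an equivalent way to write the same reduction (a sketch, not an executed cell) uses an explicit lambda that adds the two counts:

# Same as reduceByKey(add): combine the counts for each station pair explicitly
station_pairs.reduceByKey(lambda a, b: a + b).takeOrdered(10, key=lambda (k, v): -v)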

There we go! We just computed something using Hadoop and Spark.

Of course we can do the same thing with csvcut | sort | uniq -c | sort -rn | head, right?


In [6]:
!csvcut -c3,4 /Users/dchud/projects/dbplus/bikeshare.csv | sort | uniq -c | sort -rn | head


5138 Smithsonian / Jefferson Dr & 12th St SW,Smithsonian / Jefferson Dr & 12th St SW
5012 Lincoln Memorial,Jefferson Memorial
4962 Eastern Market Metro / Pennsylvania Ave & 7th St SE,Lincoln Park / 13th & East Capitol St NE 
4740 Lincoln Park / 13th & East Capitol St NE ,Eastern Market Metro / Pennsylvania Ave & 7th St SE
4709 Jefferson Dr & 14th St SW,Jefferson Dr & 14th St SW
4130 Lincoln Memorial,Lincoln Memorial
3973 Lincoln Memorial,Jefferson Dr & 14th St SW
3683 Jefferson Dr & 14th St SW,Lincoln Memorial
3675 15th & P St NW,Massachusetts Ave & Dupont Circle NW
3538 Massachusetts Ave & Dupont Circle NW,15th & P St NW
sort: write failed: standard output: Broken pipe
sort: write error

Which took longer?

Why?

Computing some numbers

Counting is well and good, but let's do a little math and get some basic descriptive statistics. Let's parse the datetime strings, compute each ride's duration in minutes, and then look at the central tendency of those durations.


In [9]:
from datetime import datetime
def get_date(s):
    return datetime.strptime(s, '%m/%d/%Y %H:%M')

get_date('3/31/2013 23:51')


Out[9]:
datetime.datetime(2013, 3, 31, 23, 51)

Great. Now to compute the duration in minutes: subtracting the two datetimes gives us a timedelta, from which we take the seconds and divide by 60.


In [10]:
def get_duration(start, stop):
    start_date = get_date(start)
    stop_date = get_date(stop)
    # timedelta.seconds ignores any full days, so this assumes rides shorter than 24 hours
    return (stop_date - start_date).seconds / 60

get_duration('3/31/2013 23:51', '3/31/2013 23:59')


Out[10]:
8

See how we composed a broader function from a smaller element we defined first? Just like the UNIX philosophy says - building complex pieces up from simpler building blocks, we can do the same with our code.

Next we need to reach for Spark Statistics:


In [12]:
import numpy as np
from pyspark.mllib.stat import Statistics

To generate summary statistics, we first need an RDD of vectors - in this case, our ride durations.

First we need a little trick to skip the first line, our header row.


In [11]:
header = rides.first()
rides = rides.filter(lambda x: x != header)
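
As a quick sanity check (a sketch, not an executed cell), first() should now return the first data row rather than the header:

# first() is an action; after the filter it returns a data row instead of the header
rides.first()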

Next we have to compute our durations from the source data. Note that we wrap each one in a NumPy array, because colStats expects an RDD of vectors.


In [13]:
durations = rides.map(lambda l: l.split(",")).map(lambda l: np.array([get_duration(l[0], l[1])]))
durations.take(5)


Out[13]:
[array([5]), array([466]), array([4]), array([16]), array([6])]

Now we just feed that to Statistics.colStats, which returns a summary object:


In [14]:
summary = Statistics.colStats(durations)
print(summary.mean())
print(summary.variance())
print(summary.numNonzeros())


[ 17.09474643]
[ 1002.77866119]
[ 2564710.]
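
The summary object offers a few more aggregates as well. For example, a sketch (not an executed cell) of pulling out a standard deviation and a row count, using method names from the MLlib 1.x Statistics API:

# standard deviation is just the square root of the variance - roughly 32 minutes here
print(np.sqrt(summary.variance()))
print(summary.count())   # number of rows summarized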