Data Science

The World is Data

Alessandro Gagliardi
Sr. Data Scientist, Glassdoor.com

Last Time:

  • What is Data Science?
  • Goals of the Course
  • Python Intro

Questions?

  • Slides, etc.
  • Projects
  • Schoology & Discussions

Agenda

  1. Lab I
    1. Amazon Web Services
  2. Lecture
    1. Big Data
    2. MapReduce Programming Model
    3. Implementation Details
    4. Word Count Example
  3. Lab II
    1. Hadoop Streaming

Lab I: Amazon Web Services (AWS)

A. Big Data

As you have probably heard, big data is a hot topic these days.

Q: What does “big data” actually refer to?

A: Three ‘V’s:

  • Volume (i.e. more than can fit in memory on your laptop)
    • e.g. Amazon’s user behavior data
  • Velocity (i.e. faster than standard machines can process)
    • e.g. Twitter’s “Firehose” of tweets
  • Variety (i.e. does not conform to a single structure)
    • e.g. Google’s cache of web pages

In this course, we will not deal with velocity; that is really more of an engineering problem than a data science problem. In fact, all three are engineering problems at some level, but only volume and variety are really germane to Data Science. That said, as a Data Scientist, it is probably only a matter of time before someone asks you to create a real-time dashboard. Such a thing, while cool, is often very difficult to build and has questionable utility for anyone besides an operations engineer or a day trader, and they already have real-time dashboards.

How would you approach dealing with this kind of data?

One approach would be to get a huge supercomputer.

But this has some obvious drawbacks:

  • expensive
  • difficult to maintain
  • scalability is bounded

Instead of one huge machine, what if we got a bunch of regular (commodity) machines?

This has obvious benefits!

  • cheaper
  • easier to maintain
  • scalability is unbounded (just add more nodes to the cluster)

We can visualize this horizontal cluster architecture as a single-client, multiple-server relationship.

There are two ways to process data in a distributed architecture:

  1. move data to code (& processing power)
    • SETI
  2. move code to data
    • map-reduce -> less overhead (network traffic, disk I/O)

“Computing nodes are the same as storage nodes.”

Divide and conquer is a fundamental algorithmic technique for solving a given task, whose steps include:

  1. split task into subtasks
  2. solve these subtasks independently
  3. recombine the subtask results into a final result

One famous example of divide and conquer is merge sort.
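
A minimal merge sort in Python (just a sketch to make the three steps concrete; the names are illustrative) might look like this:

def merge_sort(xs):
    '''Sort a list by splitting it in half, sorting each half
       independently, and merging the sorted halves.'''
    if len(xs) <= 1:                   # base case: already sorted
        return xs
    mid = len(xs) // 2
    left = merge_sort(xs[:mid])        # solve subtasks independently
    right = merge_sort(xs[mid:])
    merged = []                        # recombine the subtask results
    i = j = 0
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged

merge_sort([8, 3, 5, 1, 9, 2])         # -> [1, 2, 3, 5, 8, 9]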

The defining characteristic of a problem that is suitable for the divide and conquer approach is that it can be broken down into independent subtasks.

Tasks that can be parallelized in this way include:

  • count, sum, average
  • grep, sort, inverted index
  • graph traversals, some ML algorithms

Let’s do a toy example with average.
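
Here is one sketch of how it could go (the chunking below just simulates separate worker nodes; the helper names are illustrative). Note that each worker must return a partial (sum, count) pair; averaging the chunk averages directly would be wrong if the chunks differ in size.

data = [3, 5, 1, 9, 2, 8, 4]

def split(xs, n_chunks):
    '''Split the task: divide the data into roughly equal chunks.'''
    size = len(xs) // n_chunks + 1
    return [xs[i:i + size] for i in range(0, len(xs), size)]

def partial(chunk):
    '''Solve each subtask independently: return (sum, count).'''
    return (sum(chunk), len(chunk))

def combine(partials):
    '''Recombine the subtask results into a final result.'''
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / float(count)

combine([partial(chunk) for chunk in split(data, 3)])   # -> 4.571...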

B. Programming Model

As we’ve discussed, the map-reduce approach involves splitting a problem into subtasks and processing these subtasks in parallel.

This takes place in (approximately) two phases:

  1. the mapper phase
  2. shuffle/sort <- the secret sauce
  3. the reducer phase

Map-reduce uses a functional programming paradigm. The data processing primitives are mappers and reducers, as we’ve seen.

mappers – filter & transform data
reducers – aggregate results


In [1]:
a = [1, 2, 3]
b = [4, 5, 6, 7]
c = [8, 9, 1, 2, 3]
L = list(map(lambda x: len(x), [a, b, c]))
L


Out[1]:
[3, 4, 5]

In [2]:
from functools import reduce  # reduce lives in functools under Python 3
N = reduce(lambda x, y: x + y, L)
N


Out[2]:
12

In [3]:
# Or, if we want to be fancy and do it in one line
reduce(lambda x, y: x+y, map(lambda x:len(x), [a, b, c]))


Out[3]:
12

Thanks to Michael Cvet for this example.


The functional paradigm is good at describing how to solve a problem, but not very good at describing data manipulations (eg, relational joins).

It’s possible to overlay the map-reduce framework with an additional declarative syntax.

This makes operations like select & join easier to implement and less error prone.

Popular examples for Hadoop include Pig and Hive.

Why Pig?

Because I bet you can read the following script.

An example pig script:

users = load 'users.csv' as (username: chararray, age: int);
users_1825 = filter users by age >= 18 and age <= 25;

pages = load 'pages.csv' as (username: chararray, url: chararray);

joined = join users_1825 by username, pages by username;
grouped = group joined by url;
summed = foreach grouped generate group as url, COUNT(joined) as views;
sorted = order summed by views desc;
top_5 = limit sorted 5;

store top_5 into 'top_5_sites.csv';

Now, just for fun... the same calculation in vanilla Hadoop MapReduce.

C. Implementation Details

MapReduce is…

A. A programming model for processing big data

B. A data processing system created and used by Google

C. A feature of Hadoop

D. All of the above

The implementation of MapReduce became popular when Google released a white paper in 2004 explaining how they did it.

This architecture was then copied. The two most popular versions are:

  • Apache’s Hadoop
  • Disco (bet you haven’t heard of this one)

Disco is friendlier but not as well supported. Hadoop has a huge community and so, while more difficult to use, it is the standard.

A map-reduce framework handles a lot of messy details for you:

  • parallelization & distribution (eg, input splitting)
  • partitioning (shuffle/sort/redirect)
  • fault-tolerance (fact: tasks/nodes will fail!)
  • I/O scheduling
  • status and monitoring

This (along with the functional semantics) allows you to focus on solving the problem instead of accounting & housekeeping details.

Hadoop is a popular open-source Java-based implementation of the map-reduce framework (including file storage for input/output).

You can download Hadoop and configure a set of machines to operate as a map-reduce cluster, or you can run it as a service via Amazon’s Elastic Map-Reduce.

Hadoop is written in Java, but the Hadoop Streaming utility allows client code to be supplied as executables (eg, written in any language).
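
As a preview of Lab II, a word-count mapper and reducer written for Hadoop Streaming might look roughly like the sketch below (not the exact lab code): Streaming feeds input lines to the mapper on stdin, sorts the mapper’s tab-separated key/value output by key, and feeds the sorted stream to the reducer on stdin.

#!/usr/bin/env python
# mapper.py (sketch): emit one "word<TAB>1" line per word on stdin
import sys

for line in sys.stdin:
    for word in line.split():
        print('%s\t%d' % (word, 1))

#!/usr/bin/env python
# reducer.py (sketch): input arrives sorted by key, so all counts
# for a given word are adjacent and can be summed in one pass
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.strip().split('\t')
    if word != current_word:
        if current_word is not None:
            print('%s\t%d' % (current_word, current_count))
        current_word, current_count = word, 0
    current_count += int(count)
if current_word is not None:
    print('%s\t%d' % (current_word, current_count))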

Data is replicated in the (distributed) file system across several nodes.

This permits locality optimization (and fault tolerance) by allowing the mapper tasks to run on the same nodes where the data resides.

So we move code to data (instead of data to code), thus avoiding a lot of network traffic and disk I/O.

The Google File System (GFS) was developed alongside map-reduce to serve as the native file system for this type of processing.

The Hadoop platform is bundled with an open-source implementation of this file system called HDFS.

If you use Amazon EMR, you can use their file system (Amazon S3) as well. (What's wrong with this?)

D. Word Count Example

MapReduce processes data in terms of key-value pairs:

input           <k1, v1>

mapper          <k1, v1> -> <k2, v2>

(partitioner)   <k2, v2> -> <k2, [all k2 values]>

reducer         <k2, [all k2 values]> -> <k3, v3>

Using the following input, we can implement the “Hello World” of map-reduce: a word count.


In [1]:
i = {1: 'where',
     2: 'where in',
     3: 'where in the',
     4: 'where in the world',
     5: 'where in the world is',
     6: 'where in the world is carmen',
     7: 'where in the world is carmen sandiego'}

First, for comparison, here is one way you could do it on a single machine:


In [5]:
from collections import defaultdict

import pandas as pd

wordcount = defaultdict(int)

for _, s in i.items():
    for w in s.split():
        wordcount[w] += 1

pd.Series(wordcount)


Out[5]:
carmen      2
in          6
is          3
sandiego    1
the         5
where       7
world       4
dtype: int64

The first processing primitive is the mapper, which filters & transforms the input data, and yields transformed key-value pairs.


In [2]:
def mapper(k1, v1):
    '''k1: line number
       v1: line contents (i.e. space-delimited string)'''
    words = v1.split()  # split string into words
    for word in words:
        yield (word, 1)

The mapper emits key-value pairs for each word encountered in the input data.


In [ ]:
from itertools import chain
map_results = list(chain.from_iterable(map(mapper, i.keys(), i.values())))

In [16]:
map_results


Out[16]:
[('where', 1),
 ('where', 1),
 ('in', 1),
 ('where', 1),
 ('in', 1),
 ('the', 1),
 ('where', 1),
 ('in', 1),
 ('the', 1),
 ('world', 1),
 ('where', 1),
 ('in', 1),
 ('the', 1),
 ('world', 1),
 ('is', 1),
 ('where', 1),
 ('in', 1),
 ('the', 1),
 ('world', 1),
 ('is', 1),
 ('carmen', 1),
 ('where', 1),
 ('in', 1),
 ('the', 1),
 ('world', 1),
 ('is', 1),
 ('carmen', 1),
 ('sandiego', 1)]

The partitioner shuffles & sorts the mapper output, and redirects all intermediate results for a given key to a single reducer.

If we were using a map-reduce framework like Hadoop or Disco, the partitioner would be internal to the framework and we wouldn’t have to write it ourselves. But since we are building this from scratch, we will create our own partitioner using a defaultdict.
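
A minimal version of that partitioner (a sketch that groups the map_results from above into partition_results) could be:

In [ ]:
from collections import defaultdict

def partition(mapped_pairs):
    '''Group the (key, value) pairs emitted by the mapper so that
       each key maps to the list of all values observed for it.'''
    partitioned = defaultdict(list)
    for key, value in mapped_pairs:
        partitioned[key].append(value)
    return dict(partitioned)

partition_results = partition(map_results)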


In [22]:
partition_results


Out[22]:
{'carmen': [1, 1],
 'in': [1, 1, 1, 1, 1, 1],
 'is': [1, 1, 1],
 'sandiego': [1],
 'the': [1, 1, 1, 1, 1],
 'where': [1, 1, 1, 1, 1, 1, 1],
 'world': [1, 1, 1, 1]}

Finally, the reducer receives all values for a given key and aggregates (in this case, sums) the results.


In [23]:
def reducer(k2, k2_vals):
    '''k2: word
       k2_vals: word counts'''
    yield k2, sum(k2_vals)

Reducer output is aggregated & sorted by key.


In [30]:
reducer_results = dict(chain.from_iterable(map(reducer, partition_results.keys(), partition_results.values())))

In [31]:
reducer_results


Out[31]:
{'carmen': 2,
 'in': 6,
 'is': 3,
 'sandiego': 1,
 'the': 5,
 'where': 7,
 'world': 4}

Lab II: Python & Hadoop Streaming

Discussion