In [ ]:
from __future__ import print_function, division, absolute_import

Data Management Part 2: Map Reduce

Version 0.1

Problem 2 has been adapted from a homework developed by Bill Howe at the University of Washington Department of Computer Science and Engineering. He says:

In this assignment, you will be designing and implementing MapReduce algorithms for a variety of common data processing tasks. The MapReduce programming model (and a corresponding system) was proposed in a 2004 paper from a team at Google as a simpler abstraction for processing very large datasets in parallel. The goal of this assignment is to give you experience “thinking in MapReduce.” We will be using small datasets that you can inspect directly to determine the correctness of your results and to internalize how MapReduce works.

On Friday, we'll do a demo of a MapReduce-based system to process the large datasets for which it was designed.


Problem 1:

Python builtins: map, reduce, and filter

Recall yesterday's challenge problem: we defined a function that returned True if a triangle was smaller than some threshold and False otherwise. We filtered the triangles as follows (the boolean-mask indexing below assumes triangles is a NumPy array):

idx = [isTriangleLargerThan(triangle) for triangle in triangles]
onlySmallTriangles = triangles[idx]

You could also build the mask with the map function (in Python 3, map returns a lazy iterator, so wrap it in list() before indexing):

idx = list(map(isTriangleLargerThan, triangles))
onlySmallTriangles = triangles[idx]

or, more directly, with filter (again wrapped in list() in Python 3):

onlySmallTriangles = list(filter(isTriangleLargerThan, triangles))
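The original isTriangleLargerThan isn't shown here, so as a self-contained sketch, here is a hypothetical stand-in (despite its name, it follows the text above and returns True for triangles below an area threshold, so filter keeps the small ones):

```python
# Hypothetical stand-in for yesterday's predicate: despite the name,
# it returns True when the triangle's area is below the threshold.
def isTriangleLargerThan(triangle, threshold=5.0):
    # triangle is three (x, y) vertices; shoelace formula for the area
    (x1, y1), (x2, y2), (x3, y3) = triangle
    area = abs(x1 * (y2 - y3) + x2 * (y3 - y1) + x3 * (y1 - y2)) / 2.0
    return area < threshold

triangles = [
    [(0, 0), (1, 0), (0, 1)],  # area 0.5
    [(0, 0), (6, 0), (0, 4)],  # area 12.0
]

# filter keeps exactly the elements for which the predicate is True
onlySmallTriangles = list(filter(isTriangleLargerThan, triangles))
```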

The following example uses map and reduce to compute the sum of 3 partitions. Pretend that the 3 lists live on different nodes. :)

Note 1) this is operating on a set of values rather than key/value pairs (which we'll introduce in Problem 2).

Note 2) Yes, this is contrived. In real life you wouldn't go to this trouble to compute a simple sum, but it is a warm-up for Problem 2.


In [ ]:
import numpy as np
from functools import reduce  # in Python 3, reduce is no longer a builtin

def mapper(arr):
    return np.sum(arr)

def reducer(x, y):
    return x + y

a = [1, 12, 3]
b = [4, 12, 6, 3]
c = [8, 1, 12, 11, 12, 2]

inputData = [a, b, c]

# Find the sum of all the numbers:
intermediate = map(mapper, inputData)
reduce(reducer, intermediate)
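For comparison, the same computation written out without the framework, which makes the two phases explicit (per-partition sums, then a pairwise combine):

```python
from functools import reduce  # Python 3: reduce lives in functools

inputData = [[1, 12, 3], [4, 12, 6, 3], [8, 1, 12, 11, 12, 2]]

# "Map" phase: one summary value per partition.
partition_sums = list(map(sum, inputData))

# "Reduce" phase: pairwise combination of the partition summaries.
total = reduce(lambda x, y: x + y, partition_sums)

# Without the MapReduce framing, this is simply:
total_direct = sum(sum(part) for part in inputData)
```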

Problem 1a) Re-write the mapper and reducer to return the maximum number across all 3 lists.


In [ ]:
def mapper(arr):
    pass  # COMPLETE

def reducer(x, y):
    pass  # COMPLETE

intermediate = map(mapper, inputData)
reduce(reducer, intermediate)

Problem 1b) How would you use this to compute the MEAN of the input data?

Problem 1c) Think about how you would adapt this to compute the MEDIAN of the input data. Do not implement it today! If it seems hard, it is because it is.

What special properties do SUM, MAX, and MEAN have that make them trivial to represent in MapReduce?
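As a hint, here is a small self-contained demonstration (with made-up data) that the sum of per-partition sums does not depend on where the partition boundaries fall, because + is associative and commutative. The median has no such per-partition summary:

```python
from functools import reduce

data = [1, 12, 3, 4, 12, 6, 3, 8, 1, 12, 11, 12, 2]

# Two different partitionings of the same values...
split_a = [data[:3], data[3:7], data[7:]]
split_b = [data[:5], data[5:]]

# ...yield the same total: + is associative and commutative,
# so a sum of per-partition sums equals the overall sum.
sum_a = reduce(lambda x, y: x + y, map(sum, split_a))
sum_b = reduce(lambda x, y: x + y, map(sum, split_b))

# By contrast, a median of per-partition medians is not,
# in general, the median of the full dataset.
```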

Problem 2)

Let's go through a more complete example. The following MapReduce class faithfully implements the MapReduce programming model, but it executes entirely on one processor -- it does not involve parallel computation.

Setup

First, download the data:

$ curl -O https://lsst-web.ncsa.illinois.edu/~yusra/escience_mr/books.json
$ curl -O https://lsst-web.ncsa.illinois.edu/~yusra/escience_mr/records.json

In [ ]:
DATA_DIR = './data' # Set your path to the data files

In [ ]:
import json
import os

class MapReduce:
    def __init__(self):
        self.intermediate = {}
        self.result = []

    def emit_intermediate(self, key, value):
        self.intermediate.setdefault(key, []).append(value)

    def emit(self, value):
        self.result.append(value) 

    def execute(self, data, mapper, reducer):
        for line in data:
            record = json.loads(line)
            mapper(record)

        for key in self.intermediate:
            reducer(key, self.intermediate[key])

        jenc = json.JSONEncoder()
        for item in self.result:
            print(jenc.encode(item))

Here is the word count example discussed in class implemented as a MapReduce program using the framework:


In [ ]:
# Part 1
mr = MapReduce()

# Part 2
def mapper(record):
    # key: document identifier
    # value: document contents
    key = record[0]
    value = record[1]
    words = value.split()
    for w in words:
        mr.emit_intermediate(w, 1)

# Part 3
def reducer(key, list_of_values):
    # key: word
    # value: list of occurrence counts
    total = 0
    for v in list_of_values:
        total += v
    mr.emit((key, total))

# Part 4
inputdata = open(os.path.join(DATA_DIR, "books.json"))
mr.execute(inputdata, mapper, reducer)

Problem 2a)

Create an inverted index. Given a set of documents, an inverted index is a dictionary in which each word is associated with the list of document identifiers in which that word appears.

Mapper Input

The input is a 2-element list: [document_id, text], where document_id is a string representing a document identifier and text is a string representing the text of the document. The document text may have words in upper or lower case and may contain punctuation. You should treat each token as if it were a valid word; that is, you can simply use value.split() to tokenize the string.
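Note that split() keeps punctuation attached to tokens and is case-sensitive, which is fine for this assignment. A quick illustration on a made-up snippet:

```python
text = "Call me Ishmael. Some years ago--never mind how long..."

# split() breaks on whitespace only; punctuation stays glued to tokens,
# so "Ishmael." and "Ishmael" would be distinct words in the index.
tokens = text.split()
```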

Reducer Output

The output should be a (word, document ID list) tuple, where word is a string and document ID list is a list of strings, like:

["all", ["milton-paradise.txt", "blake-poems.txt", "melville-moby_dick.txt"]]
["Rossmore", ["edgeworth-parents.txt"]]
["Consumptive", ["melville-moby_dick.txt"]]
["forbidden", ["milton-paradise.txt"]]
["child", ["blake-poems.txt"]]
["eldest", ["edgeworth-parents.txt"]]
["four", ["edgeworth-parents.txt"]]
["Caesar", ["shakespeare-caesar.txt"]]
["winds", ["whitman-leaves.txt"]]
["Moses", ["bible-kjv.txt"]]
["children", ["edgeworth-parents.txt"]]
["seemed", ["chesterton-ball.txt", "austen-emma.txt"]]
etc...

In [ ]:
mr = MapReduce()

def mapper(record):
    pass  # COMPLETE

def reducer(key, list_of_values):
    pass  # COMPLETE

inputdata = open(os.path.join(DATA_DIR, "books.json"))
mr.execute(inputdata, mapper, reducer)

Challenge Problem

Implement a relational join as a MapReduce query

Consider the following query:

SELECT *
FROM Orders, LineItem
WHERE Orders.order_id = LineItem.order_id

Your MapReduce query should produce the same result as this SQL query executed against an appropriate database. You can treat the two input tables, Orders and LineItem, as one big concatenated bag of records that will be processed by the map function record by record.

Map Input

Each input record is a list of strings representing a tuple in the database. Each list element corresponds to a different attribute of the table.

The first element (index 0) in each record is a string that identifies the table the record originates from. This field has two possible values:

  • "line_item" indicates that the record is a line item.
  • "order" indicates that the record is an order.

The second element (index 1) in each record is the order_id. <--- JOIN ON THIS ELEMENT

LineItem records have 17 attributes, including the identifier string.

Order records have 10 attributes, including the identifier string.

Reduce Output

The output should be a joined record: a single list of length 27 that contains the attributes from the order record followed by the fields from the line item record. Each list element should be a string like

["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "82704", "7721", "1", "28", "47227.60", "0.05", "0.08", "N", "O", "1995-10-23", "1995-08-27", "1995-10-26", "TAKE BACK RETURN", "TRUCK", "sleep quickly. req"]
["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "197921", "441", "2", "32", "64605.44", "0.02", "0.00", "N", "O", "1995-08-14", "1995-10-07", "1995-08-27", "COLLECT COD", "AIR", "lithely regular deposits. fluffily "]
["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "44161", "6666", "3", "2", "2210.32", "0.09", "0.02", "N", "O", "1995-08-07", "1995-10-07", "1995-08-23", "DELIVER IN PERSON", "AIR", " express accounts wake according to the"]
["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "2743", "7744", "4", "4", "6582.96", "0.09", "0.03", "N", "O", "1995-08-04", "1995-10-01", "1995-09-03", "NONE", "REG AIR", "e slyly final pac"]
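The general pattern here is sometimes called a reduce-side join: group records from both tables under their shared key, then pair them up within each group. A self-contained toy sketch on made-up two-table records (not using the MapReduce class, and with far fewer attributes than the real data):

```python
# Toy records: index 0 is the table tag, index 1 is the join key (order_id).
orders = [["order", "1", "Alice"], ["order", "2", "Bob"]]
line_items = [["line_item", "1", "widget"], ["line_item", "1", "gadget"],
              ["line_item", "2", "sprocket"]]

# "Map" phase: group every record under its join key.
groups = {}
for record in orders + line_items:
    groups.setdefault(record[1], []).append(record)

# "Reduce" phase: within each group, pair each order with each line item.
joined = []
for key, records in groups.items():
    order_rows = [r for r in records if r[0] == "order"]
    item_rows = [r for r in records if r[0] == "line_item"]
    for o in order_rows:
        for li in item_rows:
            joined.append(o + li)
```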

In [ ]:
mr = MapReduce()

def mapper(record):
    pass  # COMPLETE

def reducer(key, list_of_values):
    pass  # COMPLETE

inputdata = open(os.path.join(DATA_DIR, "records.json"))
mr.execute(inputdata, mapper, reducer)

In [ ]: