In [ ]:
from __future__ import print_function, division, absolute_import
Version 0.1
Problem 2 has been adapted from a homework developed by Bill Howe at the University of Washington department of Computer Science and Engineering. He says:
In this assignment, you will be designing and implementing MapReduce algorithms for a variety of common data processing tasks. The MapReduce programming model (and a corresponding system) was proposed in a 2004 paper from a team at Google as a simpler abstraction for processing very large datasets in parallel. The goal of this assignment is to give you experience “thinking in MapReduce.” We will be using small datasets that you can inspect directly to determine the correctness of your results and to internalize how MapReduce works.
On Friday, we'll do a demo of a MapReduce-based system to process the large datasets for which it was designed.
Recall yesterday's challenge problem, where we defined a function that returned True if a triangle was smaller than some threshold and False otherwise. We filtered the triangles as follows:
idx = [isTriangleLargerThan(triangle) for triangle in triangles]
onlySmallTriangles = triangles[idx]
You could also do this with the map function:
idx = list(map(isTriangleLargerThan, triangles))  # list() so this also works on Python 3
onlySmallTriangles = triangles[idx]
or filter:
onlySmallTriangles = list(filter(isTriangleLargerThan, triangles))
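The three approaches can be checked against each other on toy data. The sketch below uses a hypothetical predicate on triangle areas, since isTriangleLargerThan and the triangles array are not defined in this notebook:

```python
import numpy as np

def is_larger_than(area, threshold=10.0):
    # Hypothetical stand-in for isTriangleLargerThan: here a "triangle"
    # is represented by its area alone.
    return area > threshold

areas = np.array([3.0, 15.0, 7.5, 22.0])

# List comprehension + boolean indexing:
idx = [is_larger_than(a) for a in areas]
by_indexing = areas[idx]

# map (wrapped in list() so it works under Python 3):
idx2 = list(map(is_larger_than, areas))
by_map = areas[idx2]

# filter returns the matching elements directly, no indexing needed:
by_filter = np.array(list(filter(is_larger_than, areas)))
```

All three produce the same selection; filter skips the intermediate boolean mask entirely.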
The following code example shows how we'd use map and reduce to compute the sum of 3 partitions. Pretend that the 3 lists live on different nodes. :)
Note 1) This is operating on a set of values rather than key/value pairs (which we'll introduce in Problem 2).
Note 2) Yes, this is contrived. In real life, you wouldn't go through this trouble to compute a simple sum, but it is a warm-up for Problem 2.
In [ ]:
import numpy as np
from functools import reduce  # reduce is a builtin in Python 2 but lives in functools in Python 3

def mapper(arr):
    return np.sum(arr)

def reducer(x, y):
    return x + y

a = [1, 12, 3]
b = [4, 12, 6, 3]
c = [8, 1, 12, 11, 12, 2]
inputData = [a, b, c]

# Find the sum of all the numbers:
intermediate = map(mapper, inputData)
reduce(reducer, intermediate)
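As a sanity check (not part of the original notebook), the same result can be computed directly: the per-partition sums are 16, 25, and 46, which reduce to 87.

```python
from functools import reduce  # needed on Python 3

a = [1, 12, 3]
b = [4, 12, 6, 3]
c = [8, 1, 12, 11, 12, 2]
inputData = [a, b, c]

# Map step: one partial sum per partition.
partial_sums = [sum(part) for part in inputData]

# Reduce step: pairwise combination of the partial sums.
total = reduce(lambda x, y: x + y, partial_sums)

# The distributed answer must match summing everything in one place.
assert total == sum(a + b + c)
```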
Problem 1a) Rewrite the mapper and reducer to return the maximum number across all 3 lists.
In [ ]:
def mapper(arr):
    # COMPLETE

def reducer(x, y):
    # COMPLETE

intermediate = map(mapper, inputData)
reduce(reducer, intermediate)
Problem 1b) How would you use this to compute the MEAN of the input data?
Problem 1c) Think about how you would adapt this to compute the MEDIAN of the input data. Do not implement it today! If it seems hard, that is because it is.
What special properties do SUM, MAX, and MEAN have that make them trivial to represent in MapReduce?
Setup
First, download the data:
$ curl -O https://lsst-web.ncsa.illinois.edu/~yusra/escience_mr/books.json
$ curl -O https://lsst-web.ncsa.illinois.edu/~yusra/escience_mr/records.json
In [ ]:
DATA_DIR = './data' # Set your path to the data files
In [ ]:
import json
import sys

class MapReduce:
    def __init__(self):
        self.intermediate = {}
        self.result = []

    def emit_intermediate(self, key, value):
        self.intermediate.setdefault(key, [])
        self.intermediate[key].append(value)

    def emit(self, value):
        self.result.append(value)

    def execute(self, data, mapper, reducer):
        for line in data:
            record = json.loads(line)
            mapper(record)
        for key in self.intermediate:
            reducer(key, self.intermediate[key])
        jenc = json.JSONEncoder()
        for item in self.result:
            print(jenc.encode(item))
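Before running the class on the book data, it can help to trace what execute does on a tiny inline dataset. The sketch below inlines the same logic (map each record, group intermediate pairs by key, reduce each group) without the class, using made-up records:

```python
import json

# Mirror of MapReduce.execute: map each JSON record, group intermediate
# (key, value) pairs by key, then reduce each group.
intermediate = {}
result = []

def emit_intermediate(key, value):
    intermediate.setdefault(key, []).append(value)

def demo_mapper(record):
    key, value = record
    emit_intermediate(key, value)

def demo_reducer(key, list_of_values):
    result.append((key, sum(list_of_values)))

# execute() consumes JSON records line by line, like an open file would:
lines = ['["a", 1]', '["a", 2]', '["b", 5]']
for line in lines:
    demo_mapper(json.loads(line))
for key in intermediate:
    demo_reducer(key, intermediate[key])
```

After the loop, intermediate holds {"a": [1, 2], "b": [5]} and result holds the reduced pairs, which is exactly the shuffle-then-reduce pattern the framework automates.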
Here is the word count example discussed in class implemented as a MapReduce program using the framework:
In [ ]:
import os

# Part 1
mr = MapReduce()

# Part 2
def mapper(record):
    # key: document identifier
    # value: document contents
    key = record[0]
    value = record[1]
    words = value.split()
    for w in words:
        mr.emit_intermediate(w, 1)

# Part 3
def reducer(key, list_of_values):
    # key: word
    # value: list of occurrence counts
    total = 0
    for v in list_of_values:
        total += v
    mr.emit((key, total))

# Part 4
inputdata = open(os.path.join(DATA_DIR, "books.json"))
mr.execute(inputdata, mapper, reducer)
Create an inverted index. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears.
Mapper Input
The input is a 2-element list: [document_id, text], where document_id is a string representing a document identifier and text is a string representing the text of the document. The document text may have words in upper or lower case and may contain punctuation. You should treat each token as if it were a valid word; that is, you can just use value.split() to tokenize the string.
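As a quick illustration of that tokenization rule (not part of the assignment): split() breaks only on whitespace, so punctuation stays attached to tokens and case is preserved.

```python
text = "The quick brown fox. The FOX jumped!"
tokens = text.split()
# "fox." keeps its period and "FOX" keeps its case, so "fox.", "The",
# and "FOX" all count as distinct tokens in this assignment.
```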
Reducer Output
The output should be a (word, document ID list) tuple, where word is a string and document ID list is a list of strings, like:
["all", ["milton-paradise.txt", "blake-poems.txt", "melville-moby_dick.txt"]]
["Rossmore", ["edgeworth-parents.txt"]]
["Consumptive", ["melville-moby_dick.txt"]]
["forbidden", ["milton-paradise.txt"]]
["child", ["blake-poems.txt"]]
["eldest", ["edgeworth-parents.txt"]]
["four", ["edgeworth-parents.txt"]]
["Caesar", ["shakespeare-caesar.txt"]]
["winds", ["whitman-leaves.txt"]]
["Moses", ["bible-kjv.txt"]]
["children", ["edgeworth-parents.txt"]]
["seemed", ["chesterton-ball.txt", "austen-emma.txt"]]
etc...
In [ ]:
mr = MapReduce()

def mapper(record):
    # COMPLETE

def reducer(key, list_of_values):
    # COMPLETE

inputdata = open(os.path.join(DATA_DIR, "books.json"))
mr.execute(inputdata, mapper, reducer)
Implement a relational join as a MapReduce query
Consider the following query:
SELECT *
FROM Orders, LineItem
WHERE Orders.order_id = LineItem.order_id
Your MapReduce query should produce the same result as this SQL query executed against an appropriate database. You can consider the two input tables, Orders and LineItem, as one big concatenated bag of records that will be processed by the map function record by record.
Map Input
Each input record is a list of strings representing a tuple in the database. Each list element corresponds to a different attribute of the table.
The first item (index 0) in each record is a string that identifies the table the record originates from. This field has two possible values:
"line_item" indicates that the record is a line item. "order" indicates that the record is an order.
The second item (index 1) in each record is the order_id <--- JOIN ON THIS ELEMENT.
LineItem records have 17 attributes including the identifier string.
Order records have 10 attributes including the identifier string.
Reduce Output
The output should be a joined record: a single list of length 27 that contains the attributes from the order record followed by the attributes from the line item record. Each list element should be a string, like:
["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "82704", "7721", "1", "28", "47227.60", "0.05", "0.08", "N", "O", "1995-10-23", "1995-08-27", "1995-10-26", "TAKE BACK RETURN", "TRUCK", "sleep quickly. req"]
["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "197921", "441", "2", "32", "64605.44", "0.02", "0.00", "N", "O", "1995-08-14", "1995-10-07", "1995-08-27", "COLLECT COD", "AIR", "lithely regular deposits. fluffily "]
["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "44161", "6666", "3", "2", "2210.32", "0.09", "0.02", "N", "O", "1995-08-07", "1995-10-07", "1995-08-23", "DELIVER IN PERSON", "AIR", " express accounts wake according to the"]
["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "2743", "7744", "4", "4", "6582.96", "0.09", "0.03", "N", "O", "1995-08-04", "1995-10-01", "1995-09-03", "NONE", "REG AIR", "e slyly final pac"]
In [ ]:
mr = MapReduce()
def mapper(record):
# COMPLETE
def reducer(key, list_of_values):
# COMPLETE
inputdata = open(os.path.join(DATA_DIR, "records.json"))
mr.execute(inputdata, mapper, reducer)
In [ ]: