Chapter 24: MapReduce

  • Use a mapper function to turn each item into zero or more key-value pairs.
  • Collect together all the pairs with identical keys.
  • Use a reducer function on each group of same-keyed pairs to produce the output values for that key.

24.1 Example: Word Count


In [4]:
from __future__ import division
import math, random, re, datetime
from collections import defaultdict, Counter
from functools import partial
from naive_bayes import tokenize

def word_count_old(documents):
    """맵리듀스를 사용하지 않고 단어의 빈도 수를 세어 줌"""
    return Counter(word 
        for document in documents 
        for word in tokenize(document))

In [6]:
def wc_mapper(document):
    """문서의 각 단어마다 (단어, 1)을 보냄"""
    for word in tokenize(document):
        yield (word, 1)
        
def wc_reducer(word, counts):
    """단어의 모든 빈도 수를 더함"""
    yield (word, sum(counts))

def word_count(documents):
    """맵리듀스를 사용해서 입려 문서의 빈도 수를 세어 줌"""

    # place to store the grouped values
    collector = defaultdict(list)

    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)

    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]
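
To get a feel for the two pieces in isolation, here is a toy check (not part of the original notebook); note that the pair order can vary because tokenize returns a set:

list(wc_mapper("data science"))    # e.g. [('data', 1), ('science', 1)]
list(wc_reducer("data", [1, 1]))   # [('data', 2)]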

24.2 Why MapReduce?

  • Imagine billions of documents spread across 100 machines.
  • Each machine runs the mapper on its own documents, producing lots of key-value pairs.
  • The key-value pairs are forwarded to a handful of machines designated to do the reduce work, making sure that all pairs with a given key end up on the same machine (a single-process sketch of this routing follows the list).
  • Each reducing machine groups the pairs by key and runs the reducer on each group.
  • Each (key, final value) pair is returned.
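
A minimal single-process sketch of that routing step, assuming a hypothetical partition helper and four reducer "machines" (neither is part of the original code); in a real cluster each bucket would live on a different machine:

def partition(mapped_pairs, num_reducers):
    """route each (key, value) pair to a reducer "machine" by hashing its key"""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in mapped_pairs:
        buckets[hash(key) % num_reducers][key].append(value)
    return buckets

# all pairs for a given word land in the same bucket, so each "machine"
# could run wc_reducer on its own keys without talking to the others
buckets = partition((pair
                     for document in ["data science", "big data"]
                     for pair in wc_mapper(document)),
                    num_reducers=4)

results = [output
           for bucket in buckets
           for word, counts in bucket.items()
           for output in wc_reducer(word, counts)]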

24.3 MapReduce More Generally


In [7]:
def map_reduce(inputs, mapper, reducer):
    """mapper와 reducer를 사용해서 inputs에 맵리듀스를 적용"""
    collector = defaultdict(list)

    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)

    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]

def reduce_with(aggregation_fn, key, values):
    """reduces a key-values pair by applying aggregation_fn to the values"""
    yield (key, aggregation_fn(values))

def values_reducer(aggregation_fn):
    """turns a function (values -> output) into a reducer"""
    return partial(reduce_with, aggregation_fn)

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))
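
As a quick check (not in the original notebook), the generic map_reduce with sum_reducer should reproduce the earlier word counts, since wc_reducer is just sum applied to the grouped counts:

word_counts = map_reduce(["data science", "big data", "science fiction"],
                         wc_mapper,
                         sum_reducer)
# e.g. [('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]  (order may vary)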

24.4 Example: Analyzing Status Updates


In [8]:
status_updates = [
    {"id": 1,
     "username" : "joelgrus",
     "text" : "Is anyone interested in a data science book?",
     "created_at" : datetime.datetime(2013, 12, 21, 11, 47, 0),
     "liked_by" : ["data_guy", "data_gal", "bill"] },
    # add your own
]

def data_science_day_mapper(status_update):
    """yields (day_of_week, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower():
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)

data_science_days = map_reduce(status_updates,
                               data_science_day_mapper,
                               sum_reducer)

In [9]:
def words_per_user_mapper(status_update):
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user, (word, 1))

def most_popular_word_reducer(user, words_and_counts):
    """given a sequence of (word, count) pairs,
    return the word with the highest total count"""

    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count

    word, count = word_counts.most_common(1)[0]

    yield (user, (word, count))

user_words = map_reduce(status_updates,
                        words_per_user_mapper,
                        most_popular_word_reducer)

def liker_mapper(status_update):
    user = status_update["username"]
    for liker in status_update["liked_by"]:
        yield (user, liker)

distinct_likers_per_user = map_reduce(status_updates,
                                      liker_mapper,
                                      count_distinct_reducer)
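
Because values_reducer turns any (values -> output) function into a reducer, the same liker_mapper can be reused with a different aggregation. For example (an illustration, not from the original notebook), using len instead of the distinct count gives the total number of likes per user:

total_likes_per_user = map_reduce(status_updates,
                                  liker_mapper,
                                  values_reducer(len))
# counts repeat likers too; with the sample data this is also [('joelgrus', 3)]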

24.5 Example: Matrix Multiplication


In [10]:
def matrix_multiply_mapper(m, element):
    """m is the common dimension (columns of A, rows of B)
    element is a tuple (matrix_name, i, j, value)"""
    matrix, i, j, value = element

    if matrix == "A":
        for column in range(m):
            # A_ij is the jth entry in the sum for each C_i_column
            yield((i, column), (j, value))
    else:
        for row in range(m):
            # B_ij is the ith entry in the sum for each C_row_j
            yield((row, j), (i, value))

def matrix_multiply_reducer(m, key, indexed_values):
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)

    # sum up all the products of the positions with two results
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)

    if sum_product != 0.0:
        yield (key, sum_product)
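
In the demo cell below, the entries list stores only the nonzero elements. One minimal reading of it (any zero-padded shapes with common dimension 3 give the same nonzero products) is A = [[3, 2, 0]] and B = [[4, -1], [10, 0], [0, 0]]; a plain-loop sanity check (not part of the original code) then reproduces the MapReduce result:

A = [[3, 2, 0]]
B = [[4, -1], [10, 0], [0, 0]]
C = [[sum(A[i][k] * B[k][j] for k in range(3))    # ordinary matrix product
      for j in range(2)]
     for i in range(1)]
# C == [[32, -3]], i.e. the pairs ((0, 0), 32) and ((0, 1), -3)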

In [11]:
documents = ["data science", "big data", "science fiction"]

wc_mapper_results = [result
                     for document in documents
                     for result in wc_mapper(document)]

print("wc_mapper results")
print(wc_mapper_results)
print()

print("word count results")
print(word_count(documents))
print()

print("word count using map_reduce function")
print(map_reduce(documents, wc_mapper, wc_reducer))
print()

print("data science days")
print(data_science_days)
print()

print("user words")
print(user_words)
print()

print("distinct likers")
print(distinct_likers_per_user)
print()

# matrix multiplication

entries = [("A", 0, 0, 3), ("A", 0, 1,  2),
       ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)

print("map-reduce matrix multiplication")
print("entries:", entries)
print("result:", map_reduce(entries, mapper, reducer))


wc_mapper results
[('science', 1), ('data', 1), ('big', 1), ('data', 1), ('fiction', 1), ('science', 1)]

word count results
[('big', 1), ('science', 2), ('data', 2), ('fiction', 1)]

word count using map_reduce function
[('big', 1), ('science', 2), ('data', 2), ('fiction', 1)]

data science days
[(5, 1)]

user words
[('joelgrus', ('science', 1))]

distinct likers
[('joelgrus', 3)]

map-reduce matrix multiplication
entries: [('A', 0, 0, 3), ('A', 0, 1, 2), ('B', 0, 0, 4), ('B', 0, 1, -1), ('B', 1, 0, 10)]
result: [((0, 1), -3), ((0, 0), 32)]
