24.1 예시: 단어 수 세기
In [4]:
from __future__ import division
import math, random, re, datetime
from collections import defaultdict, Counter
from functools import partial
from naive_bayes import tokenize
def word_count_old(documents):
"""맵리듀스를 사용하지 않고 단어의 빈도 수를 세어 줌"""
return Counter(word
for document in documents
for word in tokenize(document))
In [6]:
def wc_mapper(document):
"""문서의 각 단어마다 (단어, 1)을 보냄"""
for word in tokenize(document):
yield (word, 1)
def wc_reducer(word, counts):
"""단어의 모든 빈도 수를 더함"""
yield (word, sum(counts))
def word_count(documents):
"""맵리듀스를 사용해서 입려 문서의 빈도 수를 세어 줌"""
# 쌍으로 묶인 값을 저장할 공간
collector = defaultdict(list)
for document in documents:
for word, count in wc_mapper(document):
collector[word].append(count)
return [output
for word, counts in collector.items()
for output in wc_reducer(word, counts)]
24.2 왜 맵리듀스인가?
24.3 맵리듀스 일반화하기
In [7]:
def map_reduce(inputs, mapper, reducer):
"""mapper와 reducer를 사용해서 inputs에 맵리듀스를 적용"""
collector = defaultdict(list)
for input in inputs:
for key, value in mapper(input):
collector[key].append(value)
return [output
for key, values in collector.items()
for output in reducer(key,values)]
def reduce_with(aggregation_fn, key, values):
"""reduces a key-values pair by applying aggregation_fn to the values"""
yield (key, aggregation_fn(values))
def values_reducer(aggregation_fn):
"""turns a function (values -> output) into a reducer"""
return partial(reduce_with, aggregation_fn)
sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))
24.4 예시: 사용자의 글 분석하기
In [8]:
status_updates = [
{"id": 1,
"username" : "joelgrus",
"text" : "Is anyone interested in a data science book?",
"created_at" : datetime.datetime(2013, 12, 21, 11, 47, 0),
"liked_by" : ["data_guy", "data_gal", "bill"] },
# add your own
]
def data_science_day_mapper(status_update):
"""yields (day_of_week, 1) if status_update contains "data science" """
if "data science" in status_update["text"].lower():
day_of_week = status_update["created_at"].weekday()
yield (day_of_week, 1)
data_science_days = map_reduce(status_updates,
data_science_day_mapper,
sum_reducer)
In [9]:
def words_per_user_mapper(status_update):
user = status_update["username"]
for word in tokenize(status_update["text"]):
yield (user, (word, 1))
def most_popular_word_reducer(user, words_and_counts):
"""given a sequence of (word, count) pairs,
return the word with the highest total count"""
word_counts = Counter()
for word, count in words_and_counts:
word_counts[word] += count
word, count = word_counts.most_common(1)[0]
yield (user, (word, count))
user_words = map_reduce(status_updates,
words_per_user_mapper,
most_popular_word_reducer)
def liker_mapper(status_update):
user = status_update["username"]
for liker in status_update["liked_by"]:
yield (user, liker)
distinct_likers_per_user = map_reduce(status_updates,
liker_mapper,
count_distinct_reducer)
24.5 예시: 행렬 연산
In [10]:
def matrix_multiply_mapper(m, element):
"""m is the common dimension (columns of A, rows of B)
element is a tuple (matrix_name, i, j, value)"""
matrix, i, j, value = element
if matrix == "A":
for column in range(m):
# A_ij is the jth entry in the sum for each C_i_column
yield((i, column), (j, value))
else:
for row in range(m):
# B_ij is the ith entry in the sum for each C_row_j
yield((row, j), (i, value))
def matrix_multiply_reducer(m, key, indexed_values):
results_by_index = defaultdict(list)
for index, value in indexed_values:
results_by_index[index].append(value)
# sum up all the products of the positions with two results
sum_product = sum(results[0] * results[1]
for results in results_by_index.values()
if len(results) == 2)
if sum_product != 0.0:
yield (key, sum_product)
In [11]:
documents = ["data science", "big data", "science fiction"]
wc_mapper_results = [result
for document in documents
for result in wc_mapper(document)]
print("wc_mapper results")
print(wc_mapper_results)
print()
print("word count results")
print(word_count(documents))
print()
print("word count using map_reduce function")
print(map_reduce(documents, wc_mapper, wc_reducer))
print()
print("data science days")
print(data_science_days)
print()
print("user words")
print(user_words)
print()
print("distinct likers")
print(distinct_likers_per_user)
print()
# matrix multiplication
entries = [("A", 0, 0, 3), ("A", 0, 1, 2),
("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)
print("map-reduce matrix multiplication")
print("entries:", entries)
print("result:", map_reduce(entries, mapper, reducer))
In [ ]: