In [9]:
%%writefile mr_word_count.py
from mrjob.job import MRJob
class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()
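Between the map and reduce phases, the framework groups the mapper's (key, value) pairs by key and hands each key's values to the reducer. A minimal pure-Python sketch of that shuffle (illustrative only, not mrjob internals):

from collections import defaultdict

sample_lines = ["Hello cat people!",
                "Cats are friends to people in need."]

# map: emit (key, value) pairs for every input line, as in MRWordFrequencyCount.mapper
pairs = []
for line in sample_lines:
    pairs += [("chars", len(line)), ("words", len(line.split())), ("lines", 1)]

# shuffle: group values by key, which the framework does between map and reduce
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# reduce: sum each key's values, mirroring MRWordFrequencyCount.reducer
for key, values in grouped.items():
    print(key, sum(values))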
In [28]:
%%writefile my_file.txt
Hello cat people!
All cats and friends of cats are welcome to ride the catamaran!
Cats are friends to people in need.
Simplest way to run the job locally (mrjob's default inline runner)
In [11]:
!python mr_word_count.py my_file.txt
Simulate Hadoop with the local runner
In [12]:
!python mr_word_count.py -r local my_file.txt
Suppress debugging messages with -q (quiet mode)
In [66]:
!python mr_word_count.py -r local -q my_file.txt
Use a Python driver
In [165]:
%%writefile test.py
from mr_word_count import MRWordFrequencyCount

mr_job = MRWordFrequencyCount(args=['my_file.txt'])
with mr_job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        print(mr_job.parse_output_line(line))
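Note: stream_output() and parse_output_line() were deprecated in mrjob 0.6 and removed in later releases. If the loop above fails on a recent mrjob, something like the following should work instead (a sketch assuming a version where cat_output()/parse_output() exist; not tested here):

from mr_word_count import MRWordFrequencyCount

mr_job = MRWordFrequencyCount(args=['my_file.txt'])
with mr_job.make_runner() as runner:
    runner.run()
    # cat_output() yields raw byte chunks; parse_output() decodes them into (key, value) pairs
    for key, value in mr_job.parse_output(runner.cat_output()):
        print(key, value)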
In [14]:
!python test.py
In [102]:
# Hacky way to make it work: capture the script's stdout and re-parse it
temp = !python test.py
for t in temp:
    print(eval(t))
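eval() will execute arbitrary expressions from the captured output; the standard library's ast.literal_eval parses only Python literals, so it is a safer drop-in for the same hack (a minimal sketch):

import ast

temp = !python test.py
for t in temp:
    # parses the printed tuple reprs, e.g. "('chars', 117)", without executing code
    print(ast.literal_eval(t))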
In [12]:
%%writefile top_word.py
import re

from mrjob.job import MRJob
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")

class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences comes first so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, word_count_pairs):
        # the key is discarded; it is just None.
        # each item of word_count_pairs is (count, word),
        # so yielding the max results in key=count, value=word
        yield max(word_count_pairs)

if __name__ == '__main__':
    MRMostUsedWord.run()
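Putting the count first in the (count, word) tuples is what makes max() work: Python compares tuples element by element, so max() picks the highest count and only falls back to comparing the word on ties. For example (made-up counts):

pairs = [(2, 'cats'), (3, 'friends'), (2, 'people')]
print(max(pairs))  # (3, 'friends') -- tuples compare element by element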
In [15]:
!python top_word.py -r local my_file.txt
Word count where we try not to use the built-in abstractions
In [49]:
import sys
sys.stderr
In [50]:
print("cat", file=sys.stderr)
In [91]:
%%writefile word_count_no_abstractions.py
import re
import sys

from mrjob.job import MRJob
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")

class WordCount(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_emit_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(mapper=self.mapper_make_cap,
                   reducer_init=self.reducer_init_for_status,
                   reducer=self.reducer_find_max_word)
        ]

    def mapper_emit_words(self, _, line):
        self.increment_counter('stats', 'characters', len(line))
        self.increment_counter('stats', 'lines', 1)
        for word in WORD_RE.findall(line.lower()):
            self.increment_counter('stats', 'words', 1)
            yield (word, 1)

    def combiner_count_words(self, word, counts):
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        yield (word, sum(counts))

    def mapper_make_cap(self, word, count):
        # route every (WORD, count) pair to one reducer under the key None
        yield None, (word.upper(), count)

    def reducer_init_for_status(self):
        self.status = 0

    def reducer_find_max_word(self, _, word_count):
        if self.status == 0:
            # set_status expects a string, so stringify the type before reporting it
            self.set_status(str(type(word_count)))
            self.status = 1
        yield max(word_count, key=lambda x: x[1])

if __name__ == "__main__":
    WordCount.run()
In [92]:
!python word_count_no_abstractions.py my_file.txt
In [108]:
# Make a sample of the file
!head -n 2000 data/anonymous-msweb.data.txt > data/anonymous-msweb.data.sample.txt
In [144]:
# View a few instances of the file
!tail -n 10 data/anonymous-msweb.data.sample.txt
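For reference, anonymous-msweb mixes several record types: "A" lines describe page attributes, "C" lines open a user session, and "V" lines record one page visit within the current session, which is why the job below keeps only "V" lines. The records look roughly like this (illustrative; check the tail output above for the real thing):

A,1287,1,"International AutoRoute","/autoroute"
C,"10001",10001
V,1000,1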
In [147]:
%%writefile AtLeastKViews.py
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.compat import jobconf_from_env

class AtLeastKViews(MRJob):
    """
    Given the anonymous-msweb dataset, returns all the pages that have
    at least K views, along with the view count.

    usage:
        $ python AtLeastKViews.py [--jobconf "k=1000"] [-q] <data>
    """

    def steps(self):
        """
        Defines a single-step job.
        """
        mr_steps = [MRStep(mapper=self.mapper_filter_only_Vs,
                           combiner=self.combiner_sum_sites,
                           reducer_init=self.reducer_init,
                           reducer=self.reducer_sum_and_filter_sites)]
        return mr_steps

    def mapper_filter_only_Vs(self, _, line):
        # only "V" (vote) records describe a page view
        if line.startswith("V"):
            terms = line.split(",")
            yield (terms[1], 1)

    def combiner_sum_sites(self, site, counts):
        # remember: the values arrive as a generator, so sum() consumes it lazily
        yield (site, sum(counts))

    def reducer_init(self):
        # good example of how to read job arguments from the job configuration
        self.min_views = int(jobconf_from_env("k", default=50))

    def reducer_sum_and_filter_sites(self, site, counts):
        site_count = sum(counts)
        if site_count >= self.min_views:
            yield (site, site_count)

if __name__ == "__main__":
    AtLeastKViews.run()
In [155]:
!python AtLeastKViews.py --jobconf "k=1000" -q data/anonymous-msweb.data.txt
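The same script should run unchanged on a real cluster by swapping the runner, assuming a working Hadoop installation configured for mrjob:

!python AtLeastKViews.py -r hadoop --jobconf "k=1000" data/anonymous-msweb.data.txt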