In [ ]:
%load data/numbers.txt
In [ ]:
# %load code/MRSortByString.py
from mrjob.job import MRJob
class MRSortByString(MRJob):
    """Sort "<first> <second>" lines by the *string* value of the second column.

    The mapper makes the second field the key and the first field the value.
    Keys are strings, so they order lexicographically (e.g. "10" before "2").
    """

    def mapper(self, _, line):
        """Emit (second field, first field) for one whitespace-separated line.

        Lines with fewer than two fields are skipped and counted rather than
        crashing the job.
        """
        # split() with no argument tolerates repeated spaces/tabs and any
        # trailing newline, matching MRSortByInt's parsing.
        fields = line.split()
        if len(fields) < 2:
            self.increment_counter('MRSortByString', 'skipped lines', 1)
            return
        # No debug print here: mrjob carries job output on stdout, so printing
        # would inject junk records into the stream.
        yield fields[1], fields[0]

    def reducer(self, key, val):
        """Re-emit every value under its key.

        Yielding all values (not just the first) keeps records whose second
        column is duplicated instead of silently dropping them.
        """
        for value in val:
            yield key, value


if __name__ == '__main__':
    MRSortByString.run()
In [5]:
%run code/MRSortByString.py data/numbers.txt
How were the rows sorted?
In [ ]:
# %load code/MRSortByInt.py
from mrjob.job import MRJob
class MRSortByInt(MRJob):
    """Sort "<first> <second>" lines numerically by the second column.

    Keys are emitted as strings and therefore order lexicographically, so the
    integer key is zero-padded to KEY_WIDTH digits to make text order match
    numeric order. The reducer converts keys and values back to ints.
    """

    # Width of the zero-padded sort key. The previous '%01d' padded to a single
    # digit (i.e. not at all), so 10 sorted before 2. Ten digits covers any
    # non-negative 32-bit integer; wider numbers would sort incorrectly.
    KEY_WIDTH = 10

    def mapper(self, _, line):
        """Emit (zero-padded second field, first field) for one input line.

        Lines with fewer than two fields are skipped and counted.
        NOTE(review): zero-padding only orders non-negative keys correctly;
        negative keys would need a sign-aware encoding.
        """
        fields = line.split()
        if len(fields) < 2:
            self.increment_counter('MRSortByInt', 'skipped lines', 1)
            return
        yield '%0*d' % (self.KEY_WIDTH, int(fields[1])), fields[0]

    def reducer(self, key, val):
        """Emit (int key, int value) for every value, keeping duplicates."""
        for value in val:
            yield int(key), int(value)


if __name__ == '__main__':
    MRSortByInt.run()
In [11]:
%run code/MRSortByInt.py data/numbers.txt
In [36]:
%%writefile data/sortdata.txt
1 1
2 4
3 8
4 2
4 7
5 5
6 10
7 11
In [39]:
# Running code inline example
In [38]:
# -*- coding: utf-8 -*-
# Testing word frequency count
import os, sys
sys.path.append(os.path.join(os.getcwd(),"code"))
from MRSortByString import *
from mrjob.job import MRJob
'''
This is a simple wrapper that runs mrjob MapReduce jobs. runJob takes:
MRJobClass - the MRJob subclass to run
argsArr - a list of command-line argument strings passed to the MRJob constructor
loc - 'emr' runs the job with "-r emr"; any other value uses the job's default runner
@author: Peter Harrington if you have any questions: peter.b.harrington@gmail.com
'''
def runJob(MRJobClass, argsArr, loc='local'):
    """Construct and run an mrjob job, returning ``(job, runner)``.

    MRJobClass -- the MRJob subclass to instantiate
    argsArr    -- list of command-line argument strings for the job; it is
                  copied, never modified
    loc        -- 'emr' appends ``-r emr``; any other value runs with the
                  job's default runner

    The runner is returned without cleanup so the caller can read its output;
    the caller is responsible for calling ``runner.cleanup()`` afterwards.
    """
    # Copy so the caller's list is untouched and repeated calls with the same
    # list don't accumulate extra '-r emr' arguments.
    args = list(argsArr)
    if loc == 'emr':
        args.extend(['-r', 'emr'])
    print("starting %s job on %s" % (MRJobClass.__name__, loc))
    mrJob = MRJobClass(args=args)
    runner = mrJob.make_runner()
    runner.run()
    print("finished %s job" % MRJobClass.__name__)
    return mrJob, runner
def runParallelJob(MRJobClass, argsArr, loc='local'):
    """Run ``runJob(MRJobClass, argsArr, loc)`` on a new background thread.

    Returns the started ``threading.Thread``. After ``join()``, its ``result``
    dict holds ``'job'`` and ``'runner'`` on success, or ``'error'`` with the
    exception that runJob raised (which is also re-raised in the thread).

    NOTE(review): assumes the chosen mrjob runner tolerates running alongside
    other jobs in the same process -- confirm before launching several
    in-process jobs at once.
    """
    import threading

    result = {}
    # Snapshot the arguments now so later mutation by the caller can't race
    # with the worker thread.
    job_args = list(argsArr)

    def _target():
        try:
            result['job'], result['runner'] = runJob(MRJobClass, job_args, loc)
        except Exception as err:
            # Keep the failure visible to the caller after join().
            result['error'] = err
            raise

    worker = threading.Thread(target=_target,
                              name='%s-job' % MRJobClass.__name__)
    worker.result = result
    worker.start()
    return worker
if __name__ == '__main__':
    # Run the string-keyed sort over data/sortdata.txt and print each result.
    data_path = os.path.join(os.getcwd(), "data", "sortdata.txt")
    mr_job, runner = runJob(MRSortByString, [data_path], "local")
    print("Sorting sortdata.txt")
    try:
        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            print("%s: %s " % (key, value))
    finally:
        # runJob leaves the runner open so its output can be streamed here;
        # release the runner's working files once we are done reading.
        runner.cleanup()
Note that the output is ordered by the string value of the second column rather than its numeric value, so, for example, 10 and 11 sort before 2.