HW2.0: What is a race condition in the context of parallel computation? Give an example.
What is MapReduce?
How does it differ from Hadoop?
Which programming paradigm is Hadoop based on? Explain and give a simple example in code and show the code running.
A race condition occurs when the result of a computation depends on the order in which parallel processes complete. For example, suppose two processes each increment a shared variable. If both read the variable at the same time, one write overwrites the other and the variable is incremented only once; if the first process finishes before the second reads the variable, the second increments the already-incremented value and the variable increases by two.
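As a minimal sketch of the lost-update scenario above (not part of the homework; it assumes Python's standard threading module, and the time.sleep call is only there to widen the read-modify-write window so the bad interleaving reproduces reliably):
In [ ]:
# Sketch of a race condition: two threads read the same shared counter,
# pause, then write back counter + 1, so one of the increments is lost.
import threading
import time

counter = 0

def unsafe_increment():
    global counter
    local = counter       # read the shared value
    time.sleep(0.01)      # both threads now hold the same stale copy
    counter = local + 1   # write back, overwriting the other thread's update

threads = [threading.Thread(target=unsafe_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# prints 1 (one update lost) instead of the expected 2; guarding the
# read-modify-write with a threading.Lock restores the expected result
print counter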
MapReduce is a parallel programming model for processing big data, developed by Google, in which mappers operate on chunks of the data and produce (key, value) pairs, which are sent to reducers to be combined into the final results. Hadoop is a popular open-source implementation of MapReduce that includes a distributed file system (HDFS) and fault tolerance.
Hadoop MapReduce is based on the functional programming paradigm: a job is expressed as map and reduce functions applied over the data. A simple example of a Hadoop program is counting words. The map step emits each word in the data with a count of 1; the reduce step groups the pairs by word key and sums the counts to get a total count for each word in the data set. This example is shown below.
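Before the Hadoop version, a hypothetical local sketch of the same word count using Python's built-in map and reduce illustrates the functional paradigm; the toy lines list below is an assumption standing in for the email data.
In [ ]:
# Local illustration only (not the Hadoop job): word count via map and reduce.
lines = ["the cat sat", "the dog sat on the cat"]   # toy stand-in for the data

# map phase: each line becomes a list of (word, 1) pairs
mapped = map(lambda line: [(w, 1) for w in line.split()], lines)

# shuffle/group phase: gather the counts emitted for each word
grouped = {}
for pairs in mapped:
    for word, count in pairs:
        grouped.setdefault(word, []).append(count)

# reduce phase: sum the counts for each word
totals = dict((word, reduce(lambda a, b: a + b, counts))
              for word, counts in grouped.iteritems())

print totals   # e.g. {'the': 3, 'cat': 2, 'sat': 2, 'dog': 1, 'on': 1}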
In [148]:
# make directory for problem and change to that dir
!mkdir ~/Documents/W261/hw2/hw2_0/
%cd ~/Documents/W261/hw2/hw2_0/
!cp ../enronemail_1h_cleaned.txt ./
In [149]:
%%writefile ./mapper.py
#!/usr/bin/python
import sys
import re
from collections import defaultdict
# regular expression for extracting word tokens from email content
WORD_RE = re.compile(r"[\w']+")
# word passed in as command line argument (not used here; every word is emitted)
vocabwords = sys.argv[1].lower().split(' ')
# input as stdin
for line in sys.stdin:
    # email content is in last column of input file
    linelist = line.split('\t')
    content = linelist[-1]
    # extract words from content
    words = WORD_RE.findall(content.lower())
    # emit (word,1)
    for w in words:
        print w+",1"
In [155]:
%%writefile reducer.py
#!/usr/bin/python
from operator import itemgetter
import sys
from collections import defaultdict
# initialize word count dictionary
word_counts = defaultdict(int)
# input comes from STDIN
for line in sys.stdin:
    # word is the first comma-delimited field; increment its count
    line = line.strip()
    word = line.split(',')[0]
    word_counts[word]+=1
# output word, count
for w,c in word_counts.iteritems():
    print w+'\t'+str(c)
In [156]:
# Run mapper and reducer cells first to write mapper.py and reducer.py
def hw2_0():
    # change to problem dir and copy data into dir
    %cd ~/Documents/W261/hw2/hw2_0/
    !cp ../enronemail_1h_cleaned.txt ./
    # put data file into HDFS
    !hdfs dfs -rm enronemail_1h_cleaned.txt
    !hdfs dfs -put enronemail_1h_cleaned.txt /user/davidadams
    # run Hadoop streaming mapreduce with mapper.py and reducer.py
    !hdfs dfs -rm -r hw2_0Output
    !hadoop jar ~/Documents/W261/hw2/hadoop-*streaming*.jar -mapper "mapper.py 'assistance'" -reducer reducer.py -input enronemail_1h_cleaned.txt -output hw2_0Output
    # show results
    !hdfs dfs -cat hw2_0Output/part-00000

hw2_0()
HW2.1.: Sort in Hadoop MapReduce
Given as input: Records of the form (integer, “NA”), where integer is any integer, and “NA” is just the empty string.
Output: sorted key value pairs of the form (integer, “NA”); what happens if you have multiple reducers? Do you need additional steps? Explain.
Write code to generate N random records of the form (integer, “NA”). Let N = 10,000.
Write the python Hadoop streaming map-reduce job to perform this sort.
In [121]:
'''
HW2.1.: Sort in Hadoop MapReduce
Given as input: Records of the form <integer, “NA”>,
where integer is any integer, and “NA” is just the empty string.
Output: sorted key value pairs of the form <integer, “NA”>
Write code to generate N random records of the form <integer, “NA”>.
Let N = 10,000.
Write the python Hadoop streaming map-reduce job to perform this sort.
'''
# make directory for problem and change to that dir
!mkdir ~/Documents/W261/hw2/hw2_1/
%cd ~/Documents/W261/hw2/hw2_1/
In [122]:
%%writefile mapper.py
#!/usr/bin/python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split(',')
    # write the results to STDOUT (standard output);
    # what we output here will be the input for the
    # Reduce step, i.e. the input for reducer.py
    #
    # tab-delimited
    print '%d\t%s' % (int(words[0]), words[1])
In [123]:
%%writefile reducer.py
#!/usr/bin/python
from operator import itemgetter
import sys
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    record_int, record_str = line.split('\t')
    # convert the key (currently a string) to int
    try:
        record_int = int(record_int)
    except ValueError:
        # key was not a number, so silently
        # ignore/discard this line
        continue
    # Hadoop sorts the map output by key before it is
    # passed to the reducer, so records arrive already in order
    print '%d\t%s' % (record_int, record_str)
In [124]:
# Run mapper and reducer cells first to write mapper.py and reducer.py
def hw2_1():
    # the HDFS home dir has to match this machine's local user (davidadams)
    !hdfs dfs -mkdir -p /user/davidadams

    import numpy as np
    from numpy.random import randint

    def make_randomrecords(N, datafile):
        '''
        Inputs: N = number of random records to generate
                datafile = file to store random records into
        Generates N random numbers and stores as <n,"NA">
        '''
        with open(datafile, 'w') as f:
            # generate N random integers in the range [0,N)
            randrecord_ints = randint(N, size=N)
            # write n,"NA" to data file
            for i in randrecord_ints:
                f.write(str(i)+',NA\n')

    # generate 10,000 random records to sort
    N = 10000
    datafile = 'randomrecords.txt'
    make_randomrecords(N, datafile)
    # put data file into HDFS
    !hdfs dfs -rm randomrecords.txt
    !hdfs dfs -put randomrecords.txt /user/davidadams
    # run Hadoop streaming mapreduce with mapper.py and reducer.py
    !hdfs dfs -rm -r sortedrecordsOutput
    !hadoop jar ~/Documents/W261/hw2/hadoop-*streaming*.jar -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D mapred.text.key.comparator.options=-n -mapper mapper.py -reducer reducer.py -input randomrecords.txt -output sortedrecordsOutput -numReduceTasks 2
    # show sorted results
    !hdfs dfs -cat sortedrecordsOutput/part-00000

hw2_1()
What happens if you have multiple reducers? Do you need additional steps? Explain.
If you have multiple reducers, each reducer's output is sorted by the keys it receives; however, by default the keys are hash-partitioned across the reducers, so each reducer gets an arbitrary subset of keys and the results are only sorted within each output file, not across files. To get a total ordering you need an additional step that controls how keys are distributed to the reducers, i.e. a partitioner that assigns contiguous key ranges to the reducers (sketched below). For example, all integer keys from 0 through 4,999 could go to reducer 1 and keys from 5,000 through 9,999 to reducer 2; concatenating the reducer outputs in order then gives a globally sorted result.
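As a hypothetical local sketch of that range-partitioning idea (in Hadoop streaming this role would be played by a partitioner class such as KeyFieldBasedPartitioner with suitable key options, or TotalOrderPartitioner with a sampled partition file), the function below routes each integer key to the reducer that owns its range; the num_reducers and max_key values are assumptions for illustration only.
In [ ]:
# Illustration only (not part of the homework job): route each integer key to
# the reducer that owns its contiguous range, so concatenating part-00000,
# part-00001, ... in order yields a total sort.
def range_partition(key, num_reducers=2, max_key=10000):
    bucket_size = max_key // num_reducers             # width of each reducer's key range
    return min(int(key) // bucket_size, num_reducers - 1)

print range_partition(1234)   # 0 -> reducer handling keys 0-4,999
print range_partition(7500)   # 1 -> reducer handling keys 5,000-9,999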
HW2.2: Count the occurrences of a single word in the Enron emails
In [125]:
'''
HW2.2. Using the Enron data from HW1 and Hadoop MapReduce streaming,
write mapper/reducer pair that will determine the number of
occurrences of a single, user-specified word.
Examine the word “assistance” and report your results.
- mapper.py counts all occurrences of a single word, and
- reducer.py collates the counts of the single word.
CROSSCHECK: >grep assistance enronemail_1h.txt|cut -d$'\t' -f4| grep assistance|wc -l
8
'''
# make directory for problem and change to that dir
!mkdir ~/Documents/W261/hw2/hw2_2/
%cd ~/Documents/W261/hw2/hw2_2/
!cp ../enronemail_1h_cleaned.txt ./
In [126]:
%%writefile ./mapper.py
#!/usr/bin/python
import sys
import re
from collections import defaultdict
# regular expression for extracting word tokens from email content
WORD_RE = re.compile(r"[\w']+")
# Word to count passed in as command line argument
vocabwords = sys.argv[1].lower().split(' ')
# input as stdin
for line in sys.stdin:
    # email content is in last column of input file
    linelist = line.split('\t')
    content = linelist[-1]
    # extract words from content
    words = WORD_RE.findall(content.lower())
    # Initialize word counts
    vocabWordsCount = dict()
    for v in vocabwords:
        vocabWordsCount[v]=0
    # loop over words in email
    for w in words:
        if w in vocabWordsCount.keys():
            # if word is in vocab, increment count
            vocabWordsCount[w]+=1
    # output count of each vocab word in each email
    for k in vocabWordsCount.keys():
        print linelist[0]+'\t'+linelist[1]+'\t'+k+'\t'+str(vocabWordsCount[k])
In [127]:
%%writefile reducer.py
#!/usr/bin/python
from operator import itemgetter
import sys
# initialize total count
total_count = 0
# input comes from STDIN
for line in sys.stdin:
    # get count from fourth column and increment total
    line = line.strip()
    count = int(line.split('\t')[3])
    total_count+=count
# output total count
print total_count
In [132]:
# Run mapper and reducer cells first to write mapper.py and reducer.py
def hw2_2():
    # change to problem dir and copy data into dir
    %cd ~/Documents/W261/hw2/hw2_2/
    !cp ../enronemail_1h_cleaned.txt ./
    # put data file into HDFS
    !hdfs dfs -rm enronemail_1h_cleaned.txt
    !hdfs dfs -put enronemail_1h_cleaned.txt /user/davidadams
    # run Hadoop streaming mapreduce with mapper.py and reducer.py
    !hdfs dfs -rm -r hw2_2Output
    !hadoop jar ~/Documents/W261/hw2/hadoop-*streaming*.jar -mapper "mapper.py 'assistance'" -reducer reducer.py -input enronemail_1h_cleaned.txt -output hw2_2Output
    # show results
    total = !hdfs dfs -cat hw2_2Output/part-00000
    print "\n\nThe word 'assistance' occurs %s times in the data set" % total[-1]

hw2_2()
In [133]:
'''
HW2.3. Using the Enron data from HW1 and Hadoop MapReduce,
write a mapper/reducer pair that will classify the email messages
by a single, user-specified word.
Examine the word “assistance” and report your results.
- mapper.py
- reducer.py performs a single word multinomial Naive Bayes classification.
'''
# make directory for problem and change to that dir
!mkdir ~/Documents/W261/hw2/hw2_3/
%cd ~/Documents/W261/hw2/hw2_3/
!cp ../enronemail_1h_cleaned.txt ./
In [134]:
%%writefile ./mapper.py
#!/usr/bin/python
import sys
import re
from collections import defaultdict
# regular expression for extracting word tokens from email content
WORD_RE = re.compile(r"[\w']+")
# Word to count passed in as command line argument
vocabwords = sys.argv[1].lower().split(' ')
# input as stdin
for line in sys.stdin:
    # email content is in last column of input file
    linelist = line.split('\t')
    content = linelist[-1]
    # extract words from content
    words = WORD_RE.findall(content.lower())
    # Initialize word counts
    vocabWordsCount = dict()
    for v in vocabwords:
        vocabWordsCount[v]=0
    # loop over words in email
    for w in words:
        if w in vocabWordsCount.keys():
            # if word is in vocab, increment count
            vocabWordsCount[w]+=1
    # output count of each vocab word in each email
    for k in vocabWordsCount.keys():
        print linelist[0]+'\t'+linelist[1]+'\t'+k+'\t'+str(vocabWordsCount[k])
In [135]:
%%writefile ./reducer.py
#!/usr/bin/python
import sys
from collections import defaultdict
from math import log
# initialize counters
vocabWordCounts_spam = defaultdict(int)
vocabWordCounts_ham = defaultdict(int)
totalCount = 0
spamCount = 0
wordcounts = defaultdict(dict)
# initialize set of email ids in spam and ham
spam_ids = set()
ham_ids = set()
# Input from mappers as stdin
for line in sys.stdin:
    line = line.strip()
    linelist = line.split('\t')
    # add count of word on line to dictionary by email id
    # {email_id: {word1: count_of_word1, word2: count_of_word2,...}}
    wordcounts[linelist[0]][linelist[2]]=int(linelist[3])
    # class label for email is in second column
    spam = int(linelist[1])
    # add email id to correct set (spam or ham) and increment word counts
    if spam==1:
        spam_ids.add(linelist[0])
        vocabWordCounts_spam[linelist[2]]+=int(linelist[3])
    else:
        ham_ids.add(linelist[0])
        vocabWordCounts_ham[linelist[2]]+=int(linelist[3])
# number of emails in spam, ham, and total
spamCount = len(spam_ids)
hamCount = len(ham_ids)
totalCount = spamCount+hamCount
# prior probabilities of spam and ham calculated as (number of spam emails)/(total number of emails)
prior_spam = 1.0*spamCount/totalCount
prior_ham = 1.0-prior_spam
# sum number of vocab word occurrences in spam and ham
total_all_vocab_in_spam = 0
total_all_vocab_in_ham = 0
for vw in vocabWordCounts_spam.keys():
    total_all_vocab_in_spam+=vocabWordCounts_spam[vw]
for vw in vocabWordCounts_ham.keys():
    total_all_vocab_in_ham+=vocabWordCounts_ham[vw]
# calc the total number of words in vocab as union of words in spam and ham
vocab_spam = set(vocabWordCounts_spam.keys())
vocab_ham = set(vocabWordCounts_ham.keys())
vocab = vocab_spam.union(vocab_ham)
vocab_size = len(vocab)
# initialize dicts for storing conditional probabilities for each vocab word in spam/ham
pr_vw_given_spam = dict()
pr_vw_given_ham = dict()
'''
Conditional probabilities calculated with Laplace (add-one) smoothing:
pr_vw_given_spam = (total_vw_in_spam+1)/(total_all_vocab_in_spam + vocab_size)
pr_vw_given_ham = (total_vw_in_ham+1)/(total_all_vocab_in_ham + vocab_size)
'''
# loop over all vocab words and store conditional probabilities
for vw in vocab:
    if vw in vocabWordCounts_spam.keys():
        pr_vw_given_spam[vw]=1.0*(vocabWordCounts_spam[vw]+1)/(total_all_vocab_in_spam + vocab_size)
    else:
        pr_vw_given_spam[vw]=1.0*(0+1)/(total_all_vocab_in_spam + vocab_size)
    if vw in vocabWordCounts_ham.keys():
        pr_vw_given_ham[vw]=1.0*(vocabWordCounts_ham[vw]+1)/(total_all_vocab_in_ham + vocab_size)
    else:
        pr_vw_given_ham[vw]=1.0*(0+1)/(total_all_vocab_in_ham + vocab_size)
# loop over emails to calculate spam/ham scores and make NB prediction
for email_id,email in wordcounts.iteritems():
    # initialize scores as log priors
    spam_score = log(prior_spam)
    ham_score = log(prior_ham)
    # loop over vocab words in email and counts
    for w,c in email.iteritems():
        # add count*log(conditional probability) for each vocab word in email
        spam_score+=c*log(pr_vw_given_spam[w])
        ham_score+=c*log(pr_vw_given_ham[w])
    # true class
    spam = 1*(email_id in spam_ids)
    # predicted class
    predict = 1*(spam_score>ham_score)
    # output results as 'ID \t truth \t prediction'
    print email_id+'\t'+str(spam)+'\t'+str(predict)
In [139]:
# Run mapper and reducer cells first to write mapper.py and reducer.py
def hw2_3():
    def score(outputfile):
        '''
        Input: reducer Naive Bayes output file with format
               ID \t truth \t prediction
        Returns: Accuracy of classifier prediction
        '''
        # initialize count of examples and correct predictions
        n = 0
        correct=0
        # read lines from reducer output file
        with open (outputfile, "r") as outfile:
            for line in outfile.readlines():
                # increment count of number of examples
                n+=1
                # split line
                lineList = line.replace('\n','').split('\t')
                # true class is in second column
                truth=int(lineList[1])
                # predicted class is in third column
                predict=int(lineList[2])
                # increment number of correct examples if prediction matches truth
                correct+=(1*truth==predict)
        # return percent of correct predictions (accuracy)
        return 1.0*correct/n

    # change to problem dir and copy data into dir
    %cd ~/Documents/W261/hw2/hw2_3/
    !cp ../enronemail_1h_cleaned.txt ./
    # put data file into HDFS
    !hdfs dfs -rm enronemail_1h_cleaned.txt
    !hdfs dfs -put enronemail_1h_cleaned.txt /user/davidadams
    # run Hadoop streaming mapreduce with mapper.py and reducer.py
    !hdfs dfs -rm -r hw2_3Output
    !hadoop jar ~/Documents/W261/hw2/hadoop-*streaming*.jar -mapper "mapper.py 'assistance'" -reducer reducer.py -input enronemail_1h_cleaned.txt -output hw2_3Output
    # show output file
    print '\n================================================='
    !hdfs dfs -cat hw2_3Output/part-00000
    # calculate and show accuracy
    !hdfs dfs -cat hw2_3Output/part-00000 > 'output.txt'
    outputfile = 'output.txt'
    accuracy = score(outputfile)
    print '\n================================================='
    print '\nAccuracy: ',accuracy

hw2_3()
In [93]:
'''
HW2.4. Using the Enron data from HW1 and in the Hadoop MapReduce
framework, write a mapper/reducer pair that will classify
the email messages using multinomial Naive Bayes Classifier
using a list of one or more user-specified words.
Examine the words “assistance”, “valium”, and “enlargementWithATypo”
and report your results
- mapper.py
- reducer.py performs the multiple-word multinomial Naive Bayes
classification via the chosen list.
'''
# make directory for problem and change to that dir
!mkdir ~/Documents/W261/hw2/hw2_4/
%cd ~/Documents/W261/hw2/hw2_4/
!cp ../enronemail_1h_cleaned.txt ./
In [140]:
%%writefile ./mapper.py
#!/usr/bin/python
import sys
import re
# regular expression for extracting word tokens from email content
WORD_RE = re.compile(r"[\w']+")
# Word(s) to count passed in as command line argument
vocabwords = sys.argv[1].lower().split(' ')
# input as stdin
for line in sys.stdin:
    # email content is in last column of input file
    linelist = line.split('\t')
    content = linelist[-1]
    # extract words from content
    words = WORD_RE.findall(content.lower())
    # Initialize word counts
    vocabWordsCount = dict()
    for v in vocabwords:
        vocabWordsCount[v]=0
    # loop over words in email
    for w in words:
        if w in vocabWordsCount.keys():
            # if word is in vocab, increment count
            vocabWordsCount[w]+=1
    # output count of each vocab word in each email
    for k in vocabWordsCount.keys():
        print linelist[0]+'\t'+linelist[1]+'\t'+k+'\t'+str(vocabWordsCount[k])
In [141]:
%%writefile ./reducer.py
#!/usr/bin/python
import sys
from collections import defaultdict
from math import log
# initialize counters
vocabWordCounts_spam = defaultdict(int)
vocabWordCounts_ham = defaultdict(int)
totalCount = 0
spamCount = 0
wordcounts = defaultdict(dict)
# initialize set of email ids in spam and ham
spam_ids = set()
ham_ids = set()
# Input from mappers as stdin
for line in sys.stdin:
    line = line.strip()
    linelist = line.split('\t')
    # add count of word on line to dictionary by email id
    # {email_id: {word1: count_of_word1, word2: count_of_word2,...}}
    wordcounts[linelist[0]][linelist[2]]=int(linelist[3])
    # class label for email is in second column
    spam = int(linelist[1])
    # add email id to correct set (spam or ham) and increment word counts
    if spam==1:
        spam_ids.add(linelist[0])
        vocabWordCounts_spam[linelist[2]]+=int(linelist[3])
    else:
        ham_ids.add(linelist[0])
        vocabWordCounts_ham[linelist[2]]+=int(linelist[3])
# number of emails in spam, ham, and total
spamCount = len(spam_ids)
hamCount = len(ham_ids)
totalCount = spamCount+hamCount
# prior probabilities of spam and ham calculated as (number of spam emails)/(total number of emails)
prior_spam = 1.0*spamCount/totalCount
prior_ham = 1.0-prior_spam
# sum number of vocab word occurrences in spam and ham
total_all_vocab_in_spam = 0
total_all_vocab_in_ham = 0
for vw in vocabWordCounts_spam.keys():
    total_all_vocab_in_spam+=vocabWordCounts_spam[vw]
for vw in vocabWordCounts_ham.keys():
    total_all_vocab_in_ham+=vocabWordCounts_ham[vw]
# calc the total number of words in vocab as union of words in spam and ham
vocab_spam = set(vocabWordCounts_spam.keys())
vocab_ham = set(vocabWordCounts_ham.keys())
vocab = vocab_spam.union(vocab_ham)
vocab_size = len(vocab)
# initialize dicts for storing conditional probabilities for each vocab word in spam/ham
pr_vw_given_spam = dict()
pr_vw_given_ham = dict()
'''
Conditional probabilities calculated with Laplace (add-one) smoothing:
pr_vw_given_spam = (total_vw_in_spam+1)/(total_all_vocab_in_spam + vocab_size)
pr_vw_given_ham = (total_vw_in_ham+1)/(total_all_vocab_in_ham + vocab_size)
'''
# loop over all vocab words and store conditional probabilities
for vw in vocab:
    if vw in vocabWordCounts_spam.keys():
        pr_vw_given_spam[vw]=1.0*(vocabWordCounts_spam[vw]+1)/(total_all_vocab_in_spam + vocab_size)
    else:
        pr_vw_given_spam[vw]=1.0*(0+1)/(total_all_vocab_in_spam + vocab_size)
    if vw in vocabWordCounts_ham.keys():
        pr_vw_given_ham[vw]=1.0*(vocabWordCounts_ham[vw]+1)/(total_all_vocab_in_ham + vocab_size)
    else:
        pr_vw_given_ham[vw]=1.0*(0+1)/(total_all_vocab_in_ham + vocab_size)
# loop over emails to calculate spam/ham scores and make NB prediction
for email_id,email in wordcounts.iteritems():
    # initialize scores as log priors
    spam_score = log(prior_spam)
    ham_score = log(prior_ham)
    # loop over vocab words in email and counts
    for w,c in email.iteritems():
        # add count*log(conditional probability) for each vocab word in email
        spam_score+=c*log(pr_vw_given_spam[w])
        ham_score+=c*log(pr_vw_given_ham[w])
    # true class
    spam = 1*(email_id in spam_ids)
    # predicted class
    predict = 1*(spam_score>ham_score)
    # output results as 'ID \t truth \t prediction'
    print email_id+'\t'+str(spam)+'\t'+str(predict)
In [143]:
# Run mapper and reducer cells first to write mapper.py and reducer.py
def hw2_4():
    def score(outputfile):
        '''
        Input: reducer Naive Bayes output file with format
               ID \t truth \t prediction
        Returns: Accuracy of classifier prediction
        '''
        # initialize count of examples and correct predictions
        n = 0
        correct=0
        # read lines from reducer output file
        with open (outputfile, "r") as outfile:
            for line in outfile.readlines():
                # increment count of number of examples
                n+=1
                # split line
                lineList = line.replace('\n','').split('\t')
                # true class is in second column
                truth=int(lineList[1])
                # predicted class is in third column
                predict=int(lineList[2])
                # increment number of correct examples if prediction matches truth
                correct+=(1*truth==predict)
        # return percent of correct predictions (accuracy)
        return 1.0*correct/n

    # change to problem dir and copy data into dir
    %cd ~/Documents/W261/hw2/hw2_4/
    !cp ../enronemail_1h_cleaned.txt ./
    # put data file into HDFS
    !hdfs dfs -rm enronemail_1h_cleaned.txt
    !hdfs dfs -put enronemail_1h_cleaned.txt /user/davidadams
    # run Hadoop streaming mapreduce with mapper.py and reducer.py
    !hdfs dfs -rm -r hw2_4Output
    !hadoop jar ~/Documents/W261/hw2/hadoop-*streaming*.jar -mapper "mapper.py 'assistance valium enlargementWithATypo'" -reducer reducer.py -input enronemail_1h_cleaned.txt -output hw2_4Output
    # show output file
    print '\n================================================='
    !hdfs dfs -cat hw2_4Output/part-00000
    # calculate and show accuracy
    !hdfs dfs -cat hw2_4Output/part-00000 > 'output.txt'
    outputfile = 'output.txt'
    accuracy = score(outputfile)
    print '\n================================================='
    print 'Accuracy: ',accuracy

hw2_4()
In [144]:
'''
HW2.5. Using the Enron data from HW1 and in the Hadoop MapReduce
framework, write a mapper/reducer for a multinomial Naive Bayes
Classifier that will classify the email messages using words present.
Also drop words with a frequency of less than three (3).
How does dropping these rare words affect the misclassification error of the learnt
multinomial Naive Bayes classifier on the training dataset?
'''
# make directory for problem and change to that dir
!mkdir ~/Documents/W261/hw2/hw2_5/
%cd ~/Documents/W261/hw2/hw2_5/
!cp ../enronemail_1h_cleaned.txt ./
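The prompt asks for words with a corpus frequency below three to be dropped. As a hypothetical sketch of where that filter could be added (it is not wired into the reducer.py below), the helper prunes the combined vocabulary before the conditional probabilities are computed; the dictionary names reuse the reducer's variable names and the toy counts are made up for illustration.
In [ ]:
# Hypothetical sketch: drop words whose total corpus frequency (spam + ham) is
# below min_count, so they are excluded from the Naive Bayes vocabulary.
from collections import defaultdict

def prune_rare_words(vocabWordCounts_spam, vocabWordCounts_ham, min_count=3):
    # total corpus frequency of each word = spam count + ham count
    totals = defaultdict(int)
    for w, c in vocabWordCounts_spam.iteritems():
        totals[w] += c
    for w, c in vocabWordCounts_ham.iteritems():
        totals[w] += c
    # keep only words seen at least min_count times overall
    return set(w for w, c in totals.iteritems() if c >= min_count)

# toy example: 'viagra' (4+0) and 'meeting' (1+5) survive, 'typo' (1+1) is dropped
spam_counts = {'viagra': 4, 'meeting': 1, 'typo': 1}
ham_counts = {'meeting': 5, 'typo': 1}
print prune_rare_words(spam_counts, ham_counts)   # set(['viagra', 'meeting']) (order may vary)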
In [145]:
%%writefile ./mapper.py
#!/usr/bin/python
import sys
import re
from collections import defaultdict
# regular expression for extracting word tokens from email content
WORD_RE = re.compile(r"[\w']+")
# command line argument (not used here; every word in the email is counted)
vocabwords = sys.argv[1].lower().split(' ')
# input as stdin
for line in sys.stdin:
    # email content is in last column of input file
    linelist = line.split('\t')
    content = linelist[-1]
    # extract word tokens from content
    words = WORD_RE.findall(content.lower())
    # Initialize word counts
    vocabWordsCount = defaultdict(int)
    # loop over words in email and increment counts
    for w in words:
        vocabWordsCount[w]+=1
    # output count of each word in each email
    for k in vocabWordsCount.keys():
        print linelist[0]+'\t'+linelist[1]+'\t'+k+'\t'+str(vocabWordsCount[k])
In [146]:
%%writefile ./reducer.py
#!/usr/bin/python
import sys
from collections import defaultdict
from math import log
# initialize counters
vocabWordCounts_spam = defaultdict(int)
vocabWordCounts_ham = defaultdict(int)
totalCount = 0
spamCount = 0
wordcounts = defaultdict(dict)
# initialize set of email ids in spam and ham
spam_ids = set()
ham_ids = set()
# Input from mappers as stdin
for line in sys.stdin:
    line = line.strip()
    linelist = line.split('\t')
    # add count of word on line to dictionary by email id
    # {email_id: {word1: count_of_word1, word2: count_of_word2,...}}
    wordcounts[linelist[0]][linelist[2]]=int(linelist[3])
    # class label for email is in second column
    spam = int(linelist[1])
    # add email id to correct set (spam or ham) and increment word counts
    if spam==1:
        spam_ids.add(linelist[0])
        vocabWordCounts_spam[linelist[2]]+=int(linelist[3])
    else:
        ham_ids.add(linelist[0])
        vocabWordCounts_ham[linelist[2]]+=int(linelist[3])
# number of emails in spam, ham, and total
spamCount = len(spam_ids)
hamCount = len(ham_ids)
totalCount = spamCount+hamCount
# prior probabilities of spam and ham calculated as (number of spam emails)/(total number of emails)
prior_spam = 1.0*spamCount/totalCount
prior_ham = 1.0-prior_spam
# sum number of vocab word occurrences in spam and ham
total_all_vocab_in_spam = 0
total_all_vocab_in_ham = 0
for vw in vocabWordCounts_spam.keys():
    total_all_vocab_in_spam+=vocabWordCounts_spam[vw]
for vw in vocabWordCounts_ham.keys():
    total_all_vocab_in_ham+=vocabWordCounts_ham[vw]
# calc the total number of words in vocab as union of words in spam and ham
vocab_spam = set(vocabWordCounts_spam.keys())
vocab_ham = set(vocabWordCounts_ham.keys())
vocab = vocab_spam.union(vocab_ham)
vocab_size = len(vocab)
# initialize dicts for storing conditional probabilities for each vocab word in spam/ham
pr_vw_given_spam = dict()
pr_vw_given_ham = dict()
'''
Conditional probabilities calculated with Laplace (add-one) smoothing:
pr_vw_given_spam = (total_vw_in_spam+1)/(total_all_vocab_in_spam + vocab_size)
pr_vw_given_ham = (total_vw_in_ham+1)/(total_all_vocab_in_ham + vocab_size)
'''
# loop over all vocab words and store conditional probabilities
for vw in vocab:
    if vw in vocabWordCounts_spam.keys():
        pr_vw_given_spam[vw]=1.0*(vocabWordCounts_spam[vw]+1)/(total_all_vocab_in_spam + vocab_size)
    else:
        pr_vw_given_spam[vw]=1.0*(0+1)/(total_all_vocab_in_spam + vocab_size)
    if vw in vocabWordCounts_ham.keys():
        pr_vw_given_ham[vw]=1.0*(vocabWordCounts_ham[vw]+1)/(total_all_vocab_in_ham + vocab_size)
    else:
        pr_vw_given_ham[vw]=1.0*(0+1)/(total_all_vocab_in_ham + vocab_size)
# loop over emails to calculate spam/ham scores and make NB prediction
for email_id,email in wordcounts.iteritems():
    # initialize scores as log priors
    spam_score = log(prior_spam)
    ham_score = log(prior_ham)
    # loop over vocab words in email and counts
    for w,c in email.iteritems():
        # add count*log(conditional probability) for each vocab word in email
        spam_score+=c*log(pr_vw_given_spam[w])
        ham_score+=c*log(pr_vw_given_ham[w])
    # true class
    spam = 1*(email_id in spam_ids)
    # predicted class
    predict = 1*(spam_score>ham_score)
    # output results as 'ID \t truth \t prediction'
    print email_id+'\t'+str(spam)+'\t'+str(predict)
In [147]:
def hw2_5():
    def score(outputfile):
        '''
        Input: reducer Naive Bayes output file with format
               ID \t truth \t prediction
        Returns: Accuracy of classifier prediction
        '''
        # initialize count of examples and correct predictions
        n = 0
        correct=0
        # read lines from reducer output file
        with open (outputfile, "r") as outfile:
            for line in outfile.readlines():
                # increment count of number of examples
                n+=1
                # split line
                lineList = line.replace('\n','').split('\t')
                # true class is in second column
                truth=int(lineList[1])
                # predicted class is in third column
                predict=int(lineList[2])
                # increment number of correct examples if prediction matches truth
                correct+=(1*truth==predict)
        # return percent of correct predictions (accuracy)
        return 1.0*correct/n

    # change to problem dir and copy data into dir
    %cd ~/Documents/W261/hw2/hw2_5/
    !cp ../enronemail_1h_cleaned.txt ./
    # put data file into HDFS
    !hdfs dfs -rm enronemail_1h_cleaned.txt
    !hdfs dfs -put enronemail_1h_cleaned.txt /user/davidadams
    # run Hadoop streaming mapreduce with mapper.py and reducer.py
    !hdfs dfs -rm -r hw2_5Output
    !hadoop jar ~/Documents/W261/hw2/hadoop-*streaming*.jar -mapper "mapper.py '*'" -reducer reducer.py -input enronemail_1h_cleaned.txt -output hw2_5Output
    # show output file
    print '\n================================================='
    !hdfs dfs -cat hw2_5Output/part-00000
    # calculate and show accuracy
    !hdfs dfs -cat hw2_5Output/part-00000 > 'output.txt'
    outputfile = 'output.txt'
    accuracy = score(outputfile)
    print 'Accuracy: ',accuracy

hw2_5()