Course Lead: Dr James G. Shanahan (email Jimi via James.Shanahan AT gmail.com)
Group Members:
Jim Chen, Memphis, TN, jim.chen@ischool.berkeley.edu
Manuel Moreno, Salt Lake City, UT, momoreno@ischool.berkeley.edu
Rahul Ragunathan
Jason Sanchez, San Francisco, CA, jason.sanchez@ischool.berkeley.edu
Class: MIDS w261 Fall 2016 Group 2
Week: 5
Due Time: 2 phases.
HW5 Phase 1: This can be done on a local machine (with a unit test on the cloud, such as AltaScale's PaaS or AWS) and is due Tuesday, Week 6 by 8AM (West coast time). It will primarily focus on building unit/systems tests for a pairwise similarity calculation pipeline (over striped documents).
HW5 Phase 2: This will require the AltaScale cluster and will be due Tuesday, Week 7 by 8AM (West coast time). The focus of HW5 Phase 2 will be to scale the unit/systems tests up to the Google 5-gram corpus. This will be a group exercise.
MIDS UC Berkeley, Machine Learning at Scale DATSCIW261 ASSIGNMENT #5
Version 2016-09-25
=== INSTRUCTIONS for SUBMISSIONS === Follow the instructions for submissions carefully.
https://docs.google.com/forms/d/1ZOr9RnIe_A06AcZDB6K1mJN4vrLeSmS2PD6Xm3eOiis/viewform?usp=send_form
Data warehouse: Stores large amounts of relational, semi-structured, and unstructured data. It is used for business intelligence and data science.
A star schema has fact tables and many dimension tables that connect to the fact tables. Fact tables record events such as sales or website visits and encode the details of each event as keys (user_id, product_id, store_id, ad_id). The dimension tables store the detailed information behind each of these keys.
Star schemas provide a simple approach to structuring data warehouses in a relational way.
3NF means third normal form. It is used to transform large flat files that have repeated data into a linked collection of smaller tables that can be joined on a set of common keys.
Machine learning typically does not consume data in 3NF. Instead, it uses large denormalized flat files so that the details hidden behind the keys are directly available to the algorithms.
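As a toy illustration of the contrast between normalized (star-schema / 3NF) tables and the denormalized flat records used for machine learning, the sketch below joins a hypothetical fact table to its dimension tables; all table and column names here are made up for illustration.
In [ ]:
# Hypothetical star schema: the fact table stores only keys,
# while the dimension tables hold the details behind each key.
sales_fact = [
    {"sale_id": 1, "user_id": 10, "product_id": 100, "store_id": 7},
    {"sale_id": 2, "user_id": 11, "product_id": 100, "store_id": 7},
]
users_dim = {10: {"age": 34, "state": "TN"}, 11: {"age": 27, "state": "UT"}}
products_dim = {100: {"category": "books", "price": 12.99}}
stores_dim = {7: {"city": "Memphis"}}

# Denormalize: join the dimensions back onto each fact row to produce
# the flat records a machine learning algorithm would actually consume.
flat = []
for row in sales_fact:
    record = dict(row)
    record.update(users_dim[row["user_id"]])
    record.update(products_dim[row["product_id"]])
    record.update(stores_dim[row["store_id"]])
    flat.append(record)

for record in flat:
    print(record)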
Log files can track specific events of interest. A denormalized log file allows a company to track these events in real time conditioned on specific customer features. Alternatively, a model can be running that triggers appropriate responses based on the next predicted action of a user given the user's latest action.
Using MRJob, implement a hashside join (memory-backed map-side) for left, right and inner joins. Run your code on the data used in HW 4.4: (Recall HW 4.4: Find the most frequent visitor of each page using mrjob and the output of 4.2 (i.e., transformed log file). In this output please include the webpage URL, webpageID and Visitor ID.)
Justify which table you chose as the Left table in this hashside join.
Please report the number of rows resulting from each of the left, right, and inner joins.
List data files to use for joins
In [20]:
!ls | grep "mo"
Count the lines in the log dataset, view the first 10 lines, and copy the data to log.txt.
In [21]:
!wc -l anonymous-msweb-preprocessed.data && echo
!head anonymous-msweb-preprocessed.data
!cp anonymous-msweb-preprocessed.data log.txt
Convert the output of 4.4 to be just url and url_id. Save as urls.txt.
In [22]:
!cat mostFrequentVisitors.txt | cut -f 1,2 -d',' > urls.txt
!wc -l urls.txt && echo
!head urls.txt
The urls.txt file is much smaller than the log.txt data, so it is the table that should be loaded into memory. This makes it the right-side table in a left join, with log.txt as the left table.
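The full MRJob implementation follows, but the core idea of a memory-backed (hash-side) join can be sketched in a few lines of plain Python: load the small urls table into a dict, then stream the large log file and look each record up in memory. This is only a sketch with left-join behavior; it reuses the url-id slice from the MRJob mapper below.
In [ ]:
# Minimal hash-side join sketch (left-join behavior).
urls = {}
with open("urls.txt") as f:
    for line in f:
        url, url_id = line.strip().replace('"', "").split(",")
        urls[url_id] = url          # small table held in memory

with open("log.txt") as f:
    for line in f:
        url_id = line[2:6]          # same slice the MRJob mapper uses below
        url = urls.get(url_id)      # None when there is no matching url (left join)
        print(line.strip(), url)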
In [117]:
%%writefile join.py
from mrjob.job import MRJob
from mrjob.step import MRStep
# Avoid broken pipe error
from signal import signal, SIGPIPE, SIG_DFL
signal(SIGPIPE,SIG_DFL)
class Join(MRJob):
def configure_options(self):
super(Join, self).configure_options()
self.add_passthrough_option(
'--join',
default="left",
help="Options: left, inner, right")
def mapper_init(self):
self.join = self.options.join
self.urls_used = set()
self.urls = {}
try:
open("urls.txt")
filename = "urls.txt"
except FileNotFoundError:
filename = "limited_urls.txt"
with open(filename) as urls:
for line in urls:
url, key = line.strip().replace('"',"").split(",")
self.urls[key] = url
def mapper(self, _, lines):
try:
url = lines[2:6]
if self.join in ["inner", "left"]:
yield (lines, self.urls[url])
elif self.join in ["right"]:
yield (self.urls[url], lines)
self.urls_used.add(url)
except KeyError:
if self.join in ["inner", "right"]:
pass
else:
yield (lines, "None")
def mapper_final(self):
for key, value in self.urls.items():
if key not in self.urls_used:
yield (self.urls[key], "*")
def reducer(self, url, values):
quick_stash = 0
for val in values:
if val != "*":
quick_stash += 1
yield (val, url)
if quick_stash == 0:
yield ("None", url)
def steps(self):
join = self.options.join
if join in ["inner", "left"]:
mrsteps = [MRStep(mapper_init=self.mapper_init,
mapper=self.mapper)]
if join == "right":
mrsteps = [MRStep(mapper_init=self.mapper_init,
mapper=self.mapper,
mapper_final=self.mapper_final,
reducer=self.reducer)]
return mrsteps
if __name__ == "__main__":
Join.run()
Make a file with only the first five urls to test left and inner join.
In [128]:
!head -n 5 urls.txt > limited_urls.txt
Using the first ten lines of the log file and left joining it to the first five lines of the urls file, we see that some of the urls are returned as "None." This is correct behavior.
In [141]:
!head log.txt | python join.py --file limited_urls.txt --join left -q
Performing the same operation with an inner join, we see that the lines that were "None" are dropped.
In [130]:
!head log.txt | python join.py --file limited_urls.txt --join inner -q
To demonstrate that the right join works, we use only the first 100 log entries. We see that urls without corresponding log entries are listed as "None" and that all urls are returned in alphabetical order.
In [140]:
!head -n 100 log.txt | python join.py --file urls.txt --join right -r local -q | head -n 15
By using the limited urls file, we see that only five urls are returned and that every logged visit to those pages is returned (at least within the first 50 log entries).
In [139]:
!head -n 50 log.txt | python join.py --file limited_urls.txt --join right -r local -q
A large subset of the Google n-grams dataset
https://aws.amazon.com/datasets/google-books-ngrams/
which we have placed in a bucket/folder on Dropbox and on s3:
https://www.dropbox.com/sh/tmqpc4o0xswhkvz/AACUifrl6wrMrlK6a3X3lZ9Ea?dl=0
s3://filtered-5grams/
In particular, this bucket contains (~200) files (10Meg each) in the format:
(ngram) \t (count) \t (pages_count) \t (books_count)
The next cell shows the first 10 lines of the googlebooks-eng-all-5gram-20090715-0-filtered.txt file.
DISCLAIMER: Each record is already a 5-gram. Ideally we would calculate the stripes/co-occurrence data from the raw text and not from the 5-gram preprocessed data. Calculating pairs on these 5-grams is slightly corrupt because overlapping 5-grams cause us to double count co-occurrences. Having said that, this exercise can still pull out some similar terms.
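To make the double counting concrete, the short sketch below slides a 5-word window over a made-up sentence (an assumption about how the 5-grams were generated) and counts word pairs per window; pairs that fall inside several overlapping windows are counted multiple times relative to counting the raw text once.
In [ ]:
from itertools import combinations
from collections import Counter

words = "the cat sat on the warm mat today".split()

# Sliding 5-gram windows similar to how the corpus was derived
five_grams = [words[i:i + 5] for i in range(len(words) - 4)]

pair_counts = Counter()
for gram in five_grams:
    for pair in combinations(sorted(set(gram)), 2):
        pair_counts[pair] += 1

# Pairs such as ("on", "sat") appear in several overlapping windows,
# so their counts are inflated compared to counting the raw text once.
print(pair_counts.most_common(5))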
mini_5gram.txt
In [149]:
%%writefile mini_5gram.txt
A BILL FOR ESTABLISHING RELIGIOUS 59 59 54
A Biography of General George 92 90 74
A Case Study in Government 102 102 78
A Case Study of Female 447 447 327
A Case Study of Limited 55 55 43
A Child's Christmas in Wales 1099 1061 866
A Circumstantial Narrative of the 62 62 50
A City by the Sea 62 60 49
A Collection of Fairy Tales 123 117 80
A Collection of Forms of 116 103 82
atlas.txt
In [150]:
%%writefile atlas.txt
atlas boon 50 50 50
boon cava dipped 10 10 10
atlas dipped 15 15 15
mini_stripes.txt
In [151]:
with open("mini_stripes.txt", "w") as f:
f.writelines([
'"DocA"\t{"X":20, "Y":30, "Z":5}\n',
'"DocB"\t{"X":100, "Y":20}\n',
'"DocC"\t{"M":5, "N":20, "Z":5, "Y":1}\n'
])
!cat mini_stripes.txt
In [161]:
%%writefile MakeStripes.py
from mrjob.job import MRJob
from collections import Counter
class MakeStripes(MRJob):
def mapper(self, _, lines):
terms, term_count, page_count, book_count = lines.split("\t")
terms = terms.split()
term_count = int(term_count)
for item in terms:
yield (item, {term:term_count for term in terms if term != item})
def combiner(self, keys, values):
values_sum = Counter()
for val in values:
values_sum += Counter(val)
yield keys, dict(values_sum)
def reducer(self, keys, values):
values_sum = Counter()
for val in values:
values_sum += Counter(val)
yield keys, dict(values_sum)
if __name__ == "__main__":
MakeStripes.run()
In [164]:
%%writefile atlas_desired_results.txt
"atlas" {"dipped": 15, "boon": 50}
"boon" {"atlas": 50, "dipped": 10, "cava": 10}
"cava" {"dipped": 10, "boon": 10}
"dipped" {"atlas": 15, "boon": 10, "cava": 10}
In [168]:
!cat atlas.txt | python MakeStripes.py -q > atlas_stripes.txt
!cat atlas_stripes.txt
Actual result matches desired result
In [169]:
%%writefile InvertIndex.py
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from collections import Counter
class InvertIndex(MRJob):
MRJob.input_protocol = JSONProtocol
def mapper(self, key, words):
n_words = len(words)
for word in words:
yield (word, {key:n_words})
def combiner(self, keys, values):
values_sum = Counter()
for val in values:
values_sum += Counter(val)
yield keys, dict(values_sum)
def reducer(self, keys, values):
values_sum = Counter()
for val in values:
values_sum += Counter(val)
yield keys, dict(values_sum)
if __name__ == "__main__":
InvertIndex.run()
Systems test mini_stripes - Inverted Index
——————————————————————————————————————————
"M" | DocC 4 |
"N" | DocC 4 |
"X" | DocA 3 | DocB 2 |
"Y" | DocA 3 | DocB 2 | DocC 4 |
"Z" | DocA 3 | DocC 4 |
Systems test atlas-boon - Inverted Index
——————————————————————————————————————————
"atlas" | boon 3 | dipped 3 |
"dipped" | atlas 2 | boon 3 | cava 2 |
"boon" | atlas 2 | cava 2 | dipped 3 |
"cava" | boon 3 | dipped 3 |
In [170]:
!cat mini_stripes.txt | python InvertIndex.py -q > mini_stripes_inverted.txt
!cat mini_stripes_inverted.txt
In [171]:
!cat atlas_stripes.txt | python InvertIndex.py -q > atlas_inverted.txt
!cat atlas_inverted.txt
The tests pass
In [172]:
%%writefile Similarity.py
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from itertools import combinations
from statistics import mean
class Similarity(MRJob):
MRJob.input_protocol = JSONProtocol
def mapper(self, key_term, docs):
doc_names = docs.keys()
for doc_pairs in combinations(sorted(list(doc_names)), 2):
yield (doc_pairs, 1)
for name in doc_names:
yield (name, 1)
def combiner(self, key, value):
yield (key, sum(value))
def reducer_init(self):
self.words = {}
self.results = []
def reducer(self, doc_or_docs, count):
if isinstance(doc_or_docs, str):
self.words[doc_or_docs] = sum(count)
else:
d1, d2 = doc_or_docs
d1_n_words, d2_n_words = self.words[d1], self.words[d2]
intersection = sum(count)
jaccard = round(intersection/(d1_n_words + d2_n_words - intersection), 3)
cosine = round(intersection/(d1_n_words**.5 * d2_n_words**.5), 3)
dice = round(2*intersection/(d1_n_words + d2_n_words), 3)
overlap = round(intersection/min(d1_n_words, d2_n_words), 3)
average = round(mean([jaccard, cosine, dice, overlap]), 3)
self.results.append([doc_or_docs, {"jacc":jaccard, "cos":cosine,
"dice":dice, "ol":overlap, "ave":average}])
def reducer_final(self):
for doc, result in sorted(self.results, key=lambda x: x[1]["ave"], reverse=True):
yield (doc, result)
if __name__ == "__main__":
Similarity.run()
Systems test mini_stripes - Similarity measures
average | pair | cosine | jaccard | overlap | dice |
---|---|---|---|---|---|
0.741582 | DocA - DocB | 0.816497 | 0.666667 | 1.000000 | 0.800000 |
0.488675 | DocA - DocC | 0.577350 | 0.400000 | 0.666667 | 0.571429 |
0.276777 | DocB - DocC | 0.353553 | 0.200000 | 0.500000 | 0.333333 |
Systems test atlas-boon 2 - Similarity measures
average | pair | cosine | jaccard | overlap | dice |
---|---|---|---|---|---|
1.000000 | atlas - cava | 1.000000 | 1.000000 | 1.000000 | 1.000000 |
0.625000 | boon - dipped | 0.666667 | 0.500000 | 0.666667 | 0.666667 |
0.389562 | cava - dipped | 0.408248 | 0.250000 | 0.500000 | 0.400000 |
0.389562 | boon - cava | 0.408248 | 0.250000 | 0.500000 | 0.400000 |
0.389562 | atlas - dipped | 0.408248 | 0.250000 | 0.500000 | 0.400000 |
0.389562 | atlas - boon | 0.408248 | 0.250000 | 0.500000 | 0.400000 |
In [173]:
!cat mini_stripes_inverted.txt | python Similarity.py -q --jobconf mapred.reduce.tasks=1
In [179]:
!cat atlas_inverted.txt | python Similarity.py -q --jobconf mapred.reduce.tasks=1
The calculated numbers exactly match the systems test except for the average column of the mini_stripes set. In that instance, the systems test's averages are incorrect.
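A quick arithmetic check of the DocA - DocB row (values copied from the systems-test table above) shows that the mean of the four listed measures does not equal the listed average:
In [ ]:
# Measures listed in the systems test for the DocA - DocB pair
cosine, jaccard, overlap, dice = 0.816497, 0.666667, 1.000000, 0.800000

mean_of_measures = (cosine + jaccard + overlap + dice) / 4
print(mean_of_measures)  # ~0.8208, not the 0.741582 shown in the systems test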
In [177]:
!cat atlas-boon-systems-test.txt | python MakeStripes.py -q | python InvertIndex.py -q | python Similarity.py -q --jobconf mapred.reduce.tasks=1
This section scans through the corpus file(s) and identifies the top-n most frequent words, which serve as the vocabulary.
We use heapq so that each reducer emits only its top candidates, which reduces the amount of data Hadoop has to shuffle.
This approach can run into memory constraints if our goal is to return the top k results where k is so large the resulting ordered list cannot fit into memory on a single machine (i.e. billions of results). In practice, we only care about a small number of the top results (for example, in this problem we only need to return the top 1000 results. 1000 results are trivially stored in memory).
The code uses multiple reducers. In the last MapReduce step, all data is sent to a single reducer by use of a single key; however, the data that is sent is never stored in memory (only the top k results are) and at most k*n_reducers observations would be sent to this reducer, which means the total data sent is very small and could easily fit on a single hard drive. If the data is so large it cannot fit on a single hard drive, we could add more MR steps to reduce the size of the data by 90% for each added step.
That said, we estimate that the code could work without any changes on a dataset with 100 trillion words if we were asked to return the top 100,000 words and had a cluster of 1,000 machines available.
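As a rough back-of-envelope check of that estimate (the record size and reducer count below are assumptions, not measurements), the data forwarded to the final single reducer stays small even at that scale:
In [ ]:
# Worst case: every reducer in the first step forwards its entire top-k list
k = 100000             # top words requested
n_reducers = 1000      # one reducer per machine (assumption)
bytes_per_record = 30  # rough size of a (word, count) pair (assumption)

total_bytes = k * n_reducers * bytes_per_record
print(total_bytes / 1e9, "GB")  # about 3 GB: easily fits on one machine's disk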
In [142]:
%%writefile GetIndexandOtherWords.py
import heapq
from re import findall
from mrjob.job import MRJob
from mrjob.step import MRStep
class TopList(list):
def __init__(self, max_size, num_position=0):
"""
Just like a list, except the append method adds the new value to the
list only if it is larger than the smallest value (or if the size of
the list is less than max_size).
If each element of the list is an int or float, uses that value for
comparison. If the elements in the list are lists or tuples, uses the
list_position element of the list or tuple for the comparison.
"""
self.max_size = max_size
self.pos = num_position
def _get_key(self, x):
return x[self.pos] if isinstance(x, (list, tuple)) else x
def append(self, val):
if len(self) < self.max_size:
heapq.heappush(self, val)
elif self._get_key(self[0]) < self._get_key(val):
heapq.heapreplace(self, val)
def final_sort(self):
return sorted(self, key=self._get_key, reverse=True)
class GetIndexandOtherWords(MRJob):
"""
Usage: python GetIndexandOtherWords.py --index-range 9000-10000 --top-n-words 10000 --use-term-counts True
Given n-gram formatted data, outputs a file of the form:
index term
index term
...
word term
word term
...
Where there would be 1001 index words and 10000 total words. Each word would be ranked based
on either the term count listed in the Google n-gram data (i.e. the counts found in the
underlying books) or the ranks would be based on the word count of the n-grams in the actual
dataset (i.e. ignore the numbers/counts associated with each n-gram and count each n-gram
exactly once).
"""
def configure_options(self):
super(GetIndexandOtherWords, self).configure_options()
self.add_passthrough_option(
'--index-range',
default="9-10",
help="Specify the range of the index words. ex. 9-10 means the ninth and " +
"tenth most popular words will serve as the index")
self.add_passthrough_option(
'--top-n-words',
default="10",
help="Specify the number of words to output in all")
self.add_passthrough_option(
'--use-term-counts',
default="True",
choices=["True","False"],
help="When calculating the most frequent words, choose whether to count " +
"each word based on the term counts reported by Google or just based on " +
"the number of times the word appears in an n-gram")
self.add_passthrough_option(
'--return-counts',
default="False",
choices=["True","False"],
help="The final output includes the counts of each word")
def mapper_init(self):
# Ensure command line options are sane
top_n_words = int(self.options.top_n_words)
last_index_word = int(self.options.index_range.split("-")[1])
if top_n_words < last_index_word:
raise ValueError("""--top-n-words value (currently %d) must be equal to or greater than
--index-range value (currently %d).""" % (top_n_words, last_index_word))
self.stop_words = set(['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves',
'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him',
'his', 'himself', 'she', 'her', 'hers', 'herself', 'it',
'its', 'itself', 'they', 'them', 'their', 'theirs',
'themselves', 'what', 'which', 'who', 'whom', 'this', 'that',
'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be',
'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does',
'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or',
'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for',
'with', 'about', 'against', 'between', 'into', 'through',
'during', 'before', 'after', 'above', 'below', 'to', 'from',
'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under',
'again', 'further', 'then', 'once', 'here', 'there', 'when',
'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few',
'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not',
'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't',
'can', 'will', 'just', 'don', 'should', 'now'])
def mapper(self, _, lines):
terms, term_count, page_count, book_count = lines.split("\t")
# Either use the ngram term count for the count or count each word just once
if self.options.use_term_counts == "True":
term_count = int(term_count)
else:
term_count = 1
# Iterate through each term. Skip stop words
for term in findall(r'[a-z]+', terms.lower()):
if term in self.stop_words:
pass
else:
yield (term, term_count)
def combiner(self, term, counts):
yield (term, sum(counts))
def reducer_init(self):
"""
Accumulates the top X words and yields them. Note: should only use if
you want to emit a reasonable amount of top words (i.e. an amount that
could fit on a single computer.)
"""
self.top_n_words = int(self.options.top_n_words)
self.TopTerms = TopList(self.top_n_words, num_position=1)
def reducer(self, term, counts):
self.TopTerms.append((term, sum(counts)))
def reducer_final(self):
for pair in self.TopTerms:
yield pair
def mapper_single_key(self, term, count):
"""
Send all the data to a single reducer
"""
yield (1, (term, count))
def reducer_init_top_vals(self):
# Collect top words
self.top_n_words = int(self.options.top_n_words)
self.TopTerms = TopList(self.top_n_words, num_position=1)
# Collect index words
self.index_range = [int(num) for num in self.options.index_range.split("-")]
self.index_low, self.index_high = self.index_range
# Control if output shows counts or just words
self.return_counts = self.options.return_counts == "True"
def reducer_top_vals(self, _, terms):
for term in terms:
self.TopTerms.append(term)
def reducer_final_top_vals(self):
TopTerms = self.TopTerms.final_sort()
if self.return_counts:
# Yield index words
for term in TopTerms[self.index_low-1:self.index_high]:
yield ("index", term)
# Yield all words
for term in TopTerms:
yield ("words", term)
else:
# Yield index words
for term in TopTerms[self.index_low-1:self.index_high]:
yield ("index", term[0])
# Yield all words
for term in TopTerms:
yield ("words", term[0])
def steps(self):
"""
Step one: Yield top n-words from each reducer. Means dataset size is
n-words * num_reducers. Guarantees overall top n-words are
sent to the next step.
"""
mr_steps = [MRStep(mapper_init=self.mapper_init,
mapper=self.mapper,
combiner=self.combiner,
reducer_init=self.reducer_init,
reducer_final=self.reducer_final,
reducer=self.reducer),
MRStep(mapper=self.mapper_single_key,
reducer_init=self.reducer_init_top_vals,
reducer=self.reducer_top_vals,
reducer_final=self.reducer_final_top_vals)]
return mr_steps
if __name__ == "__main__":
GetIndexandOtherWords.run()
Test getting the index and other valid words (excluding stop words) on the mini_5gram.txt dataset. Return the top 20 most common words (based on the term counts) and mark the 16th through 20th most common words as the index words.
In [218]:
!cat mini_5gram.txt | python GetIndexandOtherWords.py --index-range 16-20 \
--top-n-words 20 \
--return-counts False \
--use-term-counts True \
-q > vocabs
!cat vocabs
To spot check the results, view the term counts of each word.
In [221]:
!cat mini_5gram.txt | python GetIndexandOtherWords.py --index-range 16-20 \
--top-n-words 20 \
--return-counts True \
--use-term-counts True \
-q
Do some EDA on this dataset using mrjob, e.g.,
We included all of this analysis at the end.
See here.
In [223]:
%%writefile MakeStripes.py
from mrjob.job import MRJob
from collections import Counter
from sys import stderr
from re import findall
class MakeStripes(MRJob):
def mapper_init(self):
"""
Read in index words and word list.
"""
self.stripes = {}
self.indexlist, self.wordslist = [],[]
with open('vocabs', 'r') as vocabFile:
for line in vocabFile:
word_type, word = line.replace('"', '').split()
if word_type == 'index':
self.indexlist.append(word)
else:
self.wordslist.append(word)
# Convert to sets to make lookups faster
self.indexlist = set(self.indexlist)
self.wordslist = set(self.wordslist)
def mapper(self, _, lines):
"""
Make stripes using index and words list
"""
terms, term_count, page_count, book_count = lines.split("\t")
term_count = int(term_count)
terms = findall(r'[a-z]+', terms.lower())
for item in terms:
if item in self.indexlist:
for val in terms:
if val != item and val in self.wordslist:
yield item, {val:term_count}
def combiner(self, keys, values):
values_sum = Counter()
for val in values:
values_sum += Counter(val)
yield keys, dict(values_sum)
def reducer(self, keys, values):
values_sum = Counter()
for val in values:
values_sum += Counter(val)
yield keys, dict(values_sum)
if __name__ == "__main__":
MakeStripes.run()
In [224]:
!python MakeStripes.py --file vocabs mini_5gram.txt -q
In [229]:
%%writefile InvertIndex.py
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from collections import Counter
class InvertIndex(MRJob):
MRJob.input_protocol = JSONProtocol
def mapper(self, key, words):
"""
Convert each stripe to inverted index
"""
n_words = len(words)
for word in words:
yield (word, {key:n_words})
def combiner(self, keys, values):
values_sum = Counter()
for val in values:
values_sum += Counter(val)
yield keys, dict(values_sum)
def reducer(self, keys, values):
values_sum = Counter()
for val in values:
values_sum += Counter(val)
yield keys, dict(values_sum)
if __name__ == "__main__":
InvertIndex.run()
In [232]:
!python MakeStripes.py --file vocabs mini_5gram.txt -q | python InvertIndex.py -q
In [233]:
%%writefile Similarity.py
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from itertools import combinations
class Similarity(MRJob):
MRJob.input_protocol = JSONProtocol
def mapper(self, key_term, docs):
"""
Make co-occurrence keys for each pair of documents in the inverted
index and make keys representing each document.
"""
doc_names = docs.keys()
for doc_pairs in combinations(sorted(list(doc_names)), 2):
yield (doc_pairs, 1)
for name in doc_names:
yield (name, 1)
def combiner(self, key, value):
yield (key, sum(value))
### Custom partitioner code goes here
def reducer_init(self):
self.words = {}
self.results = []
def reducer(self, doc_or_docs, count):
if isinstance(doc_or_docs, str):
self.words[doc_or_docs] = sum(count)
else:
d1, d2 = doc_or_docs
d1_n_words, d2_n_words = self.words[d1], self.words[d2]
intersection = float(sum(count))
jaccard = round(intersection/(d1_n_words + d2_n_words - intersection), 3)
cosine = round(intersection/(d1_n_words**.5 * d2_n_words**.5), 3)
dice = round(2*intersection/(d1_n_words + d2_n_words), 3)
overlap = round(intersection/min(d1_n_words, d2_n_words), 3)
average = round(sum([jaccard, cosine, dice, overlap])/4.0, 3)
self.results.append([doc_or_docs, {"jacc":jaccard, "cos":cosine,
"dice":dice, "ol":overlap, "ave":average}])
def reducer_final(self):
for doc, result in sorted(self.results, key=lambda x: x[1]["ave"], reverse=True):
yield (doc, result)
if __name__ == "__main__":
Similarity.run()
Doesn't return anything because there are no co-occurring words in the inverted index.
In [237]:
!python MakeStripes.py --file vocabs mini_5gram.txt -q | python InvertIndex.py -q | python Similarity.py -q --jobconf mapred.reduce.tasks=1
First, let's see how large the full dataset is. This number won't be exactly correct because the download was interrupted partway through, so we only have 185 of the files.
In [261]:
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-* | wc
Because there are 200 files that make up the 5-gram dataset (at least that is what I thought I heard), the true line count of the dataset is about:
In [263]:
int(57432975*(200/185))
Out[263]:
We are going to operate on a subset of this data.
In [256]:
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | wc
This sample of the data is only a few percent (about 5.5%) of the full size of the dataset.
In [264]:
3435179/62089702
Out[264]:
We find that many of the index words only occur one time.
In [299]:
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | python GetIndexandOtherWords.py --index-range 9001-10000 \
--top-n-words 10000 \
--return-counts True \
--use-term-counts False \
-q > vocabs
!head vocabs
Here we will return the term count of each word (not the 5gram-based count).
In [300]:
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | python GetIndexandOtherWords.py --index-range 9001-10000 \
--top-n-words 10000 \
--return-counts True \
--use-term-counts True \
-q > vocabs
!head vocabs
This code is very similar to what we would run on the full dataset
In [303]:
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | python GetIndexandOtherWords.py --index-range 9001-10000 \
--top-n-words 10000 \
--return-counts False \
--use-term-counts True \
-q > vocabs
!head vocabs
Make stripes, invert index, and calculate similarities. Print top similarities.
In [315]:
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | python MakeStripes.py --file vocabs -q | python InvertIndex.py -q | python Similarity.py -q --jobconf mapred.reduce.tasks=1 > similarities.txt
!head similarities.txt
It takes about 3 minutes to run this code, which processes 11 of the 200 files on a single machine. If the cluster has 50 machines available, we would expect these core operations on the full dataset to take only a few minutes.
In [316]:
minutes_for_small_job = 3
n_small_jobs_in_big_job = 200/11
total_minutes_one_computer = minutes_for_small_job*n_small_jobs_in_big_job
computers_in_cluster = 50
total_minutes_for_cluster = total_minutes_one_computer/computers_in_cluster
total_minutes_for_cluster
Out[316]:
In [395]:
%%writefile CustomPartitioner.py
from mrjob.job import MRJob
from sys import stderr
import numpy as np
from operator import itemgetter
from random import random
class CustomPartitioner(MRJob):
def __init__(self, *args, **kwargs):
super(CustomPartitioner, self).__init__(*args, **kwargs)
self.N = 30
self.NUM_REDUCERS = 4
def mapper_init(self):
def makeKeyHash(key, num_reducers):
byteof = lambda char: int(format(ord(char), 'b'), 2)
current_hash = 0
for c in key:
current_hash = (current_hash * 31 + byteof(c))
return current_hash % num_reducers
# printable ascii characters, starting with 'A'
keys = [str(chr(i)) for i in range(65,65+self.NUM_REDUCERS)]
partitions = []
for key in keys:
partitions.append([key, makeKeyHash(key, self.NUM_REDUCERS)])
parts = sorted(partitions,key=itemgetter(1))
self.partition_keys = list(np.array(parts)[:,0])
self.partition_file = np.arange(0,self.N,self.N/(self.NUM_REDUCERS))[::-1]
print((keys, partitions, parts, self.partition_keys, self.partition_file), file=stderr)
def mapper(self, _, lines):
terms, term_count, page_count, book_count = lines.split("\t")
terms = terms.split()
term_count = int(term_count)
for item in terms:
yield (item, term_count)
for item in ["A", "B", "H", "I"]:
yield (item, 0)
def reducer_init(self):
self.reducer_unique_key = int(random()*900000+100000)
def reducer(self, keys, values):
yield (self.reducer_unique_key, (keys, sum(values)))
if __name__ == "__main__":
CustomPartitioner.run()
In [396]:
!cat atlas.txt | python CustomPartitioner.py -r local -q
Results of experiment so far: Cannot force specific keys into specific partitions when running locally. Will try again on VM.
In [13]:
%%writefile ngram.py
from mrjob.job import MRJob
from collections import Counter
import operator
class NGram(MRJob):
def mapper_init(self):
self.length = 0
self.longest = 0
self.distribution = Counter()
def mapper(self, _, lines):
# extract word/count sets
ngram, count, pages, _ = lines.split("\t")
count, pages = int(count), int(pages)
# loop to count word length
words = ngram.lower().split()
for w in words:
yield (w, {'count':count, 'pages':pages})
# Count of ngram length
n_gram_character_count = len(ngram)
yield n_gram_character_count, count
# determine if longest word on mapper
if n_gram_character_count > self.length:
self.length = n_gram_character_count
self.longest = [words, n_gram_character_count]
yield (self.longest)
def combiner(self, word, counts):
if isinstance(word,str):
count = 0
pages = 0
for x in counts:
count += x['count']
pages += x['pages']
yield word, {'count':count,'pages':pages}
#aggregate counts
elif isinstance(word,int):
yield word, sum(counts)
#yield long ngrams
else:
for x in counts:
yield word, x
def reducer_init(self):
self.longest = []
self.length = 0
self.counts = Counter()
self.pages = Counter()
self.distribution = Counter()
def reducer(self, key, values):
# use Counter word totals
for val in values:
if isinstance(key,str):
self.counts += Counter({key:val['count']})
self.pages += Counter({key:val['pages']})
# aggregate distribution numbers
elif isinstance(key,int):
self.distribution += Counter({key:val})
else:
# Determine if longest ngram on reducer
if val > self.length:
self.longest = [key, val]
self.length = val
def reducer_final(self):
# yield density calculation
for x in sorted(self.counts):
yield ('mrj_dens',{x:(1.*self.counts[x]/self.pages[x])})
# Use most_common counter function
for x in self.counts.most_common(10):
yield x
# return longest item
if self.longest:
yield self.longest
# yield distribution values
for x in self.distribution:
yield ('mrj_dist', {x:self.distribution[x]})
if __name__ == "__main__":
NGram.run()
In [14]:
!python ngram.py --jobconf mapred.reduce.tasks=1 < googlebooks-eng-all-5gram-20090715-0-filtered-first-10-lines.txt -q > dataout.txt
!cat dataout.txt
In [15]:
%matplotlib inline
import json
import operator
import numpy as np
import matplotlib.pyplot as plt
# sorted density list
def density(data):
x = data
sorted_x = sorted(x.items(), key=operator.itemgetter(1), reverse=True)
print sorted_x[:20]
# distribution plot
def distribution(data):
plt.scatter(data.keys(), data.values(), alpha=0.5)
plt.show()
# loader
def driver():
datain = open('dataout.txt','r')
densdata = {}
distdata = {}
# clean the mess I made
for line in datain:
parts = line.split('\t')
temp = parts[1][1:-2].replace('"', '').split(':')
mrj_val = parts[0].replace('"', '')
if mrj_val == "mrj_dens":
densdata[temp[0]]=float(temp[1])
elif mrj_val == "mrj_dist":
distdata[int(temp[0])]=int(temp[1])
#Execute density sort
density(densdata)
#Execute distribution plot
distribution(distdata)
driver()
This section examines the output pairs using the nltk library.
For each pair of words, we check whether one is identified as a synonym of the other by nltk.
Based on the "hit" data, we compute the precision, recall, and F1 score of the output.
With a limited number of pairs in the output, it is possible to run everything within a single Python script.
We also prepare a MapReduce job in case the number of pairs increases.
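For reference, here is a minimal sketch of the per-word measures the job below computes; the counts are made-up values, not output from the job. For each word, 'hit' counts pairs where its partner word was found in the relevant synonym set, 'count' is the number of pairs the word appeared in, and 'synset' is the size of the partner word's synonym set (the recall denominator described in the job's docstring).
In [ ]:
# Made-up per-word counters, mirroring the dicts yielded by the MapReduce job below
hit, count, synset = 1, 3, 8

precision = float(hit) / count      # hits / opportunities
recall = float(hit) / synset        # hits / size of partner's synonym set
f1 = 2 * precision * recall / (precision + recall)
print(precision, recall, f1)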
In [16]:
%%writefile NLTKBenchMark.py
import nltk
import json
import numpy as np
from nltk.corpus import wordnet as wn
from mrjob.job import MRJob
from mrjob.step import MRStep
class NLTKBenchMark(MRJob):
def mapper(self, _, lines):
#parse the output file and identify the pair of words
pair, avg = lines.split("\t")
pair = json.loads(pair)
word1, word2 = pair[0], pair[1]
hit = 0
#for each word, extract the list of synonyms from nltk corpus, convert to set to remove duplicates
syn1 = set([l.name() for s in wn.synsets(word1) for l in s.lemmas()])
syn2 = set([l.name() for s in wn.synsets(word2) for l in s.lemmas()])
        #keep track of words that have no synonym using '~nosyn'
if len(syn1) == 0:
yield '~nosyn', [word1]
if len(syn2) == 0:
yield '~nosyn', [word2]
'''
        for each occurrence of a word, increment the count
for word A, synset is the number of synonyms of the other word B
this value is used for calculating recall
this method becomes confusing/problematic if a word appears multiple times in the final output
if there is a hit for word A, set the hit to 1, and set the hit for the other word B to 0 (to avoid double count)
if there is not a hit for A and B, set the hit to 0 for both
'''
if word2 in syn1:
yield word2, {'hit':1, 'count':1, 'synset':len(syn1)}
yield word1, {'hit':0, 'count':1, 'synset':len(syn2)}
elif word1 in syn2:
yield word1, {'hit':1, 'count':1, 'synset':len(syn2)}
yield word2, {'hit':0, 'count':1, 'synset':len(syn1)}
else:
yield word1, {'hit':0, 'count':1, 'synset':len(syn2)}
yield word2, {'hit':0, 'count':1, 'synset':len(syn1)}
def combiner(self, term, values):
#combine '~nosyn' into a bigger list and yield the list
if term == '~nosyn':
nosynList = []
for value in values:
nosynList = nosynList+value
yield term, nosynList
else:
counters = {'hit':0, 'count':0, 'synset':0}
for value in values:
counters['hit'] += value['hit']
counters['count'] += value['count']
counters['synset'] = value['synset']
yield term, counters
def reducer_init(self):
self.plist = []
self.rlist = []
self.flist = []
def reducer(self, term, values):
#yield the final list of words that have no synonym
if term == '~nosyn':
nosynList = []
for value in values:
nosynList = nosynList+value
yield term, nosynList
else:
counters = {'hit':0.0, 'count':0.0, 'synset':0.0}
precision, recall, F1 = 0,0,0
for value in values:
counters['hit'] += value['hit']
counters['count'] += value['count']
counters['synset'] = value['synset']
if counters['hit'] > 0 and counters['synset'] > 0:
precision = float(counters['hit'])/float(counters['count'])
recall = float(counters['hit'])/float(counters['synset'])
F1 = 2*precision*recall/(precision+recall)
self.plist.append(precision)
self.rlist.append(recall)
self.flist.append(F1)
yield term, counters
elif counters['synset'] > 0:
self.plist.append(precision)
self.rlist.append(recall)
self.flist.append(F1)
yield term, counters
def reducer_final(self):
#compute the mean of all collected measurements
yield 'precision', np.mean(self.plist)
yield 'recall', np.mean(self.rlist)
yield 'F1', np.mean(self.flist)
if __name__ == "__main__":
NLTKBenchMark.run()
In [17]:
!python NLTKBenchMark.py nltk_bench_sample.txt
In [18]:
''' Performance measures '''
from __future__ import division
import numpy as np
import json
import nltk
from nltk.corpus import wordnet as wn
import sys
#print all the synset element of an element
def synonyms(string):
syndict = {}
for i,j in enumerate(wn.synsets(string)):
syns = j.lemma_names()
for syn in syns:
syndict.setdefault(syn,1)
return syndict.keys()
hits = []
TP = 0
FP = 0
TOTAL = 0
flag = False # so we don't double count, but at the same time don't miss hits
## For this part we can use one of three outputs. They are all the same, but were generated differently
# 1. the top 1000 from the full sorted dataset -> sortedSims[:1000]
# 2. the top 1000 from the partial sort aggregate file -> sims2/top1000sims
# 3. the top 1000 from the total order sort file -> head -1000 sims_parts/part-00004
top1000sims = []
with open("nltk_bench_sample.txt","r") as f:
for line in f.readlines():
line = line.strip()
lisst, avg = line.split("\t")
lisst = eval(lisst)
lisst.append(avg)
top1000sims.append(lisst)
measures = {}
not_in_wordnet = []
for line in top1000sims:
TOTAL += 1
words=line[0:2]
for word in words:
if word not in measures:
measures[word] = {"syns":0,"opps": 0,"hits":0}
measures[word]["opps"] += 1
syns0 = synonyms(words[0])
measures[words[1]]["syns"] = len(syns0)
if len(syns0) == 0:
not_in_wordnet.append(words[0])
if words[1] in syns0:
TP += 1
hits.append(line)
flag = True
measures[words[1]]["hits"] += 1
syns1 = synonyms(words[1])
measures[words[0]]["syns"] = len(syns1)
if len(syns1) == 0:
not_in_wordnet.append(words[1])
if words[0] in syns1:
if flag == False:
TP += 1
hits.append(line)
measures[words[0]]["hits"] += 1
flag = False
precision = []
recall = []
f1 = []
for key in measures:
p,r,f = 0,0,0
if measures[key]["hits"] > 0 and measures[key]["syns"] > 0:
p = measures[key]["hits"]/measures[key]["opps"]
r = measures[key]["hits"]/measures[key]["syns"]
f = 2 * (p*r)/(p+r)
# For calculating measures, only take into account words that have synonyms in wordnet
if measures[key]["syns"] > 0:
precision.append(p)
recall.append(r)
f1.append(f)
# Take the mean of each measure
print "—"*110
print "Number of Hits:",TP, "out of top",TOTAL
print "Number of words without synonyms:",len(not_in_wordnet)
print "—"*110
print "Precision\t", np.mean(precision)
print "Recall\t\t", np.mean(recall)
print "F1\t\t", np.mean(f1)
print "—"*110
print "Words without synonyms:"
print "-"*100
for word in not_in_wordnet:
print synonyms(word),word