Course Lead: Dr James G. Shanahan (email Jimi via James.Shanahan AT gmail.com)
Group Members:
Jim Chen, Memphis, TN, jim.chen@ischool.berkeley.edu
Manuel Moreno, Salt Lake City, UT, momoreno@ischool.berkeley.edu
Rahul Ragunathan
Jason Sanchez, San Francisco, CA, jason.sanchez@ischool.berkeley.edu
Class: MIDS w261 Fall 2016 Group 2
Week:   5
Due Time: 2 phases.
HW5 Phase 1: This can be done on a local machine (with a unit test on the cloud, such as AltaScale's PaaS or AWS) and is due Tuesday of Week 6 by 8AM (West coast time). It primarily focuses on building a unit/systems test pipeline for pairwise similarity calculations (over stripe documents).
HW5 Phase 2: This will require the AltaScale cluster and is due Tuesday of Week 7 by 8AM (West coast time). The focus of HW5 Phase 2 is to scale the unit/systems tests up to the Google 5-gram corpus. This will be a group exercise.
MIDS UC Berkeley, Machine Learning at Scale DATSCIW261 ASSIGNMENT #5
Version 2016-09-25
=== INSTRUCTIONS for SUBMISSIONS === Follow the instructions for submissions carefully.
https://docs.google.com/forms/d/1ZOr9RnIe_A06AcZDB6K1mJN4vrLeSmS2PD6Xm3eOiis/viewform?usp=send_form
Data warehouse: Stores a large amount of relational, semi-structured, and unstructured data. Is used for business intelligence and data science.
A star schema has fact tables and many dimension tables that connect to the fact tables. Fact tables record events such as sales or website visits and encode the details of those events as keys (user_id, product_id, store_id, ad_id). The dimension tables store the detailed information about each of these keys.
Star schemas provide a simple approach to structuring data warehouses in a relational way.
3NF means third normal form. It is used to transform large flat files that have repeated data into a linked collection of smaller tables that can be joined on a set of common keys.
Machine learning typically does not use data in 3NF. Instead, it uses large denormalized (flat) files so that the details hidden behind the keys are directly available to the algorithms.
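As a toy illustration of the difference (the tables and column names below are made up for this example), the star schema keeps only keys in the fact table and the key details in dimension tables, while the flat file used for machine learning is effectively the join of all of them:
# Hypothetical mini star schema: one fact table plus two dimension tables.
sales_fact = [
    {"user_id": 1, "product_id": 10, "amount": 19.99},
    {"user_id": 2, "product_id": 11, "amount": 5.49},
]
users_dim = {1: {"user_name": "Ada", "city": "Memphis"},
             2: {"user_name": "Bob", "city": "Berkeley"}}
products_dim = {10: {"product_name": "book"}, 11: {"product_name": "pen"}}

# Denormalize: join each fact row with its dimension rows to produce the
# flat, feature-rich records that machine learning algorithms consume.
flat = [dict(row, **users_dim[row["user_id"]], **products_dim[row["product_id"]])
        for row in sales_fact]
print(flat[0])
# {'user_id': 1, 'product_id': 10, 'amount': 19.99, 'user_name': 'Ada', 'city': 'Memphis', 'product_name': 'book'}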
Log files can track specific events of interest. A denormalized log file allows a company to track these events in real time conditioned on specific customer features. Alternatively, a model can be running that triggers appropriate responses based on the next predicted action of a user given the user's latest action.
Using MRJob, implement a hash-side join (memory-backed map-side join) for left, right, and inner joins. Run your code on the data used in HW 4.4. (Recall HW 4.4: find the most frequent visitor of each page using mrjob and the output of 4.2, i.e., the transformed log file. In this output please include the webpage URL, webpage ID, and visitor ID.)
Justify which table you chose as the left table in this hash-side join.
Please report the number of rows resulting from:
List data files to use for joins
In [20]:
    
!ls | grep "mo"
    
    
Count the lines in the log dataset, view the first 10 lines, and copy the data to log.txt.
In [21]:
    
!wc -l anonymous-msweb-preprocessed.data && echo
!head anonymous-msweb-preprocessed.data
!cp anonymous-msweb-preprocessed.data log.txt
    
    
Convert the output of 4.4 to be just url and url_id. Save as urls.txt.
In [22]:
    
!cat mostFrequentVisitors.txt | cut -f 1,2 -d',' > urls.txt
!wc -l urls.txt && echo
!head urls.txt
    
    
The urls.txt file is much smaller than the log.txt data, so it is the table that should be loaded into memory as the hash table. This makes log.txt the left (streamed) table and urls.txt the right table in a left join.
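Before the MRJob implementation, here is a minimal plain-Python sketch of the same hash-side idea (it assumes the urls.txt and log.txt formats produced above, with the page ID in the fixed positions used by join.py below): build an in-memory hash table from the small table and stream the large one past it.
# Minimal sketch of a memory-backed (hash-side) left join.
urls = {}
with open("urls.txt") as f:              # small table: load into memory
    for line in f:
        url, url_id = line.strip().replace('"', "").split(",")
        urls[url_id] = url

with open("log.txt") as f:               # large table: stream line by line
    for line in f:
        line = line.strip()
        url_id = line[2:6]               # same fixed-width key as join.py
        # Left join: keep every log line, emit None when the key is missing.
        print(line, urls.get(url_id))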
In [117]:
    
%%writefile join.py
from mrjob.job import MRJob
from mrjob.step import MRStep
# Avoid broken pipe error
from signal import signal, SIGPIPE, SIG_DFL
signal(SIGPIPE,SIG_DFL) 
class Join(MRJob):
    def configure_options(self):
        super(Join, self).configure_options()
        
        self.add_passthrough_option(
            '--join', 
            default="left", 
            help="Options: left, inner, right")
        
    def mapper_init(self):
        self.join = self.options.join
        self.urls_used = set()
        self.urls = {}
        
        try:
            open("urls.txt")
            filename = "urls.txt"
        except FileNotFoundError:
            filename = "limited_urls.txt"
        
        with open(filename) as urls:
            for line in urls:
                url, key = line.strip().replace('"',"").split(",")
                self.urls[key] = url
        
    def mapper(self, _, lines):
        try:
            url = lines[2:6]
            if self.join in ["inner", "left"]:
                yield (lines, self.urls[url])
            elif self.join in ["right"]:
                yield (self.urls[url], lines)
                self.urls_used.add(url)
            
        except KeyError:
            if self.join in ["inner", "right"]:
                pass
            else:
                yield (lines, "None")
    def mapper_final(self):
        for key, value in self.urls.items():
            if key not in self.urls_used:
                yield (self.urls[key], "*")
                
    def reducer(self, url, values):
        quick_stash = 0
        for val in values:
            if val != "*":
                quick_stash += 1
                yield (val, url)
        if quick_stash == 0:
            yield ("None", url)
            
    def steps(self):
        join = self.options.join
        if join in ["inner", "left"]:
            mrsteps = [MRStep(mapper_init=self.mapper_init,
                              mapper=self.mapper)]
        if join == "right":
            mrsteps = [MRStep(mapper_init=self.mapper_init,
                              mapper=self.mapper,
                              mapper_final=self.mapper_final,
                              reducer=self.reducer)]  
        return mrsteps
        
if __name__ == "__main__":
    Join.run()
    
    
Make a file with only the first five urls to test left and inner join.
In [128]:
    
!head -n 5 urls.txt > limited_urls.txt
    
Using the first ten lines of the log file and left joining them to the first five lines of the urls file, we see that log lines whose page ID is not in the limited urls file are returned with "None" for the URL. This is the correct left-join behavior.
In [141]:
    
!head log.txt | python join.py --file limited_urls.txt --join left -q
    
    
Performing the same operation, but with an inner join, we see the lines that were "None" are dropped.
In [130]:
    
!head log.txt | python join.py --file limited_urls.txt --join inner -q
    
    
To demonstrate that the right join works, we use only the first 100 log entries. We see that urls without corresponding log entries are listed with "None" and that all urls are returned in alphabetical order.
In [140]:
    
!head -n 100 log.txt | python join.py --file urls.txt --join right -r local -q | head -n 15
    
    
Using the limited urls file, we see that only five urls are returned and every logged visit to those pages is returned (at least within the first 50 log entries).
In [139]:
    
!head -n 50 log.txt | python join.py --file limited_urls.txt --join right -r local -q
    
    
A large subset of the Google n-grams dataset
https://aws.amazon.com/datasets/google-books-ngrams/
which we have placed in a bucket/folder on Dropbox and on s3:
https://www.dropbox.com/sh/tmqpc4o0xswhkvz/AACUifrl6wrMrlK6a3X3lZ9Ea?dl=0
s3://filtered-5grams/
In particular, this bucket contains (~200) files (10Meg each) in the format:
(ngram) \t (count) \t (pages_count) \t (books_count)
The next cell shows the first 10 lines of the googlebooks-eng-all-5gram-20090715-0-filtered.txt file.
DISCLAIMER: Each record is already a 5-gram. Ideally we would calculate the stripes/co-occurrence data from the raw text rather than from the 5-gram preprocessed data: calculating pairs on these 5-grams is somewhat corrupted because overlapping 5-grams double count co-occurrences. Having said that, this exercise can still pull out some similar terms.
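To make the double-counting concern concrete, here is a small made-up illustration: consecutive 5-grams drawn from the same raw sentence overlap in four words, so pair counts computed from the 5-gram file count the same co-occurrence more than once.
from itertools import combinations
from collections import Counter

# Two consecutive 5-grams taken from one raw sentence overlap in 4 words.
five_grams = ["the quick brown fox jumps", "quick brown fox jumps over"]

pair_counts = Counter()
for gram in five_grams:
    for pair in combinations(gram.split(), 2):
        pair_counts[pair] += 1

# ('quick', 'brown') co-occurs once in the raw sentence but is counted twice.
print(pair_counts[("quick", "brown")])   # -> 2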
mini_5gram.txt
In [149]:
    
%%writefile mini_5gram.txt
A BILL FOR ESTABLISHING RELIGIOUS	59	59	54
A Biography of General George	92	90	74
A Case Study in Government	102	102	78
A Case Study of Female	447	447	327
A Case Study of Limited	55	55	43
A Child's Christmas in Wales	1099	1061	866
A Circumstantial Narrative of the	62	62	50
A City by the Sea	62	60	49
A Collection of Fairy Tales	123	117	80
A Collection of Forms of	116	103	82
    
    
atlas.txt
In [150]:
    
%%writefile atlas.txt
atlas boon	50	50	50
boon cava dipped	10	10	10
atlas dipped	15	15	15
    
    
mini_stripes.txt
In [151]:
    
with open("mini_stripes.txt", "w") as f:
    f.writelines([
        '"DocA"\t{"X":20, "Y":30, "Z":5}\n',
        '"DocB"\t{"X":100, "Y":20}\n',  
        '"DocC"\t{"M":5, "N":20, "Z":5, "Y":1}\n'
    ])
!cat mini_stripes.txt
    
    
In [161]:
    
%%writefile MakeStripes.py
from mrjob.job import MRJob
from collections import Counter
class MakeStripes(MRJob):
    def mapper(self, _, lines):
        terms, term_count, page_count, book_count = lines.split("\t")
        terms = terms.split()
        term_count = int(term_count)
        
        for item in terms:
            yield (item, {term:term_count for term in terms if term != item})
        
    def combiner(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)
    def reducer(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)
        
if __name__ == "__main__":
    MakeStripes.run()
    
    
In [164]:
    
%%writefile atlas_desired_results.txt
"atlas"	{"dipped": 15, "boon": 50}
"boon"	{"atlas": 50, "dipped": 10, "cava": 10}
"cava"	{"dipped": 10, "boon": 10}
"dipped"	{"atlas": 15, "boon": 10, "cava": 10}
    
    
In [168]:
    
!cat atlas.txt | python MakeStripes.py -q > atlas_stripes.txt
!cat atlas_stripes.txt
    
    
Actual result matches desired result
In [169]:
    
%%writefile InvertIndex.py
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from collections import Counter
class InvertIndex(MRJob):
    MRJob.input_protocol = JSONProtocol
    
    def mapper(self, key, words):
        n_words = len(words)
        
        for word in words: 
            yield (word, {key:n_words})
            
    def combiner(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)
    def reducer(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)
        
if __name__ == "__main__":
    InvertIndex.run()
    
    
Systems test mini_stripes - Inverted Index
——————————————————————————————————————————
            "M" |         DocC 4 |
            "N" |         DocC 4 |
            "X" |         DocA 3 |         DocB 2 |
            "Y" |         DocA 3 |         DocB 2 |         DocC 4   |
            "Z" |         DocA 3 |         DocC 4 |
systems test atlas-boon - Inverted Index
——————————————————————————————————————————
        "atlas" |         boon 3 |       dipped 3 |
       "dipped" |        atlas 2 |         boon 3 |         cava 2  |
         "boon" |        atlas 2 |         cava 2 |       dipped 3  |
         "cava" |         boon 3 |       dipped 3 |
In [170]:
    
!cat mini_stripes.txt | python InvertIndex.py -q > mini_stripes_inverted.txt
!cat mini_stripes_inverted.txt
    
    
In [171]:
    
!cat atlas_stripes.txt | python InvertIndex.py -q > atlas_inverted.txt
!cat atlas_inverted.txt
    
    
The tests pass
In [172]:
    
%%writefile Similarity.py
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from itertools import combinations
from statistics import mean
class Similarity(MRJob):
    MRJob.input_protocol = JSONProtocol
    
    def mapper(self, key_term, docs):
        doc_names = docs.keys()
        for doc_pairs in combinations(sorted(list(doc_names)), 2):
            yield (doc_pairs, 1)
        for name in doc_names:
            yield (name, 1)
            
    def combiner(self, key, value):
        yield (key, sum(value))
        
    def reducer_init(self):
        self.words = {}
        self.results = []
    
    def reducer(self, doc_or_docs, count):
        if isinstance(doc_or_docs, str):
            self.words[doc_or_docs] = sum(count)
        else:
            d1, d2 = doc_or_docs
            d1_n_words, d2_n_words = self.words[d1], self.words[d2]
            intersection = sum(count)
            
            jaccard = round(intersection/(d1_n_words + d2_n_words - intersection), 3)
            cosine = round(intersection/(d1_n_words**.5 * d2_n_words**.5), 3)
            dice = round(2*intersection/(d1_n_words + d2_n_words), 3)
            overlap = round(intersection/min(d1_n_words, d2_n_words), 3)
            average = round(mean([jaccard, cosine, dice, overlap]), 3)
            
            self.results.append([doc_or_docs, {"jacc":jaccard, "cos":cosine, 
                                               "dice":dice, "ol":overlap, "ave":average}])
            
    def reducer_final(self):
        for doc, result in sorted(self.results, key=lambda x: x[1]["ave"], reverse=True):
            yield (doc, result)
        
if __name__ == "__main__":
    Similarity.run()
    
    
Systems test mini_stripes - Similarity measures
| average | pair | cosine | jaccard | overlap | dice | 
|---|---|---|---|---|---|
| 0.741582 | DocA - DocB | 0.816497 | 0.666667 | 1.000000 | 0.800000 | 
| 0.488675 | DocA - DocC | 0.577350 | 0.400000 | 0.666667 | 0.571429 | 
| 0.276777 | DocB - DocC | 0.353553 | 0.200000 | 0.500000 | 0.333333 | 
Systems test atlas-boon 2 - Similarity measures
| average | pair | cosine | jaccard | overlap | dice | 
|---|---|---|---|---|---|
| 1.000000 | atlas - cava | 1.000000 | 1.000000 | 1.000000 | 1.000000 | 
| 0.625000 | boon - dipped | 0.666667 | 0.500000 | 0.666667 | 0.666667 | 
| 0.389562 | cava - dipped | 0.408248 | 0.250000 | 0.500000 | 0.400000 | 
| 0.389562 | boon - cava | 0.408248 | 0.250000 | 0.500000 | 0.400000 | 
| 0.389562 | atlas - dipped | 0.408248 | 0.250000 | 0.500000 | 0.400000 | 
| 0.389562 | atlas - boon | 0.408248 | 0.250000 | 0.500000 | 0.400000 | 
In [173]:
    
!cat mini_stripes_inverted.txt | python Similarity.py -q --jobconf mapred.reduce.tasks=1
    
    
In [179]:
    
!cat atlas_inverted.txt | python Similarity.py -q --jobconf mapred.reduce.tasks=1
    
    
The calculated numbers exactly match the systems test, except for the average column of the mini_stripes set. In that instance, the systems test's averages appear to be incorrect.
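As a quick check of that claim, re-deriving the average for the DocA - DocB row from the four measures listed in the systems test table above (using the same arithmetic mean that Similarity.py computes):
from statistics import mean

# Measures listed in the systems test for the DocA - DocB pair
jaccard, cosine, overlap, dice = 0.666667, 0.816497, 1.000000, 0.800000

print(round(mean([jaccard, cosine, overlap, dice]), 6))   # 0.820791
# The systems test lists 0.741582, which is not the mean of its own four measures.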
In [177]:
    
!cat atlas-boon-systems-test.txt | python MakeStripes.py -q | python InvertIndex.py -q | python Similarity.py -q --jobconf mapred.reduce.tasks=1
    
    
This section scans through the corpus file(s) and identifies the top-n most frequent words to use as the vocabulary.
We use heapq so that each reducer emits only its local top-n words, reducing the amount of data transferred through Hadoop.
This approach can run into memory constraints if our goal is to return the top k results where k is so large that the resulting ordered list cannot fit into memory on a single machine (i.e., billions of results). In practice, we only care about a small number of the top results; in this problem we only need the top 1,000 results, which are trivially stored in memory.
The code uses multiple reducers. In the last MapReduce step, all data is sent to a single reducer by use of a single key; however, the data that is sent is never stored in memory (only the top k results are) and at most k*n_reducers observations would be sent to this reducer, which means the total data sent is very small and could easily fit on a single hard drive. If the data is so large it cannot fit on a single hard drive, we could add more MR steps to reduce the size of the data by 90% for each added step.
That said, we estimate that the code could work without any changes on a dataset with 100 trillion words if we were asked to return the top 100,000 words and had a cluster of 1,000 machines available.
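As a rough back-of-the-envelope check of that claim (the record size and cluster numbers below are assumptions for illustration, not measurements):
# Each first-step reducer emits at most top_k (term, count) pairs, so the
# single final reducer receives at most top_k * n_reducers small records.
top_k = 100000           # words requested (from the claim above)
n_reducers = 1000        # machines/reducers in the hypothetical cluster
bytes_per_record = 50    # rough size of one (term, count) pair (assumption)

records_to_final_reducer = top_k * n_reducers             # 100 million records
print(records_to_final_reducer * bytes_per_record / 1e9)  # ~5.0 GB, easily one machine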
In [142]:
    
%%writefile GetIndexandOtherWords.py
import heapq
from re import findall
from mrjob.job import MRJob
from mrjob.step import MRStep
class TopList(list):
    def __init__(self, max_size, num_position=0):
        """
        Just like a list, except the append method adds the new value to the 
        list only if it is larger than the smallest value (or if the size of 
        the list is less than max_size). 
        
        If each element of the list is an int or float, uses that value for 
        comparison. If the elements in the list are lists or tuples, uses the 
        num_position element of the list or tuple for the comparison.
        """
        self.max_size = max_size
        self.pos = num_position
        
    def _get_key(self, x):
        return x[self.pos] if isinstance(x, (list, tuple)) else x
        
    def append(self, val):
        # Store (key, value) entries so the heap is ordered by the comparison
        # key (e.g. the count) rather than by the value's natural ordering.
        entry = (self._get_key(val), val)
        if len(self) < self.max_size:
            heapq.heappush(self, entry)
        elif self[0][0] < entry[0]:
            heapq.heapreplace(self, entry)
            
    def final_sort(self):
        return [val for key, val in sorted(self, reverse=True)]
    
class GetIndexandOtherWords(MRJob):
    """
    Usage: python GetIndexandOtherWords.py --index-range 9000-10000 --top-n-words 10000 --use-term-counts True
    
    Given n-gram formatted data, outputs a file of the form:
    
    index    term
    index    term
    ...
    word     term
    word     term
    ...
    
    Where there would be 1001 index words and 10000 total words. Each word would be ranked based
    on either the term count listed in the Google n-gram data (i.e. the counts found in the
    underlying books) or the ranks would be based on the word count of the n-grams in the actual
    dataset (i.e. ignore the numbers/counts associated with each n-gram and count each n-gram
    exactly once).
    """
    def configure_options(self):
        super(GetIndexandOtherWords, self).configure_options()
        
        self.add_passthrough_option(
            '--index-range', 
            default="9-10", 
            help="Specify the range of the index words. ex. 9-10 means the ninth and " +
                 "tenth most popular words will serve as the index")
        
        self.add_passthrough_option(
            '--top-n-words', 
            default="10", 
            help="Specify the number of words to output in all")
        
        self.add_passthrough_option(
            '--use-term-counts', 
            default="True", 
            choices=["True","False"],
            help="When calculating the most frequent words, choose whether to count " + 
            "each word based on the term counts reported by Google or just based on " + 
            "the number of times the word appears in an n-gram")
        
        self.add_passthrough_option(
            '--return-counts', 
            default="False", 
            choices=["True","False"],
            help="The final output includes the counts of each word")
        
    def mapper_init(self):
        # Ensure command line options are sane
        top_n_words = int(self.options.top_n_words)
        last_index_word = int(self.options.index_range.split("-")[1])
        if top_n_words < last_index_word:
            raise ValueError("""--top-n-words value (currently %d) must be equal to or greater than
                             --index-range value (currently %d).""" % (top_n_words, last_index_word))
        
        self.stop_words =  set(['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 
                            'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 
                            'his', 'himself', 'she', 'her', 'hers', 'herself', 'it', 
                            'its', 'itself', 'they', 'them', 'their', 'theirs', 
                            'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', 
                            'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 
                            'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 
                            'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 
                            'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 
                            'with', 'about', 'against', 'between', 'into', 'through', 
                            'during', 'before', 'after', 'above', 'below', 'to', 'from', 
                            'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 
                            'again', 'further', 'then', 'once', 'here', 'there', 'when', 
                            'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 
                            'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 
                            'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 
                            'can', 'will', 'just', 'don', 'should', 'now'])
        
    def mapper(self, _, lines):
        terms, term_count, page_count, book_count = lines.split("\t")
        
        # Either use the ngram term count for the count or count each word just once
        if self.options.use_term_counts == "True":
            term_count = int(term_count)
        else:
            term_count = 1
        
        # Iterate through each term. Skip stop words
        for term in findall(r'[a-z]+', terms.lower()):
            if term in self.stop_words:
                pass
            else:
                yield (term, term_count)
        
    def combiner(self, term, counts):
        yield (term, sum(counts))      
        
        
    def reducer_init(self):
        """
        Accumulates the top X words and yields them. Note: should only use if
        you want to emit a reasonable amount of top words (i.e. an amount that 
        could fit on a single computer.)
        """
        self.top_n_words = int(self.options.top_n_words)
        self.TopTerms = TopList(self.top_n_words, num_position=1)
        
    def reducer(self, term, counts):
        self.TopTerms.append((term, sum(counts)))
    def reducer_final(self):
        # TopList stores (key, value) entries; emit just the (term, count) values
        for key, pair in self.TopTerms:
            yield pair
        
    def mapper_single_key(self, term, count):
        """
        Send all the data to a single reducer
        """
        yield (1, (term, count))
        
    def reducer_init_top_vals(self):
        # Collect top words
        self.top_n_words = int(self.options.top_n_words)
        self.TopTerms = TopList(self.top_n_words, num_position=1)
        # Collect index words
        self.index_range = [int(num) for num in self.options.index_range.split("-")]
        self.index_low, self.index_high = self.index_range
        # Control if output shows counts or just words
        self.return_counts = self.options.return_counts == "True"
    def reducer_top_vals(self, _, terms):
        for term in terms:
            self.TopTerms.append(term)
            
    def reducer_final_top_vals(self):
        TopTerms = self.TopTerms.final_sort()
        
        if self.return_counts:            
            # Yield index words
            for term in TopTerms[self.index_low-1:self.index_high]:
                yield ("index", term)
            # Yield all words
            for term in TopTerms:
                yield ("words", term)
        else:
            # Yield index words
            for term in TopTerms[self.index_low-1:self.index_high]:
                yield ("index", term[0])
            # Yield all words
            for term in TopTerms:
                yield ("words", term[0])
    
    def steps(self):
        """
        Step one: Yield top n-words from each reducer. Means dataset size is 
                  n-words * num_reducers. Guarantees overall top n-words are 
                  sent to the next step.
        
        """
        mr_steps = [MRStep(mapper_init=self.mapper_init,
                           mapper=self.mapper,
                           combiner=self.combiner,
                           reducer_init=self.reducer_init,
                           reducer_final=self.reducer_final,
                           reducer=self.reducer),
                    MRStep(mapper=self.mapper_single_key,
                           reducer_init=self.reducer_init_top_vals,
                           reducer=self.reducer_top_vals,
                           reducer_final=self.reducer_final_top_vals)]
        return mr_steps
        
if __name__ == "__main__":
    GetIndexandOtherWords.run()
    
    
Test getting the index and other valid words (excluding stop words) on the mini_5gram.txt dataset. Return the top 20 most common words (based on the term counts) and mark the 16th through 20th most common words as the index words.
In [218]:
    
!cat mini_5gram.txt | python GetIndexandOtherWords.py --index-range 16-20    \
                                                      --top-n-words 20       \
                                                      --return-counts False  \
                                                      --use-term-counts True \
                                                       -q > vocabs
!cat vocabs
    
    
To spot check the results, view the term counts of each word.
In [221]:
    
!cat mini_5gram.txt | python GetIndexandOtherWords.py --index-range 16-20    \
                                                      --top-n-words 20       \
                                                      --return-counts True   \
                                                      --use-term-counts True \
                                                       -q
    
    
Do some EDA on this dataset using mrjob, e.g.,
We included all of this analysis at the end of the notebook (see the EDA section below).
In [223]:
    
%%writefile MakeStripes.py
from mrjob.job import MRJob
from collections import Counter
from sys import stderr
from re import findall
class MakeStripes(MRJob):
    def mapper_init(self):
        """
        Read in index words and word list.
        """
        self.stripes = {}
        
        self.indexlist, self.wordslist = [],[]
        with open('vocabs', 'r') as vocabFile:
            for line in vocabFile:
                word_type, word = line.replace('"', '').split()
                if word_type == 'index':
                    self.indexlist.append(word)
                else:
                    self.wordslist.append(word)
        
        # Convert to sets to make lookups faster
        self.indexlist = set(self.indexlist)
        self.wordslist = set(self.wordslist)
    
    def mapper(self, _, lines):
        """
        Make stripes using index and words list
        """
        terms, term_count, page_count, book_count = lines.split("\t")
        term_count = int(term_count)
        terms = findall(r'[a-z]+', terms.lower())
                
        for item in terms:
            if item in self.indexlist:
                for val in terms:
                    if val != item and val in self.wordslist:
                        yield item, {val:term_count}
        
    def combiner(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)
    def reducer(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)
        
if __name__ == "__main__":
    MakeStripes.run()
    
    
In [224]:
    
!python MakeStripes.py --file vocabs mini_5gram.txt -q
    
    
In [229]:
    
%%writefile InvertIndex.py
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from collections import Counter
class InvertIndex(MRJob):
    MRJob.input_protocol = JSONProtocol
    
    def mapper(self, key, words):
        """
        Convert each stripe to inverted index
        """
        n_words = len(words)
        for word in words: 
            yield (word, {key:n_words})
            
    def combiner(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)
    def reducer(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)
        
if __name__ == "__main__":
    InvertIndex.run()
    
    
In [232]:
    
!python MakeStripes.py --file vocabs mini_5gram.txt -q | python InvertIndex.py -q
    
    
In [233]:
    
%%writefile Similarity.py
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from itertools import combinations
class Similarity(MRJob):
    MRJob.input_protocol = JSONProtocol
    
    def mapper(self, key_term, docs):
        """
        Make co-occurrence keys for each pair of documents in the inverted
        index and make keys representing each document.
        """
        doc_names = docs.keys()
        for doc_pairs in combinations(sorted(list(doc_names)), 2):
            yield (doc_pairs, 1)
        for name in doc_names:
            yield (name, 1)
            
    def combiner(self, key, value):
        yield (key, sum(value))
        
    ### Custom partitioner code goes here
    def reducer_init(self):
        self.words = {}
        self.results = []
    
    def reducer(self, doc_or_docs, count):
        if isinstance(doc_or_docs, str):
            self.words[doc_or_docs] = sum(count)
        else:
            d1, d2 = doc_or_docs
            d1_n_words, d2_n_words = self.words[d1], self.words[d2]
            intersection = float(sum(count))
            
            jaccard = round(intersection/(d1_n_words + d2_n_words - intersection), 3)
            cosine = round(intersection/(d1_n_words**.5 * d2_n_words**.5), 3)
            dice = round(2*intersection/(d1_n_words + d2_n_words), 3)
            overlap = round(intersection/min(d1_n_words, d2_n_words), 3)
            average = round(sum([jaccard, cosine, dice, overlap])/4.0, 3)
            
            self.results.append([doc_or_docs, {"jacc":jaccard, "cos":cosine, 
                                               "dice":dice, "ol":overlap, "ave":average}])
            
    def reducer_final(self):
        for doc, result in sorted(self.results, key=lambda x: x[1]["ave"], reverse=True):
            yield (doc, result)
        
if __name__ == "__main__":
    Similarity.run()
    
    
Doesn't return anything because there are no co-occurring words in the inverted index.
In [237]:
    
!python MakeStripes.py --file vocabs mini_5gram.txt -q | python InvertIndex.py -q | python Similarity.py -q --jobconf mapred.reduce.tasks=1
    
First, let's see how large the full dataset is. This number won't be exactly correct because the download was interrupted partway through, so we only have 185 of the files.
In [261]:
    
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-* | wc
    
    
Because roughly 200 files make up the 5-gram dataset (per the assignment description), the true line count of the full dataset is approximately:
In [263]:
    
int(57432975*(200/185))
    
    Out[263]:
We are going to operate on a subset of this data.
In [256]:
    
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | wc
    
    
This sample of the data is only a few percent (about 5.5%) of the full size of the dataset.
In [264]:
    
3435179/62089702
    
    Out[264]:
We find that many of the index words only occur one time.
In [299]:
    
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | python GetIndexandOtherWords.py --index-range 9001-10000  \
                                                                                       --top-n-words 10000       \
                                                                                       --return-counts True     \
                                                                                       --use-term-counts False    \
                                                                                        -q > vocabs
!head vocabs
    
    
Here we will return the term count of each word (not the 5gram-based count).
In [300]:
    
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | python GetIndexandOtherWords.py --index-range 9001-10000  \
                                                                                       --top-n-words 10000       \
                                                                                       --return-counts True     \
                                                                                       --use-term-counts True    \
                                                                                        -q > vocabs
!head vocabs
    
    
This code is very similar to what we would run on the full dataset
In [303]:
    
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | python GetIndexandOtherWords.py --index-range 9001-10000  \
                                                                                       --top-n-words 10000       \
                                                                                       --return-counts False     \
                                                                                       --use-term-counts True    \
                                                                                        -q > vocabs
!head vocabs
    
    
Make stripes, invert index, and calculate similarities. Print top similarities.
In [315]:
    
%%time
!cat Temp_data/googlebooks-eng-all-5gram-20090715-9* | python MakeStripes.py --file vocabs -q | python InvertIndex.py -q | python Similarity.py -q --jobconf mapred.reduce.tasks=1 > similarities.txt
!head similarities.txt
    
    
It takes about 3 minutes to run this code, which processes 11 of the 200 files on a single machine. If a cluster of 50 machines were available, we would expect these core operations on the full dataset to take only a few minutes.
In [316]:
    
minutes_for_small_job = 3
n_small_jobs_in_big_job = 200/11
total_minutes_one_computer = minutes_for_small_job*n_small_jobs_in_big_job
computers_in_cluster = 50
total_minutes_for_cluster = total_minutes_one_computer/computers_in_cluster
total_minutes_for_cluster
    
    Out[316]:
In [395]:
    
%%writefile CustomPartitioner.py
from mrjob.job import MRJob
from sys import stderr
import numpy as np
from operator import itemgetter
from random import random
class CustomPartitioner(MRJob):
    def __init__(self, *args, **kwargs):
        super(CustomPartitioner, self).__init__(*args, **kwargs)
        self.N = 30
        self.NUM_REDUCERS = 4
    def mapper_init(self):
        def makeKeyHash(key, num_reducers):
            byteof = lambda char: int(format(ord(char), 'b'), 2)
            current_hash = 0
            for c in key:
                current_hash = (current_hash * 31 + byteof(c))
            return current_hash % num_reducers
        # printable ascii characters, starting with 'A'
        keys = [str(chr(i)) for i in range(65,65+self.NUM_REDUCERS)]
        partitions = []
        for key in keys:
            partitions.append([key, makeKeyHash(key, self.NUM_REDUCERS)])
        parts = sorted(partitions,key=itemgetter(1))
        self.partition_keys = list(np.array(parts)[:,0])
        self.partition_file = np.arange(0,self.N,self.N/(self.NUM_REDUCERS))[::-1]
        
        print((keys, partitions, parts, self.partition_keys, self.partition_file), file=stderr)
        
    def mapper(self, _, lines):
        terms, term_count, page_count, book_count = lines.split("\t")
        terms = terms.split()
        term_count = int(term_count)
        
        for item in terms:
            yield (item, term_count)
            
        for item in ["A", "B", "H", "I"]:
            yield (item, 0)
            
    def reducer_init(self):
        self.reducer_unique_key = int(random()*900000+100000)
    
    def reducer(self, keys, values):
        yield (self.reducer_unique_key, (keys, sum(values)))
        
if __name__ == "__main__":
    CustomPartitioner.run()
    
    
In [396]:
    
!cat atlas.txt | python CustomPartitioner.py -r local -q
    
    
Results of the experiment so far: we cannot force specific keys into specific partitions when running locally. We will try again on the VM.
In [13]:
    
%%writefile ngram.py
from mrjob.job import MRJob
from collections import Counter
import operator
class NGram(MRJob):
    def mapper_init(self):
        self.length = 0
        self.longest = 0
        self.distribution = Counter()
    def mapper(self, _, lines):
        # extract word/count sets
        ngram, count, pages, _ = lines.split("\t")
        count, pages = int(count), int(pages)
        
        # yield per-word count and page totals
        words = ngram.lower().split()
        for w in words:
            yield (w, {'count':count, 'pages':pages})
        
        # Count of ngram length
        n_gram_character_count = len(ngram)
        yield n_gram_character_count, count
        
        # determine if longest word on mapper
        if n_gram_character_count > self.length:
            self.length = n_gram_character_count
            self.longest = [words, n_gram_character_count]
            yield (self.longest)
    def combiner(self, word, counts):
        if isinstance(word,str):
            count = 0
            pages = 0
            for x in counts:
                count += x['count']
                pages += x['pages']
            yield word, {'count':count,'pages':pages}
        
        #aggregate counts
        elif isinstance(word,int):
            yield word, sum(counts)
        
        #yield long ngrams
        else:
            for x in counts:
                yield word, x
                  
    def reducer_init(self):
        self.longest = []
        self.length = 0
        self.counts = Counter()
        self.pages = Counter()
        self.distribution = Counter()
    
    def reducer(self, key, values):
        # use Counter word totals
        for val in values:
            if isinstance(key,str):
                self.counts += Counter({key:val['count']})
                self.pages += Counter({key:val['pages']})
        
            # aggregate distribution numbers
            elif isinstance(key,int):
                self.distribution += Counter({key:val})
            else:
                # Determine if longest ngram on reducer
                if val > self.length:
                        self.longest = [key, val]
                        self.length = val
    def reducer_final(self):
        # yield density calculation
        for x in sorted(self.counts):
            yield ('mrj_dens',{x:(1.*self.counts[x]/self.pages[x])})
        
        # Use most_common counter function
        for x in self.counts.most_common(10):
            yield x
        # return longest item
        if self.longest:
            yield self.longest
        
        # yield distribution values
        for x in self.distribution:
            yield ('mrj_dist', {x:self.distribution[x]})
    
if __name__ == "__main__":
    NGram.run()
    
    
In [14]:
    
!python ngram.py --jobconf mapred.reduce.tasks=1 < googlebooks-eng-all-5gram-20090715-0-filtered-first-10-lines.txt -q > dataout.txt
!cat dataout.txt
    
    
In [15]:
    
%matplotlib inline
import json
import operator
import numpy as np
import matplotlib.pyplot as plt
# sorted density list
def density(data):
    x = data
    sorted_x = sorted(x.items(), key=operator.itemgetter(1), reverse=True)
    print sorted_x[:20]
# distribution plot
def distribution(data):
    plt.scatter(data.keys(), data.values(), alpha=0.5)
    plt.show()
# loader
def driver():
    datain = open('dataout.txt','r')
    densdata = {}
    distdata = {}
    # clean the mess I made
    for line in datain:
        parts = line.split('\t')
        temp = parts[1][1:-2].replace('"', '').split(':')
        mrj_val = parts[0].replace('"', '')
        if mrj_val == "mrj_dens":
            densdata[temp[0]]=float(temp[1])
        elif mrj_val == "mrj_dist":
            distdata[int(temp[0])]=int(temp[1])
    
    #Execute density sort
    density(densdata)
    
    #Execute distribution plot
    distribution(distdata)
            
driver()
    
    
    
This section examines the output pairs using the nltk library.
For each pair of words, we check whether one is identified as a synonym of the other by nltk (WordNet).
Based on these "hit" data, we compute the precision, recall, and F1 score of the output.
With a limited number of pairs in the output, it is possible to run everything within a single Python script.
We also prepare a MapReduce job in case the number of pairs increases.
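For reference, a small sketch of the WordNet lookup the benchmark relies on and the per-word measures it computes (the word and the hit/opportunity numbers below are illustrative assumptions; it also assumes the WordNet corpus has been downloaded via nltk.download('wordnet')):
from nltk.corpus import wordnet as wn

# Synonym set for one word, extracted the same way as in the mapper below.
word = "boon"
synonyms = set(l.name() for s in wn.synsets(word) for l in s.lemmas())
print(synonyms)

# Per-word measures (illustrative numbers): 'hits' = pairs where the partner
# word was a WordNet synonym, 'opps' = pairs the word appeared in, and the
# synset size recorded for the word (per the mapper, the partner's synonym count).
hits, opps, synset_size = 1.0, 4.0, float(len(synonyms))
precision = hits / opps
recall = hits / synset_size
f1 = 2 * precision * recall / (precision + recall)
print(precision, recall, f1)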
In [16]:
    
%%writefile NLTKBenchMark.py
import nltk
import json
import numpy as np
from nltk.corpus import wordnet as wn
from mrjob.job import MRJob
from mrjob.step import MRStep
class NLTKBenchMark(MRJob):
    
    def mapper(self, _, lines):
        #parse the output file and identify the pair of words
        pair, avg = lines.split("\t")
        pair = json.loads(pair)
        word1, word2 = pair[0], pair[1]        
        
        hit = 0
        
        #for each word, extract the list of synonyms from nltk corpus, convert to set to remove duplicates
        syn1 = set([l.name() for s in wn.synsets(word1) for l in s.lemmas()])
        syn2 = set([l.name() for s in wn.synsets(word2) for l in s.lemmas()])
        
        #keep track of words that have no synonym using '~nosync'
        if len(syn1) == 0:
            yield '~nosyn', [word1]
        if len(syn2) == 0:
            yield '~nosyn', [word2]
            
        '''
        for each occurence of word, increment the count
        for word A, synset is the number of synonyms of the other word B
        this value is used for calculating recall
        this method becomes confusing/problematic if a word appears multiple times in the final output
        
        if there is a hit for word A, set the hit to 1, and set the hit for the other word B to 0 (to avoid double count)
        if there is not a hit for A and B, set the hit to 0 for both
        '''
        if word2 in syn1:
            yield word2, {'hit':1, 'count':1, 'synset':len(syn1)}
            yield word1, {'hit':0, 'count':1, 'synset':len(syn2)}
        elif word1 in syn2:
            yield word1, {'hit':1, 'count':1, 'synset':len(syn2)}
            yield word2, {'hit':0, 'count':1, 'synset':len(syn1)}
        else:
            yield word1, {'hit':0, 'count':1, 'synset':len(syn2)}
            yield word2, {'hit':0, 'count':1, 'synset':len(syn1)}
        
    def combiner(self, term, values):
        #combine '~nosyn' into a bigger list and yield the list
        if term == '~nosyn':
            nosynList = []
            for value in values:
                nosynList = nosynList+value
            yield term, nosynList
            
        else:
            counters = {'hit':0, 'count':0, 'synset':0}
            for value in values:
                counters['hit'] += value['hit']
                counters['count'] += value['count']
                counters['synset'] = value['synset']
            yield term, counters
        
        
    def reducer_init(self):
        self.plist = []
        self.rlist = []
        self.flist = []
        
    def reducer(self, term, values):
        #yield the final list of words that have no synonym
        if term == '~nosyn':
            nosynList = []
            for value in values:
                nosynList = nosynList+value
            yield term, nosynList
            
        else:
            counters = {'hit':0.0, 'count':0.0, 'synset':0.0}
            precision, recall, F1 = 0,0,0
            for value in values:
                counters['hit'] += value['hit']
                counters['count'] += value['count']
                counters['synset'] = value['synset']
                
            if counters['hit'] > 0 and counters['synset'] > 0:
                precision = float(counters['hit'])/float(counters['count'])
                recall = float(counters['hit'])/float(counters['synset'])
                F1 = 2*precision*recall/(precision+recall)
                
                self.plist.append(precision)
                self.rlist.append(recall)
                self.flist.append(F1)
                yield term, counters
            elif counters['synset'] > 0:
                self.plist.append(precision)
                self.rlist.append(recall)
                self.flist.append(F1)
                yield term, counters
    
    def reducer_final(self):
        #compute the mean of all collected measurements
        yield 'precision', np.mean(self.plist)
        yield 'recall', np.mean(self.rlist)
        yield 'F1', np.mean(self.flist)
        
if __name__ == "__main__":
    NLTKBenchMark.run()
    
    
In [17]:
    
!python NLTKBenchMark.py nltk_bench_sample.txt
    
    
In [18]:
    
''' Performance measures '''
from __future__ import division
import numpy as np
import json
import nltk
from nltk.corpus import wordnet as wn
import sys
#print all the synset element of an element
def synonyms(string):
    syndict = {}
    for i,j in enumerate(wn.synsets(string)):
        syns = j.lemma_names()
        for syn in syns:
            syndict.setdefault(syn,1)
    
    return syndict.keys()
hits = []
TP = 0
FP = 0
TOTAL = 0
flag = False # so we don't double count, but at the same time don't miss hits
## For this part we can use one of three outputs. They are all the same, but were generated differently
# 1. the top 1000 from the full sorted dataset -> sortedSims[:1000]
# 2. the top 1000 from the partial sort aggregate file -> sims2/top1000sims
# 3. the top 1000 from the total order sort file -> head -1000 sims_parts/part-00004
top1000sims = []
with open("nltk_bench_sample.txt","r") as f:
    for line in f.readlines():
        line = line.strip()
        lisst, avg = line.split("\t")
        lisst = eval(lisst)
        lisst.append(avg)
        top1000sims.append(lisst)
    
measures = {}
not_in_wordnet = []
for line in top1000sims:
    TOTAL += 1
    words=line[0:2]
    
    
    for word in words:
        if word not in measures:
            measures[word] = {"syns":0,"opps": 0,"hits":0}
        measures[word]["opps"] += 1 
    
    syns0 = synonyms(words[0])
    measures[words[1]]["syns"] = len(syns0)
    if len(syns0) == 0:
        not_in_wordnet.append(words[0])
        
    if words[1] in syns0:
        TP += 1
        hits.append(line)
        flag = True
        measures[words[1]]["hits"] += 1
        
    syns1 = synonyms(words[1]) 
    measures[words[0]]["syns"] = len(syns1)
    if len(syns1) == 0:
        not_in_wordnet.append(words[1])
    if words[0] in syns1:
        if flag == False:
            TP += 1
            hits.append(line)
            measures[words[0]]["hits"] += 1
            
    flag = False    
precision = []
recall = []
f1 = []
for key in measures:
    p,r,f = 0,0,0
    if measures[key]["hits"] > 0 and measures[key]["syns"] > 0:
        p = measures[key]["hits"]/measures[key]["opps"]
        r = measures[key]["hits"]/measures[key]["syns"]
        f = 2 * (p*r)/(p+r)
    
    # For calculating measures, only take into account words that have synonyms in wordnet
    if measures[key]["syns"] > 0:
        precision.append(p)
        recall.append(r)
        f1.append(f)
    
# Take the mean of each measure   
print "—"*110    
print "Number of Hits:",TP, "out of top",TOTAL
print "Number of words without synonyms:",len(not_in_wordnet)
print "—"*110 
print "Precision\t", np.mean(precision)
print "Recall\t\t", np.mean(recall)
print "F1\t\t", np.mean(f1)
print "—"*110  
print "Words without synonyms:"
print "-"*100
for word in not_in_wordnet:
    print synonyms(word),word