DATSCIW261 ASSIGNMENT

Version 2016-01-27 (FINAL) Week 3 ASSIGNMENTS

Jason Sanchez - Group 2

HW3.0.

How do you merge two sorted lists/arrays of records of the form [key, value]?

  • Merge sort.

Where is this used in Hadoop MapReduce? [Hint within the shuffle]

  • It is used after files are spilled to disk from the circular buffer before the map-side combiner is run as well as after the data is partitioned during the "Hadoop shuffle" and before it is fed into the reduce-side combiner.

What is a combiner function in the context of Hadoop?

  • Combiners improve the speed of MapReduce jobs. Map-side combiners can reduce the amount of data that needs to be transferred over the network by acting as a simplified reducer. Also, long running map jobs block the merge sort that is part of the "Hadoop shuffle" phase. A map-side combiner can run on all of the data that has been processed by the mappers before being blocked by the merge-sort.

Give an example where it can be used and justify why it should be used in the context of this problem.

  • Word count. Greatly reduce data needed to transfer of the network by combining key-value pairs.

What is the Hadoop shuffle?

  • Partition --> Merge sort --> Pass to reduce-side combiner (or directly to reducer)

HW3.1 consumer complaints dataset: Use Counters to do EDA (exploratory data analysis and to monitor progress)

Counters are lightweight objects in Hadoop that allow you to keep track of system progress in both the map and reduce stages of processing. By default, Hadoop defines a number of standard counters in "groups"; these show up in the jobtracker webapp, giving you information such as "Map input records", "Map output records", etc.

While processing information/data using MapReduce job, it is a challenge to monitor the progress of parallel threads running across nodes of distributed clusters. Moreover, it is also complicated to distinguish between the data that has been processed and the data which is yet to be processed. The MapReduce Framework offers a provision of user-defined Counters, which can be effectively utilized to monitor the progress of data across nodes of distributed clusters.

Use the Consumer Complaints Dataset provide here to complete this question:

 https://www.dropbox.com/s/vbalm3yva2rr86m/Consumer_Complaints.csv?dl=0

The consumer complaints dataset consists of diverse consumer complaints, which have been reported across the United States regarding various types of loans. The dataset consists of records of the form:

Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?

Here’s is the first few lines of the of the Consumer Complaints Dataset:

Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed? 1114245,Debt collection,Medical,Disclosure verification of debt,Not given enough info to verify debt,FL,32219,Web,11/13/2014,11/13/2014,"Choice Recovery, Inc.",Closed with explanation,Yes, 1114488,Debt collection,Medical,Disclosure verification of debt,Right to dispute notice not received,TX,75006,Web,11/13/2014,11/13/2014,"Expert Global Solutions, Inc.",In progress,Yes, 1114255,Bank account or service,Checking account,Deposits and withdrawals,,NY,11102,Web,11/13/2014,11/13/2014,"FNIS (Fidelity National Information Services, Inc.)",In progress,Yes, 1115106,Debt collection,"Other (phone, health club, etc.)",Communication tactics,Frequent or repeated calls,GA,31721,Web,11/13/2014,11/13/2014,"Expert Global Solutions, Inc.",In progress,Yes,

User-defined Counters

Now, let’s use Hadoop Counters to identify the number of complaints pertaining to debt collection, mortgage and other categories (all other categories get lumped into this one) in the consumer complaints dataset. Basically produce the distribution of the Product column in this dataset using counters (limited to 3 counters here).

Hadoop offers Job Tracker, an UI tool to determine the status and statistics of all jobs. Using the job tracker UI, developers can view the Counters that have been created. Screenshot your job tracker UI as your job completes and include it here. Make sure that your user defined counters are visible.

Presumes you have downloaded the file and put it in the "Temp_data" folder.


In [8]:
%%writefile ComplaintDistribution.py
from mrjob.job import MRJob

class ComplaintDistribution(MRJob):
    def mapper(self, _, lines):
        line = lines[:30]
        if "Debt collection" in line:
            self.increment_counter('Complaint', 'Debt collection', 1)
        elif "Mortgage" in line:
            self.increment_counter('Complaint', 'Mortgage', 1)
        else:
            self.increment_counter('Complaint', 'Other', 1)
            
if __name__ == "__main__":
    ComplaintDistribution.run()


Writing ComplaintDistribution.py

In [11]:
%%time
!python ComplaintDistribution.py Temp_data/Consumer_Complaints.csv


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/ComplaintDistribution.Jason.20160920.074657.421852
Running step 1 of 1...
Counters: 3
	Complaint
		Debt collection=44372
		Mortgage=125752
		Other=142789
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/ComplaintDistribution.Jason.20160920.074657.421852/output...
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/ComplaintDistribution.Jason.20160920.074657.421852...
CPU times: user 28.7 ms, sys: 17.1 ms, total: 45.8 ms
Wall time: 2.97 s

HW 3.2 Analyze the performance of your Mappers, Combiners and Reducers using Counters

For this brief study the Input file will be one record (the next line only):

foo foo quux labs foo bar quux

Perform a word count analysis of this single record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used here) using user defined Counters to count up how many time the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing this word count job. The answer should be 1 and 4 respectively. Please explain.


In [41]:
%%writefile SimpleCounters.py

from mrjob.job import MRJob

class SimpleCounters(MRJob):
    def mapper_init(self):
        self.increment_counter("Mappers", "Count", 1)
    
    def mapper(self, _, lines):
        self.increment_counter("Mappers", "Tasks", 1)
        for word in lines.split():
            yield (word, 1)
    
    def reducer_init(self):
        self.increment_counter("Reducers", "Count", 1)
    
    def reducer(self, word, count):
        self.increment_counter("Reducers", "Tasks", 1)
        yield (word, sum(count))
        
if __name__ == "__main__":
    SimpleCounters.run()


Overwriting SimpleCounters.py

In [53]:
!echo "foo foo quux labs foo bar quux" | python SimpleCounters.py --jobconf mapred.map.tasks=2 --jobconf mapred.reduce.tasks=2


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/SimpleCounters.Jason.20160920.082703.788299
Running step 1 of 1...
reading from STDIN
Counters: 2
	Mappers
		Count=1
		Tasks=1
Counters: 4
	Mappers
		Count=1
		Tasks=1
	Reducers
		Count=2
		Tasks=4
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/SimpleCounters.Jason.20160920.082703.788299/output...
"bar"	1
"foo"	3
"labs"	1
"quux"	2
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/SimpleCounters.Jason.20160920.082703.788299...

Please use multiple mappers and reducers for these jobs (at least 2 mappers and 2 reducers).

Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere) using user defined Counters to count up how many time the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job.


In [64]:
%%writefile IssueCounter.py

from mrjob.job import MRJob
import csv
import sys

class IssueCounter(MRJob):

    def mapper(self, _, lines):
        self.increment_counter("Mappers", "Tasks", 1)
        terms = list(csv.reader([lines]))[0]
        yield (terms[3], 1)
    
    def reducer(self, word, count):
        self.increment_counter("Reducers", "Tasks", 1)
        self.increment_counter("Reducers", "Lines processed", len(list(count)))
        yield (word, sum(count))
        
if __name__ == "__main__":
    IssueCounter.run()


Overwriting IssueCounter.py

In [65]:
!cat Temp_data/Consumer_Complaints.csv | python IssueCounter.py | head -n 1


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/IssueCounter.Jason.20160921.070459.209407
Running step 1 of 1...
reading from STDIN
Counters: 1
	Mappers
		Tasks=312913
Counters: 3
	Mappers
		Tasks=312913
	Reducers
		Lines processed=312913
		Tasks=80
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/IssueCounter.Jason.20160921.070459.209407/output...
""	0
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/IssueCounter.Jason.20160921.070459.209407...

Mapper tasks = 312913. The mapper was called this many times because that is how many lines there are in the file.

Reducer tasks = 80. The reducer was called this many times because that is how many unique issues there are in the file.

Reducer lines processed = 312913. The reducer was passed all of the data from the mappers.


In [14]:
# We can easily confirm the first hypothesis
!wc -l Temp_data/Consumer_Complaints.csv


  312913 Temp_data/Consumer_Complaints.csv

Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount using user defined Counters to count up how many time the mapper, combiner, reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job.


In [60]:
%%writefile IssueCounterCombiner.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import csv
import sys

class IssueCounterCombiner(MRJob):
    
    def mapper(self, _, lines):
        self.increment_counter("Mappers", "Tasks", 1)
        terms = list(csv.reader([lines]))[0]
        yield (terms[3], 1)
    
    def combiner(self, word, count):
        self.increment_counter("Combiners", "Tasks", 1)
        yield (word, sum(count))
    
    def reducer(self, word, count):
        self.increment_counter("Reducers", "Tasks", 1)
        self.increment_counter("Reducers", "Lines processed", len(list(count)))
        yield (word, sum(count))
        
if __name__ == "__main__":
    IssueCounterCombiner.run()


Overwriting IssueCounterCombiner.py

In [1]:
%%writefile python_mr_driver.py

from IssueCounterCombiner import IssueCounterCombiner

mr_job = IssueCounterCombiner(args=['Temp_data/Consumer_Complaints.csv'])

with mr_job.make_runner() as runner:
    runner.run() 
    print(runner.counters())
#     for line in runner.stream_output(): 
#         print(mr_job.parse_output_line(line))


Overwriting python_mr_driver.py

In [2]:
results = !python python_mr_driver.py

In [3]:
results


Out[3]:
["[{'Combiners': {'Tasks': 146}, 'Reducers': {'Tasks': 80, 'Lines processed': 146}, 'Mappers': {'Tasks': 312913}}]"]

Although the same amount of map and reduce tasks were called, because 146 combiner tasks were called, my hypothesis would be that the number of observations read by reducers was less. I went back and included a counter that kept track of the lines passed over the network. With the combiner, only 146 observations were passed over the network. This is equal to the number of times the combiner was called (which makes sense because combiners act as map-side reducers and each one would process on a different key of the data and output a single line).

Using a single reducer: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items).


In [89]:
%%writefile Top50.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import csv
import sys

def order_key(order_in_reducer, key_name):
    number_of_stars = order_in_reducer//10 + 1
    number = str(order_in_reducer%10)
    return "%s %s" % ("*"*number_of_stars+number, key_name)

class Top50(MRJob):

    MRJob.SORT_VALUES = True
        
    def mapper_get_issue(self, _, lines):
        terms = list(csv.reader([lines]))[0]
        issue = terms[3]
        if issue == "":
            issue = "<blank>"
        yield (issue, 1)
    
    def combiner_count_issues(self, word, count):
        yield (word, sum(count))
        
    def reducer_init_totals(self):
        self.issue_counts = []
    
    def reducer_count_issues(self, word, count):
        issue_count = sum(count)
        self.issue_counts.append(int(issue_count))
        yield (word, issue_count)
        
    def reducer_final_emit_counts(self):
        yield (order_key(1, "Total"), sum(self.issue_counts))
        yield (order_key(2, "40th"), sorted(self.issue_counts)[-40])
    
    def reducer_init(self):
        self.increment_counter("Reducers", "Count", 1)
        self.var = {}
    
    def reducer(self, word, count):
        if word.startswith("*"):
            _, term = word.split()
            self.var[term] = next(count)

        else:
            total = sum(count)
            if total >= self.var["40th"]:
                yield (word, (total/self.var["Total"], total))
                
    def mapper_sort(self, key, value):
        value[0] = 1-float(value[0])
        yield value, key
        
    def reducer_sort(self, key, value):
        key[0] = round(1-float(key[0]),3)
        yield key, next(value)

    def steps(self):
        mr_steps = [MRStep(mapper=self.mapper_get_issue,
                           combiner=self.combiner_count_issues,
                           reducer_init=self.reducer_init_totals,
                           reducer=self.reducer_count_issues,
                           reducer_final=self.reducer_final_emit_counts),
                    MRStep(reducer_init=self.reducer_init,
                           reducer=self.reducer),
                    MRStep(mapper=self.mapper_sort,
                           reducer=self.reducer_sort)
                   ]
        return mr_steps
    
        
        
if __name__ == "__main__":
    Top50.run()


Overwriting Top50.py

In [90]:
!head -n 3001 Temp_data/Consumer_Complaints.csv | python Top50.py --jobconf mapred.reduce.tasks=1


No configs found; falling back on auto-configuration
ignoring partitioner keyword arg (requires real Hadoop): 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/Top50.Jason.20160926.010457.383781
Running step 1 of 3...
reading from STDIN
Running step 2 of 3...
Counters: 1
	Reducers
		Count=1
Running step 3 of 3...
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/Top50.Jason.20160926.010457.383781/output...
[0.174, 522]	"Incorrect information on credit report"
[0.116, 349]	"Cont'd attempts collect debt not owed"
[0.098, 294]	"Loan modification,collection,foreclosure"
[0.079, 237]	"Loan servicing, payments, escrow account"
[0.049, 146]	"Communication tactics"
[0.045, 136]	"Disclosure verification of debt"
[0.041, 124]	"Account opening, closing, or management"
[0.036, 109]	"Credit reporting company's investigation"
[0.028, 83]	"Deposits and withdrawals"
[0.021, 62]	"Managing the loan or lease"
[0.02, 61]	"False statements or representation"
[0.017, 50]	"Problems caused by my funds being low"
[0.016, 48]	"Improper contact or sharing of info"
[0.015, 44]	"Application, originator, mortgage broker"
[0.012, 35]	"Problems when you are unable to pay"
[0.011, 34]	"Other"
[0.011, 33]	"Billing disputes"
[0.009, 26]	"Using a debit or ATM card"
[0.007, 22]	"Closing/Cancelling account"
[0.007, 21]	"Improper use of my credit report"
[0.007, 20]	"APR or interest rate"
[0.006, 19]	"Credit decision / Underwriting"
[0.006, 17]	"Taking out the loan or lease"
[0.005, 16]	"Making/receiving payments, sending money"
[0.005, 15]	"Late fee"
[0.004, 13]	"Charged fees or interest I didn't expect"
[0.004, 12]	"Credit determination"
[0.004, 11]	"Shopping for a loan or lease"
[0.003, 10]	"Advertising and marketing"
[0.003, 9]	"Billing statement"
[0.003, 8]	"Fraud or scam"
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/Top50.Jason.20160926.010457.383781...

3.2.1

Using 2 reducers: What are the top 50 most frequent terms in your word count analysis?

Present the top 50 terms and their frequency and their relative frequency. Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). Please use a combiner.

HW3.3. Shopping Cart Analysis

Product Recommendations: The action or practice of selling additional products or services to existing customers is called cross-selling. Giving product recommendation is one of the examples of cross-selling that are frequently used by online retailers. One simple method to give product recommendations is to recommend products that are frequently browsed together by the customers.

For this homework use the online browsing behavior dataset located at:

   https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0

Each line in this dataset represents a browsing session of a customer. On each line, each string of 8 characters represents the id of an item browsed during that session. The items are separated by spaces.

Here are the first few lines of the ProductPurchaseData FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444

Do some exploratory data analysis of this dataset guided by the following questions:.

How many unique items are available from this supplier?

Using a single reducer: Report your findings such as number of unique products; largest basket; report the top 50 most frequently purchased items, their frequency, and their relative frequency (break ties by sorting the products alphabetical order) etc. using Hadoop Map-Reduce.


In [91]:
!head -n 10 Temp_data/ProductPurchaseData.txt


FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 
GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 
ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 
ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 
ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444 
ELE17451 GRO73461 DAI22896 SNA99873 FRO18919 DAI50921 SNA80192 GRO75578 
ELE17451 ELE59935 FRO18919 ELE23393 SNA80192 SNA85662 SNA91554 DAI22177 
ELE17451 SNA69641 FRO18919 SNA90258 ELE28573 ELE11375 DAI14125 FRO78087 
ELE17451 GRO73461 DAI22896 SNA80192 SNA85662 SNA90258 DAI46755 FRO81176 ELE66810 DAI49199 DAI91535 GRO94758 ELE94711 DAI22177 
ELE17451 SNA69641 DAI91535 GRO94758 GRO99222 FRO76833 FRO81176 SNA80192 DAI54690 ELE37798 GRO56989 

In [186]:
%%writefile ProductPurchaseStats.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import sys
import heapq


class TopList(list):
    def __init__(self, max_size):
        """
        Just like a list, except the append method adds the new value to the 
        list only if it is larger than the smallest value (or if the size of 
        the list is less than max_size). If each element of the list is an int
        or float, uses that value for comparison. If the first element is a 
        list or tuple, uses the first element of the list or tuple for the 
        comparison.
        """
        self.max_size = max_size
        
    def _get_key(self, x):
        return x[0] if isinstance(x, (list, tuple)) else x
        
    def append(self, val):
        key=lambda x: x[0] if isinstance(x, (list, tuple)) else x
        if len(self) < self.max_size:
            heapq.heappush(self, val)
        elif self._get_key(self[0]) < self._get_key(val):
            heapq.heapreplace(self, val)
            
    def final_sort(self):
        return sorted(self, key=self._get_key, reverse=True)


class ProductPurchaseStats(MRJob):
    
    def mapper_init(self):
        self.largest_basket = 0
        self.total_items = 0
    
    def mapper(self, _, lines):
        products = lines.split()
        n_products = len(products)
        self.total_items += n_products
        if n_products > self.largest_basket:
            self.largest_basket = n_products
        for prod in products:
            yield (prod, 1)
            
    def mapper_final(self):
        self.increment_counter("product stats", "largest basket", self.largest_basket)
        yield ("*** Total", self.total_items)
        
    def combiner(self, keys, values):
        yield keys, sum(values)
        
    def reducer_init(self):
        self.top50 = TopList(50)
        self.total = 0
        
    def reducer(self, key, values):
        value_count = sum(values)
        
        if key == "*** Total":
            self.total = value_count
        else:
            self.increment_counter("product stats", "unique products")
            self.top50.append([value_count, value_count/self.total, key])

    def reducer_final(self):
        for counts, relative_rate, key in self.top50.final_sort():
            yield key, (counts, round(relative_rate,3))
    
if __name__ == "__main__":
    ProductPurchaseStats.run()


Overwriting ProductPurchaseStats.py

In [187]:
!cat Temp_data/ProductPurchaseData.txt | python ProductPurchaseStats.py  --jobconf mapred.reduce.tasks=1


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/ProductPurchaseStats.Jason.20160926.034901.357619
Running step 1 of 1...
reading from STDIN
Counters: 1
	product stats
		largest basket=74
Counters: 2
	product stats
		largest basket=74
		unique products=12592
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/ProductPurchaseStats.Jason.20160926.034901.357619/output...
"DAI62779"	[6667, 0.018]
"FRO40251"	[3881, 0.01]
"ELE17451"	[3875, 0.01]
"GRO73461"	[3602, 0.009]
"SNA80324"	[3044, 0.008]
"ELE32164"	[2851, 0.007]
"DAI75645"	[2736, 0.007]
"SNA45677"	[2455, 0.006]
"FRO31317"	[2330, 0.006]
"DAI85309"	[2293, 0.006]
"ELE26917"	[2292, 0.006]
"FRO80039"	[2233, 0.006]
"GRO21487"	[2115, 0.006]
"SNA99873"	[2083, 0.005]
"GRO59710"	[2004, 0.005]
"GRO71621"	[1920, 0.005]
"FRO85978"	[1918, 0.005]
"GRO30386"	[1840, 0.005]
"ELE74009"	[1816, 0.005]
"GRO56726"	[1784, 0.005]
"DAI63921"	[1773, 0.005]
"GRO46854"	[1756, 0.005]
"ELE66600"	[1713, 0.004]
"DAI83733"	[1712, 0.004]
"FRO32293"	[1702, 0.004]
"ELE66810"	[1697, 0.004]
"SNA55762"	[1646, 0.004]
"DAI22177"	[1627, 0.004]
"FRO78087"	[1531, 0.004]
"ELE99737"	[1516, 0.004]
"ELE34057"	[1489, 0.004]
"GRO94758"	[1489, 0.004]
"FRO35904"	[1436, 0.004]
"FRO53271"	[1420, 0.004]
"SNA93860"	[1407, 0.004]
"SNA90094"	[1390, 0.004]
"GRO38814"	[1352, 0.004]
"ELE56788"	[1345, 0.004]
"GRO61133"	[1321, 0.003]
"ELE74482"	[1316, 0.003]
"DAI88807"	[1316, 0.003]
"ELE59935"	[1311, 0.003]
"SNA96271"	[1295, 0.003]
"DAI43223"	[1290, 0.003]
"ELE91337"	[1289, 0.003]
"GRO15017"	[1275, 0.003]
"DAI31081"	[1261, 0.003]
"GRO81087"	[1220, 0.003]
"DAI22896"	[1219, 0.003]
"GRO85051"	[1214, 0.003]
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/ProductPurchaseStats.Jason.20160926.034901.357619...

3.3.1 OPTIONAL Using 2 reducers: Report your findings such as number of unique products; largest basket; report the top 50 most frequently purchased items, their frequency, and their relative frequency (break ties by sorting the products alphabetical order) etc. using Hadoop Map-Reduce.

HW3.4. (Computationally prohibitive but then again Hadoop can handle this) Pairs

Suppose we want to recommend new products to the customer based on the products they have already browsed on the online website. Write a map-reduce program to find products which are frequently browsed together. Fix the support count (cooccurence count) to s = 100 (i.e. product pairs need to occur together at least 100 times to be considered frequent) and find pairs of items (sometimes referred to itemsets of size 2 in association rule mining) that have a support count of 100 or more.

List the top 50 product pairs with corresponding support count (aka frequency), and relative frequency or support (number of records where they coccur, the number of records where they coccur/the number of baskets in the dataset) in decreasing order of support for frequent (100>count) itemsets of size 2.

Use the Pairs pattern (lecture 3) to extract these frequent itemsets of size 2. Free free to use combiners if they bring value. Instrument your code with counters for count the number of times your mapper, combiner and reducers are called.

Please output records of the following form for the top 50 pairs (itemsets of size 2):

  item1, item2, support count, support



Fix the ordering of the pairs lexicographically (left to right), and break ties in support (between pairs, if any exist) by taking the first ones in lexicographically increasing order.

Report the compute time for the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers) Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts.


In [249]:
%%writefile PairsRecommender.py

from mrjob.job import MRJob
import heapq
import sys

def all_itemsets_of_size_two(array, key=None, return_type="string", concat_val=" "):
    """
    Generator that yields all valid itemsets of size two
    where each combo is returned in an order sorted by key.
    
    key = None defaults to standard sorting.
    
    return_type: can be "string" or "tuple". If "string", 
    concatenates values with concat_val and returns string.
    If tuple, returns a tuple with two elements.
    """
    array = sorted(array, key=key)
    for index, item in enumerate(array):
        for other_item in array[index:]:
            if item != other_item:
                if return_type == "string":
                    yield "%s%s%s" % (str(item), concat_val, str(other_item))
                else:
                    yield (item, other_item) 

class TopList(list):
    def __init__(self, max_size):
        """
        Just like a list, except the append method adds the new value to the 
        list only if it is larger than the smallest value (or if the size of 
        the list is less than max_size). If each element of the list is an int
        or float, uses that value for comparison. If the first element is a 
        list or tuple, uses the first element of the list or tuple for the 
        comparison.
        """
        self.max_size = max_size
        
    def _get_key(self, x):
        return x[0] if isinstance(x, (list, tuple)) else x
        
    def append(self, val):
        key=lambda x: x[0] if isinstance(x, (list, tuple)) else x
        if len(self) < self.max_size:
            heapq.heappush(self, val)
        elif self._get_key(self[0]) < self._get_key(val):
            heapq.heapreplace(self, val)
            
    def final_sort(self):
        return sorted(self, key=self._get_key, reverse=True)
    
                    
class PairsRecommender(MRJob):
    def mapper_init(self):
        self.total_baskets = 0
    
    def mapper(self, _, lines):
        self.total_baskets += 1
        products = lines.split()
        self.increment_counter("job stats", "number of items", len(products))
        for itemset in all_itemsets_of_size_two(products):
            self.increment_counter("job stats", "number of item combos")
            yield (itemset, 1)
            
    def mapper_final(self):
        self.increment_counter("job stats", "number of baskets", self.total_baskets)
        yield ("*** Total", self.total_baskets)
        
    def combiner(self, key, values):
        self.increment_counter("job stats", "number of keys fed to combiner")
        yield key, sum(values)
    
    def reducer_init(self):
        self.top_values = TopList(50)
        self.total_baskets = 0
    
    def reducer(self, key, values):
        values_sum = sum(values)
        if key == "*** Total":
            self.total_baskets = values_sum
        elif values_sum >= 100:
            self.increment_counter("job stats", "number of unique itemsets >= 100")
            basket_percent = values_sum/self.total_baskets
            self.top_values.append([values_sum, round(basket_percent,3), key])
        else:
            self.increment_counter("job stats", "number of unique itemsets < 100")
            
    def reducer_final(self):
        for values_sum, basket_percent, key in self.top_values.final_sort():
            yield key, (values_sum, basket_percent)
        
if __name__ == "__main__":
    PairsRecommender.run()


Overwriting PairsRecommender.py

In [250]:
%%time
!cat Temp_data/ProductPurchaseData.txt | python PairsRecommender.py  --jobconf mapred.reduce.tasks=1


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/PairsRecommender.Jason.20160926.045124.502699
Running step 1 of 1...
reading from STDIN
Counters: 4
	job stats
		number of baskets=31101
		number of item combos=2534054
		number of items=380824
		number of keys fed to combiner=1026709
Counters: 6
	job stats
		number of baskets=31101
		number of item combos=2534054
		number of items=380824
		number of itemsets < 100=875761
		number of itemsets >= 100=1334
		number of keys fed to combiner=1026709
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/PairsRecommender.Jason.20160926.045124.502699/output...
"DAI62779 ELE17451"	[1592, 0.051]
"FRO40251 SNA80324"	[1412, 0.045]
"DAI75645 FRO40251"	[1254, 0.04]
"FRO40251 GRO85051"	[1213, 0.039]
"DAI62779 GRO73461"	[1139, 0.037]
"DAI75645 SNA80324"	[1130, 0.036]
"DAI62779 FRO40251"	[1070, 0.034]
"DAI62779 SNA80324"	[923, 0.03]
"DAI62779 DAI85309"	[918, 0.03]
"ELE32164 GRO59710"	[911, 0.029]
"DAI62779 DAI75645"	[882, 0.028]
"FRO40251 GRO73461"	[882, 0.028]
"DAI62779 ELE92920"	[877, 0.028]
"FRO40251 FRO92469"	[835, 0.027]
"DAI62779 ELE32164"	[832, 0.027]
"DAI75645 GRO73461"	[712, 0.023]
"DAI43223 ELE32164"	[711, 0.023]
"DAI62779 GRO30386"	[709, 0.023]
"ELE17451 FRO40251"	[697, 0.022]
"DAI85309 ELE99737"	[659, 0.021]
"DAI62779 ELE26917"	[650, 0.021]
"GRO21487 GRO73461"	[631, 0.02]
"DAI62779 SNA45677"	[604, 0.019]
"ELE17451 SNA80324"	[597, 0.019]
"DAI62779 GRO71621"	[595, 0.019]
"DAI62779 SNA55762"	[593, 0.019]
"DAI62779 DAI83733"	[586, 0.019]
"ELE17451 GRO73461"	[580, 0.019]
"GRO73461 SNA80324"	[562, 0.018]
"DAI62779 GRO59710"	[561, 0.018]
"DAI62779 FRO80039"	[550, 0.018]
"DAI75645 ELE17451"	[547, 0.018]
"DAI62779 SNA93860"	[537, 0.017]
"DAI55148 DAI62779"	[526, 0.017]
"DAI43223 GRO59710"	[512, 0.016]
"ELE17451 ELE32164"	[511, 0.016]
"DAI62779 SNA18336"	[506, 0.016]
"ELE32164 GRO73461"	[486, 0.016]
"DAI62779 FRO78087"	[482, 0.015]
"DAI85309 ELE17451"	[482, 0.015]
"DAI62779 GRO94758"	[479, 0.015]
"DAI62779 GRO21487"	[471, 0.015]
"GRO85051 SNA80324"	[471, 0.015]
"ELE17451 GRO30386"	[468, 0.015]
"FRO85978 SNA95666"	[463, 0.015]
"DAI62779 FRO19221"	[462, 0.015]
"DAI62779 GRO46854"	[461, 0.015]
"DAI43223 DAI62779"	[459, 0.015]
"ELE92920 SNA18336"	[455, 0.015]
"DAI88079 FRO40251"	[446, 0.014]
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/PairsRecommender.Jason.20160926.045124.502699...
CPU times: user 770 ms, sys: 206 ms, total: 975 ms
Wall time: 1min 30s

In [248]:
!system_profiler SPHardwareDataType


Hardware:

    Hardware Overview:

      Model Name: MacBook Pro
      Model Identifier: MacBookPro12,1
      Processor Name: Intel Core i7
      Processor Speed: 3.1 GHz
      Number of Processors: 1
      Total Number of Cores: 2
      L2 Cache (per Core): 256 KB
      L3 Cache: 4 MB
      Memory: 16 GB
      Boot ROM Version: MBP121.0167.B17
      SMC Version (system): 2.28f7
      Serial Number (system): C02RT071FVH9
      Hardware UUID: D12CBB32-4EFD-5F0A-83B5-3E6FE291C8E1

HW3.5: Stripes

Repeat 3.4 using the stripes design pattern for finding cooccuring pairs.

Report the compute times for stripes job versus the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)

Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts. Discuss the differences in these counts between the Pairs and Stripes jobs

OPTIONAL: all HW below this are optional


In [315]:
%%writefile StripesRecommender.py

from mrjob.job import MRJob
from collections import Counter
import sys
import heapq

def all_itemsets_of_size_two_stripes(array, key=None):
    """
    Generator that yields all valid itemsets of size two
    where each combo is as a stripe.
    
    key = None defaults to standard sorting.
    """
    array = sorted(array, key=key)
    for index, item in enumerate(array[:-1]):
        yield (item, {val:1 for val in array[index+1:]})

class TopList(list):
    def __init__(self, max_size):
        """
        Just like a list, except the append method adds the new value to the 
        list only if it is larger than the smallest value (or if the size of 
        the list is less than max_size). If each element of the list is an int
        or float, uses that value for comparison. If the first element is a 
        list or tuple, uses the first element of the list or tuple for the 
        comparison.
        """
        self.max_size = max_size
        
    def _get_key(self, x):
        return x[0] if isinstance(x, (list, tuple)) else x
        
    def append(self, val):
        key=lambda x: x[0] if isinstance(x, (list, tuple)) else x
        if len(self) < self.max_size:
            heapq.heappush(self, val)
        elif self._get_key(self[0]) < self._get_key(val):
            heapq.heapreplace(self, val)
            
    def final_sort(self):
        return sorted(self, key=self._get_key, reverse=True)
    
        
class StripesRecommender(MRJob):
    
    def mapper_init(self):
        self.basket_count = 0
    
    def mapper(self, _, lines):
        self.basket_count += 1
        products = lines.split()
        for item, value in all_itemsets_of_size_two_stripes(products):
            yield item, value
            
    def mapper_final(self):
        yield ("*** Total", {"total": self.basket_count})
        
    def combiner(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)
    
    def reducer_init(self):
        self.top = TopList(50)
    
    def reducer(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)

        if keys == "*** Total":            
            self.total = values_sum["total"]
        else:
            for k, v in values_sum.items():
                if v >= 100:
                    self.top.append([v, round(v/self.total,3), keys+" "+k])

    def reducer_final(self):
        for count, perc, key in self.top.final_sort():
            yield key, (count, perc)
                    
if __name__ == "__main__":
    StripesRecommender.run()


Overwriting StripesRecommender.py

In [316]:
%%time
!cat Temp_data/ProductPurchaseData.txt | python StripesRecommender.py  --jobconf mapred.reduce.tasks=1


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/StripesRecommender.Jason.20160926.054535.885306
Running step 1 of 1...
reading from STDIN
Streaming final output from /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/StripesRecommender.Jason.20160926.054535.885306/output...
"DAI62779 ELE17451"	[1592, 0.051]
"FRO40251 SNA80324"	[1412, 0.045]
"DAI75645 FRO40251"	[1254, 0.04]
"FRO40251 GRO85051"	[1213, 0.039]
"DAI62779 GRO73461"	[1139, 0.037]
"DAI75645 SNA80324"	[1130, 0.036]
"DAI62779 FRO40251"	[1070, 0.034]
"DAI62779 SNA80324"	[923, 0.03]
"DAI62779 DAI85309"	[918, 0.03]
"ELE32164 GRO59710"	[911, 0.029]
"DAI62779 DAI75645"	[882, 0.028]
"FRO40251 GRO73461"	[882, 0.028]
"DAI62779 ELE92920"	[877, 0.028]
"FRO40251 FRO92469"	[835, 0.027]
"DAI62779 ELE32164"	[832, 0.027]
"DAI75645 GRO73461"	[712, 0.023]
"DAI43223 ELE32164"	[711, 0.023]
"DAI62779 GRO30386"	[709, 0.023]
"ELE17451 FRO40251"	[697, 0.022]
"DAI85309 ELE99737"	[659, 0.021]
"DAI62779 ELE26917"	[650, 0.021]
"GRO21487 GRO73461"	[631, 0.02]
"DAI62779 SNA45677"	[604, 0.019]
"ELE17451 SNA80324"	[597, 0.019]
"DAI62779 GRO71621"	[595, 0.019]
"DAI62779 SNA55762"	[593, 0.019]
"DAI62779 DAI83733"	[586, 0.019]
"ELE17451 GRO73461"	[580, 0.019]
"GRO73461 SNA80324"	[562, 0.018]
"DAI62779 GRO59710"	[561, 0.018]
"DAI62779 FRO80039"	[550, 0.018]
"DAI75645 ELE17451"	[547, 0.018]
"DAI62779 SNA93860"	[537, 0.017]
"DAI55148 DAI62779"	[526, 0.017]
"DAI43223 GRO59710"	[512, 0.016]
"ELE17451 ELE32164"	[511, 0.016]
"DAI62779 SNA18336"	[506, 0.016]
"ELE32164 GRO73461"	[486, 0.016]
"DAI85309 ELE17451"	[482, 0.015]
"DAI62779 FRO78087"	[482, 0.015]
"DAI62779 GRO94758"	[479, 0.015]
"DAI62779 GRO21487"	[471, 0.015]
"GRO85051 SNA80324"	[471, 0.015]
"ELE17451 GRO30386"	[468, 0.015]
"FRO85978 SNA95666"	[463, 0.015]
"DAI62779 FRO19221"	[462, 0.015]
"DAI62779 GRO46854"	[461, 0.015]
"DAI43223 DAI62779"	[459, 0.015]
"ELE92920 SNA18336"	[455, 0.015]
"DAI88079 FRO40251"	[446, 0.014]
Removing temp directory /var/folders/sz/4k2bbjts7x5fmg9sn7kh6hlw0000gn/T/StripesRecommender.Jason.20160926.054535.885306...
CPU times: user 209 ms, sys: 59.2 ms, total: 268 ms
Wall time: 24.3 s

The pairs operation took 1 minute 30 seconds. The stripes operation took 24 seconds, which is about a quarter of the time for pairs.

HW3.6 Computing Relative Frequencies on 100K WikiPedia pages (93Meg)

Dataset description For this assignment you will explore a set of 100,000 Wikipedia documents:

https://www.dropbox.com/s/n5lfbnztclo93ej/wikitext_100k.txt?dl=0 s3://cs9223/wikitext_100k.txt, or https://s3.amazonaws.com/cs9223/wikitext_100k.txt Each line in this file consists of the plain text extracted from a Wikipedia document.

Task Compute the relative frequencies of each word that occurs in the documents in wikitext_100k.txt and output the top 100 word pairs sorted by decreasing order of relative frequency.

Recall that the relative frequency (RF) of word B given word A is defined as follows:

f(B|A) = Count(A, B) / Count (A) = Count(A, B) / sum_B'(Count (A, B')

where count(A,B) is the number of times A and B co-occur within a window of two words (co-occurrence window size of two) in a document and count(A) the number of times A occurs with anything else. Intuitively, given a document collection, the relative frequency captures the proportion of time the word B appears in the same document as A. (See Section 3.3, in Data-Intensive Text Processing with MapReduce).

In the async lecture you learned different approaches to do this, and in this assignment, you will implement them:

a. Write a mapreduce program which uses the Stripes approach and writes its output in a file named rfstripes.txt

b. Write a mapreduce program which uses the Pairs approach and writes its output in a file named rfpairs.txt

c. Compare the performance of the two approaches and output the relative performance to a file named rfcomp.txt. Compute the relative performance as follows: (running time for Pairs/ running time for Stripes). Also include an analysis comparing the communication costs for the two approaches. Instrument your mapper and reduces for counters where necessary to aid with your analysis.

NOTE: please limit your analysis to the top 100 word pairs sorted by decreasing order of relative frequency for each word (tokens with all alphabetical letters).

Please include markdown cell named rf.txt that describes the following:

the input/output format in each Hadoop task, i.e., the keys for the mappers and reducers the Hadoop cluster settings you used, i.e., number of mappers and reducers the running time for each approach: pairs and stripes

You can write your program using Python or MrJob (with Hadoop streaming) and you should run it on AWS. It is a good idea to develop and test your program on a local machine before deploying on AWS. Remember your notebook, needs to have all the commands you used to run each Mapreduce job (i.e., pairs and stripes) -- include the Hadoop streaming commands you used to run your jobs.

In addition the All the following files should be compressed in one ZIP file and submitted. The ZIP file should contain:

A. The result files: rfstripes.txt, rfpairs.txt, rfcomp.txt

Prior to working with Hadoop, the corpus should first be preprocessed as follows: perform tokenization (whitespace and all non-alphabetic characters) and stopword removal using standard tools from the Lucene search engine. All tokens should then be replaced with unique integers for a more efficient encoding.

== Preliminary information for the remaing HW problems===

Much of this homework beyond this point will focus on the Apriori algorithm for frequent itemset mining and the additional step for extracting association rules from these frequent itemsets. Please acquaint yourself with the background information (below) before approaching the remaining assignments.

=== Apriori background information ===

Some background material for the Apriori algorithm is located at:

Association Rules are frequently used for Market Basket Analysis (MBA) by retailers to understand the purchase behavior of their customers. This information can be then used for many different purposes such as cross-selling and up-selling of products, sales promotions, loyalty programs, store design, discount plans and many others. Evaluation of item sets: Once you have found the frequent itemsets of a dataset, you need to choose a subset of them as your recommendations. Commonly used metrics for measuring significance and interest for selecting rules for recommendations are: confidence; lift; and conviction.

HW3.7 Apriori Algorithm

What is the Apriori algorithm? Describe an example use in your domain of expertise and what kind of . Define confidence and lift.

NOTE: For the remaining homework use the online browsing behavior dataset located at (same dataset as used above):

   https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0

Each line in this dataset represents a browsing session of a customer. On each line, each string of 8 characters represents the id of an item browsed during that session. The items are separated by spaces.

Here are the first few lines of the ProductPurchaseData FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444

HW3.8. Shopping Cart Analysis

Product Recommendations: The action or practice of selling additional products or services to existing customers is called cross-selling. Giving product recommendation is one of the examples of cross-selling that are frequently used by online retailers. One simple method to give product recommendations is to recommend products that are frequently browsed together by the customers.

Suppose we want to recommend new products to the customer based on the products they have already browsed on the online website. Write a program using the A-priori algorithm to find products which are frequently browsed together. Fix the support to s = 100 (i.e. product sets need to occur together at least 100 times to be considered frequent) and find itemsets of size 2 and 3.

Then extract association rules from these frequent items.

A rule is of the form:

(item1, item5) ⇒ item2.

List the top 10 discovered rules in descreasing order of confidence in the following format

(item1, item5) ⇒ item2, supportCount ,support, confidence

HW3.8

Benchmark your results using the pyFIM implementation of the Apriori algorithm (Apriori - Association Rule Induction / Frequent Item Set Mining implemented by Christian Borgelt). You can download pyFIM from here:

http://www.borgelt.net/pyfim.html

Comment on the results from both implementations (your Hadoop MapReduce of apriori versus pyFIM) in terms of results and execution times.

END OF HOMEWORK


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [320]:
set([1,2,3])


Out[320]:
{1, 2, 3}

In [319]:
[1,2,3][:-1]


Out[319]:
[1, 2]