How do you merge two sorted lists/arrays of records of the form [key, value]?
Where is this used in Hadoop MapReduce? [Hint: within the shuffle]
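One minimal sketch of the answer is the classic two-pointer merge below (plain Python, outside of Hadoop); it illustrates the same idea the shuffle relies on when sorted map-output segments are merged on the reduce side:

# A minimal sketch of merging two key-sorted lists of [key, value] records.
def merge_sorted(left, right):
    merged, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        if left[i][0] <= right[j][0]:
            merged.append(left[i]); i += 1
        else:
            merged.append(right[j]); j += 1
    merged.extend(left[i:])   # at most one of these two
    merged.extend(right[j:])  # still has records left
    return merged

print(merge_sorted([["a", 1], ["c", 3]], [["b", 2], ["c", 4]]))
# [['a', 1], ['b', 2], ['c', 3], ['c', 4]]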
What is a combiner function in the context of Hadoop?
Give an example where it can be used and justify why it should be used in the context of this problem.
What is the Hadoop shuffle?
Counters are lightweight objects in Hadoop that let you track system progress in both the map and reduce stages of processing. By default, Hadoop defines a number of standard counters in "groups"; these show up in the JobTracker web UI, giving you information such as "Map input records", "Map output records", etc.
While processing data with a MapReduce job, it is a challenge to monitor the progress of parallel tasks running across the nodes of a distributed cluster, and it is also difficult to distinguish data that has already been processed from data that is yet to be processed. The MapReduce framework therefore provides user-defined Counters, which can be used to monitor the progress of data across the nodes of a distributed cluster.
Use the Consumer Complaints Dataset provided here to complete this question:
https://www.dropbox.com/s/vbalm3yva2rr86m/Consumer_Complaints.csv?dl=0
The consumer complaints dataset consists of diverse consumer complaints, which have been reported across the United States regarding various types of loans. The dataset consists of records of the form:
Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?
Here are the first few lines of the Consumer Complaints Dataset:
Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?
1114245,Debt collection,Medical,Disclosure verification of debt,Not given enough info to verify debt,FL,32219,Web,11/13/2014,11/13/2014,"Choice Recovery, Inc.",Closed with explanation,Yes,
1114488,Debt collection,Medical,Disclosure verification of debt,Right to dispute notice not received,TX,75006,Web,11/13/2014,11/13/2014,"Expert Global Solutions, Inc.",In progress,Yes,
1114255,Bank account or service,Checking account,Deposits and withdrawals,,NY,11102,Web,11/13/2014,11/13/2014,"FNIS (Fidelity National Information Services, Inc.)",In progress,Yes,
1115106,Debt collection,"Other (phone, health club, etc.)",Communication tactics,Frequent or repeated calls,GA,31721,Web,11/13/2014,11/13/2014,"Expert Global Solutions, Inc.",In progress,Yes,
User-defined Counters
Now, let's use Hadoop Counters to identify the number of complaints pertaining to debt collection, mortgage, and all other categories (everything else gets lumped into this last one) in the consumer complaints dataset. In other words, produce the distribution of the Product column in this dataset using counters (limited to 3 counters here).
Hadoop offers the JobTracker, a UI tool for determining the status and statistics of all jobs. Using the JobTracker UI, developers can view the Counters that have been created. Take a screenshot of your JobTracker UI as your job completes and include it here. Make sure that your user-defined counters are visible.
This presumes you have downloaded the file and put it in the "Temp_data" folder.
In [8]:
%%writefile ComplaintDistribution.py
from mrjob.job import MRJob


class ComplaintDistribution(MRJob):

    def mapper(self, _, lines):
        # The Product field follows the numeric Complaint ID, so the first 30
        # characters are enough to identify the categories of interest.
        line = lines[:30]
        if "Debt collection" in line:
            self.increment_counter('Complaint', 'Debt collection', 1)
        elif "Mortgage" in line:
            self.increment_counter('Complaint', 'Mortgage', 1)
        else:
            self.increment_counter('Complaint', 'Other', 1)


if __name__ == "__main__":
    ComplaintDistribution.run()
In [11]:
%%time
!python ComplaintDistribution.py Temp_data/Consumer_Complaints.csv
In [41]:
%%writefile SimpleCounters.py
from mrjob.job import MRJob


class SimpleCounters(MRJob):

    def mapper_init(self):
        self.increment_counter("Mappers", "Count", 1)

    def mapper(self, _, lines):
        self.increment_counter("Mappers", "Tasks", 1)
        for word in lines.split():
            yield (word, 1)

    def reducer_init(self):
        self.increment_counter("Reducers", "Count", 1)

    def reducer(self, word, count):
        self.increment_counter("Reducers", "Tasks", 1)
        yield (word, sum(count))


if __name__ == "__main__":
    SimpleCounters.run()
In [53]:
!echo "foo foo quux labs foo bar quux" | python SimpleCounters.py --jobconf mapred.map.tasks=2 --jobconf mapred.reduce.tasks=2
In [64]:
%%writefile IssueCounter.py
from mrjob.job import MRJob
import csv
import sys


class IssueCounter(MRJob):

    def mapper(self, _, lines):
        self.increment_counter("Mappers", "Tasks", 1)
        # Parse the full CSV record so quoted fields containing commas are handled.
        terms = list(csv.reader([lines]))[0]
        yield (terms[3], 1)

    def reducer(self, word, count):
        self.increment_counter("Reducers", "Tasks", 1)
        counts = list(count)  # materialize once; the generator can only be consumed once
        self.increment_counter("Reducers", "Lines processed", len(counts))
        yield (word, sum(counts))


if __name__ == "__main__":
    IssueCounter.run()
In [65]:
!cat Temp_data/Consumer_Complaints.csv | python IssueCounter.py | head -n 1
Mapper tasks = 312913. The mapper was called this many times because that is how many lines there are in the file.
Reducer tasks = 80. The reducer was called this many times because that is how many unique issues there are in the file.
Reducer lines processed = 312913. The reducer was passed all of the data from the mappers.
In [14]:
# We can easily confirm the first hypothesis
!wc -l Temp_data/Consumer_Complaints.csv
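A similar local spot check of the second hypothesis (the number of distinct issues seen by the reducer) can be done outside of Hadoop; this mirrors the reducer-key count above and assumes the same file location:

# Count distinct values in the Issue column (column 4), header and blanks included,
# to mirror what the MapReduce job sees.
import csv

with open("Temp_data/Consumer_Complaints.csv") as f:
    issues = {row[3] for row in csv.reader(f) if len(row) > 3}
print(len(issues))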
In [60]:
%%writefile IssueCounterCombiner.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv
import sys


class IssueCounterCombiner(MRJob):

    def mapper(self, _, lines):
        self.increment_counter("Mappers", "Tasks", 1)
        terms = list(csv.reader([lines]))[0]
        yield (terms[3], 1)

    def combiner(self, word, count):
        self.increment_counter("Combiners", "Tasks", 1)
        yield (word, sum(count))

    def reducer(self, word, count):
        self.increment_counter("Reducers", "Tasks", 1)
        counts = list(count)  # materialize once; the generator can only be consumed once
        self.increment_counter("Reducers", "Lines processed", len(counts))
        yield (word, sum(counts))


if __name__ == "__main__":
    IssueCounterCombiner.run()
In [1]:
%%writefile python_mr_driver.py
from IssueCounterCombiner import IssueCounterCombiner

mr_job = IssueCounterCombiner(args=['Temp_data/Consumer_Complaints.csv'])
with mr_job.make_runner() as runner:
    runner.run()
    print(runner.counters())
    # for line in runner.stream_output():
    #     print(mr_job.parse_output_line(line))
In [2]:
results = !python python_mr_driver.py
In [3]:
results
Out[3]:
Although the same number of map and reduce tasks was called, because 146 combiner tasks were called my hypothesis was that the number of observations read by the reducers would be smaller. I went back and included a counter that kept track of the lines passed over the network. With the combiner, only 146 observations were passed over the network. This is equal to the number of times the combiner was called, which makes sense because combiners act as map-side reducers: each call processes a different key of the data and outputs a single line.
In [89]:
%%writefile Top50.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv
import sys


def order_key(order_in_reducer, key_name):
    # Build keys such as "*1 Total" and "*2 40th" that sort ahead of the
    # regular issue names, so the second-step reducer sees them first.
    number_of_stars = order_in_reducer // 10 + 1
    number = str(order_in_reducer % 10)
    return "%s %s" % ("*" * number_of_stars + number, key_name)


class Top50(MRJob):

    SORT_VALUES = True

    def mapper_get_issue(self, _, lines):
        terms = list(csv.reader([lines]))[0]
        issue = terms[3]
        if issue == "":
            issue = "<blank>"
        yield (issue, 1)

    def combiner_count_issues(self, word, count):
        yield (word, sum(count))

    def reducer_init_totals(self):
        self.issue_counts = []

    def reducer_count_issues(self, word, count):
        issue_count = sum(count)
        self.issue_counts.append(int(issue_count))
        yield (word, issue_count)

    def reducer_final_emit_counts(self):
        yield (order_key(1, "Total"), sum(self.issue_counts))
        yield (order_key(2, "40th"), sorted(self.issue_counts)[-40])

    def reducer_init(self):
        self.increment_counter("Reducers", "Count", 1)
        self.var = {}

    def reducer(self, word, count):
        if word.startswith("*"):
            # Special keys carry the grand total and the 40th-largest count.
            _, term = word.split()
            self.var[term] = next(count)
        else:
            total = sum(count)
            if total >= self.var["40th"]:
                yield (word, (total / self.var["Total"], total))

    def mapper_sort(self, key, value):
        # Invert the relative frequency so ascending key order gives
        # descending frequency order.
        value[0] = 1 - float(value[0])
        yield value, key

    def reducer_sort(self, key, value):
        key[0] = round(1 - float(key[0]), 3)
        yield key, next(value)

    def steps(self):
        mr_steps = [MRStep(mapper=self.mapper_get_issue,
                           combiner=self.combiner_count_issues,
                           reducer_init=self.reducer_init_totals,
                           reducer=self.reducer_count_issues,
                           reducer_final=self.reducer_final_emit_counts),
                    MRStep(reducer_init=self.reducer_init,
                           reducer=self.reducer),
                    MRStep(mapper=self.mapper_sort,
                           reducer=self.reducer_sort)]
        return mr_steps


if __name__ == "__main__":
    Top50.run()
In [90]:
!head -n 3001 Temp_data/Consumer_Complaints.csv | python Top50.py --jobconf mapred.reduce.tasks=1
Using 2 reducers: What are the top 50 most frequent terms in your word count analysis?
Present the top 50 terms with their frequency and their relative frequency. If there are ties, please sort the tokens in alphanumeric/string order. Also present the bottom 10 tokens (least frequent items). Please use a combiner.
Product Recommendations: The action or practice of selling additional products or services to existing customers is called cross-selling. Giving product recommendations is one example of cross-selling frequently used by online retailers. One simple method of giving product recommendations is to recommend products that are frequently browsed together by customers.
For this homework use the online browsing behavior dataset located at:
https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0
Each line in this dataset represents a browsing session of a customer. On each line, each string of 8 characters represents the id of an item browsed during that session. The items are separated by spaces.
Here are the first few lines of the ProductPurchaseData:
FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444
Do some exploratory data analysis of this dataset guided by the following questions:
How many unique items are available from this supplier?
Using a single reducer: report your findings, such as the number of unique products and the largest basket, and report the top 50 most frequently purchased items with their frequency and relative frequency (break ties by sorting the products in alphabetical order), using Hadoop MapReduce.
In [91]:
!head -n 10 Temp_data/ProductPurchaseData.txt
In [186]:
%%writefile ProductPurchaseStats.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import sys
import heapq


class TopList(list):

    def __init__(self, max_size):
        """
        Just like a list, except the append method adds the new value to the
        list only if it is larger than the smallest value (or if the size of
        the list is less than max_size). If each element of the list is an int
        or float, uses that value for comparison. If the first element is a
        list or tuple, uses the first element of the list or tuple for the
        comparison.
        """
        self.max_size = max_size

    def _get_key(self, x):
        return x[0] if isinstance(x, (list, tuple)) else x

    def append(self, val):
        if len(self) < self.max_size:
            heapq.heappush(self, val)
        elif self._get_key(self[0]) < self._get_key(val):
            heapq.heapreplace(self, val)

    def final_sort(self):
        return sorted(self, key=self._get_key, reverse=True)


class ProductPurchaseStats(MRJob):

    def mapper_init(self):
        self.largest_basket = 0
        self.total_items = 0

    def mapper(self, _, lines):
        products = lines.split()
        n_products = len(products)
        self.total_items += n_products
        if n_products > self.largest_basket:
            self.largest_basket = n_products
        for prod in products:
            yield (prod, 1)

    def mapper_final(self):
        self.increment_counter("product stats", "largest basket", self.largest_basket)
        # "*** Total" sorts ahead of every product id, so the reducer sees the
        # grand total before it needs it for the relative frequencies.
        yield ("*** Total", self.total_items)

    def combiner(self, keys, values):
        yield keys, sum(values)

    def reducer_init(self):
        self.top50 = TopList(50)
        self.total = 0

    def reducer(self, key, values):
        value_count = sum(values)
        if key == "*** Total":
            self.total = value_count
        else:
            self.increment_counter("product stats", "unique products")
            self.top50.append([value_count, value_count / self.total, key])

    def reducer_final(self):
        for counts, relative_rate, key in self.top50.final_sort():
            yield key, (counts, round(relative_rate, 3))


if __name__ == "__main__":
    ProductPurchaseStats.run()
In [187]:
!cat Temp_data/ProductPurchaseData.txt | python ProductPurchaseStats.py --jobconf mapred.reduce.tasks=1
3.3.1 OPTIONAL Using 2 reducers: report your findings, such as the number of unique products and the largest basket, and report the top 50 most frequently purchased items with their frequency and relative frequency (break ties by sorting the products in alphabetical order), using Hadoop MapReduce.
Suppose we want to recommend new products to the customer based on the products they have already browsed on the website. Write a MapReduce program to find products which are frequently browsed together. Fix the support count (co-occurrence count) to s = 100 (i.e., product pairs need to occur together at least 100 times to be considered frequent) and find pairs of items (sometimes referred to as itemsets of size 2 in association rule mining) that have a support count of 100 or more.
List the top 50 product pairs with their corresponding support count (aka frequency, the number of records in which they co-occur) and relative frequency or support (the support count divided by the number of baskets in the dataset), in decreasing order of support, for frequent (count >= 100) itemsets of size 2.
Use the Pairs pattern (lecture 3) to extract these frequent itemsets of size 2. Feel free to use combiners if they bring value. Instrument your code with counters to count the number of times your mapper, combiner, and reducer are called.
Please output records of the following form for the top 50 pairs (itemsets of size 2):
item1, item2, support count, support
Fix the ordering of the pairs lexicographically (left to right), and break ties in support (between pairs, if any exist) by taking the first ones in lexicographically increasing order.
Report the compute time for the Pairs job. Describe the computational setup used (e.g., single computer, dual core, Linux; number of mappers; number of reducers). Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts.
In [249]:
%%writefile PairsRecommender.py
from mrjob.job import MRJob
import heapq
import sys


def all_itemsets_of_size_two(array, key=None, return_type="string", concat_val=" "):
    """
    Generator that yields all valid itemsets of size two
    where each combo is returned in an order sorted by key.
    key = None defaults to standard sorting.
    return_type: can be "string" or "tuple". If "string",
    concatenates values with concat_val and returns string.
    If tuple, returns a tuple with two elements.
    """
    array = sorted(array, key=key)
    for index, item in enumerate(array):
        for other_item in array[index:]:
            if item != other_item:
                if return_type == "string":
                    yield "%s%s%s" % (str(item), concat_val, str(other_item))
                else:
                    yield (item, other_item)


class TopList(list):

    def __init__(self, max_size):
        """
        Just like a list, except the append method adds the new value to the
        list only if it is larger than the smallest value (or if the size of
        the list is less than max_size). If each element of the list is an int
        or float, uses that value for comparison. If the first element is a
        list or tuple, uses the first element of the list or tuple for the
        comparison.
        """
        self.max_size = max_size

    def _get_key(self, x):
        return x[0] if isinstance(x, (list, tuple)) else x

    def append(self, val):
        if len(self) < self.max_size:
            heapq.heappush(self, val)
        elif self._get_key(self[0]) < self._get_key(val):
            heapq.heapreplace(self, val)

    def final_sort(self):
        return sorted(self, key=self._get_key, reverse=True)


class PairsRecommender(MRJob):

    def mapper_init(self):
        self.total_baskets = 0

    def mapper(self, _, lines):
        self.total_baskets += 1
        products = lines.split()
        self.increment_counter("job stats", "number of items", len(products))
        for itemset in all_itemsets_of_size_two(products):
            self.increment_counter("job stats", "number of item combos")
            yield (itemset, 1)

    def mapper_final(self):
        self.increment_counter("job stats", "number of baskets", self.total_baskets)
        # "*** Total" sorts ahead of every product pair, so the reducer knows
        # the basket count before it computes relative frequencies.
        yield ("*** Total", self.total_baskets)

    def combiner(self, key, values):
        self.increment_counter("job stats", "number of keys fed to combiner")
        yield key, sum(values)

    def reducer_init(self):
        self.top_values = TopList(50)
        self.total_baskets = 0

    def reducer(self, key, values):
        values_sum = sum(values)
        if key == "*** Total":
            self.total_baskets = values_sum
        elif values_sum >= 100:
            self.increment_counter("job stats", "number of unique itemsets >= 100")
            basket_percent = values_sum / self.total_baskets
            self.top_values.append([values_sum, round(basket_percent, 3), key])
        else:
            self.increment_counter("job stats", "number of unique itemsets < 100")

    def reducer_final(self):
        for values_sum, basket_percent, key in self.top_values.final_sort():
            yield key, (values_sum, basket_percent)


if __name__ == "__main__":
    PairsRecommender.run()
In [250]:
%%time
!cat Temp_data/ProductPurchaseData.txt | python PairsRecommender.py --jobconf mapred.reduce.tasks=1
In [248]:
!system_profiler SPHardwareDataType
Repeat 3.4 using the Stripes design pattern for finding co-occurring pairs.
Report the compute time for the Stripes job versus the Pairs job. Describe the computational setup used (e.g., single computer, dual core, Linux; number of mappers; number of reducers).
Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts. Discuss the differences in these counts between the Pairs and Stripes jobs.
OPTIONAL: all HW below this point is optional
In [315]:
%%writefile StripesRecommender.py
from mrjob.job import MRJob
from collections import Counter
import sys
import heapq


def all_itemsets_of_size_two_stripes(array, key=None):
    """
    Generator that yields all valid itemsets of size two
    where each combo is emitted as a stripe.
    key = None defaults to standard sorting.
    """
    array = sorted(array, key=key)
    for index, item in enumerate(array[:-1]):
        yield (item, {val: 1 for val in array[index + 1:]})


class TopList(list):

    def __init__(self, max_size):
        """
        Just like a list, except the append method adds the new value to the
        list only if it is larger than the smallest value (or if the size of
        the list is less than max_size). If each element of the list is an int
        or float, uses that value for comparison. If the first element is a
        list or tuple, uses the first element of the list or tuple for the
        comparison.
        """
        self.max_size = max_size

    def _get_key(self, x):
        return x[0] if isinstance(x, (list, tuple)) else x

    def append(self, val):
        if len(self) < self.max_size:
            heapq.heappush(self, val)
        elif self._get_key(self[0]) < self._get_key(val):
            heapq.heapreplace(self, val)

    def final_sort(self):
        return sorted(self, key=self._get_key, reverse=True)


class StripesRecommender(MRJob):

    def mapper_init(self):
        self.basket_count = 0

    def mapper(self, _, lines):
        self.basket_count += 1
        products = lines.split()
        for item, value in all_itemsets_of_size_two_stripes(products):
            yield item, value

    def mapper_final(self):
        yield ("*** Total", {"total": self.basket_count})

    def combiner(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        yield keys, dict(values_sum)

    def reducer_init(self):
        self.top = TopList(50)

    def reducer(self, keys, values):
        values_sum = Counter()
        for val in values:
            values_sum += Counter(val)
        if keys == "*** Total":
            self.total = values_sum["total"]
        else:
            for k, v in values_sum.items():
                if v >= 100:
                    self.top.append([v, round(v / self.total, 3), keys + " " + k])

    def reducer_final(self):
        for count, perc, key in self.top.final_sort():
            yield key, (count, perc)


if __name__ == "__main__":
    StripesRecommender.run()
In [316]:
%%time
!cat Temp_data/ProductPurchaseData.txt | python StripesRecommender.py --jobconf mapred.reduce.tasks=1
The pairs operation took 1 minute 30 seconds. The stripes operation took 24 seconds, which is about a quarter of the time for pairs.
Dataset description: For this assignment you will explore a set of 100,000 Wikipedia documents:
https://www.dropbox.com/s/n5lfbnztclo93ej/wikitext_100k.txt?dl=0, s3://cs9223/wikitext_100k.txt, or https://s3.amazonaws.com/cs9223/wikitext_100k.txt
Each line in this file consists of the plain text extracted from a Wikipedia document.
Task: Compute the relative frequencies of each word that occurs in the documents in wikitext_100k.txt and output the top 100 word pairs sorted by decreasing order of relative frequency.
Recall that the relative frequency (RF) of word B given word A is defined as follows:
f(B|A) = Count(A, B) / Count(A) = Count(A, B) / Σ_B' Count(A, B')
where Count(A, B) is the number of times A and B co-occur within a window of two words (co-occurrence window size of two) in a document and Count(A) is the number of times A occurs with anything else. Intuitively, given a document collection, the relative frequency captures the proportion of times the word B appears in the same document as A. (See Section 3.3 in Data-Intensive Text Processing with MapReduce.)
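As one possible starting point, here is a minimal mrjob sketch of the Stripes approach to these relative frequencies; it assumes whitespace tokenization, keeps only alphabetic tokens, and treats the window of two words as the immediate left and right neighbours (it does not yet do the top-100 selection):

from collections import defaultdict
from mrjob.job import MRJob


class RFStripes(MRJob):

    def mapper(self, _, line):
        tokens = [t.lower() for t in line.split() if t.isalpha()]
        for i, word in enumerate(tokens):
            stripe = defaultdict(int)
            # neighbours within the co-occurrence window of two words
            for neighbour in tokens[max(0, i - 1):i] + tokens[i + 1:i + 2]:
                stripe[neighbour] += 1
            if stripe:
                yield word, dict(stripe)

    def combiner(self, word, stripes):
        merged = defaultdict(int)
        for stripe in stripes:
            for neighbour, count in stripe.items():
                merged[neighbour] += count
        yield word, dict(merged)

    def reducer(self, word, stripes):
        merged = defaultdict(int)
        for stripe in stripes:
            for neighbour, count in stripe.items():
                merged[neighbour] += count
        total = sum(merged.values())  # Count(A) = sum over B' of Count(A, B')
        for neighbour, count in merged.items():
            yield (word, neighbour), count / total


if __name__ == "__main__":
    RFStripes.run()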
In the async lecture you learned different approaches to do this, and in this assignment, you will implement them:
a. Write a mapreduce program which uses the Stripes approach and writes its output in a file named rfstripes.txt
b. Write a mapreduce program which uses the Pairs approach and writes its output in a file named rfpairs.txt
c. Compare the performance of the two approaches and output the relative performance to a file named rfcomp.txt. Compute the relative performance as follows: (running time for Pairs / running time for Stripes). Also include an analysis comparing the communication costs of the two approaches. Instrument your mappers and reducers with counters where necessary to aid your analysis.
NOTE: please limit your analysis to the top 100 word pairs sorted by decreasing order of relative frequency for each word (tokens consisting of alphabetic characters only).
Please include a markdown cell named rf.txt that describes the following:
the input/output format in each Hadoop task, i.e., the keys for the mappers and reducers
the Hadoop cluster settings you used, i.e., the number of mappers and reducers
the running time for each approach: Pairs and Stripes
You can write your program using Python or MrJob (with Hadoop streaming) and you should run it on AWS. It is a good idea to develop and test your program on a local machine before deploying on AWS. Remember, your notebook needs to have all the commands you used to run each MapReduce job (i.e., Pairs and Stripes) -- include the Hadoop streaming commands you used to run your jobs.
In addition, all of the following files should be compressed into one ZIP file and submitted. The ZIP file should contain:
A. The result files: rfstripes.txt, rfpairs.txt, rfcomp.txt
Prior to working with Hadoop, the corpus should first be preprocessed as follows: perform tokenization (whitespace and all non-alphabetic characters) and stopword removal using standard tools from the Lucene search engine. All tokens should then be replaced with unique integers for a more efficient encoding.
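A rough, non-Lucene stand-in for this preprocessing step might look like the sketch below; the stopword list is an illustrative subset (not Lucene's), and the integer ids are assigned in order of first appearance:

# Approximate preprocessing: tokenize on non-alphabetic characters, drop
# stopwords, and replace each surviving token with a unique integer id.
import re
import sys

STOPWORDS = {"the", "a", "an", "and", "or", "of", "to", "in", "is", "it"}  # illustrative subset

token_ids = {}

def encode(token):
    """Assign a stable integer id to each distinct token."""
    if token not in token_ids:
        token_ids[token] = len(token_ids)
    return token_ids[token]

for line in sys.stdin:
    tokens = [t.lower() for t in re.split(r"[^A-Za-z]+", line) if t]
    kept = [str(encode(t)) for t in tokens if t not in STOPWORDS]
    print(" ".join(kept))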
=== Preliminary information for the remaining HW problems ===
Much of this homework beyond this point will focus on the Apriori algorithm for frequent itemset mining and the additional step for extracting association rules from these frequent itemsets. Please acquaint yourself with the background information (below) before approaching the remaining assignments.
=== Apriori background information ===
Some background material for the Apriori algorithm is located at:
Association Rules are frequently used for Market Basket Analysis (MBA) by retailers to understand the purchase behavior of their customers. This information can then be used for many different purposes, such as cross-selling and up-selling of products, sales promotions, loyalty programs, store design, discount plans, and many others.
Evaluation of item sets: Once you have found the frequent itemsets of a dataset, you need to choose a subset of them as your recommendations. Commonly used metrics for measuring significance and interest when selecting rules for recommendations are: confidence, lift, and conviction.
What is the Apriori algorithm? Describe an example use in your domain of expertise. Define confidence and lift.
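As a quick illustration (not part of the original assignment text), confidence and lift can be computed directly from support counts; the counts below are made up for the example:

def confidence(support_xy, support_x):
    """confidence(X => Y) = supportCount(X u Y) / supportCount(X)"""
    return support_xy / support_x

def lift(support_xy, support_x, support_y, n_baskets):
    """lift(X => Y) = confidence(X => Y) / (supportCount(Y) / n_baskets)"""
    return confidence(support_xy, support_x) / (support_y / n_baskets)

# Hypothetical counts: X u Y in 120 baskets, X in 300, Y in 500, 31101 baskets total.
print(confidence(120, 300))          # 0.4
print(lift(120, 300, 500, 31101))    # ~24.9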
NOTE: For the remaining homework use the online browsing behavior dataset located at (same dataset as used above):
https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0
Suppose we want to recommend new products to the customer based on the products they have already browsed on the online website. Write a program using the A-priori algorithm to find products which are frequently browsed together. Fix the support to s = 100 (i.e. product sets need to occur together at least 100 times to be considered frequent) and find itemsets of size 2 and 3.
Then extract association rules from these frequent itemsets.
A rule is of the form:
(item1, item5) ⇒ item2.
List the top 10 discovered rules in decreasing order of confidence in the following format:
(item1, item5) ⇒ item2, supportCount, support, confidence
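For orientation, here is a minimal single-machine sketch of the A-priori passes and the rule-extraction step. This is not the MapReduce solution the assignment asks for; the file path is assumed, and duplicates within a session are ignored:

from collections import Counter
from itertools import combinations

SUPPORT = 100
PATH = "Temp_data/ProductPurchaseData.txt"  # assumed location

with open(PATH) as f:
    baskets = [sorted(set(line.split())) for line in f]

# Pass 1: frequent single items.
item_counts = Counter(item for basket in baskets for item in basket)
frequent_1 = {item for item, c in item_counts.items() if c >= SUPPORT}

# Pass 2: count only candidate pairs whose members are both frequent.
pair_counts = Counter()
for basket in baskets:
    kept = [i for i in basket if i in frequent_1]
    pair_counts.update(combinations(kept, 2))
frequent_2 = {pair: c for pair, c in pair_counts.items() if c >= SUPPORT}

# Pass 3: candidate triples whose size-2 subsets are all frequent.
triple_counts = Counter()
frequent_pair_items = {i for pair in frequent_2 for i in pair}
for basket in baskets:
    kept = [i for i in basket if i in frequent_pair_items]
    for triple in combinations(kept, 3):
        if all(sub in frequent_2 for sub in combinations(triple, 2)):
            triple_counts[triple] += 1
frequent_3 = {t: c for t, c in triple_counts.items() if c >= SUPPORT}

# Rule extraction: (item1, item2) => item3 with confidence = count(triple) / count(pair).
rules = []
n = len(baskets)
for triple, c in frequent_3.items():
    for pair in combinations(triple, 2):
        consequent = next(i for i in triple if i not in pair)
        rules.append((pair, consequent, c, c / n, c / frequent_2[pair]))
for pair, consequent, c, supp, conf in sorted(rules, key=lambda r: -r[4])[:10]:
    print(pair, "=>", consequent, c, round(supp, 4), round(conf, 3))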
Benchmark your results using the pyFIM implementation of the Apriori algorithm (Apriori - Association Rule Induction / Frequent Item Set Mining implemented by Christian Borgelt). You can download pyFIM from here:
http://www.borgelt.net/pyfim.html
Comment on the results from both implementations (your Hadoop MapReduce of apriori versus pyFIM) in terms of results and execution times.
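A rough usage sketch for the pyFIM benchmark is below; the parameter names and report codes are recalled from the pyFIM documentation and should be verified against the version you install, and the file path is assumed:

from fim import apriori  # provided by pyFIM (http://www.borgelt.net/pyfim.html)

with open("Temp_data/ProductPurchaseData.txt") as f:  # assumed location
    baskets = [line.split() for line in f]

# target='r' requests association rules; a negative supp requests an absolute
# support count (here 100); report='ac' asks for absolute support and
# confidence -- double-check these codes against the pyFIM docs.
rules = apriori(baskets, target='r', supp=-100, conf=0, report='ac')
for head, body, abs_supp, conf in sorted(rules, key=lambda r: -r[-1])[:10]:
    print(body, "=>", head, abs_supp, conf)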