In [1]:
from __future__ import division
import numpy as np
%reload_ext autoreload
%autoreload 2
+ Review the submission form first to scope it out (it will take 5-10 minutes to enter your answers and other information into this form):
+ Please keep all your work and responses in ONE (1) notebook only (and submit it via the submission form)
+ Please submit your solutions and notebook via the following form:
+ For the midterm you will need access to MRJob and Jupyter on your local machine or on AltaScale/AWS to complete some of the questions (e.g., fill in the code to do X).
+ As for question types:
+ This is an open-book exam, meaning you can consult webpages, textbooks, class notes, slides, etc., but you cannot discuss the exam with each other or any other person/group. Any collusion will result in a zero grade and will be grounds for dismissal from the entire program. Please complete this exam by yourself within the time limit.
===Map-Reduce===
(I) If you only have 1 computer with 1 computing core, then map-reduce is unlikely to help
(II) If we run map-reduce using N single-core computers, then we are likely to get at least an N-fold speedup compared to using 1 computer
(III) Because of network latency and other overhead associated with map-reduce, if we run map-reduce using N computers, then we will get less than an N-fold speedup compared to using 1 computer
(IV) When using map-reduce for learning a naive Bayes classifier for SPAM classification, we usually use a single machine that accumulates the partial class and word stats from each of the map machines, in order to compute the final model.
Please select one from the following that is most correct:
In [ ]:
===Order inversion===
Suppose you wish to write a MapReduce job that creates normalized product co-occurrence (i.e., pairs of products that have been purchased together) data from a large transaction file of shopping baskets. In addition, we want the relative frequency of co-occurring products. Given this scenario, to ensure that all (potentially many) reducers receive the appropriate normalization factors (denominators) for a product in the most efficient order in their input streams (so as to minimize memory overhead on the reducer side), the mapper should emit/yield records according to which pattern for the product occurrence totals:
(a) emit (*,product) count
(b) There is no need to use order inversion here
(c) emit (product,*) count
(d) None of the above
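For reference, here is a hedged sketch of the classic order-inversion emit pattern for relative frequencies. This illustrates the technique named in the question, not an answer key; the basket format, the "*" marker key, the file name, and the partitioner/sort configuration are illustrative assumptions.
In [ ]:
%%writefile order_inversion_sketch.py
from __future__ import division
from mrjob.job import MRJob

class CooccurRelativeFreq(MRJob):

    def mapper(self, _, line):
        basket = line.strip().split()
        for a in basket:
            for b in basket:
                if a != b:
                    yield (a, b), 1        # pair count
                    yield (a, '*'), 1      # marginal (denominator) for product a

    def reducer_init(self):
        self.marginal = 0

    # Assumes a partitioner on the left product and a sort order that places
    # (a, '*') before (a, b), so the denominator arrives before the pair counts
    # and nothing has to be buffered in reducer memory.
    def reducer(self, key, counts):
        product, other = key
        total = sum(counts)
        if other == '*':
            self.marginal = total
        else:
            yield key, total / self.marginal

if __name__ == '__main__':
    CooccurRelativeFreq.run()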
In [ ]:
===Map-Reduce===
(a) An arbitrarily sized list of key/value pairs.
(b) One key and a list of some values associated with that key
(c) One key and a list of all values associated with that key.
(d) None of the above
In [ ]:
===Bayesian document classification===
(a) It allows you to use your training data as your validation data.
(b) It prevents zero-products in the posterior distribution.
(c) It accounts for words that were missed by regular expressions.
(d) None of the above
In [ ]:
Big data is defined as the voluminous amount of structured, unstructured or semi-structured data that has huge potential for mining but is so large that it cannot be processed or stored using traditional (single-computer) computing and storage systems. Big data is characterized by its high velocity, volume and variety, which requires cost-effective and innovative methods for information processing to draw meaningful business insights. More than the volume of the data, it is the nature of the data that defines whether it is considered Big Data or not. What do the four V's of Big Data denote? Here is a potential simple explanation for each of the four critical features of big data (some or all of which may be correct):
Statements
Which combination of the above statements is correct? Select a single correct response from the following:
In [ ]:
Using combiners results in what?
Select the most correct option (i.e., select one option only) from the following:
In [ ]:
In probability theory and information theory, the Kullback–Leibler divergence (also information divergence, information gain, relative entropy, KLIC, or KL divergence) is a non-symmetric measure of the difference between two probability distributions P and Q. Specifically, the Kullback–Leibler divergence of Q from P, denoted D_KL(P‖Q), is a measure of the information lost when Q is used to approximate P:
For discrete probability distributions P and Q, the Kullback–Leibler divergence of Q from P is defined to be
+ KLDistance(P, Q) = Sum_over_i ( P(i) * log( P(i) / Q(i) ) )
In the extreme cases, the KL divergence grows arbitrarily large as P and Q become more different, and it is 0 when the two distributions are exactly the same (follow the same distribution).
For more information on K-L Divergence see:
+ [K-L Divergence](https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence)
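As a quick sanity check of the definition, the divergence of two small discrete distributions can be computed directly with numpy (the numbers here are made up for illustration and are not part of the exam data):
In [ ]:
import numpy as np

# Two hypothetical discrete distributions over three outcomes
P = np.array([0.5, 0.3, 0.2])
Q = np.array([0.4, 0.4, 0.2])

# D_KL(P || Q) = sum_i P(i) * log(P(i) / Q(i)); about 0.025 here, and 0 only when P == Q
print(np.sum(P * np.log(P / Q)))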
For the next three questions we will use an MRJob class for calculating pairwise similarity using K-L divergence as the similarity measure:
Using the following cells, fill in the code for the first reducer to calculate the K-L divergence of objects (letter documents) in line1 and line2, i.e., KLD(Line1||Line2).
Here we ignore characters that are not alphabetical, and all alphabetical characters are lower-cased in the first mapper.
In [2]:
%%writefile kltext.txt
1.Data Science is an interdisciplinary field about processes and systems to extract knowledge or insights from large volumes of data in various forms (data in various forms, data in various forms, data in various forms), either structured or unstructured,[1][2] which is a continuation of some of the data analysis fields such as statistics, data mining and predictive analytics, as well as Knowledge Discovery in Databases.
2.Machine learning is a subfield of computer science[1] that evolved from the study of pattern recognition and computational learning theory in artificial intelligence.[1] Machine learning explores the study and construction of algorithms that can learn from and make predictions on data.[2] Such algorithms operate by building a model from example inputs in order to make data-driven predictions or decisions,[3]:2 rather than following strictly static program instructions.
In [3]:
import numpy as np
np.log(3)
Out[3]:
In [4]:
!cat kltext.txt
In [23]:
%%writefile kldivergence.py
# coding: utf-8
from __future__ import division
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import numpy as np
class kldivergence(MRJob):

    # Process each string character by character and emit the relative
    # frequency of each character, i.e., Pr(character | string).
    # For input record "1.abcbe":
    #   emit "a" [1, 0.2]
    #   emit "b" [1, 0.4] etc...
    def mapper1(self, _, line):
        index = int(line.split('.', 1)[0])
        letter_list = re.sub(r"[^A-Za-z]+", '', line).lower()
        count = {}
        for l in letter_list:
            if l in count:
                count[l] += 1
            else:
                count[l] = 1
        for key in count:
            yield key, [index, count[key] * 1.0 / len(letter_list)]

    # For each component i (e.g., "b"), calculate its contribution to the
    # Kullback-Leibler divergence of Q from P: P(i) * log(P(i) / Q(i))
    def reducer1(self, key, values):
        p = 0
        q = 0
        for v in values:
            if v[0] == 1:   # String 1
                p = v[1]
            else:           # String 2
                q = v[1]
        if p and q:
            yield None, p * np.log(p / q)

    # Aggregate the per-character components into the final divergence
    def reducer2(self, key, values):
        kl_sum = 0
        for value in values:
            kl_sum = kl_sum + value
        yield "KLDivergence", kl_sum

    def steps(self):
        mr_steps = [MRStep(mapper=self.mapper1,
                           reducer=self.reducer1),
                    MRStep(reducer=self.reducer2)]
        return mr_steps

if __name__ == '__main__':
    kldivergence.run()
In [24]:
%reload_ext autoreload
%autoreload 2
from mrjob.job import MRJob
from kldivergence import kldivergence
# don't forget to save kltext.txt (see the earlier cell)
mr_job = kldivergence(args=['kltext.txt'])
with mr_job.make_runner() as runner:
    runner.run()
    # stream_output: get access to the output of the job
    for line in runner.stream_output():
        print mr_job.parse_output_line(line)
Questions:
In [ ]:
In [28]:
words = """
1.Data Science is an interdisciplinary field about processes and systems to extract knowledge or insights from large volumes of data in various forms (data in various forms, data in various forms, data in various forms), either structured or unstructured,[1][2] which is a continuation of some of the data analysis fields such as statistics, data mining and predictive analytics, as well as Knowledge Discovery in Databases.
2.Machine learning is a subfield of computer science[1] that evolved from the study of pattern recognition and computational learning theory in artificial intelligence.[1] Machine learning explores the study and construction of algorithms that can learn from and make predictions on data.[2] Such algorithms operate by building a model from example inputs in order to make data-driven predictions or decisions,[3]:2 rather than following strictly static program instructions."""
for char in ['p', 'k', 'f', 'q', 'j']:
    if char not in words:
        print char
In [ ]:
In [29]:
%%writefile kldivergence_smooth.py
from __future__ import division
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import numpy as np

class kldivergence_smooth(MRJob):

    # Process each string character by character and emit the smoothed
    # relative frequency of each character, Pr(character | string).
    # For input record "1.abcbe":
    #   emit "a" [1, (1+1)/(5+24)]
    #   emit "b" [1, (2+1)/(5+24)] etc...
    def mapper1(self, _, line):
        index = int(line.split('.', 1)[0])
        letter_list = re.sub(r"[^A-Za-z]+", '', line).lower()
        count = {}
        # smoothed estimate: (n_i + 1) / (n + 24)
        for l in letter_list:
            if l in count:
                count[l] += 1
            else:
                count[l] = 1
        # ensure letters absent from this line still get a smoothed (zero-count) entry
        for letter in ['q', 'j']:
            if letter not in letter_list:
                count[letter] = 0
        for key in count:
            yield key, [index, (1 + count[key] * 1.0) / (24 + len(letter_list))]

    # Per-character contribution to KLD(Line1||Line2): P(i) * log(P(i) / Q(i))
    def reducer1(self, key, values):
        p = 0
        q = 0
        for v in values:
            if v[0] == 1:
                p = v[1]
            else:
                q = v[1]
        yield None, p * np.log(p / q)

    # Aggregate the per-character components into the final divergence
    def reducer2(self, key, values):
        kl_sum = 0
        for value in values:
            kl_sum = kl_sum + value
        yield "KLDivergence", kl_sum

    def steps(self):
        return [MRStep(mapper=self.mapper1,
                       reducer=self.reducer1),
                MRStep(reducer=self.reducer2)]

if __name__ == '__main__':
    kldivergence_smooth.run()
In [31]:
%reload_ext autoreload
%autoreload 2
from kldivergence_smooth import kldivergence_smooth
mr_job = kldivergence_smooth(args=['kltext.txt'])
with mr_job.make_runner() as runner:
    runner.run()
    # stream_output: get access to the output of the job
    for line in runner.stream_output():
        print mr_job.parse_output_line(line)
For zero entries, we have to smooth the distributions. Suppose we smooth in this way:
(n_i + 1) / (n + 24)
where n_i is the count for letter i and n is the total count of all letters.
After smoothing, which number below is the closest to the result you get for KLD(Line1||Line2)?
(a) 0.08
(b) 0.71
(c) 0.02
(d) 0.11
In [ ]:
Given ten (10) files in the input directory for a Hadoop Streaming job (MRJob or plain Hadoop) with the following file sizes (in megabytes): 1, 2, 3, 4, 5, 6, 7, 8, 9, 10; and a block size of 5 MB (NOTE: normally we would set the block size to 1 GB on modern clusters). How many map tasks will result from processing the data in the input directory? Select the closest number from the following list.
(a) 1 map task
(b) 14
(c) 12
(d) None of the above
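As a sanity check on the arithmetic, here is a small sketch, assuming the default of one map task per HDFS input split, no combining of small files, and ignoring Hadoop's small split-slop tolerance:
In [ ]:
import math

file_sizes_mb = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
block_size_mb = 5

# Each file is split independently; a file smaller than one block still costs one split.
splits = [int(math.ceil(size / float(block_size_mb))) for size in file_sizes_mb]
print(splits, sum(splits))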
In [ ]:
Given a purchase-transaction log file, where each transaction contains the customer identifier, the item purchased, and much more information about the transaction, which of the following statements are true about a MapReduce job that performs an "aggregation", such as getting the number of transactions per customer?
Statements
Select the most correct option from the following:
In [ ]:
Which of the following statements are true regarding Naive Bayes?
Statements
Please select the single most correct combination from the following:
In [ ]:
Given the following document dataset for a two-class problem: ham and spam. Use MRJob (please include your code) to build a multinomial Naive Bayes classifier. Please use Laplace smoothing with a hyperparameter of 1. Please use words only (a-z) as features. Please lowercase all words.
In [37]:
%%writefile spam.txt
0002.2001-05-25.SA_and_HP 0 0 good
0002.2001-05-25.SA_and_HP 0 0 very good
0002.2001-05-25.SA_and_HP 1 0 bad
0002.2001-05-25.SA_and_HP 1 0 very bad
0002.2001-05-25.SA_and_HP 1 0 very bad, very BAD
In [39]:
%%writefile spam_test.txt
0002.2001-05-25.SA_and_HP 1 0 good? bad! very Bad!
In [40]:
%%writefile NaiveBayes.py
from __future__ import division
import sys
import re
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import TextProtocol, TextValueProtocol

# Prevents broken pipe errors from using ... | head
from signal import signal, SIGPIPE, SIG_DFL
signal(SIGPIPE, SIG_DFL)

def sum_hs(counts):
    # Sum (ham, spam) count pairs elementwise
    h_total, s_total = 0, 0
    for h, s in counts:
        h_total += h
        s_total += s
    return (h_total, s_total)

class NaiveBayes(MRJob):
    OUTPUT_PROTOCOL = TextValueProtocol

    def mapper(self, _, lines):
        _, spam, subject, email = lines.split("\t")
        words = re.findall(r'[a-z]+', (email.lower() + " " + subject.lower()))
        if spam == "1":
            h, s = 0, 1
        else:
            h, s = 1, 0
        yield "***Total Emails", (h, s)
        for word in words:
            yield word, (h, s)
            yield "***Total Words", (h, s)

    def combiner(self, key, count):
        yield key, sum_hs(count)

    def reducer_init(self):
        self.total_ham = 0
        self.total_spam = 0

    def reducer(self, key, count):
        ham, spam = sum_hs(count)
        if key.startswith("***"):
            # "***" keys sort before any word key, so the totals arrive first
            if "Words" in key:
                self.total_ham, self.total_spam = ham, spam
            elif "Emails" in key:
                total = ham + spam
                yield "_", "***Priors\t%.10f\t%.10f" % (ham / total, spam / total)
        else:
            pg_ham, pg_spam = ham / self.total_ham, spam / self.total_spam
            yield "_", "%s\t%.10f\t%.10f" % (key, pg_ham, pg_spam)

if __name__ == "__main__":
    NaiveBayes.run()
In [41]:
!cat spam.txt | python NaiveBayes.py --jobconf mapred.reduce.tasks=1 -q | head
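Note that the reducer above emits raw (unsmoothed) conditional frequencies. Since the question asks for Laplace smoothing with a hyperparameter of 1, here is one hedged way to compute the smoothed conditionals from per-class word counts; the function name and the `ham_counts`/`spam_counts` dictionaries are illustrative assumptions, not part of the job above:
In [ ]:
from __future__ import division

def laplace_smoothed_conditionals(ham_counts, spam_counts, alpha=1):
    # ham_counts/spam_counts: hypothetical dicts of raw word counts per class
    vocab = set(ham_counts) | set(spam_counts)
    V = len(vocab)
    ham_total = sum(ham_counts.values())
    spam_total = sum(spam_counts.values())
    model = {}
    for w in vocab:
        model[w] = ((ham_counts.get(w, 0) + alpha) / (ham_total + alpha * V),
                    (spam_counts.get(w, 0) + alpha) / (spam_total + alpha * V))
    return model

# For illustration, the word counts from the tiny training set above
print(laplace_smoothed_conditionals({'good': 2, 'very': 1}, {'bad': 4, 'very': 3}))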
QUESTION
Having learned the Naive Bayes text classification model for this problem using the training data, and having classified the test data (d6), please indicate which of the following is true:
Statements
Please select the single most correct combination of these statements from the following:
In [ ]:
(a) Yes, but only in Hadoop 0.22+.
(b) Yes, in Hadoop there is a default expectation that each record is delimited by an end-of-line character, that the key is the first token delimited by a tab character, and that the value part is everything after the tab character.
(c) No, when MRJob INPUT_PROTOCOL = RawValueProtocol. In this case input is processed in a format-agnostic way, thereby avoiding any type of parsing errors. The value is treated as a str and the key is read in as None.
(d) Both b and c are correct answers.
In [ ]:
(a) Hadoop API will convert the data to the type that is needed by the reducer.
(b) Data input/output inconsistency cannot occur. A preliminary validation check is executed prior to the full execution of the job to ensure there is consistency.
(c) The java compiler will report an error during compilation but the job will complete with exceptions.
(d) A real-time exception will be thrown and map-reduce job will fail.
In [ ]:
(a) Developers should design Map-Reduce jobs without reducers only if no reduce slots are available on the cluster.
(b) Developers should never design Map-Reduce jobs without reducers. An error will occur upon compile.
(c) There is a CPU intensive step that occurs between the map and reduce steps. Disabling the reduce step speeds up data processing.
(d) It is not possible to create a map-reduce job without at least one reduce step. A developer may decide to limit to one reducer for debugging purposes.
In [ ]:
===Gradient descent===
Select a single correct response from the following:
In [ ]:
===Weighted K-means===
Write a MapReduce job in MRJob to train a weighted K-means algorithm at scale.
You can write your own code or you can use most of the code from the following notebook:
Weight each example as follows using the inverse vector length (Euclidean norm):
weight(X) = 1/||X||,
where ||X|| = sqrt(X·X) = sqrt(X1^2 + X2^2).
Here X is a vector made up of two components, X1 and X2.
Use the following data to answer the following TWO questions:
In [87]:
def inverse_vector_length(x1, x2):
    norm = (x1**2 + x2**2)**.5
    return 1.0/norm
inverse_vector_length(1, 5)
Out[87]:
In [ ]:
0 --> .2
In [68]:
%matplotlib inline
import numpy as np
import pylab
import pandas as pd
In [56]:
data = pd.read_csv("Kmeandata.csv", header=None)
In [63]:
pylab.plot(data[0], data[1], 'o', linewidth=0, alpha=.5);
In [65]:
%%writefile Kmeans.py
from numpy import argmin, array, random
from mrjob.job import MRJob
from mrjob.step import MRStep
from itertools import chain
import os
# Find the nearest centroid for a data point
def MinDist(datapoint, centroid_points):
    datapoint = array(datapoint)
    centroid_points = array(centroid_points)
    diff = datapoint - centroid_points
    diffsq = diff * diff
    # Index of the nearest centroid for this instance
    minidx = argmin(list(diffsq.sum(axis=1)))
    return minidx

# Check whether the centroids have converged
def stop_criterion(centroid_points_old, centroid_points_new, T):
    oldvalue = list(chain(*centroid_points_old))
    newvalue = list(chain(*centroid_points_new))
    Diff = [abs(x - y) for x, y in zip(oldvalue, newvalue)]
    Flag = True
    for i in Diff:
        if i > T:
            Flag = False
            break
    return Flag

class MRKmeans(MRJob):
    centroid_points = []
    k = 3

    def steps(self):
        return [
            MRStep(mapper_init=self.mapper_init, mapper=self.mapper,
                   combiner=self.combiner, reducer=self.reducer)
        ]

    # Load the centroid info from file
    def mapper_init(self):
        # print "Current path:", os.path.dirname(os.path.realpath(__file__))
        self.centroid_points = [map(float, s.split('\n')[0].split(','))
                                for s in open("Centroids.txt").readlines()]
        # open('Centroids.txt', 'w').close()
        # print "Centroids: ", self.centroid_points

    # Load the data and output the nearest centroid index and the data point
    def mapper(self, _, line):
        D = (map(float, line.split(',')))
        yield int(MinDist(D, self.centroid_points)), (D[0], D[1], 1)

    # Combine sums of data points locally
    def combiner(self, idx, inputdata):
        sumx = sumy = num = 0
        for x, y, n in inputdata:
            num = num + n
            sumx = sumx + x
            sumy = sumy + y
        yield idx, (sumx, sumy, num)

    # Aggregate the sums for each cluster and then calculate the new centroids
    def reducer(self, idx, inputdata):
        centroids = []
        num = [0] * self.k
        for i in range(self.k):
            centroids.append([0, 0])
        for x, y, n in inputdata:
            num[idx] = num[idx] + n
            centroids[idx][0] = centroids[idx][0] + x
            centroids[idx][1] = centroids[idx][1] + y
        centroids[idx][0] = centroids[idx][0] / num[idx]
        centroids[idx][1] = centroids[idx][1] / num[idx]
        yield idx, (centroids[idx][0], centroids[idx][1])

if __name__ == '__main__':
    MRKmeans.run()
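The class above is the unweighted K-means from the course notebook. One hedged way to turn it into the weighted version the question asks for is sketched below, under the assumptions that weight(X) = 1/||X|| and that the same Centroids.txt convention is used; the file and class names here are illustrative. The idea is to carry weighted partial sums through the mapper/combiner and divide by the total weight in the reducer.
In [ ]:
%%writefile WeightedKmeans.py
from numpy import argmin, array
from mrjob.job import MRJob
from mrjob.step import MRStep

# Index of the nearest centroid (squared Euclidean distance)
def min_dist(datapoint, centroid_points):
    diff = array(datapoint) - array(centroid_points)
    return int(argmin((diff * diff).sum(axis=1)))

class MRWeightedKmeans(MRJob):
    k = 3

    def steps(self):
        return [MRStep(mapper_init=self.mapper_init, mapper=self.mapper,
                       combiner=self.combiner, reducer=self.reducer)]

    def mapper_init(self):
        # One "x,y" line per centroid, as in Centroids.txt above
        self.centroid_points = [[float(v) for v in s.strip().split(',')]
                                for s in open("Centroids.txt").readlines()]

    def mapper(self, _, line):
        x, y = [float(v) for v in line.split(',')]
        w = 1.0 / (x * x + y * y) ** 0.5              # weight(X) = 1/||X||
        idx = min_dist([x, y], self.centroid_points)
        yield idx, (w * x, w * y, w)                  # weighted partial sums

    def combiner(self, idx, values):
        sx = sy = sw = 0.0
        for wx, wy, w in values:
            sx, sy, sw = sx + wx, sy + wy, sw + w
        yield idx, (sx, sy, sw)

    def reducer(self, idx, values):
        sx = sy = sw = 0.0
        for wx, wy, w in values:
            sx, sy, sw = sx + wx, sy + wy, sw + w
        # Weighted centroid: sum(w * x) / sum(w)
        yield idx, (sx / sw, sy / sw)

if __name__ == '__main__':
    MRWeightedKmeans.run()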
In [66]:
%reload_ext autoreload
%autoreload 2
from numpy import random
from Kmeans import MRKmeans, stop_criterion
mr_job = MRKmeans(args=['Kmeandata.csv', '--file=Centroids.txt'])
# Generate initial centroids
centroid_points = []
k = 3
for i in range(k):
    centroid_points.append([random.uniform(-3, 3), random.uniform(-3, 3)])
with open('Centroids.txt', 'w+') as f:
    f.writelines(','.join(str(j) for j in c) + '\n' for c in centroid_points)

# Update the centroids iteratively
i = 0
while True:
    # save the previous centroids to check for convergence
    centroid_points_old = centroid_points[:]
    print "iteration"+str(i)+":"
    with mr_job.make_runner() as runner:
        runner.run()
        # stream_output: get access to the output of the job
        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            print key, value
            centroid_points[key] = value
    # Update the centroids for the next iteration
    with open('Centroids.txt', 'w') as f:
        f.writelines(','.join(str(j) for j in c) + '\n' for c in centroid_points)
    print "\n"
    i = i + 1
    if stop_criterion(centroid_points_old, centroid_points, 0.01):
        break
print "Centroids\n"
print centroid_points
In [82]:
pylab.plot(data[0], data[1], 'o', linewidth=0, alpha=.5);
for point in centroid_points:
    pylab.plot(point[0], point[1], '*', color='pink', markersize=20)
for point in [(-4.5, 0.0), (4.5, 0.0), (0.0, 4.5)]:
    pylab.plot(point[0], point[1], '*', color='red', markersize=20)
pylab.show()
In [ ]:
The average weighted distance is defined as: sum over i (weighted_distance_i) / sum over i (weight_i), where weighted_distance_i = weight_i * distance_i and distance_i is the distance from point i to its assigned centroid.
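A small numpy sketch of this quantity (the function and argument names are illustrative assumptions; `points` is an (N, 2) array, `centroids` a (k, 2) array, and weight_i = 1/||X_i|| as above):
In [ ]:
import numpy as np

def average_weighted_distance(points, centroids):
    weights = 1.0 / np.linalg.norm(points, axis=1)   # weight_i = 1/||X_i||
    # distance of each point to its nearest (i.e., assigned) centroid
    dists = np.linalg.norm(points[:, None, :] - centroids[None, :, :], axis=2).min(axis=1)
    return np.sum(weights * dists) / np.sum(weights)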
In [ ]: