In [15]:
%cd ~/Documents/W261/hw9/
HW9.0:
What is PageRank and what is it used for in the context of web search?
What modifications have to be made to the webgraph in order to leverage the machinery of Markov Chains to compute the steady state distribution?
OPTIONAL: In topic-specific PageRank, how can we ensure that the irreducibility property is satisfied? (HINT: see HW9.4)
PageRank is an algorithm for scoring web pages based primarily on the number and importance of the links pointing to each page. The algorithm is used by Google and others to deliver search results in response to a user query, attempting to ensure that the most relevant results are returned first. To treat the webgraph's transition matrix as a Markov chain with a unique steady-state distribution, the chain must be irreducible, meaning that there are no dangling nodes without outlinks and every page can eventually be reached from every other page, and aperiodic, meaning that the walk is not trapped in closed cycles. Irreducibility is ensured by adding a small random teleportation probability from every node to every other node, and aperiodicity is ensured by also allowing a random jump back to the same node. This teleportation probability $\alpha$ is controlled by the damping factor $d$: $$\alpha = 1 - d.$$
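For concreteness, here is a minimal, non-MapReduce sketch of a single damped PageRank update; the toy graph, damping factor, and variable names are illustrative assumptions rather than the assignment data or the MRJob implementation developed below.
In [ ]:
# Minimal sketch of one damped PageRank iteration on a toy graph (illustrative only).
graph = {'A': ['B', 'C'], 'B': ['C'], 'C': ['A'], 'D': []}   # 'D' is a dangling node
d = 0.85                                  # damping factor; teleportation probability is 1 - d
N = len(graph)
pr = dict((n, 1.0/N) for n in graph)      # start from the uniform distribution

def one_iteration(pr):
    new_pr = dict((n, 0.0) for n in graph)
    dangling_mass = sum(pr[n] for n in graph if not graph[n])
    for n, links in graph.items():
        for dest in links:
            new_pr[dest] += d * pr[n] / len(links)    # mass passed along outlinks
    for n in graph:
        # teleportation plus an equal share of the dangling mass
        new_pr[n] += (1.0 - d) / N + d * dangling_mass / N
    return new_pr

pr = one_iteration(pr)
print sum(pr.values())   # remains 1.0 after every iteration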
HW7.5.1:
Can we utilize combiners in the HW 7 shortest path implementation?
Does order inversion help with the HW 7 shortest path implementation?
Combiners can help by preselecting shortest paths within each mapper's output, reducing the network traffic and reducing the number of path comparisons each reducer must perform. The mappers provide two types of output -- new paths with bogus link lists for frontier nodes, and the graph stripes themselves. Order inversion could ensure that the new paths are received first; since the new paths are generally smaller, less data would need to be cached to update the graph stripes properly.
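As a rough sketch (assuming the HW 7 representation in which mappers emit both candidate distances and graph stripes, distinguishable by the length of the value), a combiner could keep only the shortest candidate per node before the shuffle; the code below is illustrative, not the HW 7 submission.
In [ ]:
# Hypothetical combiner for the HW 7 shortest-path job (illustrative sketch).
# Assumed value formats: [distance, path] for candidates, [distance, path, adjacency] for graph stripes.
def combiner(self, node, values):
    best = None
    for value in values:
        if len(value) == 3:
            yield node, value                 # pass graph stripes through untouched
        elif best is None or value[0] < best[0]:
            best = value                      # keep only the shortest candidate seen so far
    if best is not None:
        yield node, best                      # at most one candidate per node leaves this mapper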
HW 9.1: MRJob implementation of basic PageRank
Write a basic MRJob implementation of the iterative PageRank algorithm
that takes sparse adjacency lists as input (as explored in HW 7).
Make sure that your implementation utilizes teleportation (with probability 1-d, spread uniformly over the number of nodes in the network), and further, distributes the mass of dangling nodes with each iteration so that the output of each iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page, chooses the next page to which it will move by clicking at random, with probability d, one of the hyperlinks in the current page. This probability is represented by a so-called ‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer jumps to any web page in the network. If a page is a dangling end, meaning it has no outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform distribution and “teleports” to that page]
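In the notation used below, with teleportation probability $\alpha = 1 - d$, total dangling mass $m$ collected in the previous pass, and $G$ nodes in the graph, each iteration computes $$PR'(n) = \frac{\alpha}{G} + (1-\alpha)\left(\frac{m}{G} + \sum_{q \rightarrow n} \frac{PR(q)}{\mathrm{outdeg}(q)}\right),$$ which sums to 1 over all nodes; this is exactly the adjustment applied in the reducer of the MRJob implementation below.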
As you build your code, use the test data
s3://ucb-mids-mls-networks/PageRank-test.txt
with teleportation parameter set to 0.15 (1-d, where d, the damping factor is set to 0.85), and crosscheck your work with the true result, displayed in the first image in the Wikipedia article:
https://en.wikipedia.org/wiki/PageRank
and here for reference are the corresponding PageRank probabilities:
A,0.033
B,0.384
C,0.343
D,0.039
E,0.081
F,0.039
G,0.016
H,0.016
I,0.016
J,0.016
K,0.016
In [ ]:
%%writefile MRJob_NumberOfNodes.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from sys import maxint
class MRJobNumNodes(MRJob):
def steps(self):
mapper_only = 0
if mapper_only:
return [MRStep(mapper = self.mapper)]
return [MRStep(
mapper = self.mapper,
reducer = self.reducer
),
MRStep(
reducer = self.reducer_count
)]
def mapper(self, _, line):
line = line.strip().split('\t')
key = line[0]
value = literal_eval(line[1])
destnodes = value.keys()
yield key, None
for node in destnodes:
yield node, None
def reducer(self,node,_):
yield None, node
def reducer_count(self,_,nodes):
count = 0
for node in nodes:
count+=1
yield None, count
if __name__ == '__main__':
MRJobNumNodes.run()
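The node-count job can be run locally on the test graph as a quick check (assuming PageRank-test.txt is in the working directory); it should report 11 nodes.
In [ ]:
!python MRJob_NumberOfNodes.py PageRank-test.txt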
In [ ]:
%%writefile MRJob_PreprocessGraph.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from sys import maxint
class MRJobPreprocessGraph(MRJob):
def steps(self):
mapper_only = 0
if mapper_only:
return [MRStep(mapper_init = self.mapper_init,
mapper = self.mapper)]
return [MRStep(
mapper_init = self.mapper_init,
mapper = self.mapper,
reducer = self.reducer
)]
def mapper_init(self):
        N = 11  # number of nodes in the PageRank test graph (hardcoded)
self.PR = 1.0/N
def mapper(self, _, line):
line = line.strip().split('\t')
key = line[0]
edges = literal_eval(line[1])
value = [self.PR, edges]
yield key, value
for node in edges.keys():
yield node, [self.PR]
def reducer(self,key,value):
PR = 0
edges = {}
for v in value:
if len(v)==1:
PR = v[0]
else:
PR = v[0]
edges = v[1]
yield key, [PR, edges]
'''
def mapper(self, _, line):
N = 11
line = line.strip().split('\t')
key = line[0]
edges = literal_eval(line[1])
PR = 1.0/N
value = [PR, edges]
yield None,{key:value}
def reducer(self,_,data):
for line in data:
key = line.keys()[0]
value = line[key]
yield key, value
'''
if __name__ == '__main__':
MRJobPreprocessGraph.run()
In [101]:
!python MRJob_PreprocessGraph.py PageRank-test.txt > PageRank-test_input2.txt
In [102]:
!cat PageRank-test_input2.txt
In [43]:
%%writefile MRJob_PageRank.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env
import sys
class MRJobPageRank(MRJob):
def steps(self):
mapper_only = 0
if mapper_only:
return [MRStep(mapper = self.mapper)]
return [MRStep(
mapper = self.mapper_firstiter)]+\
[MRStep(
mapper = self.mapper,
combiner = self.combiner,
reducer = self.reducer)]*10
def mapper_firstiter(self, _, line):
line = line.strip().split('\t')
key = line[0]
value = literal_eval(line[1])
PR = value[0]
edges = value[1]
yield key, [None,edges]
destnodes = edges.keys()
outdeg = len(destnodes)
if outdeg==0:
yield '*m',[PR]
for node in destnodes:
yield node, [1.0*PR/outdeg]
def mapper(self, key, value):
sys.stderr.write("MAPPER INPUT: ({0}, {1})\n".format(key,value))
#sys.stderr.write("MAPPER")
PR = value[0]
edges = value[1]
yield key, [None,edges]
destnodes = edges.keys()
outdeg = len(destnodes)
if outdeg==0:
yield '*m',[PR]
for node in destnodes:
yield node, [1.0*PR/outdeg]
def combiner(self,node,data):
PR = 0
edges = {}
for value in data:
if value[0]==None:
yield node, value
else:
PR+=value[0]
yield node, [PR]
def reducer(self,node,data):
#sys.stderr.write("REDUCER NODE: {0}\n".format(node))
G = int(jobconf_from_env('G'))
alpha = float(jobconf_from_env('alpha'))
PR = 0
edges = {}
for value in data:
#yield node, value
if value[0]==None:
edges = value[1]
else:
PR+=value[0]
        if node=='*m':
            # '*m' sorts before all real node ids, so the dangling mass arrives at this
            # reducer first; this relies on a single reducer seeing every key.
            self.m = PR
#sys.stderr.write("REDUCER PR: {0}\n".format(PR))
else:
PR_adj = alpha*(1.0/G)+(1-alpha)*(1.0*self.m/G+PR)
#yield node, PR
#yield node, PR_adj
#line = node+'\t'+str([PR_adj, edges])
#yield line, ''
yield node, [PR_adj, edges]
if __name__ == '__main__':
MRJobPageRank.run()
In [19]:
%load_ext autoreload
%autoreload 2
from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_PreprocessGraph import MRJobPreprocessGraph
from MRJob_PageRank import MRJobPageRank
def numNodes(graphfilename):
#mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
mr_job_numnodes = MRJobNumNodes(args=[graphfilename])
with mr_job_numnodes.make_runner() as runner:
runner.run()
for line in runner.stream_output():
null,count = mr_job_numnodes.parse_output_line(line)
print "There are "+str(count)+" nodes in the graph."
return None
def preprocessGraph(graphfilename):
#mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
mr_job_preprocess = MRJobPreprocessGraph(args=[graphfilename])
outputfilename = graphfilename.split('.')[0]+'_input.txt'
with mr_job_preprocess.make_runner() as runner:
with open(outputfilename, 'w') as f:
runner.run()
for line in runner.stream_output():
node,value = mr_job_preprocess.parse_output_line(line)
f.write(node+'\t'+str(value)+'\n')
return None
def pagerank(graphinputfilename):
#mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
mr_job_pr1 = MRJobPageRank(args=[graphinputfilename,'--jobconf','alpha=0.15','--jobconf','G=11'])
danglingmass= 0
with mr_job_pr1.make_runner() as runner:
runner.run()
with open(graphinputfilename, 'w+') as f:
for line in runner.stream_output():
node,value = mr_job_pr1.parse_output_line(line)
f.write(node+'\t'+str(value)+'\n')
print line
return None
def hw9_1():
graphfilename = "PageRank-test.txt"
#numNodes(graphfilename)
preprocessGraph(graphfilename)
graphinputfilename = graphfilename.split('.')[0]+'_input.txt'
#pagerank_step1(graphinputfilename, alpha, G)
#pagerank_step1(graphinputfilename, alpha, G)
for i in range(1):
print "Iteration",i
pagerank(graphinputfilename)
return None
hw9_1()
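As a quick sanity check (a sketch, assuming the final iteration was written back to PageRank-test_input.txt by pagerank() above in the node \t [PR, edges] format), the ranks should sum to roughly 1 and approximate the reference values listed under HW 9.1.
In [ ]:
# Sanity check on the final iteration output; assumes the format written by pagerank() above.
from ast import literal_eval
ranks = {}
with open('PageRank-test_input.txt', 'r') as f:
    for line in f:
        node, value = line.strip().split('\t')
        ranks[node] = literal_eval(value)[0]
print "total mass:", sum(ranks.values())      # should be close to 1.0
for node in sorted(ranks, key=ranks.get, reverse=True):
    print "%s\t%.3f" % (node, ranks[node])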
HW 9.2: Exploring PageRank teleportation and network plots
In order to overcome problems such as disconnected components, the damping factor (a typical value for d is 0.85) can be varied.
Using the graph in HW1, plot the test graph (using networkx, https://networkx.github.io/) for several values of the damping parameter alpha, so that each node's radius is proportional to its PageRank score. In particular you should do this for the following damping factors: [0, 0.25, 0.5, 0.75, 0.85, 1].
Note your plots should look like the following:
https://en.wikipedia.org/wiki/PageRank#/media/File:PageRanks-Example.svg
Draw the graph using networkx with the node size proportional to the rank. Use a circular layout so no nodes are blocked.
In [97]:
import networkx as nx
from ast import literal_eval
G = nx.DiGraph()
with open('PageRank-test.txt', 'r') as f:
for line in f:
node, edgestr = line.strip().split('\t')
G.add_node(node)
edges = literal_eval(edgestr)
for key in edges.keys():
G.add_edge(node, key)
%matplotlib inline
pr = nx.pagerank(G, alpha=0.0)
radii = list()
for key in pr.keys():
radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, arrows=True, node_size=radii)
With no damping (alpha = 0 in networkx, i.e. pure teleportation), all pages receive the same rank.
In [98]:
pr = nx.pagerank(G, alpha=0.25)
radii = list()
for key in pr.keys():
radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, arrows=True, node_size=radii)
In [99]:
pr = nx.pagerank(G, alpha=0.5)
radii = list()
for key in pr.keys():
radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, arrows=True, node_size=radii)
With more damping, B and C become more important
In [100]:
pr = nx.pagerank(G, alpha=0.75)
radii = list()
for key in pr.keys():
radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, arrows=True, node_size=radii)
In [120]:
pr = nx.pagerank(G, alpha=0.99, max_iter=5000)
radii = list()
for key in pr.keys():
radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, arrows=True, node_size=radii)
For alpha = 1.0, pagerank fails to converge even in 1,000,000 iterations, so alpha = 0.99 with max_iter=5000 is used here instead.
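A small sketch of how the non-convergence surfaces (assuming networkx raises an exception when the power iteration exceeds max_iter, as current versions do):
In [ ]:
import networkx as nx
try:
    nx.pagerank(G, alpha=1.0, max_iter=1000)
except Exception as e:   # networkx signals failure to converge within max_iter
    print "pagerank did not converge:", e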
HW 9.3: Applying PageRank to the Wikipedia hyperlinks network
Run your PageRank implementation on the Wikipedia dataset for 10 iterations, and display the top 100 ranked nodes (with alpha = 0.85).
Run your PageRank implementation on the Wikipedia dataset for 50 iterations, and display the top 100 ranked nodes (with teleportation factor of 0.15). Have the top 100 ranked pages changed? Comment on your findings. Plot both 100 curves.
In [ ]:
%%writefile mrPageRank2i.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import RawProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys
class mrPageRank2i(MRJob):
#INPUT_PROTOCOL = JSONProtocol
# INTERNAL_PROTOCOL and OUTPUT_PROTOCOL default to JSONProtocol
#OUTPUT_PROTOCOL = RawProtocol
def configure_options(self):
super(mrPageRank2i, self).configure_options()
self.add_passthrough_option(
'--iterations',
dest = 'iterations',
default = 10,
type='int',
help='number of iterations to run')
self.add_passthrough_option(
'--damping-factor',
dest = 'damping_factor',
default = 0.85,
type='float',
help='probability that user will continue clicking links')
self.add_passthrough_option(
'--nodecount',
dest = 'nodecount',
default = 11.0,
type='float',
help='total number of nodes in graph')
def steps(self):
return ([MRStep(
mapper = self.send_score,
reducer = self.receive_score
)] * self.options.iterations)
# Raw input protocol so key is not provided
def send_score(self, _, line):
#sys.stderr.write("MAPPER INPUT: ({0})\n".format(line))
N = self.options.nodecount
if type(line) != dict:
fields = line.strip().split('\t')
nodeID = fields[0]
nodeDict = literal_eval(fields[1])
if 'score' not in nodeDict.keys():
#first time through
node = {}
node['score'] = 1.0/N
node['links'] = nodeDict
node['id'] = nodeID
else:
node = nodeDict
else:
node = line
nodeID = node['id']
yield nodeID, node
if len(node['links']) > 0:
rank = node['score'] / float(len(node['links']))
for link in node['links'].keys():
yield link, rank
        else:
            # dangling node: spread its mass over all N nodes (node ids run from 1 to N)
            for link in range(1, int(N) + 1):
                yield str(link), node['score']/N
#assert rank <= 1.0
def receive_score(self, node_id, values):
myNodeID = node_id
myValues = list(values)
#sys.stderr.write("REDUCER INPUT: ({0},{1})\n".format(myNodeID, myValues))
node = {}
node['score'] = 0.0
node['links'] = {}
node['id'] = myNodeID
total_score = 0.0
count = 0
for p in myValues:
#sys.stderr.write("REDUCER VALUE: {0}\n".format(p))
try:
total_score += float(p)
except TypeError:
node = p
node['prev_score'] = node['score']
d = self.options.damping_factor
N = self.options.nodecount
node['score'] = (1.0 - d)/N + d*total_score
#sys.stderr.write("REDUCER OUTPUT: {0} -> {1}\n".format(node['id'], node['score']))
#assert node['score'] <= 1.0
yield node['id'], node
if __name__ == '__main__':
mrPageRank2i.run()
In [ ]:
!chmod a+x mrPageRank2i.py
In [ ]:
!./mrPageRank2i.py s3://ucb-mids-mls-networks/PageRank-test_indexed.txt --iterations=10 -r emr \
--output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputTest --no-output
In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputTest HW9_3Test/ --recursive
In [ ]:
!cat HW9_3Test/part* > test-graph.txt
In [ ]:
prdict={}
with open('test-graph.txt', 'r') as f:
for line in f:
fields = line.strip().split('\t')
prdict[fields[0]] = literal_eval(fields[1])['score']
for key in sorted(prdict, key=prdict.get, reverse=True):
print "{0}\t{1}".format(key, prdict[key])
In [ ]:
!./mrPageRank2i.py s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt --iterations=10 --nodecount=15192277 -r emr \
--output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW10i --no-output
In [ ]:
!./mrPageRank2i.py s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt --iterations=50 --nodecount=15192277 -r emr \
--output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW50i --no-output
In [ ]:
%%writefile MRtop100.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys
import string
class MRtop100(MRJob):
OUTPUT_PROTOCOL = JSONProtocol
outputList = list()
def jobconf(self):
orig_jobconf = super(MRtop100, self).jobconf()
custom_jobconf = {
'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
'mapred.text.key.comparator.options': '-k1rn',
'mapred.reduce.tasks': '1',
}
combined_jobconf = orig_jobconf
combined_jobconf.update(custom_jobconf)
self.jobconf = combined_jobconf
return combined_jobconf
def steps(self):
return [MRStep(
mapper = self.mapper,
mapper_final = self.mapper_final,
reducer = self.reducer
)]
def mapper(self, key, line):
# get the line, which consists of an id and a dict separated by a tab
#sys.stderr.write("MAPPER INPUT: ({0},{1})\n".format(key,line))
fields = line.strip().split('\t')
nodeid = fields[0]
nodeDict = literal_eval(fields[1])
self.outputList.append((nodeid, nodeDict['score']))
self.outputList.sort(key=lambda tup: tup[1], reverse=True)
self.outputList = self.outputList[:100]
def mapper_final(self):
        for k in range(min(100, len(self.outputList))):
yield self.outputList[k][1], self.outputList[k][0]
def reducer(self, score, nodeids):
nodeList = list(nodeids)
myScore = score
#sys.stderr.write("REDUCER INPUT: ({0},{1})\n".format(myScore, nodeList))
count = 0
myOutputList = list()
for node in nodeList:
myOutputList.append((node, myScore))
count += 1
myOutputList.sort(key=lambda tup: tup[1], reverse=True)
myOutputList = myOutputList[:100]
for tup in myOutputList:
nodeid = tup[0].translate(string.maketrans("",""), string.punctuation)
yield nodeid, tup[1]
if __name__ == '__main__':
MRtop100.run()
In [ ]:
!./MRtop100.py s3://ucb-mids-mls-dougkelley/HW9_3/outputW50i/ -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW50i_top100 --no-output
In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputW50i_top100 W50_100/ --recursive
Using an earlier implementation without correct treatment of the dangling mass, the scores for the 10-iteration and 50-iteration runs are nearly the same.
In [ ]:
%matplotlib inline
from matplotlib import pyplot as plt
scores50=list()
with open('W50_100/part-00000', 'r') as f:
for line in f:
score=float(line.strip().split('\t')[1])
scores50.append(score)
scores10=list()
with open('W10_100/part-00000', 'r') as f:
for line in f:
score=float(line.strip().split('\t')[1])
scores10.append(score)
plt.semilogy(range(100),scores50,range(100),scores10)
plt.grid()
In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputW10i_top100 W10_100/ --recursive
In [ ]:
hitcount=list()
with open('W10_100/part-00000','r') as f10, open('W50_100/part-00000','r') as f50:
for line10 in f10:
nid10 = line10.strip().split('\t')[0]
nid50 = f50.readline().strip().split('\t')[0]
hitcount.append(nid10 == nid50)
print sum(hitcount)
plt.plot(hitcount)
In [ ]:
from sets import Set
set10=Set()
set50=Set()
with open('W10_100/part-00000','r') as f10:
for line in f10:
set10.add(line.strip().split('\t')[0])
with open('W50_100/part-00000','r') as f50:
for line in f50:
set50.add(line.strip().split('\t')[0])
print len(set10.intersection(set50))
for item in set10:
if item not in set50:
print "Unique item: {0}\n".format(item)
In [ ]:
list10=list()
list50=list()
updates=list()
with open('W10_100/part-00000','r') as f10:
for line in f10:
list10.append(line.strip().split('\t')[0])
with open('W50_100/part-00000','r') as f50:
for line in f50:
list50.append(line.strip().split('\t')[0])
for item in list10:
updates.append(list50.index(item))
In [ ]:
print updates
In [ ]:
fig,ax = plt.subplots()
fig.set_size_inches(3.0, 10.0)
plt.xticks([10.0, 50.0])
plt.xlim([5, 55])
plt.ylabel('PageRank')
plt.xlabel('Iterations')
for k in range(100):
ax.plot([10,50],[100-k,100-updates[k]], '-o', markersize=5)
In [ ]:
!./MRtop100.py s3://ucb-mids-mls-katieadams/output-wiki-10x/ -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW10d_top100 --no-output
In [ ]:
%%writefile MRtop100k.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys
import string
from numpy import log
class MRtop100k(MRJob):
OUTPUT_PROTOCOL = JSONProtocol
outputList = list()
def jobconf(self):
orig_jobconf = super(MRtop100k, self).jobconf()
custom_jobconf = {
'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
'mapred.text.key.comparator.options': '-k1rn',
'mapred.reduce.tasks': '1',
}
combined_jobconf = orig_jobconf
combined_jobconf.update(custom_jobconf)
self.jobconf = combined_jobconf
return combined_jobconf
def steps(self):
return [MRStep(
mapper = self.mapper,
mapper_final = self.mapper_final,
reducer = self.reducer
)]
def mapper(self, key, line):
# get the line, which consists of an id and a dict separated by a tab
#sys.stderr.write("MAPPER INPUT: ({0},{1})\n".format(key,line))
fields = line.strip().split('\t')
nodeid = fields[0]
nodeList = literal_eval(fields[1])
self.outputList.append((nodeid, float(nodeList[0])))
self.outputList.sort(key=lambda tup: tup[1], reverse=True)
self.outputList = self.outputList[:100]
def mapper_final(self):
        for k in range(min(100, len(self.outputList))):
yield log(self.outputList[k][1]),(self.outputList[k][0], self.outputList[k][1])
def reducer(self, score, nodeTuples):
nodeList = list(nodeTuples)
myScore = score
#sys.stderr.write("REDUCER INPUT: ({0},{1})\n".format(myScore, nodeList))
myOutputList = list()
for nodeTuple in nodeList:
node = nodeTuple[0]
myScore = float(nodeTuple[1])
myOutputList.append((node, myScore))
myOutputList.sort(key=lambda tup: tup[1], reverse=True)
for tup in myOutputList[:100]:
nodeid = tup[0].translate(string.maketrans("",""), string.punctuation)
yield nodeid, tup[1]
if __name__ == '__main__':
MRtop100k.run()
In [ ]:
!chmod a+x ./MRtop100k.py
In [ ]:
!./MRtop100k.py part0-1000.txt
In [ ]:
!./MRtop100k.py s3://ucb-mids-mls-katieadams/output-wiki-10x/ -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW10d_top100 --no-output
In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputW10d_top100 W10k_100/ --recursive
In [ ]:
!wc -l W10k_100/part-00000
In [ ]:
!./MRtop100k.py s3://ucb-mids-mls-katieadams/output-wiki-50x/ -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW50d_top100 --no-output
In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputW50d_top100 W50k_100/ --recursive
In [ ]:
!cat W50k_100/part-00000
In [ ]:
from sets import Set
set10=Set()
set50=Set()
with open('top100_10x.txt','r') as f10:
for line in f10:
set10.add(line.strip().split('\t')[0])
with open('top100_50x.txt','r') as f50:
for line in f50:
set50.add(line.strip().split('\t')[0])
print len(set10.intersection(set50))
for item in set10:
if item not in set50:
print "Unique item: {0}\n".format(item)
In [1]:
%matplotlib inline
from matplotlib import pyplot as plt
scores10x=list()
scores50x=list()
from numpy import arange
with open('top100_10x.txt', 'r') as f:
for line in f:
score=float(line.strip().split('\t')[1])
scores10x.append(score)
with open('top100_50x.txt', 'r') as f:
for line in f:
score=float(line.strip().split('\t')[1])
scores50x.append(score)
plt.semilogy(arange(100),scores10x,arange(100),scores50x)
plt.grid()
In [17]:
%%writefile matchIndex.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, JSONValueProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys
class matchIndex(MRJob):
OUTPUT_PROTOCOL = RawValueProtocol
nodes = []
def steps(self):
return [MRStep(
mapper_init = self.mapper_init,
mapper = self.mapper,
reducer = self.reducer
)]
def mapper_init(self):
for line in open('top100.txt'):
self.nodes.append(line.strip().split('\t')[0])
#mapper emits node, dictionary for stripe
def mapper(self, key, line):
#sys.stderr.write("MAPPER INPUT: ({0},{1})\n".format(key,line))
fields = line.strip().split('\t')
node=str(literal_eval(fields[1]))
if node in self.nodes:
yield fields[1], (fields[0], fields[2], fields[3])
#reducer emits key and the rest of the data
def reducer(self, key, stripes):
stripeList = list(stripes)
myKey = key
#sys.stderr.write("REDUCER INPUT: ({0},{1})\n".format(myKey, stripeList))
for stripe in stripeList:
yield myKey, (myKey, stripe)
if __name__ == '__main__':
matchIndex.run()
In [3]:
!chmod a+x ./matchIndex.py
In [14]:
!./matchIndex.py ~/Downloads/indices.txt --file=top100.txt
In [22]:
!./matchIndex.py s3://ucb-mids-mls-networks/wikipedia/indices.txt -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/ --no-output --file=top100.txt
In [16]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/ 10xMatch/ --recursive
In [42]:
%reload_ext autoreload
%autoreload 2
In [43]:
from ast import literal_eval
nodes10x=[]
nodes50x=[]
scores10x=[]
scores50x=[]
with open('top100_10x.txt','r') as f1, open('top100_50x.txt','r') as f2:
for line in f1:
fields = line.strip().split('\t')
node = literal_eval(fields[0])
score = literal_eval(fields[1])
nodes10x.append(node)
scores10x.append(score)
for line in f2:
fields = line.strip().split('\t')
node = literal_eval(fields[0])
score = literal_eval(fields[1])
nodes50x.append(node)
scores50x.append(score)
links10x=[]
links50x=[]
with open('/Users/doug/Downloads/indices.txt','r') as f:
for line in f:
fields=line.strip().split('\t')
node = str(literal_eval(fields[1]))
if node in nodes10x:
links10x.append(fields)
if node in nodes50x:
links50x.append(fields)
In [44]:
links10x[1]
Out[44]:
In [45]:
links50x[1]
Out[45]:
In [26]:
links10x[0]
Out[26]:
In [27]:
nodes10x[0]
Out[27]:
In [47]:
olinks10x=[]
olinks50x=[]
for k, node in enumerate(nodes10x):
olinks10x.append((list(i for i in links10x if i[1] == node),scores10x[k]))
for k, node in enumerate(nodes50x):
olinks50x.append((list(i for i in links50x if i[1] == node), scores50x[k]))
In [48]:
olinks50x[:10]
Out[48]:
In [49]:
olinks10x[:10]
Out[49]:
In [64]:
len(olinks10x)
Out[64]:
In [112]:
pr10xNames=[]
pr50xNames=[]
pr10xScores=[]
pr50xScores=[]
pr10xinlinks=[]
pr50xinlinks=[]
pr10xoutlinks=[]
pr50xoutlinks=[]
for k in range(len(olinks10x)):
pr10xNames.append(olinks10x[k][0][0][0])
pr50xNames.append(olinks50x[k][0][0][0])
pr10xScores.append(olinks10x[k][1])
pr50xScores.append(olinks50x[k][1])
pr10xinlinks.append(float(olinks10x[k][0][0][2]))
pr50xinlinks.append(float(olinks50x[k][0][0][2]))
pr10xoutlinks.append(float(olinks10x[k][0][0][3]))
pr50xoutlinks.append(float(olinks50x[k][0][0][3]))
In [107]:
from numpy import arange
plt.semilogy(arange(100),pr10xScores,arange(100),pr50xScores)
plt.grid()
plt.xlabel('Rank')
plt.ylabel('Pagerank')
plt.legend(('10 iterations', '50 iterations'))
Out[107]:
With more iterations, more mass is concentrated in the more highly ranked nodes -- the rich get richer. PageRank drops off slowly after the initial few pages. In terms of Google's results, only the highest ranked nodes would show up on the first page of search results.
In [108]:
plt.semilogy(arange(100),pr10xinlinks,arange(100),pr50xinlinks)
plt.grid()
plt.xlabel('Rank')
plt.ylabel('Inlinks')
plt.legend(('10 iterations', '50 iterations'))
Out[108]:
The most highly ranked nodes have a large number of inbound links; after the first few, the count varies quite a bit but generally drops off.
In [113]:
plt.semilogy(arange(100),pr10xoutlinks,arange(100),pr50xoutlinks)
plt.grid()
plt.xlabel('Rank')
plt.ylabel('Outlinks')
plt.legend(('10 iterations', '50 iterations'))
Out[113]:
There appears to be no dependence on rank for the number of outlinks from a page. Several high ranking nodes have no outlinks.
In [100]:
import pandas as pd
def print_full(x):
    # temporarily widen pandas' row limit so the full frame prints
    pd.set_option('display.max_rows', len(x))
    print(x)
    pd.reset_option('display.max_rows')
In [105]:
pd.set_option('display.max_rows', 100)
In [117]:
df = pd.DataFrame({'Name 10x':pr10xNames, 'Score 10x':pr10xScores, 'Name 50x':pr50xNames, 'Score 50x':pr50xScores})
In [116]:
df
Out[116]:
In [138]:
updates=[]
for item in pr10xNames:
if item in pr50xNames:
updates.append(pr50xNames.index(item))
else:
updates.append(100)
fig,ax = plt.subplots()
fig.set_size_inches(3.0, 10.0)
plt.xticks([10.0, 50.0])
plt.xlim([5, 55])
plt.ylabel('PageRank')
plt.xlabel('Iterations')
plt.title('From 10x')
for k in range(100):
ax.plot([10,50],[100-k,100-updates[k]], '-o', markersize=5)
In [136]:
updates50=[]
for item in pr50xNames:
if item in pr10xNames:
updates50.append(pr10xNames.index(item))
else:
updates50.append(100)
fig,ax = plt.subplots()
fig.set_size_inches(3.0, 10.0)
plt.xticks([10.0, 50.0])
plt.xlim([5, 55])
plt.ylabel('PageRank')
plt.xlabel('Iterations')
plt.title('From 50x')
for k in range(100):
ax.plot([10,50],[100-updates50[k],100-k], '-o', markersize=5)
print "Number of new items in 50x top 100 list: {0}".format(updates50.count(100))
HW 9.4: Topic-specific PageRank implementation using MRJob
Modify your PageRank implementation to produce a topic specific PageRank implementation, as described in:
http://www-cs-students.stanford.edu/~taherh/papers/topic-sensitive-pagerank.pdf
Note in this article that there is a special caveat to ensure that the transition matrix is irreducible.
This caveat lies in footnote 3 on page 3:
A minor caveat: to ensure that M is irreducible when p
contains any 0 entries, nodes not reachable from nonzero
nodes in p should be removed. In practice this is not problematic.
and must be adhered to for convergence to be guaranteed.
Run topic specific PageRank on the following randomly generated network of 100 nodes:
s3://ucb-mids-mls-networks/randNet.txt
which are organized into ten topics, as described in the file:
s3://ucb-mids-mls-networks/randNet_topics.txt
Since there are 10 topics, your result should be 11 PageRank vectors (one for the vanilla PageRank implementation in 9.1, and one for each topic with the topic specific implementation). Print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.
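Before the implementation, a minimal sketch of the topic-biased teleportation split used below (alpha, beta, the node count, and the topic size are illustrative values matching the jobconf parameters passed later): a node inside the chosen topic receives teleportation mass alpha*beta/|T|, while a node outside receives alpha*(1-beta)/(G-|T|), so the teleportation mass still totals alpha.
In [ ]:
# Sketch of the topic-biased teleportation split used in the topic-specific reducer below.
alpha, beta, num_nodes = 0.15, 0.99, 100
topic_count = {1: 10}                  # e.g. 10 of the 100 nodes carry topic 1

def teleport_mass(node_topic, topic):
    if node_topic == topic:
        return alpha * beta / topic_count[topic]                    # boosted jump into the topic set
    return alpha * (1.0 - beta) / (num_nodes - topic_count[topic])  # small residual jump elsewhere

# total teleportation mass still sums to alpha = 0.15:
print topic_count[1] * teleport_mass(1, 1) + (num_nodes - topic_count[1]) * teleport_mass(0, 1)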
In [28]:
%%writefile MRJob_PreprocessGraph_topic.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env
class MRJobPreprocessGraphTopic(MRJob):
def steps(self):
mapper_only = 0
if mapper_only:
return [MRStep(mapper_init = self.mapper_init,
mapper = self.mapper)]
return [MRStep(
mapper_init = self.mapper_init,
mapper = self.mapper,
reducer = self.reducer
)]
def mapper_init(self):
self.topic_labels = {}
topicIdxFilename = jobconf_from_env('topicIdxFilename')
with open(topicIdxFilename, 'r') as f:
for line in f.readlines():
line = line.strip()
p,t = line.split('\t')
self.topic_labels[int(p)]=int(t)
G = int(jobconf_from_env('G'))
self.PR_init = 1.0/G
def mapper(self, _, line):
line = line.strip().split('\t')
key = int(line[0])
edges = literal_eval(line[1])
value = {}
value['PR']=[self.PR_init]*11
value['edges']=edges.keys()
value['topic']=self.topic_labels[key]
yield key, value
for node in edges.keys():
key=int(node)
value = {}
value['PR']=[self.PR_init]*11
value['edges']=None
value['topic']=self.topic_labels[key]
yield key, value
def reducer(self,key,value):
data = {}
data['topic']=None
data['PR']=None
data['edges']=[]
for v in value:
if data['topic']==None:
data['topic']=v['topic']
if v['edges']==None:
data['PR']=v['PR']
else:
data['PR']=v['PR']
data['edges']=v['edges']
yield int(key), data
if __name__ == '__main__':
MRJobPreprocessGraphTopic.run()
Preprocess RandNet locally to include initialized page ranks and topics
In [29]:
!python MRJob_PreprocessGraph_topic.py randNet.txt --file 'randNet_topics.txt' --jobconf 'G=100' --jobconf 'topicIdxFilename=randNet_topics.txt'> randNet_input.txt
Test preprocessing on emr
In [31]:
!python MRJob_PreprocessGraph_topic.py randNet.txt --file 'randNet_topics.txt' --jobconf 'G=100' --jobconf 'topicIdxFilename=randNet_topics.txt' -r emr --num-ec2-instances 2 --ec2-task-instance-type m1.medium --output-dir s3://ucb-mids-mls-katieadams/output-wiki-preprocess-topics --no-output
In [30]:
!head -1 randNet_input.txt
In [18]:
from collections import defaultdict
def numNodesByTopic(topicsIdxFile, topicsCountFile):
    # count the nodes assigned to each topic, plus the grand total under the '*' key
    numNodesTopic = defaultdict(int)
    with open(topicsIdxFile, 'r') as f:
        for line in f.readlines():
            line = line.strip()
            p,t = line.split('\t')
            numNodesTopic[int(t)]+=1
            numNodesTopic['*']+=1
    with open(topicsCountFile,'w') as f:
        for k,v in numNodesTopic.iteritems():
            line = str(k)+'\t'+str(v)+'\n'
            f.write(line)
numNodesByTopic('randNet_topics.txt', 'randNet_topicCount.txt')
In [21]:
!cat randNet_topicCount.txt
In [62]:
%%writefile MRJob_PageRank_Topic.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env
import sys
class MRJobPageRankTopic(MRJob):
def configure_options(self):
super(MRJobPageRankTopic, self).configure_options()
self.add_passthrough_option(
'--iterations',
dest = 'iterations',
default = 10,
type='int',
help='number of iterations to run')
def steps(self):
mapper_only = 0
if mapper_only:
return [MRStep(mapper = self.mapper_firstiter)]
return [MRStep(
mapper = self.mapper_firstiter,
reducer_init = self.reducer_init,
reducer = self.reducer
),
MRStep(
reducer = self.reducer_redistr)]+\
[MRStep(
mapper = self.mapper,
reducer_init = self.reducer_init,
reducer = self.reducer
),
MRStep(
reducer = self.reducer_redistr)]*self.options.iterations
def mapper_firstiter(self, _, line):
line = line.strip().split('\t')
key = line[0]
value = literal_eval(line[1])
graphvalue = {}
graphvalue['PR']=None
graphvalue['edges']=value['edges']
graphvalue['topic']=value['topic']
yield int(key), graphvalue
destnodes = value['edges']
outdeg = len(destnodes)
sendvalue = {}
sendvalue['PR']=[]
sendvalue['topic']=None
sendvalue['edges']=None
if outdeg==0:
sendvalue['PR']=value['PR']
yield '*m',sendvalue
else:
for i in range(len(value['PR'])):
PRi = value['PR'][i]
sendvalue['PR'].append(1.0*PRi/outdeg)
for node in destnodes:
yield int(node), sendvalue
def mapper(self, key, value):
graphvalue = {}
graphvalue['PR']=None
graphvalue['edges']=value['edges']
graphvalue['topic']=value['topic']
yield int(key), graphvalue
destnodes = value['edges']
outdeg = len(destnodes)
sendvalue = {}
sendvalue['PR']=[]
sendvalue['topic']=None
sendvalue['edges']=None
if outdeg==0:
sendvalue['PR']=value['PR']
yield '*m',sendvalue
else:
for i in range(len(value['PR'])):
PRi = value['PR'][i]
sendvalue['PR'].append(1.0*PRi/outdeg)
for node in destnodes:
yield int(node), sendvalue
def reducer_init(self):
self.alpha = float(jobconf_from_env('alpha'))
self.G = int(jobconf_from_env('G'))
self.n = int(jobconf_from_env('num_topics'))
self.beta = float(jobconf_from_env('beta'))
topicCountFilename = jobconf_from_env('topicCountFilename')
self.T = {}
with open(topicCountFilename, 'r') as f:
for line in f.readlines():
topic,count = line.strip().split('\t')
if topic=='*':
topic = 0
else:
topic = int(topic)
self.T[topic]=int(count)
def reducer(self,node,data):
sendvalue = {}
sendvalue['PR']=[0]*(self.n+1)
sendvalue['topic']=None
sendvalue['edges']=None
for value in data:
if value['edges']==None:
for i in range(len(value['PR'])):
sendvalue['PR'][i]+=value['PR'][i]
else:
sendvalue['topic']=int(value['topic'])
sendvalue['edges']=value['edges']
if node=='*m':
            PR_adj = []
            for i in range(self.n+1):
                # distribute each vector's dangling mass as a scalar share per node
                PR_adj.append((1.0/self.G)*(1.0-self.alpha)*sendvalue['PR'][i])
sendvalue['PR']=PR_adj
for i in range(self.G):
yield i+1, sendvalue
else:
            PR_adj = []
            # index 0 holds the vanilla (non-topic-specific) PageRank vector
            PR_adj.append(self.alpha*(1.0/self.G)+(1.0-self.alpha)*sendvalue['PR'][0])
for i in range(1,self.n+1):
if sendvalue['topic']==i:
PR_adj.append(self.alpha*self.beta*(1.0/self.T[i])+(1.0-self.alpha)*sendvalue['PR'][i])
else:
PR_adj.append(self.alpha*(1-self.beta)*(1.0/(self.G-self.T[i]))+(1.0-self.alpha)*sendvalue['PR'][i])
sendvalue['PR']=PR_adj
yield int(node), sendvalue
def reducer_redistr(self, node, data):
n = int(jobconf_from_env('num_topics'))
finalvalue = {}
finalvalue['PR']=[0]*(n+1)
finalvalue['topic']=None
finalvalue['edges']=None
for value in data:
for i in range(n+1):
finalvalue['PR'][i]+=value['PR'][i]
if value['edges']!=None:
finalvalue['edges'] = value['edges']
finalvalue['topic']=int(value['topic'])
yield int(node), finalvalue
if __name__ == '__main__':
MRJobPageRankTopic.run()
In [65]:
!python MRJob_PageRank_Topic.py randNet_input.txt --file 'randNet_topicCount.txt' --jobconf 'G=100' --jobconf 'alpha=0.15'\
--jobconf 'beta=0.99' --jobconf 'topicCountFilename=randNet_topicCount.txt'\
--jobconf 'num_topics=10' --iterations 9 > randNet_Topic_output.txt
In [8]:
%%writefile MRtop10.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys
import string
from operator import itemgetter
from collections import defaultdict
from mrjob.compat import jobconf_from_env
'''
Modified from Doug and Group E's code
'''
class MRtop10(MRJob):
OUTPUT_PROTOCOL = JSONProtocol
mapperListDict = defaultdict(list)
reducerListDict = defaultdict(list)
def jobconf(self):
orig_jobconf = super(MRtop10, self).jobconf()
custom_jobconf = {
'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
'mapred.text.key.comparator.options': '-k1rn',
'mapred.reduce.tasks': '1',
}
combined_jobconf = orig_jobconf
combined_jobconf.update(custom_jobconf)
self.jobconf = combined_jobconf
return combined_jobconf
def steps(self):
return [MRStep(
mapper = self.mapper,
reducer_init = self.reducer_init,
reducer = self.reducer,
reducer_final = self.reducer_final
)]
def getFloatKey(item):
return float(item[1])
def mapper(self, key, line):
fields = line.strip().split('\t')
nodeid = fields[0]
nodeDict = literal_eval(fields[1])
for i in range(11):
yield float(nodeDict['PR'][i]), (i, nodeid, nodeDict['topic'])
def reducer_init(self):
self.n = int(jobconf_from_env('num_topics'))
self.top_pages = defaultdict(list)
def reducer(self, score, nodetups):
for node in nodetups:
topic = node[0]
if len(self.top_pages[topic])<10:
self.top_pages[topic].append([score, node])
def reducer_final(self):
for page, scores in self.top_pages.iteritems():
for score in scores:
rank = score[0]
node = score[1][1]
topic = score[1][2]
yield "Vector "+str(page), (rank, node, topic)
if __name__ == '__main__':
MRtop10.run()
In [10]:
!python MRtop10.py randNet_Topic_output.txt --jobconf 'num_topics=10' -r emr
HW 9.5: Applying topic-specific PageRank to Wikipedia
Here you will apply your topic-specific PageRank implementation to Wikipedia, defining topics (very arbitrarily) for each page by the length (number of characters) of the name of the article mod 10, so that there are 10 topics. Once again, print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.
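For example, under this scheme a page's topic is simply len(article_name) % 10; the title used below is a hypothetical illustration of the rule, not necessarily a page in the dataset.
In [ ]:
# Topic assignment rule for HW 9.5: article-name length mod 10.
title = 'Claude Shannon'       # hypothetical example title
print len(title) % 10          # the name is 14 characters long, so this page falls in topic 4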
In [25]:
def assignWikiTopics():
ftopics = open('wiki_topics.txt','w')
i=0
with open('indices.txt','r') as fidx:
for line in fidx.readlines():
i+=1
if i%500000==0:
print "progress: ",i*1.0/15192277
line = line.strip().split('\t')
topic = len(line[0])%10
idx = int(line[1])
ftopics.write(str(idx)+'\t'+str(topic)+'\n')
ftopics.close()
assignWikiTopics()
In [34]:
!python MRJob_PreprocessGraph_topic.py s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt --file 'wiki_topics.txt' --jobconf G=15192277 --jobconf 'topicIdxFilename=wiki_topics.txt' -r emr --num-ec2-instances 4 --ec2-instance-type m1.large --output-dir s3://ucb-mids-mls-katieadams/output-wiki-preprocess-topics --no-output
Memory error on EMR => running locally instead -- ran out of disk space in the map step.
In [35]:
!python MRJob_PreprocessGraph_topic.py all-pages-indexed-out.txt --file 'wiki_topics.txt' --jobconf 'G=15192277' --jobconf 'topicIdxFilename=wiki_topics.txt'