Some Imports
In [ ]:
#%matplotlib inline
#import matplotlib.pyplot as plt
#import numpy as np
%cd ~/Documents/W261/hw9/
HW9.0:
What is PageRank and what is it used for in the context of web search?
What modifications have to be made to the web graph in order to leverage the machinery of Markov chains to compute the steady state distribution?
OPTIONAL: In topic-specific PageRank, how can we ensure that the irreducibility property is satisfied? (HINT: see HW9.4)
__HW 9.0 ANSWER__
HW 9.1: MRJob implementation of basic PageRank
Write a basic MRJob implementation of the iterative PageRank algorithm
that takes sparse adjacency lists as input (as explored in HW 7).
Make sure that your implementation uses teleportation (with per-node probability (1 − d)/N, where N is the number of nodes in the network), and that it redistributes the mass of dangling nodes on each iteration, so that the output of each iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page, chooses the next page to which it will move by clicking at random, with probability d, one of the hyperlinks in the current page. This probability is represented by a so-called ‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer jumps to any web page in the network. If a page is a dangling end, meaning it has no outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform distribution and “teleports” to that page]
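The random-surfer update described in the note can be sketched as a single-machine power iteration. This is only an illustrative sketch (the toy graph and helper name are mine, not part of the assignment), but it captures both the teleportation term and the uniform redistribution of dangling mass:

```python
def pagerank(graph, d=0.85, iterations=50):
    """graph: {node: [out-neighbors]}; returns {node: rank}, summing to 1.0."""
    nodes = set(graph)
    for outs in graph.values():
        nodes.update(outs)
    N = len(nodes)
    pr = {n: 1.0 / N for n in nodes}
    for _ in range(iterations):
        new = {n: 0.0 for n in nodes}
        dangling = 0.0
        for n in nodes:
            outs = graph.get(n, [])
            if outs:
                share = pr[n] / len(outs)  # follow a hyperlink with prob. d
                for m in outs:
                    new[m] += share
            else:
                dangling += pr[n]  # dangling node: mass redistributed uniformly
        for n in nodes:
            new[n] = (1 - d) / N + d * (new[n] + dangling / N)
        pr = new
    return pr
```

Because the dangling mass is folded back in before the damping step, each iteration's output sums to 1, which is exactly the normalization property HW 9.1 asks for.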
As you build your code, use the test data
s3://ucb-mids-mls-networks/PageRank-test.txt
with the teleportation parameter set to 0.15 (1 − d, where the damping factor d is set to 0.85), and cross-check your work with the true result, displayed in the first image in the Wikipedia article:
https://en.wikipedia.org/wiki/PageRank
and here for reference are the corresponding PageRank probabilities:
A,0.033
B,0.384
C,0.343
D,0.039
E,0.081
F,0.039
G,0.016
H,0.016
I,0.016
J,0.016
K,0.016
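Before cross-checking against these values it helps to pin down the input format. Judging from the `literal_eval` calls in the jobs below, each line of the test file is a node name, a tab, then a Python dict literal of out-edges. A small parsing sketch (the sample line is illustrative, not taken from the actual file):

```python
from ast import literal_eval

def parse_adjacency_line(line):
    """Parse one tab-separated adjacency-list line, e.g. "B\t{'C': 1}"."""
    node, edges = line.strip().split('\t')
    return node, literal_eval(edges)  # edges: {destination: weight}

node, edges = parse_adjacency_line("B\t{'C': 1}")
```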
In [ ]:
'''
Plan:
Map:    pass the graph structure through; emit PR/(number of outgoing links)
        to each destination node
Reduce: sum the incoming PR contributions and re-emit alongside the graph
'''
In [2]:
%%writefile MRJob_NumberOfNodes.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval

class MRJobNumNodes(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer),
                MRStep(reducer=self.reducer_count)]

    def mapper(self, _, line):
        # Emit every node that appears as a source or a destination.
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        yield key, None
        for node in value.keys():
            yield node, None

    def reducer(self, node, _):
        # One record per distinct node.
        yield None, node

    def reducer_count(self, _, nodes):
        # All distinct nodes arrive under the single None key; count them.
        count = 0
        for node in nodes:
            count += 1
        yield None, count

if __name__ == '__main__':
    MRJobNumNodes.run()
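The same count can be sanity-checked locally with a set union over sources and destinations (the sample lines in the test are illustrative):

```python
from ast import literal_eval

def count_nodes(lines):
    """Distinct nodes appearing as a source or a destination in
    tab-separated adjacency-list lines of the form "node\t{dest: weight}"."""
    seen = set()
    for line in lines:
        node, edges = line.strip().split('\t')
        seen.add(node)
        seen.update(literal_eval(edges))  # dict keys are the destinations
    return len(seen)
```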
In [64]:
%%writefile MRJob_PreprocessGraph.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval

class MRJobPreprocessGraph(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper_init=self.mapper_init,
                           mapper=self.mapper)]
        return [MRStep(mapper_init=self.mapper_init,
                       mapper=self.mapper,
                       reducer=self.reducer)]

    def mapper_init(self):
        N = 11  # hard-coded for the 11-node test graph
        self.PR = 1.0 / N

    def mapper(self, _, line):
        # Attach the initial PageRank 1/N to every node, including nodes
        # that only ever appear as destinations.
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        yield key, [self.PR, edges]
        for node in edges.keys():
            yield node, [self.PR]

    def reducer(self, key, value):
        # Keep the edge list if this node has one; every value carries the
        # same initial PR, so taking any one of them is fine.
        PR = 0
        edges = {}
        for v in value:
            PR = v[0]
            if len(v) > 1:
                edges = v[1]
        yield key, [PR, edges]

'''
An earlier single-reducer version, kept for reference:

    def mapper(self, _, line):
        N = 11
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        PR = 1.0/N
        yield None, {key: [PR, edges]}

    def reducer(self, _, data):
        for line in data:
            key = line.keys()[0]
            yield key, line[key]
'''

if __name__ == '__main__':
    MRJobPreprocessGraph.run()
In [15]:
!python MRJob_PreprocessGraph.py PageRank-test.txt > PageRank-test_input.txt
In [16]:
!cat PageRank-test_input.txt
In [103]:
%%writefile MRJob_PageRankStep1.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env

# Dangling mass accumulated by the step-1 reducer, read by the step-2 mapper.
# NOTE: sharing state through a module-level global like this only works with
# the inline runner, where both steps execute in the same process.
dangling_mass = None

class MRJobPageRank_step1(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer),
                MRStep(mapper=self.mapper_redistr)]

    def mapper(self, _, line):
        # Pass the graph structure through, and send each destination its
        # share PR/outdeg; route dangling mass to the special key '*m'.
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        PR = value[0]
        edges = value[1]
        yield key, [None, edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]
        for node in destnodes:
            yield node, [1.0 * PR / outdeg]

    def reducer(self, node, data):
        # Sum the incoming shares; stash the dangling mass in the global
        # instead of emitting it, so only real nodes reach step 2.
        global dangling_mass
        PR = 0
        edges = {}
        for value in data:
            if len(value) == 1:
                PR += value[0]
            else:
                edges = value[1]
        if node == '*m':
            dangling_mass = PR
        else:
            yield node, [PR, edges]

    def mapper_redistr(self, node, data):
        # PR' = alpha/G + (1 - alpha)*(m/G + PR), where alpha is the
        # teleportation factor, G the node count, and m the dangling mass.
        G = int(jobconf_from_env('G'))
        alpha = float(jobconf_from_env('alpha'))
        m = dangling_mass or 0.0  # no dangling nodes -> no extra mass
        PR = data[0]
        edges = data[1]
        PR_adj = alpha * (1.0 / G) + (1 - alpha) * (m / G + PR)
        yield node, [PR_adj, edges]

if __name__ == '__main__':
    MRJobPageRank_step1.run()
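A handy invariant for debugging step 2: if the incoming ranks plus the dangling mass sum to 1, the adjusted ranks sum to 1 again. A pure-Python sketch of the same adjustment formula (function and variable names are illustrative):

```python
def redistribute(pr, dangling_mass, alpha, G):
    """Apply PR' = alpha/G + (1 - alpha)*(m/G + PR) to every node.
    pr: {node: rank}; dangling_mass: mass m lost to dangling nodes."""
    return {n: alpha / G + (1 - alpha) * (dangling_mass / G + v)
            for n, v in pr.items()}
```

Summed over all G nodes this gives alpha + (1 − alpha)·(m + Σ PR), which is exactly 1 whenever Σ PR = 1 − m, so normalization is preserved from one iteration to the next.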
In [114]:
#%load_ext autoreload
%autoreload 2
from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_PreprocessGraph import MRJobPreprocessGraph
from MRJob_PageRankStep1 import MRJobPageRank_step1

def numNodes(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename])
    with mr_job_numnodes.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
    return None

def preprocessGraph(graphfilename):
    mr_job_preprocess = MRJobPreprocessGraph(args=[graphfilename])
    outputfilename = graphfilename.split('.')[0]+'_input.txt'
    with mr_job_preprocess.make_runner() as runner:
        with open(outputfilename, 'w') as f:
            runner.run()
            for line in runner.stream_output():
                node, value = mr_job_preprocess.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
    return None

def pagerank_step1(graphinputfilename, alpha, G):
    # One full PageRank iteration; the output overwrites the input file so
    # the next call picks up where this one left off.
    mr_job_pr1 = MRJobPageRank_step1(args=[graphinputfilename,
                                           '--jobconf', 'alpha='+str(alpha),
                                           '--jobconf', 'G='+str(G)])
    with mr_job_pr1.make_runner() as runner:
        runner.run()
        with open(graphinputfilename, 'w+') as f:
            for line in runner.stream_output():
                node, value = mr_job_pr1.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
                print line
    return None

def hw9_1():
    #graphfilename = "graph_input_undirected_toy.txt"
    #graphfilename = "graph_input_directed_toy.txt"
    graphfilename = "PageRank-test.txt"
    #numNodes(graphfilename)
    preprocessGraph(graphfilename)
    graphinputfilename = graphfilename.split('.')[0]+'_input.txt'
    alpha = 0.15  # teleportation factor (1 - d, with damping d = 0.85)
    G = 11
    for i in range(10):
        print "Iteration", i
        pagerank_step1(graphinputfilename, alpha, G)
    return None

hw9_1()
HW 9.2: Exploring PageRank teleportation and network plots
In order to overcome problems such as disconnected components, the damping factor (a typical value for d is 0.85) can be varied.
Using the test graph from HW 9.1, plot the graph (using networkx, https://networkx.github.io/) for several values of the damping parameter alpha, so that each node's radius is proportional to its PageRank score. In particular, do this for the following damping factors: [0, 0.25, 0.5, 0.75, 0.85, 1].
Note your plots should look like the following:
https://en.wikipedia.org/wiki/PageRank#/media/File:PageRanks-Example.svg
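One small gotcha when making each node's radius proportional to its score: networkx's `draw` takes `node_size` as an area (in points², like matplotlib's scatter `s`), so a radius proportional to the score means passing the squared score. A sketch of the conversion (the scale factor is arbitrary and the `scores` dict is illustrative):

```python
def node_sizes(scores, scale=3.0e4):
    """Convert PageRank scores to networkx node_size values.
    node_size is an area, so radius proportional to score -> size ~ score**2."""
    return {n: scale * s ** 2 for n, s in scores.items()}
```

The resulting values could then be passed as `nx.draw(G, nodelist=list(sizes), node_size=list(sizes.values()))`, one figure per damping factor.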
HW 9.3: Applying PageRank to the Wikipedia hyperlinks network
Run your PageRank implementation on the Wikipedia dataset for 10 iterations, and display the top 100 ranked nodes (with alpha = 0.85).
Run your PageRank implementation on the Wikipedia dataset for 50 iterations, and display the top 100 ranked nodes (with teleportation factor of 0.15).
Have the top 100 ranked pages changed? Comment on your findings, and plot both sets of top-100 PageRank scores as curves.
HW 9.4: Topic-specific PageRank implementation using MRJob
Modify your PageRank implementation to produce a topic specific PageRank implementation, as described in:
http://www-cs-students.stanford.edu/~taherh/papers/topic-sensitive-pagerank.pdf
Note in this article that there is a special caveat to ensure that the transition matrix is irreducible.
This caveat lies in footnote 3 on page 3:
A minor caveat: to ensure that M is irreducible when p
contains any 0 entries, nodes not reachable from nonzero
nodes in p should be removed. In practice this is not problematic.
and must be adhered to for convergence to be guaranteed.
Run topic specific PageRank on the following randomly generated network of 100 nodes:
s3://ucb-mids-mls-networks/randNet.txt
which are organized into ten topics, as described in the file:
s3://ucb-mids-mls-networks/randNet_topics.txt
Since there are 10 topics, your result should be 11 PageRank vectors (one for the vanilla PageRank implementation in 9.1, and one for each topic with the topic specific implementation). Print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.
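The only change relative to HW 9.1 is the teleportation target: instead of jumping uniformly over all nodes, the surfer jumps uniformly over the topic's node set. A single-machine sketch (toy names; here I route dangling mass uniformly over all nodes, though sending it to the topic set is an equally defensible choice):

```python
def topic_pagerank(graph, topic_nodes, alpha=0.15, iterations=50):
    """Power iteration where teleportation jumps only to nodes in topic_nodes.
    graph: {node: [out-neighbors]}; alpha: teleportation factor."""
    nodes = set(graph)
    for outs in graph.values():
        nodes.update(outs)
    N = len(nodes)
    T = len(topic_nodes)
    pr = {n: 1.0 / N for n in nodes}
    for _ in range(iterations):
        new = {n: 0.0 for n in nodes}
        dangling = 0.0
        for n in nodes:
            outs = graph.get(n, [])
            if outs:
                for m in outs:
                    new[m] += pr[n] / len(outs)
            else:
                dangling += pr[n]
        for n in nodes:
            teleport = alpha / T if n in topic_nodes else 0.0
            new[n] = teleport + (1 - alpha) * (new[n] + dangling / N)
        pr = new
    return pr
```

Note how the footnote's caveat shows up here: a node with no path from any topic node receives neither teleport mass nor link mass, so its rank decays toward zero, which is why such nodes are removed to keep the chain irreducible.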
HW 9.5: Applying topic-specific PageRank to Wikipedia
Here you will apply your topic-specific PageRank implementation to Wikipedia, defining topics (very arbitrarily) for each page by the length (number of characters) of the name of the article mod 10, so that there are 10 topics. Once again, print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.
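The arbitrary topic assignment described above is a one-liner; grouping pages by it gives the ten teleportation sets directly (the sample article names are illustrative):

```python
from collections import defaultdict

def topics_by_page(names):
    """Map each topic id (0-9) to the pages assigned to it, where a page's
    topic is the length of its article name mod 10."""
    groups = defaultdict(list)
    for name in names:
        groups[len(name) % 10].append(name)
    return dict(groups)
```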