Some Imports
In [15]:
#%matplotlib inline
#import matplotlib.pyplot as plt
#import numpy as np
%cd ~/Documents/W261/hw9/
HW9.0:
What is PageRank and what is it used for in the context of web search?
What modifications have to be made to the webgraph in order to leverage the machinery of Markov chains to compute the steady-state distribution?
OPTIONAL: In topic-specific PageRank, how can we ensure that the irreducibility property is satisfied? (HINT: see HW9.4)
__HW 9.0 ANSWER__
PageRank assigns each page the long-run probability that a random surfer, who follows outgoing hyperlinks and occasionally jumps to a random page, is found on that page. In web search it serves as a query-independent importance score used to rank result pages.
To apply Markov chain machinery, the webgraph's transition matrix must be made stochastic, irreducible, and aperiodic: the mass of dangling nodes (pages with no outlinks) is redistributed uniformly so every row sums to 1, and teleportation mixes in a uniform random jump so that every state is reachable from every other state. These conditions guarantee a unique steady-state distribution.
OPTIONAL: In topic-specific PageRank the teleportation vector is nonzero only on the topic set, so irreducibility is ensured by removing nodes that are not reachable from the nonzero-teleport nodes (see the footnote quoted in HW 9.4).
HW 9.1: MRJob implementation of basic PageRank
Write a basic MRJob implementation of the iterative PageRank algorithm
that takes sparse adjacency lists as input (as explored in HW 7).
Make sure that your implementation utilizes teleportation (each node receives (1 - d)/N of the mass, where d is the damping factor and N is the number of nodes in the network), and, further, distributes the mass of dangling nodes on each iteration, so that the output of each iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page, chooses the next page to which it will move by clicking at random, with probability d, one of the hyperlinks in the current page. This probability is represented by a so-called ‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer jumps to any web page in the network. If a page is a dangling end, meaning it has no outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform distribution and “teleports” to that page]
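To make the update concrete, here is a minimal single-machine sketch of one such iteration (illustrative only: the function name and in-memory dict representation are assumptions; the MRJob implementations below distribute the same computation):
In [ ]:
def pagerank_iteration(pr, graph, d=0.85):
    """pr: {node: score}; graph: {node: list of out-neighbors}; d: damping."""
    N = len(pr)
    new_pr = dict.fromkeys(pr, 0.0)
    dangling_mass = 0.0
    for node, score in pr.items():
        out = graph.get(node, [])
        if len(out) == 0:                  # dangling node: hold its mass aside
            dangling_mass += score
        else:
            for dest in out:               # spread mass evenly over outlinks
                new_pr[dest] += float(score) / len(out)
    # teleportation plus uniform redistribution of the dangling mass;
    # the result sums to (1-d) + d*(dangling + non-dangling) = 1
    return dict((n, (1 - d) / N + d * (dangling_mass / N + s))
                for n, s in new_pr.items())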
As you build your code, use the test data
s3://ucb-mids-mls-networks/PageRank-test.txt
with the teleportation parameter set to 0.15 (1 - d, where the damping factor d is set to 0.85), and cross-check your work against the true result, displayed in the first image in the Wikipedia article:
https://en.wikipedia.org/wiki/PageRank
and here for reference are the corresponding PageRank probabilities:
A,0.033
B,0.384
C,0.343
D,0.039
E,0.081
F,0.039
G,0.016
H,0.016
I,0.016
J,0.016
K,0.016
In [ ]:
'''
Map: emit the graph structure, and PR/(number of outgoing links) to each destination node
Reduce: sum the incoming PR contributions and re-emit them with the graph structure
'''
In [ ]:
%%writefile MRJob_NumberOfNodes.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval

class MRJobNumNodes(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer),
                MRStep(reducer=self.reducer_count)]

    def mapper(self, _, line):
        # Emit every node id seen, whether it appears as a source or a destination.
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        destnodes = value.keys()
        yield key, None
        for node in destnodes:
            yield node, None

    def reducer(self, node, _):
        # One record per distinct node id.
        yield None, node

    def reducer_count(self, _, nodes):
        # All distinct nodes arrive under the single None key; count them.
        count = 0
        for node in nodes:
            count += 1
        yield None, count

if __name__ == '__main__':
    MRJobNumNodes.run()
In [ ]:
%%writefile MRJob_PreprocessGraph.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval

class MRJobPreprocessGraph(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper_init=self.mapper_init,
                           mapper=self.mapper)]
        return [MRStep(mapper_init=self.mapper_init,
                       mapper=self.mapper,
                       reducer=self.reducer)]

    def mapper_init(self):
        # N is hard-coded to the 11 nodes of the test graph here; the later
        # version reads it from a jobconf variable instead.
        N = 11
        self.PR = 1.0/N

    def mapper(self, _, line):
        # Emit each source node with its initial PR and adjacency list, and each
        # destination node with just the initial PR, so that nodes with no
        # outlinks of their own (dangling nodes) still appear in the output.
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        value = [self.PR, edges]
        yield key, value
        for node in edges.keys():
            yield node, [self.PR]

    def reducer(self, key, value):
        # Every value carries the same initial PR, so either branch may set it;
        # only source-node records carry an adjacency list.
        PR = 0
        edges = {}
        for v in value:
            if len(v) == 1:
                PR = v[0]
            else:
                PR = v[0]
                edges = v[1]
        yield key, [PR, edges]

'''
Earlier single-reducer version, kept for reference:
def mapper(self, _, line):
    N = 11
    line = line.strip().split('\t')
    key = line[0]
    edges = literal_eval(line[1])
    PR = 1.0/N
    value = [PR, edges]
    yield None, {key: value}
def reducer(self, _, data):
    for line in data:
        key = line.keys()[0]
        value = line[key]
        yield key, value
'''

if __name__ == '__main__':
    MRJobPreprocessGraph.run()
In [101]:
!python MRJob_PreprocessGraph.py PageRank-test.txt > PageRank-test_input2.txt
In [102]:
!cat PageRank-test_input2.txt
In [ ]:
%%writefile MRJob_PageRank.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env

class MRJobPageRank(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        #return [MRStep(
        #            mapper = self.mapper,
        #            reducer = self.reducer
        #        )]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer),
                MRStep(mapper=self.mapper_redistr)]

    # NOTE: this instance attribute does not survive between steps, because the
    # reducer and the next step's mapper run in separate processes -- that flaw
    # is what motivates the rewrites further below.
    dangling_mass = 1000

    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        PR = value[0]
        edges = value[1]
        yield key, [None, edges]           # pass the graph structure along
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]               # dangling node: accumulate its mass
        for node in destnodes:
            yield node, [1.0*PR/outdeg]    # distribute PR over the outlinks

    def reducer(self, node, data):
        PR = 0
        edges = {}
        for value in data:
            if len(value) == 1:
                PR += value[0]
            else:
                edges = value[1]
        if node == '*m':
            self.dangling_mass = PR
        else:
            yield node, [PR, edges]

    def mapper_redistr(self, node, data):
        G = int(jobconf_from_env('G'))
        alpha = float(jobconf_from_env('alpha'))
        m = self.dangling_mass
        PR = data[0]
        edges = data[1]
        PR_adj = alpha*(1.0/G) + (1-alpha)*(m/G + PR)
        yield node, [PR_adj, edges]

if __name__ == '__main__':
    MRJobPageRank.run()
In [43]:
%%writefile MRJob_PageRank.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env
import sys

class MRJobPageRank(MRJob):

    m = 0  # dangling mass seen so far by this reducer process

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper_firstiter)] + \
               [MRStep(mapper=self.mapper,
                       combiner=self.combiner,
                       reducer=self.reducer)]*10

    def mapper_firstiter(self, _, line):
        # The first pass parses the text input; later passes receive decoded
        # key/value pairs directly.
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        PR = value[0]
        edges = value[1]
        yield key, [None, edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]
        for node in destnodes:
            yield node, [1.0*PR/outdeg]

    def mapper(self, key, value):
        sys.stderr.write("MAPPER INPUT: ({0}, {1})\n".format(key, value))
        PR = value[0]
        edges = value[1]
        yield key, [None, edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]
        for node in destnodes:
            yield node, [1.0*PR/outdeg]

    def combiner(self, node, data):
        # Pre-sum the PR contributions; emit the partial sum once, after the loop.
        PR = 0
        for value in data:
            if value[0] == None:
                yield node, value
            else:
                PR += value[0]
        yield node, [PR]

    def reducer(self, node, data):
        # Relies on '*m' sorting before all node keys and on a single reducer,
        # so that self.m is set before any node's adjusted score is computed.
        G = int(jobconf_from_env('G'))
        alpha = float(jobconf_from_env('alpha'))
        PR = 0
        edges = {}
        for value in data:
            if value[0] == None:
                edges = value[1]
            else:
                PR += value[0]
        if node == '*m':
            self.m = PR
        else:
            PR_adj = alpha*(1.0/G) + (1-alpha)*(1.0*self.m/G + PR)
            yield node, [PR_adj, edges]

if __name__ == '__main__':
    MRJobPageRank.run()
In [19]:
%load_ext autoreload
%autoreload 2
from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_PreprocessGraph import MRJobPreprocessGraph
from MRJob_PageRank import MRJobPageRank

def numNodes(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename])
    with mr_job_numnodes.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
    return None

def preprocessGraph(graphfilename):
    mr_job_preprocess = MRJobPreprocessGraph(args=[graphfilename])
    outputfilename = graphfilename.split('.')[0]+'_input.txt'
    with mr_job_preprocess.make_runner() as runner:
        with open(outputfilename, 'w') as f:
            runner.run()
            for line in runner.stream_output():
                node, value = mr_job_preprocess.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
    return None

def pagerank(graphinputfilename):
    mr_job_pr1 = MRJobPageRank(args=[graphinputfilename, '--jobconf', 'alpha=0.15', '--jobconf', 'G=11'])
    with mr_job_pr1.make_runner() as runner:
        runner.run()
        # Overwrite the input file so the next call starts from these scores.
        with open(graphinputfilename, 'w+') as f:
            for line in runner.stream_output():
                node, value = mr_job_pr1.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
                print line
    return None

def hw9_1():
    graphfilename = "PageRank-test.txt"
    #numNodes(graphfilename)
    preprocessGraph(graphfilename)
    graphinputfilename = graphfilename.split('.')[0]+'_input.txt'
    # The job itself runs 10 internal iterations, so one outer call suffices.
    for i in range(1):
        print "Iteration", i
        pagerank(graphinputfilename)
    return None

hw9_1()
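As a sanity check on HW 9.1, the snippet below (assuming the final scores were written back to PageRank-test_input.txt in the node<TAB>[PR, edges] format used above) compares them against the reference probabilities listed earlier and verifies that they sum to 1:
In [ ]:
# Sanity check: scores should sum to ~1 and match the Wikipedia reference values.
from ast import literal_eval

expected = {'A': 0.033, 'B': 0.384, 'C': 0.343, 'D': 0.039, 'E': 0.081,
            'F': 0.039, 'G': 0.016, 'H': 0.016, 'I': 0.016, 'J': 0.016,
            'K': 0.016}
total = 0.0
with open('PageRank-test_input.txt') as f:
    for line in f:
        node, value = line.strip().split('\t')
        pr = literal_eval(value)[0]
        total += pr
        print node, round(pr, 3), 'expected', expected.get(node.strip('"'))
print 'sum =', total   # should be ~1.0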
HW 9.2: Exploring PageRank teleportation and network plots
In order to overcome problems such as disconnected components, the damping factor (a typical value for d is 0.85) can be varied.
Using the test graph from HW 9.1, plot the graph (using networkx, https://networkx.github.io/) for several values of the damping parameter, so that each node's radius is proportional to its PageRank score. In particular, you should do this for the following damping factors: [0, 0.25, 0.5, 0.75, 0.85, 1].
Note your plots should look like the following:
https://en.wikipedia.org/wiki/PageRank#/media/File:PageRanks-Example.svg
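No plotting code is included here yet; the following is a minimal networkx sketch of the required figure, assuming the standard Wikipedia-example topology of PageRank-test.txt and using networkx's built-in nx.pagerank as a stand-in for the MRJob scores (d = 1 is approximated by 0.999, since the power iteration need not converge without teleportation):
In [ ]:
%matplotlib inline
import matplotlib.pyplot as plt
import networkx as nx

# Build the HW 9.1 test graph (edges per the Wikipedia PageRank example).
G = nx.DiGraph()
G.add_edges_from([('B','C'),('C','B'),('D','A'),('D','B'),
                  ('E','B'),('E','D'),('E','F'),('F','B'),('F','E'),
                  ('G','B'),('G','E'),('H','B'),('H','E'),
                  ('I','B'),('I','E'),('J','E'),('K','E')])
G.add_node('A')  # A is the dangling node

pos = nx.spring_layout(G)
for i, d in enumerate([0, 0.25, 0.5, 0.75, 0.85, 1]):
    plt.subplot(2, 3, i + 1)
    pr = nx.pagerank(G, alpha=min(d, 0.999))
    nx.draw(G, pos, with_labels=True,
            node_size=[10000 * pr[n] for n in G.nodes()])
    plt.title('d = %s' % d)
plt.show()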
HW 9.3: Applying PageRank to the Wikipedia hyperlinks network
Run your PageRank implementation on the Wikipedia dataset for 10 iterations, and display the top 100 ranked nodes (with alpha = 0.85).
Run your PageRank implementation on the Wikipedia dataset for 50 iterations, and display the top 100 ranked nodes (with teleportation factor of 0.15).
Have the top 100 ranked pages changed? Comment on your findings. Plot both top-100 rank curves.
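One way to extract the top 100 nodes from the final iteration's output is a simple heapq pass over the part files (the output directory name below is illustrative; adjust it to wherever the last job wrote its results):
In [ ]:
import glob
import heapq
from ast import literal_eval

records = []
for fname in glob.glob('output-wiki-10x/part-*'):
    with open(fname) as f:
        for line in f:
            node, value = line.strip().split('\t')
            records.append((literal_eval(value)[0], node))

# heapq.nlargest keeps only the 100 largest (PR, node) pairs.
for pr, node in heapq.nlargest(100, records):
    print node, pr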
In [61]:
%%writefile MRJob_PageRank.py
#!/usr/bin/env python
# OLD PAGE RANK WITH GLOBAL VAR (LOCAL FILE) FOR DANGLING MASS
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env
import sys

class MRJobPageRank(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper_firstiter,
                       reducer=self.reducer),
                MRStep(mapper=self.mapper_redistr)] + \
               [MRStep(mapper=self.mapper,
                       reducer=self.reducer),
                MRStep(mapper=self.mapper_redistr)]*10

    def mapper_firstiter(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        PR = value[0]
        edges = value[1]
        yield key, [None, edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]
        for node in destnodes:
            yield node, [1.0*PR/outdeg]

    def mapper(self, key, value):
        #sys.stderr.write("\nMAPPER INPUT: ({0}, {1})\n".format(key,value))
        PR = value[0]
        edges = value[1]
        yield key, [None, edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]
        for node in destnodes:
            yield node, [1.0*PR/outdeg]

    def reducer(self, node, data):
        PR = 0
        edges = {}
        for value in data:
            if len(value) == 1:
                PR += value[0]
            else:
                edges = value[1]
        if node == '*m':
            # Stash the dangling mass in a local file; this only works when all
            # tasks share a filesystem (local mode), hence the S3 version below.
            with open('danglingmass.txt', 'w+') as mfile:
                mfile.write(str(PR))
        else:
            yield node, [PR, edges]

    def mapper_redistr(self, node, data):
        G = int(jobconf_from_env('G'))
        alpha = float(jobconf_from_env('alpha'))
        with open('danglingmass.txt', 'r') as mfile:
            mline = mfile.readline()
        m = float(mline.strip())
        PR = data[0]
        edges = data[1]
        PR_adj = alpha*(1.0/G) + (1-alpha)*(m/G + PR)
        yield node, [PR_adj, edges]

if __name__ == '__main__':
    MRJobPageRank.run()
In [84]:
import boto
from boto.s3.connection import S3Connection
from boto.s3.key import Key
conn = S3Connection('XXXXX', 'XXXXX')
b = conn.get_bucket('ucb-mids-mls-katieadams')
k = Key(b)
k.key = 'danglingmass.txt'
m = 0.000056789
k.set_contents_from_string(str(m))
#k.set_contents_from_filename('danglingmass.txt')
m = float(k.get_contents_as_string())*2
print m
k.get_contents_to_filename('danglingmass_fromS3.txt')
b.delete_key(k)
In [99]:
%%writefile MRJob_PageRank.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env
import sys
import boto
from boto.s3.connection import S3Connection
from boto.s3.key import Key

class MRJobPageRank(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper_firstiter,
                       reducer_init=self.reducer_init,
                       reducer=self.reducer),
                MRStep(mapper=self.mapper_redistr)] + \
               [MRStep(mapper=self.mapper,
                       reducer_init=self.reducer_init,
                       reducer=self.reducer),
                MRStep(mapper=self.mapper_redistr)]*10

    def mapper_firstiter(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        PR = value[0]
        edges = value[1]
        yield key, [None, edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]
        for node in destnodes:
            yield node, [1.0*PR/outdeg]

    def mapper(self, key, value):
        PR = value[0]
        edges = value[1]
        yield key, [None, edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]
        for node in destnodes:
            yield node, [1.0*PR/outdeg]

    def reducer_init(self):
        # Stash the dangling mass in S3 so it survives across steps; clear any
        # stale value from a previous iteration first.
        conn = S3Connection('XXXXX', 'XXXXX')
        b = conn.get_bucket('ucb-mids-mls-katieadams')
        self.k = Key(b)
        self.k.key = 'danglingmass.txt'
        b.delete_key(self.k)

    def reducer(self, node, data):
        PR = 0
        edges = {}
        for value in data:
            if len(value) == 1:
                PR += value[0]
            else:
                edges = value[1]
        if node == '*m':
            self.k.set_contents_from_string(str(PR))
        else:
            yield node, [PR, edges]

    def mapper_redistr(self, node, data):
        conn = S3Connection('XXXXXX', 'XXXXXX')
        b = conn.get_bucket('ucb-mids-mls-katieadams')
        k = Key(b)
        k.key = 'danglingmass.txt'
        G = int(jobconf_from_env('G'))
        alpha = float(jobconf_from_env('alpha'))
        m = float(k.get_contents_as_string())
        PR = data[0]
        edges = data[1]
        PR_adj = alpha*(1.0/G) + (1-alpha)*(m/G + PR)
        yield node, [PR_adj, edges]

if __name__ == '__main__':
    MRJobPageRank.run()
In [100]:
!python MRJob_PageRank.py PageRank-test_input.txt --jobconf alpha=0.15 --jobconf G=11 --file danglingmass.txt
In [155]:
%%writefile MRJob_PreprocessGraph.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env

class MRJobPreprocessGraph(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper_init=self.mapper_init,
                           mapper=self.mapper)]
        return [MRStep(mapper_init=self.mapper_init,
                       mapper=self.mapper,
                       reducer=self.reducer)]

    def mapper_init(self):
        # The node count now comes from the G jobconf instead of being hard-coded.
        G = int(jobconf_from_env('G'))
        self.PR = 1.0/G

    def mapper(self, _, line):
        # Keys are cast to int, since the indexed graphs use integer node ids.
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        value = [self.PR, edges]
        yield int(key), value
        for node in edges.keys():
            yield int(node), [self.PR]

    def reducer(self, key, value):
        # Every value carries the same initial PR; only source records carry edges.
        PR = 0
        edges = {}
        for v in value:
            if len(v) == 1:
                PR = v[0]
            else:
                PR = v[0]
                edges = v[1]
        yield int(key), [PR, edges]

if __name__ == '__main__':
    MRJobPreprocessGraph.run()
In [114]:
!python MRJob_PreprocessGraph.py PageRank-testIndexed.txt --jobconf 'G=11' > PageRank-testIndexed_input.txt
In [160]:
%%writefile MRJob_PageRank.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env

class MRJobPageRank(MRJob):

    def configure_options(self):
        super(MRJobPageRank, self).configure_options()
        self.add_passthrough_option(
            '--iterations',
            dest='iterations',
            default=10,
            type='int',
            help='number of iterations to run')

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper_firstiter,
                       reducer=self.reducer),
                MRStep(reducer=self.reducer_redistr)] + \
               [MRStep(mapper=self.mapper,
                       reducer=self.reducer),
                MRStep(reducer=self.reducer_redistr)]*self.options.iterations

    def mapper_firstiter(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        PR = value[0]
        edges = value[1]
        yield int(key), [None, edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]
        for node in destnodes:
            yield int(node), [1.0*PR/outdeg]

    def mapper(self, key, value):
        PR = value[0]
        edges = value[1]
        yield int(key), [None, edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg == 0:
            yield '*m', [PR]
        for node in destnodes:
            yield int(node), [1.0*PR/outdeg]

    def reducer(self, node, data):
        alpha = float(jobconf_from_env('alpha'))
        G = int(jobconf_from_env('G'))
        PR = 0
        edges = {}
        for value in data:
            if len(value) == 1:
                PR += value[0]
            else:
                edges = value[1]
        if node == '*m':
            # Fan the dangling mass out to every node (assumes ids are 1..G).
            PR_adj = (1.0/G)*(1.0-alpha)*PR
            for i in range(G):
                yield i+1, [PR_adj]
        else:
            PR_adj = alpha*(1.0/G) + (1.0-alpha)*PR
            yield int(node), [PR_adj, edges]

    def reducer_redistr(self, node, data):
        # Combine each node's own adjusted score with its share of dangling mass.
        PR = 0
        edges = {}
        for value in data:
            PR += value[0]
            if len(value) > 1:
                edges = value[1]
        yield int(node), [PR, edges]

if __name__ == '__main__':
    MRJobPageRank.run()
In [146]:
!python MRJob_PageRank.py PageRank-testIndexed_input.txt --jobconf alpha=0.15 --jobconf G=11 --iterations 50
In [150]:
!python MRJob_PageRank.py s3://ucb-mids-mls-katieadams/PageRank-testIndexed_input.txt --jobconf alpha=0.15 --jobconf G=11 --iterations 5
In [ ]:
!python MRJob_PageRank.py s3://ucb-mids-mls-katieadams/PageRank-testIndexed_input.txt --jobconf alpha=0.15 --jobconf G=11 --iterations 5 --output-dir s3://ucb-mids-mls-katieadams/outputtest --no-output
In [ ]:
!python MRJob_PageRank.py s3://ucb-mids-mls-katieadams/PageRank-testIndexed_input.txt --jobconf alpha=0.15 --jobconf G=11 --iterations 5 -r emr --num-ec2-instances 2 --ec2-task-instance-type m1.medium --output-dir s3://ucb-mids-mls-katieadams/outputtest --no-output --pool-emr-job-flows --max-hours-idle=1
In [153]:
!python MRJob_PageRank.py s3://ucb-mids-mls-katieadams/PageRank-testIndexed_input.txt --jobconf alpha=0.15 --jobconf G=11 --iterations 5 -r emr --num-ec2-instances 2 --ec2-task-instance-type m1.medium --output-dir s3://ucb-mids-mls-katieadams/outputtest --no-output
In [156]:
!python MRJob_PreprocessGraph.py s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt --jobconf G=15192277 -r emr --num-ec2-instances 15 --ec2-task-instance-type m1.medium --output-dir s3://ucb-mids-mls-katieadams/output-wiki-preprocess --no-output
In [158]:
!python MRJob_PageRank.py s3://ucb-mids-mls-katieadams/output-wiki-preprocess/* --jobconf alpha=0.15 --jobconf G=15192277 --iterations 1 -r emr --num-ec2-instances 15 --ec2-instance-type m1.medium --output-dir s3://ucb-mids-mls-katieadams/output-wiki-degub --no-output
In [ ]:
!python MRJob_PageRank.py s3://ucb-mids-mls-katieadams/output-wiki-preprocess/* --jobconf alpha=0.85 --jobconf G=15192277 --iterations 9 -r emr --num-ec2-instances 15 --ec2-instance-type m1.medium --output-dir s3://ucb-mids-mls-katieadams/output-wiki-10x --no-output
In [ ]:
#%load_ext autoreload
%autoreload 2
from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_PreprocessGraph import MRJobPreprocessGraph
from MRJob_PageRank import MRJobPageRank

def numNodes(graphfilename):
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename])
    with mr_job_numnodes.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
    return None

def preprocessGraph(graphfilename):
    mr_job_preprocess = MRJobPreprocessGraph(args=[graphfilename])
    outputfilename = graphfilename.split('.')[0]+'_input.txt'
    with mr_job_preprocess.make_runner() as runner:
        with open(outputfilename, 'w') as f:
            runner.run()
            for line in runner.stream_output():
                node, value = mr_job_preprocess.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
    return None

def pagerank(graphinputfilename, alpha, G):
    # Pass alpha and G through to the job rather than hard-coding them.
    mr_job_pr1 = MRJobPageRank(args=[graphinputfilename,
                                     '--jobconf', 'alpha='+str(alpha),
                                     '--jobconf', 'G='+str(G)])
    with mr_job_pr1.make_runner() as runner:
        runner.run()
        with open(graphinputfilename, 'w+') as f:
            for line in runner.stream_output():
                node, value = mr_job_pr1.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
                print line
    return None

def hw9_1():
    graphfilename = "PageRank-test.txt"
    #numNodes(graphfilename)
    preprocessGraph(graphfilename)
    graphinputfilename = graphfilename.split('.')[0]+'_input.txt'
    alpha = 0.15   # teleportation factor (1 - damping)
    G = 11
    for i in range(20):
        print "Iteration", i
        pagerank(graphinputfilename, alpha, G)
    return None

hw9_1()
In [1]:
!python MRJob_NumberOfNodes.py all-pages-indexed-out.txt -r emr --num-ec2-instances 10 --ec2-task-instance-type m1.medium
In [ ]:
#%load_ext autoreload
%autoreload 2
from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_PreprocessGraph import MRJobPreprocessGraph
from MRJob_PageRank import MRJobPageRank

def numNodes(graphfilename):
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename])
    with mr_job_numnodes.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
    return None

def preprocessGraph(graphfilename):
    mr_job_preprocess = MRJobPreprocessGraph(args=[graphfilename])
    outputfilename = 'PageRank-wiki_input.txt'
    with mr_job_preprocess.make_runner() as runner:
        with open(outputfilename, 'w') as f:
            runner.run()
            for line in runner.stream_output():
                node, value = mr_job_preprocess.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
    return None

def pagerank(graphinputfilename, alpha, G):
    # Pass alpha and G through instead of hard-coding them;
    # mapred.reduce.tasks=1 keeps all keys (including '*m') in one reducer.
    mr_job_pr1 = MRJobPageRank(args=[graphinputfilename,
                                     '--jobconf', 'alpha='+str(alpha),
                                     '--jobconf', 'G='+str(G),
                                     '--jobconf', 'mapred.reduce.tasks=1',
                                     '-r', 'emr',
                                     '--num-ec2-instances', '10',
                                     '--ec2-task-instance-type', 'm1.medium',
                                     '--pool-emr-job-flows', '--max-hours-idle=1'])
    with mr_job_pr1.make_runner() as runner:
        runner.run()
        with open(graphinputfilename, 'w+') as f:
            for line in runner.stream_output():
                node, value = mr_job_pr1.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
                print line
    return None

def hw9_3():
    graphfilename = "all-pages-indexed-out.txt"
    #numNodes(graphfilename)
    preprocessGraph(graphfilename)
    graphinputfilename = "PageRank-wiki_input.txt"
    alpha = 0.15      # teleportation factor (1 - damping)
    G = 15192277      # node count of the Wikipedia graph
    for i in range(10):
        print "Iteration", i
        pagerank(graphinputfilename, alpha, G)
    return None

hw9_3()
HW 9.4: Topic-specific PageRank implementation using MRJob
Modify your PageRank implementation to produce a topic-specific PageRank implementation, as described in:
http://www-cs-students.stanford.edu/~taherh/papers/topic-sensitive-pagerank.pdf
Note in this article that there is a special caveat to ensure that the transition matrix is irreducible.
This caveat lies in footnote 3 on page 3:
A minor caveat: to ensure that M is irreducible when p
contains any 0 entries, nodes not reachable from nonzero
nodes in p should be removed. In practice this is not problematic.
and must be adhered to for convergence to be guaranteed.
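A minimal sketch of the pruning the footnote describes, assuming an in-memory adjacency dict and the set of nonzero-teleport (topic) nodes:
In [ ]:
from collections import deque

def prune_unreachable(graph, topic_nodes):
    """Keep only nodes reachable from the nonzero-teleport set.
    graph: {node: list of out-neighbors}; topic_nodes: iterable of nodes."""
    reachable = set(topic_nodes)
    frontier = deque(topic_nodes)
    while frontier:                         # BFS over outgoing links
        for dest in graph.get(frontier.popleft(), []):
            if dest not in reachable:
                reachable.add(dest)
                frontier.append(dest)
    return dict((n, graph.get(n, [])) for n in reachable)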
Run topic-specific PageRank on the following randomly generated network of 100 nodes:
s3://ucb-mids-mls-networks/randNet.txt
which are organized into ten topics, as described in the file:
s3://ucb-mids-mls-networks/randNet_topics.txt
Since there are 10 topics, your result should be 11 PageRank vectors (one for the vanilla PageRank implementation in 9.1, and one for each topic with the topic specific implementation). Print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.
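Relative to HW 9.1, the only change to the score update is the teleport term: the uniform alpha*(1.0/G) becomes alpha*v_j, where v_j = 1/|T| if node j belongs to the chosen topic T and 0 otherwise. A hedged sketch (topic_of and topic_size would be loaded from randNet_topics.txt in a reducer_init; the names are illustrative):
In [ ]:
def teleport_term(node, topic, topic_of, topic_size, alpha):
    # v_j = 1/|T| for nodes in the chosen topic T, 0 for all other nodes
    v_j = 1.0 / topic_size[topic] if topic_of.get(node) == topic else 0.0
    return alpha * v_j

# In the HW 9.1 reducer, the adjusted score would then read:
#   PR_adj = teleport_term(node, topic, topic_of, topic_size, alpha) + (1 - alpha)*PR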
HW 9.5: Applying topic-specific PageRank to Wikipedia
Here you will apply your topic-specific PageRank implementation to Wikipedia, defining topics (very arbitrarily) for each page by the length (number of characters) of the name of the article mod 10, so that there are 10 topics. Once again, print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.
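The arbitrary topic assignment for this part is just the article-name length mod 10; for example:
In [ ]:
def topic_for(article_name):
    # topic = (number of characters in the article name) mod 10
    return len(article_name) % 10

print topic_for('Berkeley')   # 8 characters -> topic 8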