DATASCI W261: Machine Learning at Scale

Week 10, Homework 9

Group C: Katrina Adams, Eric Freeman, Doug Kelley,,

3 November 2015

Some Imports

In [ ]:
#%matplotlib inline

#import matplotlib.pyplot as plt
#import numpy as np

%cd ~/Documents/W261/hw9/

What is PageRank and what is it used for in the context of web search?
What modifications have to be made to the webgraph in order to leverage the machinery of Markov Chains to compute the steady stade distibuton?
OPTIONAL: In topic-specific pagerank, how can we insure that the irreducible property is satified? (HINT: see HW9.4)

__HW 9.0 ANSWER__

HW 9.1: MRJob implementation of basic PageRank

Write a basic MRJob implementation of the iterative PageRank algorithm that takes sparse adjacency lists as input (as explored in HW 7).
Make sure that your implementation utilizes teleportation (1-damping/the number of nodes in the network), and further, distributes the mass of dangling nodes with each iteration so that the output of each iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page, chooses the next page to which it will move by clicking at random, with probability d, one of the hyperlinks in the current page. This probability is represented by a so-called ‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer jumps to any web page in the network. If a page is a dangling end, meaning it has no outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform distribution and “teleports” to that page]

As you build your code, use the test data


with teleportation parameter set to 0.15 (1-d, where d, the damping factor is set to 0.85), and crosscheck your work with the true result, displayed in the first image in the Wikipedia article:

and here for reference are the corresponding PageRank probabilities:


In [ ]:

Map: emit graph and PR/(num outgoing nodes)
Reduce: sum PR's and emit with graph


In [2]:
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from sys import maxint

class MRJobNumNodes(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper)]
        return [MRStep(
                mapper = self.mapper,
                reducer = self.reducer
                reducer = self.reducer_count
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        destnodes = value.keys()
        yield key, None
        for node in destnodes:
            yield node, None
    def reducer(self,node,_):
        yield None, node
    def reducer_count(self,_,nodes):
        count = 0
        for node in nodes:
        yield None, count

if __name__ == '__main__':


In [64]:
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from sys import maxint

class MRJobPreprocessGraph(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper_init = self.mapper_init,
                           mapper = self.mapper)]
        return [MRStep(
                mapper_init = self.mapper_init,
                mapper = self.mapper,
                reducer = self.reducer
    def mapper_init(self):
        N = 11
        self.PR = 1.0/N
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        value = [self.PR, edges]
        yield key, value
        for node in edges.keys():
            yield node, [self.PR]

    def reducer(self,key,value):
        PR = 0
        edges = {}
        for v in value:
            if len(v)==1:
                PR = v[0]
                PR = v[0]
                edges = v[1]
        yield key, [PR, edges]

    def mapper(self, _, line):
        N = 11
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        PR = 1.0/N
        value = [PR, edges]
        yield None,{key:value}

    def reducer(self,_,data):
        for line in data:
            key = line.keys()[0]
            value = line[key]
            yield key, value
if __name__ == '__main__':


In [15]:
!python PageRank-test.txt > PageRank-test_input.txt

using configs in /Users/davidadams/.mrjob.conf
creating tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151102.020723.925265

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at

writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151102.020723.925265/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151102.020723.925265/step-0-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151102.020723.925265/step-0-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151102.020723.925265/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
Moving /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151102.020723.925265/step-0-reducer_part-00000 -> /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151102.020723.925265/output/part-00000
Streaming final output from /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151102.020723.925265/output
removing tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151102.020723.925265

In [16]:
!cat PageRank-test_input.txt

"B"	[0.09090909090909091, {"C": 1}]
"C"	[0.09090909090909091, {"B": 1}]
"D"	[0.09090909090909091, {"A": 1, "B": 1}]
"E"	[0.09090909090909091, {"B": 1, "D": 1, "F": 1}]
"F"	[0.09090909090909091, {"B": 1, "E": 1}]
"G"	[0.09090909090909091, {"B": 1, "E": 1}]
"H"	[0.09090909090909091, {"B": 1, "E": 1}]
"I"	[0.09090909090909091, {"B": 1, "E": 1}]
"J"	[0.09090909090909091, {"E": 1}]
"K"	[0.09090909090909091, {"E": 1}]

In [103]:
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from mrjob.compat import jobconf_from_env

class MRJobPageRank_step1(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper)]
        #return [MRStep(
        #        mapper = self.mapper,
        #        reducer = self.reducer
        #        )]
        return [MRStep(
                mapper = self.mapper,
                reducer = self.reducer
                mapper = self.mapper_redistr)]
    dangling_mass = None
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])

        PR = value[0]
        edges = value[1]
        yield key, [None,edges]
        destnodes = edges.keys()
        outdeg = len(destnodes)
        if outdeg==0:
            yield '*m',[PR]
        for node in destnodes:
            yield node, [1.0*PR/outdeg]
    def reducer(self,node,data):
        global dangling_mass
        PR = 0
        edges = {}
        for value in data:
            yield node, data
            if len(value)==1:
                edges = value[1]
        if node=='*m':
            dangling_mass = PR
            yield node, [PR, edges]
    def mapper_redistr(self, node, data, parameter=dangling_mass):
        G = int(jobconf_from_env('G'))
        alpha = float(jobconf_from_env('alpha'))
        m = dangling_mass
        PR = data[0]
        edges = data[1]
        PR_adj = alpha*(1.0/G)+(1-alpha)*(m/G+PR)
        yield node, [PR_adj, edges]

if __name__ == '__main__':


In [114]:
#%load_ext autoreload
%autoreload 2

from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_PreprocessGraph import MRJobPreprocessGraph
from MRJob_PageRankStep1 import MRJobPageRank_step1

def numNodes(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename])

    with mr_job_numnodes.make_runner() as runner:
        for line in runner.stream_output():
            null,count =  mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
    return None

def preprocessGraph(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_preprocess = MRJobPreprocessGraph(args=[graphfilename])
    outputfilename = graphfilename.split('.')[0]+'_input.txt'
    with mr_job_preprocess.make_runner() as runner:
        with open(outputfilename, 'w') as f:
            for line in runner.stream_output():
                node,value =  mr_job_preprocess.parse_output_line(line)
    return None

def pagerank_step1(graphinputfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_pr1 = MRJobPageRank_step1(args=[graphinputfilename])
    danglingmass= 0
    with mr_job_pr1.make_runner() as runner:
        with open(graphinputfilename, 'w+') as f:
            for line in runner.stream_output():
                node,value =  mr_job_pr1.parse_output_line(line)
                if node=='*m':
                    danglingmass = value[0]
    with mr_job_pagerank.make_runner() as runner:
        for line in runner.stream_output():
            node,value =  mr_job_pagerank.parse_output_line(line)
            print node, value
    return danglingmass

#'--jobconf', undirected_parameter,

def pagerank_step1(graphinputfilename,alpha,G):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_pr1 = MRJobPageRank_step1(args=[graphinputfilename,'--jobconf','alpha=0.15','--jobconf','G=11'])
    danglingmass= 0
    with mr_job_pr1.make_runner() as runner:
        with open(graphinputfilename, 'w+') as f:
            for line in runner.stream_output():
                node,value =  mr_job_pr1.parse_output_line(line)
                print line

    return None

def hw9_1():

    #graphfilename = "graph_input_undirected_toy.txt"
    #graphfilename = "graph_input_directed_toy.txt"
    graphfilename = "PageRank-test.txt"

    graphinputfilename = graphfilename.split('.')[0]+'_input.txt'
    alpha = 0.85
    G = 11
    #pagerank_step1(graphinputfilename, alpha, G)
    #pagerank_step1(graphinputfilename, alpha, G)
    for i in range(10):
        print "Iteration",i
        pagerank_step1(graphinputfilename, alpha, G)
    return None


WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
Iteration 0
"A"	[0.059297520661157024, {}]
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
"B"	[0.3168732782369146, {"C": 1}]

"C"	[0.09793388429752066, {"B": 1}]

"D"	[0.046418732782369146, {"A": 1, "B": 1}]

"E"	[0.32975206611570246, {"B": 1, "D": 1, "F": 1}]

"F"	[0.046418732782369146, {"B": 1, "E": 1}]

"G"	[0.02066115702479339, {"B": 1, "E": 1}]

"H"	[0.02066115702479339, {"B": 1, "E": 1}]

"I"	[0.02066115702479339, {"B": 1, "E": 1}]

"J"	[0.02066115702479339, {"E": 1}]

"K"	[0.02066115702479339, {"E": 1}]

Iteration 1
"A"	[0.03794640621086902, {}]
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
"B"	[0.26069089656899574, {"C": 1}]

"C"	[0.2875607312797395, {"B": 1}]

"D"	[0.11164819684447784, {"A": 1, "B": 1}]

"E"	[0.09941334835962935, {"B": 1, "D": 1, "F": 1}]

"F"	[0.11164819684447784, {"B": 1, "E": 1}]

"G"	[0.018218444778362132, {"B": 1, "E": 1}]

"H"	[0.018218444778362132, {"B": 1, "E": 1}]

"I"	[0.018218444778362132, {"B": 1, "E": 1}]

"J"	[0.018218444778362132, {"E": 1}]

"K"	[0.018218444778362132, {"E": 1}]

Iteration 2
"A"	[0.06401906959337932, {}]
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
"B"	[0.4072918073010343, {"C": 1}]

"C"	[0.2381558480181226, {"B": 1}]

"D"	[0.04473570130303789, {"A": 1, "B": 1}]

"E"	[0.11821894280900667, {"B": 1, "D": 1, "F": 1}]

"F"	[0.04473570130303789, {"B": 1, "E": 1}]

"G"	[0.016568585934476243, {"B": 1, "E": 1}]

"H"	[0.016568585934476243, {"B": 1, "E": 1}]

"I"	[0.016568585934476243, {"B": 1, "E": 1}]

"J"	[0.016568585934476243, {"E": 1}]

"K"	[0.016568585934476243, {"E": 1}]

Iteration 3
"A"	[0.037595964795097685, {}]
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
"B"	[0.31366142285996873, {"C": 1}]

"C"	[0.3647813279471857, {"B": 1}]

"D"	[0.05207865887052514, {"A": 1, "B": 1}]

"E"	[0.08688750795016452, {"B": 1, "D": 1, "F": 1}]

"F"	[0.05207865887052514, {"B": 1, "E": 1}]

"G"	[0.018583291741306584, {"B": 1, "E": 1}]

"H"	[0.018583291741306584, {"B": 1, "E": 1}]

"I"	[0.018583291741306584, {"B": 1, "E": 1}]

"J"	[0.018583291741306584, {"E": 1}]

"K"	[0.018583291741306584, {"E": 1}]

Iteration 4
"A"	[0.038674936390503456, {}]

"B"	[0.419184319388297, {"C": 1}]

"C"	[0.28315371580150367, {"B": 1}]

"D"	[0.04115963362307689, {"A": 1, "B": 1}]

"E"	[0.09396022932089054, {"B": 1, "D": 1, "F": 1}]

"F"	[0.04115963362307689, {"B": 1, "E": 1}]

"G"	[0.016541506370530274, {"B": 1, "E": 1}]

"H"	[0.016541506370530274, {"B": 1, "E": 1}]

"I"	[0.016541506370530274, {"B": 1, "E": 1}]

"J"	[0.016541506370530274, {"E": 1}]

"K"	[0.016541506370530274, {"E": 1}]

Iteration 5
"A"	[0.03411772573816476, {}]
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
"B"	[0.34000371405592894, {"C": 1}]

"C"	[0.3729315529284095, {"B": 1}]

"D"	[0.04324694642260941, {"A": 1, "B": 1}]

"E"	[0.08332870719049233, {"B": 1, "D": 1, "F": 1}]

"F"	[0.04324694642260941, {"B": 1, "E": 1}]

"G"	[0.016624881448357085, {"B": 1, "E": 1}]

"H"	[0.016624881448357085, {"B": 1, "E": 1}]

"I"	[0.016624881448357085, {"B": 1, "E": 1}]

"J"	[0.016624881448357085, {"E": 1}]

"K"	[0.016624881448357085, {"E": 1}]

Iteration 6
"A"	[0.03465268558210355, {}]

"B"	[0.4148309820181553, {"C": 1}]

"C"	[0.3052758903000341, {"B": 1}]

"D"	[0.039882533723134043, {"A": 1, "B": 1}]

"E"	[0.08411170789096588, {"B": 1, "D": 1, "F": 1}]

"F"	[0.039882533723134043, {"B": 1, "E": 1}]

"G"	[0.01627273335249455, {"B": 1, "E": 1}]

"H"	[0.01627273335249455, {"B": 1, "E": 1}]

"I"	[0.01627273335249455, {"B": 1, "E": 1}]

"J"	[0.01627273335249455, {"E": 1}]

"K"	[0.01627273335249455, {"E": 1}]

Iteration 7
"A"	[0.03326414799094906, {}]
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
"B"	[0.35427811717184754, {"C": 1}]

"C"	[0.36892040587404906, {"B": 1}]

"D"	[0.04014572172772409, {"A": 1, "B": 1}]

"E"	[0.08167552971462036, {"B": 1, "D": 1, "F": 1}]

"F"	[0.04014572172772409, {"B": 1, "E": 1}]

"G"	[0.01631407115861709, {"B": 1, "E": 1}]

"H"	[0.01631407115861709, {"B": 1, "E": 1}]

"I"	[0.01631407115861709, {"B": 1, "E": 1}]

"J"	[0.01631407115861709, {"E": 1}]

"K"	[0.01631407115861709, {"E": 1}]

Iteration 8
"A"	[0.03326870680631062, {}]
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at
"B"	[0.40785482434658094, {"C": 1}]

"C"	[0.3173431746680983, {"B": 1}]

"D"	[0.039348175157836986, {"A": 1, "B": 1}]

"E"	[0.08180306850319648, {"B": 1, "D": 1, "F": 1}]

"F"	[0.039348175157836986, {"B": 1, "E": 1}]

"G"	[0.01620677507202788, {"B": 1, "E": 1}]

"H"	[0.01620677507202788, {"B": 1, "E": 1}]

"I"	[0.01620677507202788, {"B": 1, "E": 1}]

"J"	[0.01620677507202788, {"E": 1}]

"K"	[0.01620677507202788, {"E": 1}]

Iteration 9
"A"	[0.03293010178620472, {}]

"B"	[0.36323594898891015, {"C": 1}]

"C"	[0.36288372803871777, {"B": 1}]

"D"	[0.03938466342002967, {"A": 1, "B": 1}]

"E"	[0.08114525762548766, {"B": 1, "D": 1, "F": 1}]

"F"	[0.03938466342002967, {"B": 1, "E": 1}]

"G"	[0.016207127344124002, {"B": 1, "E": 1}]

"H"	[0.016207127344124002, {"B": 1, "E": 1}]

"I"	[0.016207127344124002, {"B": 1, "E": 1}]

"J"	[0.016207127344124002, {"E": 1}]

"K"	[0.016207127344124002, {"E": 1}]

HW 9.2: Exploring PageRank teleportation and network plots

In order to overcome problems such as disconnected components, the damping factor (a typical value for d is 0.85) can be varied.
Using the graph in HW1, plot the test graph (using networkx, for several values of the damping parameter alpha, so that each nodes radius is proportional to its PageRank score. In particular you should do this for the following damping factors: [0,0.25,0.5,0.75, 0.85, 1].
Note your plots should look like the following:

HW 9.3: Applying PageRank to the Wikipedia hyperlinks network

Run your PageRank implementation on the Wikipedia dataset for 10 iterations, and display the top 100 ranked nodes (with alpha = 0.85).

Run your PageRank implementation on the Wikipedia dataset for 50 iterations, and display the top 100 ranked nodes (with teleportation factor of 0.15).
Have the top 100 ranked pages changed? Comment on your findings. Plot both 100 curves.

HW 9.4: Topic-specific PageRank implementation using MRJob

Modify your PageRank implementation to produce a topic specific PageRank implementation, as described in:

Note in this article that there is a special caveat to ensure that the transition matrix is irreducible.
This caveat lies in footnote 3 on page 3:

A minor caveat: to ensure that M is irreducible when p
contains any 0 entries, nodes not reachable from nonzero
nodes in p should be removed. In practice this is not problematic.

and must be adhered to for convergence to be guaranteed.

Run topic specific PageRank on the following randomly generated network of 100 nodes:

which are organized into ten topics, as described in the file:

Since there are 10 topics, your result should be 11 PageRank vectors (one for the vanilla PageRank implementation in 9.1, and one for each topic with the topic specific implementation). Print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.

HW 9.5: Applying topic-specific PageRank to Wikipedia

Here you will apply your topic-specific PageRank implementation to Wikipedia, defining topics (very arbitrarily) for each page by the length (number of characters) of the name of the article mod 10, so that there are 10 topics. Once again, print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.

In [ ]:

In [ ]: