DATASCI W261: Machine Learning at Scale

Week 10, Homework 9

Group C: Katrina Adams, Eric Freeman, Doug Kelley

kradams@ischool.berkeley.edu, ericfreeman@ischool.berkeley.edu, kelleydac@ischool.berkeley.edu

3 November 2015



In [15]:
%cd ~/Documents/W261/hw9/


/Users/davidadams/Documents/W261/hw9

HW9.0:
What is PageRank and what is it used for in the context of web search?
What modifications have to be made to the webgraph in order to leverage the machinery of Markov Chains to compute the steady stade distibuton?
OPTIONAL: In topic-specific pagerank, how can we insure that the irreducible property is satified? (HINT: see HW9.4)

PageRank is an algorithm for scoring web pages based primarily on the number of links to each page. The algorithm is used by Google and others to deliver search results in response to a user query, attempting to ensure that the most relevant results are provided first. To represent the transition matrix of the graph as a Markov chain, the graph must be irreducible, meaning that here are no dangling nodes with no outlinks, and aperiodic, meaning that there are no closed cycles in the graph. Irreducibility is ensured by adding a random transition probability to each node, and aperiodicity is ensured by introducing a random transition probability of jumping to the same node. This probability $\alpha$ is controlled by the damping factor $$\alpha = 1 - d $$.

HW7.5.1:
Can we utilize combiners in the HW 7 to perform the shortest path implementation?

Does order inversion help with the HW 7 shortest path implementation?

Combiners can help by preselecting shortest paths within each mapper's output, reducing the network traffic and reducing the number of path comparisons each reducer must perform. The mappers provide two types of output -- new paths with bogus link lists for frontier nodes, and the graph stripes themselves. Order inversion could ensure that the new paths are received first; since the new paths are generally smaller, less data would need to be cached to update the graph stripes properly.


HW 9.1: MRJob implementation of basic PageRank

Write a basic MRJob implementation of the iterative PageRank algorithm that takes sparse adjacency lists as input (as explored in HW 7).
Make sure that your implementation utilizes teleportation (1-damping/the number of nodes in the network), and further, distributes the mass of dangling nodes with each iteration so that the output of each iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page, chooses the next page to which it will move by clicking at random, with probability d, one of the hyperlinks in the current page. This probability is represented by a so-called ‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer jumps to any web page in the network. If a page is a dangling end, meaning it has no outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform distribution and “teleports” to that page]

As you build your code, use the test data

s3://ucb-mids-mls-networks/PageRank-test.txt

with teleportation parameter set to 0.15 (1-d, where d, the damping factor is set to 0.85), and crosscheck your work with the true result, displayed in the first image in the Wikipedia article:

https://en.wikipedia.org/wiki/PageRank

and here for reference are the corresponding PageRank probabilities:

A,0.033
B,0.384
C,0.343
D,0.039
E,0.081
F,0.039
G,0.016
H,0.016
I,0.016
J,0.016
K,0.016


In [ ]:
%%writefile MRJob_NumberOfNodes.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from sys import maxint

class MRJobNumNodes(MRJob):
    

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper)]
        return [MRStep(
                mapper = self.mapper,
                reducer = self.reducer
                ),
               MRStep(
                reducer = self.reducer_count
                )]
    
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        destnodes = value.keys()
        
        yield key, None
        
        for node in destnodes:
            yield node, None
           
    def reducer(self,node,_):
        yield None, node
        
    def reducer_count(self,_,nodes):
        count = 0
        for node in nodes:
            count+=1
        yield None, count

        
if __name__ == '__main__':
    MRJobNumNodes.run()

In [ ]:
%%writefile MRJob_PreprocessGraph.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from sys import maxint


class MRJobPreprocessGraph(MRJob):
    

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper_init = self.mapper_init,
                           mapper = self.mapper)]
        return [MRStep(
                mapper_init = self.mapper_init,
                mapper = self.mapper,
                reducer = self.reducer
                )]
    
    def mapper_init(self):
        N = 11
        self.PR = 1.0/N
        
    
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        value = [self.PR, edges]
        yield key, value
        
        for node in edges.keys():
            yield node, [self.PR]

           
    def reducer(self,key,value):
        PR = 0
        edges = {}
        for v in value:
            if len(v)==1:
                PR = v[0]
            else:
                PR = v[0]
                edges = v[1]
        yield key, [PR, edges]

    
    '''
    def mapper(self, _, line):
        N = 11
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        PR = 1.0/N
        value = [PR, edges]
        yield None,{key:value}

           
    def reducer(self,_,data):
        for line in data:
            key = line.keys()[0]
            value = line[key]
            yield key, value
    '''
        
if __name__ == '__main__':
    MRJobPreprocessGraph.run()

In [101]:
!python MRJob_PreprocessGraph.py PageRank-test.txt > PageRank-test_input2.txt


using configs in /Users/davidadams/.mrjob.conf
creating tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151105.171519.674233

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151105.171519.674233/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151105.171519.674233/step-0-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151105.171519.674233/step-0-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151105.171519.674233/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
Moving /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151105.171519.674233/step-0-reducer_part-00000 -> /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151105.171519.674233/output/part-00000
Streaming final output from /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151105.171519.674233/output
removing tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151105.171519.674233

In [102]:
!cat PageRank-test_input2.txt


"A"	[0.09090909090909091, {}]
"B"	[0.09090909090909091, {"C": 1}]
"C"	[0.09090909090909091, {"B": 1}]
"D"	[0.09090909090909091, {"A": 1, "B": 1}]
"E"	[0.09090909090909091, {"B": 1, "D": 1, "F": 1}]
"F"	[0.09090909090909091, {"B": 1, "E": 1}]
"G"	[0.09090909090909091, {"B": 1, "E": 1}]
"H"	[0.09090909090909091, {"B": 1, "E": 1}]
"I"	[0.09090909090909091, {"B": 1, "E": 1}]
"J"	[0.09090909090909091, {"E": 1}]
"K"	[0.09090909090909091, {"E": 1}]

In [43]:
%%writefile MRJob_PageRank.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from mrjob.compat import jobconf_from_env
import sys

class MRJobPageRank(MRJob):
    

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper)]
        
        
        return [MRStep(
                mapper = self.mapper_firstiter)]+\
                [MRStep(
                mapper = self.mapper,
                combiner = self.combiner,
                reducer = self.reducer)]*10
    
    def mapper_firstiter(self, _, line):
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])

        PR = value[0]
        edges = value[1]
        
        yield key, [None,edges]
        
        destnodes = edges.keys()
        outdeg = len(destnodes)
        
        if outdeg==0:
            yield '*m',[PR]
        
        for node in destnodes:
            yield node, [1.0*PR/outdeg]
            

    def mapper(self, key, value):

        sys.stderr.write("MAPPER INPUT: ({0}, {1})\n".format(key,value))
        #sys.stderr.write("MAPPER")
        
        PR = value[0]
        edges = value[1]
        
        yield key, [None,edges]
        
        destnodes = edges.keys()
        outdeg = len(destnodes)
        
        if outdeg==0:
            yield '*m',[PR]
        
        for node in destnodes:
            yield node, [1.0*PR/outdeg]
            
            
    def combiner(self,node,data):
        PR = 0
        edges = {}
        for value in data:
            if value[0]==None:
                yield node, value
            else:
                PR+=value[0]
        yield node, [PR]
    
    
    def reducer(self,node,data):
        
        #sys.stderr.write("REDUCER NODE: {0}\n".format(node))
        
        G = int(jobconf_from_env('G'))
        alpha = float(jobconf_from_env('alpha'))
        
        PR = 0
        edges = {}
        
        for value in data:
            #yield node, value
            if value[0]==None:
                edges = value[1]
                
            else:
                PR+=value[0]
        
        if node=='*m':
            self.m = PR
        
        #sys.stderr.write("REDUCER PR: {0}\n".format(PR))
        
        else:
            PR_adj = alpha*(1.0/G)+(1-alpha)*(1.0*self.m/G+PR) 
            #yield node, PR
            #yield node, PR_adj
            
            #line = node+'\t'+str([PR_adj, edges])
            #yield line, ''
            
            yield node, [PR_adj, edges]

   
if __name__ == '__main__':
    MRJobPageRank.run()


Overwriting MRJob_PageRank.py

In [19]:
%load_ext autoreload
%autoreload 2

from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_PreprocessGraph import MRJobPreprocessGraph
from MRJob_PageRank import MRJobPageRank

def numNodes(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename])

    with mr_job_numnodes.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null,count =  mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
                    
    return None

def preprocessGraph(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_preprocess = MRJobPreprocessGraph(args=[graphfilename])
    outputfilename = graphfilename.split('.')[0]+'_input.txt'
    with mr_job_preprocess.make_runner() as runner:
        with open(outputfilename, 'w') as f:
            runner.run()
            for line in runner.stream_output():
                node,value =  mr_job_preprocess.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
                    
    return None


def pagerank(graphinputfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_pr1 = MRJobPageRank(args=[graphinputfilename,'--jobconf','alpha=0.15','--jobconf','G=11'])
    danglingmass= 0
    with mr_job_pr1.make_runner() as runner:
        runner.run()
        with open(graphinputfilename, 'w+') as f:
            for line in runner.stream_output():
                node,value =  mr_job_pr1.parse_output_line(line)
                f.write(node+'\t'+str(value)+'\n')
                print line

    return None




def hw9_1():

    graphfilename = "PageRank-test.txt"
    #numNodes(graphfilename)
    
    preprocessGraph(graphfilename)

    graphinputfilename = graphfilename.split('.')[0]+'_input.txt'
    
    #pagerank_step1(graphinputfilename, alpha, G)
    
    #pagerank_step1(graphinputfilename, alpha, G)
    
    for i in range(1):
        print "Iteration",i
        pagerank(graphinputfilename)
    
    
    
    
    return None
        
  

hw9_1()


WARNING:mrjob.runner:
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
WARNING:mrjob.runner:
WARNING:mrjob.runner:
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
WARNING:mrjob.runner:
Iteration 0
"A"	[0.059297520661157024, {}]

"B"	[0.3168732782369146, {"C": 1}]

"C"	[0.09793388429752066, {"B": 1}]

"D"	[0.046418732782369146, {"A": 1, "B": 1}]

"E"	[0.32975206611570246, {"B": 1, "D": 1, "F": 1}]

"F"	[0.046418732782369146, {"B": 1, "E": 1}]

"G"	[0.02066115702479339, {"B": 1, "E": 1}]

"H"	[0.02066115702479339, {"B": 1, "E": 1}]

"I"	[0.02066115702479339, {"B": 1, "E": 1}]

"J"	[0.02066115702479339, {"E": 1}]

"K"	[0.02066115702479339, {"E": 1}]

HW 9.2: Exploring PageRank teleportation and network plots

In order to overcome problems such as disconnected components, the damping factor (a typical value for d is 0.85) can be varied.
Using the graph in HW1, plot the test graph (using networkx, https://networkx.github.io/) for several values of the damping parameter alpha, so that each nodes radius is proportional to its PageRank score. In particular you should do this for the following damping factors: [0,0.25,0.5,0.75, 0.85, 1].
Note your plots should look like the following:

https://en.wikipedia.org/wiki/PageRank#/media/File:PageRanks-Example.svg

PageRank is an algorithm for scoring web pages based primarily on the number of links to each page. The algorithm is used by Google and others to deliver search results in response to a user query, attempting to ensure that the most relevant results are provided first. To represent the transition matrix of the graph as a Markov chain, the graph must be irreducible, meaning that here are no dangling nodes with no outlinks, and aperiodic, meaning that there are no closed cycles in the graph. Irreducibility is ensured by adding a random transition probability to each node, and aperiodicity is ensured by introducing a random transition probability of jumping to the same node. This probability $\alpha$ is controlled by the damping factor $$\alpha = 1 - d $$.

Draw the graph using networkx with the node size proportional to the rank. Use a circular layout so no nodes are blocked.


In [97]:
import networkx as nx
from ast import literal_eval
G = nx.DiGraph()
with open('PageRank-test.txt', 'r') as f:
    for line in f:
        node, edgestr = line.strip().split('\t')
        G.add_node(node)
        edges = literal_eval(edgestr)
        for key in edges.keys():
            G.add_edge(node, key)
%matplotlib inline
pr = nx.pagerank(G, alpha=0.0)
radii = list()
for key in pr.keys():
    radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, withArrows=True, node_size=radii)


{'A': 0.09090909090909091, 'C': 0.09090909090909091, 'B': 0.09090909090909091, 'E': 0.09090909090909091, 'D': 0.09090909090909091, 'G': 0.09090909090909091, 'F': 0.09090909090909091, 'I': 0.09090909090909091, 'H': 0.09090909090909091, 'K': 0.09090909090909091, 'J': 0.09090909090909091}

With no damping, all pages are equally likely


In [98]:
pr = nx.pagerank(G, alpha=0.25)
radii = list()
for key in pr.keys():
    radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, withArrows=True, node_size=radii)



In [99]:
pr = nx.pagerank(G, alpha=0.5)
radii = list()
for key in pr.keys():
    radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, withArrows=True, node_size=radii)


With more damping, B and C become more important


In [100]:
pr = nx.pagerank(G, alpha=0.75)
radii = list()
for key in pr.keys():
    radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, withArrows=True, node_size=radii)



In [120]:
pr = nx.pagerank(G, alpha=0.99, max_iter=5000)
radii = list()
for key in pr.keys():
    radii.append(pr[key]*10000)
nx.draw_circular(G, with_labels=True, withArrows=True, node_size=radii)


For alpha = 1.0, pagerank fails to converge in 1000000 iterations

HW 9.3: Applying PageRank to the Wikipedia hyperlinks network

Run your PageRank implementation on the Wikipedia dataset for 10 iterations, and display the top 100 ranked nodes (with alpha = 0.85).

Run your PageRank implementation on the Wikipedia dataset for 50 iterations, and display the top 100 ranked nodes (with teleportation factor of 0.15). Have the top 100 ranked pages changed? Comment on your findings. Plot both 100 curves.


In [ ]:
%%writefile mrPageRank2i.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import RawProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys

class mrPageRank2i(MRJob):
    #INPUT_PROTOCOL = JSONProtocol
    # INTERNAL_PROTOCOL and OUTPUT_PROTOCOL default to JSONProtocol
    #OUTPUT_PROTOCOL = RawProtocol
    def configure_options(self):
        super(mrPageRank2i, self).configure_options()
        
        self.add_passthrough_option(
                '--iterations',
                dest = 'iterations',
                default = 10,
                type='int',
                help='number of iterations to run')
         
        self.add_passthrough_option(
                '--damping-factor',
                dest = 'damping_factor',
                default = 0.85,
                type='float',
                help='probability that user will continue clicking links')
        self.add_passthrough_option(
                '--nodecount',
                dest = 'nodecount',
                default = 11.0,
                type='float',
                help='total number of nodes in graph')


       
    def steps(self):
        return ([MRStep(
                mapper = self.send_score, 
                reducer = self.receive_score
            )] * self.options.iterations)

    # Raw input protocol so key is not provided
    def send_score(self, _, line):
        #sys.stderr.write("MAPPER INPUT: ({0})\n".format(line))
        N = self.options.nodecount
        if type(line) != dict:
            fields = line.strip().split('\t')
            nodeID = fields[0]
            nodeDict = literal_eval(fields[1])
            if 'score' not in nodeDict.keys():
                #first time through
                node = {}
                node['score'] = 1.0/N
                node['links'] = nodeDict
                node['id'] = nodeID
            else:
                node = nodeDict
        else:
            node = line
            nodeID = node['id']
        yield nodeID, node
        if len(node['links']) > 0:
            rank = node['score'] / float(len(node['links']))
            for link in node['links'].keys():
                yield link, rank
        else:
            # dangling mass
            for link in range(1, int(N)):
                yield str(link), node['score']/N
        #assert rank <= 1.0
            
    def receive_score(self, node_id, values):
        myNodeID = node_id
        myValues = list(values)
        #sys.stderr.write("REDUCER INPUT: ({0},{1})\n".format(myNodeID, myValues))
        node = {}
        node['score'] = 0.0
        node['links'] = {}
        node['id'] = myNodeID
        total_score = 0.0
        count = 0
        for p in myValues:
            #sys.stderr.write("REDUCER VALUE: {0}\n".format(p))
            try:
                total_score += float(p)
            except TypeError:
                node = p

        node['prev_score'] = node['score']
        d = self.options.damping_factor
        N = self.options.nodecount

        node['score'] = (1.0 - d)/N + d*total_score
        #sys.stderr.write("REDUCER OUTPUT: {0} -> {1}\n".format(node['id'], node['score']))
        #assert node['score'] <= 1.0
        yield node['id'], node
        
if __name__ == '__main__':
    mrPageRank2i.run()

In [ ]:
!chmod a+x mrPageRank2i.py

In [ ]:
!./mrPageRank2i.py s3://ucb-mids-mls-networks/PageRank-test_indexed.txt --iterations=10 -r emr \
    --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputTest --no-output

In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputTest HW9_3Test/ --recursive

In [ ]:
!cat HW9_3Test/part* > test-graph.txt

In [ ]:
prdict={}
with open('test-graph.txt', 'r') as f:
    for line in f:
        fields = line.strip().split('\t')
        prdict[fields[0]] = literal_eval(fields[1])['score']
for key in sorted(prdict, key=prdict.get, reverse=True):
    print "{0}\t{1}".format(key, prdict[key])

In [ ]:
!./mrPageRank2i.py s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt --iterations=10 --nodecount=15192277 -r emr \
    --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW10i --no-output

In [ ]:
!./mrPageRank2i.py s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt --iterations=50 --nodecount=15192277 -r emr \
    --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW50i --no-output

In [ ]:
%%writefile MRtop100.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys
import string

class MRtop100(MRJob):
    OUTPUT_PROTOCOL = JSONProtocol
    outputList = list()

    def jobconf(self):
        orig_jobconf = super(MRtop100, self).jobconf()        
        custom_jobconf = {
            'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
            'mapred.text.key.comparator.options': '-k1rn',
            'mapred.reduce.tasks': '1',
        }
        combined_jobconf = orig_jobconf
        combined_jobconf.update(custom_jobconf)
        self.jobconf = combined_jobconf
        return combined_jobconf

    def steps(self):
        return [MRStep(
                mapper = self.mapper, 
                mapper_final = self.mapper_final,
                reducer = self.reducer
            )]

    def mapper(self, key, line):
        # get the line, which consists of an id and a dict separated by a tab
        #sys.stderr.write("MAPPER INPUT: ({0},{1})\n".format(key,line))
        fields = line.strip().split('\t')
        nodeid = fields[0]
        nodeDict = literal_eval(fields[1])
        self.outputList.append((nodeid, nodeDict['score']))
        self.outputList.sort(key=lambda tup: tup[1], reverse=True)
        self.outputList = self.outputList[:100]
    def mapper_final(self):
        for k in range(100):
            yield self.outputList[k][1], self.outputList[k][0]
            
    def reducer(self, score, nodeids):
        nodeList = list(nodeids)
        myScore = score
        #sys.stderr.write("REDUCER INPUT: ({0},{1})\n".format(myScore, nodeList))
        count = 0
        myOutputList = list()
        for node in nodeList:
            myOutputList.append((node, myScore))
            count += 1
        
        myOutputList.sort(key=lambda tup: tup[1], reverse=True)
        myOutputList = myOutputList[:100]
        for tup in myOutputList:
            nodeid = tup[0].translate(string.maketrans("",""), string.punctuation)
            yield nodeid, tup[1]
if __name__ == '__main__':
    MRtop100.run()

In [ ]:
!./MRtop100.py s3://ucb-mids-mls-dougkelley/HW9_3/outputW50i/ -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW50i_top100 --no-output

In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputW50i_top100 W50i_100/ --recursive

Using earlier implementation without correct treatment of dangling mass, the scores for the 10 iteration and 50 iteration runs are nearly the same.


In [ ]:
%matplotlib inline
from matplotlib import pyplot as plt
scores50=list()
with open('W50_100/part-00000', 'r') as f:
    for line in f:
        score=float(line.strip().split('\t')[1])
        scores50.append(score)
scores10=list()
with open('W10_100/part-00000', 'r') as f:
    for line in f:
        score=float(line.strip().split('\t')[1])
        scores10.append(score)
plt.semilogy(range(100),scores50,range(100),scores10)
plt.grid()

In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputW10i_top100 W10_100/ --recursive

In [ ]:
hitcount=list()
with open('W10_100/part-00000','r') as f10, open('W50_100/part-00000','r') as f50:
    for line10 in f10:
        nid10 = line10.strip().split('\t')[0]
        nid50 = f50.readline().strip().split('\t')[0]
        hitcount.append(nid10 == nid50)
print sum(hitcount)
plt.plot(hitcount)

In [ ]:
from sets import Set
set10=Set()
set50=Set()
with open('W10_100/part-00000','r') as f10:
    for line in f10:
        set10.add(line.strip().split('\t')[0])
with open('W50_100/part-00000','r') as f50:
    for line in f50:
        set50.add(line.strip().split('\t')[0])
print len(set10.intersection(set50))

for item in set10:
    if item not in set50:
        print "Unique item: {0}\n".format(item)

In [ ]:
list10=list()
list50=list()
updates=list()
with open('W10_100/part-00000','r') as f10:
    for line in f10:
        list10.append(line.strip().split('\t')[0])
with open('W50_100/part-00000','r') as f50:
    for line in f50:
        list50.append(line.strip().split('\t')[0])
for item in list10:
    updates.append(list50.index(item))

In [ ]:
print updates

In [ ]:
fig,ax = plt.subplots()
fig.set_size_inches(3.0, 10.0)
plt.xticks([10.0, 50.0])
plt.xlim([5, 55])
plt.ylabel('PageRank')
plt.xlabel('Iterations')
for k in range(100):
    ax.plot([10,50],[100-k,100-updates[k]], '-o', markersize=5)

In [ ]:
!./MRtop100.py s3://ucb-mids-mls-katieadams/output-wiki-10x/ -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW10d_top100 --no-output

In [ ]:
%%writefile MRtop100k.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys
import string
from numpy import log

class MRtop100k(MRJob):
    OUTPUT_PROTOCOL = JSONProtocol
    outputList = list()

    def jobconf(self):
        orig_jobconf = super(MRtop100k, self).jobconf()        
        custom_jobconf = {
            'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
            'mapred.text.key.comparator.options': '-k1rn',
            'mapred.reduce.tasks': '1',
        }
        combined_jobconf = orig_jobconf
        combined_jobconf.update(custom_jobconf)
        self.jobconf = combined_jobconf
        return combined_jobconf

    def steps(self):
        return [MRStep(
                mapper = self.mapper, 
                mapper_final = self.mapper_final,
                reducer = self.reducer
            )]

    def mapper(self, key, line):
        # get the line, which consists of an id and a dict separated by a tab
        #sys.stderr.write("MAPPER INPUT: ({0},{1})\n".format(key,line))
        fields = line.strip().split('\t')
        nodeid = fields[0]
        nodeList = literal_eval(fields[1])
        self.outputList.append((nodeid, float(nodeList[0])))
        self.outputList.sort(key=lambda tup: tup[1], reverse=True)
        self.outputList = self.outputList[:100]
    def mapper_final(self):
        for k in range(100):
            yield log(self.outputList[k][1]),(self.outputList[k][0], self.outputList[k][1])
            
    def reducer(self, score, nodeTuples):
        nodeList = list(nodeTuples)
        myScore = score
        #sys.stderr.write("REDUCER INPUT: ({0},{1})\n".format(myScore, nodeList))
        myOutputList = list()
        for nodeTuple in nodeList:
            node = nodeTuple[0]
            myScore = float(nodeTuple[1])
            myOutputList.append((node, myScore))
        
        myOutputList.sort(key=lambda tup: tup[1], reverse=True)
        for tup in myOutputList[:100]:
            nodeid = tup[0].translate(string.maketrans("",""), string.punctuation)
            yield nodeid, tup[1]
if __name__ == '__main__':
    MRtop100k.run()

In [ ]:
!chmod a+x ./MRtop100k.py

In [ ]:
!./MRtop100k.py part0-1000.txt

In [ ]:
!./MRtop100k.py s3://ucb-mids-mls-katieadams/output-wiki-10x/ -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW10d_top100 --no-output

In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputW10d_top100 W10k_100/ --recursive

In [ ]:
!wc -l W10k_100/part-00000

In [ ]:
!./MRtop100k.py s3://ucb-mids-mls-katieadams/output-wiki-50x/ -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/outputW50d_top100 --no-output

In [ ]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/outputW50d_top100 W50k_100/ --recursive

In [ ]:
cat W50k_100/part-00000

In [ ]:
from sets import Set
set10=Set()
set50=Set()
with open('top100_10x.txt','r') as f10:
    for line in f10:
        set10.add(line.strip().split('\t')[0])
with open('top100_50x.txt','r') as f50:
    for line in f50:
        set50.add(line.strip().split('\t')[0])
print len(set10.intersection(set50))

for item in set10:
    if item not in set50:
        print "Unique item: {0}\n".format(item)

In [1]:
%matplotlib inline
from matplotlib import pyplot as plt
scores10x=list()
scores50x=list()
from numpy import arange
with open('top100_10x.txt', 'r') as f:
    for line in f:
        score=float(line.strip().split('\t')[1])
        scores10x.append(score)
with open('top100_50x.txt', 'r') as f:
    for line in f:
        score=float(line.strip().split('\t')[1])
        scores50x.append(score)

plt.semilogy(arange(100),scores10x,arange(100),scores50x)
plt.grid()



In [17]:
%%writefile matchIndex.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, JSONValueProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys

class matchIndex(MRJob):
    OUTPUT_PROTOCOL = RawValueProtocol
    nodes = []
    def steps(self):
        return [MRStep(
                mapper_init = self.mapper_init,
                mapper = self.mapper, 
                reducer = self.reducer
            )]
    
    def mapper_init(self):
        for line in open('top100.txt'):
            self.nodes.append(line.strip().split('\t')[0]) 
            
    #mapper emits node, dictionary for stripe
    def mapper(self, key, line):
        #sys.stderr.write("MAPPER INPUT: ({0},{1})\n".format(key,line))
        fields = line.strip().split('\t')
        node=str(literal_eval(fields[1]))
        if node in self.nodes:
            yield fields[1], (fields[0], fields[2], fields[3])
    #reducer emits key and the rest of the data         
    def reducer(self, key, stripes):
        stripeList = list(stripes)
        myKey = key
        #sys.stderr.write("REDUCER INPUT: ({0},{1})\n".format(myKey, stripeList))
        for stripe in stripeList:
            yield myKey, (myKey, stripe)
if __name__ == '__main__':
    matchIndex.run()


Overwriting matchIndex.py

In [3]:
!chmod a+x ./matchIndex.py

In [14]:
!./matchIndex.py ~/Downloads/indices.txt --file=top100.txt


using configs in /Users/doug/.mrjob.conf
creating tmp directory /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.031446.462766
writing to /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.031446.462766/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.031446.462766/step-0-mapper-sorted
> sort /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.031446.462766/step-0-mapper_part-00000
writing to /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.031446.462766/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
Moving /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.031446.462766/step-0-reducer_part-00000 -> /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.031446.462766/output/part-00000
Streaming final output from /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.031446.462766/output
removing tmp directory /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.031446.462766

In [22]:
!./matchIndex.py s3://ucb-mids-mls-networks/wikipedia/indices.txt -r emr --output-dir=s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/ --no-output --file=top100.txt


using configs in /Users/doug/.mrjob.conf
inferring aws_region from scratch bucket's region (us-west-1)
creating tmp directory /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.032720.181007
writing master bootstrap script to /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.032720.181007/b.py
Copying non-input files into s3://ucb-mids-mls-dougkelley/tmp/matchIndex.doug.20151108.032720.181007/files/
Waiting 5.0s for S3 eventual consistency
Creating Elastic MapReduce job flow
Job flow created with ID: j-A7GLS495TC9J
Created new job flow j-A7GLS495TC9J
Job launched 30.5s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 61.3s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 91.8s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 122.7s ago, status STARTING: Configuring cluster software
Job launched 153.0s ago, status STARTING: Configuring cluster software
Job launched 184.0s ago, status STARTING: Configuring cluster software
Job launched 214.3s ago, status BOOTSTRAPPING: Running bootstrap actions
Job launched 244.7s ago, status RUNNING: Running step (matchIndex.doug.20151108.032720.181007: Step 1 of 1)
Job launched 275.1s ago, status RUNNING: Running step (matchIndex.doug.20151108.032720.181007: Step 1 of 1)
Job launched 305.9s ago, status RUNNING: Running step (matchIndex.doug.20151108.032720.181007: Step 1 of 1)
Job launched 336.3s ago, status RUNNING: Running step (matchIndex.doug.20151108.032720.181007: Step 1 of 1)
Job launched 367.2s ago, status RUNNING: Running step (matchIndex.doug.20151108.032720.181007: Step 1 of 1)
Job completed.
Running time was 136.0s (not counting time spent waiting for the EC2 instances)
ec2_key_pair_file not specified, going to S3
Fetching counters from S3...
Waiting 5.0s for S3 eventual consistency
Counters from step 1:
  File Input Format Counters :
    Bytes Read: 517513438
  File Output Format Counters :
    Bytes Written: 0
  FileSystemCounters:
    FILE_BYTES_READ: 280
    FILE_BYTES_WRITTEN: 1259437
    HDFS_BYTES_READ: 3232
    S3_BYTES_READ: 517513438
  Job Counters :
    Launched map tasks: 34
    Launched reduce tasks: 20
    Rack-local map tasks: 34
    SLOTS_MILLIS_MAPS: 1102573
    SLOTS_MILLIS_REDUCES: 447082
    Total time spent by all maps waiting after reserving slots (ms): 0
    Total time spent by all reduces waiting after reserving slots (ms): 0
  Map-Reduce Framework:
    CPU time spent (ms): 351680
    Combine input records: 0
    Combine output records: 0
    Map input bytes: 517438296
    Map input records: 15192277
    Map output bytes: 0
    Map output materialized bytes: 7168
    Map output records: 0
    Physical memory (bytes) snapshot: 11927113728
    Reduce input groups: 0
    Reduce input records: 0
    Reduce output records: 0
    Reduce shuffle bytes: 7168
    SPLIT_RAW_BYTES: 3232
    Spilled Records: 0
    Total committed heap usage (bytes): 10549329920
    Virtual memory (bytes) snapshot: 43876581376
removing tmp directory /var/folders/pc/lb7gplsj0gxfz9k1q0_3lp0m0000gn/T/matchIndex.doug.20151108.032720.181007
Removing all files in s3://ucb-mids-mls-dougkelley/tmp/matchIndex.doug.20151108.032720.181007/
Removing all files in s3://ucb-mids-mls-dougkelley/tmp/logs/j-A7GLS495TC9J/
Terminating job flow: j-A7GLS495TC9J

In [16]:
!aws s3 cp s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/ 10xMatch/ --recursive


download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00001 to 10xMatch/part-00001
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00003 to 10xMatch/part-00003
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00008 to 10xMatch/part-00008
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00009 to 10xMatch/part-00009
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/_SUCCESS to 10xMatch/_SUCCESS
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00010 to 10xMatch/part-00010
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00011 to 10xMatch/part-00011
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00012 to 10xMatch/part-00012
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00002 to 10xMatch/part-00002
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00013 to 10xMatch/part-00013
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00005 to 10xMatch/part-00005
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00007 to 10xMatch/part-00007
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00000 to 10xMatch/part-00000
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00004 to 10xMatch/part-00004
download: s3://ucb-mids-mls-dougkelley/HW9_3/indexMatch10x/part-00006 to 10xMatch/part-00006

In [42]:
%reload_ext autoreload 
%autoreload 2

In [43]:
from ast import literal_eval
nodes10x=[]
nodes50x=[]
scores10x=[]
scores50x=[]
with open('top100_10x.txt','r') as f1, open('top100_50x.txt','r') as f2:
    for line in f1:
        fields = line.strip().split('\t')
        node = literal_eval(fields[0])
        score = literal_eval(fields[1])
        nodes10x.append(node)
        scores10x.append(score)
    for line in f2:
        fields = line.strip().split('\t')
        node = literal_eval(fields[0])
        score = literal_eval(fields[1])
        nodes50x.append(node)
        scores50x.append(score)
links10x=[]
links50x=[]
with open('/Users/doug/Downloads/indices.txt','r') as f:
    for line in f:
        fields=line.strip().split('\t')
        node = str(literal_eval(fields[1]))
        if node in nodes10x:
            links10x.append(fields)
        if node in nodes50x:
            links50x.append(fields)

In [44]:
links10x[1]


Out[44]:
['American football', '1062600', '42597', '425']

In [45]:
links50x[1]


Out[45]:
['Allmusic', '994890', '102160', '1']

In [26]:
links10x[0]


Out[26]:
['Allmusic', '994890', '102160', '1']

In [27]:
nodes10x[0]


Out[27]:
'13455888'

In [47]:
olinks10x=[]
olinks50x=[]
for k, node in enumerate(nodes10x):
    olinks10x.append((list(i for i in links10x if i[1] == node),scores10x[k]))
for k, node in enumerate(nodes50x):
    olinks50x.append((list(i for i in links50x if i[1] == node), scores50x[k]))

In [48]:
olinks50x[:10]


Out[48]:
[([['United States', '13455888', '555979', '1448']], 0.0014615599818941945),
 ([['Animal', '1184351', '182279', '382']], 0.000666017793720564),
 ([['France', '4695850', '184797', '1799']], 0.0006396773758301579),
 ([['Germany', '5051368', '167564', '1418']], 0.0005747671983901166),
 ([['Arthropod', '1384888', '127962', '569']], 0.0004501232222762257),
 ([['Canada', '2437837', '167394', '772']], 0.0004466700517594442),
 ([['Insect', '6113490', '121125', '732']], 0.00044463224410248376),
 ([['List of sovereign states', '7902219', '32924', '381']],
  0.0004438786997347336),
 ([['United Kingdom', '13425865', '136922', '1466']], 0.00043314218180855695),
 ([['India', '6076759', '154884', '1119']], 0.0004277077677812398)]

In [49]:
olinks10x[:10]


Out[49]:
[([['United States', '13455888', '555979', '1448']], 0.00021857674698305465),
 ([['Animal', '1184351', '182279', '382']], 0.0001522441888351541),
 ([['Arthropod', '1384888', '127962', '569']], 0.00011784673445790709),
 ([['Insect', '6113490', '121125', '732']], 0.00011390425173221594),
 ([['France', '4695850', '184797', '1799']], 0.00010755262105576216),
 ([['Germany', '5051368', '167564', '1418']], 9.611569908240295e-05),
 ([['Lepidoptera', '7576704', '86712', '694']], 8.131629722308353e-05),
 ([['India', '6076759', '154884', '1119']], 7.989780973559494e-05),
 ([['National Register of Historic Places', '9276255', '64498', '180']],
  7.430394469619288e-05),
 ([['List of countries', '7835160', '114643', '1']], 7.330773320314969e-05)]

In [64]:
len(olinks10x)


Out[64]:
100

In [112]:
pr10xNames=[]
pr50xNames=[]
pr10xScores=[]
pr50xScores=[]
pr10xinlinks=[]
pr50xinlinks=[]
pr10xoutlinks=[]
pr50xoutlinks=[]

for k in range(len(olinks10x)):
    pr10xNames.append(olinks10x[k][0][0][0])
    pr50xNames.append(olinks50x[k][0][0][0])
    pr10xScores.append(olinks10x[k][1])
    pr50xScores.append(olinks50x[k][1])
    pr10xinlinks.append(float(olinks10x[k][0][0][2]))
    pr50xinlinks.append(float(olinks50x[k][0][0][2]))
    pr10xoutlinks.append(float(olinks10x[k][0][0][3]))
    pr50xoutlinks.append(float(olinks50x[k][0][0][3]))

In [107]:
from numpy import arange
plt.semilogy(arange(100),pr10xScores,arange(100),pr50xScores)
plt.grid()
plt.xlabel('Rank')
plt.ylabel('Pagerank')
plt.legend(('10 iterations', '50 iterations'))


Out[107]:
<matplotlib.legend.Legend at 0x10caf9090>

With more iterations, more mass is concentrated in the more highly ranked nodes -- the rich get richer. Pagerank drops off slowly after the initial few pages. In terms of Google's results, only the highest ranked nodes show up on teh first page of search results.


In [108]:
plt.semilogy(arange(100),pr10xinlinks,arange(100),pr50xinlinks)
plt.grid()
plt.xlabel('Rank')
plt.ylabel('Inlinks')
plt.legend(('10 iterations', '50 iterations'))


Out[108]:
<matplotlib.legend.Legend at 0x10cd32450>

The more highly ranked nodes have a high level of inbound links, but after the first few, the number varies quite a bit but generally drops off.


In [113]:
plt.semilogy(arange(100),pr10xoutlinks,arange(100),pr50xoutlinks)
plt.grid()
plt.xlabel('Rank')
plt.ylabel('Outlinks')
plt.legend(('10 iterations', '50 iterations'))


Out[113]:
<matplotlib.legend.Legend at 0x10d3e2a90>

There appears to be no dependence on rank for the number of outlinks from a page. Several high ranking nodes have no outlinks.


In [100]:
def print_full(x):
    pd.set_option('display.max_rows', len(x))
    print(x)
    pd.reset_option('display.max_rows')

In [105]:
pd.set_option('display.max_rows', 100)

In [117]:
df = pd.DataFrame({'Name 10x':pr10xNames, 'Score 10x':pr10xScores, 'Name 50x':pr50xNames, 'Score 50x':pr50xScores})

In [116]:
df


Out[116]:
Name10x Name50x Score10x Score50x
0 United States United States 0.000219 0.001462
1 Animal Animal 0.000152 0.000666
2 Arthropod France 0.000118 0.000640
3 Insect Germany 0.000114 0.000575
4 France Arthropod 0.000108 0.000450
5 Germany Canada 0.000096 0.000447
6 Lepidoptera Insect 0.000081 0.000445
7 India List of sovereign states 0.000080 0.000444
8 National Register of Historic Places United Kingdom 0.000074 0.000433
9 List of countries India 0.000073 0.000428
10 England England 0.000072 0.000423
11 Canada Iran 0.000070 0.000398
12 village World War II 0.000070 0.000385
13 moth Poland 0.000065 0.000363
14 Iran village 0.000064 0.000344
15 Countries of the world Countries of the world 0.000063 0.000338
16 Private Use Areas Japan 0.000060 0.000329
17 Japan Italy 0.000054 0.000329
18 Hangul List of countries 0.000053 0.000326
19 United Kingdom Australia 0.000052 0.000325
20 Italy Voivodeships of Poland 0.000052 0.000313
21 Australia National Register of Historic Places 0.000051 0.000310
22 Poland Lepidoptera 0.000051 0.000308
23 Voivodeships of Poland Powiat 0.000051 0.000303
24 Powiat Gmina 0.000050 0.000298
25 Departments of France The New York Times 0.000050 0.000286
26 Gmina London 0.000050 0.000284
27 Communes of France English language 0.000050 0.000269
28 Counties of Iran China 0.000048 0.000264
29 Provinces of Iran Russia 0.000048 0.000261
30 Rural Districts of Iran New York City 0.000048 0.000258
31 Bakhsh Departments of France 0.000048 0.000255
32 World War II Spain 0.000047 0.000251
33 Iran Standard Time Communes of France 0.000045 0.000249
34 Iran Daylight Time moth 0.000045 0.000245
35 Brazil Brazil 0.000042 0.000245
36 association football Association football 0.000042 0.000239
37 Russia association football 0.000041 0.000233
38 Association football California 0.000041 0.000221
39 Romanize Counties of Iran 0.000041 0.000215
40 Spain Provinces of Iran 0.000040 0.000215
41 London Central European Time 0.000038 0.000211
42 Romania Romania 0.000035 0.000211
43 List of sovereign states Bakhsh 0.000035 0.000207
44 English language Sweden 0.000035 0.000203
45 California Rural Districts of Iran 0.000034 0.000203
46 Central European Time Netherlands 0.000033 0.000197
47 China Private Use Areas 0.000033 0.000191
48 Allmusic World War I 0.000032 0.000191
49 Sweden New York 0.000031 0.000188
50 gene Central European Summer Time 0.000030 0.000188
51 New York City Mexico 0.000030 0.000187
52 Mexico Iran Standard Time 0.000030 0.000187
53 Central European Summer Time AllMusic 0.000029 0.000185
54 Angiosperms Iran Daylight Time 0.000029 0.000179
55 Netherlands Hangul 0.000028 0.000178
56 New Zealand Scotland 0.000026 0.000173
57 Scotland gene 0.000026 0.000169
58 Norway Soviet Union 0.000026 0.000168
59 Geographic Names Information System Norway 0.000026 0.000167
60 New York Allmusic 0.000025 0.000165
61 genus Paris 0.000025 0.000161
62 Turkey New Zealand 0.000024 0.000161
63 Political divisions of the United States Turkey 0.000024 0.000159
64 species Plant 0.000024 0.000158
65 Beetle Geographic Names Information System 0.000024 0.000155
66 protein Switzerland 0.000024 0.000155
67 South Africa Los Angeles 0.000024 0.000153
68 beetle Romanize 0.000022 0.000149
69 Animalia United States Census Bureau 0.000022 0.000148
70 World War I Europe 0.000022 0.000147
71 Eudicots Angiosperms 0.000022 0.000142
72 Switzerland South Africa 0.000022 0.000141
73 Eastern European Time census 0.000022 0.000139
74 The New York Times Flowering plant 0.000021 0.000138
75 Argentina Austria 0.000021 0.000136
76 Eastern European Summer Time protein 0.000021 0.000135
77 Paris U.S. state 0.000021 0.000135
78 Pakistan Argentina 0.000021 0.000131
79 Chordata Political divisions of the United States 0.000021 0.000130
80 Austria population density 0.000021 0.000130
81 Plantae Catholic Church 0.000021 0.000128
82 ground beetle Chordate 0.000020 0.000128
83 census BBC 0.000020 0.000127
84 family (biology) Belgium 0.000020 0.000127
85 football (soccer) Chicago 0.000019 0.000124
86 Midfielder Washington, D.C. 0.000019 0.000121
87 American football Pakistan 0.000018 0.000120
88 Australian rules football Finland 0.000018 0.000116
89 U.S. state The Guardian 0.000018 0.000115
90 Plant Latin 0.000018 0.000114
91 Soviet Union Ontario 0.000017 0.000114
92 tributary Czech Republic 0.000017 0.000114
93 Ontario Philippines 0.000017 0.000113
94 Azerbaijan Denmark 0.000017 0.000113
95 Finland Greece 0.000017 0.000113
96 Belgium genus 0.000017 0.000113
97 Member of Parliament football (soccer) 0.000017 0.000112
98 Polyphaga Hungary 0.000017 0.000112
99 Texas Eastern European Time 0.000017 0.000112

In [138]:
updates=[]
for item in pr10xNames:
    if item in pr50xNames:
        updates.append(pr50xNames.index(item))
    else:
        updates.append(100)
fig,ax = plt.subplots()
fig.set_size_inches(3.0, 10.0)
plt.xticks([10.0, 50.0])
plt.xlim([5, 55])
plt.ylabel('PageRank')
plt.xlabel('Iterations')
plt.title('From 10x')
for k in range(100):
    ax.plot([10,50],[100-k,100-updates[k]], '-o', markersize=5)



In [136]:
updates50=[]
for item in pr50xNames:
    if item in pr10xNames:
        updates50.append(pr10xNames.index(item))
    else:
        updates50.append(100)
fig,ax = plt.subplots()
fig.set_size_inches(3.0, 10.0)
plt.xticks([10.0, 50.0])
plt.xlim([5, 55])
plt.ylabel('PageRank')
plt.xlabel('Iterations')
plt.title('From 50x')
for k in range(100):
    ax.plot([10,50],[100-updates50[k],100-k], '-o', markersize=5)
print "Number of new items in 50x top 100 list: {0}".format(updates50.count(100))


Number of new items in 50x top 100 list: 18

HW 9.4: Topic-specific PageRank implementation using MRJob

Modify your PageRank implementation to produce a topic specific PageRank implementation, as described in:

http://www-cs-students.stanford.edu/~taherh/papers/topic-sensitive-pagerank.pdf

Note in this article that there is a special caveat to ensure that the transition matrix is irreducible.
This caveat lies in footnote 3 on page 3:

A minor caveat: to ensure that M is irreducible when p
contains any 0 entries, nodes not reachable from nonzero
nodes in p should be removed. In practice this is not problematic.

and must be adhered to for convergence to be guaranteed.

Run topic specific PageRank on the following randomly generated network of 100 nodes:
s3://ucb-mids-mls-networks/randNet.txt

which are organized into ten topics, as described in the file:
s3://ucb-mids-mls-networks/randNet_topics.txt

Since there are 10 topics, your result should be 11 PageRank vectors (one for the vanilla PageRank implementation in 9.1, and one for each topic with the topic specific implementation). Print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.


In [28]:
%%writefile MRJob_PreprocessGraph_topic.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from mrjob.compat import jobconf_from_env


class MRJobPreprocessGraphTopic(MRJob):
    

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper_init = self.mapper_init,
                           mapper = self.mapper)]
        return [MRStep(
                mapper_init = self.mapper_init,
                mapper = self.mapper,
                reducer = self.reducer
                )]
    
    def mapper_init(self):
        self.topic_labels = {}
        topicIdxFilename = jobconf_from_env('topicIdxFilename')
        with open(topicIdxFilename, 'r') as f:
            for line in f.readlines():
                line = line.strip()
                p,t = line.split('\t')
                self.topic_labels[int(p)]=int(t)
                
        G = int(jobconf_from_env('G'))
        self.PR_init = 1.0/G
        
    
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = int(line[0])
        edges = literal_eval(line[1])
        value = {}
        value['PR']=[self.PR_init]*11
        value['edges']=edges.keys()
        value['topic']=self.topic_labels[key]
            
        yield key, value
        
        for node in edges.keys():
            key=int(node)
            value = {}
            value['PR']=[self.PR_init]*11
            value['edges']=None
            value['topic']=self.topic_labels[key]
            yield key, value

           
    def reducer(self,key,value):
        data = {}
        data['topic']=None
        data['PR']=None
        data['edges']=[]
        for v in value:
            if data['topic']==None:
                data['topic']=v['topic']
            if v['edges']==None:
                data['PR']=v['PR']
            else:
                data['PR']=v['PR']
                data['edges']=v['edges']
        yield int(key), data

    
        
if __name__ == '__main__':
    MRJobPreprocessGraphTopic.run()


Overwriting MRJob_PreprocessGraph_topic.py

Preprocess RandNet locally to include initialized page ranks and topics


In [29]:
!python MRJob_PreprocessGraph_topic.py randNet.txt --file 'randNet_topics.txt' --jobconf 'G=100' --jobconf 'topicIdxFilename=randNet_topics.txt'> randNet_input.txt


using configs in /Users/davidadams/.mrjob.conf
creating tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.210948.584583

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.210948.584583/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.210948.584583/step-0-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.210948.584583/step-0-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.210948.584583/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
Moving /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.210948.584583/step-0-reducer_part-00000 -> /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.210948.584583/output/part-00000
Streaming final output from /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.210948.584583/output
removing tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.210948.584583

Test preprocessing on emr


In [31]:
!python MRJob_PreprocessGraph_topic.py randNet.txt --file 'randNet_topics.txt' --jobconf 'G=100' --jobconf 'topicIdxFilename=randNet_topics.txt' -r emr --num-ec2-instances 2 --ec2-task-instance-type m1.medium --output-dir s3://ucb-mids-mls-katieadams/output-wiki-preprocess-topics --no-output


using configs in /Users/davidadams/.mrjob.conf
creating tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.211319.807689
writing master bootstrap script to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.211319.807689/b.py

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

Copying non-input files into s3://ucb-mids-mls-katieadams/tmp/MRJob_PreprocessGraph_topic.davidadams.20151106.211319.807689/files/
Waiting 5.0s for S3 eventual consistency
Creating Elastic MapReduce job flow
Job flow created with ID: j-VP5CFD55XZ98
Created new job flow j-VP5CFD55XZ98
Job launched 31.1s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 62.2s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 93.3s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 124.5s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 155.6s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 186.6s ago, status STARTING: Configuring cluster software
Job launched 217.8s ago, status STARTING: Configuring cluster software
Job launched 248.8s ago, status BOOTSTRAPPING: Running bootstrap actions
Job launched 279.9s ago, status RUNNING: Running step
Job launched 311.1s ago, status RUNNING: Running step
Job launched 342.2s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151106.211319.807689: Step 1 of 1)
Job launched 373.2s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151106.211319.807689: Step 1 of 1)
Job launched 404.3s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151106.211319.807689: Step 1 of 1)
Job completed.
Running time was 113.0s (not counting time spent waiting for the EC2 instances)
ec2_key_pair_file not specified, going to S3
Fetching counters from S3...
Waiting 5.0s for S3 eventual consistency
Counters from step 1:
  File Input Format Counters :
    Bytes Read: 21735
  File Output Format Counters :
    Bytes Written: 15616
  FileSystemCounters:
    FILE_BYTES_READ: 10867
    FILE_BYTES_WRITTEN: 160006
    HDFS_BYTES_READ: 660
    S3_BYTES_READ: 21735
    S3_BYTES_WRITTEN: 15616
  Job Counters :
    Launched map tasks: 4
    Launched reduce tasks: 1
    Rack-local map tasks: 4
    SLOTS_MILLIS_MAPS: 76467
    SLOTS_MILLIS_REDUCES: 35966
    Total time spent by all maps waiting after reserving slots (ms): 0
    Total time spent by all reduces waiting after reserving slots (ms): 0
  Map-Reduce Framework:
    CPU time spent (ms): 4910
    Combine input records: 0
    Combine output records: 0
    Map input bytes: 8691
    Map input records: 100
    Map output bytes: 113303
    Map output materialized bytes: 12125
    Map output records: 1029
    Physical memory (bytes) snapshot: 863367168
    Reduce input groups: 100
    Reduce input records: 1029
    Reduce output records: 100
    Reduce shuffle bytes: 12125
    SPLIT_RAW_BYTES: 660
    Spilled Records: 2058
    Total committed heap usage (bytes): 606838784
    Virtual memory (bytes) snapshot: 3231440896
removing tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151106.211319.807689
Removing all files in s3://ucb-mids-mls-katieadams/tmp/MRJob_PreprocessGraph_topic.davidadams.20151106.211319.807689/
Removing all files in s3://ucb-mids-mls-katieadams/tmp/logs/j-VP5CFD55XZ98/
Terminating job flow: j-VP5CFD55XZ98

In [30]:
!head -1 randNet_input.txt


1	{"topic": 10, "PR": [0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01], "edges": ["11", "27", "63", "46", "47", "35", "89", "5"]}

In [18]:
from collections import defaultdict
def numNodesByTopic(topicsIdxFile, topicsCountFile):
    numNodesTopic = defaultdict(int)
    with open('randNet_topics.txt', 'r') as f:
        for line in f.readlines():
            line = line.strip()
            p,t = line.split('\t')
            numNodesTopic[int(t)]+=1
            numNodesTopic['*']+=1
    with open('randNet_topicCount.txt','w') as f:
        for k,v in numNodesTopic.iteritems():
            line = str(k)+'\t'+str(v)+'\n'
            f.write(line)

            
topicsIdxFile, topicsCountFile
numNodesByTopic(topicsIdxFile, topicsCountFile)

In [21]:
!cat randNet_topicCount.txt


1	17
2	8
3	9
4	13
5	9
6	6
7	10
8	9
9	7
10	12
*	100

In [62]:
%%writefile MRJob_PageRank_Topic.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from mrjob.compat import jobconf_from_env
import sys


class MRJobPageRankTopic(MRJob):
    
    
    def configure_options(self):
        super(MRJobPageRankTopic, self).configure_options()
        
        self.add_passthrough_option(
                '--iterations',
                dest = 'iterations',
                default = 10,
                type='int',
                help='number of iterations to run')
    

    def steps(self):        
        
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper_firstiter)]
        
        return [MRStep(
                mapper = self.mapper_firstiter,
                reducer_init = self.reducer_init,
                reducer = self.reducer
                ),
               MRStep(
                reducer = self.reducer_redistr)]+\
                [MRStep(
                mapper = self.mapper,
                reducer_init = self.reducer_init,
                reducer = self.reducer
                ),
               MRStep(
                reducer = self.reducer_redistr)]*self.options.iterations
        
    
    def mapper_firstiter(self, _, line):
        
        line = line.strip().split('\t')
        key = line[0]
        value = literal_eval(line[1])
        
        graphvalue = {}
        graphvalue['PR']=None
        graphvalue['edges']=value['edges']
        graphvalue['topic']=value['topic']
        
        yield int(key), graphvalue
        
        destnodes = value['edges']
        outdeg = len(destnodes)
        
        sendvalue = {}
        sendvalue['PR']=[]
        sendvalue['topic']=None
        sendvalue['edges']=None
        
        if outdeg==0:
            sendvalue['PR']=value['PR']
            yield '*m',sendvalue
        else:
            
            for i in range(len(value['PR'])):
                PRi = value['PR'][i]
                sendvalue['PR'].append(1.0*PRi/outdeg)
            for node in destnodes:
                yield int(node), sendvalue
            

    def mapper(self, key, value):
       
        graphvalue = {}
        graphvalue['PR']=None
        graphvalue['edges']=value['edges']
        graphvalue['topic']=value['topic']
        
        yield int(key), graphvalue
        
        destnodes = value['edges']
        outdeg = len(destnodes)
        
        sendvalue = {}
        sendvalue['PR']=[]
        sendvalue['topic']=None
        sendvalue['edges']=None
        
        if outdeg==0:
            sendvalue['PR']=value['PR']
            yield '*m',sendvalue
        else:
            for i in range(len(value['PR'])):
                PRi = value['PR'][i]
                sendvalue['PR'].append(1.0*PRi/outdeg)
            for node in destnodes:
                yield int(node), sendvalue
  
    def reducer_init(self):
        
        self.alpha = float(jobconf_from_env('alpha'))
        self.G = int(jobconf_from_env('G'))
        self.n = int(jobconf_from_env('num_topics'))
        self.beta = float(jobconf_from_env('beta'))
        topicCountFilename = jobconf_from_env('topicCountFilename')
        
        self.T = {}
        with open(topicCountFilename, 'r') as f:
            for line in f.readlines():
                topic,count = line.strip().split('\t')
                if topic=='*':
                    topic = 0
                else: 
                    topic = int(topic)
                self.T[topic]=int(count)
            
    def reducer(self,node,data):
        
        sendvalue = {}
        sendvalue['PR']=[0]*(self.n+1)
        sendvalue['topic']=None
        sendvalue['edges']=None
       
        for value in data:
            if value['edges']==None:
                for i in range(len(value['PR'])):
                    sendvalue['PR'][i]+=value['PR'][i]
            else:
                sendvalue['topic']=int(value['topic'])
                sendvalue['edges']=value['edges']
        
        if node=='*m':
            PR_adj = []
            for i in range(self.n+1):
                PR_adj.append([(1.0/self.G)*(1.0-self.alpha)*sendvalue['PR'][i]])
            sendvalue['PR']=PR_adj
            for i in range(self.G):
                yield i+1, sendvalue
        else:
            PR_adj = []
            PR_adj.append(self.alpha*(1.0/self.G)+(1.0-self.alpha)*sendvalue['PR'][i])
            for i in range(1,self.n+1):
                if sendvalue['topic']==i:
                    PR_adj.append(self.alpha*self.beta*(1.0/self.T[i])+(1.0-self.alpha)*sendvalue['PR'][i])
                else:
                    PR_adj.append(self.alpha*(1-self.beta)*(1.0/(self.G-self.T[i]))+(1.0-self.alpha)*sendvalue['PR'][i])
                    
            sendvalue['PR']=PR_adj
            yield int(node), sendvalue
        
    
    def reducer_redistr(self, node, data):
        
        n = int(jobconf_from_env('num_topics'))
        
        finalvalue = {}
        finalvalue['PR']=[0]*(n+1)
        finalvalue['topic']=None
        finalvalue['edges']=None
        
        for value in data:
            for i in range(n+1):
                finalvalue['PR'][i]+=value['PR'][i]
            if value['edges']!=None:
                finalvalue['edges'] = value['edges']
                finalvalue['topic']=int(value['topic'])
        
        yield int(node), finalvalue
        
   
if __name__ == '__main__':
    MRJobPageRankTopic.run()


Overwriting MRJob_PageRank_Topic.py

In [65]:
!python MRJob_PageRank_Topic.py randNet_input.txt --file 'randNet_topicCount.txt' --jobconf 'G=100' --jobconf 'alpha=0.15'\
                                                    --jobconf 'beta=0.99' --jobconf 'topicCountFilename=randNet_topicCount.txt'\
                                                    --jobconf 'num_topics=10' --iterations 9 > randNet_Topic_output.txt


using configs in /Users/davidadams/.mrjob.conf
creating tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-0-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-0-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-1-mapper_part-00000
Counters from step 2:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-1-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-1-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-1-reducer_part-00000
Counters from step 2:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-2-mapper_part-00000
Counters from step 3:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-2-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-2-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-2-reducer_part-00000
Counters from step 3:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-3-mapper_part-00000
Counters from step 4:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-3-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-3-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-3-reducer_part-00000
Counters from step 4:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-4-mapper_part-00000
Counters from step 5:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-4-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-4-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-4-reducer_part-00000
Counters from step 5:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-5-mapper_part-00000
Counters from step 6:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-5-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-5-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-5-reducer_part-00000
Counters from step 6:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-6-mapper_part-00000
Counters from step 7:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-6-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-6-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-6-reducer_part-00000
Counters from step 7:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-7-mapper_part-00000
Counters from step 8:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-7-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-7-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-7-reducer_part-00000
Counters from step 8:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-8-mapper_part-00000
Counters from step 9:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-8-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-8-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-8-reducer_part-00000
Counters from step 9:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-9-mapper_part-00000
Counters from step 10:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-9-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-9-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-9-reducer_part-00000
Counters from step 10:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-10-mapper_part-00000
Counters from step 11:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-10-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-10-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-10-reducer_part-00000
Counters from step 11:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-11-mapper_part-00000
Counters from step 12:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-11-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-11-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-11-reducer_part-00000
Counters from step 12:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-12-mapper_part-00000
Counters from step 13:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-12-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-12-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-12-reducer_part-00000
Counters from step 13:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-13-mapper_part-00000
Counters from step 14:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-13-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-13-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-13-reducer_part-00000
Counters from step 14:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-14-mapper_part-00000
Counters from step 15:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-14-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-14-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-14-reducer_part-00000
Counters from step 15:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-15-mapper_part-00000
Counters from step 16:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-15-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-15-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-15-reducer_part-00000
Counters from step 16:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-16-mapper_part-00000
Counters from step 17:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-16-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-16-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-16-reducer_part-00000
Counters from step 17:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-17-mapper_part-00000
Counters from step 18:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-17-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-17-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-17-reducer_part-00000
Counters from step 18:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-18-mapper_part-00000
Counters from step 19:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-18-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-18-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-18-reducer_part-00000
Counters from step 19:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-19-mapper_part-00000
Counters from step 20:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-19-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-19-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-19-reducer_part-00000
Counters from step 20:
  (no counters found)
Moving /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/step-19-reducer_part-00000 -> /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/output/part-00000
Streaming final output from /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776/output
removing tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PageRank_Topic.davidadams.20151107.031945.328776

In [8]:
%%writefile MRtop10.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol
from mrjob.step import MRStep
import re
from itertools import combinations
from ast import literal_eval
import sys
import string
from operator import itemgetter
from collections import defaultdict
from mrjob.compat import jobconf_from_env

'''
    Modified from Doug and Group E's code
'''


class MRtop10(MRJob):
    OUTPUT_PROTOCOL = JSONProtocol
    mapperListDict = defaultdict(list)
    reducerListDict = defaultdict(list)
    
    def jobconf(self):
        orig_jobconf = super(MRtop10, self).jobconf()        
        custom_jobconf = {
            'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
            'mapred.text.key.comparator.options': '-k1rn',
            'mapred.reduce.tasks': '1',
        }
        combined_jobconf = orig_jobconf
        combined_jobconf.update(custom_jobconf)
        self.jobconf = combined_jobconf
        return combined_jobconf

    def steps(self):
        return [MRStep(
                mapper = self.mapper, 
                reducer_init = self.reducer_init,
                reducer = self.reducer,
                reducer_final = self.reducer_final
            )]
    def getFloatKey(item):
        return float(item[1])
    
    def mapper(self, key, line):

        fields = line.strip().split('\t')
        nodeid = fields[0]
        nodeDict = literal_eval(fields[1])
        for i in range(11):
            yield float(nodeDict['PR'][i]), (i, nodeid, nodeDict['topic'])

    def reducer_init(self):
        self.n = int(jobconf_from_env('num_topics'))
        self.top_pages = defaultdict(list)
            
    def reducer(self, score, nodetups):
        for node in nodetups:
            topic = node[0]
            if len(self.top_pages[topic])<10:
                self.top_pages[topic].append([score, node])

                
    def reducer_final(self):
        for page, scores in self.top_pages.iteritems():
            for score in scores:
                rank = score[0]
                node = score[1][1]
                topic = score[1][2]
                yield "Vector "+str(page), (rank, node, topic)

                
                
if __name__ == '__main__':
    MRtop10.run()


Overwriting MRtop10.py

In [10]:
!python MRtop10.py randNet_Topic_output.txt --jobconf 'num_topics=10' -r emr


using configs in /Users/davidadams/.mrjob.conf
creating tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRtop10.davidadams.20151107.045928.232288
writing master bootstrap script to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRtop10.davidadams.20151107.045928.232288/b.py

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

Copying non-input files into s3://ucb-mids-mls-katieadams/tmp/MRtop10.davidadams.20151107.045928.232288/files/
Waiting 5.0s for S3 eventual consistency
Creating Elastic MapReduce job flow
Job flow created with ID: j-25CYGX08O6UFX
Created new job flow j-25CYGX08O6UFX
Job launched 31.3s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 62.5s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 93.6s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 125.2s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 156.4s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 188.1s ago, status BOOTSTRAPPING: Running bootstrap actions
Job launched 219.3s ago, status BOOTSTRAPPING: Running bootstrap actions
Job launched 250.6s ago, status BOOTSTRAPPING: Running bootstrap actions
Job launched 281.8s ago, status BOOTSTRAPPING: Running bootstrap actions
Job launched 313.0s ago, status RUNNING: Running step
Job launched 344.2s ago, status RUNNING: Running step (MRtop10.davidadams.20151107.045928.232288: Step 1 of 1)
Job launched 375.4s ago, status RUNNING: Running step (MRtop10.davidadams.20151107.045928.232288: Step 1 of 1)
Job launched 406.5s ago, status RUNNING: Running step (MRtop10.davidadams.20151107.045928.232288: Step 1 of 1)
Job launched 437.6s ago, status RUNNING: Running step (MRtop10.davidadams.20151107.045928.232288: Step 1 of 1)
Job completed.
Running time was 120.0s (not counting time spent waiting for the EC2 instances)
ec2_key_pair_file not specified, going to S3
Fetching counters from S3...
Waiting 5.0s for S3 eventual consistency
Counters from step 1:
  File Input Format Counters :
    Bytes Read: 49631
  File Output Format Counters :
    Bytes Written: 4740
  FileSystemCounters:
    FILE_BYTES_READ: 23913
    FILE_BYTES_WRITTEN: 183059
    HDFS_BYTES_READ: 632
    S3_BYTES_READ: 49631
    S3_BYTES_WRITTEN: 4740
  Job Counters :
    Launched map tasks: 4
    Launched reduce tasks: 1
    Rack-local map tasks: 4
    SLOTS_MILLIS_MAPS: 79773
    SLOTS_MILLIS_REDUCES: 37341
    Total time spent by all maps waiting after reserving slots (ms): 0
    Total time spent by all reduces waiting after reserving slots (ms): 0
  Map-Reduce Framework:
    CPU time spent (ms): 6590
    Combine input records: 0
    Combine output records: 0
    Map input bytes: 33090
    Map input records: 100
    Map output bytes: 38120
    Map output materialized bytes: 24782
    Map output records: 1100
    Physical memory (bytes) snapshot: 871424000
    Reduce input groups: 1100
    Reduce input records: 1100
    Reduce output records: 110
    Reduce shuffle bytes: 24782
    SPLIT_RAW_BYTES: 632
    Spilled Records: 2200
    Total committed heap usage (bytes): 606838784
    Virtual memory (bytes) snapshot: 3240636416
Streaming final output from s3://ucb-mids-mls-katieadams/tmp/MRtop10.davidadams.20151107.045928.232288/output/
"Vector 0"	[0.018450492994579813, "63", 4]
"Vector 0"	[0.017279654921740766, "61", 8]
"Vector 0"	[0.016480564476828941, "15", 3]
"Vector 0"	[0.015456832014712155, "74", 10]
"Vector 0"	[0.014915487546274911, "92", 1]
"Vector 0"	[0.014886676428498469, "100", 8]
"Vector 0"	[0.014724267309124666, "90", 5]
"Vector 0"	[0.014495723860992503, "13", 6]
"Vector 0"	[0.014248697591624835, "85", 7]
"Vector 0"	[0.014141694236526918, "71", 2]
"Vector 1"	[0.020645898325225003, "32", 1]
"Vector 1"	[0.02054756962678736, "77", 1]
"Vector 1"	[0.019754313100732439, "52", 1]
"Vector 1"	[0.019529238246263958, "92", 1]
"Vector 1"	[0.018565525448276273, "10", 1]
"Vector 1"	[0.018522539827068918, "27", 1]
"Vector 1"	[0.017840510571832245, "85", 7]
"Vector 1"	[0.017692389508390936, "98", 1]
"Vector 1"	[0.017514128674978226, "46", 1]
"Vector 1"	[0.016028121317860768, "74", 10]
"Vector 2"	[0.03084746002637323, "58", 2]
"Vector 2"	[0.029665243324997904, "71", 2]
"Vector 2"	[0.029296846892829638, "9", 2]
"Vector 2"	[0.028914805418738834, "73", 2]
"Vector 2"	[0.026888935387018635, "12", 2]
"Vector 2"	[0.025799681887661255, "59", 2]
"Vector 2"	[0.024849600531229588, "75", 2]
"Vector 2"	[0.022858211891270377, "82", 2]
"Vector 2"	[0.016322098538761654, "52", 1]
"Vector 2"	[0.01515867219849016, "17", 10]
"Vector 3"	[0.031529068397347246, "15", 3]
"Vector 3"	[0.027076597693858763, "70", 3]
"Vector 3"	[0.026527964044297941, "86", 3]
"Vector 3"	[0.024463316416756184, "91", 3]
"Vector 3"	[0.024148527884714856, "66", 3]
"Vector 3"	[0.023705087243707054, "2", 3]
"Vector 3"	[0.022767111411451758, "31", 3]
"Vector 3"	[0.022178501156581275, "40", 3]
"Vector 3"	[0.019745051424447244, "20", 3]
"Vector 3"	[0.01589997292892676, "74", 10]
"Vector 4"	[0.026202043207444136, "63", 4]
"Vector 4"	[0.021760079633412529, "83", 4]
"Vector 4"	[0.020623779777070743, "65", 4]
"Vector 4"	[0.020210101152229841, "78", 4]
"Vector 4"	[0.019908459750517585, "41", 4]
"Vector 4"	[0.0195198800713322, "84", 4]
"Vector 4"	[0.018428650565935505, "79", 4]
"Vector 4"	[0.017515429366835658, "38", 4]
"Vector 4"	[0.016752163539375291, "15", 3]
"Vector 4"	[0.016694729499179141, "72", 4]
"Vector 5"	[0.028963266500299595, "99", 5]
"Vector 5"	[0.028344984671495925, "90", 5]
"Vector 5"	[0.027168703255116916, "88", 5]
"Vector 5"	[0.026830795332821233, "51", 5]
"Vector 5"	[0.025553308477928548, "45", 5]
"Vector 5"	[0.023919955028234258, "5", 5]
"Vector 5"	[0.023909744584958762, "34", 5]
"Vector 5"	[0.023363274382490386, "4", 5]
"Vector 5"	[0.022839620864129398, "80", 5]
"Vector 5"	[0.016741485269746215, "100", 8]
"Vector 6"	[0.034571570877532272, "13", 6]
"Vector 6"	[0.032853906714508899, "56", 6]
"Vector 6"	[0.031777155876364656, "37", 6]
"Vector 6"	[0.031339648198519324, "11", 6]
"Vector 6"	[0.03012079634490477, "69", 6]
"Vector 6"	[0.02834890267401214, "23", 6]
"Vector 6"	[0.017242580581008006, "15", 3]
"Vector 6"	[0.016989438344123808, "85", 7]
"Vector 6"	[0.01660216963107063, "52", 1]
"Vector 6"	[0.015461058978408842, "74", 10]
"Vector 7"	[0.026794227495348936, "85", 7]
"Vector 7"	[0.026608778613501603, "25", 7]
"Vector 7"	[0.024827245457770573, "28", 7]
"Vector 7"	[0.024767654194382913, "53", 7]
"Vector 7"	[0.024200828678767147, "35", 7]
"Vector 7"	[0.023395013979336465, "97", 7]
"Vector 7"	[0.022865013665705175, "47", 7]
"Vector 7"	[0.022560854063987799, "55", 7]
"Vector 7"	[0.022134839131015584, "30", 7]
"Vector 7"	[0.020089080196239371, "50", 7]
"Vector 8"	[0.032866030460939763, "100", 8]
"Vector 8"	[0.027858570822448362, "61", 8]
"Vector 8"	[0.027195521505354078, "39", 8]
"Vector 8"	[0.027153099743645281, "8", 8]
"Vector 8"	[0.025346793563654338, "62", 8]
"Vector 8"	[0.025298416024840099, "87", 8]
"Vector 8"	[0.023506852654286991, "6", 8]
"Vector 8"	[0.022891738248983953, "54", 8]
"Vector 8"	[0.020622660245957835, "18", 8]
"Vector 8"	[0.015379799633741306, "9", 2]
"Vector 9"	[0.030198889934084209, "94", 9]
"Vector 9"	[0.029495197684241921, "14", 9]
"Vector 9"	[0.029197862681776883, "42", 9]
"Vector 9"	[0.028399569111395757, "21", 9]
"Vector 9"	[0.027461793981146124, "57", 9]
"Vector 9"	[0.026260949679310815, "96", 9]
"Vector 9"	[0.025773074359528945, "24", 9]
"Vector 9"	[0.017161492066928675, "63", 4]
"Vector 9"	[0.016377328880887894, "61", 8]
"Vector 9"	[0.014277712975620794, "74", 10]
"Vector 10"	[0.026331832014712155, "74", 10]
"Vector 10"	[0.023590536112239317, "17", 10]
"Vector 10"	[0.023574267728797862, "49", 10]
"Vector 10"	[0.020629340496081131, "95", 10]
"Vector 10"	[0.019913805171583917, "7", 10]
"Vector 10"	[0.019364174927853535, "43", 10]
"Vector 10"	[0.019038271865623197, "68", 10]
"Vector 10"	[0.019005045680706455, "48", 10]
"Vector 10"	[0.018998083854615253, "1", 10]
"Vector 10"	[0.018639556040770718, "3", 10]
removing tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRtop10.davidadams.20151107.045928.232288
Removing all files in s3://ucb-mids-mls-katieadams/tmp/MRtop10.davidadams.20151107.045928.232288/
Removing all files in s3://ucb-mids-mls-katieadams/tmp/logs/j-25CYGX08O6UFX/
Terminating job flow: j-25CYGX08O6UFX

HW 9.5: Applying topic-specific PageRank to Wikipedia

Here you will apply your topic-specific PageRank implementation to Wikipedia, defining topics (very arbitrarily) for each page by the length (number of characters) of the name of the article mod 10, so that there are 10 topics. Once again, print out the top ten ranking nodes and their topics for each of the 11 versions, and comment on your result.
Assume a teleportation factor of 0.15 in all your analyses.


In [25]:
def assignWikiTopics():
    ftopics = open('wiki_topics.txt','w')
    i=0
    with open('indices.txt','r') as fidx:
        for line in fidx.readlines():
            i+=1
            if i%500000==0:
                print "progress: ",i*1.0/15192277
            line = line.strip().split('\t')
            topic = len(line[0])%10
            idx = int(line[1])
            ftopics.write(str(idx)+'\t'+str(topic)+'\n')
    ftopics.close()

assignWikiTopics()


progress:  0.00329114588945
progress:  0.00658229177891
progress:  0.00987343766836
progress:  0.0131645835578
progress:  0.0164557294473
progress:  0.0197468753367
progress:  0.0230380212262
progress:  0.0263291671156
progress:  0.0296203130051
progress:  0.0329114588945
progress:  0.036202604784
progress:  0.0394937506735
progress:  0.0427848965629
progress:  0.0460760424524
progress:  0.0493671883418
progress:  0.0526583342313
progress:  0.0559494801207
progress:  0.0592406260102
progress:  0.0625317718996
progress:  0.0658229177891
progress:  0.0691140636785
progress:  0.072405209568
progress:  0.0756963554574
progress:  0.0789875013469
progress:  0.0822786472364
progress:  0.0855697931258
progress:  0.0888609390153
progress:  0.0921520849047
progress:  0.0954432307942
progress:  0.0987343766836
progress:  0.102025522573
progress:  0.105316668463
progress:  0.108607814352
progress:  0.111898960241
progress:  0.115190106131
progress:  0.11848125202
progress:  0.12177239791
progress:  0.125063543799
progress:  0.128354689689
progress:  0.131645835578
progress:  0.134936981468
progress:  0.138228127357
progress:  0.141519273247
progress:  0.144810419136
progress:  0.148101565025
progress:  0.151392710915
progress:  0.154683856804
progress:  0.157975002694
progress:  0.161266148583
progress:  0.164557294473
progress:  0.167848440362
progress:  0.171139586252
progress:  0.174430732141
progress:  0.177721878031
progress:  0.18101302392
progress:  0.184304169809
progress:  0.187595315699
progress:  0.190886461588
progress:  0.194177607478
progress:  0.197468753367
progress:  0.200759899257
progress:  0.204051045146
progress:  0.207342191036
progress:  0.210633336925
progress:  0.213924482815
progress:  0.217215628704
progress:  0.220506774593
progress:  0.223797920483
progress:  0.227089066372
progress:  0.230380212262
progress:  0.233671358151
progress:  0.236962504041
progress:  0.24025364993
progress:  0.24354479582
progress:  0.246835941709
progress:  0.250127087599
progress:  0.253418233488
progress:  0.256709379377
progress:  0.260000525267
progress:  0.263291671156
progress:  0.266582817046
progress:  0.269873962935
progress:  0.273165108825
progress:  0.276456254714
progress:  0.279747400604
progress:  0.283038546493
progress:  0.286329692383
progress:  0.289620838272
progress:  0.292911984161
progress:  0.296203130051
progress:  0.29949427594
progress:  0.30278542183
progress:  0.306076567719
progress:  0.309367713609
progress:  0.312658859498
progress:  0.315950005388
progress:  0.319241151277
progress:  0.322532297167
progress:  0.325823443056
progress:  0.329114588945
progress:  0.332405734835
progress:  0.335696880724
progress:  0.338988026614
progress:  0.342279172503
progress:  0.345570318393
progress:  0.348861464282
progress:  0.352152610172
progress:  0.355443756061
progress:  0.358734901951
progress:  0.36202604784
progress:  0.365317193729
progress:  0.368608339619
progress:  0.371899485508
progress:  0.375190631398
progress:  0.378481777287
progress:  0.381772923177
progress:  0.385064069066
progress:  0.388355214956
progress:  0.391646360845
progress:  0.394937506735
progress:  0.398228652624
progress:  0.401519798513
progress:  0.404810944403
progress:  0.408102090292
progress:  0.411393236182
progress:  0.414684382071
progress:  0.417975527961
progress:  0.42126667385
progress:  0.42455781974
progress:  0.427848965629
progress:  0.431140111519
progress:  0.434431257408
progress:  0.437722403297
progress:  0.441013549187
progress:  0.444304695076
progress:  0.447595840966
progress:  0.450886986855
progress:  0.454178132745
progress:  0.457469278634
progress:  0.460760424524
progress:  0.464051570413
progress:  0.467342716303
progress:  0.470633862192
progress:  0.473925008081
progress:  0.477216153971
progress:  0.48050729986
progress:  0.48379844575
progress:  0.487089591639
progress:  0.490380737529
progress:  0.493671883418
progress:  0.496963029308
progress:  0.500254175197
progress:  0.503545321086
progress:  0.506836466976
progress:  0.510127612865
progress:  0.513418758755
progress:  0.516709904644
progress:  0.520001050534
progress:  0.523292196423
progress:  0.526583342313
progress:  0.529874488202
progress:  0.533165634092
progress:  0.536456779981
progress:  0.53974792587
progress:  0.54303907176
progress:  0.546330217649
progress:  0.549621363539
progress:  0.552912509428
progress:  0.556203655318
progress:  0.559494801207
progress:  0.562785947097
progress:  0.566077092986
progress:  0.569368238876
progress:  0.572659384765
progress:  0.575950530654
progress:  0.579241676544
progress:  0.582532822433
progress:  0.585823968323
progress:  0.589115114212
progress:  0.592406260102
progress:  0.595697405991
progress:  0.598988551881
progress:  0.60227969777
progress:  0.60557084366
progress:  0.608861989549
progress:  0.612153135438
progress:  0.615444281328
progress:  0.618735427217
progress:  0.622026573107
progress:  0.625317718996
progress:  0.628608864886
progress:  0.631900010775
progress:  0.635191156665
progress:  0.638482302554
progress:  0.641773448444
progress:  0.645064594333
progress:  0.648355740222
progress:  0.651646886112
progress:  0.654938032001
progress:  0.658229177891
progress:  0.66152032378
progress:  0.66481146967
progress:  0.668102615559
progress:  0.671393761449
progress:  0.674684907338
progress:  0.677976053228
progress:  0.681267199117
progress:  0.684558345006
progress:  0.687849490896
progress:  0.691140636785
progress:  0.694431782675
progress:  0.697722928564
progress:  0.701014074454
progress:  0.704305220343
progress:  0.707596366233
progress:  0.710887512122
progress:  0.714178658012
progress:  0.717469803901
progress:  0.72076094979
progress:  0.72405209568
progress:  0.727343241569
progress:  0.730634387459
progress:  0.733925533348
progress:  0.737216679238
progress:  0.740507825127
progress:  0.743798971017
progress:  0.747090116906
progress:  0.750381262796
progress:  0.753672408685
progress:  0.756963554574
progress:  0.760254700464
progress:  0.763545846353
progress:  0.766836992243
progress:  0.770128138132
progress:  0.773419284022
progress:  0.776710429911
progress:  0.780001575801
progress:  0.78329272169
progress:  0.78658386758
progress:  0.789875013469
progress:  0.793166159358
progress:  0.796457305248
progress:  0.799748451137
progress:  0.803039597027
progress:  0.806330742916
progress:  0.809621888806
progress:  0.812913034695
progress:  0.816204180585
progress:  0.819495326474
progress:  0.822786472364
progress:  0.826077618253
progress:  0.829368764142
progress:  0.832659910032
progress:  0.835951055921
progress:  0.839242201811
progress:  0.8425333477
progress:  0.84582449359
progress:  0.849115639479
progress:  0.852406785369
progress:  0.855697931258
progress:  0.858989077148
progress:  0.862280223037
progress:  0.865571368926
progress:  0.868862514816
progress:  0.872153660705
progress:  0.875444806595
progress:  0.878735952484
progress:  0.882027098374
progress:  0.885318244263
progress:  0.888609390153
progress:  0.891900536042
progress:  0.895191681932
progress:  0.898482827821
progress:  0.90177397371
progress:  0.9050651196
progress:  0.908356265489
progress:  0.911647411379
progress:  0.914938557268
progress:  0.918229703158
progress:  0.921520849047
progress:  0.924811994937
progress:  0.928103140826
progress:  0.931394286716
progress:  0.934685432605
progress:  0.937976578494
progress:  0.941267724384
progress:  0.944558870273
progress:  0.947850016163
progress:  0.951141162052
progress:  0.954432307942
progress:  0.957723453831
progress:  0.961014599721
progress:  0.96430574561
progress:  0.9675968915
progress:  0.970888037389
progress:  0.974179183278
progress:  0.977470329168
progress:  0.980761475057
progress:  0.984052620947
progress:  0.987343766836
progress:  0.990634912726
progress:  0.993926058615
progress:  0.997217204505

In [34]:
!python MRJob_PreprocessGraph_topic.py s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt --file 'wiki_topics.txt' --jobconf G=15192277 --jobconf 'topicIdxFilename=wiki_topics.txt' -r emr --num-ec2-instances 4 --ec2-instance-type m1.large --output-dir s3://ucb-mids-mls-katieadams/output-wiki-preprocess-topics --no-output


using configs in /Users/davidadams/.mrjob.conf
creating tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300
writing master bootstrap script to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300/b.py

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

Copying non-input files into s3://ucb-mids-mls-katieadams/tmp/MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300/files/
Waiting 5.0s for S3 eventual consistency
Creating Elastic MapReduce job flow
Job flow created with ID: j-273B8CMZ72WBW
Created new job flow j-273B8CMZ72WBW
Job launched 31.1s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 62.7s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 93.8s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 125.3s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 156.5s ago, status STARTING: Provisioning Amazon EC2 capacity
Job launched 187.7s ago, status STARTING: Configuring cluster software
Job launched 218.8s ago, status STARTING: Configuring cluster software
Job launched 250.0s ago, status BOOTSTRAPPING: Running bootstrap actions
Job launched 281.1s ago, status BOOTSTRAPPING: Running bootstrap actions
Job launched 312.3s ago, status RUNNING: Running step
Job launched 343.4s ago, status RUNNING: Running step
Job launched 374.6s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 405.7s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 436.6s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 467.6s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 498.7s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 530.5s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 561.6s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 592.7s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 623.8s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 655.0s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 686.1s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 717.2s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job launched 748.9s ago, status RUNNING: Running step (MRJob_PreprocessGraph_topic.davidadams.20151107.002601.081300: Step 1 of 1)
Job on job flow j-273B8CMZ72WBW failed with status TERMINATING: Shut down as step failed
Logs are in s3://ucb-mids-mls-katieadams/tmp/logs/j-273B8CMZ72WBW/
ec2_key_pair_file not specified, going to S3
Scanning S3 logs for probable cause of failure
Waiting 5.0s for S3 eventual consistency
Terminating job flow: j-273B8CMZ72WBW
Traceback (most recent call last):
  File "MRJob_PreprocessGraph_topic.py", line 73, in <module>
    MRJobPreprocessGraphTopic.run()
  File "/Library/Python/2.7/site-packages/mrjob/job.py", line 461, in run
    mr_job.execute()
  File "/Library/Python/2.7/site-packages/mrjob/job.py", line 479, in execute
    super(MRJob, self).execute()
  File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 153, in execute
    self.run_job()
  File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 216, in run_job
    runner.run()
  File "/Library/Python/2.7/site-packages/mrjob/runner.py", line 470, in run
    self._run()
  File "/Library/Python/2.7/site-packages/mrjob/emr.py", line 882, in _run
    self._wait_for_job_to_complete()
  File "/Library/Python/2.7/site-packages/mrjob/emr.py", line 1767, in _wait_for_job_to_complete
    raise Exception(msg)
Exception: Job on job flow j-273B8CMZ72WBW failed with status TERMINATING: Shut down as step failed

Memory Error on emr => running locally -- ran out of disk space in map step


In [35]:
!python MRJob_PreprocessGraph_topic.py all-pages-indexed-out.txt --file 'wiki_topics.txt' --jobconf 'G=15192277' --jobconf 'topicIdxFilename=wiki_topics.txt'


using configs in /Users/davidadams/.mrjob.conf
creating tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151107.010126.858967

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph_topic.davidadams.20151107.010126.858967/step-0-mapper_part-00000
^CTraceback (most recent call last):
  File "MRJob_PreprocessGraph_topic.py", line 73, in <module>
    MRJobPreprocessGraphTopic.run()
  File "/Library/Python/2.7/site-packages/mrjob/job.py", line 461, in run
    mr_job.execute()
  File "/Library/Python/2.7/site-packages/mrjob/job.py", line 479, in execute
    super(MRJob, self).execute()
  File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 153, in execute
    self.run_job()
  File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 216, in run_job
    runner.run()
  File "/Library/Python/2.7/site-packages/mrjob/runner.py", line 470, in run
    self._run()
  File "/Library/Python/2.7/site-packages/mrjob/sim.py", line 173, in _run
    self._invoke_step(step_num, 'mapper')
  File "/Library/Python/2.7/site-packages/mrjob/sim.py", line 260, in _invoke_step
    working_dir, env)
  File "/Library/Python/2.7/site-packages/mrjob/inline.py", line 160, in _run_step
    child_instance.execute()
  File "/Library/Python/2.7/site-packages/mrjob/job.py", line 470, in execute
    self.run_mapper(self.options.step_num)
  File "/Library/Python/2.7/site-packages/mrjob/job.py", line 535, in run_mapper
    for out_key, out_value in mapper(key, value) or ():
  File "MRJob_PreprocessGraph_topic.py", line 51, in mapper
    value['topic']=self.topic_labels[key]
KeyboardInterrupt

In [ ]:


In [ ]: