DATASCI W261: Machine Learning at Scale

Week 9, Homework 7

Group C: Katrina Adams, Eric Freeman, Doug Kelley

kradams@ischool.berkeley.edu, ericfreeman@ischool.berkeley.edu, kelleydac@ischool.berkeley.edu

27 October 2015


Undirected toy network dataset

In an undirected network all links are symmetric, i.e., for a pair of nodes 'A' and 'B,' both of the links:

A -> B and B -> A

will exist.

The toy data are available in a sparse (stripes) representation:

(node) \t (dictionary of links)

on AWS via the url:

s3://ucb-mids-mls-networks/undirected_toy.txt

In the dictionary, target nodes are keys, link weights are values (here, all weights are 1, i.e., the network is unweighted).
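For concreteness, one line in this format can be parsed with `ast.literal_eval`; the sample line below is hypothetical but follows the stated layout:

```python
from ast import literal_eval

# Hypothetical line in the (node) \t (dictionary of links) stripes format.
line = "1\t{'2': 1, '5': 1}"

node, stripe = line.strip().split('\t')
links = literal_eval(stripe)       # target nodes are keys, weights are values

neighbors = sorted(links.keys())   # the adjacency list for this node
weights = set(links.values())      # {1} for an unweighted network
```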

Directed toy network dataset

In a directed network all links are not necessarily symmetric, i.e., for a pair of nodes 'A' and 'B,' it is possible for only one of:

A -> B or B -> A

to exist.

These toy data are available in a sparse (stripes) representation:

(node) \t (dictionary of links)

on AWS via the url:

s3://ucb-mids-mls-networks/directed_toy.txt

In the dictionary, target nodes are keys, link weights are values (here, all weights are 1, i.e., the network is unweighted).


Some Imports


In [1]:
#%matplotlib inline

#import matplotlib.pyplot as plt
#import numpy as np

HW7.0:
In this part of your assignment you will develop the base of your code for the week.

Write MRJob classes to find shortest path graph distances, as described in the lectures. In addition to finding the distances, your code should also output a distance-minimizing path between the source and target.
Work locally for this part of the assignment, and use both of the undirected and directed toy networks.

To prove your code works, run the following jobs

  • shortest path in the undirected network from node 1 to node 4
    Solution: 1,5,4

  • shortest path in the directed network from node 1 to node 5
    Solution: 1,2,4,5

and report your output---make sure it is correct!
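These expected results can be cross-checked with a plain (non-MRJob) breadth-first search over the toy adjacency lists, transcribed here from the toy files; this is a local sanity check, not part of the MRJob solution:

```python
from collections import deque

def bfs_shortest_path(adj, source, target):
    """Return (distance, path) from source to target via BFS, or None."""
    queue = deque([(source, [source])])
    visited = {source}
    while queue:
        node, path = queue.popleft()
        if node == target:
            return len(path) - 1, path
        for nbr in adj.get(node, []):
            if nbr not in visited:
                visited.add(nbr)
                queue.append((nbr, path + [nbr]))
    return None

undirected = {'1': ['2', '5'], '2': ['1', '3', '5', '4'],
              '3': ['2', '4'], '4': ['3', '2', '5'], '5': ['1', '2', '4']}
directed = {'1': ['2', '6'], '2': ['1', '3', '4'], '3': ['2', '4'],
            '4': ['2', '5'], '5': ['1', '2', '4'], '6': []}
```

Note that in the undirected toy both 1,5,4 and 1,2,4 have length 2, so either is a valid distance-minimizing path.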


In [1]:
'''
    HW 7.0

'''

%cd ~/Documents/W261/hw7/


/Users/davidadams/Documents/W261/hw7

In [114]:
%%writefile MRJob_SingleSourceShortestPath.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from sys import maxint

class MRJobSSSP(MRJob):
    

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper)]
        return [MRStep(
                mapper = self.mapper,
                reducer = self.reducer
                )]
    
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])

        destnodes = value[0]
        dist = value[1]
        path = value[2]
        status = value[3]
                
        if status=='Q':
            for node in destnodes:
                yield int(node), [None, dist+1, path+[node], 'Q']
            yield key, [destnodes, dist, path, 'V']
            
        else:
            yield key, value
            
    def reducer(self,node,data):
        mindist = maxint
        statuslist=list()
        status = 'U'
        pathlen = 0
        paths = {}
        destnodes = []
        for value in data:
            if value[0]!=None and len(destnodes)==0:
                destnodes = value[0]
            dist = int(value[1])
            if dist<=mindist:
                mindist = dist
            if status=='U':
                status=value[3]
            elif status=='Q' and value[3]=='V':
                status='V'
            paths[value[3]]=value[2]
        
        if 'V' in paths.keys():
            realpath = paths['V']
        elif 'Q' in paths.keys():
            realpath = paths['Q']
        else:
            realpath = paths['U']
            
        
        yield int(node), [destnodes, mindist, realpath, status]


        
if __name__ == '__main__':
    MRJobSSSP.run()


Writing MRJob_SingleSourceShortestPath.py

In [119]:
#%load_ext autoreload
%autoreload 2

from MRJob_SingleSourceShortestPath import MRJobSSSP
from ast import literal_eval
from sys import maxint


def preprocessGraph(graphfilename):
    startnode = 1
    fpp = open('current_graph.txt', 'w')
    fraw = open(graphfilename, 'r')
    for line in fraw.readlines():
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        if int(key)==startnode:
            status = 'Q'
            dist = 0
        else:
            status = 'U'
            dist = maxint
        destnodes = list(edges.keys())
        path = []
        if int(key)==startnode:
            path=[key]
        value = [destnodes, dist, path, status]
        fpp.write(key+'\t'+str(value)+'\n')
    fpp.close()
    fraw.close()
    
    return None


def mrjob_sssp():
    
    mr_job = MRJobSSSP(args=["current_graph.txt", "--strict-protocols"])
    
    stop = False
    i = 0

    while(not stop):
        i+=1
        #print i
        numfrontier = 0
        with mr_job.make_runner() as runner:
            runner.run()
            with open('current_graph.txt', 'w+') as f:
                for line in runner.stream_output():
                    key, value = mr_job.parse_output_line(line)
                    if value[3]=='Q':
                        numfrontier+=1
                    f.write(line)
        #print str(i)+","+str(numfrontier)+" nodes in frontier"
        if numfrontier==0 or i>10:
            stop=True
    
    return None


def hw7_0():

    
    graphs = ["Undirected", "Directed"]
    graphfilenames = ["undirected_toy.txt", "directed_toy.txt"]
    
    for i in range(len(graphs)):
        preprocessGraph(graphfilenames[i])
        #!cp graph_input.txt current_graph.txt
        
        print graphs[i]
        
        mrjob_sssp()
        !cat current_graph.txt
        
    return None
        
    
        


hw7_0()


Undirected
1	[["2", "5"], 0, ["1"], "V"]
2	[["1", "3", "5", "4"], 1, ["1", "2"], "V"]
3	[["2", "4"], 2, ["1", "2", "3"], "V"]
4	[["3", "2", "5"], 2, ["1", "5", "4"], "V"]
5	[["1", "2", "4"], 1, ["1", "5"], "V"]
Directed
1	[["2", "6"], 0, ["1"], "V"]
2	[["1", "3", "4"], 1, ["1", "2"], "V"]
3	[["2", "4"], 2, ["1", "2", "3"], "V"]
4	[["2", "5"], 2, ["1", "2", "4"], "V"]
5	[["1", "2", "4"], 3, ["1", "2", "4", "5"], "V"]
6	[[], 1, ["1", "6"], "V"]

Main dataset 1: NLTK synonyms

In the next part of this assignment you will explore a network derived from the NLTK synonym database used for evaluation in HW 5. At a high level, this network is undirected, defined so that a link exists between two nodes/words if the pair of words are synonyms.
These data may be found at the location:

s3://ucb-mids-mls-networks/synNet/synNet.txt
s3://ucb-mids-mls-networks/synNet/indices.txt

where synNet.txt contains a sparse representation of the network:

(index) \t (dictionary of links)

in indexed form, and indices.txt contains a lookup list

(word) \t (index)

of indices and words. This network is small enough for you to explore and run scripts locally, but will also be good for a systems test (for later) on AWS.

In the dictionary, target nodes are keys, link weights are values (here, all weights are 1, i.e., the network is unweighted).
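Rather than scanning indices.txt linearly for each query, the lookup list can be loaded once into a pair of dictionaries for constant-time lookups in both directions. A sketch (the two sample lines use the walk/make indices quoted in HW 7.2):

```python
def load_indices(lines):
    """Build word->index and index->word maps from (word)\t(index) lines."""
    word_to_idx, idx_to_word = {}, {}
    for line in lines:
        word, idx = line.strip().split('\t')
        word_to_idx[word] = int(idx)
        idx_to_word[int(idx)] = word
    return word_to_idx, idx_to_word

# Hypothetical two-line sample; the real file maps every synNet word.
sample = ["walk\t7827", "make\t536"]
word_to_idx, idx_to_word = load_indices(sample)
```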

HW 7.1: Exploratory data analysis (NLTK synonyms)

Using MRJob, explore the synonyms network data. Consider plotting the degree distribution (does it follow a power law?), and determine some of the key features, like:

number of nodes,
number of links,
or the average degree (i.e., the average number of links per node),
etc...

As you develop your code, please be sure to run it locally first (though on the whole dataset).
Once you have gotten your code to run locally, deploy it on AWS as a systems test in preparation for our next dataset (which will require AWS).
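Before writing the MRJob versions, these statistics can be checked locally with a few lines of plain Python over the stripes file; the three-line sample below is a hypothetical stand-in for synNet.txt:

```python
from ast import literal_eval
from collections import Counter

def degree_stats(lines):
    """Out-degree per node from stripes lines; returns (n_nodes, n_stripe_entries, avg)."""
    degrees = {}
    for line in lines:
        node, stripe = line.strip().split('\t')
        degrees[node] = len(literal_eval(stripe))
    n = len(degrees)
    total = sum(degrees.values())
    return n, total, total / float(n)

# Hypothetical miniature input in the same format.
sample = ["1\t{'2': 1, '3': 1}", "2\t{'1': 1}", "3\t{'1': 1}"]
n, links, avg = degree_stats(sample)

# Degree histogram (for a power-law check, plot counts on log-log axes):
hist = Counter(len(literal_eval(l.split('\t')[1])) for l in sample)
```

Note that in an undirected stripes file every link appears twice (once per endpoint), so an undirected link count should halve this total or deduplicate ordered node pairs.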


In [144]:
%%writefile MRJob_PreprocessGraph.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from sys import maxint


class MRJobPreprocessGraph(MRJob):
    

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper)]
        return [MRStep(
                mapper = self.mapper,
                reducer = self.reducer
                )]
    
    def mapper(self, _, line):
        # start at "walk" (index=7827) and ending at "make" (index=536)
        startnode = 7827
        #startnode = 1
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        if int(key)==startnode:
            status = 'Q'
            dist = 0
        else:
            status = 'U'
            dist = maxint
        destnodes = list(edges.keys())
        path = []
        if int(key)==startnode:
            path=[key]
        value = [destnodes, dist, path, status]
        yield None,{key:value}

           
    def reducer(self,_,data):
        for line in data:
            key = line.keys()[0]
            value = line[key]
            yield int(key), value

        
if __name__ == '__main__':
    MRJobPreprocessGraph.run()


Overwriting MRJob_PreprocessGraph.py

In [158]:
!python MRJob_PreprocessGraph.py directed_toy.txt > graph_input_directed_toy.txt


using configs in /Users/davidadams/.mrjob.conf
creating tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151026.151432.088915

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151026.151432.088915/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151026.151432.088915/step-0-mapper-sorted
> sort /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151026.151432.088915/step-0-mapper_part-00000
writing to /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151026.151432.088915/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
Moving /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151026.151432.088915/step-0-reducer_part-00000 -> /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151026.151432.088915/output/part-00000
Streaming final output from /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151026.151432.088915/output
removing tmp directory /var/folders/7t/6bhz6vw52k52g_3jqj57mz6r0000gn/T/MRJob_PreprocessGraph.davidadams.20151026.151432.088915

In [159]:
!cat graph_input_directed_toy.txt


1	[["2", "6"], 9223372036854775807, [], "U"]
2	[["1", "3", "4"], 9223372036854775807, [], "U"]
3	[["2", "4"], 9223372036854775807, [], "U"]
4	[["2", "5"], 9223372036854775807, [], "U"]
5	[["1", "2", "4"], 9223372036854775807, [], "U"]

In [241]:
%%writefile MRJob_NumberOfNodes.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from sys import maxint

class MRJobNumNodes(MRJob):
    

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper)]
        return [MRStep(
                mapper = self.mapper,
                reducer = self.reducer
                ),
               MRStep(
                reducer = self.reducer_count
                )]
    
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])
        destnodes = value[0]
        
        yield int(key), None
        
        for node in destnodes:
            yield int(node), None
           
    def reducer(self,node,_):
        yield None, int(node)
        
    def reducer_count(self,_,nodes):
        count = 0
        for node in nodes:
            count+=1
        yield None, count

        
if __name__ == '__main__':
    MRJobNumNodes.run()


Overwriting MRJob_NumberOfNodes.py

In [233]:
%%writefile MRJob_NumberOfLinks.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from mrjob.compat import jobconf_from_env


class MRJobNumLinks(MRJob):
    
    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper,reducer = self.reducer)]
        return [MRStep(
                mapper = self.mapper,
                reducer = self.reducer
                ),
               MRStep(
                reducer = self.reducer_count
                )]
    
    def mapper(self, _, line):
        isUndirected = int(jobconf_from_env('isUndirected'))
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])
        destnodes = value[0]
        
        for node in destnodes:
            node = int(node)
            if isUndirected:
                if key<node:
                    yield (key,node),None
                else:
                    yield (node,key),None
            else:
                yield (key,node), None

           
    def reducer(self,link,_):
        yield None, link
        
    def reducer_count(self,_,links):
        count = 0
        for link in links:
            count+=1
        yield None, count

        
if __name__ == '__main__':
    MRJobNumLinks.run()


Overwriting MRJob_NumberOfLinks.py

In [263]:
%%writefile MRJob_AvgDegree.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from mrjob.compat import jobconf_from_env


class MRJobAvgDeg(MRJob):
    
    def steps(self):
        #return [MRStep(
        #        mapper = self.mapper)]
        
        return [MRStep(
                mapper = self.mapper,
                reducer = self.reducer
                )]
    
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])
        outdegree = len(value[0])
        yield None, outdegree

           
    def reducer(self,_,outdegrees):
        degTotal = 0
        count = 0
        for deg in outdegrees:
            degTotal += int(deg)
            count+=1
        yield None, 1.0*degTotal/count

        
if __name__ == '__main__':
    MRJobAvgDeg.run()


Overwriting MRJob_AvgDegree.py

In [266]:
#%load_ext autoreload
%autoreload 2

from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_NumberOfLinks import MRJobNumLinks
from MRJob_AvgDegree import MRJobAvgDeg


def numNodes(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename])

    with mr_job_numnodes.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
                    
    return None

def numLinks(graphfilename, isUndirected):

    undirected_parameter = "isUndirected="+str(int(isUndirected))
    mr_job_numlinks = MRJobNumLinks(args=[graphfilename, '--jobconf', undirected_parameter])

    with mr_job_numlinks.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numlinks.parse_output_line(line)
            print "There are "+str(count)+" links in the graph."
                    
    return None

def avgDeg(graphfilename):
    mr_job_avgdeg = MRJobAvgDeg(args=[graphfilename])

    with mr_job_avgdeg.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, result = mr_job_avgdeg.parse_output_line(line)
            print "The average out degree of the graph is "+str(result)+"."
                    
    return None


def hw7_1():

    #graphfilename = "graph_input_undirected_toy.txt"
    #graphfilename = "graph_input_directed_toy.txt"
    graphfilename = "graph_input_synNet.txt"
    isUndirected = True
    numNodes(graphfilename)
    numLinks(graphfilename, isUndirected)
    avgDeg(graphfilename)
        
    return None
        
  

hw7_1()


WARNING:mrjob.runner:
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
WARNING:mrjob.runner:
WARNING:mrjob.runner:
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
WARNING:mrjob.runner:
There are 8271 nodes in the graph.
There are 30567 links in the graph.
WARNING:mrjob.runner:
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
WARNING:mrjob.runner:
The average out degree of the graph is 7.39136742836.

In [266]:
#%load_ext autoreload
%autoreload 2

from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_NumberOfLinks import MRJobNumLinks
from MRJob_AvgDegree import MRJobAvgDeg


def numNodes(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename, "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small"])

    with mr_job_numnodes.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
                    
    return None

def numLinks(graphfilename, isUndirected):

    undirected_parameter = "isUndirected="+str(int(isUndirected))
    mr_job_numlinks = MRJobNumLinks(args=[graphfilename, '--jobconf', undirected_parameter, "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small"])

    with mr_job_numlinks.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numlinks.parse_output_line(line)
            print "There are "+str(count)+" links in the graph."
                    
    return None

def avgDeg(graphfilename):
    mr_job_avgdeg = MRJobAvgDeg(args=[graphfilename, "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small"])

    with mr_job_avgdeg.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, result = mr_job_avgdeg.parse_output_line(line)
            print "The average out degree of the graph is "+str(result)+"."
                    
    return None


def hw7_1():

    graphfilename = "graph_input_synNet.txt"
    isUndirected = True
    numNodes(graphfilename)
    numLinks(graphfilename, isUndirected)
    avgDeg(graphfilename)
        
    return None
        
  

hw7_1()


WARNING:mrjob.runner:
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
WARNING:mrjob.runner:
WARNING:mrjob.runner:
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
WARNING:mrjob.runner:
There are 8271 nodes in the graph.
There are 30567 links in the graph.
WARNING:mrjob.runner:
WARNING:mrjob.runner:PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
WARNING:mrjob.runner:
The average out degree of the graph is 7.39136742836.

HW 7.2: Shortest path graph distances (NLTK synonyms)

Write (reuse your code from 7.0) an MRJob class to find shortest path graph distances, and apply it to the NLTK synonyms network dataset.

Prove your code works by running the job:

  • shortest path starting at "walk" (index=7827) and ending at "make" (index=536),

and showing your code's output. Once again, your output should include the path and the distance.

As you develop your code, please be sure to run it locally first (though on the whole dataset).
Once you have gotten your code to run locally, deploy it on AWS as a systems test in preparation for our next dataset (which will require AWS).


In [114]:
%%writefile MRJob_SingleSourceShortestPath.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep   
from ast import literal_eval
from sys import maxint

class MRJobSSSP(MRJob):
    

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper = self.mapper)]
        return [MRStep(
                mapper = self.mapper,
                reducer = self.reducer
                )]
    
    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])

        destnodes = value[0]
        dist = value[1]
        path = value[2]
        status = value[3]
                
        if status=='Q':
            for node in destnodes:
                yield int(node), [None, dist+1, path+[node], 'Q']
            yield key, [destnodes, dist, path, 'V']
            
        else:
            yield key, value
            
    def reducer(self,node,data):
        mindist = maxint
        statuslist=list()
        status = 'U'
        pathlen = 0
        paths = {}
        destnodes = []
        for value in data:
            if value[0]!=None and len(destnodes)==0:
                destnodes = value[0]
            dist = int(value[1])
            if dist<=mindist:
                mindist = dist
            if status=='U':
                status=value[3]
            elif status=='Q' and value[3]=='V':
                status='V'
            paths[value[3]]=value[2]
        
        if 'V' in paths.keys():
            realpath = paths['V']
        elif 'Q' in paths.keys():
            realpath = paths['Q']
        else:
            realpath = paths['U']
            
        
        yield int(node), [destnodes, mindist, realpath, status]


        
if __name__ == '__main__':
    MRJobSSSP.run()


Writing MRJob_SingleSourceShortestPath.py

In [127]:
#%load_ext autoreload
%autoreload 2

from MRJob_SingleSourceShortestPath import MRJobSSSP
from ast import literal_eval
from sys import maxint

def index_lookup(word):
    with open('indices.txt', 'r') as f:
        for line in f.readlines():
            line = line.strip().split('\t')
            if line[0]==word:
                return int(line[1])
    return None


def word_lookup(idx):
    with open('indices.txt', 'r') as f:
        for line in f.readlines():
            line = line.strip().split('\t')
            if line[1]==str(idx):
                return line[0]
    return None


def preprocessGraph(graphfilename):
    # start at "walk" (index=7827) and ending at "make" (index=536)
    startnode = 7827
    fpp = open('current_graph.txt', 'w')
    fraw = open(graphfilename, 'r')
    for line in fraw.readlines():
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        if int(key)==startnode:
            status = 'Q'
            dist = 0
        else:
            status = 'U'
            dist = maxint
        destnodes = list(edges.keys())
        path = []
        if int(key)==startnode:
            path=[key]
        value = [destnodes, dist, path, status]
        fpp.write(key+'\t'+str(value)+'\n')
    fpp.close()
    fraw.close()
    
    return None


def mrjob_sssp():
    
    mr_job = MRJobSSSP(args=["current_graph.txt", "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    #mr_job = MRJobSSSP(args=["current_graph.txt", "--strict-protocols"])
    
    stop = False
    i = 0

    while(not stop):
        i+=1
        print i
        numfrontier = 0
        with mr_job.make_runner() as runner:
            runner.run()
            with open('current_graph.txt', 'w+') as f:
                for line in runner.stream_output():
                    key, value = mr_job.parse_output_line(line)
                    if int(key)==536 and value[3]=='Q':
                        stop=True
                        print line
                    f.write(line)
                    
    return None


def hw7_2():

    graphfilename = "synNet.txt"
    preprocessGraph(graphfilename)
        
    mrjob_sssp()
    
    #!cat current_graph.txt
        
    return None
        
    
        


hw7_2()


WARNING:mrjob.util:hash_object() is deprecated and will be removed in v0.5
WARNING:mrjob.util:hash_object() is deprecated and will be removed in v0.5
1
2
WARNING:mrjob.util:hash_object() is deprecated and will be removed in v0.5
3
WARNING:mrjob.util:hash_object() is deprecated and will be removed in v0.5
536	[["215", "3648", "662", "57", "3658", "3656", "3657", "3651", "3652", "3653", "3555", "3554", "6002", "3552", "6000", "6001", "404", "2248", "2249", "3750", "2246", "2247", "3993", "3992", "3760", "3761", "1313", "3481", "3769", "3483", "3482", "3485", "121", "265", "264", "3647", "1554", "5593", "5592", "5594", "5596", "3241", "534", "533", "532", "531", "1668", "2257", "2256", "2255", "2254", "5911", "2252", "2251", "2250", "3775", "3774", "4320", "585", "4651", "4652", "3471", "3478", "3621", "448", "3749", "5353", "3742", "3743", "3740", "3741", "3310", "6003", "1647", "3551", "2253", "4420", "4421", "641", "2794", "5912", "2160", "3593", "4318", "3595", "3594", "3751", "4180", "3599", "3598", "64", "69", "657", "653", "659", "631", "3601", "3600", "4608", "4609", "3688", "1801", "722", "6058", "6057", "6056", "2439", "1209", "5210", "1162", "3738", "1661", "3739", "5211", "3737", "1199", "5354", "1810", "1811", "5351", "1195", "770", "4316", "4317", "5281", "4319", "3622", "616", "73", "5209", "5208", "4791", "5352", "6004", "5366", "5364", "5365", "1681", "1685", "1477", "474", "1688", "3553"], 3, ["7827", "4655", "631", "536"], "Q"]

Main dataset 2: English Wikipedia

For the remainder of this assignment you will explore the English Wikipedia hyperlink network.
The dataset is built from the Sept. 2015 XML snapshot of English Wikipedia.
For this directed network, a link between articles:

A -> B

is defined by the existence of a hyperlink in A pointing to B.
This network also exists in the indexed format:

Data: s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt
Data: s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-in.txt
Data: s3://ucb-mids-mls-networks/wikipedia/indices.txt

but has an index with more detailed data:

(article name) \t (index) \t (in degree) \t (out degree)

In the dictionary, target nodes are keys, link weights are values. Here, a weight indicates the number of times a page links to another. However, for the sake of this assignment, treat this as an unweighted network, and set all weights to 1 upon data input.
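Forcing all weights to 1 on input is a one-line change when each stripe is parsed; a sketch with a hypothetical weighted line:

```python
from ast import literal_eval

# Hypothetical weighted stripe: page 42 links to page 7 three times.
line = "42\t{'7': 3, '9': 1}"

node, stripe = line.strip().split('\t')
links = literal_eval(stripe)

# Treat the network as unweighted: keep the targets, force every weight to 1.
unweighted = {target: 1 for target in links}
```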

HW 7.3: Exploratory data analysis (Wikipedia)

Using MRJob, explore the Wikipedia network data on the AWS cloud. Reuse your code from HW 7.1---does it scale well?
Be cautioned that Wikipedia is a directed network, where links are not symmetric.
So, even though a node may be linked to, it will not appear as a primary record itself if it has no out-links.
This means that you may have to ADJUST your code (depending on its design).
To be sure of your code's functionality in this context, run a systems test on the directed_toy.txt network.
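One such adjustment is to emit an empty stripe for every link target during preprocessing, so that nodes with no out-links still receive a record of their own; a minimal non-MRJob sketch of the idea:

```python
from ast import literal_eval

def emit_all_nodes(lines):
    """Merge primary records with empty placeholder stripes for link-only targets."""
    stripes = {}
    for line in lines:
        node, stripe = line.strip().split('\t')
        links = literal_eval(stripe)
        # A primary record fills in any placeholder seen earlier.
        stripes.setdefault(node, {}).update(links)
        for target in links:
            stripes.setdefault(target, {})  # placeholder for dangling nodes
    return stripes

# directed_toy.txt-style input: node 6 is linked to but has no out-links,
# so it never appears as a primary record.
sample = ["1\t{'2': 1, '6': 1}", "2\t{'1': 1, '3': 1, '4': 1}"]
stripes = emit_all_nodes(sample)
```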

HW 7.4: Shortest path graph distances (Wikipedia)

Using MRJob, find shortest path graph distances in the Wikipedia network on the AWS cloud. Reuse your code from 7.2, but once again be warned of Wikipedia being a directed network. To be sure of your code's functionality in this context, run a systems test on the directed_toy.txt network.

When running your code on the Wikipedia network, proof its function by running the job:

  • shortest path from "Ireland" (index=6176135) to "University of California, Berkeley" (index=13466359),

and show your code's output.

Once your code is running, find some other shortest paths and report your results.

HW 7.5: Conceptual exercise: Largest single-source network distances

Suppose you wanted to find the largest network distance from a single source, i.e., a node that is the furthest (but still reachable) from a single source.

How would you implement this task? How is this different from finding the shortest path graph distances?

Is this task more difficult to implement than the shortest path distance?

As you respond, please comment on program structure, runtimes, iterations, general system requirements, etc...

This task could be implemented as a breadth-first search whose stopping criterion is an empty frontier rather than reaching a target node. It would require as many iterations as there are edges in the longest shortest path from the source. It is only more difficult than finding the shortest path between two nodes in that it must run until the entire reachable network has been explored. You would also need to keep track of the nodes in the frontier from the previous iteration, because those nodes are the furthest from the source once the frontier empties.
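In plain Python (not MRJob), the idea reduces to a BFS that runs until the frontier empties and reports the last non-empty frontier:

```python
def farthest_nodes(adj, source):
    """BFS until the frontier empties; the last non-empty frontier holds the
    distance-maximizing reachable nodes."""
    dist = {source: 0}
    frontier = [source]
    while True:
        next_frontier = []
        for node in frontier:
            for nbr in adj.get(node, []):
                if nbr not in dist:
                    dist[nbr] = dist[node] + 1
                    next_frontier.append(nbr)
        if not next_frontier:
            return dist[frontier[0]], sorted(frontier)
        frontier = next_frontier

# Directed toy network: from node 1, node 5 is the furthest reachable node
# (distance 3), matching the HW 7.0 output.
directed = {'1': ['2', '6'], '2': ['1', '3', '4'], '3': ['2', '4'],
            '4': ['2', '5'], '5': ['1', '2', '4'], '6': []}
```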

HW 7.6: Computational exercise: Largest single-source network distances (optional)

Using MRJob, write a code to find the largest graph distance and distance-maximizing nodes from a single-source. Test your code first on the toy networks and synonyms network to proof its function.


In [ ]:


In [ ]: