Undirected toy network dataset
In an undirected network all links are symmetric, i.e., for a pair of nodes 'A' and 'B,' both of the links:
A -> B and B -> A
will exist.
The toy data are available in a sparse (stripes) representation:
(node) \t (dictionary of links)
on AWS via the url:
s3://ucb-mids-mls-networks/undirected_toy.txt
In the dictionary, target nodes are keys, link weights are values (here, all weights are 1, i.e., the network is unweighted).
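For illustration, a single record in this stripes format might look like the following (the adjacency list shown here is hypothetical, not taken from the file):
1 \t {'2': 1, '5': 1}
i.e., node 1 has unit-weight links to nodes 2 and 5.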
Directed toy network dataset
In a directed network all links are not necessarily symmetric, i.e., for a pair of nodes 'A' and 'B,' it is possible for only one of:
A -> B or B -> A
to exist.
These toy data are available in a sparse (stripes) representation:
(node) \t (dictionary of links)
on AWS via the url:
s3://ucb-mids-mls-networks/directed_toy.txt
In the dictionary, target nodes are keys, link weights are values (here, all weights are 1, i.e., the network is unweighted).
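A record in either toy file can be parsed with ast.literal_eval, which is what the jobs below do; a minimal local sketch (file name as given above):
from ast import literal_eval

with open('directed_toy.txt', 'r') as f:
    for line in f:
        node, stripe = line.strip().split('\t')
        edges = literal_eval(stripe)      # e.g. {'2': 1, '5': 1}
        print node, sorted(edges.keys())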
Some Imports
In [1]:
#%matplotlib inline
#import matplotlib.pyplot as plt
#import numpy as np
HW7.0:
In this part of your assignment you will develop the base of your code for the week.
Write MRJob classes to find shortest path graph distances, as described in the lectures. In addition to finding the distances, your code should also output a distance-minimizing path between the source and target.
Work locally for this part of the assignment, and use both the undirected and directed toy networks.
To prove your code's function, run the following jobs:
shortest path in the undirected network from node 1 to node 4
Solution: 1,5,4
shortest path in the directed network from node 1 to node 5
Solution: 1,2,4,5
and report your output---make sure it is correct!
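For reference, the code below augments each adjacency record with a BFS state, so that every node carries [adjacency list, distance from source, path from source, status], where status is 'U' (unvisited), 'Q' (on the frontier), or 'V' (visited); unvisited nodes start with sys.maxint as an effective infinite distance. After preprocessing with node 1 as the source, a record looks like (the adjacency list here is illustrative):
1 \t [['2', '5'], 0, ['1'], 'Q']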
In [1]:
'''
HW 7.0
'''
%cd ~/Documents/W261/hw7/
In [114]:
%%writefile MRJob_SingleSourceShortestPath.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from sys import maxint


class MRJobSSSP(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer)]

    def mapper(self, _, line):
        # Each record: node \t [adjacency list, distance, path, status]
        # status: 'U' = unvisited, 'Q' = frontier, 'V' = visited
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])
        destnodes = value[0]
        dist = value[1]
        path = value[2]
        status = value[3]
        if status == 'Q':
            # Expand the frontier node: emit each neighbor at distance+1,
            # then re-emit this node as visited.
            for node in destnodes:
                yield int(node), [None, dist + 1, path + [node], 'Q']
            yield key, [destnodes, dist, path, 'V']
        else:
            yield key, value

    def reducer(self, node, data):
        mindist = maxint
        status = 'U'
        paths = {}
        destnodes = []
        for value in data:
            # Keep the adjacency list from the node's own record.
            if value[0] is not None and len(destnodes) == 0:
                destnodes = value[0]
            dist = int(value[1])
            if dist <= mindist:
                mindist = dist
                # Promote the status: U -> Q -> V
                if status == 'U':
                    status = value[3]
                elif status == 'Q' and value[3] == 'V':
                    status = 'V'
                paths[value[3]] = value[2]
        # Prefer the path from a visited record, then a frontier record.
        if 'V' in paths.keys():
            realpath = paths['V']
        elif 'Q' in paths.keys():
            realpath = paths['Q']
        else:
            realpath = paths['U']
        yield int(node), [destnodes, mindist, realpath, status]


if __name__ == '__main__':
    MRJobSSSP.run()
In [119]:
#%load_ext autoreload
%autoreload 2
from MRJob_SingleSourceShortestPath import MRJobSSSP
from ast import literal_eval
from sys import maxint
def preprocessGraph(graphfilename):
    # Convert the raw stripes file into the BFS record format:
    # node \t [adjacency list, distance, path, status]
    startnode = 1
    fpp = open('current_graph.txt', 'w')
    fraw = open(graphfilename, 'r')
    for line in fraw.readlines():
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        if int(key) == startnode:
            status = 'Q'
            dist = 0
        else:
            status = 'U'
            dist = maxint
        destnodes = list(edges.keys())
        path = []
        if int(key) == startnode:
            path = [key]
        value = [destnodes, dist, path, status]
        fpp.write(key + '\t' + str(value) + '\n')
    fpp.close()
    fraw.close()
    return None

def mrjob_sssp():
    mr_job = MRJobSSSP(args=["current_graph.txt", "--strict-protocols"])
    stop = False
    i = 0
    while(not stop):
        i += 1
        #print i
        numfrontier = 0
        with mr_job.make_runner() as runner:
            runner.run()
            with open('current_graph.txt', 'w+') as f:
                for line in runner.stream_output():
                    #print line
                    key, value = mr_job.parse_output_line(line)
                    if value[3] == 'Q':
                        numfrontier += 1
                    f.write(line)
        #print str(i)+","+str(numfrontier)+" nodes in frontier"
        # Stop when the frontier is empty (or after 10 iterations as a safety cap).
        if numfrontier == 0 or i > 10:
            stop = True
    return None

def hw7_0():
    graphs = ["Undirected", "Directed"]
    graphfilenames = ["undirected_toy.txt", "directed_toy.txt"]
    for i in range(len(graphs)):
        preprocessGraph(graphfilenames[i])
        #!cp graph_input.txt current_graph.txt
        print graphs[i]
        mrjob_sssp()
        !cat current_graph.txt
    return None

hw7_0()
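A small helper (a sketch, not part of the assignment output) for checking the final current_graph.txt against the expected answers: for the undirected toy the record for node 4 should show distance 2 and path 1,5,4, and for the directed toy node 5 should show distance 3 and path 1,2,4,5.
from ast import literal_eval

def report_target(target, graphfile='current_graph.txt'):
    # Print the recorded BFS distance and path for a target node.
    with open(graphfile, 'r') as f:
        for line in f:
            key, value = line.strip().split('\t')
            if int(key) == target:
                record = literal_eval(value)
                print "distance:", record[1], "path:", record[2]

report_target(4)   # undirected toy: expect distance 2, path 1 -> 5 -> 4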
Main dataset 1: NLTK synonyms
In the next part of this assignment you will explore a network derived from the NLTK synonym database used for evaluation in HW 5. At a high level, this network is undirected, defined so that there exists a link between two nodes/words if the pair of words are synonyms.
These data may be found at the location:
s3://ucb-mids-mls-networks/synNet/synNet.txt
s3://ucb-mids-mls-networks/synNet/indices.txt
where synNet.txt contains a sparse representation of the network:
(index) \t (dictionary of links)
in indexed form, and indices.txt contains a lookup list
(word) \t (index)
of indices and words. This network is small enough for you to explore and run scripts locally, but will also be good for a systems test (for later) on AWS.
In the dictionary, target nodes are keys, link weights are values (here, all weights are 1, i.e., the network is unweighted).
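Since synNet.txt is indexed, it helps to load indices.txt into a pair of dictionaries for word-to-index and index-to-word lookups; a minimal sketch (the HW 7.2 driver below uses a linear-scan lookup instead):
word2idx = {}
idx2word = {}
with open('indices.txt', 'r') as f:
    for line in f:
        word, idx = line.strip().split('\t')
        word2idx[word] = int(idx)
        idx2word[int(idx)] = word

# Example lookups (values per the comments in the HW 7.2 code below):
print word2idx['walk']    # expected 7827
print idx2word[536]       # expected 'make'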
HW 7.1: Exploratory data analysis (NLTK synonyms)
Using MRJob, explore the synonyms network data. Consider plotting the degree distribution (does it follow a power law?), and determine some of the key features, like:
number of nodes,
number of links,
or the average degree (i.e., the average number of links per node),
etc...
As you develop your code, please be sure to run it locally first (though on the whole dataset).
Once you have gotten your code to run locally, deploy it on AWS as a systems test in preparation for our next dataset (which will require AWS).
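One way to look at the degree distribution locally (a sketch, using the matplotlib import commented out at the top of the notebook) is a log-log plot of the out-degree counts taken straight from the stripes file; an approximately straight line suggests a power law:
import matplotlib.pyplot as plt
from ast import literal_eval
from collections import Counter

degrees = []
with open('synNet.txt', 'r') as f:
    for line in f:
        node, stripe = line.strip().split('\t')
        degrees.append(len(literal_eval(stripe)))

counts = Counter(degrees)               # degree -> number of nodes
xs = sorted(counts.keys())
ys = [counts[k] for k in xs]
plt.loglog(xs, ys, 'o')
plt.xlabel('degree')
plt.ylabel('number of nodes')
plt.show()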
In [144]:
%%writefile MRJob_PreprocessGraph.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from sys import maxint


class MRJobPreprocessGraph(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer)]

    def mapper(self, _, line):
        # starting at "walk" (index=7827) and ending at "make" (index=536)
        startnode = 7827
        #startnode = 1
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        if int(key) == startnode:
            status = 'Q'
            dist = 0
        else:
            status = 'U'
            dist = maxint
        destnodes = list(edges.keys())
        path = []
        if int(key) == startnode:
            path = [key]
        value = [destnodes, dist, path, status]
        yield None, {key: value}

    def reducer(self, _, data):
        for line in data:
            key = line.keys()[0]
            value = line[key]
            yield int(key), value


if __name__ == '__main__':
    MRJobPreprocessGraph.run()
In [158]:
!python MRJob_PreprocessGraph.py directed_toy.txt > graph_input_directed_toy.txt
In [159]:
!cat graph_input_directed_toy.txt
In [241]:
%%writefile MRJob_NumberOfNodes.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from sys import maxint


class MRJobNumNodes(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer),
                MRStep(reducer=self.reducer_count)]

    def mapper(self, _, line):
        # Emit the source node and every target node, so nodes that never
        # appear as a primary record are still counted.
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])
        destnodes = value[0]
        yield int(key), None
        for node in destnodes:
            yield int(node), None

    def reducer(self, node, _):
        # One record per distinct node.
        yield None, int(node)

    def reducer_count(self, _, nodes):
        count = 0
        for node in nodes:
            count += 1
        yield None, count


if __name__ == '__main__':
    MRJobNumNodes.run()
In [233]:
%%writefile MRJob_NumberOfLinks.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env


class MRJobNumLinks(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper, reducer=self.reducer)]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer),
                MRStep(reducer=self.reducer_count)]

    def mapper(self, _, line):
        isUndirected = int(jobconf_from_env('isUndirected'))
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])
        destnodes = value[0]
        for node in destnodes:
            node = int(node)
            if isUndirected:
                # Canonical ordering so each undirected edge is counted once.
                if key < node:
                    yield (key, node), None
                else:
                    yield (node, key), None
            else:
                yield (key, node), None

    def reducer(self, link, _):
        yield None, link

    def reducer_count(self, _, links):
        count = 0
        for link in links:
            count += 1
        yield None, count


if __name__ == '__main__':
    MRJobNumLinks.run()
In [263]:
%%writefile MRJob_AvgDegree.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from mrjob.compat import jobconf_from_env


class MRJobAvgDeg(MRJob):

    def steps(self):
        #return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer)]

    def mapper(self, _, line):
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])
        outdegree = len(value[0])
        yield None, outdegree

    def reducer(self, _, outdegrees):
        degTotal = 0
        count = 0
        for deg in outdegrees:
            degTotal += int(deg)
            count += 1
        yield None, 1.0 * degTotal / count


if __name__ == '__main__':
    MRJobAvgDeg.run()
In [266]:
#%load_ext autoreload
%autoreload 2
from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_NumberOfLinks import MRJobNumLinks
from MRJob_AvgDegree import MRJobAvgDeg
def numNodes(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename])
    with mr_job_numnodes.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
    return None

def numLinks(graphfilename, isUndirected):
    undirected_parameter = "isUndirected="+str(int(isUndirected))
    mr_job_numlinks = MRJobNumLinks(args=[graphfilename, '--jobconf', undirected_parameter])
    with mr_job_numlinks.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numlinks.parse_output_line(line)
            print "There are "+str(count)+" links in the graph."
    return None

def avgDeg(graphfilename):
    mr_job_avgdeg = MRJobAvgDeg(args=[graphfilename])
    with mr_job_avgdeg.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, result = mr_job_avgdeg.parse_output_line(line)
            print "The average out degree of the graph is "+str(result)+"."
    return None

def hw7_1():
    #graphfilename = "graph_input_undirected_toy.txt"
    #graphfilename = "graph_input_directed_toy.txt"
    graphfilename = "graph_input_synNet.txt"
    isUndirected = True
    numNodes(graphfilename)
    numLinks(graphfilename, isUndirected)
    avgDeg(graphfilename)
    return None

hw7_1()
In [266]:
#%load_ext autoreload
%autoreload 2
from MRJob_NumberOfNodes import MRJobNumNodes
from MRJob_NumberOfLinks import MRJobNumLinks
from MRJob_AvgDegree import MRJobAvgDeg
def numNodes(graphfilename):
    #mr_job = MRJobNumNodes(args=[graphfilename, "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    mr_job_numnodes = MRJobNumNodes(args=[graphfilename, "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small"])
    with mr_job_numnodes.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numnodes.parse_output_line(line)
            print "There are "+str(count)+" nodes in the graph."
    return None

def numLinks(graphfilename, isUndirected):
    undirected_parameter = "isUndirected="+str(int(isUndirected))
    mr_job_numlinks = MRJobNumLinks(args=[graphfilename, '--jobconf', undirected_parameter, "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small"])
    with mr_job_numlinks.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, count = mr_job_numlinks.parse_output_line(line)
            print "There are "+str(count)+" links in the graph."
    return None

def avgDeg(graphfilename):
    mr_job_avgdeg = MRJobAvgDeg(args=[graphfilename, "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small"])
    with mr_job_avgdeg.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            null, result = mr_job_avgdeg.parse_output_line(line)
            print "The average out degree of the graph is "+str(result)+"."
    return None

def hw7_1():
    graphfilename = "graph_input_synNet.txt"
    isUndirected = True
    numNodes(graphfilename)
    numLinks(graphfilename, isUndirected)
    avgDeg(graphfilename)
    return None

hw7_1()
HW 7.2: Shortest path graph distances (NLTK synonyms)
Write (reuse your code from 7.0) an MRJob class to find shortest path graph distances, and apply it to the NLTK synonyms network dataset.
Prove your code's function by running the job (the code below finds the shortest path from "walk", index 7827, to "make", index 536) and showing your code's output. Once again, your output should include the path and the distance.
As you develop your code, please be sure to run it locally first (though on the whole dataset).
Once you have gotten your code to run locally, deploy it on AWS as a systems test in preparation for our next dataset (which will require AWS).
In [114]:
%%writefile MRJob_SingleSourceShortestPath.py
#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep
from ast import literal_eval
from sys import maxint


class MRJobSSSP(MRJob):

    def steps(self):
        mapper_only = 0
        if mapper_only:
            return [MRStep(mapper=self.mapper)]
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer)]

    def mapper(self, _, line):
        # Each record: node \t [adjacency list, distance, path, status]
        # status: 'U' = unvisited, 'Q' = frontier, 'V' = visited
        line = line.strip().split('\t')
        key = int(line[0])
        value = literal_eval(line[1])
        destnodes = value[0]
        dist = value[1]
        path = value[2]
        status = value[3]
        if status == 'Q':
            # Expand the frontier node: emit each neighbor at distance+1,
            # then re-emit this node as visited.
            for node in destnodes:
                yield int(node), [None, dist + 1, path + [node], 'Q']
            yield key, [destnodes, dist, path, 'V']
        else:
            yield key, value

    def reducer(self, node, data):
        mindist = maxint
        status = 'U'
        paths = {}
        destnodes = []
        for value in data:
            # Keep the adjacency list from the node's own record.
            if value[0] is not None and len(destnodes) == 0:
                destnodes = value[0]
            dist = int(value[1])
            if dist <= mindist:
                mindist = dist
                # Promote the status: U -> Q -> V
                if status == 'U':
                    status = value[3]
                elif status == 'Q' and value[3] == 'V':
                    status = 'V'
                paths[value[3]] = value[2]
        # Prefer the path from a visited record, then a frontier record.
        if 'V' in paths.keys():
            realpath = paths['V']
        elif 'Q' in paths.keys():
            realpath = paths['Q']
        else:
            realpath = paths['U']
        yield int(node), [destnodes, mindist, realpath, status]


if __name__ == '__main__':
    MRJobSSSP.run()
In [127]:
#%load_ext autoreload
%autoreload 2
from MRJob_SingleSourceShortestPath import MRJobSSSP
from ast import literal_eval
from sys import maxint
def index_lookup(word):
    # Look up the index for a word in indices.txt.
    with open('indices.txt', 'r') as f:
        for line in f.readlines():
            line = line.strip().split('\t')
            if line[0] == word:
                return int(line[1])
    return None

def word_lookup(idx):
    # Look up the word for an index in indices.txt.
    with open('indices.txt', 'r') as f:
        for line in f.readlines():
            line = line.strip().split('\t')
            if line[1] == idx:
                return line[0]
    return None

def preprocessGraph(graphfilename):
    # starting at "walk" (index=7827) and ending at "make" (index=536)
    startnode = 7827
    fpp = open('current_graph.txt', 'w')
    fraw = open(graphfilename, 'r')
    for line in fraw.readlines():
        line = line.strip().split('\t')
        key = line[0]
        edges = literal_eval(line[1])
        if int(key) == startnode:
            status = 'Q'
            dist = 0
        else:
            status = 'U'
            dist = maxint
        destnodes = list(edges.keys())
        path = []
        if int(key) == startnode:
            path = [key]
        value = [destnodes, dist, path, status]
        fpp.write(key + '\t' + str(value) + '\n')
    fpp.close()
    fraw.close()
    return None

def mrjob_sssp():
    mr_job = MRJobSSSP(args=["current_graph.txt", "--strict-protocols", "-r", "emr", "--num-ec2-instances", "2", "--ec2-task-instance-type", "m1.small", "--pool-emr-job-flows", "--max-hours-idle=1"])
    #mr_job = MRJobSSSP(args=["current_graph.txt", "--strict-protocols"])
    stop = False
    i = 0
    while(not stop):
        i += 1
        print i
        numfrontier = 0
        with mr_job.make_runner() as runner:
            runner.run()
            with open('current_graph.txt', 'w+') as f:
                for line in runner.stream_output():
                    #print line
                    key, value = mr_job.parse_output_line(line)
                    # Stop once the target node ("make", index 536) reaches the frontier.
                    if int(key) == 536 and value[3] == 'Q':
                        stop = True
                        print line
                    f.write(line)
    return None

def hw7_2():
    graphfilename = "synNet.txt"
    preprocessGraph(graphfilename)
    mrjob_sssp()
    #!cat current_graph.txt
    return None

hw7_2()
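The path printed by mrjob_sssp() is a list of node indices; a short sketch to translate it back into words with the index file (the path shown is a placeholder, not the job's actual output):
# Build an index -> word map from indices.txt ((word) \t (index) per line).
idx2word = {}
with open('indices.txt', 'r') as f:
    for line in f:
        word, idx = line.strip().split('\t')
        idx2word[idx] = word

path = ['7827', '536']     # placeholder; use the path from the job's output
print [idx2word[str(p)] for p in path]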
Main dataset 2: English Wikipedia
For the remainder of this assignment you will explore the English Wikipedia hyperlink network.
The dataset is built from the Sept. 2015 XML snapshot of English Wikipedia.
For this directed network, a link between articles:
A -> B
is defined by the existence of a hyperlink in A pointing to B.
This network also exists in the indexed format:
Data: s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt
Data: s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-in.txt
Data: s3://ucb-mids-mls-networks/wikipedia/indices.txt
but has an index with more detailed data:
(article name) \t (index) \t (in degree) \t (out degree)
In the dictionary, target nodes are keys, link weights are values. Here, a weight indicates the number of times a page links to another. However, for the sake of this assignment, treat this as an unweighted network, and set all weights to 1 upon data input.
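A minimal sketch of dropping the weights at input time, using the same (node) \t (dictionary of links) format as above (the function name is illustrative):
from ast import literal_eval

def parse_unweighted(line):
    # Parse one (node \t dictionary of links) record and force all weights to 1.
    node, stripe = line.strip().split('\t')
    edges = literal_eval(stripe)
    edges = dict((target, 1) for target in edges)
    return node, edges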
HW 7.3: Exploratory data analysis (Wikipedia)
Using MRJob, explore the Wikipedia network data on the AWS cloud. Reuse your code from HW 7.1---does it scale well?
Be cautioned that Wikipedia is a directed network, where links are not symmetric.
So, even though a node may be linked to, it will not appear as a primary record itself if it has no out-links.
This means that you may have to ADJUST your code (depending on its design).
To be sure of your code's functionality in this context, run a systems test on the directed_toy.txt network.
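One possible adjustment (a sketch, not the graded solution): have a preprocessing job emit a placeholder record for every target node, so that link-only nodes still get a primary record with an empty adjacency list.
#!/usr/bin/env python
# Sketch: give dangling (link-only) nodes an empty adjacency list.
from mrjob.job import MRJob
from ast import literal_eval

class MRJobEmitDangling(MRJob):

    def mapper(self, _, line):
        node, stripe = line.strip().split('\t')
        edges = literal_eval(stripe)
        yield node, list(edges.keys())
        for target in edges:
            yield target, []            # placeholder for nodes with no out-links

    def reducer(self, node, stripes):
        merged = []
        for stripe in stripes:
            if len(stripe) > 0:
                merged = stripe         # keep the real adjacency list if one exists
        yield node, merged

if __name__ == '__main__':
    MRJobEmitDangling.run()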
HW 7.4: Shortest path graph distances (Wikipedia)
Using MRJob, find shortest path graph distances in the Wikipedia network on the AWS cloud. Reuse your code from 7.2, but once again be warned of Wikipedia being a directed network. To be sure of your code's functionality in this context, run a systems test on the directed_toy.txt network.
When running your code on the Wikipedia network, prove its function by running the job and show your code's output.
Once your code is running, find some other shortest paths and report your results.
HW 7.5: Conceptual exercise: Largest single-source network distances
Suppose you wanted to find the largest network distance from a single source, i.e., a node that is the furthest (but still reachable) from a single source.
How would you implement this task? How is this different from finding the shortest path graph distances?
Is this task more difficult to implement than the shortest path distance?
As you respond, please comment on program structure, runtimes, iterations, general system requirements, etc...
This task could be implemented as a breadth-first search with the stopping criterion being that no nodes remain on the frontier. It would require as many iterations as there are steps in the longest path. It would only be more difficult than finding the shortest path between two nodes in that it would need to run until the entire network has been explored. You would also need to keep track of the nodes that were on the frontier in the previous step, because those nodes are the furthest from the source at that point.
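A sketch of how this differs in code from HW 7.0: the driver loop runs until the frontier is empty (no target check), and afterwards the reachable node(s) with the largest recorded distance are the answer. Assuming the same [adjacency list, distance, path, status] record format as above:
from ast import literal_eval

def farthest_nodes(graphfile='current_graph.txt'):
    # After BFS has run to completion, report the visited node(s) with the
    # largest finite distance from the source, along with their paths.
    best_dist = -1
    best = []
    with open(graphfile, 'r') as f:
        for line in f:
            key, value = line.strip().split('\t')
            record = literal_eval(value)
            dist, path, status = record[1], record[2], record[3]
            if status != 'V':
                continue                 # skip unreachable / unvisited nodes
            if dist > best_dist:
                best_dist, best = dist, [(key, path)]
            elif dist == best_dist:
                best.append((key, path))
    return best_dist, best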
HW 7.6: Computational exercise: Largest single-source network distances (optional)
Using MRJob, write code to find the largest graph distance and the distance-maximizing nodes from a single source. Test your code first on the toy networks and the synonyms network to prove its function.