In [1]:
import pyspark
import numpy as np # we'll be using numpy for some numeric operations
sc = pyspark.SparkContext()
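Note that only one SparkContext can exist per process, so re-running the cell above as-is raises an error. A more defensive variant (a sketch only; the "local[*]" master and the application name are assumptions, not part of the original setup) reuses a running context if there is one:

import pyspark
# the SparkConf settings below are assumed purely for illustration
conf = pyspark.SparkConf().setMaster("local[*]").setAppName("pagerank")
# getOrCreate() reuses an existing SparkContext instead of failing on a second one
sc = pyspark.SparkContext.getOrCreate(conf=conf)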
In [2]:
def generate_dataset(N, degree):
    # the returned dataset is a list
    # each element of this list is a tuple,
    # representing one page and its links (to other pages)
    # (each page is represented by an integer)
    dataset = []
    for i in range(N):
        dataset.append((i, list(range(i+1, min(N, i+degree+1)))))
    return dataset
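To get a feel for the shape of the data, here is what the function returns for a tiny graph of 5 pages with degree 2 (computed directly from the loop above):

# small example: 5 pages, each linking to (at most) the next 2 pages
generate_dataset(5, 2)
# -> [(0, [1, 2]), (1, [2, 3]), (2, [3, 4]), (3, [4]), (4, [])]
# note that the last page has no outgoing links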
In [3]:
N = 10000 # number of pages, each represented with a number
degree = 10 # degree of each page (constant for all pages)
# make an RDD of (page, links) pairs -- the same data generate_dataset would produce
linksRDD = sc.parallelize((i, list(range(i+1, min(N, i+degree+1)))) for i in range(N))
# ask Spark to keep it in memory
linksRDD.persist()
# action: count() forces the RDD to be computed (and cached)
N = linksRDD.count()
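Since count() has already materialized (and cached) the RDD, a quick peek with take() confirms that it holds the expected (page, links) pairs; take() is an action, but it only brings the requested elements to the driver:

# inspect the first two (page, links) pairs
linksRDD.take(2)
# -> [(0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
#     (1, [2, 3, 4, 5, 6, 7, 8, 9, 10, 11])]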
In [13]:
# initialize rank scores: every page starts with the same rank 1/N
ranks = linksRDD.map(lambda x: (x[0], 1.0/N))
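Every page starts out with the same score 1/N; with N = 10000 that is 0.0001:

# first three (page, rank) pairs after initialization
ranks.take(3)
# -> [(0, 0.0001), (1, 0.0001), (2, 0.0001)]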
In [12]:
## constants and auxiliary functions
ITERATIONS = 5; alpha = 0.15

def contr(x):
    """
    return the contributions of a single page u
    to the pages it links to
    """
    node_u, _tmp = x   # x is a pair (node, (links, rank)) as produced by join()
    links, rank = _tmp # _tmp is a pair (links, rank)
    result = []        # the result is a list of (node, contribution) pairs
    for node_v in links:
        result.append((node_v, rank / len(links)))
    # node u gives a 0 contribution to itself;
    # this guarantees that it'll have a score even
    # if no other node links to it
    result.append((node_u, 0))
    return result

def smoothen(x):
    """
    calculate a pagerank score for node v
    from the sum of contributions it receives
    from other nodes
    """
    node_v = x[0]  # x is a pair (node, sum of contributions)
    sum_of_contributions = x[1]
    return (node_v, alpha / N + (1 - alpha) * sum_of_contributions)

def add(x, y):
    """return the sum of x and y"""
    return x + y
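Both helpers are plain Python functions, so they can be checked on hand-made values before handing them to Spark. The pair below is a made-up example of the records that join() and reduceByKey() will produce for a page 3 that links to pages 4 and 5 and currently has rank 0.1:

# one joined record: (page, (links, current rank))
contr((3, ([4, 5], 0.1)))
# -> [(4, 0.05), (5, 0.05), (3, 0)]

# one reduced record: (page, sum of contributions it received)
smoothen((4, 0.05))
# -> (4, alpha/N + (1 - alpha)*0.05), i.e. roughly (4, 0.042515)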
In [14]:
# pagerank computation
for i in range(ITERATIONS):
    contribs = linksRDD.join(ranks).flatMap(contr)
    ranks = contribs.reduceByKey(add).map(smoothen) # no action yet!
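Because join(), flatMap(), reduceByKey() and map() are all transformations, the loop above only builds up a lineage of RDDs; no Spark job has run yet. One way to look at that lineage is the RDD's debug string (depending on the PySpark version, toDebugString() returns bytes, hence the decode()):

# print the chain of transformations behind the final ranks RDD
# (this itself triggers no computation)
print(ranks.toDebugString().decode())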
In [17]:
ranks.top(5, key=lambda x: x[1]) # action - collect the 5 highest-ranked pages
Out[17]:
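A handy sanity check (not part of the original output) is that the ranks still behave like a probability distribution: their sum stays close to 1, slightly below it, because the last page has no outgoing links and contr() simply drops its rank mass each iteration.

# sum of all rank scores -- an action, so the lineage is recomputed
ranks.map(lambda x: x[1]).sum()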