In [1]:
import pyspark
import numpy as np # we'll be using numpy for some numeric operations
sc = pyspark.SparkContext()

In [2]:
def generate_dataset(N, degree):
    # the returned dataset is a list
    # each element of this list is a tuple,
    # representing one page and its links (to other pages)
    # (each page is represented by an integer)
    dataset = []
    for i in range(N):
        dataset.append((i, list(range(i+1, min(N, i+degree+1)))))
    return dataset
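
For illustration, calling generate_dataset on a tiny graph makes the adjacency-list structure explicit (the expected value in the comment is worked out by hand for this toy input):

In [ ]:
# toy example: 5 pages, each linking to (at most) the next 2
generate_dataset(5, 2)
# expected: [(0, [1, 2]), (1, [2, 3]), (2, [3, 4]), (3, [4]), (4, [])]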

In [3]:
N = 10000 # number of pages, each represented with a number
degree = 10 # degree of each page (constant for all pages)

# make an RDD from the adjacency list produced by generate_dataset
linksRDD = sc.parallelize(generate_dataset(N, degree))

# ask spark to keep it in memory
linksRDD.persist()

# action: count() forces the dataset to be computed and returns the number of pages
N = linksRDD.count()
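
A quick peek at the data: take is a cheap action that pulls only a few records back to the driver, so it is a convenient way to confirm the (page, outlinks) layout.

In [ ]:
# first three (page, outlinks) pairs; with degree = 10 these should be
# (0, [1, ..., 10]), (1, [2, ..., 11]) and (2, [3, ..., 12])
linksRDD.take(3)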

In [13]:
# initialize rank scores: every page starts with an equal score of 1/N
ranks = linksRDD.map(lambda x: (x[0], 1 / N))
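
As an optional sanity check: the initial scores form a uniform distribution, so summing them should give 1.0 up to floating-point error.

In [ ]:
# each of the N pages starts at 1/N, so the total should be ~1.0
ranks.map(lambda x: x[1]).sum()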

In [12]:
## constants and auxiliary functions

ITERATIONS = 5  # number of PageRank iterations
alpha = 0.15    # teleportation probability

def contr(x):
    """
    return the contributions of a single page u
    """
    node_u, (links, rank) = x  # x is (node, (outlinks, rank)), as produced by the join

    result = []  # the result is a list of pairs
    for node_v in links:
        result.append((node_v, rank / len(links)))
    # node u gives 0 contribution to itself
    # this guarantees that it'll have a score even
    # if no other node links to it
    result.append((node_u, 0)) 
    return result

def smoothen(x):
    """
    calculate a pagerank score for node v
    from the sum of contributions it receives
    from other nodes
    """
    node_v, sum_of_contributions = x  # x is (node, summed contributions)
    return (node_v, alpha / N + (1 - alpha) * sum_of_contributions)

def add(x, y):
    """ return the sum of x and y"""
    return x + y
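
smoothen implements the damped update rank(v) = alpha/N + (1 - alpha) * sum over in-neighbours u of rank(u)/outdegree(u). Both helpers are plain Python functions, so they can be exercised locally without Spark; the inputs below are made-up values chosen only to illustrate the shapes involved.

In [ ]:
# contr receives what linksRDD.join(ranks) produces: (node, (outlinks, rank))
contr((0, ([1, 2], 0.5)))
# expected: [(1, 0.25), (2, 0.25), (0, 0)]

# smoothen receives (node, sum_of_contributions) after reduceByKey
smoothen((7, 0.2))
# expected: (7, alpha / N + (1 - alpha) * 0.2)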

In [14]:
# pagerank computation
for i in range(ITERATIONS):
    contribs = linksRDD.join(ranks).flatMap(contr)
    ranks = contribs.reduceByKey(add).map(smoothen) # no action yet!
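
The comment above is literal: the loop only chains transformations, so nothing has executed yet. One way to see this (a debugging aid, not part of the computation) is to print the lineage Spark has recorded for the final ranks RDD; the next action will run this whole chain.

In [ ]:
# toDebugString returns the recorded lineage as bytes in PySpark
print(ranks.toDebugString().decode())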

In [17]:
ranks.top(5, key=lambda x: x[1])  # action: collect the 5 highest-ranked pages


Out[17]:
[(99, 0.04361374540773896),
 (98, 0.024043260885319356),
 (97, 0.017469765314475517),
 (96, 0.014275219550243882),
 (95, 0.012474413019531248)]
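
top with a key function returns the num largest elements ordered by that key, so this call collects the five highest-ranked pages. takeOrdered with a negated key should give the same result:

In [ ]:
# same query via takeOrdered, sorting by descending rank score
ranks.takeOrdered(5, key=lambda x: -x[1])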

