PageRank on PLD

Partition the adjacency matrix by row (destination).

Data from

http://webdatacommons.org/hyperlinkgraph/2012-08/download.html

Setup

Before running pyspark, run:
export SPARK_DRIVER_MEMORY=20g

Get the subset of the graph

In this segment we load the dataset and keep only a small subset of the nodes. Each line of the arc files is a tab-separated pair of integer node IDs: source, then destination.

Load PLD


In [1]:
n = 42889798

In [2]:
data = sc.textFile(
                    'datasets/wdc/split-pld-arc/'
                  ).map(
                    lambda line: [int(x) for x in line.split('\t')]
                  )

Keep only edges whose source and destination IDs are below size


In [3]:
size = int(1e7)
if size < n:
    small = data.filter(
                       lambda (source, destination): source<size and destination<size
            )
else:
    small = data
    size = n

In [4]:
part = 300

In [11]:
def partitionFunction(k):
    return k % part

In [6]:
small = small.partitionBy(part, partitionFunc=partitionFunction)

Persist result


In [7]:
small.cache()


Out[7]:
MappedRDD[5] at values at NativeMethodAccessorImpl.java:-2

Reading the whole PLD graph takes about 300 seconds. Disk read time is not the bottleneck: this much time is needed even when all 11GB of the dataset are cached in memory.

For $size = 2e7$ this step takes 253 sec (part=367, vecPart=64)


In [8]:
small.count()


Out[8]:
39602638

In [7]:
small.take(10)


Out[7]:
[[7, 2125457],
 [15, 307362],
 [15, 9117384],
 [19, 191843],
 [19, 432746],
 [19, 453674],
 [19, 661321],
 [19, 3972553],
 [21, 6228809],
 [23, 53061]]

Retrieving one edge uniformly at random takes 34 seconds.

Do the eigenvector calculation

Initialize First Iteration

Alternative implementation:

Partition A by row, and then just sum the rows to get the in-degrees; use partitionBy().
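
A minimal sketch of that alternative (an assumption, not part of the original notebook): compute the in-degree of every destination from the edge RDD and partition the result the same way as the rank vector.

from operator import add

indegree = small.map(
                    lambda (src, dst): (dst, 1)
           ).reduceByKey(
                    add
           ).partitionBy(
                    part, partitionFunc=partitionFunction
           )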


In [14]:
vectorPart = 32*2

In [9]:
from operator import add

In [10]:
#x0 = range(size).map(lambda k: (k,0.15/float(size)))
#x0 = sc.broadcast(map(lambda k: (k,0.15/float(size)), xrange(size)))
x0 = sc.broadcast([0.15/float(size)]*size)

In [12]:
uniform = sc.parallelize(
                            map(lambda k: (k,0.15/float(size)), xrange(size))
           ).partitionBy(
                            part,
                            partitionFunc=partitionFunction
           )

In [53]:
def iteration(A, bx):
    # A: edge RDD of (src, dst) pairs; bx: broadcast list with the current mass of every node.
    # Send each source's mass along its out-edges...
    xpart = A.map(
                lambda (src,dst): (dst, bx.value[src]),
                preservesPartitioning=True
    )
    # ...rescale so the propagated mass sums to 0.85...
    sm = xpart.values().sum()
    xred = xpart.mapValues(
                    lambda v: v/(sm/0.85)
               )
    # ...then add the uniform 0.15/size teleport term and sum per destination.
    x1 = xred.union(uniform).reduceByKey(add)
    x1d = x1.sortByKey()
    return sc.broadcast(x1d.values().collect())
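
Written out, one call to iteration performs the update (with $E$ the edge set and $x_t$ the current broadcast vector)

$$x_{t+1}(d) = \frac{0.15}{size} + 0.85\,\frac{\sum_{(s,d)\in E} x_t(s)}{\sum_{(s',d')\in E} x_t(s')}.$$

Note that, unlike the textbook PageRank update, a source contributes its full mass along every out-edge; the global rescaling by the total edge mass takes the place of dividing by the out-degree.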


In [54]:
x1 = iteration(small, x0)

In [55]:
x2 = iteration(small, x1)

In [56]:
x3 = iteration(small, x2)

In [57]:
x4 = iteration(small, x3)

In [58]:
x5 = iteration(small, x4)

In [59]:
x6 = iteration(small, x5)

In [60]:
x7 = iteration(small, x6)

In [61]:
x8 = iteration(small, x7)

In [ ]:
x9 = iteration(small, x8)

In [ ]:
x10 = iteration(small, x9)

In [ ]:
x11 = iteration(small, x10)

In [ ]:
x12 = iteration(small, x11)
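
Because each iterate is a broadcast of a plain Python list, a convergence check between consecutive iterates is cheap. A minimal sketch (an assumption, not part of the original run):

def l1_diff(bx, by):
    # L1 distance between two broadcast rank vectors
    return sum(abs(a - b) for a, b in zip(bx.value, by.value))

l1_diff(x7, x8)    # a small value suggests the iteration has converged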


In [15]:
def exactiteration(A, x, part=vectorPart):
    # x: rank RDD of (node, mass) pairs; A: edge RDD of (src, dst) pairs.
    # The join ships each source's mass along its out-edges; the masses arriving
    # at each destination are then summed.
    return x.join(
                    A
            ).map(
                    lambda (src,(juice,dest)): (dest, juice)
            ).reduceByKey(
                    add,
                    numPartitions = part
            )

In [1]:
def normalizevaluesold(x):
    sm = float(x.values().sum())
    cnt = x.count()
    return x.mapValues(
                        lambda v: 0.15/cnt + 0.85*v/sm
                      )

In [20]:
def normalizevalues(x, x0):
    # Rescale the propagated mass so it sums to 0.85, then add the teleport vector x0.
    sm = float(x.values().sum())
    return x.mapValues(
                    lambda v: 0.85*v/sm
           ).union(
                    x0
           ).reduceByKey(
                    add
           )
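
The two helpers can be chained in a loop instead of the manual per-iteration cells below. A minimal sketch (an assumption, not part of the original run): it uses uniform, the (node, 0.15/size) RDD defined above, both as the starting vector and as the teleport term, whereas the cells below pass an RDD named x0, presumably with the same contents (the broadcast x0 from the alternative implementation would not work here, since exactiteration and normalizevalues expect RDDs).

x = uniform
for i in range(8):
    prev = x
    x = normalizevalues(exactiteration(small, x, part=vectorPart), uniform)
    x.cache()
    x.count()          # force evaluation before releasing the previous iterate
    prev.unpersist()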

In [17]:
from operator import add

In [18]:
x1 = exactiteration(small, x0, part=vectorPart)

In [21]:
x1.take(5)


Out[21]:
[(131072, 3.4949999999999932e-06),
 (1572864, 2.399999999999999e-07),
 (7987072, 4.499999999999999e-08),
 (9498880, 1.5e-08),
 (8187840, 1.05e-07)]

In [23]:
x1 = normalizevalues(x1,x0)
x1.cache()


Out[23]:
PythonRDD[35] at RDD at PythonRDD.scala:43

In [69]:
sum(x4.value)


Out[69]:
1.000000000075924

In [61]:
x1.count()


Out[61]:
5441903

Exact iteration

For $size = 1e7$ this step takes 48 sec (part=367, vecPart=64)

For $size = 2e7$ this step takes 148 sec (part=367, vecPart=64)


In [28]:
x2 = exactiteration(small, x1, part=vectorPart)

In [30]:
x2 = normalizevalues(x2,x0)

In [33]:
x2.cache()


Out[33]:
PythonRDD[58] at RDD at PythonRDD.scala:43

In [41]:
x2.take(10)


Out[41]:
[(1572864, 1.7153520233441514e-08),
 (4718592, 1.5563852797025242e-08),
 (8187840, 1.586072569078231e-08),
 (4093920, 1.5012832925631457e-08),
 (0, 1.505060192799591e-08),
 (8790144, 1.5e-08),
 (4696224, 1.5006416462815728e-08),
 (9994752, 1.5e-08),
 (7340064, 9.20185473366667e-07),
 (7105440, 1.5e-08)]

In [42]:
x2.min()


Out[42]:
(0, 1.505060192799591e-08)

Third iteration


In [43]:
x3 = exactiteration(small, x2, part=vectorPart)

In [44]:
x3 = normalizevalues(x3, x0)

In [45]:
x3.cache()


Out[45]:
PythonRDD[87] at RDD at PythonRDD.scala:43

Fourth Iteration


In [47]:
x4 = exactiteration(small, x3, part=vectorPart)

In [50]:
x4 = normalizevalues(x4, x0)
x4.cache()


Out[50]:
PythonRDD[109] at RDD at PythonRDD.scala:43

In [51]:
x4.take(10)


Out[51]:
[(1572864, 1.8130883376645132e-08),
 (4718592, 1.5513485210752607e-08),
 (8187840, 1.6163038507982754e-08),
 (4093920, 1.501834071614663e-08),
 (0, 1.5068409511011074e-08),
 (8790144, 1.5e-08),
 (4696224, 1.500467724134975e-08),
 (9994752, 1.5e-08),
 (7340064, 9.065388396329538e-07),
 (7105440, 1.5e-08)]

Fifth Iteration


In [52]:
x5 = exactiteration(small, x4, part=vectorPart)
x5 = normalizevalues(x5, x0)
x5.cache()


Out[52]:
PythonRDD[129] at RDD at PythonRDD.scala:43

Sixth Iteration


In [53]:
x6 = exactiteration(small, x5, part=vectorPart)
x6 = normalizevalues(x6,x0)
x6.cache()


Out[53]:
PythonRDD[148] at RDD at PythonRDD.scala:43

Seventh Iteration


In [60]:
x7 = exactiteration(small, x6, part=vectorPart)
x7 = normalizevalues(x7,x0)
x7.cache()


Out[60]:
PythonRDD[173] at RDD at PythonRDD.scala:43

Eighth Iteration


In [61]:
x8 = exactiteration(small, x7, part=vectorPart)
x8 = normalizevalues(x8,x0)
x8.cache()


Out[61]:
PythonRDD[192] at RDD at PythonRDD.scala:43

Show top domains


In [32]:
names = sc.textFile('datasets/wdc/pld-index')

In [33]:
def indexline(line):
    # pld-index lines are "domain<TAB>id"; emit (id, domain) pairs so the index can be joined on node id.
    parts = line.split('\t')
    return (int(parts[1]), parts[0])

In [34]:
index = names.map(indexline).filter(lambda (k,v): k<size)

In [35]:
index.cache()


Out[35]:
PythonRDD[92] at RDD at PythonRDD.scala:43

In [36]:
topnodes = index.join(x4.filter(lambda (k,v): v>0.0001))

In [37]:
topnodes.cache()


Out[37]:
PythonRDD[100] at RDD at PythonRDD.scala:43

In [38]:
topnodes.take(10)


Out[38]:
[]
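
topnodes comes back empty, most likely because no node in x4 reaches the 0.0001 mass threshold at this graph size; the masses seen above are between roughly 1e-8 and 1e-6.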

In [39]:
topnodes.sortBy(lambda (k,v): v[1], ascending=False).take(10)


---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-39-78cedd6852c6> in <module>()
----> 1 topnodes.sortBy(lambda (k,v): v[1], ascending=False).take(10)

/home/alex/yannis/workspace/spark/python/pyspark/rdd.pyc in sortBy(self, keyfunc, ascending, numPartitions)
    631         [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    632         """
--> 633         return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
    634 
    635     def glom(self):

/home/alex/yannis/workspace/spark/python/pyspark/rdd.pyc in sortByKey(self, ascending, numPartitions, keyfunc)
    610         # an implicit boundary
    611         bounds = [samples[len(samples) * (i + 1) / numPartitions]
--> 612                   for i in range(0, numPartitions - 1)]
    613 
    614         def rangePartitioner(k):

IndexError: list index out of range
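
The traceback is a consequence of topnodes being empty: sortByKey samples the RDD to choose range-partition boundaries, and with no samples the bounds lookup fails.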

In [ ]:
topnodes

TODO: Plot the masses over the first 3 iterations for the top nodes


In [ ]:
x3.take(10)

In [ ]:
topnodes.join(x3).take(10)

In [ ]:
topnodes.join(x4).take(10)
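
A possible way to do the plot from the TODO above (a sketch under assumptions, not part of the original run): collect the mass of a few chosen node ids from each iterate and plot the trajectories with matplotlib. It assumes x1..x4 are the RDD iterates from the exact-iteration section; the node id below is the one tracked in the cells that follow, and ids taken from topnodes could be used instead.

import matplotlib.pyplot as plt

nodes = [5325333]                      # hypothetical choice of nodes to track
iterates = [x1, x2, x3, x4]

for node in nodes:
    masses = [x.filter(lambda (k, v): k == node).collect()[0][1] for x in iterates]
    plt.plot(range(1, len(masses) + 1), masses, marker='o', label=str(node))

plt.xlabel('iteration')
plt.ylabel('PageRank mass')
plt.legend()
plt.show()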

In [39]:
x1.filter(lambda (k,v): k==5325333).collect()[0][1]


Out[39]:
0.005060826858038452

In [40]:
x2.filter(lambda (k,v): k==5325333).collect()[0][1]


Out[40]:
2.0952150439535346e-05

In [54]:
x3.filter(lambda (k,v): k==5325333).collect()[0][1]


Out[54]:
0.0028617634230477737

In [55]:
x4.filter(lambda (k,v): k==5325333).collect()[0][1]


Out[55]:
4.183739247497754e-05

In [56]:
x5.filter(lambda (k,v): k==5325333).collect()[0][1]


Out[56]:
0.0014849847721537237

In [57]:
x6.filter(lambda (k,v): k==5325333).collect()[0][1]


Out[57]:
5.6700736438283e-05

In [62]:
x7.filter(lambda (k,v): k==5325333).collect()[0][1]


Out[62]:
0.0006417256547153903

In [63]:
x8.filter(lambda (k,v): k==5325333).collect()[0][1]


Out[63]:
5.22414501075493e-05
