Eigendecomposition for a subset of the PLD graph

Setup

Before launching pyspark, set the driver memory:
export SPARK_DRIVER_MEMORY=20g

Get the subset of the graph

In this segment we load the dataset and keep only a small subset of it. Each line of the arc file encodes one edge as a tab-separated (source, destination) pair of integer node IDs.

Load the first partition of the PLD graph


In [1]:
data = sc.textFile('datasets/wdc/split-pld-arc/')

Keep only the edges whose source and destination IDs are both below size


In [2]:
size = 2e7
small = data.map(
                   lambda line: [int(x) for x in line.split('\t')]
         ).filter(
                   lambda (source, destination): source<size and destination<size
         )

Persist result


In [3]:
small.cache()


Out[3]:
PythonRDD[2] at RDD at PythonRDD.scala:43

Reading the full PLD graph takes 300 seconds. Disk read time is not the bottleneck: this much time is needed even when the whole 11 GB dataset is cached in memory.

For $size = 2 \times 10^7$ this step takes 253 seconds (part=367, vecPart=64).


In [ ]:
small.count()

In [ ]:
small.take(10)

Retrieving one edge uniformly at random takes 34 seconds.
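
A single uniformly random edge can be drawn with takeSample(), which triggers a full pass over the RDD; a minimal sketch of that measurement (the seed is arbitrary):


In [ ]:
# Draw one edge uniformly at random; the full scan this requires is
# presumably what the 34-second figure above measures.
small.takeSample(False, 1, seed=42)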

Do the eigenvector calculation

Initialize First Iteration

Alternative implementation: partition A by row with partitionBy(), and then just sum the rows to obtain the in-degrees, as sketched below.
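
A minimal sketch of that alternative, under the assumption that fixing the partitioning of A once lets later joins reuse it (A_by_row and indegree are hypothetical names):


In [ ]:
from operator import add

# Hypothetical sketch: partition the edge list by source row once and
# keep it cached, so subsequent joins do not reshuffle A.
A_by_row = small.partitionBy(64).cache()

# In-degrees: key by destination and sum the ones.
indegree = A_by_row.map(lambda (src, dst): (dst, 1)).reduceByKey(add)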


In [ ]:
vectorPart = 32*2   # number of partitions for the rank vectors

In [ ]:
from operator import add

# In-degree of each node: one count per incoming edge.
x1 = small.map(
                    lambda (source, dest): (dest,1)
           ).reduceByKey(
                    add,
                    numPartitions = vectorPart
           )

In [ ]:
x1 = normalizevalues(x1)
x1.cache()

The first iteration takes 282 seconds.

For $size = 2 \times 10^7$ this step takes 57 seconds (vecPart=64).


In [ ]:
#x1.saveAsTextFile('datasets/wdc/results/full-Iteration1')

In [ ]:
#approxjoin = x1.filter(lambda (k,v): v>2).join(small)
#x2 = approxjoin.map(lambda (k,v): (v[1], v[0])).reduceByKey(add)
#x2.take(5)

Exact iteration

For $size = 10^7$ this step takes 48 seconds (part=367, vecPart=64).

For $size = 2 \times 10^7$ this step takes 148 seconds (part=367, vecPart=64).
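
In matrix terms, each exact iteration is one power-method step for the dominant eigenvector of $A^\top$, where $A$ is the 0/1 adjacency matrix of the edge list: the join routes each node's mass ("juice") along its out-edges, the reduce sums the incoming mass per destination, and the normalization keeps the entries summing to one:

$$x_{k+1} = \frac{A^{\top} x_k}{\lVert A^{\top} x_k \rVert_1}, \qquad (A^{\top} x_k)_j = \sum_{i \,:\, i \to j} (x_k)_i$$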


In [ ]:
def exactiteration(A, x, part=vectorPart):
    # One power-method step: send each node's mass ("juice") along its
    # out-edges and sum the incoming mass per destination node.
    return x.join(
                    A
            ).map(
                    lambda (src,(juice,dest)): (dest, juice)
            ).reduceByKey(
                    add,
                    numPartitions = part
            )

In [ ]:
def normalizevalues(x):
    # Rescale the vector so its entries sum to 1 (L1 normalization).
    sm = float(x.values().sum())
    return x.mapValues(
                        lambda v: v/sm
                      )

In [ ]:
x2 = exactiteration(small, x1, part=vectorPart)

In [ ]:
x2 = normalizevalues(x2)

In [ ]:
x2.cache()

In [ ]:
x2.take(10)
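
To decide when to stop iterating, one can track the L1 distance between successive iterates; a minimal sketch (l1_distance is a hypothetical helper, and the inner join assumes both vectors cover the same node set):


In [ ]:
def l1_distance(xa, xb):
    # Sum of |a - b| over the nodes present in both vectors.
    return xa.join(xb).values().map(lambda (a, b): abs(a - b)).sum()

# Should shrink towards 0 as the power method converges.
l1_distance(x2, x1)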

Third Iteration


In [ ]:
x3 = exactiteration(small, x2, part=vectorPart)

In [ ]:
x3 = normalizevalues(x3)

In [ ]:
x3.cache()

Fourth Iteration


In [ ]:
x4 = exactiteration(small, x3, part=vectorPart)

In [ ]:
x4 = normalizevalues(x4)
x4.cache()

In [ ]:
x4.take(10)

Fifth Iteration


In [ ]:
x5 = exactiteration(small, x4, part=vectorPart)
x5 = normalizevalues(x5)
x5.cache()

Sixth Iteration


In [ ]:
x6 = exactiteration(small, x5, part=vectorPart)
x6 = normalizevalues(x6)
x6.cache()
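
The per-iteration cells above could equally be collapsed into a loop; a minimal sketch using the helpers already defined (five steps reproduce x2 through x6):


In [ ]:
# Equivalent loop form of iterations 2-6.
x = x1
for _ in range(5):
    x = normalizevalues(exactiteration(small, x, part=vectorPart))
    x.cache()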

Show top domains


In [ ]:
names = sc.textFile('datasets/wdc/pld-index')

In [ ]:
def indexline(line):
    # Each index line is "name<TAB>id"; emit (id, name) pairs.
    parts = line.split('\t')
    return (int(parts[1]), parts[0])
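
For example, assuming the index stores one name<TAB>id pair per line (the values here are made up):


In [ ]:
indexline('example.com\t42')   # -> (42, 'example.com')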

In [ ]:
index = names.map(indexline).filter(lambda (k,v): k<size)

In [ ]:
index.cache()

In [ ]:
topnodes = index.join(x4.filter(lambda (k,v): v>0.0001))

In [ ]:
topnodes.cache()

In [ ]:
topnodes.take(10)

In [ ]:
topnodes.sortBy(lambda (k,v): v[1], ascending=False).take(10)

In [ ]:
topnodes
