Partition the adjacency matrix by row (destination).
Data from
http://webdatacommons.org/hyperlinkgraph/2012-08/download.html
Setup
Before launching pyspark, run:
export SPARK_DRIVER_MEMORY=20g
In this section we load the dataset and keep only a small subset of the nodes.
In [1]:
n = 42889798  # total number of nodes (pay-level domains) in the full graph
In [2]:
data = sc.textFile(
    'datasets/wdc/split-pld-arc/'
).map(
    # each line is 'source<TAB>destination', both PLD ids
    lambda line: [int(x) for x in line.split('\t')]
)
In [3]:
size = int(1e7)
if size < n:
    # keep only edges between the first `size` nodes
    small = data.filter(
        lambda (source, destination): source < size and destination < size
    )
else:
    small = data
    size = n
In [4]:
part = 300  # number of partitions for the edge RDD
In [11]:
def partitionFunction(k):
    # partition by node id modulo the number of partitions
    return k % part
In [6]:
small = small.partitionBy(part, partitionFunc=partitionFunction)
In [7]:
small.cache()
Out[7]:
Reading the full PLD graph takes about 300 seconds. Disk read time is not the bottleneck: the same time is needed even when the whole 11 GB dataset is cached in memory.
For $size = 2e7$ this step takes 253 s (part=367, vecPart=64).
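These timings are reported without the measuring code; a minimal way to reproduce them (an assumption, not a cell from the original notebook) is to time the first action that materialises the cached RDD:

import time

t0 = time.time()
small.count()  # forces the read, partitioning and caching of the edge RDD
print('elapsed: %.1f s' % (time.time() - t0))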
In [8]:
small.count()
Out[8]:
In [7]:
small.take(10)
Out[7]:
Retrieving one edge uniformly at random takes 34 seconds.
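The sampling cell itself is not shown; a plausible way to draw a single edge uniformly at random (a sketch, not the original code) is takeSample:

# draw one (source, destination) edge uniformly at random, without replacement
edge = small.takeSample(False, 1)[0]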
Alternative implementation:
Partition A by row, then sum each row to obtain the in-degrees, using partitionBy() (see the sketch below).
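A minimal sketch of that alternative, assuming `small` is the edge RDD defined above: re-key every edge by its destination and sum, which gives the in-degree of each node.

from operator import add

# in-degree: one count per incoming edge, summed per destination
indegree = small.map(
    lambda edge: (edge[1], 1)
).reduceByKey(add)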
In [14]:
vectorPart = 32*2  # number of partitions for the rank-vector RDD
In [9]:
from operator import add
In [10]:
#x0 = range(size).map(lambda k: (k,0.15/float(size)))
#x0 = sc.broadcast(map(lambda k: (k,0.15/float(size)), xrange(size)))
# initial rank vector: uniform teleport mass 0.15/size per node, broadcast to the workers
x0 = sc.broadcast([0.15/float(size)]*size)
In [12]:
# (node, 0.15/size) pairs: the teleportation term, partitioned like the edges
uniform = sc.parallelize(
    map(lambda k: (k, 0.15/float(size)), xrange(size))
).partitionBy(
    part,
    partitionFunc=partitionFunction
)
In [53]:
def iteration(A, bx):
    # A: edge RDD of (src, dst) pairs; bx: broadcast list with the current rank of every node
    # send each source's rank along its out-edges
    xpart = A.map(
        lambda (src, dst): (dst, bx.value[src]),
        preservesPartitioning=True
    )
    sm = xpart.values().sum()
    # rescale the link mass so that it sums to 0.85
    xred = xpart.mapValues(
        lambda v: v/(sm/0.85)
    )
    # add the 0.15/size teleport term and sum per destination
    x1 = xred.union(uniform).reduceByKey(add)
    x1d = x1.sortByKey()
    # collect the dense vector and broadcast it for the next pass
    return sc.broadcast(x1d.values().collect())
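In formulas, with $E$ the kept edge set and $n = size$, one call to iteration computes

$$x_{k+1}(d) = \frac{0.15}{n} + \frac{0.85}{\sum_{(s,t)\in E} x_k(s)} \sum_{(s,d)\in E} x_k(s).$$

Note that, unlike textbook PageRank, a source's mass is not divided by its out-degree here; the link mass is instead renormalized globally so that it sums to 0.85.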
In [ ]:
In [54]:
x1 = iteration(small, x0)
In [55]:
x2 = iteration(small, x1)
In [56]:
x3 = iteration(small, x2)
In [57]:
x4 = iteration(small, x3)
In [58]:
x5 = iteration(small, x4)
In [59]:
x6 = iteration(small, x5)
In [60]:
x7 = iteration(small, x6)
In [61]:
x8 = iteration(small, x7)
In [ ]:
x9 = iteration(small, x8)
In [ ]:
x10 = iteration(small, x9)
In [ ]:
x11 = iteration(small, x10)
In [ ]:
x12 = iteration(small, x11)
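The repeated cells above are equivalent to a simple loop (a sketch of the same computation, not a cell from the original notebook):

x = x0
for i in range(12):
    # each pass returns a new broadcast dense rank vector
    x = iteration(small, x)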
In [15]:
def exactiteration(A, x, part=vectorPart):
    # x: pair RDD of (node, rank); A: pair RDD of (src, dst) edges
    return x.join(
        A
    ).map(
        # send each source's rank to its destination
        lambda (src, (juice, dest)): (dest, juice)
    ).reduceByKey(
        add,
        numPartitions=part
    )
In [1]:
def normalizevaluesold(x):
    sm = float(x.values().sum())
    cnt = x.count()
    return x.mapValues(
        lambda v: 0.15/cnt + 0.85*v/sm
    )
In [20]:
def normalizevalues(x, x0):
    # rescale the link mass to 0.85 and add the teleport term held in x0
    sm = float(x.values().sum())
    return x.mapValues(
        lambda v: 0.85*v/sm
    ).union(
        x0
    ).reduceByKey(
        add
    )
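Together, exactiteration followed by normalizevalues implement the same update as before, but keep the rank vector as a distributed pair RDD instead of a broadcast list: the join sends $x_k(s)$ along every edge $(s,d)$, reduceByKey sums the incoming mass per destination, and normalizevalues rescales the total to 0.85 and adds the teleport term held in x0:

$$y(d) = \sum_{(s,d)\in E} x_k(s), \qquad x_{k+1}(d) = \frac{0.15}{n} + 0.85\,\frac{y(d)}{\sum_{t} y(t)}.$$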
In [17]:
from operator import add
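Note that in this implementation x0 must be a pair RDD of (node, rank) entries, like uniform above, rather than the broadcast list from the previous section, since exactiteration joins it with the edges. A minimal re-definition under that assumption:

# x0 as a distributed rank vector: (node, 0.15/size) for every kept node
x0 = sc.parallelize(
    [(k, 0.15/float(size)) for k in xrange(size)],
    numSlices=vectorPart
)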
In [18]:
x1 = exactiteration(small, x0, part=vectorPart)
In [21]:
x1.take(5)
Out[21]:
In [23]:
x1 = normalizevalues(x1,x0)
x1.cache()
Out[23]:
In [69]:
sum(x4.value)
Out[69]:
In [61]:
x1.count()
Out[61]:
For $size = 1e7$ this step takes 48 s (part=367, vecPart=64).
For $size = 2e7$ this step takes 148 s (part=367, vecPart=64).
In [28]:
x2 = exactiteration(small, x1, part=vectorPart)
In [30]:
x2 = normalizevalues(x2,x0)
In [33]:
x2.cache()
Out[33]:
In [41]:
x2.take(10)
Out[41]:
In [42]:
x2.min()
Out[42]:
In [43]:
x3 = exactiteration(small, x2, part=vectorPart)
In [44]:
x3 = normalizevalues(x3, x0)
In [45]:
x3.cache()
Out[45]:
In [47]:
x4 = exactiteration(small, x3, part=vectorPart)
In [50]:
x4 = normalizevalues(x4, x0)
x4.cache()
Out[50]:
In [51]:
x4.take(10)
Out[51]:
In [52]:
x5 = exactiteration(small, x4, part=vectorPart)
x5 = normalizevalues(x5, x0)
x5.cache()
Out[52]:
In [53]:
x6 = exactiteration(small, x5, part=vectorPart)
x6 = normalizevalues(x6,x0)
x6.cache()
Out[53]:
In [60]:
x7 = exactiteration(small, x6, part=vectorPart)
x7 = normalizevalues(x7,x0)
x7.cache()
Out[60]:
In [61]:
x8 = exactiteration(small, x7, part=vectorPart)
x8 = normalizevalues(x8,x0)
x8.cache()
Out[61]:
In [32]:
names = sc.textFile('datasets/wdc/pld-index')
In [33]:
def indexline(line):
    # pld-index lines are '<domain>\t<node id>'; return (id, domain)
    parts = line.split('\t')
    return (int(parts[1]), parts[0])
In [34]:
index = names.map(indexline).filter(lambda (k,v): k<size)
In [35]:
index.cache()
Out[35]:
In [36]:
topnodes = index.join(x4.filter(lambda (k,v): v>0.0001))
In [37]:
topnodes.cache()
Out[37]:
In [38]:
topnodes.take(10)
Out[38]:
In [39]:
topnodes.sortBy(lambda (k,v): v[1], ascending=False).take(10)
In [ ]:
topnodes
In [ ]:
x3.take(10)
In [ ]:
topnodes.join(x3).take(10)
In [ ]:
topnodes.join(x4).take(10)
The cells below track the PageRank value of node 5325333 across iterations as a rough convergence check.
In [39]:
x1.filter(lambda (k,v): k==5325333).collect()[0][1]
Out[39]:
In [40]:
x2.filter(lambda (k,v): k==5325333).collect()[0][1]
Out[40]:
In [54]:
x3.filter(lambda (k,v): k==5325333).collect()[0][1]
Out[54]:
In [55]:
x4.filter(lambda (k,v): k==5325333).collect()[0][1]
Out[55]:
In [56]:
x5.filter(lambda (k,v): k==5325333).collect()[0][1]
Out[56]:
In [57]:
x6.filter(lambda (k,v): k==5325333).collect()[0][1]
Out[57]:
In [62]:
x7.filter(lambda (k,v): k==5325333).collect()[0][1]
Out[62]:
In [63]:
x8.filter(lambda (k,v): k==5325333).collect()[0][1]
Out[63]:
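A more direct convergence check than watching a single node (a sketch, not a cell from the original notebook) is the L1 distance between two successive rank RDDs:

# L1 distance between the last two PageRank vectors
l1_diff = x8.join(x7).values().map(
    lambda pair: abs(pair[0] - pair[1])
).sum()
print(l1_diff)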
In [ ]: