In this segment we load the WDC pay-level-domain link graph and keep only a tiny subset of it (edges whose source and destination IDs are both below 10,000).
In [2]:
# the full pld-arc edge list is left commented out; load the smaller x0 chunk instead
#data = sc.textFile('/var/datasets/wdc/pld-arc')
data = sc.textFile('/var/datasets/wdc/x0')
In [3]:
# parse each tab-separated "source<TAB>destination" line into a pair of ints
# and keep only edges between the first 10,000 node IDs
small = data.map(
    lambda line: [int(x) for x in line.split('\t')]
).filter(
    lambda (source, destination): source < 1e4 and destination < 1e4
)
In [4]:
small.cache()
Out[4]:
In [5]:
small.count()
Out[5]:
In [6]:
small.take(10)
Out[6]:
In [7]:
p = 1000  # length of the dense score vectors used by the accumulator and broadcast below
In [8]:
from pyspark.accumulators import AccumulatorParam

# accumulator that sums vectors (Python lists) element-wise across tasks
class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return [0.0] * len(value)
    def addInPlace(self, val1, val2):
        for i in xrange(len(val1)):
            val1[i] += val2[i]
        return val1

xnew = sc.accumulator([0.0]*p, VectorAccumulatorParam())
In [9]:
x = sc.broadcast([1.0]*p)  # broadcast the initial score vector (all ones) to the workers
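Neither xnew nor x is used further in this segment. A minimal sketch of how the broadcast vector and the vector accumulator could be combined for one dense propagation step might look like the following; the helper name contribute is made up, and it assumes every node ID in small is smaller than p, which the p = 1000 / 1e4 filter above does not actually guarantee.

def contribute(edge):
    source, dest = edge
    delta = [0.0] * p              # contribution with a single non-zero entry
    delta[dest] = x.value[source]  # read the broadcast score of the source node
    xnew.add(delta)                # merged into the accumulator on the driver

small.foreach(contribute)
# after the action, xnew.value holds the propagated scores on the driver

Allocating a full length-p list per edge is wasteful; the sketch is only meant to illustrate the accumulator API, not to be an efficient implementation.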
In [17]:
from operator import add  # reduceByKey(add) below needs the plain binary + operator

# x1[node] = in-degree of the node within the small subgraph
x1 = small.map(lambda (source, dest): (dest, 1)).reduceByKey(add)
In [18]:
x1.cache()
Out[18]:
In [19]:
x1.count()
Out[19]:
In [45]:
# approximate step: drop nodes with in-degree <= 2 before joining with the edge list
approxjoin = x1.filter(lambda (k, v): v > 2).join(small)
In [47]:
# push each surviving node's count along its outgoing edges and sum at the destinations
x2 = approxjoin.map(lambda (k, v): (v[1], v[0])).reduceByKey(add)
In [50]:
x2.take(5)
Out[50]:
In [51]:
# exact version of the same step: propagate x1 over every edge, no in-degree cutoff
x2 = x1.join(small).map(lambda (k, v): (v[1], v[0])).reduceByKey(add)
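The same join/map/reduceByKey step is repeated below to obtain x3 and x4. A small helper (hypothetical, not part of the original notebook) makes the pattern explicit:

def propagate(xk):
    # one propagation step on the small subgraph: each node sends its current
    # score along its outgoing edges, and every destination sums what it receives
    return xk.join(small).map(lambda (k, v): (v[1], v[0])).reduceByKey(add)

# x2 = propagate(x1); x3 = propagate(x2); x4 = propagate(x3)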
In [52]:
x2.cache()
Out[52]:
In [56]:
x2.count()
Out[56]:
In [57]:
# propagate x2 one step further over the edge list
x3 = x2.join(small).map(lambda (k, v): (v[1], v[0])).reduceByKey(add)
In [58]:
x3.cache()
Out[58]:
In [59]:
x3.count()
Out[59]:
In [61]:
x3.filter(lambda (k,v): v>0).take(4)
Out[61]:
In [62]:
# one more propagation step
x4 = x3.join(small).map(lambda (k, v): (v[1], v[0])).reduceByKey(add)
In [63]:
x4.cache()
Out[63]:
In [64]:
x4.count()
Out[64]:
In [67]:
x4.filter(lambda (k,v): v>10000).take(4)
Out[67]:
In [68]:
names = sc.textFile('/var/datasets/wdc/pld-index')  # mapping between domain names and node IDs
In [69]:
def indexline(line):
    # pld-index lines are "<domain>\t<node id>"; key by the numeric ID
    # so the result can be joined with the score RDDs
    parts = line.split('\t')
    return (int(parts[1]), parts[0])
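As a quick illustration (the domain and ID here are made up, not taken from the real index):

indexline('example.com\t12')   # -> (12, 'example.com')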
In [70]:
index = names.map(indexline)
In [71]:
index.cache()
Out[71]:
In [73]:
index.take(3)
Out[73]:
In [87]:
# attach domain names to the nodes whose x4 score exceeds 700,000
topnodes = index.join(x4.filter(lambda (k, v): v > 7e5))
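To inspect the result one could collect the joined pairs, for example (a sketch, not executed in the original notebook):

# each element of topnodes is (node_id, (domain, score))
for node_id, (domain, score) in topnodes.collect():
    print domain, score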