Eigendecomposition for a subset of the PLD graph

Get a subset of the graph

In this segment we load the dataset and keep only a tiny subset of it.

Load the first partition of the PLD arc list


In [2]:
#data = sc.textFile('/var/datasets/wdc/pld-arc')   # full arc list
data = sc.textFile('/var/datasets/wdc/x0')         # first partition only

Keep only edges whose endpoints both have IDs less than 1e4


In [3]:
small = data.map(
                   # each line is "source<TAB>destination"
                   lambda line: [int(x) for x in line.split('\t')]
         ).filter(
                   lambda (source, destination): source < 1e4 and destination < 1e4
         )

Persist the result


In [4]:
small.cache()


Out[4]:
PythonRDD[2] at RDD at PythonRDD.scala:37

In [5]:
small.count()


Out[5]:
2419

In [6]:
small.take(10)


Out[6]:
[[70, 105],
 [86, 2106],
 [107, 103],
 [115, 903],
 [115, 6975],
 [201, 2211],
 [201, 4185],
 [201, 4271],
 [201, 4490],
 [201, 4491]]

Do the eigenvector calculation

We run unnormalized power iteration x_{k+1} = A^T x_k, starting from the all-ones vector, where A is the adjacency matrix of the subgraph. Without normalization the masses grow at every step; only their relative sizes matter for ranking.

Initialize the first iteration


In [7]:
p = 1000   # length of the dense iteration vector

In [8]:
from pyspark.accumulators import AccumulatorParam

class VectorAccumulatorParam(AccumulatorParam):
    # Accumulate dense vectors: zero is the zero vector, addition is elementwise.
    def zero(self, value):
        return [0.0] * len(value)
    def addInPlace(self, val1, val2):
        for i in xrange(len(val1)):
            val1[i] += val2[i]
        return val1

xnew = sc.accumulator([0.0] * p, VectorAccumulatorParam())

In [9]:
x = sc.broadcast([1.0]*p)
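
The accumulator xnew and the broadcast vector x set up a dense update path that the rest of the notebook does not use. A minimal sketch of how they would combine into one dense power-iteration step, assuming p exceeded the largest node ID (here it does not, since IDs run up to 1e4, hence the guard):


In [ ]:
# One dense step: for every edge, route the source's current mass to the
# destination via the vector accumulator. Allocating a full-length vector
# per edge is wasteful; this only illustrates the accumulator pattern.
def dense_step(edge):
    source, dest = edge
    if source < p and dest < p:   # guard: p is smaller than the max node ID here
        update = [0.0] * p
        update[dest] = x.value[source]
        xnew.add(update)

small.foreach(dense_step)
# xnew.value now holds one step of the iteration; rebroadcast it to continue.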

x1 is the in-degree of each node: one step of the iteration applied to the all-ones vector.


In [17]:
from operator import add
x1 = small.map(lambda (source, dest): (dest, 1)).reduceByKey(add)

In [18]:
x1.cache()


Out[18]:
PythonRDD[9] at RDD at PythonRDD.scala:37

In [19]:
x1.count()


Out[19]:
746

Approximate iteration

As an approximation, drop nodes with mass of 2 or less before the join; this shrinks the shuffle at the cost of some accuracy.


In [45]:
approxjoin = x1.filter(lambda (k,v): v>2).join(small)

In [47]:
x2 = approxjoin.map(lambda (k, v): (v[1], v[0])).reduceByKey(add)   # v = (mass, dest) -> (dest, mass)

In [50]:
x2.take(5)


Out[50]:
[(7424, 43), (8640, 8), (6368, 870), (7448, 37), (1304, 25)]
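
Once the exact x2 is computed below, the error the mass filter introduces could be gauged with a sketch like this (hypothetical names x2_exact and x2_approx stand for the two versions of x2):


In [ ]:
# Per-node absolute difference between exact and approximate masses;
# nodes missing from the approximate result count as 0.
diff = x2_exact.leftOuterJoin(x2_approx).map(
    lambda (k, (e, a)): abs(e - (a or 0))
)
print diff.sum(), diff.max()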

Exact iteration

The same step without the mass filter.


In [51]:
x2 = x1.join(small).map(lambda (k,v): (v[1], v[0])).reduceByKey(add)

In [52]:
x2.cache()


Out[52]:
PythonRDD[124] at RDD at PythonRDD.scala:37

In [56]:
x2.count()


Out[56]:
498
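
The join-map-reduce pattern above is the entire power-iteration step; a small helper (hypothetical name iterate) would make the repetition below easier to read:


In [ ]:
def iterate(xk):
    # xk: RDD of (node, mass); small: RDD of (source, dest) edges.
    # Joining on the source routes each node's mass along its out-edges;
    # reduceByKey then sums the mass arriving at each destination.
    return xk.join(small).map(lambda (k, v): (v[1], v[0])).reduceByKey(add)

# e.g. x3 = iterate(x2); x4 = iterate(x3)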

Third iteration


In [57]:
x3 = x2.join(small).map(lambda (k,v): (v[1], v[0])).reduceByKey(add)

In [58]:
x3.cache()


Out[58]:
PythonRDD[138] at RDD at PythonRDD.scala:37

In [59]:
x3.count()


Out[59]:
447

In [61]:
x3.filter(lambda (k,v): v>0).take(4)


Out[61]:
[(6696, 2117), (6352, 25289), (6368, 25289), (6880, 2064)]

Fourth iteration


In [62]:
x4 = x3.join(small).map(lambda (k,v): (v[1], v[0])).reduceByKey(add)

In [63]:
x4.cache()


Out[63]:
PythonRDD[153] at RDD at PythonRDD.scala:37

In [64]:
x4.count()


Out[64]:
445

Show the top domains


In [67]:
x4.filter(lambda (k,v): v>10000).take(4)


Out[67]:
[(6696, 25411), (6352, 734252), (6368, 734252), (6880, 24852)]

In [68]:
names = sc.textFile('/var/datasets/wdc/pld-index')

In [69]:
def indexline(line):
    # each index line is "domain<TAB>id"; flip it to (id, domain)
    parts = line.split('\t')
    return (int(parts[1]), parts[0])

In [70]:
index = names.map(indexline)

In [71]:
index.cache()


Out[71]:
PythonRDD[160] at RDD at PythonRDD.scala:37

In [73]:
index.take(3)


Out[73]:
[(0, u'0-------------------------------------------------------------0.com'),
 (1, u'0-------------------------------------------------------------0.dk'),
 (2, u'0-----0.org')]

In [87]:
topnodes = index.join(x4.filter(lambda (k,v): v>7e5))


ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server
Traceback (most recent call last):
  ...
error: [Errno 111] Connection refused
---------------------------------------------------------------------------
Py4JNetworkError                          Traceback (most recent call last)
<ipython-input-87-a8a9e4be9c66> in <module>()
----> 1 topnodes = index.join(x4.filter(lambda (k,v): v>7e5))
  ... (py4j gateway frames elided) ...
Py4JNetworkError: An error occurred while trying to connect to the Java server

The join never ran: the connection to the driver JVM was refused, which suggests the Spark driver process had died. The context has to be restarted before this lookup can be retried.
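
Once the gateway is back, the same lookup could avoid the big join entirely by collecting the few heavy nodes to the driver first (a sketch, not executed here):


In [ ]:
# Collect the heavy nodes as a dict, then scan the index for their names.
top = dict(x4.filter(lambda (k, v): v > 7e5).collect())
for node_id, name in index.filter(lambda (k, v): k in top).collect():
    print name, top[node_id]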

TODO: Plot the masses over the first three iterations for the top nodes (a possible sketch follows).


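A possible implementation of the TODO (hypothetical: it assumes the context has been restarted and x1, x2, x3 recomputed, and takes the four heaviest node IDs from Out[67]):


In [ ]:
import matplotlib.pyplot as plt

top_ids = [6352, 6368, 6696, 6880]
masses = {i: [] for i in top_ids}
for xk in [x1, x2, x3]:
    step = dict(xk.filter(lambda (k, v): k in top_ids).collect())
    for i in top_ids:
        masses[i].append(step.get(i, 0))

for i in top_ids:
    plt.semilogy([1, 2, 3], masses[i], marker='o', label=str(i))
plt.xlabel('iteration')
plt.ylabel('mass')
plt.legend()
plt.show()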