In [1]:
sc


Out[1]:
<pyspark.context.SparkContext at 0x7f8f606ce250>

In [2]:
from pyspark.sql import HiveContext, Row
sqlContext = HiveContext(sc)

In [3]:
import block as blk
import struct
import io

In [4]:
%load_ext autoreload
%autoreload 2

In [5]:
import hashlib
h1 = hashlib.sha256("this is a test").hexdigest()
print(h1)
h2 = hashlib.sha256(h1).hexdigest()
print(h2)


2e99758548972a8e8822ad47fa1017ff72f06f3ff6a016851f45c398732bc50c
34610a7dc634395a3f5f8b5cbcae0be10604358f04acb8f3fd63a9e9369b83d9
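
Note that h2 above hashes the 64-character hex string of h1, not the raw digest. Bitcoin's double SHA-256 feeds the raw 32-byte digest into the second hash, which is why the block-hash cell below uses .digest() for the inner hash. A quick check of the difference (a hypothetical extra line, not part of the original session):

d2 = hashlib.sha256(hashlib.sha256("this is a test").digest()).hexdigest()
print(d2)  # differs from h2 because the inner result is raw bytes, not hex text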

In [6]:
print(sc.version)
print(sc.defaultParallelism)


1.2.0
4

In [8]:
# compute a block hash: double SHA-256 over the 80-byte genesis block header
ver = 1
prev_block = "0000000000000000000000000000000000000000000000000000000000000000"
mrkl_root = "4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b"
time_ = 0x495fab29
bits = 0x1d00ffff
nonce = 0x7c2bac1d

header = ( struct.pack("<L", ver) + prev_block.decode('hex')[::-1] +
    mrkl_root.decode('hex')[::-1] + struct.pack("<LLL", time_, bits, nonce))
block_hash = hashlib.sha256(hashlib.sha256(header).digest()).digest()
print(block_hash[::-1].encode('hex'))


000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f
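
For reference, the cell above relies on Python 2 byte-string methods (decode('hex'), encode('hex')). A roughly equivalent Python 3 version, reusing the same header fields, would look like this (a sketch, not executed in this notebook):

import hashlib, struct

header = (struct.pack("<L", ver)
          + bytes.fromhex(prev_block)[::-1]
          + bytes.fromhex(mrkl_root)[::-1]
          + struct.pack("<LLL", time_, bits, nonce))
block_hash = hashlib.sha256(hashlib.sha256(header).digest()).digest()
print(block_hash[::-1].hex())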

In [9]:
# parse a single .dat file
blocks = blk.parseBlockFile("/home/jeff/prj/bitcoin-block-data/blk00000.dat")

In [10]:
blks = sc.binaryFiles("/home/jeff/prj/bitcoin-block-data/")

In [11]:
print(blks.count())


2

In [12]:
def printFirstBytes(f):
    # print the first 10 bytes of a file-like object
    print(f.read(10))

In [13]:
print(blks.keys().first())


file:/home/jeff/prj/bitcoin-block-data/blk00000.dat

In [14]:
parsed = blks.flatMap(lambda x: blk.parseBlockBytes(io.BytesIO(x[1])))

In [15]:
print(parsed.count())


Traceback (most recent call last):
  File "/usr/lib/python2.7/SocketServer.py", line 295, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 321, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 334, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python2.7/SocketServer.py", line 649, in __init__
    self.handle()
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/accumulators.py", line 246, in handle
    num_updates = read_int(self.rfile)
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py", line 511, in read_int
    raise EOFError
EOFError
ERROR:py4j.java_gateway:Error while sending or receiving.
Traceback (most recent call last):
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 479, in send_command
    raise Py4JError("Answer from Java side is empty")
Py4JError: Answer from Java side is empty
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server
Traceback (most recent call last):
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 425, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[... the connection-refused traceback above is repeated 30 more times ...]
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 35765)
----------------------------------------
---------------------------------------------------------------------------
Py4JNetworkError                          Traceback (most recent call last)
<ipython-input-15-49587a4b379d> in <module>()
----> 1 print(parsed.count())

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.pyc in count(self)
    818         3
    819         """
--> 820         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
    821 
    822     def stats(self):

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.pyc in sum(self)
    809         6.0
    810         """
--> 811         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
    812 
    813     def count(self):

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.pyc in reduce(self, f)
    714             yield reduce(f, iterator, initial)
    715 
--> 716         vals = self.mapPartitions(func).collect()
    717         if vals:
    718             return reduce(f, vals)

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.pyc in collect(self)
    675         """
    676         with SCCallSiteSync(self.context) as css:
--> 677             bytesInJava = self._jrdd.collect().iterator()
    678         return list(self._collect_iterator_through_file(bytesInJava))
    679 

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/traceback_utils.pyc in __exit__(self, type, value, tb)
     76         SCCallSiteSync._spark_stack_depth -= 1
     77         if SCCallSiteSync._spark_stack_depth == 0:
---> 78             self._context._jsc.setCallSite(None)

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    534             END_COMMAND_PART
    535 
--> 536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
    538                 self.target_id, self.name)

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry)
    360          the Py4J protocol.
    361         """
--> 362         connection = self._get_connection()
    363         try:
    364             response = connection.send_command(command)

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _get_connection(self)
    316             connection = self.deque.pop()
    317         except Exception:
--> 318             connection = self._create_connection()
    319         return connection
    320 

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _create_connection(self)
    323         connection = GatewayConnection(self.address, self.port,
    324                 self.auto_close, self.gateway_property)
--> 325         connection.start()
    326         return connection
    327 

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in start(self)
    430                 'server'
    431             logger.exception(msg)
--> 432             raise Py4JNetworkError(msg)
    433 
    434     def close(self):

Py4JNetworkError: An error occurred while trying to connect to the Java server
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/java_gateway.py", line 103, in run
    sys.stderr.write(line)
  File "/usr/local/lib/python2.7/dist-packages/IPython/kernel/zmq/iostream.py", line 205, in write
    self._buffer.write(string)
ValueError: I/O operation on closed file


In [15]:
parsed.first().printBlock()


magic_no:	0xd9b4bef9
size:    	285 bytes
Block header:	
		Version: 1 
		PreviousHash: 0000000000000000000000000000000000000000000000000000000000000000 
		Merkle: 4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b 
		Time: 495fab29 
		Bits: 1d00ffff 
		Nonce: 7c2bac1d 
		Prefix: 0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c 
		BlockHash: 000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f 
		
Transactions: 	1
==================================================
 TX NUMBER: 1
==================================================
Version: 1
Inputs count: 1
---Inputs---
PrevHash: 0000000000000000000000000000000000000000000000000000000000000000 
Prev Tx out index: 4294967295 
Txin Script Len: 77 
scriptSig: 04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73 
Sequence: ffffffff
Outputs count: 1
---Outputs---
Value (satoshis): 5000000000 (50.000000 btc)
Txout Script Len: 67
scriptPubKey: 4104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac
Lock_time:       0



In [8]:
print(len(blocks))


119965

In [9]:
blocks[0].printBlock()
blocks[100000].printBlock()


magic_no:	0xd9b4bef9
size:    	285 bytes
Block header:	
		Version: 1 
		PreviousHash: 0000000000000000000000000000000000000000000000000000000000000000 
		Merkle: 4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b 
		Time: 495fab29 
		Bits: 1d00ffff 
		Nonce: 7c2bac1d 
		Prefix: 0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c 
		BlockHash: 000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f 
		
Transactions: 	1
==================================================
 TX NUMBER: 1
==================================================
Version: 1
Inputs count: 1
---Inputs---
PrevHash: 0000000000000000000000000000000000000000000000000000000000000000 
Prev Tx out index: 4294967295 
Txin Script Len: 77 
scriptSig: 04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73 
Sequence: ffffffff
Outputs count: 1
---Outputs---
Value (satoshis): 5000000000 (50.000000 btc)
Txout Script Len: 67
scriptPubKey: 4104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac
Lock_time:       0


magic_no:	0xd9b4bef9
size:    	957 bytes
Block header:	
		Version: 1 
		PreviousHash: 000000000002d01c1fccc21636b607dfd930d31d01c3a62104612a1719011250 
		Merkle: f3e94742aca4b5ef85488dc37c06c3282295ffec960994b2c0d5ac2a25a95766 
		Time: 4d1b2237 
		Bits: 1b04864c 
		Nonce: 10572b0f 
		Prefix: 0100000050120119172a610421a6c3011dd330d9df07b63616c2cc1f1cd00200000000006657a9252aacd5c0b2940996ecff952228c3067cc38d4885efb5a4ac4247e9f337221b4d4c86041b0f2b5710 
		BlockHash: 000000000003ba27aa200b1cecaad478d2b00432346c3f1f3986da1afd33e506 
		
Transactions: 	4
==================================================
 TX NUMBER: 1
==================================================
Version: 1
Inputs count: 1
---Inputs---
PrevHash: 0000000000000000000000000000000000000000000000000000000000000000 
Prev Tx out index: 4294967295 
Txin Script Len: 8 
scriptSig: 044c86041b020602 
Sequence: ffffffff
Outputs count: 1
---Outputs---
Value (satoshis): 5000000000 (50.000000 btc)
Txout Script Len: 67
scriptPubKey: 41041b0e8c2567c12536aa13357b79a073dc4444acb83c4ec7a0e2f99dd7457516c5817242da796924ca4e99947d087fedf9ce467cb9f7c6287078f801df276fdf84ac
Lock_time:       0


==================================================
 TX NUMBER: 2
==================================================
Version: 1
Inputs count: 1
---Inputs---
PrevHash: 87a157f3fd88ac7907c05fc55e271dc4acdc5605d187d646604ca8c0e9382e03 
Prev Tx out index: 0 
Txin Script Len: 140 
scriptSig: 493046022100c352d3dd993a981beba4a63ad15c209275ca9470abfcd57da93b58e4eb5dce82022100840792bc1f456062819f15d33ee7055cf7b5ee1af1ebcc6028d9cdb1c3af7748014104f46db5e9d61a9dc27b8d64ad23e7383a4e6ca164593c2527c038c0857eb67ee8e825dca65046b82c9331586c82e0fd1f633f25f87c161bc6f8a630121df2b3d3 
Sequence: ffffffff
Outputs count: 2
---Outputs---
Value (satoshis): 556000000 (5.560000 btc)
Txout Script Len: 25
scriptPubKey: 76a914c398efa9c392ba6013c5e04ee729755ef7f58b3288ac
Value (satoshis): 4444000000 (44.440000 btc)
Txout Script Len: 25
scriptPubKey: 76a914948c765a6914d43f2a7ac177da2c2f6b52de3d7c88ac
Lock_time:       0


==================================================
 TX NUMBER: 3
==================================================
Version: 1
Inputs count: 1
---Inputs---
PrevHash: cf4e2978d0611ce46592e02d7e7daf8627a316ab69759a9f3df109a7f2bf3ec3 
Prev Tx out index: 1 
Txin Script Len: 138 
scriptSig: 4730440220032d30df5ee6f57fa46cddb5eb8d0d9fe8de6b342d27942ae90a3231e0ba333e02203deee8060fdc70230a7f5b4ad7d7bc3e628cbe219a886b84269eaeb81e26b4fe014104ae31c31bf91278d99b8377a35bbce5b27d9fff15456839e919453fc7b3f721f0ba403ff96c9deeb680e5fd341c0fc3a7b90da4631ee39560639db462e9cb850f 
Sequence: ffffffff
Outputs count: 2
---Outputs---
Value (satoshis): 1000000 (0.010000 btc)
Txout Script Len: 25
scriptPubKey: 76a914b0dcbf97eabf4404e31d952477ce822dadbe7e1088ac
Value (satoshis): 299000000 (2.990000 btc)
Txout Script Len: 25
scriptPubKey: 76a9146b1281eec25ab4e1e0793ff4e08ab1abb3409cd988ac
Lock_time:       0


==================================================
 TX NUMBER: 4
==================================================
Version: 1
Inputs count: 1
---Inputs---
PrevHash: f4515fed3dc4a19b90a317b9840c243bac26114cf637522373a7d486b372600b 
Prev Tx out index: 0 
Txin Script Len: 140 
scriptSig: 493046022100bb1ad26df930a51cce110cf44f7a48c3c561fd977500b1ae5d6b6fd13d0b3f4a022100c5b42951acedff14abba2736fd574bdb465f3e6f8da12e2c5303954aca7f78f3014104a7135bfe824c97ecc01ec7d7e336185c81e2aa2c41ab175407c09484ce9694b44953fcb751206564a9c24dd094d42fdbfdd5aad3e063ce6af4cfaaea4ea14fbb 
Sequence: ffffffff
Outputs count: 1
---Outputs---
Value (satoshis): 1000000 (0.010000 btc)
Txout Script Len: 25
scriptPubKey: 76a91439aa3d569e06a1d7926dc4be1193c99bf2eb9ee088ac
Lock_time:       0



In [7]:
# create a spark partition for each .dat file
files = ["blk00000.dat","blk00001.dat"]
blockFileNames = sc.parallelize(files,len(files))

In [8]:
blocksRDD = blockFileNames.flatMap(lambda x: blk.parseBlockFile(x)). \
                keyBy(lambda x: (x.getBlockHash(),x.getBlockPrevHash(),x.getBlockDifficulty())).repartition(4)

In [9]:
print(blockFileNames.glom().collect())


[['blk00000.dat'], ['blk00001.dat']]

In [10]:
print(blocksRDD.count())


131237

In [11]:
tmp = blocksRDD.take(2)

In [12]:
print(tmp)


[(('00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048', '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f', 486604799), <block.Block object at 0x7effa8571690>), (('000000006a625f06636b8bb6ac7b960a8d03705d1ace08b1a19da3fdcc99ddbd', '00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048', 486604799), <block.Block object at 0x7effa85717d0>)]

In [13]:
headers = blocksRDD.keys().collect()  # list of (blockHash, prevBlockHash, difficulty) tuples

In [14]:
from collections import defaultdict
headersMap = defaultdict(list)
for h in headers:
    headersMap[h[1]].append((h[0],h[2]))

In [15]:
len(headers)


Out[15]:
131237

In [16]:
len(set(headersMap.keys()))


Out[16]:
131237

In [17]:
# set the genesis header hash
genesisPrev = '0000000000000000000000000000000000000000000000000000000000000000'
genesisHash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
genesisDifficulty = 0x1d00ffff

In [18]:
import graph_tool as gt
import graph_tool.util as gtutil

In [19]:
# construct the header graph
headerChain = gt.Graph() # directed graph of block headers; each vertex stores a block hash and difficulty
v_hash = headerChain.new_vertex_property("string")
v_difficulty = headerChain.new_vertex_property("int")
curHash = genesisHash # the header hash we are looking for in prevHash

# add the vertex
v = headerChain.add_vertex()
v_hash[v] = curHash
v_difficulty[v] = genesisDifficulty


def generateBlockGraph(vertex,graph,vhashes,vdifficulty,blockMap):
    # uses depth-first search to construct graph
    vList = []
    discovered = set()
    vList.append(vertex)
    while (len(vList) > 0):
        vertex = vList.pop()
        if (vertex not in discovered): # only process the vertex if it has not been discovered
            discovered.add(vertex)
            # find blocks above this one and connect them up
            if blockMap.has_key(vhashes[vertex]):
                nextHeaders = blockMap[vhashes[vertex]] # returns a list of connected headers
                for h in nextHeaders:
                    v = graph.add_vertex() # add vertex
                    vhashes[v] = h[0]  # set the property
                    vdifficulty[v] = h[1]
                    graph.add_edge(vertex,v) # add the edge
                    vList.append(v)
    return (graph, vhashes, vdifficulty)

res = generateBlockGraph(v, headerChain, v_hash, v_difficulty, headersMap)
headerChain = res[0]
v_hash = res[1]
v_difficulty = res[2]

In [20]:
# plot the graph (takes too long!)
#import graph_tool.draw as gd
#gd.graph_draw(headerChain, vertex_text=headerChain.vertex_index, vertex_font_size=8, output_size=(200, 200), output="block-chain.png")

In [21]:
print(res)
print(v_hash[headerChain.vertex(0)])
print(v_hash[headerChain.vertex(1)])
print(v_difficulty[headerChain.vertex(0)])
print(v_difficulty[headerChain.vertex(1)])


(<Graph object, directed, with 131237 vertices and 131236 edges at 0x7eff88584cd0>, <PropertyMap object with key type 'Vertex' and value type 'string', for Graph 0x7eff88584cd0, at 0x7eff88584c90>, <PropertyMap object with key type 'Vertex' and value type 'int32_t', for Graph 0x7eff88584cd0, at 0x7eff88584d50, with values:
[486604799 486604799 486604799 ..., 437461381 437461381 437461381]>)
000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f
00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048
486604799
486604799

In [22]:
# get the leaf nodes
headerChain.set_vertex_filter(None)
v_leafs = headerChain.new_vertex_property("bool")
leafs = []
for v in headerChain.vertices():
    if (v.out_degree() == 0):
        v_leafs[v] = True
        leafs.append(v)
    else:
        v_leafs[v] = False
        
print([v_hash[v] for v in leafs])


['0000000000000177ca06002a5d22214a0ba3122da467ffd7d70e2f61836e4198']

In [23]:
# find the longest chain
# TODO: compute chain length from accumulated difficulty/work instead of block count -- see the sketch after this cell's output
headerChain.set_vertex_filter(None)
blockCount = []
for leaf in leafs:
    # count the number of vertices in this chain
    count = 0
    v = leaf
    while (v.in_degree() > 0):
        count = count + 1
        v = v.in_neighbours().next() # relies on the fact that nodes have only one input
    blockCount.append(count)
    
print("blockCounts: " + str(blockCount))
mainLeaf = leafs[blockCount.index(max(blockCount))]
print("main leaf index: " + str(mainLeaf))


blockCounts: [131236]
main leaf index: 131236
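
As flagged in the TODO above, chain selection should really compare accumulated proof-of-work rather than block counts. A minimal sketch of how that could be done with the stored compact 'bits' values (assuming the standard Bitcoin compact-target encoding; not run in this notebook):

def bits_to_target(bits):
    # decode the compact 'bits' field into the full 256-bit target
    exponent = bits >> 24
    mantissa = bits & 0xffffff
    return mantissa * (1 << (8 * (exponent - 3)))

def block_work(bits):
    # expected number of hashes needed to find a block at this target
    return (1 << 256) // (bits_to_target(bits) + 1)

def chain_work(leaf, vdifficulty):
    # walk from a leaf back toward the genesis block, summing work per block
    work = 0
    v = leaf
    while (v.in_degree() > 0):
        work += block_work(int(vdifficulty[v]))
        v = v.in_neighbours().next() # each block has a single parent
    return work

# the main chain would then be the leaf with the largest chain_work(leaf, v_difficulty)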

In [24]:
# make a property map for the main block chain
on_chain = headerChain.new_vertex_property("bool",vals=False)
v = mainLeaf
on_chain[v] = True
while (v.in_degree() > 0):
    v = v.in_neighbours().next() # relies on the fact that nodes have only one input
    on_chain[v] = True

In [25]:
# assign block numbers
block_index = headerChain.new_vertex_property("int",vals = -1)
headerChain.set_vertex_filter(on_chain)
count = 0
v = gtutil.find_vertex(headerChain,v_hash,genesisHash)[0]
while (v.out_degree() > 0):
    block_index[v] = count  # first block (genesis) has index 0
    count = count + 1
    v = v.out_neighbours().next() # each main-chain block has exactly one child under the filter
block_index[v] = count  # also index the chain tip, which the loop above skips

In [26]:
# print the first 25 block headers with block index
v = gtutil.find_vertex(headerChain,v_hash,genesisHash)[0]
count = 0
while (count < 25):
    curHash = v_hash[v]
    blkIdx = block_index[v]
    print("Block Hash: " + str(curHash) + " Block Index: " + str(blkIdx))
    count = count + 1
    v = v.out_neighbours().next() # follow the single on-chain child


Block Hash: 000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f Block Index: 0
Block Hash: 00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048 Block Index: 1
Block Hash: 000000006a625f06636b8bb6ac7b960a8d03705d1ace08b1a19da3fdcc99ddbd Block Index: 2
Block Hash: 0000000082b5015589a3fdf2d4baff403e6f0be035a5d9742c1cae6295464449 Block Index: 3
Block Hash: 000000004ebadb55ee9096c9a2f8880e09da59c0d68b1c228da88e48844a1485 Block Index: 4
Block Hash: 000000009b7262315dbf071787ad3656097b892abffd1f95a1a022f896f533fc Block Index: 5
Block Hash: 000000003031a0e73735690c5a1ff2a4be82553b2a12b776fbd3a215dc8f778d Block Index: 6
Block Hash: 0000000071966c2b1d065fd446b1e485b2c9d9594acd2007ccbd5441cfc89444 Block Index: 7
Block Hash: 00000000408c48f847aa786c2268fc3e6ec2af68e8468a34a28c61b7f1de0dc6 Block Index: 8
Block Hash: 000000008d9dc510f23c2657fc4f67bea30078cc05a90eb89e84cc475c080805 Block Index: 9
Block Hash: 000000002c05cc2e78923c34df87fd108b22221ac6076c18f3ade378a4d915e9 Block Index: 10
Block Hash: 0000000097be56d606cdd9c54b04d4747e957d3608abe69198c661f2add73073 Block Index: 11
Block Hash: 0000000027c2488e2510d1acf4369787784fa20ee084c258b58d9fbd43802b5e Block Index: 12
Block Hash: 000000005c51de2031a895adc145ee2242e919a01c6d61fb222a54a54b4d3089 Block Index: 13
Block Hash: 0000000080f17a0c5a67f663a9bc9969eb37e81666d9321125f0e293656f8a37 Block Index: 14
Block Hash: 00000000b3322c8c3ef7d2cf6da009a776e6a99ee65ec5a32f3f345712238473 Block Index: 15
Block Hash: 00000000174a25bb399b009cc8deff1c4b3ea84df7e93affaaf60dc3416cc4f5 Block Index: 16
Block Hash: 000000003ff1d0d70147acfbef5d6a87460ff5bcfce807c2d5b6f0a66bfdf809 Block Index: 17
Block Hash: 000000008693e98cf893e4c85a446b410bb4dfa129bd1be582c09ed3f0261116 Block Index: 18
Block Hash: 00000000841cb802ca97cf20fb9470480cae9e5daa5d06b4a18ae2d5dd7f186f Block Index: 19
Block Hash: 0000000067a97a2a37b8f190a17f0221e9c3f4fa824ddffdc2e205eae834c8d7 Block Index: 20
Block Hash: 000000006f016342d1275be946166cff975c8b27542de70a7113ac6d1ef3294f Block Index: 21
Block Hash: 0000000098b58d427a10c860335a21c1a9a7639e96c3d6f1a03d8c8c885b5e3b Block Index: 22
Block Hash: 000000000cd339982e556dfffa9de94744a4135c53eeef15b7bcc9bdeb9c2182 Block Index: 23
Block Hash: 00000000fc051fbbce89a487e811a5d4319d209785ea4f4b27fc83770d1e415f Block Index: 24

In [27]:
# create a dict with key = headerHash and value = (blockIdx, on_chain)
headerChain.set_vertex_filter(None)
blockDict = {}
for v in headerChain.vertices():
    blockDict[v_hash[v]] = (block_index[v], on_chain[v])

In [28]:
# define a map function to insert block index and on_chain into RDD
def genAddMeta(metaDict):
    '''constructs a closure to use as a mapper function'''
    
    def addMeta(el):
        blkHash = el[0][0]
        meta = metaDict[blkHash]
        newEl = list(el)
        newEl.append(meta[0])  #block_index
        newEl.append(meta[1])  #on_chain
        return tuple(newEl)
    
    return addMeta
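
An aside on this approach: blockDict (one entry per block header) is captured in the closure and pickled with every task. A Spark broadcast variable is the more idiomatic way to ship a lookup table of this size; a sketch using the same names (not executed here):

blockDictBC = sc.broadcast(blockDict)

def addMetaBroadcast(el):
    blkHash = el[0][0]
    meta = blockDictBC.value[blkHash]   # (block_index, on_chain)
    return tuple(list(el) + [meta[0], meta[1]])

# blocksRDDv2 = blocksRDD.map(addMetaBroadcast)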

In [29]:
# add meta data to the RDD
addMetaFn = genAddMeta(blockDict)
blocksRDDv2 = blocksRDD.map(addMetaFn)
blocksRDDv2.count()


----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff8d74ef60>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmp4NGuAd-----------------
----unpersist: self._path: ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmp4NGuAd-----------------
Out[29]:
131237

In [30]:
tmp = blocksRDDv2.take(5)
print(tmp)
print(tmp[0][0])
print(tmp[0][1])
print(tmp[0][2])


----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff8d74eb70>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpgT186p-----------------
----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff8d74eb70>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpjgzXzB-----------------
----unpersist: self._path: ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpjgzXzB-----------------
[(('00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048', '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f', 486604799), <block.Block object at 0x7eff885849d0>, 1, 1), (('000000006a625f06636b8bb6ac7b960a8d03705d1ace08b1a19da3fdcc99ddbd', '00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048', 486604799), <block.Block object at 0x7eff88584bd0>, 2, 1), (('000000009700ff3494f215c412cd8c0ceabf1deb0df03ce39bcfc223b769d3c4', '00000000bc919cfb64f62de736d55cf79e3d535b474ace256b4fbb56073f64db', 486604799), <block.Block object at 0x7eff88584dd0>, 31, 1), (('00000000e5cb7c6c273547b0c9421b01e23310ed83f934b96270f35a4d66f6e3', '000000009700ff3494f215c412cd8c0ceabf1deb0df03ce39bcfc223b769d3c4', 486604799), <block.Block object at 0x7eff88584f50>, 32, 1), (('00000000a87073ea3d7af299e02a434598b9c92094afa552e0711afcc0857962', '00000000e5cb7c6c273547b0c9421b01e23310ed83f934b96270f35a4d66f6e3', 486604799), <block.Block object at 0x7eff8522d0d0>, 33, 1)]
('00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048', '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f', 486604799)
<block.Block object at 0x7eff885849d0>
1

In [31]:
blocksRDDv2.getNumPartitions()


Out[31]:
4

In [32]:
blocksRDDv2.count()


----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff8d74ef60>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpmHUUSF-----------------
----unpersist: self._path: ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpmHUUSF-----------------
Out[32]:
131237

In [33]:
%%bash
rm -r blocksRDDv2.txt

In [34]:
blocksRDDv2.saveAsPickleFile('blocksRDDv2.txt')

In [35]:
blocksRDDv3 = sc.pickleFile('blocksRDDv2.txt')

In [36]:
blocksRDDv3.count()


Out[36]:
131237

In [37]:
tmp = blocksRDDv3.take(2)
print(tmp[0][0])
print(tmp[1][1])


('00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048', '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f', 486604799)
<block.Block object at 0x7eff8522df90>

In [38]:
# convert to schema RDD
def toTxs(x):
    '''Return a list of transactions for each input block'''
    txs = []
    b = x[1]   # the parsed Block object; avoid shadowing the 'blk' module
    numTxs = b.getNumTxs()
    for i in range(numTxs):
        currentDict = {}
        currentDict = b.updateTxDict(i,currentDict)
        currentDict['block_index'] = x[2]
        currentDict['on_chain'] = x[3]
        txs.append(Row(**currentDict))
    return txs

txRDD = blocksRDDv2.flatMap(lambda x: toTxs(x))
txRDD.count()


----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff8d74ea50>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpQ73H3j-----------------
----unpersist: self._path: ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpQ73H3j-----------------
Out[38]:
754445

In [39]:
%%bash
rm -r txRDD.txt

In [40]:
txRDD.saveAsPickleFile("txRDD.txt")


----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff9463d150>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpH_jzhc-----------------

In [41]:
schTxRDD = sqlContext.inferSchema(txRDD)
schTxRDD.registerTempTable("schTxRDD")


----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff9463d150>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmp78xKoe-----------------
----unpersist: self._path: ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmp78xKoe-----------------
----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff9463d150>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpZgY5gE-----------------
----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff9463d150>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpjhQOMJ-----------------
----unpersist: self._path: ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpjhQOMJ-----------------
----init: sc._temp_dir ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e-----------------
----init: f ----<open file '<fdopen>', mode 'w+b' at 0x7eff9463d150>-----------------
----init: _path ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpfilSxo-----------------
----unpersist: self._path: ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpfilSxo-----------------
----unpersist: self._path: ----/tmp/spark-2f62ae62-7af9-40fd-a0ff-0e4c02ceb74e/tmpZgY5gE-----------------

In [42]:
schTxRDD.getNumPartitions()


Out[42]:
4

In [43]:
%%bash
rm -r 'schTxRDD.txt'


rm: cannot remove ‘schTxRDD.txt’: No such file or directory

In [44]:
schTxRDD.getStorageLevel()


Out[44]:
StorageLevel(False, False, False, False, 1)

In [45]:
schTxRDD.count()


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-45-4d609412cd08> in <module>()
----> 1 schTxRDD.count()

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/sql.pyc in count(self)
   1959         True
   1960         """
-> 1961         return self._jschema_rdd.count()
   1962 
   1963     def collect(self):

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o174.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, most recent failure: Lost task 0.0 in stage 26.0 (TID 52, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py", line 99, in main
    command = pickleSer.loads(command.value)
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/broadcast.py", line 112, in value
    self._value = self.load(self._path)
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/broadcast.py", line 92, in load
    with open(path, 'rb', 1 << 20) as f:
IOError: [Errno 2] No such file or directory: '/home/jeff/spark-scratch/spark-7fe17099-83e8-48f6-b46a-8925b025c395/tmpINRFSG'

	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In [73]:
print(schTxRDD.schemaString())


root
 |-- bits: integer (nullable = true)
 |-- block_index: integer (nullable = true)
 |-- blockhash: string (nullable = true)
 |-- blockprefix: string (nullable = true)
 |-- blocksize: integer (nullable = true)
 |-- in_cnt: integer (nullable = true)
 |-- lock_time: integer (nullable = true)
 |-- magic_no: integer (nullable = true)
 |-- merklehash: string (nullable = true)
 |-- nonce: integer (nullable = true)
 |-- on_chain: integer (nullable = true)
 |-- out_cnt: integer (nullable = true)
 |-- prevhash: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- transaction_cnt: integer (nullable = true)
 |-- txIn_prevhash: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- txIn_prevtx_out_idx: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- txIn_scriptSig: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- txIn_sequence_no: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- txIn_txin_script_len: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- txOut_scriptPubKey: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- txOut_script_len: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- txOut_value: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- tx_version: integer (nullable = true)
 |-- version: integer (nullable = true)


In [48]:
schTxRDD.saveAsPickleFile('schTxRDD.txt')


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-48-a4534ba96a14> in <module>()
----> 1 schTxRDD.saveAsPickleFile('schTxRDD.txt')

/home/jeff/spark-1.2.1-bin-hadoop2.4/python/pyspark/rdd.py in saveAsPickleFile(self, path, batchSize)
   1265         else:
   1266             ser = BatchedSerializer(PickleSerializer(), batchSize)
-> 1267         self._reserialize(ser)._jrdd.saveAsObjectFile(path)
   1268 
   1269     def saveAsTextFile(self, path):

/home/jeff/spark-1.2.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/jeff/spark-1.2.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o197.saveAsObjectFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 24.0 failed 1 times, most recent failure: Lost task 2.0 in stage 24.0 (TID 35, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/jeff/spark-1.2.1-bin-hadoop2.4/python/pyspark/worker.py", line 92, in main
    command = pickleSer.loads(command.value)
  File "/home/jeff/spark-1.2.1-bin-hadoop2.4/python/pyspark/broadcast.py", line 106, in value
    self._value = self.load(self._path)
  File "/home/jeff/spark-1.2.1-bin-hadoop2.4/python/pyspark/broadcast.py", line 87, in load
    with open(path, 'rb', 1 << 20) as f:
IOError: [Errno 2] No such file or directory: '/home/jeff/spark-scratch/spark-13d28025-ac44-475f-a4cf-c265b55e6d19/spark-22fa63f0-2823-48bf-bfe4-28f7e5de418c/tmpzxB1wy'

	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:120)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
	at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242)
	at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
	at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1550)
	at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In [73]:
%%bash
rm -r /home/jeff/prj/bitcoin-analytics/temp.parquet

In [74]:
schTxRDD.saveAsParquetFile('temp.parquet')


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-74-bd3cff4ebbed> in <module>()
----> 1 schTxRDD.saveAsParquetFile('temp.parquet')

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/sql.pyc in saveAsParquetFile(self, path)
   1901         True
   1902         """
-> 1903         self._jschema_rdd.saveAsParquetFile(path)
   1904 
   1905     def registerTempTable(self, name):

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o273.saveAsParquetFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 47.0 failed 1 times, most recent failure: Lost task 0.0 in stage 47.0 (TID 69, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py", line 92, in main
    if isinstance(command, Broadcast):
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/broadcast.py", line 112, in value
    self._value = self.load(self._path)
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/broadcast.py", line 92, in load
    with open(path, 'rb', 1 << 20) as f:
IOError: [Errno 2] No such file or directory: '/home/jeff/spark-scratch/spark-b1eee178-7bfd-4442-961b-80055c4aab79/tmpgffDkv'

	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

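The worker-side IOError above means the local file backing a Python broadcast under spark.local.dir (here /home/jeff/spark-scratch/...) has disappeared. A common cause is that the closures and broadcasts referenced by schTxRDD were created under a driver/SparkContext that has since died, or that the scratch directory was cleaned while the application was running; in either case the safest recovery is to rebuild the context and the lineage. A minimal sketch of that restart, using standard PySpark API (the scratch path is the one from the error message; the app name is illustrative):

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

sc.stop()                                    # discard the stale context
conf = (SparkConf()
        .setAppName('bitcoin-analytics')     # arbitrary app name
        .set('spark.local.dir', '/home/jeff/spark-scratch'))
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
# ... then re-run the cells that build schTxRDD before calling saveAsParquetFile again
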
In [47]:
print(res.take(1))


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-47-a4d1e36b34f0> in <module>()
----> 1 print(res.take(1))

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/sql.pyc in take(self, num)
   1994         [Row(field1=1, field2=u'row1'), Row(field1=2, field2=u'row2')]
   1995         """
-> 1996         return self.limit(num).collect()
   1997 
   1998     # Convert each object in the RDD to a Row with the right class

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/sql.pyc in collect(self)
   1976         """
   1977         with SCCallSiteSync(self.context) as css:
-> 1978             bytesInJava = self._jschema_rdd.baseSchemaRDD().collectToPython().iterator()
   1979         cls = _create_cls(self.schema())
   1980         return map(cls, self._collect_iterator_through_file(bytesInJava))

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/jeff/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o183.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 1 times, most recent failure: Lost task 0.0 in stage 24.0 (TID 45, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py", line 92, in main
    command = pickleSer.loads(command.value)
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/broadcast.py", line 113, in value
    if self._jbroadcast is None:
  File "/home/jeff/spark-1.2.0-bin-hadoop2.4/python/pyspark/broadcast.py", line 94, in load
    try:
IOError: [Errno 2] No such file or directory: '/home/jeff/spark-scratch/spark-4c103b7f-93fc-473e-8a0b-7d629f325f53/tmpUWgmUY'

	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In [74]:
tmp = {}
tmp['key1'] = []
tmp['key2'] = 'cat'

In [77]:
print(tmp.get('key1',[]).append(10))


None

In [78]:
tmp.get('key3',[])


Out[78]:
[]

In [80]:
print(tmp.get('key3',[]).append(10))


None

In [81]:
test = [].append(10)

In [83]:
print(test)


None

In [85]:
tmp.get('key1').append(90)

In [86]:
print(tmp)


{'key2': 'cat', 'key1': [10, 10, 90]}

In [87]:
tmp.get('key100',[]).append(80)
print(tmp)


{'key2': 'cat', 'key1': [10, 10, 90]}

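Two separate behaviours show up in the cells above. First, list.append mutates the list in place and returns None, which is why every print of an .append(...) call shows None. Second, dict.get(key, default) returns the stored value when the key exists (so the appends to 'key1' are visible in tmp) but returns an unattached default otherwise, which is why 'key100' never appears in the dict. dict.setdefault or collections.defaultdict are the idiomatic ways to "append, creating the list if needed"; a short sketch:

from collections import defaultdict

tmp = {'key2': 'cat', 'key1': [10, 10, 90]}

tmp.setdefault('key100', []).append(80)   # default is stored in tmp first, then mutated
print(tmp['key100'])                      # [80]

dd = defaultdict(list)
dd['key3'].append(10)                     # missing key -> empty list created on access
print(dict(dd))                           # {'key3': [10]}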