Converting incoming CDX files to Parquet

Quick look at file sizes:


In [2]:
!ls -lh eot2012_surt_index.cdx*


-rw-rw-r-- 1 ubuntu ubuntu 9.5G May 18 17:56 eot2012_surt_index.cdx.gz
-rw-rw-r-- 1 ubuntu ubuntu   60 May 18 17:50 eot2012_surt_index.cdx.gz.md5

Note: Spark can typically load *.gz files just fine, but that support comes from Hive integration, which seems to be missing here. So gunzip first.
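
For comparison, on a Spark setup where the gzip codec support is wired up, the compressed index could be read directly. This is only a sketch of what that would look like, not what ran here:

# Hypothetical: read the gzipped CDX directly where codec support is available.
eot2012_gz = sc.textFile("eot2012_surt_index.cdx.gz")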


In [6]:
!gunzip eot2012_surt_index.cdx.gz

Load the unzipped file, filtering out any line that starts with a blank or has essentially no content.


In [1]:
eot2012 = sc.textFile("eot2012_surt_index.cdx") \
  .filter(lambda line: not line.startswith(' ')) \
  .filter(lambda line: len(line) > 1) \
  .map(lambda line: line.split(" "))
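
Before turning this into a DataFrame, a quick sanity check (a hypothetical step, not in the original run) can confirm that each record splits into the expected 11 CDX fields:

first_record = eot2012.first()
print(len(first_record))   # expect 11 fields for this CDX format
print(first_record)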

Prep a DataFrame from the RDD, naming the columns after the CDX fields.


In [4]:
df = sqlContext.createDataFrame(eot2012)

In [5]:
df = df.withColumnRenamed("_1", "surt_uri") \
    .withColumnRenamed("_2", "capture_time") \
    .withColumnRenamed("_3", "original_uri") \
    .withColumnRenamed("_4", "mime_type") \
    .withColumnRenamed("_5", "response_code") \
    .withColumnRenamed("_6", "hash_sha1") \
    .withColumnRenamed("_7", "redirect_url") \
    .withColumnRenamed("_8", "meta_tags") \
    .withColumnRenamed("_9", "length_compressed") \
    .withColumnRenamed("_10", "warc_offset") \
    .withColumnRenamed("_11", "warc_name")

Write out as Parquet.


In [6]:
df.write.parquet("eot2012.parquet")

In [7]:
!du -hs eot2012.parquet


8.1G	eot2012.parquet

This final write took about 15 minutes of wall-clock time on an r3.4xlarge (16 vCPUs), or roughly 3.9 hours of total compute time.
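
To spot-check the result (a sketch of a verification step, not part of the original run), the Parquet output can be read back and queried directly:

# Read the Parquet directory back in and run a couple of cheap checks.
captures = sqlContext.read.parquet("eot2012.parquet")
captures.printSchema()
captures.groupBy("mime_type").count().show(10)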