In [2]:
!ls -lh eot2012_surt_index.cdx*
Note: Spark can usually read *.gz files directly (the decompression comes from Hadoop's compression codecs), but that support seems to be missing in this environment. So gunzip first.
In [6]:
!gunzip eot2012_surt_index.cdx.gz
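For reference, in an environment where the gzip codec is wired up, the compressed index could be read directly and the gunzip step skipped. A minimal sketch, assuming a working Hadoop codec setup:

# Hadoop's GzipCodec decompresses .gz files transparently; note that
# gzip is not splittable, so each file becomes a single partition.
eot2012_gz = sc.textFile("eot2012_surt_index.cdx.gz")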
Load the unzipped file, filtering out any line that starts with a blank or has essentially no content.
In [1]:
eot2012 = sc.textFile("eot2012_surt_index.cdx") \
    .filter(lambda line: not line.startswith(" ")) \
    .filter(lambda line: len(line) > 1) \
    .map(lambda line: line.split(" "))
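A quick sanity check (a hypothetical inspection step, not part of the original run) confirms each record splits into the eleven fields expected by the column renames below:

# Pull the first parsed record and count its fields.
first_record = eot2012.first()
len(first_record)  # expect 11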
Prep a DataFrame from the RDD, naming the columns appropriately.
In [4]:
df = sqlContext.createDataFrame(eot2012)
In [5]:
df = df.withColumnRenamed("_1", "surt_uri") \
.withColumnRenamed("_2", "capture_time") \
.withColumnRenamed("_3", "original_uri") \
.withColumnRenamed("_4", "mime_type") \
.withColumnRenamed("_5", "response_code") \
.withColumnRenamed("_6", "hash_sha1") \
.withColumnRenamed("_7", "redirect_url") \
.withColumnRenamed("_8", "meta_tags") \
.withColumnRenamed("_9", "length_compressed") \
.withColumnRenamed("_10", "warc_offset") \
.withColumnRenamed("_11", "warc_name") \
Write out as Parquet.
In [6]:
df.write.parquet("eot2012.parquet")
In [7]:
!du -hs eot2012.parquet
This final step took 15 minutes of wall-clock time on an r3.4xlarge, amounting to roughly 3.9 hours of aggregate compute time across its vCPUs.
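As a final check (hypothetical, not from the original session, and assuming Spark 1.4+ where sqlContext.read is available), the Parquet output can be loaded back; column names and types are preserved in the Parquet metadata, so no renaming is needed on reload:

# Reload the Parquet directory and confirm the schema round-tripped.
df2 = sqlContext.read.parquet("eot2012.parquet")
df2.printSchema()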