EOT CDX file analysis

Working with the End of Term (EOT) crawl CDX data that has already been transformed into Parquet files.


In [1]:
from datetime import datetime

from pyspark.sql import Row
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import ArrayType, DateType, StringType

In [2]:
df08 = sqlContext.read.parquet("eot2008.parquet")

In [34]:
df12 = sqlContext.read.parquet("eot2012.parquet")

In [3]:
%time df08.count()


CPU times: user 2.4 ms, sys: 196 µs, total: 2.6 ms
Wall time: 7.32 s
Out[3]:
160212140

In [35]:
%time df12.count()


CPU times: user 1.83 ms, sys: 152 µs, total: 1.98 ms
Wall time: 5.96 s
Out[35]:
194066937

In [4]:
df08.dtypes


Out[4]:
[('surt_uri', 'string'),
 ('capture_time', 'string'),
 ('original_uri', 'string'),
 ('mime_type', 'string'),
 ('response_code', 'string'),
 ('hash_sha1', 'string'),
 ('redirect_url', 'string'),
 ('meta_tags', 'string'),
 ('length_compressed', 'string'),
 ('warc_offset', 'string'),
 ('warc_name', 'string')]

Questions for reformatting

Extract date to new Date column


In [ ]:
def extract_date(s):
    # capture_time is a 14-digit CDX timestamp, e.g. '20081124060812';
    # keep the first 8 digits and reformat them as an ISO date string.
    return '%s-%s-%s' % (s[0:4], s[4:6], s[6:8])

date_extractor = udf(extract_date, StringType())

In [ ]:
df08_date = df08.withColumn("capture_date", date_extractor(df08["capture_time"]).cast("date"))

In [ ]:
df08_date.take(3)
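
An alternative sketch that skips the Python UDF entirely, assuming Spark 1.5+ built-ins and that capture_time is always a well-formed yyyyMMddHHmmss string:


In [ ]:
from pyspark.sql.functions import from_unixtime, unix_timestamp

# Parse the 14-digit CDX timestamp directly; rows that don't match the
# pattern come back as NULL instead of raising an error.
df08_date_alt = df08.withColumn(
    "capture_date",
    from_unixtime(unix_timestamp(df08["capture_time"], "yyyyMMddHHmmss")).cast("date"))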

Extract domain components to columns


In [5]:
def extract_domain(s, level):
    # A surt_uri looks like 'gov,loc)/index.html': the host is reversed and
    # comma-separated, and everything before the ')' belongs to the host.
    domain = s.split(')')[0].split(',')
    level = int(level)
    if len(domain) > level:
        return domain[level]
    return ''

domain_extractor = udf(extract_domain, StringType())

...this is inelegant: each level needs its own withColumn call, and the level has to be smuggled in as a string literal. A loop would tidy it up (sketched below)...


In [6]:
df08_dom1 = df08.withColumn("dom1", domain_extractor(df08["surt_uri"], lit('0')))
df08_dom2 = df08_dom1.withColumn("dom2", domain_extractor(df08_dom1["surt_uri"], lit('1')))
df08_dom3 = df08_dom2.withColumn("dom3", domain_extractor(df08_dom2["surt_uri"], lit('2')))
df08_dom4 = df08_dom3.withColumn("dom4", domain_extractor(df08_dom3["surt_uri"], lit('3')))

In [7]:
df08_dom4


Out[7]:
DataFrame[surt_uri: string, capture_time: string, original_uri: string, mime_type: string, response_code: string, hash_sha1: string, redirect_url: string, meta_tags: string, length_compressed: string, warc_offset: string, warc_name: string, dom1: string, dom2: string, dom3: string, dom4: string]
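
A tidier sketch of the same chain, just looping over the four levels (equivalent columns, no new results):


In [ ]:
# Build dom1..dom4 in a loop instead of four hand-written withColumn calls.
df08_doms = df08
for i in range(4):
    df08_doms = df08_doms.withColumn(
        "dom%d" % (i + 1), domain_extractor(df08_doms["surt_uri"], lit(str(i))))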

Hmm, that worked, but let's just go the SQL route for now. Everything before the ')' in a SURT is the reversed, comma-separated host (e.g. 'gov,loc' for loc.gov), so grouping on that prefix counts captures per host.


In [12]:
df08.registerTempTable("eot08")

In [39]:
sql = """
    SELECT SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1) AS surt, COUNT(*) AS count
    FROM eot08
    GROUP BY SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1)
    ORDER BY count DESC
    """
domains08 = sqlContext.sql(sql)

In [33]:
# saveAsTextFile writes a directory containing a single tab-separated part
# file, despite the .csv name.
domains08.rdd.map(lambda x: "\t".join(map(str, x))).coalesce(1).saveAsTextFile("eot08-domains.csv")

In [36]:
df12.registerTempTable("eot12")

In [37]:
sql = """
    SELECT SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1) AS surt, COUNT(*) AS count
    FROM eot12
    GROUP BY SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1)
    ORDER BY count DESC
    """
domains12 = sqlContext.sql(sql)

In [38]:
domains12.rdd.map(lambda x: "\t".join(map(str, x))).coalesce(1).saveAsTextFile("eot12-domains.csv")
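
The 2008 and 2012 cells above are copy-pasted; a small helper (a sketch, reusing the same SQL) would parametrize the table name:


In [ ]:
def domain_counts(table):
    # Captures per SURT host prefix for a registered temp table,
    # e.g. domain_counts("eot08") or domain_counts("eot12").
    return sqlContext.sql("""
        SELECT SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1) AS surt, COUNT(*) AS count
        FROM %s
        GROUP BY SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1)
        ORDER BY count DESC
        """ % table)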

In [40]:
domains08.count()


Out[40]:
141090

In [52]:
domains08 = domains08.withColumnRenamed("surt", "surt08").withColumnRenamed("count", "count08")

In [41]:
domains12.count()


Out[41]:
353280

In [53]:
domains12 = domains12.withColumnRenamed("surt", "surt12").withColumnRenamed("count", "count12")

In [54]:
domains_combined = domains08.join(domains12, domains08.surt08 == domains12.surt12, 'outer')

In [55]:
domains_combined.dtypes


Out[55]:
[('surt08', 'string'),
 ('count08', 'bigint'),
 ('surt12', 'string'),
 ('count12', 'bigint')]

In [56]:
domains_combined.rdd.map(lambda x: "\t".join(map(str, x))).coalesce(1).saveAsTextFile("combined-domains.csv")

In [57]:
domains_combined.count()


Out[57]:
444186

In [65]:
def either(a, b):
    # Prefer the 2008 surt; fall back to the 2012 surt when it is missing.
    if a:
        return a
    return b

udf_either = udf(either, StringType())

In [66]:
domains_combined = domains_combined.withColumn("surt", udf_either(domains_combined["surt08"], domains_combined["surt12"]))
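
This hand-rolls SQL's COALESCE; a sketch with the built-in (in pyspark.sql.functions since Spark 1.4) that avoids the UDF, with the caveat that coalesce only falls back on NULL, not on empty strings:


In [ ]:
from pyspark.sql.functions import coalesce

# Built-in equivalent of the either() UDF: take surt08 unless it is NULL.
domains_combined_alt = domains_combined.withColumn(
    "surt", coalesce(domains_combined["surt08"], domains_combined["surt12"]))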

In [67]:
domains_combined.take(20)


Out[67]:
[Row(surt08=None, count08=None, surt12='100,99,67,134', count12=31, surt='100,99,67,134'),
 Row(surt08=None, count08=None, surt12='118,72,133,174', count12=13, surt='118,72,133,174'),
 Row(surt08='2,94,223,66', count08=31, surt12='2,94,223,66', count12=37, surt='2,94,223,66'),
 Row(surt08=None, count08=None, surt12='218,120,100,94', count12=3, surt='218,120,100,94'),
 Row(surt08=None, count08=None, surt12='249,58,254,173', count12=3, surt='249,58,254,173'),
 Row(surt08='58,78,12,76', count08=2, surt12=None, count12=None, surt='58,78,12,76'),
 Row(surt08=None, count08=None, surt12='67,254,207,130:7123', count12=2, surt='67,254,207,130:7123'),
 Row(surt08=None, count08=None, surt12='af,afghanistan,cdn', count12=38, surt='af,afghanistan,cdn'),
 Row(surt08='am,circle', count08=50, surt12='am,circle', count12=62, surt='am,circle'),
 Row(surt08=None, count08=None, surt12='ar,com,latinvia', count12=9, surt='ar,com,latinvia'),
 Row(surt08=None, count08=None, surt12='ar,com,tutoloquequieras,com,flickr', count12=2, surt='ar,com,tutoloquequieras,com,flickr'),
 Row(surt08='at,ac,tuwien,atp,magnet', count08=15, surt12=None, count12=None, surt='at,ac,tuwien,atp,magnet'),
 Row(surt08=None, count08=None, surt12='at,oewabox,ichkoche', count12=5, surt='at,oewabox,ichkoche'),
 Row(surt08=None, count08=None, surt12='at,pressreleases', count12=32, surt='at,pressreleases'),
 Row(surt08=None, count08=None, surt12='au,com,fairfax', count12=2, surt='au,com,fairfax'),
 Row(surt08=None, count08=None, surt12='au,com,fishpond', count12=108, surt='au,com,fishpond'),
 Row(surt08=None, count08=None, surt12='au,com,google,books,bks6', count12=55, surt='au,com,google,books,bks6'),
 Row(surt08=None, count08=None, surt12='au,com,lgnews,staging', count12=7, surt='au,com,lgnews,staging'),
 Row(surt08=None, count08=None, surt12='au,com,lwt', count12=5, surt='au,com,lwt'),
 Row(surt08='au,com,milkwoodpermaculture', count08=3, surt12=None, count12=None, surt='au,com,milkwoodpermaculture')]

In [69]:
domains_combined = domains_combined.drop("surt08").drop("surt12")

In [71]:
# Hosts missing from one crawl have NULL counts after the outer join; make them 0.
domains_combined = domains_combined.na.fill(0)

In [72]:
domains_combined.rdd.map(lambda x: "\t".join(map(str, x))).coalesce(1).saveAsTextFile("domains-combined.csv")
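
If the external spark-csv package happens to be on the classpath (an assumption; it isn't loaded above), a sketch that writes a proper delimited file with a header row:


In [ ]:
# Assumes com.databricks:spark-csv is available to this Spark session.
domains_combined.coalesce(1).write \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("delimiter", "\t") \
    .save("domains-combined-tsv")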
