In [1]:
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import ArrayType, DateType, StringType
In [2]:
df08 = sqlContext.read.parquet("eot2008.parquet")
In [34]:
df12 = sqlContext.read.parquet("eot2012.parquet")
In [3]:
%time df08.count()
Out[3]:
In [35]:
%time df12.count()
Out[35]:
In [4]:
df08.dtypes
Out[4]:
In [ ]:
def extract_date(s):
    # slice a YYYYMMDD-prefixed timestamp string into an ISO date (YYYY-MM-DD)
    return '%s-%s-%s' % (s[0:4], s[4:6], s[6:8])

date_extractor = udf(extract_date, StringType())
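An alternative that skips the Python UDF entirely (a sketch, assuming capture_time is a yyyyMMddHHmmss string and Spark 1.5+, where the built-ins below exist):
In [ ]:
# sketch: parse capture_time with built-in functions instead of a UDF
# (assumes capture_time is a yyyyMMddHHmmss string; Spark 1.5+)
from pyspark.sql.functions import to_date, unix_timestamp

df08_builtin = df08.withColumn(
    "capture_date",
    to_date(unix_timestamp(df08["capture_time"], "yyyyMMddHHmmss").cast("timestamp")))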
In [ ]:
df08_date = df08.withColumn("capture_date", date_extractor(df08["capture_time"]).cast("date"))
In [ ]:
df08_date.take(3)
In [5]:
def extract_domain(s, level):
    # the part of a SURT before ")" is the reversed, comma-separated domain,
    # e.g. "gov,house)/path" -> ["gov", "house"]
    domain = s.split(')')[0].split(',')
    level = int(level)
    if len(domain) > level:
        return domain[level]
    return ''

domain_extractor = udf(extract_domain, StringType())
...passing the level in as a string literal and casting it back to an int inside the UDF is inelegant; a cleaner variant is sketched below...
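One cleaner option is a small factory that closes over the level, so each generated UDF takes only the column (a sketch reusing extract_domain from above):
In [ ]:
# sketch: build one UDF per level instead of smuggling the level in via lit()
def make_domain_extractor(level):
    return udf(lambda s: extract_domain(s, level), StringType())

df08_doms = df08
for i in range(4):
    df08_doms = df08_doms.withColumn("dom%d" % (i + 1),
                                     make_domain_extractor(i)(df08_doms["surt_uri"]))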
In [6]:
df08_dom1 = df08.withColumn("dom1", domain_extractor(df08["surt_uri"], lit('0')))
df08_dom2 = df08_dom1.withColumn("dom2", domain_extractor(df08_dom1["surt_uri"], lit('1')))
df08_dom3 = df08_dom2.withColumn("dom3", domain_extractor(df08_dom2["surt_uri"], lit('2')))
df08_dom4 = df08_dom3.withColumn("dom4", domain_extractor(df08_dom3["surt_uri"], lit('3')))
In [7]:
df08_dom4
Out[7]:
Hmm, that worked, but let's just go the SQL route for now.
In [12]:
df08.registerTempTable("eot08")
In [39]:
sql = """
SELECT SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1) AS surt, COUNT(*) AS count
FROM eot08
GROUP BY SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1)
ORDER BY count DESC
"""
domains08 = sqlContext.sql(sql)
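For comparison, the same aggregation through the DataFrame API (a sketch; split takes a regex, so the ")" is escaped, and getItem(0) keeps everything before it):
In [ ]:
# sketch: DataFrame-API equivalent of the SQL above
from pyspark.sql.functions import split

domains08_df = (df08
    .withColumn("surt", split(df08["surt_uri"], "\\)").getItem(0))
    .groupBy("surt")
    .count()
    .orderBy("count", ascending=False))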
In [33]:
domains08.rdd.map(lambda x: "\t".join(map(str, x))).coalesce(1).saveAsTextFile("eot08-domains.csv")
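Despite the .csv name, the fields here are tab-separated; the same pattern is reused for the 2012 crawl below.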
In [36]:
df12.registerTempTable("eot12")
In [37]:
sql = """
SELECT SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1) AS surt, COUNT(*) AS count
FROM eot12
GROUP BY SUBSTRING(surt_uri, 0, INSTR(surt_uri, ")") - 1)
ORDER BY count DESC
"""
domains12 = sqlContext.sql(sql)
In [38]:
domains12.rdd.map(lambda x: "\t".join(map(str, x))).coalesce(1).saveAsTextFile("eot12-domains.csv")
In [40]:
domains08.count()
Out[40]:
In [52]:
domains08 = domains08.withColumnRenamed("surt", "surt08").withColumnRenamed("count", "count08")
In [41]:
domains12.count()
Out[41]:
In [53]:
domains12 = domains12.withColumnRenamed("surt", "surt12").withColumnRenamed("count", "count12")
In [54]:
domains_combined = domains08.join(domains12, domains08.surt08 == domains12.surt12, 'outer')
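The outer join keeps domains that appear in only one of the two crawls; renaming the surt/count columns beforehand avoids ambiguous column names after the join.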
In [55]:
domains_combined.dtypes
Out[55]:
In [56]:
domains_combined.rdd.map(lambda x: "\t".join(map(str, x))).coalesce(1).saveAsTextFile("combined-domains.csv")
In [57]:
domains_combined.count()
Out[57]:
In [65]:
def either(a, b):
    # return the first truthy value; used to pick whichever surt column is present
    if a:
        return a
    return b

udf_either = udf(either, StringType())
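Spark also ships a built-in for this pattern: pyspark.sql.functions.coalesce returns the first non-null argument, so the UDF could be skipped (a sketch; note coalesce tests for null, not for empty strings, which is enough here since the outer join produces nulls):
In [ ]:
# sketch: built-in coalesce instead of the either() UDF
from pyspark.sql.functions import coalesce

domains_combined_alt = domains_combined.withColumn(
    "surt", coalesce(domains_combined["surt08"], domains_combined["surt12"]))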
In [66]:
domains_combined = domains_combined.withColumn("surt", udf_either(domains_combined["surt08"], domains_combined["surt12"]))
In [67]:
domains_combined.take(20)
Out[67]:
In [69]:
domains_combined = domains_combined.drop("surt08").drop("surt12")
In [71]:
domains_combined = domains_combined.na.fill(0)
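na.fill(0) turns the nulls the outer join left in count08/count12 into zeros, so a domain missing from one crawl reads as a count of 0.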
In [72]:
domains_combined.rdd.map(lambda x: "\t".join(map(str, x))).coalesce(1).saveAsTextFile("domains-combined.csv")
In [ ]: