In [ ]:
import shlex

from pyspark.sql import Row
log_file = sc.textFile("../data/log_file.txt")

log_file.takeSample(True, 5)

In [ ]:
splits = log_file.map(lambda row: shlex.split(row))
splits.cache()
splits.takeSample(True, 5)
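
In [ ]:
# Why shlex.split rather than str.split: it keeps quoted fields together, so the
# quoted request portion of a log line stays a single token. A quick check on a
# hypothetical line (not taken from the actual data file):
shlex.split('127.0.0.1 [01/Jan/2015:00:00:01 -0800] "GET /index.html HTTP/1.1" 200 532')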

In [ ]:
def create_schema(row):
    ip = row[0]
    date = row[1].replace('[', '').replace(']', '')  # strip the surrounding brackets
    tokens = row[2].split(' ')                       # split the request field
    protocol = tokens[0]
    url = tokens[1].split('?')[0]                    # drop any query string
    status = row[3]
    time = None if row[4] == '-' else int(row[4])    # '-' means no response time recorded
    return Row(ip=ip, date=date, protocol=protocol, url=url, status=status, time=time)
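
In [ ]:
# Sanity-check create_schema on one hypothetical split entry. The field layout
# assumed here (ip, [timestamp], request string, status, response time) is an
# assumption about this log format; adjust it if the actual data differs.
sample_entry = ['127.0.0.1', '[01/Jan/2015:00:00:01 -0800]',
                'GET /index.html?ref=home HTTP/1.1', '200', '532']
create_schema(sample_entry)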

In [ ]:
schema_DF = splits.map(create_schema).toDF()
schema_DF.take(5)
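
In [ ]:
# Inspect the inferred schema: time comes through as a numeric type, while ip,
# date, protocol, url, and status stay strings, since create_schema never converts them.
schema_DF.printSchema()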

In [ ]:
sqlCtx.registerDataFrameAsTable(schema_DF, 'logs')
sample = sqlCtx.sql('SELECT * FROM logs LIMIT 10').collect()
for row in sample:
    print(row)

In [ ]:
# find the 10 most requested URLs
url_access = sqlCtx.sql('''SELECT url, count(*) AS counts FROM logs GROUP BY url
  ORDER BY counts DESC LIMIT 10''').collect()
for row in url_access:
    print(row)

In [ ]:
# find the 10 highest-traffic source IPs
ip_access = sqlCtx.sql('''SELECT ip, count(*) AS counts FROM logs GROUP BY ip
  ORDER BY counts DESC LIMIT 10''').collect()
for row in ip_access:
    print(row)

In [ ]:
# same operation without Spark SQL, using the RDD API directly
ip_access_direct = (schema_DF.map(lambda row: (row.ip, 1))
                    .reduceByKey(lambda a, b: a + b)
                    .map(lambda r: (r[1], r[0]))
                    .sortByKey(ascending=False)
                    .map(lambda r: (r[1], r[0]))
                    .collect())
for row in ip_access_direct[:10]:
    print(row)

In [ ]:
# step 1: pair each record with its source IP as the key
step1 = schema_DF.map(lambda row: (row.ip, 1))
step1.take(5)

In [ ]:
# step 2: sum the counts per IP
step2 = step1.reduceByKey(lambda a, b: a + b)
step2.take(5)

In [ ]:
# step 3: swap to (count, ip) so the count becomes the sort key
step3 = step2.map(lambda r: (r[1], r[0]))
step3.take(5)

In [ ]:
# step 4: sort by count, highest first
step4 = step3.sortByKey(ascending=False)
step4.take(5)

In [ ]:
# step 5: swap back to (ip, count) for readability
step5 = step4.map(lambda r: (r[1], r[0]))
step5.take(10)

In [ ]:
from time import time
pq_path = '../data/log_{}.pq'.format(int(time()))
# saveAsParquetFile is the Spark 1.x call; newer versions use schema_DF.write.parquet(pq_path)
schema_DF.saveAsParquetFile(pq_path)

In [ ]:
# sqlCtx.parquetFile is the Spark 1.x reader; newer versions use spark.read.parquet(pq_path)
parquetFile = sqlCtx.parquetFile(pq_path)
parquetFile.registerTempTable('parquetTable')

ip_access = sqlCtx.sql('''SELECT ip, count(*) AS counts FROM parquetTable GROUP BY ip
  ORDER BY counts DESC LIMIT 10''').collect()
for row in ip_access:
    print(row)
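
In [ ]:
# For comparison, a sketch of the same top-10 query using the DataFrame API instead
# of SQL; GroupedData.count() names its result column 'count'.
from pyspark.sql.functions import desc
parquetFile.groupBy('ip').count().orderBy(desc('count')).limit(10).collect()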
