In [1]:
# Reload edited project modules automatically without restarting the kernel.
%load_ext autoreload
%autoreload 2
import dateutil
import json
from pyspark.sql import SQLContext, Row
# `sc` is the SparkContext injected into the namespace by the PySpark kernel.
sqlContext = SQLContext(sc)

In [2]:
import os
import sys
import inspect

# Resolve this notebook's directory, then walk two levels up to the repo
# root and prepend it to sys.path so `src.traces` is importable.
currentdir = os.path.dirname(
    os.path.abspath(inspect.getfile(inspect.currentframe()))
)
parentdir = os.path.dirname(currentdir)
gpparentdir = os.path.dirname(parentdir)
sys.path.insert(0, gpparentdir)

# NOTE(review): star-import presumably supplies helpers used below
# (get_partition_name, and possibly hmac/hashlib/clear_output) — prefer
# explicit imports; confirm which names this actually provides.
from src.traces.trace_utils import *

In [3]:
def get_hash_function():
    """Prompt for a secret key and return an HMAC-SHA1 hashing closure.

    The key is read interactively and the prompt is then cleared from the
    cell output so the secret is not left visible in the saved notebook.

    Returns:
        Callable[[str], str]: maps a string to its hex-encoded HMAC-SHA1
        digest under the entered key.
    """
    # Fix: original bound the key to a second, unused name (`key = name = ...`).
    key = input("Key ")
    clear_output()  # hide the typed key from the rendered cell output

    def hash_function(s):
        # Keyed hash: values are not recoverable without the secret key.
        code = hmac.new(key.encode('utf-8'), s.encode('utf-8'), hashlib.sha1)
        return code.hexdigest()

    return hash_function


def parse_row(line):
    """Parse one tab-separated trace line into a dict, or None if malformed.

    Expects exactly 5 fields: ip, ua, <unused>, requests, geo_data.
    The third field is intentionally dropped.
    """
    fields = line.strip().split('\t')
    if len(fields) != 5:
        return None
    ip, ua, _, raw_requests, geo = fields
    return {
        'ip': ip,
        'ua': ua,
        'requests': parse_requests(raw_requests),
        'geo_data': geo,
    }

def parse_requests(requests):
    """Parse a '||'-delimited request string into a time-sorted list of dicts.

    Each record is 'time|referer|path'; records that do not have exactly
    three fields are silently dropped.
    """
    parsed = [
        {'t': when, 'r': referer, 'p': path}
        for when, referer, path in (
            record.split('|')
            for record in requests.split('||')
            if record.count('|') == 2
        )
    ]
    return sorted(parsed, key=lambda req: req['t'])  # chronological order

In [12]:
# HDFS locations: raw trace partitions in, IP-hashed JSON traces out.
input_dir = '/user/hive/warehouse/traces.db/test2'
output_dir = '/user/ellery/readers/data/hashed_traces/test2'

In [5]:
# Prompt once for the secret key; hash_function closes over it.
hash_function = get_hash_function()

def hash_ip(x):
    # Replace the raw IP with its keyed digest (mutates x in place and
    # returns it so it can be used in a .map()).
    x['ip'] = hash_function(x['ip'])
    return x

In [14]:
# Target partition: one day of English Wikipedia traces.
day = '2016-02-29'
host = 'en.wikipedia.org'
# get_partition_name presumably comes from the trace_utils star-import.
partition = get_partition_name(day, host)

In [25]:
input_partition = os.path.join(input_dir, partition)
output_partition = os.path.join(output_dir, partition )
# Parse raw TSV traces, drop malformed rows, hash IPs, serialize to JSON.
trace_rdd = sc.textFile(input_partition) \
    .map(parse_row) \
    .filter(lambda x: x is not None) \
    .map(hash_ip) \
    .map(lambda x: json.dumps(x))         
# Remove any previous output first — saveAsTextFile fails if the path
# exists. Return code is ignored (best-effort; fails harmlessly when the
# path is absent). NOTE(review): consider subprocess over os.system.
os.system('hadoop fs -rm -r ' + output_partition)
trace_rdd.saveAsTextFile(output_partition)

In [29]:
# Sanity check: read back the first hashed trace line from the output
# partition. Fix: original line was syntactically broken
# (`.map(lambda xtake(1)`); Out[29] shows a one-element list of raw JSON
# strings, matching a plain take(1).
sc.textFile(output_partition).take(1)


Out[29]:
['{"requests": [{"r": "-", "t": "2016-02-29 00:17:28", "p": "/wiki/Bad_Company"}, {"r": "-", "t": "2016-02-29 00:17:31", "p": "/wiki/Bad_Company"}, {"r": "-", "t": "2016-02-29 00:22:42", "p": "/wiki/Foreigner"}, {"r": "-", "t": "2016-02-29 00:22:43", "p": "/wiki/Foreigner"}, {"r": "-", "t": "2016-02-29 00:27:40", "p": "/wiki/Thunder"}, {"r": "-", "t": "2016-02-29 00:27:43", "p": "/wiki/Thunder"}, {"r": "-", "t": "2016-02-29 00:33:18", "p": "/wiki/Simple_Minds"}, {"r": "-", "t": "2016-02-29 00:33:22", "p": "/wiki/Simple_Minds"}, {"r": "-", "t": "2016-02-29 00:39:09", "p": "/wiki/Marillion"}, {"r": "-", "t": "2016-02-29 00:39:12", "p": "/wiki/Marillion"}, {"r": "-", "t": "2016-02-29 00:43:31", "p": "/wiki/The_Eagles"}, {"r": "-", "t": "2016-02-29 00:43:36", "p": "/wiki/The_Eagles"}, {"r": "-", "t": "2016-02-29 00:47:27", "p": "/wiki/Dream_Theater"}, {"r": "-", "t": "2016-02-29 00:47:30", "p": "/wiki/Dream_Theater"}, {"r": "-", "t": "2016-02-29 00:51:19", "p": "/wiki/Ugly_Kid_Joe"}, {"r": "-", "t": "2016-02-29 00:51:24", "p": "/wiki/Ugly_Kid_Joe"}, {"r": "-", "t": "2016-02-29 00:55:22", "p": "/wiki/Queen"}, {"r": "-", "t": "2016-02-29 00:55:25", "p": "/wiki/Queen"}], "ip": "24cbe39b2ddcae901bf755f28ace0aa841b65b46", "geo_data": "city\\u0003Bucheon-si\\u0002country_code\\u0003KR\\u0002longitude\\u0003126.7831\\u0002subdivision\\u0003Gyeonggi-do\\u0002timezone\\u0003Asia/Seoul\\u0002postal_code\\u0003Unknown\\u0002continent\\u0003Asia\\u0002latitude\\u000337.4989\\u0002country\\u0003Republic of Korea", "ua": "foobar2000/1.3.6"}']

In [ ]: