In [1]:
%load_ext autoreload
%autoreload 2
import dateutil
import json
from pyspark.sql import SQLContext, Row

In [2]:
import os
import sys
import inspect

# Put the directory two levels above this code's location at the front of
# sys.path, so that the project-local `src` package imported right after this
# can be resolved.
# NOTE(review): inside a notebook kernel the current frame's "file" is
# presumably a synthetic cell name, in which case this effectively resolves
# relative to the kernel's working directory — confirm.
frame_file = inspect.getfile(inspect.currentframe())
currentdir = os.path.dirname(os.path.abspath(frame_file))
parentdir = os.path.dirname(currentdir)
gpparentdir = os.path.dirname(parentdir)
sys.path.insert(0, gpparentdir)

from src.traces.trace_utils import *

In [5]:
# Spark SQL entry point wrapping the SparkContext `sc`. `sc` is not created in
# the visible cells, so it is expected to be supplied by the kernel/session.
sqlContext = SQLContext(sc)
# NOTE(review): `table` is not referenced again in the visible cells — confirm
# whether it is still needed.
table = 'test'
# Root directory of the trace files read below (joined with the partition name).
input_dir = '/user/ellery/readers/data/hashed_traces/rs3'
# Root directory for output (joined with the partition name into
# `output_partition`).
output_dir = '/home/ellery/readers/data/click_traces/rs3'

In [6]:
# Day and host that select which partition is processed; both are passed to
# get_partition_name and get_click_df in later cells.
day = '2016-03-03'
host = 'en.wikipedia.org'

In [7]:
# Resolve the partition name for (day, host) and build the matching input and
# output locations beneath the configured root directories.
partition = get_partition_name(day, host)
input_partition = os.path.join(input_dir, partition)
output_partition = os.path.join(output_dir, partition)

In [10]:
# Load the click data that is later handed to get_click_df.
# NOTE(review): get_all_clicks is not defined in this notebook; presumably it
# comes from the star import of src.traces.trace_utils — confirm.
d_all_clicks = get_all_clicks()

In [23]:
def _to_trace_row(record):
    """Turn one parsed trace record into a Row keyed on ip + ua."""
    return Row(
        key=record['ip'] + record['ua'],
        requests=record['requests'],
        geo_data=record['geo_data'],
    )


# Read the partition line by line, parse each line as JSON, keep only records
# whose parsed value has length 4, and reshape them into Rows.
# NOTE(review): the length check presumably selects complete records
# (ip, ua, requests, geo_data) — confirm against the producer of these files.
trace_rdd = (
    sc.textFile(input_partition)
    .map(json.loads)
    .filter(lambda record: len(record) == 4)
    .map(_to_trace_row)
)

# Expose the traces to Spark SQL under the table name "traceDF".
traceDF = sqlContext.createDataFrame(trace_rdd)
traceDF.registerTempTable("traceDF")

In [24]:
# Peek at a single trace Row to sanity-check its fields.
trace_rdd.take(1)


Out[24]:
[Row(geo_data='city\x03Brisbane\x02country_code\x03AU\x02longitude\x03153.0281\x02subdivision\x03Queensland\x02timezone\x03Australia/Brisbane\x02postal_code\x034000\x02continent\x03Oceania\x02latitude\x03-27.4679\x02country\x03Australia', key='60bda35cc9c591ed68c046a193b8ef8599346b45Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36', requests=[{'r': 'https://www.google.com.au/', 't': '2016-03-03 11:39:44', 'p': '/wiki/Christianity_in_Australia'}, {'r': 'https://www.google.com.au/', 't': '2016-03-03 11:52:02', 'p': '/wiki/Christianity_in_Australia'}])]

In [28]:
# Build the click DataFrame for this day/host.
# NOTE(review): the final argument is presumably the temp-table name the helper
# registers, since the SQL in later cells refers to a table called clickDF —
# confirm in trace_utils.
clickDF = get_click_df(
    sc, sqlContext, d_all_clicks, day, host, 'clickDF'
)

In [30]:
# Join the traces with the clicks on their shared `key` column. Because the
# query selects `*`, each result row carries the `key` column from both tables
# in addition to requests, geo_data and click_data.
query = """
SELECT *
FROM 
    traceDF JOIN clickDF
WHERE clickDF.key = traceDF.key
"""

# Run the join and pull every matching row back to the driver as a Python list.
res = sqlContext.sql(query).collect()

In [29]:
# Count the rows in the clickDF temp table.
sqlContext.sql("SELECT COUNT(*) FROM clickDF ").collect()


Out[29]:
[Row(_c0=3826)]

In [31]:
# Number of joined rows collected to the driver.
len(res)


Out[31]:
4421

In [32]:
# Inspect the first joined row to see which fields the join produced.
res[0]


Out[32]:
Row(geo_data="city\x03Nerekhta\x02country_code\x03RU\x02longitude\x0340.5661\x02subdivision\x03Kostromskaya Oblast'\x02timezone\x03Europe/Moscow\x02postal_code\x03157800\x02continent\x03Europe\x02latitude\x0357.4553\x02country\x03Russia", key='001537b0d0ca14c4e46fce80d0ea2dca370b5ae4Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36', requests=[{'r': 'http://yandex.ru/clck/jsredir?from=yandex.ru%3Bsearch%2F%3Bweb%3B%3B&text=&etext=982.QiH6c3Sto_ax8wG7_c7_BJDEt5r3GNGHFvYMVJC9XMjui82Q2QriJ-x-x_9KdcJK.bcd19b5a5ce9b058ac7ed31acdd5dd657228fb1f&uuid=&state=PEtFfuTeVD5kpHnK9lio9bb4iM1VPfe4W5x0C0-qwflIRTTifi6VAA&data=UlNrNmk5WktYejY4cHFySjRXSWhXSGxvZUlpX00ycGJLT2o3eThmZHI2bHdTdTFrenVrSGZueVZvSmRRaDktTEJxNFlTZzUtS2RLNHVVc0JGcHhPTFQycGVkMGRGUXRDT2w4bUY5TXg4X3dqZ3U2REptQWhMUzQyTjJBQzE3eGJKNDYzQmotLWYzb3dsS0ZtSWhTZzREalRQR3VOblhMQQ&b64e=2&sign=bc943589bbe2c971c8c899d12745ce92&keyno=0&cst=AiuY0DBWFJ5eVd_Onia6xqeWXok_Ow4AsF2UGZl_IvirXbU0iLWJ0nL7GUL5fSV2S5bswqSFF5t0fFRiviz-sYPGAuA1n6Sa5AibBjR11IvzyrwNOuv4fDhb9tzHHAFl0eIcxFMojHHOZB56Bg53dES6_mP8l-zxhMcIDkE4jobdNnm3AmZZCDeAbhrX7AXD5h6Wrg0Ndgcq1EbT_0J1lvzLPUdsHq2KgmG84LqdcbOmtNmuw_Xq2VpG5Ggs9J-7&ref=orjY4mGPRjk5boDnW0uvlrrd71vZw9kp_WG74dv641SQgVWqsIlqVh9cNEbTja_TQpr3SsAoFKGsaq7cVdmvTst6cRScCO9HCT7aXqs28S8WGRze47H-rcJuaUVHDtCr&l10n=ru&cts=1456979232988&mc=3.4713544870139303', 't': '2016-03-03 09:26:56', 'p': '/wiki/Beyond_Good_%26_Evil_(video_game)'}], click_data={'event_surveyInstanceToken': '39fa7c47377fcd5a', 'timestamp': '20160303092704', 'event_pageId': '633739', 'event_pageTitle': 'Beyond_Good_&_Evil_(video_game)'}, key='001537b0d0ca14c4e46fce80d0ea2dca370b5ae4Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36')

In [60]:
# Check the Python type of the collected join result.
type(res)


Out[60]:
list

In [67]:
# Flatten every joined Row into a plain dict holding the four fields that are
# written out afterwards.
click_traces = [
    {
        'key': joined.key,
        'requests': joined.requests,
        'click_data': joined.click_data,
        'geo_data': joined.geo_data,
    }
    for joined in res
]

In [71]:
# Write the joined click traces as one JSON file inside this day/host
# partition of the output directory.
# `output_partition` (output_dir joined with the partition name) is the
# destination; no other output location is defined in this notebook.
path = output_partition
outfile = os.path.join(path, 'data.json')
# exist_ok=True keeps the cell re-runnable once the directory has been created.
os.makedirs(path, exist_ok=True)
# The context manager guarantees the file is flushed and closed after the dump.
with open(outfile, 'w') as fh:
    json.dump(click_traces, fh)