In [3]:
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.sql import HiveContext, DataFrameWriter
from datetime import datetime
from dateutil import tz
import time
import os
In [17]:
#Put all variables here
iHiveTable = "vrawsession"
oHiveTable = "vconsolsession"
iHiveQuery = "SELECT CONCAT(uid, ',', start_time, ',', dur_loc_seq, ',', data_dt)" \
             + " as ev from " + iHiveTable
In [5]:
#iHiveQuery
In [8]:
#Must do this if running .py files independently
sc = SparkContext('local', 'pyspark')
hiveContext = HiveContext(sc)
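This notebook targets the Spark 1.x API. As a side note, on Spark 2.x+ the equivalent entry point would be a SparkSession (a sketch, not used by this notebook):

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName('pyspark') \
    .enableHiveSupport() \
    .getOrCreate()
# spark.sql(...) then plays the role of hiveContext.sql(...)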
In [18]:
tdf = hiveContext.sql(iHiveQuery)
In [19]:
#tdf.collect()
In [20]:
def toLocDurTuples(line):
    # Parse one CSV event row into (uid_date, [(loc, start_ts, dur), ...])
    DATETIME_FMT = "%Y-%m-%d@%H:%M:%S"
    uidI = 0
    startTimeI = 1
    durLocI = 2
    dateI = 3
    values = line.split(',')
    #uid_date becomes the key later
    uid_date = values[uidI] + '_' + values[dateI]
    #Convert the time string (UTC) to an epoch timestamp
    dt_with_tz = datetime(*time.strptime(values[startTimeI], DATETIME_FMT)[:6],
                          tzinfo=tz.tzutc())
    delta = dt_with_tz - datetime(1970, 1, 1, tzinfo=tz.tzutc())
    startTime = delta.days * 86400 + delta.seconds
    # dur_loc_seq looks like "[d1:d2:...][l1:l2:...]"
    durLocStrL = values[durLocI].split('][')
    durList = list(map(int, durLocStrL[0][1:].split(':')))   # strip leading '['
    locList = list(map(int, durLocStrL[1][:-1].split(':')))  # strip trailing ']'
    # Each duration advances the running offset from startTime
    locTimeL = []
    dur = 0
    for loc, d in zip(locList, durList):
        locTimeL.append((loc, startTime + dur, d))
        dur += d
    return (uid_date, locTimeL)
def tfin(x):
    # Consolidate one (uid, date) group: sort, merge back-to-back stays, format
    uid_dateI = 0
    locTimeDurI = 1
    locI = 0
    tsI = 1
    durI = 2
    uid, date = x[uid_dateI].split('_')
    data = x[locTimeDurI]
    #Sort by timestamp
    data = sorted(data, key=lambda tup: tup[tsI])
    #Remove redundant locations
    data2 = []
    maxlen = len(data) - 1
    i = 0
    while True:
        if i >= maxlen:
            if i == maxlen:
                data2.append(data[i])
            break
        if data[i][locI] != data[i + 1][locI]:
            #locations are different
            data2.append(data[i])
            i += 1
        else:
            #locations are the same
            if data[i][tsI] + data[i][durI] == data[i + 1][tsI]:
                #back to back on the same location: merge the durations
                data2.append((data[i][locI],
                              data[i][tsI],
                              data[i][durI] + data[i + 1][durI]))
                #skip the next entry
                i += 2
            else:
                i += 1
    # Longest stays first
    data2 = sorted(data2, key=lambda t: t[durI], reverse=True)
    # Serialize as "[(loc:ts:dur)|(loc:ts:dur)|...]"
    data3 = str(data2).replace(', ', ':').replace('):', ')|')
    return (uid, data3, int(date))
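Hand-tracing the two helpers on the sample inputs from the commented test cells at the bottom of this notebook (In [31] and In [32]); the outputs shown are traced from the definitions above, so treat them as illustrative:

# Parse one raw row: durations [40:50] pair up with locations [202:203]
toLocDurTuples('101,2016-06-01@12:04:02,[40:50][202:203],20160601')
# -> ('101_20160601', [(202, 1464782642, 40), (203, 1464782682, 50)])

# Consolidate one day's tuples: longest stay first, pipe-delimited string
tfin(('101_20160601', [(202, 1464782642, 40), (203, 1464782682, 50)]))
# -> ('101', '[(203:1464782682:50)|(202:1464782642:40)]', 20160601)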
In [21]:
rdd2 = tdf.select("ev").rdd.map(lambda x: toLocDurTuples(x.ev)) \
          .reduceByKey(lambda a, b: a + b) \
          .map(tfin)
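reduceByKey concatenates the per-row tuple lists with +, so tfin sees every event for a given (uid, date) key at once and can merge back-to-back stays. A hand-traced merge case (illustrative values):

tfin(('101_20160601', [(202, 1464782642, 40), (202, 1464782682, 50)]))
# The second stay starts exactly when the first ends (1464782642 + 40),
# so the two entries collapse into one 90-second stay:
# -> ('101', '[(202:1464782642:90)]', 20160601)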
In [27]:
#rdd2.take(10)
In [23]:
tdf2 = hiveContext.createDataFrame(rdd2, ['uid','loc_ts_dur', 'data_dt'])
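createDataFrame infers column types from the tuples tfin returns, so the column list must match the tuple order (uid, loc_ts_dur, data_dt). The inferred schema should look roughly like this:

tdf2.printSchema()
# root
#  |-- uid: string (nullable = true)
#  |-- loc_ts_dur: string (nullable = true)
#  |-- data_dt: long (nullable = true)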
In [24]:
#tdf2.collect()
In [25]:
df_writer = DataFrameWriter(tdf2)
df_writer.insertInto(oHiveTable, overwrite=True)
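insertInto matches columns to the target table by position, not by name, so vconsolsession's column order must line up with (uid, loc_ts_dur, data_dt). The same write can also go through the DataFrame's own writer property:

tdf2.write.insertInto(oHiveTable, overwrite=True)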
In [26]:
sc.stop()
In [31]:
#toLocDurTuples('101,2016-06-01@12:04:02,[40:50][202:203],20160601')
In [32]:
#tfin(('101_20160601', [(202, 1464782642, 40), (203, 1464782682, 50)]))