In [1]:
from pyspark import SparkContext
from pyspark.sql import HiveContext
import time
import re

In [6]:
# Job parameters
iHiveTable = "vconsolsession"
oHiveTable = "vprdloc"

startDate = 20160601
endDate = 20160602

# Each returned row has a single string column `ev` of the form
# "uid,[(loc:ts:dur)|(loc:ts:dur)|...]" (see the sample call at the end).
iHiveQuery = "SELECT CONCAT(uid, ',', loc_ts_dur) AS ev" \
    + " FROM " + iHiveTable \
    + " WHERE data_dt >= " + str(startDate) \
    + " AND data_dt <= " + str(endDate)

In [7]:
#iHiveQuery

In [8]:
# Local master for this notebook run; point this at the cluster for a full job.
sc = SparkContext('local', 'pyspark')
hiveContext = HiveContext(sc)

In [9]:
tdf = hiveContext.sql(iHiveQuery)

In [10]:
#tdf.collect()

In [11]:
def predictPeriodDuration(x):
    """Given a (uid, loc_ts_dur) pair, return one (uid, loc, prd, dur, ts)
    tuple per hour-of-week period, keeping the location with the longest
    duration in that period."""
    uidI = 0
    locTsDurI = 1

    locPrdDur = []
    # loc_ts_dur looks like "[(loc:ts:dur)|(loc:ts:dur)|...]"; strip the
    # brackets and parentheses, then split on '|' to get the triples.
    RX = re.compile(r'[\[\]()]')
    locTsDurL = RX.sub('', x[locTsDurI]).split('|')
    for locTsDur in locTsDurL:
        loc, ts, dur = locTsDur.split(':')
        # Convert the epoch timestamp to an hour-of-week period:
        # 0 = Sunday 00:00 UTC, ..., 167 = Saturday 23:00 UTC.
        t1 = time.gmtime(int(ts))
        prd = ((t1.tm_wday + 1) % 7) * 24 + t1.tm_hour
        locPrdDur.append((loc, prd, int(dur), int(ts)))

    # Pick the location with the maximum duration in each period.
    locPrdDur = sorted(locPrdDur, key=lambda e: (e[1], e[2]), reverse=True)
    sortedPrdL = []
    prevprd = -1
    for loc, prd, dur, ts in locPrdDur:
        if prevprd != prd:
            sortedPrdL.append((x[uidI], loc, prd, dur, ts))
        prevprd = prd

    # Order the output by period for readability.
    return sorted(sortedPrdL, key=lambda e: e[2])
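
A quick driver-side check of the hour-of-week encoding, using one of the timestamps from the sample call at the end of this notebook (this assumes the timestamps are UTC epoch seconds):

import time

t = time.gmtime(1464764702)                     # Wed 2016-06-01 07:05:02 UTC
prd = ((t.tm_wday + 1) % 7) * 24 + t.tm_hour
# prd == 3 * 24 + 7 == 79, with period 0 starting at Sunday 00:00 UTC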

In [12]:
# Split "uid,<loc_ts_dur>" into a (uid, loc_ts_dur) pair, merge the strings of
# any uid that appears in more than one row (re-joining with '|' so each
# loc:ts:dur triple stays separate), then expand each uid into one record per
# hour-of-week period.
rdd2 = tdf.select("ev").rdd.map(lambda x: tuple(x.ev.split(',', 1))) \
                           .reduceByKey(lambda a, b: a + '|' + b) \
                           .flatMap(predictPeriodDuration)
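
The '|' in the reduce step keeps the merged string parseable when a uid shows up in more than one input row; a plain-Python sketch with values borrowed from the sample data:

a = u'[(202:1464764702:90)|(201:1464764642:60)]'
b = u'[(203:1464782682:50)]'
merged = a + '|' + b
# After the brackets and parentheses are stripped, merged splits into three
# loc:ts:dur triples instead of two of them running together.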

In [13]:
#rdd2.take(10)

In [14]:
tdf2 = hiveContext.createDataFrame(rdd2, ['uid', 'loc', 'prd', 'dur', 'ts'])

In [15]:
#tdf2.collect()

In [16]:
# Insert into the existing Hive output table, replacing its current contents.
tdf2.write.insertInto(oHiveTable, overwrite=True)
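
insertInto writes into an existing Hive table and matches columns by position, so the output table has to exist before this cell runs. A minimal sketch of a compatible table (the storage format and any partitioning are assumptions, not taken from this notebook; PySpark infers the Python ints as bigint):

hiveContext.sql(
    "CREATE TABLE IF NOT EXISTS " + oHiveTable +
    " (uid STRING, loc STRING, prd BIGINT, dur BIGINT, ts BIGINT)"
)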

In [17]:
sc.stop()

In [19]:
# Local sanity check for predictPeriodDuration (uncomment to run on the driver):
#predictPeriodDuration((u'101',
#   u'[(202:1464764702:90)|(201:1464764642:60)|(201:1464764792:50)|(203:1464782682:50)|(202:1464782642:40)]'))
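
With the hour-of-week encoding above (and assuming UTC epoch seconds), this sample should keep location 202 for period 79, where its 90 s beats the two visits to 201, and location 203 for period 84, where 50 s beats 40 s, i.e. roughly:

# [(u'101', u'202', 79, 90, 1464764702),
#  (u'101', u'203', 84, 50, 1464782682)]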