In [2]:
sc


Out[2]:
<pyspark.context.SparkContext at 0x7f80376c0fd0>

In [3]:
rawEventsRdd = sc.textFile("/home/mert/yahoo/events.txt")
rawEventsRdd.take(5)


Out[3]:
[u'001e6d8e-cbe7-4374-8c38-f37962a457e9\tair.com.smashatom.bingo\t1421009506\t47.237476\t-122.530884\tTacoma\t6\t12\tApp_Opened',
 u'001e6d8e-cbe7-4374-8c38-f37962a457e9\tcom.android.vending\t1421029924\t47.237476\t-122.530891\tTacoma\t6\t18\tApp_Opened',
 u'001e6d8e-cbe7-4374-8c38-f37962a457e9\tair.com.buffalo_studios.bingorush2\t1421015988\t47.237461\t-122.530899\tTacoma\t6\t14\tApp_Opened',
 u'001e6d8e-cbe7-4374-8c38-f37962a457e9\tcom.facebook.orca\t1421318120\t47.243618\t-122.513252\tTacoma\t3\t2\tApp_Opened',
 u'001e6d8e-cbe7-4374-8c38-f37962a457e9\tcom.jrdcom.android.gallery3d\t1421028599\t47.237442\t-122.530899\tTacoma\t6\t18\tApp_Opened']

In [4]:
from collections import namedtuple

EventDataRow = namedtuple("EventDataRow", ["userId", "itemId", "ts", "latitude", "longitude", "city", "day_of_week", "time_of_day", "event_type"])

def parseRawData(line):
    lineSplit = line.split("\t")          # fields are tab-separated
    return EventDataRow(userId=lineSplit[0],
                        itemId=lineSplit[1],
                        ts=int(lineSplit[2]),
                        latitude=float(lineSplit[3]),
                        longitude=float(lineSplit[4]),
                        city=lineSplit[5],
                        day_of_week=int(lineSplit[6]),
                        time_of_day=int(lineSplit[7]),
                        event_type=lineSplit[-1])
    

eventsRdd = rawEventsRdd.map(parseRawData).cache()
eventsRdd.take(3)


Out[4]:
[EventDataRow(userId=u'001e6d8e-cbe7-4374-8c38-f37962a457e9', itemId=u'air.com.smashatom.bingo', ts=1421009506, latitude=47.237476, longitude=-122.530884, city=u'Tacoma', day_of_week=6, time_of_day=12, event_type=u'App_Opened'),
 EventDataRow(userId=u'001e6d8e-cbe7-4374-8c38-f37962a457e9', itemId=u'com.android.vending', ts=1421029924, latitude=47.237476, longitude=-122.530891, city=u'Tacoma', day_of_week=6, time_of_day=18, event_type=u'App_Opened'),
 EventDataRow(userId=u'001e6d8e-cbe7-4374-8c38-f37962a457e9', itemId=u'air.com.buffalo_studios.bingorush2', ts=1421015988, latitude=47.237461, longitude=-122.530899, city=u'Tacoma', day_of_week=6, time_of_day=14, event_type=u'App_Opened')]
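
parseRawData assumes every line carries all nine tab-separated fields and that the numeric ones parse cleanly. A minimal defensive variant, assuming malformed lines should simply be dropped rather than fail the job (parseRawDataSafe is a hypothetical helper, not part of the original pipeline):

def parseRawDataSafe(line):
    # hypothetical wrapper: return an EventDataRow, or None if the line cannot be parsed
    lineSplit = line.split("\t")
    if len(lineSplit) < 9:
        return None
    try:
        return parseRawData(line)
    except ValueError:    # e.g. a non-numeric timestamp or coordinate
        return None

#eventsRdd = rawEventsRdd.map(parseRawDataSafe).filter(lambda row: row is not None).cache()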

In [5]:
userIdConversionDictionary = eventsRdd.map(lambda x: x.userId).distinct().zipWithIndex().collectAsMap()
userIdConversionDictionaryBroadcast = sc.broadcast(userIdConversionDictionary)
itemIdConversionDictionary = eventsRdd.map(lambda x: x.itemId).distinct().zipWithIndex().collectAsMap()
itemIdConversionDictionaryBroadcast = sc.broadcast(itemIdConversionDictionary)
cityConversionDictionary = eventsRdd.map(lambda x: x.city).distinct().zipWithIndex().collectAsMap()
cityConversionDictionaryBroadcast = sc.broadcast(cityConversionDictionary)
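
zipWithIndex assigns each distinct userId, itemId, and city a dense integer id, and collectAsMap pulls the mapping to the driver so it can be broadcast to the executors. If the integer ids ever need to be mapped back to the original strings (for example, to print recommended package names), the dictionaries can be inverted; a minimal sketch, assuming the mappings fit comfortably in driver memory (the reverse-dictionary names are illustrative):

# dense integer id -> original string id
itemIdReverseDictionary = dict((v, k) for k, v in itemIdConversionDictionaryBroadcast.value.items())
userIdReverseDictionary = dict((v, k) for k, v in userIdConversionDictionaryBroadcast.value.items())

#itemIdReverseDictionary[77275]    # -> u'air.com.smashatom.bingo' in the run shown below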

In [6]:
eventsConvertedRdd = eventsRdd.map(lambda x: EventDataRow(
    userId=userIdConversionDictionaryBroadcast.value[x.userId],
    itemId=itemIdConversionDictionaryBroadcast.value[x.itemId],
    ts=x.ts,
    latitude=x.latitude,
    longitude=x.longitude,
    city=cityConversionDictionaryBroadcast.value[x.city],
    day_of_week=x.day_of_week,
    time_of_day=x.time_of_day,
    event_type=x.event_type
    ))

eventsConvertedRdd.take(3)


Out[6]:
[EventDataRow(userId=163385, itemId=77275, ts=1421009506, latitude=47.237476, longitude=-122.530884, city=13209, day_of_week=6, time_of_day=12, event_type=u'App_Opened'),
 EventDataRow(userId=163385, itemId=44053, ts=1421029924, latitude=47.237476, longitude=-122.530891, city=13209, day_of_week=6, time_of_day=18, event_type=u'App_Opened'),
 EventDataRow(userId=163385, itemId=83370, ts=1421015988, latitude=47.237461, longitude=-122.530899, city=13209, day_of_week=6, time_of_day=14, event_type=u'App_Opened')]

In [7]:
finalRDD = eventsConvertedRdd.map(lambda x: [x.userId, (x.itemId, x.ts)])
finalRDD.take(3)
#testRDD = allRDD.sample(False, 0.1, 1234)
#testRDD.count()
#trainRDD = allRDD.filter(lambda x: x and x not in testRDD)  # does not work: an RDD cannot be referenced inside another RDD's transformation
#trainRDD.count()


Out[7]:
[[163385, (77275, 1421009506)],
 [163385, (44053, 1421029924)],
 [163385, (83370, 1421015988)]]
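
The commented-out split in the cell above does not work because testRDD is referenced inside finalRDD's filter, and Spark does not allow nesting one RDD inside another RDD's transformation (nor does an RDD support `in` membership tests). A random 80/20 split of individual events could instead use randomSplit, shown here only as a sketch, since the cells below split each user's events by time instead:

# random 80/20 split of individual events (not used below; the evaluation splits per user by time)
trainRDD, testRDD = finalRDD.randomSplit([0.8, 0.2], seed=1234)
#trainRDD.count(), testRDD.count()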

In [8]:
groupData = finalRDD.groupByKey().map(lambda (x, y): (x, list(y)))
#groupData = map((lambda (x,y): (x, list(y))), sorted(finalRDD.groupByKey().collect()))
#sc.parallelize(groupData).collect()
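
groupByKey gathers every (itemId, ts) pair belonging to a user into one list. The tuple-unpacking lambda above is Python 2 only syntax; an equivalent form that also works on Python 3:

#groupData = finalRDD.groupByKey().mapValues(list)    # same result without tuple parameter unpacking
#groupData.take(1)                                    # [(userId, [(itemId, ts), ...])]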

In [15]:
from random import shuffle

def remove_duplicates(values):
    output = []
    seen = set()
    for value in values:
        # add value to both the output list and the set only on its first occurrence
        if value not in seen:
            output.append(value)
            seen.add(value)
    return output

def lruFunction(line):
    listGroup = line[1]
    #shuffle(listGroup)                                        # alternative: shuffle instead of a time-based split
    listGroup = sorted(listGroup, key=lambda x: int(x[1]))     # sort the user's events by timestamp, oldest first
    l = len(listGroup)
    numTrain = l * 8 / 10
    numTest = l - numTrain
    trainList = listGroup[:numTrain]      # first 80% of events -> train set
    testList = listGroup[numTrain:]       # last 20% of events -> test set

    trainList = sorted(trainList, key=lambda x: int(x[1]), reverse=True)  # sort train events by timestamp, descending
    testList = [t[0] for t in testList]                        # keep only the item id for the test set
    RecommenderDuplicate = [t[0] for t in trainList]           # keep only the item id for the train set
    Recommender = remove_duplicates(RecommenderDuplicate)      # drop duplicates, keeping the most recent occurrence

    if len(Recommender) > 4:
        finalRecommender = Recommender[:5]                     # recommend the 5 most recently used distinct items
    else:
        finalRecommender = [-1, -1, -1, -1, -1]
        numRec = len(finalRecommender)
        finalRecommender[:numRec] = Recommender                # fewer than 5 distinct items: the user is scored 0 below

    scores = 0
    numHit = 0
    if len(testList) == 0 or len(finalRecommender) < 5:
        scores = 0
    else:
        for t in testList:
            numHit = numHit + 1                                # count every test event
            if t in finalRecommender:
                scores = scores + 1.0                          # hit: the test item is among the 5 recommendations
#         scores = scores / numTest
        if numHit != 0:
            scores = scores / numHit                           # hit rate over the user's test events
        else:
            scores = 0
    return scores
    #return testList[0],testList[1],testList[2],testList[3],testList[4]
    #if len(Recommender) > 4:
    #    return line[0], Recommender[0], Recommender[1], Recommender[2], Recommender[3], Recommender[4]
    #return line[0],trainList[0],trainList[1],trainList[2],trainList[3],trainList[4],trainList[5],trainList[6],trainList[7]
    #return l,len(trainList),len(testList) 
finalScore = groupData.map(lruFunction)
finalScore.mean()


Out[15]:
0.44746548532394254
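
The recommend-then-score pattern above is repeated in the next two cells with only the ranking rule changing. A shared helper for the unweighted case could look like the sketch below (hitRate is a hypothetical name, not used elsewhere in this notebook); it reproduces the scoring above because numHit ends up equal to the number of test events:

def hitRate(finalRecommender, testList):
    # fraction of a user's test events whose item appears among the 5 recommendations
    if len(testList) == 0 or len(finalRecommender) < 5:
        return 0
    hits = sum(1.0 for t in testList if t in finalRecommender)
    return hits / len(testList)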

In [18]:
def mruFunction(line):
    listGroup = line[1]
    #shuffle(listGroup)                                        # alternative: shuffle instead of a time-based split
    listGroup = sorted(listGroup, key=lambda x: int(x[1]))     # sort the user's events by timestamp, oldest first
    l = len(listGroup)
    numTrain = l * 8 / 10
    numTest = l - numTrain
    trainList = listGroup[:numTrain]      # first 80% of events -> train set
    testList = listGroup[numTrain:]       # last 20% of events -> test set

    #trainList = sorted(trainList, key=lambda x: int(x[1]), reverse=True)  # unlike lruFunction, the train list stays in ascending order
    testList = [t[0] for t in testList]                        # keep only the item id for the test set
    RecommenderDuplicate = [t[0] for t in trainList]           # keep only the item id for the train set
    Recommender = remove_duplicates(RecommenderDuplicate)      # drop duplicates, keeping the earliest occurrence

    if len(Recommender) > 4:
        finalRecommender = Recommender[:5]                     # recommend the 5 earliest-used distinct items
    else:
        finalRecommender = [-1, -1, -1, -1, -1]
        numRec = len(finalRecommender)
        finalRecommender[:numRec] = Recommender                # fewer than 5 distinct items: the user is scored 0 below

    scores = 0
    numHit = 0
    if len(testList) == 0 or len(finalRecommender) < 5:
        scores = 0
    else:
        for t in testList:
            numHit = numHit + 1                                # count every test event
            if t in finalRecommender:
                scores = scores + 1.0                          # hit: the test item is among the 5 recommendations
        #scores = scores / numTest
        if numHit != 0:
            scores = scores / numHit                           # hit rate over the user's test events
        else:
            scores = 0
    return scores
    #return testList[0],testList[1],testList[2],testList[3],testList[4]
    #if len(Recommender) > 4:
    #    return line[0], Recommender[0], Recommender[1], Recommender[2], Recommender[3], Recommender[4]
    #return line[0],trainList[0],trainList[1],trainList[2],trainList[3],trainList[4],trainList[5],trainList[6],trainList[7]
    #return l,len(trainList),len(testList) 
finalScore = groupData.map(mruFunction)
finalScore.mean()#,finalScore.count()


Out[18]:
0.3765235665432925

In [21]:
from collections import Counter
def mruFunction(line):
    listGroup = line[1]
    #shuffle(listGroup)                                        # alternative: shuffle instead of a time-based split
    listGroup = sorted(listGroup, key=lambda x: int(x[1]))     # sort the user's events by timestamp, oldest first
    l = len(listGroup)
    numTrain = l * 8 / 10
    numTest = l - numTrain
    trainList = listGroup[:numTrain]      # first 80% of events -> train set
    testList = listGroup[numTrain:]       # last 20% of events -> test set

    testList = [t[0] for t in testList]                        # keep only the item id for the test set
    RecommenderDuplicate = [t[0] for t in trainList]           # keep only the item id for the train set
    Recommender = Counter(RecommenderDuplicate).most_common()  # rank train items by how often they were used
    Recommender = [t[0] for t in Recommender]                  # keep only the item id, most frequent first

    if len(Recommender) > 4:
        finalRecommender = Recommender[:5]                     # recommend the 5 most frequently used items
    else:
        finalRecommender = [-1, -1, -1, -1, -1]
        numRec = len(finalRecommender)
        finalRecommender[:numRec] = Recommender                # fewer than 5 distinct items: the user is scored 0 below

    scores = 0
    numHit = 0
    weights = [1.0, 0.8, 0.6, 0.4, 0.2]                        # credit by recommendation rank, top rank first
    if len(testList) == 0 or len(finalRecommender) < 5:
        scores = 0
    else:
        for t in testList:
            if t in finalRecommender:
                scores = scores + weights[finalRecommender.index(t)]
                numHit = numHit + 1                            # unlike the previous variants, misses are not counted
        #scores = scores / numTest
        if numHit != 0:
            scores = scores / numHit                           # average rank credit over the hits
        else:
            scores = 0
    return scores
    #return Recommender
    #return testList[0],testList[1],testList[2],testList[3],testList[4]
    #if len(Recommender) > 4:
    #    return line[0], Recommender[0], Recommender[1], Recommender[2], Recommender[3], Recommender[4]
    #return line[0],trainList[0],trainList[1],trainList[2],trainList[3],trainList[4],trainList[5],trainList[6],trainList[7]
    #return l,len(trainList),len(testList) 
finalScore = groupData.map(mruFunction)
finalScore.mean()


Out[21]:
0.6885997714189959
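
Note that this 0.69 is not directly comparable to the two hit rates above: the frequency-based variant gives rank-weighted credit (1.0 down to 0.2) and averages only over hits, not over all test events. The per-user spread behind any of these means can be inspected with the RDD statistics helpers; a minimal sketch (scoreStats and zeroScoreUsers are illustrative names):

scoreStats = finalScore.stats()                                # StatCounter: count, mean, stdev, max, min
#scoreStats.count(), scoreStats.mean(), scoreStats.stdev()
zeroScoreUsers = finalScore.filter(lambda s: s == 0).count()   # users with no credited hits or too few distinct items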

In [ ]: