In [2]:
sc  # presumably the SparkContext injected by the PySpark kernel; later cells call sc.textFile / sc.broadcast on it
Out[2]:
In [3]:
# Location of the raw event log: one tab-separated event per line
# (parsed by parseRawData below). Edit here to point at another copy.
EVENTS_PATH = "/home/mert/yahoo/events.txt"

rawEventsRdd = sc.textFile(EVENTS_PATH)
rawEventsRdd.take(5)
Out[3]:
In [4]:
from collections import namedtuple
# One parsed event. Field order matches the column order of the raw file.
EventDataRow = namedtuple(
    "EventDataRow",
    ["userId", "itemId", "ts", "latitude", "longitude",
     "city", "day_of_week", "time_of_day", "event_type"],
)


def parseRawData(line):
    """Parse one tab-separated event line into an ``EventDataRow``.

    Columns 0-7 are read by position: userId, itemId, ts (int), latitude
    (float), longitude (float), city, day_of_week (int), time_of_day (int).
    ``event_type`` is always taken from the LAST column, so any extra
    columns between index 7 and the end are ignored.

    Raises IndexError when the line has too few columns and ValueError when
    a numeric column does not parse.
    """
    fields = line.split("\t")
    return EventDataRow(
        fields[0],
        fields[1],
        int(fields[2]),
        float(fields[3]),
        float(fields[4]),
        fields[5],
        int(fields[6]),
        int(fields[7]),
        fields[-1],
    )
# Parse every raw line; cache the result because the next cells scan it
# several times (three distinct() passes plus the id conversion).
eventsRdd = rawEventsRdd.map(parseRawData)
eventsRdd.cache()
eventsRdd.take(3)
Out[4]:
In [5]:
def _distinctValueIndex(rdd, field):
    """Map each distinct value of attribute ``field`` in ``rdd`` to the integer
    id assigned by ``zipWithIndex``, collected to the driver as a dict."""
    return (rdd.map(lambda row: getattr(row, field))
               .distinct()
               .zipWithIndex()
               .collectAsMap())


# String -> int lookup tables, broadcast so every executor gets one copy.
userIdConversionDictionary = _distinctValueIndex(eventsRdd, "userId")
userIdConversionDictionaryBroadcast = sc.broadcast(userIdConversionDictionary)

itemIdConversionDictionary = _distinctValueIndex(eventsRdd, "itemId")
itemIdConversionDictionaryBroadcast = sc.broadcast(itemIdConversionDictionary)

cityConversionDictionary = _distinctValueIndex(eventsRdd, "city")
cityConversionDictionaryBroadcast = sc.broadcast(cityConversionDictionary)
In [6]:
def _toIntegerIds(event):
    """Return a new EventDataRow equal to ``event`` except that userId, itemId
    and city are replaced by their integer ids from the broadcast lookup
    tables (KeyError if a value is missing from its table)."""
    userIds = userIdConversionDictionaryBroadcast.value
    itemIds = itemIdConversionDictionaryBroadcast.value
    cities = cityConversionDictionaryBroadcast.value
    return EventDataRow(
        userId=userIds[event.userId],
        itemId=itemIds[event.itemId],
        ts=event.ts,
        latitude=event.latitude,
        longitude=event.longitude,
        city=cities[event.city],
        day_of_week=event.day_of_week,
        time_of_day=event.time_of_day,
        event_type=event.event_type,
    )


eventsConvertedRdd = eventsRdd.map(_toIntegerIds)
eventsConvertedRdd.take(3)
Out[6]:
In [7]:
# Re-key every event by user as (userId, (itemId, ts)) tuples -- the
# (key, value) shape PySpark pair-RDD operations such as groupByKey expect.
finalRDD = eventsConvertedRdd.map(lambda event: (event.userId, (event.itemId, event.ts)))
# NOTE: a global random split (sample(False, 0.1, 1234) plus a filter testing
# membership in the sampled RDD) was tried here and did not work: Spark cannot
# use one RDD inside another RDD's transformation (RDD.randomSplit is the
# supported route). The scoring functions below instead split each user's
# events chronologically, 80% train / 20% test.
finalRDD.take(3)
Out[7]:
In [8]:
# One record per user: (userId, [(itemId, ts), ...]) with the events
# materialised as a list. mapValues(list) replaces the Python-2-only
# `lambda (x, y): (x, list(y))` tuple-parameter syntax (removed by PEP 3113).
groupData = finalRDD.groupByKey().mapValues(list)
In [15]:
from random import shuffle
def remove_duplicates(values):
    """Return the items of ``values`` with repeats dropped, keeping the order
    of first occurrence. Items must be hashable (they are tracked in a set)."""
    output = []
    seen = set()
    for value in values:
        # First time this value appears: keep it and remember it.
        if value not in seen:
            output.append(value)
            seen.add(value)
    return output


def lruFunction(line, k=5):
    """Score a most-recently-used-items ("LRU cache") top-k recommender for one user.

    ``line`` is one ``groupData`` element: ``(userId, [(itemId, ts), ...])``.
    The user's events are sorted by timestamp. The earliest 80% (floored)
    form the training history and the remaining events the test set. The
    recommendation is the ``k`` most recently used distinct items of the
    training history. Users with fewer than ``k`` distinct training items are
    still scored against however many candidates they have.

    :param line: ``(userId, list of (itemId, ts))`` pair.
    :param k: length of the recommendation list (default 5).
    :return: hit rate, i.e. the fraction of test events whose item is
        recommended, as a float. Returns 0 if the test set is empty.
    """
    events = sorted(line[1], key=lambda ev: int(ev[1]))
    # Floor division keeps the split index integral on Python 2 and 3.
    numTrain = len(events) * 8 // 10
    testList = [ev[0] for ev in events[numTrain:]]
    if not testList:
        return 0
    # Newest first (stable sort), so de-duplication keeps each item's latest use.
    trainList = sorted(events[:numTrain], key=lambda ev: int(ev[1]), reverse=True)
    recommended = remove_duplicates([ev[0] for ev in trainList])[:k]
    hits = sum(1 for item in testList if item in recommended)
    return float(hits) / len(testList)
# Per-user LRU scores, then their mean across all users (displayed).
finalScore = groupData.map(lruFunction)
meanScore = finalScore.mean()
meanScore
Out[15]:
In [18]:
def mruFunction(line, k=5):
    """Score a "first-seen items" top-k recommender for one user.

    ``line`` is one ``groupData`` element: ``(userId, [(itemId, ts), ...])``.
    The user's events are sorted by timestamp. The earliest 80% (floored)
    form the training history and the remaining events the test set. The
    training events stay in ascending timestamp order, so de-duplication
    keeps each item's earliest use. The recommendation is therefore the ``k``
    distinct items the user touched first. Users with fewer than ``k``
    distinct training items are still scored against however many
    candidates they have.

    NOTE(review): unlike lruFunction, the training list is not re-sorted
    newest-first here. Confirm that "earliest distinct items" is the intended
    MRU policy.
    NOTE(review): a later cell redefines ``mruFunction`` and shadows this one
    once it runs.

    :param line: ``(userId, list of (itemId, ts))`` pair.
    :param k: length of the recommendation list (default 5).
    :return: hit rate, i.e. the fraction of test events whose item is
        recommended, as a float. Returns 0 if the test set is empty.
    """
    events = sorted(line[1], key=lambda ev: int(ev[1]))
    # Floor division keeps the split index integral on Python 2 and 3.
    numTrain = len(events) * 8 // 10
    testList = [ev[0] for ev in events[numTrain:]]
    if not testList:
        return 0
    recommended = remove_duplicates([ev[0] for ev in events[:numTrain]])[:k]
    hits = sum(1 for item in testList if item in recommended)
    return float(hits) / len(testList)
# Per-user scores from the current mruFunction, then their mean (displayed).
finalScore = groupData.map(mruFunction)
meanScore = finalScore.mean()
meanScore
Out[18]:
In [21]:
from collections import Counter
def mruFunction(line, k=5):
    """Score a most-frequently-used top-k recommender for one user.

    ``line`` is one ``groupData`` element: ``(userId, [(itemId, ts), ...])``.
    The user's events are sorted by timestamp. The earliest 80% (floored)
    form the training history and the remaining 20% the test set. The
    recommendation is the ``k`` items seen most often in training, in
    ``Counter.most_common`` order. Users with fewer than ``k`` distinct
    training items are still scored against however many candidates they have.

    A test event whose item is recommended at 0-based rank ``r`` earns the
    weight ``(k - r) / k`` (1.0, 0.8, 0.6, 0.4, 0.2 for ``k=5``). The score
    is the mean weight over those hits only. Test events that miss are not
    counted, and the score is 0 when nothing hits.

    NOTE(review): because misses are excluded from the denominator, this is
    not a hit rate and is not directly comparable with lruFunction. Confirm
    this is the intended metric.
    NOTE(review): this redefines (and shadows) the earlier ``mruFunction``.
    Despite the name, it ranks items by frequency, not recency.

    :param line: ``(userId, list of (itemId, ts))`` pair.
    :param k: length of the recommendation list (default 5).
    :return: mean rank weight of the hits as a float, or 0.
    """
    events = sorted(line[1], key=lambda ev: int(ev[1]))
    # 80% train / 20% test; floor division keeps the index integral on Py2 and Py3.
    numTrain = len(events) * 8 // 10
    testList = [ev[0] for ev in events[numTrain:]]
    counts = Counter(ev[0] for ev in events[:numTrain])
    recommended = [item for item, _ in counts.most_common(k)]
    # (k - rank) / k computed from integers gives exactly 1.0, 0.8, ... 0.2 for k=5.
    hitWeights = [float(k - recommended.index(item)) / k
                  for item in testList if item in recommended]
    if not hitWeights:
        return 0
    return sum(hitWeights) / len(hitWeights)
# Per-user scores from the redefined mruFunction, then their mean (displayed).
finalScore = groupData.map(mruFunction)
meanScore = finalScore.mean()
meanScore
Out[21]:
In [ ]: