In [11]:
from collections import namedtuple
# Load the raw events file as an RDD of text lines.
# `sc` is not defined in this notebook; presumably it is the SparkContext
# provided by the PySpark kernel -- TODO confirm.
# NOTE(review): the path is absolute and machine-specific, so this cell only
# runs where that exact file exists.
rawEventsRdd = sc.textFile("/home/mert/yahoo/events.txt")
# Record type for one parsed event. The field order is the order in which
# parseRawData reads the tab-separated columns of a line.
EventDataRow = namedtuple("EventDataRow", ["userId", "itemId", "ts", "latitude", "longitude", "city", "day_of_week", "time_of_day" , "event_type"])
def parseRawData(line):
    """
    Build an EventDataRow from one tab-separated line of the events file.

    Columns 0-7 are read positionally: userId and itemId stay strings, ts is
    parsed as int, latitude and longitude as float, city stays a string, and
    day_of_week and time_of_day are parsed as int. event_type is taken from
    the LAST column of the line, whatever its index.

    Raises IndexError when the line has fewer than eight columns and
    ValueError when a numeric column cannot be parsed.
    """
    cols = line.split("\t")
    # Positional arguments follow the field order declared for EventDataRow.
    return EventDataRow(
        cols[0],
        cols[1],
        int(cols[2]),
        float(cols[3]),
        float(cols[4]),
        cols[5],
        int(cols[6]),
        int(cols[7]),
        cols[-1],
    )
# Parse every line once and keep the result cached: the three dictionary
# builds and the conversion below all start from this RDD.
eventsRdd = rawEventsRdd.map(parseRawData).cache()

# For users, items and cities: number the distinct raw values with
# zipWithIndex, pull the value -> index mapping back to the driver, and
# broadcast it so the conversion below can look values up on the workers.
userIdConversionDictionary = (
    eventsRdd
    .map(lambda row: row.userId)
    .distinct()
    .zipWithIndex()
    .collectAsMap()
)
userIdConversionDictionaryBroadcast = sc.broadcast(userIdConversionDictionary)

itemIdConversionDictionary = (
    eventsRdd
    .map(lambda row: row.itemId)
    .distinct()
    .zipWithIndex()
    .collectAsMap()
)
itemIdConversionDictionaryBroadcast = sc.broadcast(itemIdConversionDictionary)

cityConversionDictionary = (
    eventsRdd
    .map(lambda row: row.city)
    .distinct()
    .zipWithIndex()
    .collectAsMap()
)
cityConversionDictionaryBroadcast = sc.broadcast(cityConversionDictionary)

# Same rows, with userId / itemId / city replaced by their integer indices;
# every other field is carried over unchanged.
eventsConvertedRdd = eventsRdd.map(lambda row: row._replace(
    userId=userIdConversionDictionaryBroadcast.value[row.userId],
    itemId=itemIdConversionDictionaryBroadcast.value[row.itemId],
    city=cityConversionDictionaryBroadcast.value[row.city],
))
eventsConvertedRdd.take(2)


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-11-d7728d16274b> in <module>()
     15     )
     16 eventsRdd = rawEventsRdd.map(parseRawData).cache()
---> 17 userIdConversionDictionary = eventsRdd.map(lambda x: x.userId).distinct().zipWithIndex().collectAsMap()
     18 userIdConversionDictionaryBroadcast = sc.broadcast(userIdConversionDictionary)
     19 itemIdConversionDictionary = eventsRdd.map(lambda x: x.itemId).distinct().zipWithIndex().collectAsMap()

/home/mert/spark-1.4.1-bin-hadoop2.6/python/pyspark/rdd.pyc in zipWithIndex(self)
   2065         starts = [0]
   2066         if self.getNumPartitions() > 1:
-> 2067             nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
   2068             for i in range(len(nums) - 1):
   2069                 starts.append(starts[-1] + nums[i])

/home/mert/spark-1.4.1-bin-hadoop2.6/python/pyspark/rdd.pyc in collect(self)
    755         """
    756         with SCCallSiteSync(self.context) as css:
--> 757             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    758         return list(_load_from_socket(port, self._jrdd_deserializer))
    759 

/home/mert/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    534             END_COMMAND_PART
    535 
--> 536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
    538                 self.target_id, self.name)

/home/mert/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry)
    362         connection = self._get_connection()
    363         try:
--> 364             response = connection.send_command(command)
    365             self._give_back_connection(connection)
    366         except Py4JNetworkError:

/home/mert/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command)
    471         try:
    472             self.socket.sendall(command.encode('utf-8'))
--> 473             answer = smart_decode(self.stream.readline()[:-1])
    474             logger.debug("Answer received: {0}".format(answer))
    475             # Happens when a the other end is dead. There might be an empty

/usr/lib/python2.7/socket.pyc in readline(self, size)
    432                     try:
    433                         while data != "\n":
--> 434                             data = recv(1)
    435                             if not data:
    436                                 break

KeyboardInterrupt: 

In [ ]:
# Key every event by user: [userId, (itemId, ts, latitude, longitude)].
finalRDD = eventsConvertedRdd.map(lambda row: [
    row.userId,
    (row.itemId, row.ts, row.latitude, row.longitude),
])
# Not the last expression of the cell, so the result is not displayed; the
# call is kept because it still forces the pipeline up to this point to run.
finalRDD.take(3)

# One record per user: (userId, [events ordered by timestamp]).
# The pair is indexed (kv[0], kv[1]) rather than unpacked in the lambda
# signature: tuple parameters such as `lambda (x, y): ...` are a SyntaxError
# on Python 3, while this form runs on both Python 2 and 3.
groupData = finalRDD.groupByKey().map(
    lambda kv: (kv[0], sorted(kv[1], key=lambda event: event[1])))

In [ ]:
from math import radians, cos, sin, asin, sqrt

def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees).

    Note the argument order: longitude first, then latitude, for each point.
    Returns the distance in kilometers.
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    # Mathematically 0 <= a <= 1, but floating-point rounding can push it a
    # hair above 1 for (near-)antipodal points, which would make asin raise
    # "math domain error". Clamp the argument to asin's domain.
    c = 2 * asin(min(1.0, sqrt(a)))
    r = 6371 # Radius of earth in kilometers. Use 3956 for miles
    return c * r
def detectMovement(x):
    """
    Annotate each event of one group with a movement class.

    ``x`` is ``(key, events)``; each event is indexed as
    ``(id, timestamp, latitude, longitude)`` and the events are expected to
    be ordered by timestamp (the caller sorts them before mapping).

    Returns ``(key, [(id, timestamp, latitude, longitude, moving), ...])``
    where ``moving`` is derived from the speed between an event and the one
    before it:

        1  not available (first event, gap of 0 s, or gap above 300 s)
        2  standing still (speed <= 1 m/s)
        3  walking speed  (speed <= 2.4 m/s)
        4  faster

    An empty event list yields ``(key, [])``.
    """
    MAX_GAP_SECONDS = 300      # larger gaps are too sparse to infer movement
    STANDING_MAX_SPEED = 1     # meters per second
    WALKING_MAX_SPEED = 2.4    # meters per second
    NOT_AVAILABLE, STANDING, WALKING, FASTER = 1, 2, 3, 4

    data = x[1]
    if not data:
        return (x[0], [])

    first = data[0]
    newData = [(first[0], first[1], first[2], first[3], NOT_AVAILABLE)]
    # `range` instead of the Python-2-only `xrange`, so this runs on 2 and 3.
    for i in range(1, len(data)):
        event = data[i]
        previous = data[i - 1]
        time_difference = event[1] - previous[1]  # in seconds
        moving = NOT_AVAILABLE
        # A speed is only defined for a positive gap inside the window, so the
        # distance is computed only in that case.
        if 0 < time_difference <= MAX_GAP_SECONDS:
            distance = haversine(event[3], event[2], previous[3], previous[2]) * 1000  # in meters
            velocity = distance / time_difference
            if velocity <= STANDING_MAX_SPEED:
                moving = STANDING
            elif velocity <= WALKING_MAX_SPEED:
                moving = WALKING
            else:
                moving = FASTER
        newData.append((event[0], event[1], event[2], event[3], moving))
    return (x[0], newData)
#print haversine(elem[0][1][2][1],elem[0][1][1][1],elem[6][1][2][1],elem[6][1][1][1])
# Rebind groupData to the movement-annotated version (each event gains a fifth
# element, see detectMovement) and cache it for the cells that follow.
# NOTE(review): the name is reused for the mapped RDD, so re-running this cell
# maps the already-annotated RDD through detectMovement again.
groupData = groupData.map(detectMovement).cache()

#groupData.take(1)

In [ ]:
import datetime
from math import radians, cos, sin, asin, sqrt

def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees).

    Arguments are given longitude first, then latitude, for each point.
    The result is in kilometers.
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    # Rounding can leave `a` marginally above 1 for (near-)antipodal points;
    # clamp so asin never sees a value outside [0, 1] and raises
    # "math domain error".
    c = 2 * asin(min(1.0, sqrt(a)))
    r = 6371 # Radius of earth in kilometers. Use 3956 for miles
    return c * r

def convertLocation(line):
    """
    Replace each event's position and timestamp with coarse context labels.

    ``line`` is ``(key, events)``; each event is indexed as
    ``(id, timestamp, latitude, longitude, movement)``.

    Two anchor events are chosen per group:
      * "work": among events whose hour is 6..18, the one with the most
        events of that same subset within 0.1 km (first one on ties);
      * "home": the same, among events whose hour is before 6 or after 18.

    Returns ``(key, [(id, movement, location, period), ...])`` with

      location  1 = within 0.01 km of the work anchor
                2 = otherwise within 0.01 km of the home anchor
                3 = neither, or the group lacks a work or a home anchor
      period    1 = hour 8..13, 2 = hour 14..18, 3 = any other hour

    Hours come from ``datetime.fromtimestamp``, i.e. they are in the local
    timezone of the process that runs this function.

    NOTE(review): anchors are picked with a 0.1 km radius but events are
    labelled with a 0.01 km radius; both values are kept exactly as they were
    -- confirm the difference is intended.
    """
    NEIGHBOUR_RADIUS_KM = 0.1   # radius used when counting nearby events
    LABEL_RADIUS_KM = 0.01      # radius used when labelling an event

    def densestEvent(events):
        """Return the event with the most neighbours in `events`, or None if empty."""
        anchor, bestCount = None, -1
        for candidate in events:
            count = 0
            for other in events:
                if haversine(candidate[3], candidate[2], other[3], other[2]) < NEIGHBOUR_RADIUS_KM:
                    count += 1
            # Strict comparison keeps the first event among equal counts.
            if count > bestCount:
                anchor, bestCount = candidate, count
        return anchor

    def isNear(event, anchor):
        """True when `event` lies within LABEL_RADIUS_KM of `anchor`."""
        return haversine(event[3], event[2], anchor[3], anchor[2]) < LABEL_RADIUS_KM

    events = line[1]
    # Convert every timestamp to an hour once and reuse it below.
    hours = [datetime.datetime.fromtimestamp(int(event[1])).hour for event in events]

    workAnchor = densestEvent([e for e, h in zip(events, hours) if 6 <= h <= 18])
    homeAnchor = densestEvent([e for e, h in zip(events, hours) if h < 6 or h > 18])
    haveBothAnchors = workAnchor is not None and homeAnchor is not None

    def locationLabel(event):
        """1 = at work anchor, 2 = at home anchor, 3 = elsewhere / unknown."""
        if not haveBothAnchors:
            return 3
        if isNear(event, workAnchor):
            return 1
        if isNear(event, homeAnchor):
            return 2
        return 3

    def periodLabel(hour):
        """1 = hours 8..13, 2 = hours 14..18, 3 = rest of the day."""
        if 8 <= hour <= 13:
            return 1
        if 13 <= hour <= 18:  # hour 13 was already taken by the branch above
            return 2
        return 3

    labelled = [(event[0], event[4], locationLabel(event), periodLabel(hour))
                for event, hour in zip(events, hours)]
    return line[0], labelled
# Per group: swap raw coordinates and timestamps for the context labels
# produced by convertLocation.
final = groupData.map(convertLocation)

In [ ]:
from random import shuffle

def topFiveSortedList(oldList, context, probability):
    """
    Insert ``(context, probability)`` into a list of ``(context, probability)``
    pairs kept in ascending order of probability, evicting the first entry.

    The first entry is overwritten only when its probability is strictly
    lower than ``probability``; after that, the entry at the front is moved
    towards the back for as long as the next entry is strictly smaller.
    ``oldList`` is modified in place and also returned. Lists with fewer
    than two entries are returned untouched.
    """
    lastIndex = len(oldList) - 1
    if lastIndex < 1:
        return oldList

    if oldList[0][1] < probability:
        oldList[0] = (context, probability)

    pos = 0
    while pos < lastIndex and oldList[pos + 1][1] < oldList[pos][1]:
        oldList[pos], oldList[pos + 1] = oldList[pos + 1], oldList[pos]
        pos += 1
    return oldList

def remove_duplicates(values):
    """
    Return a new list with the items of ``values`` in first-seen order,
    keeping only the first occurrence of each. Items must be hashable.
    """
    already_seen = set()
    unique = []
    for item in values:
        if item in already_seen:
            # A later repeat of something already kept: skip it.
            continue
        already_seen.add(item)
        unique.append(item)
    return unique

def bayesian(line):
    """
    Score a naive-Bayes style recommender on one group's event history.

    ``line`` is ``(key, rows)``; each row is indexed as ``(app, c1, c2, c3)``
    where ``c1..c3`` are the context labels. The first 80% of the rows (in
    the given order) form the training set, the remainder the test set.

    For every test row, each app seen in training under the same
    ``(c1, c2, c3)`` context gets the posterior

        P(app | context) = P(context | app) * P(app) / P(context)

    estimated from training counts. The five highest-ranked apps are the
    recommendation; missing slots are filled with ``-1``. A test row whose
    app sits at rank 1..5 earns 1, 0.8, 0.6, 0.4 or 0.2.

    Returns the earned total divided by the number of test rows that scored
    at all (hits), or 0 when there were no hits.
    NOTE(review): the average is over hits only, not over all test rows --
    confirm that this is the intended metric.
    """
    # Local import keeps the function self-contained when shipped to workers.
    from collections import Counter, defaultdict

    RANK_WEIGHTS = (1, 0.8, 0.6, 0.4, 0.2)

    listGroup = line[1]
    # Floor division: plain `/` yields a float on Python 3, which cannot be
    # used as a slice index.
    numTrain = len(listGroup) * 8 // 10
    trainList = listGroup[:numTrain]      #0.8 train set
    testList = listGroup[numTrain:]       #0.2 test set

    # Count everything the estimates need in a single pass over the training
    # rows, instead of rescanning the training set for every test row and
    # every candidate app.
    appCounts = Counter()
    appContextCounts = Counter()
    rowsByContext = defaultdict(list)
    for row in trainList:
        appCounts[row[0]] += 1
        appContextCounts[(row[0], row[1], row[2], row[3])] += 1
        rowsByContext[(row[1], row[2], row[3])].append(row)
    # Distinct training rows per context, in first-seen order; that order
    # decides how equal probabilities are ranked.
    candidatesByContext = dict(
        (contextKey, remove_duplicates(rows))
        for contextKey, rows in rowsByContext.items())

    newTestList = []
    for t in testList:
        contextKey = (t[1], t[2], t[3])
        p_app = [(-1, 0)] * 5
        if contextKey in rowsByContext:
            # The context occurs in training, so numTrain and every count
            # used as a divisor below is at least 1.
            p_context = float(len(rowsByContext[contextKey])) / numTrain  #P(C1i, C2j, C3k)
            for c in candidatesByContext[contextKey]:
                numAppi = float(appCounts[c[0]])
                p_appi = numAppi / numTrain                                        #P(APPid)
                p_contextAppi = appContextCounts[(c[0], c[1], c[2], c[3])] / numAppi  #P(C1i, C2j, C3k | APPid)
                p = p_contextAppi * p_appi / p_context
                p_app = topFiveSortedList(p_app, c[0], p)
        p_app = sorted(p_app, key=lambda entry: -entry[1])
        # A real list (not a lazy `map` object) so it can be indexed below on
        # Python 3 as well.
        app_rec = [entry[0] for entry in p_app[:5]]
        newTestList.append((t[0], app_rec))

    scores = 0
    numHit = 0
    for actual, recommended in newTestList:
        for rank, weight in enumerate(RANK_WEIGHTS):
            if actual == recommended[rank]:
                scores = scores + weight
                numHit = numHit + 1
                break

    if numHit != 0:
        scores = scores / numHit
    else:
        scores = 0
    return scores
# One score per group from bayesian(); the cell's displayed value is the mean
# of those scores.
result = final.map(bayesian)
result.mean()

In [57]:



Out[57]:
[(1, 1, 1, 1), (1, 1, 1, 2)]

In [23]:



Out[23]:
[1, 1, 1]

In [ ]: