In [1]:
from collections import namedtuple
import os

# Tab-separated raw event log, one event per line (parsed by parseRawData below).
# Set the EVENTS_PATH environment variable to point at another copy instead of
# editing this machine-specific default.
EVENTS_PATH = os.environ.get("EVENTS_PATH", "/home/axinggu/thesis/code/test/filteredEvents.txt")
rawEventsRdd = sc.textFile(EVENTS_PATH)
# One parsed event; field order matches the column order of the raw file.
EventDataRow = namedtuple(
    "EventDataRow",
    "userId itemId ts latitude longitude city day_of_week time_of_day event_type",
)


def parseRawData(line):
    """Parse one tab-separated raw event line into an EventDataRow.

    Columns 0-7 are userId, itemId, ts, latitude, longitude, city, day_of_week
    and time_of_day. ts, day_of_week and time_of_day are converted to int, and
    latitude and longitude to float. event_type is always the LAST column,
    whatever the column count.
    """
    cols = line.split("\t")
    return EventDataRow(
        cols[0],
        cols[1],
        int(cols[2]),
        float(cols[3]),
        float(cols[4]),
        cols[5],
        int(cols[6]),
        int(cols[7]),
        cols[-1],
    )
eventsRdd = rawEventsRdd.map(parseRawData).cache()


def _buildIndexBroadcast(rdd, field):
    """Assign a dense integer id (via zipWithIndex) to each distinct value of `field`.

    Returns (dictionary, broadcast). The dictionary stays usable on the driver,
    and the broadcast is what executor-side lambdas should read.
    """
    dictionary = (
        rdd.map(lambda row: getattr(row, field))
        .distinct()
        .zipWithIndex()
        .collectAsMap()
    )
    return dictionary, sc.broadcast(dictionary)


userIdConversionDictionary, userIdConversionDictionaryBroadcast = _buildIndexBroadcast(eventsRdd, "userId")
itemIdConversionDictionary, itemIdConversionDictionaryBroadcast = _buildIndexBroadcast(eventsRdd, "itemId")
cityConversionDictionary, cityConversionDictionaryBroadcast = _buildIndexBroadcast(eventsRdd, "city")

# _replace swaps only the three string ids for their integer ids; every other
# field is carried over untouched, so adding a field to EventDataRow needs no edit here.
eventsConvertedRdd = eventsRdd.map(lambda x: x._replace(
    userId=userIdConversionDictionaryBroadcast.value[x.userId],
    itemId=itemIdConversionDictionaryBroadcast.value[x.itemId],
    city=cityConversionDictionaryBroadcast.value[x.city],
))
eventsConvertedRdd.take(2)


Out[1]:
[EventDataRow(userId=1, itemId=41, ts=1421521691, latitude=47.23505, longitude=-122.534698, city=9, day_of_week=5, time_of_day=11, event_type=u'App_Opened'),
 EventDataRow(userId=1, itemId=46, ts=1421558502, latitude=47.23505, longitude=-122.534698, city=9, day_of_week=5, time_of_day=21, event_type=u'App_Opened')]

In [2]:
def _toKeyedEvent(row):
    """Key a converted event by userId, keeping (itemId, ts, latitude, longitude) as the value.

    Returns a two-element list [userId, (itemId, ts, latitude, longitude)].
    """
    location = (row.itemId, row.ts, row.latitude, row.longitude)
    return [row.userId, location]


finalRDD = eventsConvertedRdd.map(_toKeyedEvent)
finalRDD.take(3)


Out[2]:
[[1, (41, 1421521691, 47.23505, -122.534698)],
 [1, (46, 1421558502, 47.23505, -122.534698)],
 [1, (44, 1421017853, 47.237461, -122.530899)]]

In [3]:
# One (userId, [(itemId, ts, latitude, longitude), ...]) pair per user, ordered by userId.
# mapValues(list) materializes each group on the executors. The result is a real
# list on both Python 2 and 3: the former `lambda (x, y): ...` tuple parameter is a
# SyntaxError on Python 3 (PEP 3113), and a bare map() there is a one-shot iterator.
groupData = sorted(
    finalRDD.groupByKey().mapValues(list).collect(),
    key=lambda pair: pair[0],
)

In [7]:
import datetime
from math import radians, cos, sin, asin, sqrt

def haversine(lon1, lat1, lon2, lat2, radius=6371):
    """
    Calculate the great-circle distance between two points on the earth.

    Coordinates are in decimal degrees; note the (longitude, latitude) argument
    order. `radius` is the sphere radius and sets the output unit: the default
    6371 gives kilometres, 3956 gives miles.
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Rounding can push `a` a hair above 1 for (near-)antipodal points, which
    # would make asin() raise "math domain error"; clamp it to 1.
    a = min(1.0, a)
    c = 2 * asin(sqrt(a))
    return c * radius

def _eventHour(event):
    """Return the hour of day (0-23) of an event's timestamp.

    `event[1]` is the epoch-seconds timestamp. The hour comes from
    datetime.fromtimestamp, i.e. the local timezone of the Python process that
    runs this code.
    """
    return datetime.datetime.fromtimestamp(int(event[1])).hour


def _densestEvent(group, radiusKm):
    """Return the event of `group` with the most events of `group` within
    `radiusKm` km of it (itself included), or None when `group` is empty.

    Ties go to the earliest such event. Costs len(group)**2 haversine calls.
    """
    best, bestCount = None, -1
    for x in group:
        count = sum(1 for y in group if haversine(x[3], x[2], y[3], y[2]) < radiusKm)
        if count > bestCount:  # strict '>' keeps the first maximum
            best, bestCount = x, count
    return best


def convertLocation(line, radiusKm=0.1, workStartHour=6, workEndHour=19, maxEvents=20):
    """Label each of one user's events as work (1), home (2) or other (0).

    `line` is a (userId, events) pair, where each event is an
    (itemId, ts, latitude, longitude) tuple.

    - Events whose hour h satisfies workStartHour < h < workEndHour are the work
      candidates; all other events are the home candidates.
    - In each candidate set, the event with the most neighbours within radiusKm
      is that set's anchor.
    - Every event is labelled 1 if it lies within radiusKm of the work anchor,
      else 2 if within radiusKm of the home anchor, else 0.
    - If a user has no events in one window, that window has no anchor and no
      event gets its label, instead of max() raising ValueError on an empty list.

    Returns (userId, labelled), where labelled holds the first `maxEvents`
    events (all of them when maxEvents is None) as
    (itemId, ts, latitude, longitude, label) tuples.
    """
    # NOTE(review): the hour comes from datetime.fromtimestamp (process-local
    # timezone), while EventDataRow's own time_of_day field is dropped before
    # this point; confirm which hour source the work/home split should use.
    userId, events = line[0], list(line[1])
    inWorkHours = [workStartHour < _eventHour(e) < workEndHour for e in events]
    workGroup = [e for e, isWork in zip(events, inWorkHours) if isWork]
    homeGroup = [e for e, isWork in zip(events, inWorkHours) if not isWork]

    workAnchor = _densestEvent(workGroup, radiusKm)
    homeAnchor = _densestEvent(homeGroup, radiusKm)

    def isNear(event, anchor):
        # A window with no events has no anchor, so nothing is near it.
        return anchor is not None and haversine(event[3], event[2], anchor[3], anchor[2]) < radiusKm

    def locationLabel(event):
        # Work is checked first, so an event near both anchors is labelled 1.
        if isNear(event, workAnchor):
            return 1
        if isNear(event, homeAnchor):
            return 2
        return 0

    # Only the returned events need labels; the anchors above still use every event.
    toLabel = events if maxEvents is None else events[:maxEvents]
    return userId, [(e[0], e[1], e[2], e[3], locationLabel(e)) for e in toLabel]
final = sc.parallelize(groupData).map(convertLocation)
# Display a sample: collect() would pull every user's labelled events onto the
# driver and into the notebook output.
final.take(2)


Out[7]:
[(0,
  [(32, 1421285002, 41.953564, -88.036819, 1),
   (24, 1421123817, 41.96859, -88.151978, 0),
   (26, 1421368655, 41.953526, -88.037025, 1),
   (26, 1421080209, 41.953758, -88.036804, 1),
   (7, 1420992327, 41.953747, -88.036812, 1),
   (26, 1421026674, 41.95348, -88.036957, 1),
   (25, 1419939212, 41.953773, -88.03685, 1),
   (20, 1421142319, 41.953785, -88.036797, 1),
   (25, 1419947132, 41.953484, -88.036652, 1),
   (32, 1421152879, 41.953419, -88.036568, 1),
   (32, 1421300727, 41.954918, -88.119896, 0),
   (26, 1421155363, 41.953365, -88.036873, 1),
   (13, 1421110209, 41.952587, -88.037277, 2),
   (7, 1421020415, 41.953445, -88.036758, 1),
   (26, 1421370765, 41.953526, -88.037025, 1),
   (25, 1421185205, 41.95377, -88.036842, 1),
   (7, 1421142300, 41.953766, -88.03685, 1),
   (34, 1421300685, 41.952835, -88.126961, 0),
   (9, 1421165029, 41.95359, -88.036736, 1),
   (32, 1419986405, 41.953476, -88.036919, 1)]),
 (1,
  [(41, 1421521691, 47.23505, -122.534698, 0),
   (46, 1421558502, 47.23505, -122.534698, 0),
   (44, 1421017853, 47.237461, -122.530899, 1),
   (18, 1421555801, 47.23505, -122.534698, 0),
   (36, 1421555768, 47.23505, -122.534698, 0),
   (33, 1421168731, 47.244587, -122.517036, 0),
   (20, 1421375233, 47.23745, -122.530891, 1),
   (44, 1421096549, 47.112923, -122.422501, 0),
   (39, 1421044862, 47.237453, -122.530876, 1),
   (18, 1421278854, 47.233555, -122.536385, 0),
   (20, 1421078608, 47.252182, -122.504242, 0),
   (44, 1421279108, 47.233555, -122.536385, 0),
   (36, 1421091256, 47.254436, -122.441696, 0),
   (28, 1421102558, 47.107914, -122.427254, 0),
   (43, 1420965030, 47.237564, -122.530746, 1),
   (36, 1421033212, 47.237484, -122.530716, 1),
   (44, 1421147075, 47.233387, -122.536057, 0),
   (46, 1421224673, 47.237434, -122.531052, 1),
   (28, 1421446100, 47.241306, -122.503441, 0),
   (26, 1421318118, 47.243618, -122.513252, 0)])]

In [30]:
# `&` binds tighter than comparison operators, so `23<=6 & 23>17` actually parsed
# as the chained comparison `23 <= (6 & 23) > 17`. Boolean `and` expresses the
# presumably intended "both conditions hold" check.
hour = 23
matchesBoth = hour <= 6 and hour > 17
matchesBoth


Out[30]:
False

In [ ]: