In [1]:
from collections import namedtuple
# Location of the raw event log; each line of the file becomes one string
# element of the RDD.
# NOTE(review): `sc` is presumably the SparkContext supplied by the notebook
# kernel -- it is not created anywhere in this notebook.
eventsPath = "/home/axinggu/thesis/code/test/filteredEvents.txt"
rawEventsRdd = sc.textFile(eventsPath)
# One parsed line of the event log.
EventDataRow = namedtuple(
    "EventDataRow",
    [
        "userId",
        "itemId",
        "ts",
        "latitude",
        "longitude",
        "city",
        "day_of_week",
        "time_of_day",
        "event_type",
    ],
)


def parseRawData(line):
    """Turn one tab-separated line of the event log into an EventDataRow.

    Columns 0-7 are read by position (ts, day_of_week and time_of_day are
    converted to int, latitude and longitude to float); event_type is
    always taken from the LAST column, whatever the column count is.

    Raises IndexError if the line has fewer than eight columns and
    ValueError if a numeric column cannot be converted.
    """
    fields = line.split("\t")
    # Positional arguments follow the field order declared on EventDataRow.
    return EventDataRow(
        fields[0],
        fields[1],
        int(fields[2]),
        float(fields[3]),
        float(fields[4]),
        fields[5],
        int(fields[6]),
        int(fields[7]),
        fields[-1],
    )
# Parsed events are cached because they are scanned once per ID dictionary
# below and once more for the final conversion.
eventsRdd = rawEventsRdd.map(parseRawData).cache()


def _indexDistinctValues(fieldName):
    """Collect a dict mapping each distinct value of `fieldName` in eventsRdd
    to the index that zipWithIndex assigns to it."""
    fieldValues = eventsRdd.map(lambda row: getattr(row, fieldName))
    return fieldValues.distinct().zipWithIndex().collectAsMap()


# Each dictionary is built on the driver and then broadcast so that the
# conversion below can look values up inside the map function.
userIdConversionDictionary = _indexDistinctValues("userId")
userIdConversionDictionaryBroadcast = sc.broadcast(userIdConversionDictionary)
itemIdConversionDictionary = _indexDistinctValues("itemId")
itemIdConversionDictionaryBroadcast = sc.broadcast(itemIdConversionDictionary)
cityConversionDictionary = _indexDistinctValues("city")
cityConversionDictionaryBroadcast = sc.broadcast(cityConversionDictionary)


def _toIndexedRow(row):
    """Copy `row`, swapping userId, itemId and city for their dictionary
    indices; every other field is carried over unchanged."""
    return row._replace(
        userId=userIdConversionDictionaryBroadcast.value[row.userId],
        itemId=itemIdConversionDictionaryBroadcast.value[row.itemId],
        city=cityConversionDictionaryBroadcast.value[row.city],
    )


eventsConvertedRdd = eventsRdd.map(_toIndexedRow)
eventsConvertedRdd.take(2)
Out[1]:
In [2]:
def _keyByUser(event):
    """Reshape one converted event into a two-element list
    [userId, (itemId, ts, latitude, longitude)]."""
    visit = (event.itemId, event.ts, event.latitude, event.longitude)
    return [event.userId, visit]


# Key every event by its user so the events can be grouped per user later.
finalRDD = eventsConvertedRdd.map(_keyByUser)
finalRDD.take(3)
Out[2]:
In [3]:
# Collect every user's events to the driver as a list of
# (userId, [event, ...]) pairs ordered by userId.
#
# A list comprehension replaces the original `map(lambda (x, y): ...)`:
# tuple parameters in a lambda are a SyntaxError on Python 3, and `map`
# returns a one-shot iterator there instead of a list. The result on
# Python 2 is the same list as before.
groupData = [
    (userId, list(visits))
    # Sorting on the key alone gives the same order as sorting the pairs
    # (groupByKey yields each key once) without ever comparing the grouped
    # value iterables.
    for userId, visits in sorted(finalRDD.groupByKey().collect(), key=lambda pair: pair[0])
]
In [7]:
import datetime
from math import radians, cos, sin, asin, sqrt
def haversine(lon1, lat1, lon2, lat2, radius=6371):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees).

    Note the argument order: longitude first, then latitude.

    Args:
        lon1, lat1: longitude and latitude of the first point, in degrees.
        lon2, lat2: longitude and latitude of the second point, in degrees.
        radius: sphere radius; the result is expressed in the same unit.
            Defaults to 6371 (earth radius in kilometers). Use 3956 for miles.

    Returns:
        The great circle distance as a float, in the unit of `radius`.
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = [radians(value) for value in (lon1, lat1, lon2, lat2)]
    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating point rounding can push `a` a hair above 1 for (near-)antipodal
    # points, which would make asin(sqrt(a)) raise "math domain error".
    # Clamping to [0, 1] leaves every in-range value untouched.
    a = min(1.0, max(0.0, a))
    c = 2 * asin(sqrt(a))
    return c * radius
def _eventHour(event):
    """Return the hour of day (0-23) of an event's epoch timestamp, event[1].

    datetime.fromtimestamp converts to the local time zone of the machine
    that runs this code.
    """
    return datetime.datetime.fromtimestamp(int(event[1])).hour


def _densestEvent(events, threshold):
    """Return the event that has the most events (itself included) closer
    than `threshold` km to it, or None when `events` is empty."""
    if not events:
        return None
    neighbourCounts = [
        sum(1 for other in events
            if haversine(event[3], event[2], other[3], other[2]) < threshold)
        for event in events
    ]
    # list.index returns the first maximum, so ties go to the earliest event.
    return events[neighbourCounts.index(max(neighbourCounts))]


def _isNear(event, anchor, threshold):
    """True when `anchor` exists and `event` lies closer than `threshold` km to it."""
    if anchor is None:
        return False
    return haversine(event[3], event[2], anchor[3], anchor[2]) < threshold


def convertLocation(line, threshold=0.1, limit=20):
    """Label one user's events as work (1), home (2) or other (0).

    Events are split by local hour of day: hours 7-18 form the "work" group,
    all other hours the "home" group. In each group the anchor is the event
    with the most same-group events within `threshold` km. Every event is
    then labelled 1 if it is within `threshold` km of the work anchor, else 2
    if it is within `threshold` km of the home anchor, else 0.

    A group with no events simply produces no labels of its kind; the
    original code raised ValueError from max([]) in that case.

    Args:
        line: pair (userId, events); each event is indexed as
            (itemId, ts, latitude, longitude) with ts in epoch seconds.
        threshold: "near" distance in kilometers (default 0.1).
        limit: maximum number of labelled events returned (default 20).

    Returns:
        (userId, labelled) where labelled holds the first `limit` events as
        (itemId, ts, latitude, longitude, label) tuples.
    """
    userId = line[0]
    events = list(line[1])

    workEvents = []
    homeEvents = []
    for event in events:
        if 6 < _eventHour(event) < 19:
            workEvents.append(event)
        else:
            homeEvents.append(event)

    workAnchor = _densestEvent(workEvents, threshold)
    homeAnchor = _densestEvent(homeEvents, threshold)

    labelled = []
    # Only the first `limit` events are returned, so only those are labelled.
    for event in events[:limit]:
        if _isNear(event, workAnchor, threshold):
            label = 1
        elif _isNear(event, homeAnchor, threshold):
            label = 2
        else:
            label = 0
        labelled.append((event[0], event[1], event[2], event[3], label))
    return userId, labelled
# Hand the driver-side per-user groups back to Spark, label each user's
# events with convertLocation, and pull the labelled result to the driver.
groupedRdd = sc.parallelize(groupData)
final = groupedRdd.map(convertLocation)
final.collect()
Out[7]:
In [30]:
# `&` is the bitwise AND operator and binds tighter than the comparison
# operators, so this parses as the chained comparison 23 <= (6 & 23) > 17,
# i.e. 23 <= 6 > 17, which evaluates to False.
# NOTE(review): a logical test (`and` / `or`) was presumably intended --
# confirm the intent, or delete this leftover scratch cell.
23<=6 & 23>17
Out[30]:
In [ ]: