df_shuffle.csv

Day of Week Index
Monday 0
Tuesday 1
Wednesday 2
Thursday 3
Friday 4
Saturday 5
Sunday 6
Hour Index (datetime .hour)
00:00:00 - 00:59:59 0
01:00:00 - 01:59:59 1
...
23:00:00 - 23:59:59 23

In [1]:
sc  # SparkContext supplied by the PySpark kernel (not defined in this notebook); echoed to confirm it exists


Out[1]:
<pyspark.context.SparkContext at 0x105279b90>

Function: count taxi trips between two subway entrances served by the same line


In [2]:
import pyproj
import csv
import shapely.geometry as geom
import fiona
import fiona.crs
import shapely
import rtree
import geopandas as gpd
import numpy as np
import operator
# just for display, not for calculation
import pandas as pd
import datetime

In [3]:
ENTRANCE_BUFFER_SHP = ('../why_yellow_taxi/Buffer/entr_buffer_100_feet_epsg4269_nad83/'
                       'entr_buffer_100_feet_epsg4269_nad83.shp')


def _entrance_lines(entr_buf, routes):
    """Return, for each row of ``entr_buf`` in row order, the list of lines it serves.

    Values are read from the ``routes`` columns. Missing values (NaN) are dropped.
    Numeric values are normalised to their integer string (e.g. 6.0 -> '6').
    Non-numeric values (e.g. 'A') are kept unchanged.
    """
    entr2line = []
    for _, route_row in entr_buf.loc[:, routes].iterrows():
        lines = []
        for line in route_row.dropna().values:
            try:
                line = str(int(line))
            except ValueError:
                pass  # letter-named line such as 'A'; keep as-is
            lines.append(line)
        entr2line.append(lines)
    return entr2line


def _match_entrance(index, geometries, point):
    """Return the position of an entrance buffer containing ``point``, or None.

    Candidates come from the R-tree bounding-box query and are confirmed with an
    exact ``contains`` test.

    NOTE: the first confirmed candidate in R-tree order is returned. That is not
    necessarily the closest entrance.
    """
    for idx in index.intersection((point.x, point.y, point.x, point.y)):
        if geometries[idx].contains(point):
            return idx
    return None


def _weekday_hour(timestamp):
    """Parse 'YYYY-mm-dd HH:MM:SS' into (weekday, hour).

    Returns (None, None) for an empty string. The weekday follows
    ``datetime.weekday()``: Monday == 0 ... Sunday == 6.
    """
    import datetime
    if not timestamp:
        return None, None
    parsed = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    return parsed.weekday(), parsed.hour


def countLine(partID, records, shapefile=ENTRANCE_BUFFER_SHP):
    """Count trips whose pickup and dropoff fall in different entrance buffers sharing a line.

    Designed for ``rdd.mapPartitionsWithIndex(countLine)``.

    Parameters
    ----------
    partID : int
        Spark partition index. The first record of partition 0 is skipped;
        it is assumed to be the CSV header.
    records : iterator of str
        Raw CSV lines. The columns used are:
        - 1: pickup timestamp ('%Y-%m-%d %H:%M:%S').
        - 5/6: pickup coordinates.
        - 9/10: dropoff coordinates.
        Coordinates are passed as (x, y) to the EPSG:2263 ``pyproj.Proj``;
        presumably they are lon/lat.
    shapefile : str
        Path to the entrance-buffer shapefile with Route_1 .. Route_11 columns.

    Returns
    -------
    list of ((lines, weekday, hour), count)
        ``lines`` is a sorted tuple of the lines serving both the pickup and the
        dropoff entrance. ``weekday`` and ``hour`` are None when the pickup
        timestamp is empty.
    """
    # Local imports, as in the original notebook. Presumably this keeps the
    # function self-contained when Spark ships it to executors.
    import csv
    import fiona
    import fiona.crs
    import geopandas as gpd
    import pyproj
    import rtree
    import shapely.geometry as geom

    entr_buf = gpd.read_file(shapefile)
    entr_buf = entr_buf.to_crs(fiona.crs.from_epsg(2263))

    routes = ['Route_' + str(n) for n in range(1, 12)]
    # Positional lists so R-tree ids (from enumerate) index them directly.
    entr_line_sets = [set(lines) for lines in _entrance_lines(entr_buf, routes)]
    geometries = list(entr_buf.geometry)

    index = rtree.Rtree()
    for idx, geometry in enumerate(geometries):
        index.insert(idx, geometry.bounds)

    proj = pyproj.Proj(init='epsg:2263', preserve_units=True)

    if partID == 0:
        next(records, None)  # drop the header; tolerates an empty partition

    entr_lines = {}
    for row in csv.reader(records):
        # Skip rows with a zero pickup or dropoff x-coordinate
        # (presumably a missing GPS fix).
        if float(row[5]) == 0 or float(row[9]) == 0:
            continue
        wd, hour = _weekday_hour(row[1])

        pickup = geom.Point(proj(float(row[5]), float(row[6])))
        dropoff = geom.Point(proj(float(row[9]), float(row[10])))
        p_match = _match_entrance(index, geometries, pickup)
        d_match = _match_entrance(index, geometries, dropoff)

        # Compare against None explicitly: entrance 0 is a valid match.
        if p_match is None or d_match is None or p_match == d_match:
            continue

        # Sorted so that the same set of lines always produces the same key.
        dirct_lines = tuple(sorted(entr_line_sets[p_match] & entr_line_sets[d_match]))
        if dirct_lines:
            key = (dirct_lines, wd, hour)
            entr_lines[key] = entr_lines.get(key, 0) + 1

    return list(entr_lines.items())

Output records have the form ((Line, DayOfWeek, Hour), Count).

Midnight -> hour 0 (00:00:00 - 00:59:59)


In [43]:
def mapper(record):
    """Fan a ((lines, weekday, hour), count) record out into one record per line.

    Each element of ``lines`` yields its own ((line, weekday, hour), count)
    pair. The original count is repeated for every line.
    """
    lines_key, trips = record[0], record[1]
    for line in lines_key[0]:
        yield (line, lines_key[1], lines_key[2]), trips
        
_WEEKEND = frozenset([5, 6])          # Saturday, Sunday (datetime.weekday())
_LATE_NIGHT = frozenset(range(0, 6))  # hours 0-5, i.e. 00:00:00 - 05:59:59
_ALWAYS = frozenset()

# line -> (weekdays without service, hours without service)
_NO_SERVICE = {
    'B': (_WEEKEND, _LATE_NIGHT),
    'W': (_WEEKEND, _LATE_NIGHT),
    'C': (_ALWAYS, _LATE_NIGHT),
    'S': (_ALWAYS, _LATE_NIGHT),
}


def service(record):
    """Keep a ((line, weekday, hour), count) record only if the line runs then.

    Returns ``record`` when the slot is served. Returns None otherwise, so the
    function can be passed straight to ``RDD.filter``.

    Late-night exclusions are checked against the hour (record[0][2]) for
    every line. Weekend exclusions are checked against the weekday
    (record[0][1]). A None weekday or hour never matches an exclusion.
    """
    line, weekday, hour = record[0][0], record[0][1], record[0][2]
    off_days, off_hours = _NO_SERVICE.get(line, (_ALWAYS, _ALWAYS))
    if weekday in off_days or hour in off_hours:
        return None
    return record
        
# Per-partition entrance matching -> one record per line -> total per
# (line, weekday, hour) -> drop slots without scheduled service.
rdd = sc.textFile('./df_shuffle.csv')
counts = (
    rdd.mapPartitionsWithIndex(countLine)
    .flatMap(mapper)
    .reduceByKey(operator.add)
    .filter(service)
    .collect()
)

In [48]:
# `counts` is a collected list of ((line, weekday, hour), count) pairs; show the busiest first.
sorted(counts, key=lambda kv: kv[1], reverse=True)


Out[48]:
[(('6', 5, 11), 2),
 (('1', 1, 13), 2),
 (('2', 1, 13), 1),
 (('3', 5, 20), 1),
 ((u'E', 3, 10), 1),
 (('1', 4, 17), 1),
 (('2', 4, 6), 1),
 ((u'M', 5, 8), 1),
 ((u'C', 5, 20), 1),
 ((u'A', 4, 17), 1),
 ((u'F', 1, 21), 1),
 ((u'L', 2, 20), 1),
 (('2', 1, 9), 1),
 (('1', 5, 20), 1),
 ((u'E', 5, 20), 1),
 ((u'N', 0, 18), 1),
 ((u'E', 5, 8), 1),
 ((u'C', 3, 10), 1),
 (('1', 3, 14), 1),
 (('1', 5, 2), 1),
 ((u'C', 1, 10), 1),
 ((u'C', 4, 17), 1),
 ((u'R', 0, 18), 1),
 ((u'A', 3, 10), 1),
 ((u'Q', 6, 19), 1),
 (('6', 0, 16), 1),
 (('6', 3, 21), 1),
 ((u'Q', 0, 15), 1),
 ((u'N', 0, 16), 1),
 (('4', 0, 16), 1),
 ((u'R', 0, 16), 1),
 ((u'E', 1, 10), 1),
 ((u'F', 4, 14), 1),
 (('1', 0, 19), 1),
 (('2', 5, 17), 1),
 ((u'A', 5, 2), 1),
 (('1', 3, 16), 1),
 (('1', 4, 3), 1),
 (('3', 3, 14), 1),
 ((u'L', 0, 20), 1),
 ((u'Q', 6, 21), 1),
 ((u'C', 2, 15), 1),
 (('2', 5, 20), 1),
 (('7', 4, 8), 1),
 (('6', 0, 13), 1),
 (('2', 3, 14), 1),
 (('1', 4, 6), 1),
 ((u'N', 0, 15), 1),
 (('1', 0, 12), 1),
 ((u'E', 6, 6), 1),
 ((u'E', 5, 19), 1),
 ((u'L', 4, 11), 1),
 ((u'F', 4, 7), 1),
 (('1', 5, 21), 1),
 ((u'N', 6, 21), 1),
 (('1', 3, 11), 1),
 ((u'R', 6, 21), 1),
 (('1', 5, 23), 1),
 ((u'C', 5, 13), 1),
 ((u'R', 6, 19), 1),
 ((u'A', 5, 19), 1),
 ((u'A', 1, 9), 1),
 (('1', 5, 17), 1),
 (('3', 1, 9), 1),
 ((u'Q', 0, 18), 1),
 (('6', 5, 14), 1),
 ((u'C', 5, 19), 1),
 ((u'A', 6, 6), 1),
 ((u'R', 0, 15), 1),
 (('5', 0, 16), 1),
 (('1', 6, 22), 1),
 ((u'F', 5, 14), 1),
 ((u'Q', 0, 16), 1),
 (('6', 2, 13), 1),
 ((u'A', 5, 13), 1),
 (('3', 4, 6), 1),
 ((u'M', 6, 14), 1),
 ((u'D', 5, 2), 1),
 ((u'C', 6, 6), 1),
 (('3', 1, 13), 1),
 (('3', 5, 17), 1),
 (('1', 0, 14), 1),
 ((u'C', 1, 9), 1),
 (('7', 2, 20), 1),
 ((u'N', 6, 19), 1),
 (('1', 3, 5), 1),
 ((u'E', 5, 13), 1),
 ((u'C', 1, 21), 1)]





In [27]:
# t = counts.filter(lambda x: not ((x[0][0] == '6') and (x[0][1] in [5,6]))). \
#             filter(lambda x: not ((x[0][0] == '1') and (x[0][1] in [5,6])))

In [47]:
# for i in range(len(counts)):
#     if counts[i][0][0] == '6':
#         print 'Day of week:{} ; Hour:{}, Counts:{}'.format(counts[i][0][1], counts[i][0][2], counts[i][1])

In [46]:
# counts_all = 0
# for i in range(len(counts)):
#     if (counts[i][0][0] == '6' and counts[i][0][2] == 13):  # Line 6, 13:00-14:00
#         counts_all += counts[i][1]
#         print 'Day of week:{} ; Hour:{}, Counts:{}'.format(counts[i][0][1], counts[i][0][2], counts[i][1])
# print "Line:{}, Hour:{} - Counts:{}".format('6', '13', counts_all)