In [2]:
sc  # confirm the SparkContext is available in this kernel


Out[2]:
<pyspark.context.SparkContext at 0x105268b90>

In [3]:
import pyproj
import csv
import shapely.geometry as geom
import fiona
import fiona.crs
import shapely
import rtree
import geopandas as gpd
import numpy as np
import operator
# just for display, not for calculation
import pandas as pd

In [4]:
# Small sample of yellow-taxi trips: load the pickled frame, drop the stray
# index column, and round-trip through CSV so Spark can later read the same
# file with sc.textFile().
taxi = pd.read_pickle('../why_yellow_taxi/Data/df_shuffle.pkl')
del taxi['Unnamed: 0']  # leftover index column from an earlier to_csv
taxi.to_csv('df_shuffle.csv', index=False)
taxi = pd.read_csv('./df_shuffle.csv')
print taxi.columns
taxi.head(2)
# test5 = taxi.head()[['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']]


Index([u'VendorID', u'tpep_pickup_datetime', u'tpep_dropoff_datetime',
       u'passenger_count', u'trip_distance', u'pickup_longitude',
       u'pickup_latitude', u'RatecodeID', u'store_and_fwd_flag',
       u'dropoff_longitude', u'dropoff_latitude', u'payment_type',
       u'fare_amount', u'extra', u'mta_tax', u'tip_amount', u'tolls_amount',
       u'improvement_surcharge', u'total_amount'],
      dtype='object')
Out[4]:
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance pickup_longitude pickup_latitude RatecodeID store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount
0 2 2016-01-19 09:57:48 2016-01-19 10:15:50 3 1.16 -73.984688 40.759109 1 N -74.000557 40.757912 1 12.0 0.0 0.5 2.56 0.0 0.3 15.36
1 2 2016-01-15 04:00:16 2016-01-15 04:02:03 1 0.76 -73.988274 40.731468 1 N -73.981339 40.741234 2 4.5 0.5 0.5 0.00 0.0 0.3 5.80

In [5]:
# Subway entrance buffers (100 ft); inspect the source CRS before
# re-projecting (it is a NAD83 Lambert conformal conic projection).
shapefile = '../why_yellow_taxi/Buffer/entr_buffer_100_feet_epsg4269_nad83/entr_buffer_100_feet_epsg4269_nad83.shp'
entr_buf = gpd.read_file(shapefile)
entr_buf.crs


Out[5]:
{u'datum': u'NAD83',
 u'lat_0': 40.1666666667,
 u'lat_1': 41.0333333333,
 u'lat_2': 40.6666666667,
 u'lon_0': -74,
 u'no_defs': True,
 u'proj': u'lcc',
 u'units': u'us-ft',
 u'x_0': 300000.0000000001,
 u'y_0': 0}

In [6]:
entr_buf.head(2)  # peek at the entrance-buffer attribute columns


Out[6]:
ADA ADA_Notes Corner Division East_West_ Entrance_T Entry Exit_Only Free_Cross GEOID ... Route_7 Route_8 Route_9 Staff_Hour Staffing Station_La Station_Lo Station_Na Vending geometry
0 FALSE None NW BMT 23rd Ave Stair YES None TRUE 36081 ... None NaN NaN None FULL 40.775036 -73.912034 Ditmars Blvd YES POLYGON ((1008702.708067201 221696.7163773818,...
1 FALSE None NE BMT 23rd Ave Stair YES None TRUE 36081 ... None NaN NaN None FULL 40.775036 -73.912034 Ditmars Blvd YES POLYGON ((1008681.505385144 221573.1859671536,...

2 rows × 33 columns


In [37]:
# Re-project the buffers to EPSG:2263 (NY State Plane, US feet) so they share
# a CRS with the projected taxi points used below.
entr_buf = entr_buf.to_crs(fiona.crs.from_epsg(2263))

# Collapse the Route_1..Route_11 columns into one list of subway line labels
# per entrance, normalising numeric routes (e.g. 1.0 -> '1').
routes = ['Route_' + str(n) for n in range(1, 12)]
route_cols = entr_buf.loc[:, routes]  # hoisted out of the loop (invariant)
entr2line = []
for i in xrange(len(entr_buf)):
    lines = []
    # .iloc replaces the deprecated .ix; the lookup is positional here.
    for line in list(route_cols.iloc[i].dropna().values):
        try:
            line = str(int(line))
        except ValueError:
            pass  # letter routes like 'A' stay as-is
        lines.append(line)
    entr2line.append(lines)
entr_buf['entr2line'] = entr2line

entr_buf.head(2)


Out[37]:
ADA ADA_Notes Corner Division East_West_ Entrance_T Entry Exit_Only Free_Cross GEOID ... Route_8 Route_9 Staff_Hour Staffing Station_La Station_Lo Station_Na Vending geometry entr2line
0 FALSE None NW BMT 23rd Ave Stair YES None TRUE 36081 ... NaN NaN None FULL 40.775036 -73.912034 Ditmars Blvd YES POLYGON ((1008702.708067203 221696.7163890629,... [N, Q]
1 FALSE None NE BMT 23rd Ave Stair YES None TRUE 36081 ... NaN NaN None FULL 40.775036 -73.912034 Ditmars Blvd YES POLYGON ((1008681.505385145 221573.1859788439,... [N, Q]

2 rows × 34 columns


In [8]:
len(taxi)  # number of trips in the sample


Out[8]:
10900

In [9]:
len(entr_buf)  # number of subway-entrance buffers


Out[9]:
1868

In [11]:
# Build an R-tree over the buffer bounding boxes so each point lookup only
# tests a few candidate polygons instead of all ~1900 buffers.
index = rtree.Rtree()
for idx, geometry in enumerate(entr_buf.geometry):
    index.insert(idx, geometry.bounds)

In [44]:
proj = pyproj.Proj(init='epsg:2263', preserve_units=True) 
#?preserve_units=True

entr_pair = {}
pick_entr = {}
drop_entr = {}
entr_lines = {}

with open('./df_shuffle.csv', 'rb') as fi:
    reader = csv.reader(fi)
    print reader.next()
    for row in reader:
        if ((float(row[5])!=0) and float(row[9]!=0)):
            p = geom.Point(proj(float(row[5]), float(row[6])))
            d = geom.Point(proj(float(row[9]), float(row[10])))
            p_potential = index.intersection((p.x,p.y,p.x,p.y))
            d_potential = index.intersection((d.x,d.y,d.x,d.y))
            # print p_potential
            p_match = None # The first one match, should be the closest one? No!
            d_match = None
            
            for p_idx in p_potential:
                if entr_buf.geometry[p_idx].contains(p):
                    p_match = p_idx # print 'p',p_idx
                    p_lines = set(entr_buf.entr2line[p_idx])
                    break
            pick_entr[p_match] = pick_entr.get(p_match, 0)+1
            
            for d_idx in d_potential:
                if entr_buf.geometry[d_idx].contains(d):
                    d_match = d_idx # print 'd',d_idx
                    d_lines = set(entr_buf.entr2line[d_idx])
                    break
            drop_entr[d_match] = drop_entr.get(d_match, 0)+1
            
            if ((p_match and d_match) and (p_match != d_match)):
                dirct_lines = tuple(p_lines.intersection(d_lines))
                if dirct_lines:
                    entr_lines[dirct_lines] = entr_lines.get(dirct_lines, 0)+1
                if p_match > d_match:
                    pair = (d_match, p_match)
                else:
                    pair = (p_match, d_match)
                entr_pair[pair] = entr_pair.get(pair, 0)+1


['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'RatecodeID', 'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']

In [45]:
# Sanity check: every row with non-zero coordinates should be tallied exactly
# once on both the pickup side and the dropoff side.
print len(taxi)

print np.unique(pick_entr.keys()).shape  # distinct pickup matches (incl. None)
print sum(pick_entr.values())            # rows tallied on the pickup side

print np.unique(drop_entr.keys()).shape  # distinct dropoff matches (incl. None)
print sum(drop_entr.values())            # rows tallied on the dropoff side


10900
(438,)
10748
(402,)
10748

In [46]:
# Total matched trips, then the 10 most frequent entrance pairs.
print sum(entr_pair.values())
sorted(entr_pair.items(), key=operator.itemgetter(1), reverse=True)[:10]


119
Out[46]:
[((1138, 1549), 2),
 ((1732, 1802), 1),
 ((779, 1865), 1),
 ((329, 759), 1),
 ((316, 512), 1),
 ((365, 991), 1),
 ((1131, 1677), 1),
 ((311, 945), 1),
 ((1549, 1733), 1),
 ((746, 1816), 1)]

In [47]:
entr_buf.columns.values  # confirm the entr2line column was appended


Out[47]:
array([u'ADA', u'ADA_Notes', u'Corner', u'Division', u'East_West_',
       u'Entrance_T', u'Entry', u'Exit_Only', u'Free_Cross', u'GEOID',
       u'Latitude', u'Line', u'Longitude', u'NAMELSAD', u'North_Sout',
       u'Route_1', u'Route_10', u'Route_11', u'Route_2', u'Route_3',
       u'Route_4', u'Route_5', u'Route_6', u'Route_7', u'Route_8',
       u'Route_9', u'Staff_Hour', u'Staffing', u'Station_La',
       u'Station_Lo', u'Station_Na', u'Vending', 'geometry', 'entr2line'], dtype=object)

In [48]:
entr_lines  # shared-line tuples -> trip counts (driver-side run)


Out[48]:
{('1',): 11,
 ('1', '3', '2'): 5,
 ('6',): 6,
 ('7',): 2,
 (u'A', '1', u'C'): 1,
 (u'A', '1', u'C', u'B', u'D'): 1,
 (u'A', u'C', '2', '3'): 1,
 (u'A', u'C', u'E'): 4,
 (u'C',): 2,
 (u'C', u'E'): 2,
 (u'E', u'M'): 1,
 (u'F',): 4,
 (u'L',): 3,
 (u'M',): 1,
 (u'N', u'Q', u'R', '5', '4', '6'): 1,
 (u'Q', u'R', u'N'): 4}

In [ ]:

Function: the same matching pipeline packaged for Spark's `mapPartitionsWithIndex`


In [50]:
def countLine(partID, records):
    """Spark partition worker: count trips whose pickup and dropoff both fall
    inside subway-entrance buffers that share at least one subway line.

    Parameters
    ----------
    partID : int
        Partition index from mapPartitionsWithIndex; partition 0 starts with
        the CSV header, which is skipped.
    records : iterator of str
        Raw CSV lines of this partition.

    Returns
    -------
    list of (tuple_of_lines, count)
        Trip counts keyed by the tuple of subway lines shared between the two
        matched entrances.
    """
    # Imports live inside the function so every Spark worker resolves them.
    import pyproj
    import csv
    import shapely.geometry as geom
    import fiona
    import fiona.crs
    import shapely
    import rtree
    import geopandas as gpd
    import numpy as np
    import operator
    import pandas as pd

    # NOTE: the original version also loaded the taxi pickle here, but never
    # used it -- the rows arrive through `records` -- so that dead (and
    # per-partition expensive) load has been removed.

    # Entrance buffers, re-projected to EPSG:2263 (NY State Plane, US feet).
    shapefile = '../why_yellow_taxi/Buffer/entr_buffer_100_feet_epsg4269_nad83/entr_buffer_100_feet_epsg4269_nad83.shp'
    entr_buf = gpd.read_file(shapefile)
    entr_buf = entr_buf.to_crs(fiona.crs.from_epsg(2263))

    # Collapse Route_1..Route_11 into one list of line labels per entrance,
    # normalising numeric routes (1.0 -> '1').
    routes = ['Route_' + str(n) for n in range(1, 12)]
    route_cols = entr_buf.loc[:, routes]  # loop-invariant, hoisted
    entr2line = []
    for i in xrange(len(entr_buf)):
        lines = []
        # .iloc replaces the deprecated .ix; lookup is positional here.
        for line in list(route_cols.iloc[i].dropna().values):
            try:
                line = str(int(line))
            except ValueError:
                pass  # letter routes such as 'A' stay as-is
            lines.append(line)
        entr2line.append(lines)
    entr_buf['entr2line'] = entr2line

    # R-tree over buffer bounding boxes for fast candidate lookups.
    index = rtree.Rtree()
    for idx, geometry in enumerate(entr_buf.geometry):
        index.insert(idx, geometry.bounds)

    entr_pair = {}   # canonical entrance pair -> trip count
    pick_entr = {}   # pickup entrance idx (or None) -> count
    drop_entr = {}   # dropoff entrance idx (or None) -> count
    entr_lines = {}  # tuple of shared lines -> trip count

    # preserve_units=True keeps the projection in US feet, matching the CRS.
    proj = pyproj.Proj(init='epsg:2263', preserve_units=True)

    if partID == 0:
        records.next()  # drop the CSV header line
    reader = csv.reader(records)
    for row in reader:
        # row[5]/row[6] = pickup lon/lat, row[9]/row[10] = dropoff lon/lat.
        # BUG FIX: was `float(row[9]!=0)` (string-vs-int comparison, always
        # truthy); now both longitudes are checked properly.
        if float(row[5]) != 0 and float(row[9]) != 0:
            p = geom.Point(proj(float(row[5]), float(row[6])))
            d = geom.Point(proj(float(row[9]), float(row[10])))
            p_potential = index.intersection((p.x, p.y, p.x, p.y))
            d_potential = index.intersection((d.x, d.y, d.x, d.y))
            p_match = None  # first buffer actually containing the point
            d_match = None
            p_lines = set()
            d_lines = set()

            for p_idx in p_potential:
                if entr_buf.geometry[p_idx].contains(p):
                    p_match = p_idx
                    p_lines = set(entr_buf.entr2line[p_idx])
                    break
            pick_entr[p_match] = pick_entr.get(p_match, 0) + 1

            for d_idx in d_potential:
                if entr_buf.geometry[d_idx].contains(d):
                    d_match = d_idx
                    d_lines = set(entr_buf.entr2line[d_idx])
                    break
            drop_entr[d_match] = drop_entr.get(d_match, 0) + 1

            # BUG FIX: explicit None checks so entrance index 0 counts too.
            if (p_match is not None and d_match is not None
                    and p_match != d_match):
                dirct_lines = tuple(p_lines.intersection(d_lines))
                if dirct_lines:
                    entr_lines[dirct_lines] = entr_lines.get(dirct_lines, 0) + 1
                pair = (min(p_match, d_match), max(p_match, d_match))
                entr_pair[pair] = entr_pair.get(pair, 0) + 1

    return entr_lines.items()

In [70]:
def mapper(record):
    """Expand one (line_tuple, count) pair into per-line (line, count) pairs
    so reduceByKey can sum trip counts per individual subway line."""
    shared_lines, trip_count = record
    for subway_line in shared_lines:
        yield subway_line, trip_count

# Run the per-partition counter over the raw CSV, flatten the shared-line
# tuples into individual (line, count) pairs, and sum counts per subway line.
rdd = sc.textFile('./df_shuffle.csv')
counts = rdd.mapPartitionsWithIndex(countLine).flatMap(mapper).reduceByKey(lambda x,y: x+y).collect()

In [71]:
counts  # trip counts per individual subway line


Out[71]:
[(u'A', 7),
 (u'Q', 5),
 (u'C', 11),
 (u'E', 7),
 (u'M', 2),
 ('1', 18),
 ('3', 6),
 ('5', 1),
 ('7', 2),
 (u'B', 1),
 (u'D', 1),
 (u'F', 4),
 (u'L', 3),
 (u'N', 5),
 ('2', 6),
 ('4', 1),
 ('6', 7),
 (u'R', 5)]