In [1]:
sc


Out[1]:
<pyspark.context.SparkContext at 0x105267b90>

In [2]:
import pyproj
import csv
import shapely.geometry as geom
import fiona
import fiona.crs
import shapely
import rtree
import geopandas as gpd
import numpy as np
import operator
import pandas as pd
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [5]:
# rdd = sc.textFile('./entr_buffer.csv')
# df = pyspark.sql.DataFrame?
# df = spark.read.csv('./entr_buffer.csv', header=True)
# # df.take(3)

In [10]:
def countLine(partID, records):
    # Worker-side imports: this function runs on Spark executors, so the
    # modules it uses must be importable there, not just in the driver.
    import csv
    import pyproj
    import rtree
    import shapely.geometry as geom
    import geopandas as gpd  # entr_buf is a GeoDataFrame, deserialized on the workers

    # Spatial index over the entrance buffers: cheap bounding-box lookups
    # first, exact containment tests afterwards.
    index = rtree.Rtree()
    for idx, geometry in enumerate(entr_buf.geometry):
        index.insert(idx, geometry.bounds)

    entr_lines = {}  # trip counts keyed by tuple of shared lines
    # EPSG:2263 (NY Long Island State Plane) works in feet, matching the
    # 100 ft buffers built in the entrance-buffer cell below.
    proj = pyproj.Proj(init='epsg:2263', preserve_units=True)

    # The CSV header sits in the first partition; skip it there.
    if partID == 0:
        records.next()
    reader = csv.reader(records)

    for row in reader:
        # Skip trips with missing pickup or dropoff coordinates.
        if float(row[5]) != 0 and float(row[9]) != 0:
            p = geom.Point(proj(float(row[5]), float(row[6])))   # pickup
            d = geom.Point(proj(float(row[9]), float(row[10])))  # dropoff
            p_potential = index.intersection((p.x, p.y, p.x, p.y))
            d_potential = index.intersection((d.x, d.y, d.x, d.y))
            # Take the first buffer that contains the point; this is not
            # necessarily the closest entrance.
            p_match = None
            d_match = None

            for p_idx in p_potential:
                if entr_buf.geometry[p_idx].contains(p):
                    p_match = p_idx
                    p_lines = set(entr_buf.lines[p_idx])
                    break

            for d_idx in d_potential:
                if entr_buf.geometry[d_idx].contains(d):
                    d_match = d_idx
                    d_lines = set(entr_buf.lines[d_idx])
                    break

            # Compare against None explicitly: 0 is a valid index.
            if p_match is not None and d_match is not None and p_match != d_match:
                direct_lines = tuple(p_lines.intersection(d_lines))
                if direct_lines:
                    entr_lines[direct_lines] = entr_lines.get(direct_lines, 0) + 1
    return entr_lines.items()
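
rtree's intersection() filters candidates by bounding box only, which is why the exact contains() test above is still needed. A minimal standalone sketch of that two-stage lookup, with made-up coordinates:

import rtree
import shapely.geometry as geom

buffers = [geom.Point(0, 0).buffer(100), geom.Point(500, 0).buffer(100)]
idx = rtree.Rtree()
for i, b in enumerate(buffers):
    idx.insert(i, b.bounds)

pt = geom.Point(95, 95)  # inside the first bounding box, outside the circle
candidates = list(idx.intersection((pt.x, pt.y, pt.x, pt.y)))  # -> [0]
matches = [i for i in candidates if buffers[i].contains(pt)]   # -> []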




In [2]:
from shapely.geometry import Point
import pyproj
import geopandas as gpd
proj = pyproj.Proj(init='epsg:2263', preserve_units=True)

# Load the subway-entrance GeoJSON, project each entrance into EPSG:2263,
# and buffer it by 100 ft, keeping the list of lines it serves.
entr_points = sqlContext.read.load('../why_yellow_taxi/Data/2016_(May)_New_York_City_Subway_Station_Entrances.json', \
                                   format='json', header=True, inferSchema=True).collect()[0].asDict()['features']
routes = ['route_' + str(i) for i in range(1, 12)]
entr_geo = gpd.GeoDataFrame(columns=['geometry', 'lines'])

for i in range(len(entr_points)):
    entr_coor = entr_points[i].asDict()['geometry'].asDict()['coordinates']
    entr_buffer = Point(proj(float(entr_coor[0]), float(entr_coor[1]))).buffer(100)
    entr_prop = entr_points[i].asDict()['properties'].asDict()
    entr_lines = [entr_prop[r] for r in routes if entr_prop[r]]
    entr_geo = entr_geo.append({'geometry': entr_buffer, 'lines': entr_lines}, ignore_index=True)
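
Appending to a GeoDataFrame row by row copies the frame on each iteration; a sketch of building the same frame in one pass over the same entr_points records (same field layout assumed):

geoms, lines = [], []
for pt in entr_points:
    coor = pt.asDict()['geometry'].asDict()['coordinates']
    geoms.append(Point(proj(float(coor[0]), float(coor[1]))).buffer(100))
    prop = pt.asDict()['properties'].asDict()
    lines.append([prop[r] for r in routes if prop[r]])
entr_geo = gpd.GeoDataFrame({'geometry': geoms, 'lines': lines})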

In [3]:
# Expose the buffered entrances at module level so the countLine closure
# picks them up and ships them to the executors.
global entr_buf
entr_buf = entr_geo
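
Relying on a module-level name means entr_buf is pickled into the countLine closure on every job. A broadcast variable is the more explicit alternative, shipping the table to each executor once; a sketch:

entr_buf_bc = sc.broadcast(entr_geo)
# inside countLine, the lookup table would then be read as:
#     entr_buf = entr_buf_bc.value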





In [11]:
def mapper(record):
    # Fan a ((line, line, ...), count) pair out into one (line, count) per line.
    for key in record[0]:
        yield key, record[1]

if __name__ == '__main__':
    #sc = SparkContext(appName="bigdata_project")
    #taxi_csv = 'hdfs:///user/st1671/data/yellow_tripdata_2016-01.csv'
    rdd = sc.textFile('./df_shuffle.csv')
    counts = rdd.mapPartitionsWithIndex(countLine) \
                .flatMap(mapper) \
                .reduceByKey(lambda x, y: x + y) \
                .collect()
    #counts.saveAsTextFile('/user/st1671/bigdata_proj/output')
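
saveAsTextFile has to be called on an RDD, not on the list that collect() returns; to persist the counts on the cluster instead of pulling them to the driver, the pipeline would end like this (output path taken from the commented line above):

line_counts = rdd.mapPartitionsWithIndex(countLine) \
                 .flatMap(mapper) \
                 .reduceByKey(lambda x, y: x + y)
line_counts.saveAsTextFile('/user/st1671/bigdata_proj/output')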

In [7]:
counts


Out[7]:
[(u'A', 7),
 (u'Q', 5),
 (u'C', 11),
 (u'E', 7),
 (u'M', 2),
 (u'1', 12),
 (u'3', 2),
 (u'5', 1),
 (u'7', 1),
 (u'B', 1),
 (u'D', 1),
 (u'F', 4),
 (u'L', 3),
 (u'N', 5),
 (u'R', 5),
 (u'4', 1),
 (u'6', 7),
 (u'2', 2)]
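
For a quick ranked view, the collected pairs can be sorted in the driver:

sorted(counts, key=lambda kv: kv[1], reverse=True)[:5]
# [(u'1', 12), (u'C', 11), (u'A', 7), (u'E', 7), (u'6', 7)]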