In [1]:
sc
Out[1]:
In [2]:
import pyproj
import csv
import shapely.geometry as geom
import fiona
import fiona.crs
import shapely
import rtree
import geopandas as gpd
import numpy as np
import operator
import pandas as pd
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
In [5]:
# rdd = sc.textFile('./entr_buffer.csv')
# df = pyspark.sql.DataFrame?
# df = spark.read.csv('./entr_buffer.csv', header=True)
# # df.take(3)
In [10]:
def countLine(partID, records):
    # Imports needed on the Spark workers.
    import pyproj
    import csv
    import shapely.geometry as geom
    import rtree

    # Build an R-tree over the entrance buffers so each trip point is only
    # tested against nearby buffers instead of all of them.
    index = rtree.Rtree()
    for idx, geometry in enumerate(entr_buf.geometry):
        index.insert(idx, geometry.bounds)

    entr_lines = {}
    proj = pyproj.Proj(init='epsg:2263', preserve_units=True)

    if partID == 0:
        next(records)  # skip the CSV header on the first partition

    reader = csv.reader(records)
    for row in reader:
        # Skip rows with missing pickup or dropoff coordinates.
        if (float(row[5]) != 0) and (float(row[9]) != 0):
            p = geom.Point(proj(float(row[5]), float(row[6])))   # pickup
            d = geom.Point(proj(float(row[9]), float(row[10])))  # dropoff
            p_potential = index.intersection((p.x, p.y, p.x, p.y))
            d_potential = index.intersection((d.x, d.y, d.x, d.y))
            p_match = None  # take the first buffer containing the point (not necessarily the closest one)
            d_match = None
            for p_idx in p_potential:
                if entr_buf.geometry[p_idx].contains(p):
                    p_match = p_idx
                    p_lines = set(entr_buf.lines[p_idx])
                    break
            for d_idx in d_potential:
                if entr_buf.geometry[d_idx].contains(d):
                    d_match = d_idx
                    d_lines = set(entr_buf.lines[d_idx])
                    break
            if (p_match is not None) and (d_match is not None) and (p_match != d_match):
                # Lines reachable from both the pickup and the dropoff entrance.
                direct_lines = tuple(p_lines.intersection(d_lines))
                if direct_lines:
                    entr_lines[direct_lines] = entr_lines.get(direct_lines, 0) + 1
    return entr_lines.items()
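The R-tree query above only narrows candidates by bounding box; the `contains` test is what confirms a hit. A minimal, self-contained sketch of that two-step lookup, using toy coordinates rather than the taxi data:

import rtree
import shapely.geometry as geom

# Two toy circular buffers standing in for station-entrance buffers.
buffers = [geom.Point(0, 0).buffer(100), geom.Point(500, 0).buffer(100)]

idx = rtree.Rtree()
for i, g in enumerate(buffers):
    idx.insert(i, g.bounds)

p = geom.Point(30, 40)
# Step 1: bounding-box candidates; step 2: exact containment test.
candidates = idx.intersection((p.x, p.y, p.x, p.y))
matches = [i for i in candidates if buffers[i].contains(p)]
print(matches)  # [0] -- the point falls inside the first buffer only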
In [2]:
from shapely.geometry import Point
import pyproj
import geopandas as gpd

proj = pyproj.Proj(init='epsg:2263', preserve_units=True)

# Load the subway-entrance GeoJSON and keep its feature list.
entr_points = sqlContext.read.load('../why_yellow_taxi/Data/2016_(May)_New_York_City_Subway_Station_Entrances.json',
                                   format='json', header=True, inferSchema=True).collect()[0].asDict()['features']

routes = ['route_' + str(i) for i in range(1, 12)]
entr_geo = gpd.GeoDataFrame(columns=['geometry', 'lines'])
for i in range(len(entr_points)):
    entr_coor = entr_points[i].asDict()['geometry'].asDict()['coordinates']
    # Project lon/lat to EPSG:2263 (feet) and buffer 100 ft around the entrance.
    entr_buffer = Point(proj(float(entr_coor[0]), float(entr_coor[1]))).buffer(100)
    entr_prop = entr_points[i].asDict()['properties'].asDict()
    entr_lines = [entr_prop[r] for r in routes if entr_prop[r]]
    entr_geo = entr_geo.append({'geometry': entr_buffer, 'lines': entr_lines}, ignore_index=True)
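As a quick sanity check (illustrative only; the actual values depend on the GeoJSON), each row of entr_geo should hold a roughly 100 ft buffer polygon in EPSG:2263 coordinates plus the list of lines served by that entrance:

# Illustrative check -- exact numbers depend on the input data.
print(entr_geo.shape)             # (number_of_entrances, 2)
print(entr_geo.iloc[0])           # geometry: POLYGON (...), lines: e.g. ['4', '5', '6']
print(entr_geo.geometry[0].area)  # ~ pi * 100**2 square feet, since EPSG:2263 is in feet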
In [3]:
# countLine looks up entr_buf as a global, so bind the buffered entrances to that name.
global entr_buf
entr_buf = entr_geo
In [11]:
def mapper(record):
    # record is ((line, line, ...), count); emit one (line, count) pair per subway line.
    for key in record[0]:
        yield key, record[1]

if __name__ == '__main__':
    # sc = SparkContext(appName="bigdata_project")
    # taxi_csv = 'hdfs:///user/st1671/data/yellow_tripdata_2016-01.csv'
    rdd = sc.textFile('./df_shuffle.csv')
    counts = rdd.mapPartitionsWithIndex(countLine).flatMap(mapper).reduceByKey(lambda x, y: x + y).collect()
    # counts.saveAsTextFile('/user/st1671/bigdata_proj/output')
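To see what the flatMap/reduceByKey stage does: countLine emits ((line, ...), count) pairs, mapper fans each tuple key out to its individual lines, and reduceByKey sums per line. A small standalone sketch with made-up counts:

# Toy illustration of the aggregation stage (fabricated counts, not real trips).
pairs = sc.parallelize([(('1', '2'), 3), (('2',), 5)])
print(pairs.flatMap(mapper).reduceByKey(lambda x, y: x + y).collect())
# [('1', 3), ('2', 8)]  (ordering may differ)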
In [7]:
counts
Out[7]: