In [1]:
from pyspark.sql import Row
In [2]:
# Read the Geolife Beijing CSV and parse the first two comma-separated
# fields of each line into a Row(x, y) of floats. All three RDDs are
# lazy; nothing is read until an action (take/count/...) runs.
raw_lines = sc.textFile("/opt/GISData/Geolife_Trajectories_1.3/beijing.csv")
split_fields = raw_lines.map(lambda line: line.split(","))
points = split_fields.map(lambda fields: Row(x=float(fields[0]), y=float(fields[1])))
In [3]:
points.take(5)
Out[3]:
In [4]:
# Convert the RDD of Rows into a DataFrame. NOTE(review): this rebinds
# `points` from an RDD to a DataFrame, so re-running earlier RDD-only
# cells (e.g. the parse cell) is required before re-running this one.
points = points.toDF()
points.show()
In [5]:
# Expose the DataFrame as a SQL temp table for the queries below.
# NOTE(review): registerTempTable was deprecated in Spark 2.0 in favor of
# createOrReplaceTempView; the sqlContext usage suggests Spark 1.x here --
# confirm before upgrading.
points.registerTempTable("points")
#points.cache()
In [6]:
# Spatial range query: keep every point inside a circle of radius 1
# centered at (39.9042, 116.4074). POINT/CIRCLERANGE are not standard
# Spark SQL -- presumably provided by a spatial extension such as Simba;
# verify against the deployed Spark build. Given the center coordinates,
# x appears to hold latitude and y longitude, and the radius is
# presumably 1 degree -- TODO confirm units.
sql = "SELECT * FROM points WHERE POINT(x, y) IN CIRCLERANGE(POINT(39.9042, 116.4074), 1)"
# Cache the filtered set: every sampling cell below re-scans it.
buffer = sqlContext.sql(sql).cache()
print(buffer.count())
# 18015599
In [7]:
#buffer_sample = buffer.sample(False, 0.06, 42)
#buffer_sample.toPandas().to_csv('P1M.csv')
#print(buffer_sample.count())
# 1083011
In [8]:
#buffer_sample = buffer.sample(False, 0.112, 42)
#buffer_sample.toPandas().to_csv('P2M.csv')
#print(buffer_sample.count())
# 2018904
In [9]:
#buffer_sample = buffer.sample(False, 0.225, 42)
#buffer_sample.toPandas().to_csv('P4M.csv')
#print(buffer_sample.count())
# 4052491
In [10]:
#buffer_sample = buffer.sample(False, 0.445, 42)
#buffer_sample.toPandas().to_csv('P8M.csv')
#print(buffer_sample.count())
# 8015746
In [16]:
#buffer_sample = buffer.sample(False, 0.889, 42)
#buffer_sample.write.format('com.databricks.spark.csv').save('P16M.csv')
#print(buffer_sample.count())
# 16015244
In [34]:
import numpy as np

# Probe sampling fractions in fine steps and print those whose sampled
# count lands within 1,000 of a multiple of 10,000 (i.e. the thousands
# digit of the count is zero) -- used to pick fractions that yield
# round-sized benchmark datasets.
for fraction in np.arange(0.0005, 0.00555, 0.00005):
    sample = buffer.sample(False, fraction, 42)
    count = sample.count()
    # `//` keeps this an integer test under Python 3; the original `/`
    # would be true division there, making the predicate almost never hold.
    if (count // 1000) % 10 == 0:
        print("{0}: {1}".format(fraction, count))
In [25]:
from pyproj import Proj, transform

def tranformCoords(row):
    """Reproject a point Row from WGS84 (EPSG:4326) to EPSG:4799.

    Args:
        row: a Row with attributes ``x`` and ``y``; given the circle-range
            query above, ``x`` presumably holds latitude and ``y``
            longitude -- TODO confirm against the source CSV.

    Returns:
        Row(x, y): the projected coordinates as floats.
    """
    # Proj objects are built per call; presumably kept inside the function
    # so it pickles cleanly when shipped to Spark workers -- verify before
    # hoisting them out for performance.
    src = Proj(init='epsg:4326')
    dst = Proj(init='epsg:4799')
    # transform expects (lon, lat) for EPSG:4326, hence (row.y, row.x).
    px, py = transform(src, dst, row.y, row.x)
    return Row(x=float(px), y=float(py))

# Correctly spelled alias; the original name is kept for existing callers.
transformCoords = tranformCoords
In [27]:
# Export ten fixed-fraction samples of the cached circle-range result,
# reprojected via tranformCoords, one CSV each. File names encode the
# sampled size in thousands of points (e.g. P10K.csv).
for fraction in [0.0006, 0.0011, 0.0017, 0.00225, 0.0028,
                 0.00335, 0.0039, 0.00445, 0.005, 0.00555]:
    sample = buffer.sample(False, fraction, 42)
    count = sample.count()
    # Integer division so the name reads "P10K.csv" rather than
    # "P10.815K.csv" under Python 3's true division.
    name = "P{0}K.csv".format(count // 1000)
    # sample.write.format('com.databricks.spark.csv').save(name)
    # header=False (explicit, documented pandas flag) suppresses the
    # header row, matching the original header=None behavior.
    sample.map(tranformCoords).toDF().toPandas().to_csv(name, header=False)
    print(name)
In [ ]: