In [1]:
from pyspark.sql import Row

In [2]:
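# Load the Geolife trajectory points and parse the first two CSV
# fields of each line into a Row of numeric x/y coordinates.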
points = sc.textFile("/opt/GISData/Geolife_Trajectories_1.3/beijing.csv")\
.map(lambda line: line.split(","))\
.map(lambda p: Row(x=float(p[0]), y=float(p[1])))

In [3]:
points.take(5)


Out[3]:
[Row(x=1.044024, y=103.5263),
 Row(x=1.044028, y=103.524919),
 Row(x=1.04413, y=103.523515),
 Row(x=1.044208, y=103.780915),
 Row(x=1.044237, y=103.522101)]

In [4]:
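# Convert the RDD of Rows into a DataFrame and preview it.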
points = points.toDF()
points.show()


+--------+----------+
|       x|         y|
+--------+----------+
|1.044024|  103.5263|
|1.044028|103.524919|
| 1.04413|103.523515|
|1.044208|103.780915|
|1.044237|103.522101|
|1.044306|103.527684|
|1.044355|103.520695|
| 1.04449|103.519302|
|1.044508|103.529049|
|1.044685|103.530384|
|1.044709|103.517923|
| 1.04478|103.779929|
|1.044874|103.531761|
|1.044995|103.516561|
|1.045075|103.533127|
|1.045309|103.515218|
|1.045314|103.534466|
|1.045346|103.778954|
|1.045535|103.535827|
|1.045645|103.513892|
+--------+----------+
only showing top 20 rows


In [5]:
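# Register the DataFrame as a temporary table so it can be queried via SQL.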
points.registerTempTable("points")
#points.cache()

In [6]:
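# Circle range query in Simba's spatial SQL (CIRCLERANGE is not plain
# Spark SQL): select every point within radius 1, in coordinate units,
# of POINT(39.9042, 116.4074), then cache the result since it is
# sampled repeatedly below.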
sql = "SELECT * FROM points WHERE POINT(x, y) IN CIRCLERANGE(POINT(39.9042, 116.4074), 1)"
buffer = sqlContext.sql(sql).cache()
print(buffer.count())

# 18015599


18015599

In [7]:
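# The cells below (kept commented out) sampled the buffered set at
# increasing fractions to export 1M-16M point extracts; the trailing
# comment in each cell records the count that run produced.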
#buffer_sample = buffer.sample(False, 0.06, 42)
#buffer_sample.toPandas().to_csv('P1M.csv')
#print(buffer_sample.count())

# 1083011

In [8]:
#buffer_sample = buffer.sample(False, 0.112, 42)
#buffer_sample.toPandas().to_csv('P2M.csv')
#print(buffer_sample.count())

# 2018904

In [9]:
#buffer_sample = buffer.sample(False, 0.225, 42)
#buffer_sample.toPandas().to_csv('P4M.csv')
#print(buffer_sample.count())

# 4052491

In [10]:
#buffer_sample = buffer.sample(False, 0.445, 42)
#buffer_sample.toPandas().to_csv('P8M.csv')
#print(buffer_sample.count())

# 8015746

In [16]:
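# The 16M extract is written with spark-csv rather than via pandas,
# presumably to avoid collecting 16M rows onto the driver.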
#buffer_sample = buffer.sample(False, 0.889, 42)
#buffer_sample.write.format('com.databricks.spark.csv').save('P16M.csv')
#print(buffer_sample.count())

# 16015244

In [34]:
import numpy as np

# Sweep candidate sampling fractions; report those whose sampled count
# lands within 1,000 above a multiple of 10,000 (the 10K-100K targets).
for i in np.arange(0.0005, 0.00555, 0.00005):
    buffer_sample = buffer.sample(False, i, 42)
    c = buffer_sample.count()
    # Integer division: under Python 3, plain / would make this test
    # a float comparison that almost never holds.
    if (c // 1000) % 10 == 0:
        print("{0}: {1}".format(i, c))


0.0006: 10830
0.0011: 20065
0.00115: 20967
0.0017: 30882
0.00225: 40865
0.0028: 50838
0.00335: 60670
0.0039: 70473
0.00445: 80379
0.005: 90294
0.00555: 100149

In [25]:
from pyproj import Proj, transform

def transformCoords(row):
    # Reproject from WGS84 (EPSG:4326) to EPSG:4799. The query above
    # treats x as latitude and y as longitude, so the arguments are
    # swapped to match pyproj's (lon, lat) ordering for EPSG:4326.
    inProj = Proj(init='epsg:4326')
    outProj = Proj(init='epsg:4799')
    x2, y2 = transform(inProj, outProj, row.y, row.x)
    return Row(x=float(x2), y=float(y2))

In [27]:
# For each fraction found above, sample the buffered set, reproject the
# points, and export them as P10K.csv through P100K.csv.
for i in [0.0006, 0.0011, 0.0017, 0.00225, 0.0028, 0.00335, 0.0039, 0.00445, 0.005, 0.00555]:
    buffer_sample = buffer.sample(False, i, 42)
    c = buffer_sample.count()
    name = "P{0}K.csv".format(c // 1000)

    # buffer_sample.write.format('com.databricks.spark.csv').save(name)
    buffer_sample.map(transformCoords).toDF().toPandas().to_csv(name, header=None)
    print(name)


P10K.csv
P20K.csv
P30K.csv
P40K.csv
P50K.csv
P60K.csv
P70K.csv
P80K.csv
P90K.csv
P100K.csv

In [ ]: