In [1]:
from pyspark.sql import Row

In [4]:
# Parse each CSV line into a Row of float coordinates (x, y)
points = sc.textFile("/opt/GISData/Geolife_Trajectories_1.3/beijing.csv")\
    .map(lambda line: line.split(","))\
    .map(lambda p: Row(x=float(p[0]), y=float(p[1])))
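
The one-liner above will fail at job time if the CSV contains any malformed line. A more defensive variant (a sketch, not part of the original run; the parse_point helper is hypothetical) skips bad records instead:

In [ ]:
def parse_point(line):
    # Return a Row on success, None on a malformed or short line
    fields = line.split(",")
    try:
        return Row(x=float(fields[0]), y=float(fields[1]))
    except (ValueError, IndexError):
        return None

points = sc.textFile("/opt/GISData/Geolife_Trajectories_1.3/beijing.csv")\
    .map(parse_point)\
    .filter(lambda p: p is not None)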

In [5]:
points.take(5)


Out[5]:
[Row(x=1.044024, y=103.5263),
 Row(x=1.044028, y=103.524919),
 Row(x=1.04413, y=103.523515),
 Row(x=1.044208, y=103.780915),
 Row(x=1.044237, y=103.522101)]

In [6]:
points = points.toDF()  # convert the RDD of Rows to a DataFrame with inferred schema
points.show()


+--------+----------+
|       x|         y|
+--------+----------+
|1.044024|  103.5263|
|1.044028|103.524919|
| 1.04413|103.523515|
|1.044208|103.780915|
|1.044237|103.522101|
|1.044306|103.527684|
|1.044355|103.520695|
| 1.04449|103.519302|
|1.044508|103.529049|
|1.044685|103.530384|
|1.044709|103.517923|
| 1.04478|103.779929|
|1.044874|103.531761|
|1.044995|103.516561|
|1.045075|103.533127|
|1.045309|103.515218|
|1.045314|103.534466|
|1.045346|103.778954|
|1.045535|103.535827|
|1.045645|103.513892|
+--------+----------+
only showing top 20 rows
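
toDF() infers both columns as doubles from the Row fields; a quick sanity check of the schema and coordinate extent (not part of the original run) can catch parsing mistakes early:

In [ ]:
points.printSchema()              # expect x: double, y: double
points.describe("x", "y").show()  # min/max reveal the coordinate extent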


In [10]:
# Register the DataFrame so it can be queried by name. POINT ... IN CIRCLERANGE
# is a spatial range predicate from Simba's SQL extension (not stock Spark SQL):
# it keeps the points within radius 0.005, in coordinate units, of the center.
points.registerTempTable("points")
sql = "SELECT * FROM points WHERE POINT(x, y) IN CIRCLERANGE(POINT(39.9042, 116.4074), 0.005)"
buffer = sqlContext.sql(sql)
buffer.count()


Out[10]:
15177
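
On a plain Spark build without the spatial extension, a rough equivalent can be written as an ordinary filter (a sketch, assuming CIRCLERANGE tests Euclidean distance in coordinate units; note this scans every row rather than pruning with a spatial index):

In [ ]:
plain_sql = """
SELECT * FROM points
WHERE (x - 39.9042) * (x - 39.9042)
    + (y - 116.4074) * (y - 116.4074) <= 0.005 * 0.005
"""
buffer_plain = sqlContext.sql(plain_sql)
buffer_plain.count()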

In [15]:
# Collect the result to the driver as a pandas DataFrame and write it to a local CSV
buffer.toPandas().to_csv('sample.csv')
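
toPandas() pulls every matching row onto the driver, which is fine at 15,177 rows; for larger result sets, writing from the executors avoids driver memory pressure (a sketch, assuming a Spark 2.x DataFrameWriter; the output path is hypothetical):

In [ ]:
buffer.write.mode("overwrite").option("header", "true").csv("/tmp/sample_csv")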
