In [23]:
from pyspark.sql import Row
dataset = "/opt/Datasets/Porto/L16M.csv"
n = 16000000

In [24]:
from pyproj import Proj, transform

def transformCoords(row):
    inProj = Proj(init='epsg:4326')
    outProj = Proj(init='epsg:27493')
    x2,y2 = transform(inProj,outProj, row.y, row.x)
    return Row(x=float(x2), y=float(y2))

def toTag(n):
    if n < 1000:
        return n
    elif n < 1000000:
        return "{0}K".format(int(round(n/1000.0,0)))
    elif n < 1000000000:
        return "{0}M".format(int(round(n/1000000.0,0)))
    else:
        return "{0}B".format(int(round(n/1000000000.0,0)))

In [25]:
points = sc.textFile(dataset)\
.map(lambda line: line.split(","))\
.map(lambda p: Row(x=float(p[0]), y=float(p[1])))

In [26]:
points.take(5)


Out[26]:
[Row(x=-8.605494, y=41.14494),
 Row(x=-8.133354, y=40.69557),
 Row(x=-8.614071, y=41.192802),
 Row(x=-8.632008, y=41.127642),
 Row(x=-8.550198, y=41.147649)]

In [27]:
points = points.map(transformCoords).toDF()
points.show()


+------------------+-------------------+
|                 x|                  y|
+------------------+-------------------+
| 6198085.758550276| -5839016.033341593|
| 6137611.601999928| -5749242.519873822|
| 6205664.395728792| -5841771.187125915|
| 6194515.689023598| -5842903.860308644|
| 6200077.172430606| -5829969.901172479|
| 6199290.635809554| -5837395.002401751|
|  6192050.28750824| -5831530.214108369|
| 6200528.378192357| -5829395.810651346|
| 6199574.506995042| -5842395.485457325|
| 6198669.900246182| -5839432.246923726|
| 6203016.406688144|-5838976.7848646315|
| 6197230.330007319| -5840138.977202125|
| 6221118.796135499| -5730317.541721915|
| 6208121.500695292| -5803981.882391075|
|6201878.9455778925| -5854353.307730269|
| 6201770.846683889| -5848051.774533869|
| 6194607.200758824| -5843368.531342733|
| 6211806.201324201| -5845041.417050544|
| 6199328.490411013| -5847848.070830555|
|6199426.4456327725|  -5841853.63513153|
+------------------+-------------------+
only showing top 20 rows


In [28]:
points.write.format('com.databricks.spark.csv').save('/opt/Datasets/Porto/output')
command = "cat /opt/Datasets/Porto/output/part-00* > /opt/Datasets/Porto/P{0}.csv".format(toTag(n))
os.system(command)
os.system("rm -fR /opt/Datasets/Porto/output/")


Out[28]:
0