In [64]:
from pyspark.sql.types import *
from pyspark.sql import Row

def toTag(n):
    """Format a count as a short human-readable magnitude tag.

    Returns the int itself for n < 1000, otherwise a string like
    "16M" or "2B" (rounded to the nearest unit of the suffix).

    Bug fix vs. the original: rounding could roll a value into the
    next tier (e.g. 999999 -> round(999.999) -> "1000K"); we now
    promote to the next suffix instead, yielding "1M".
    """
    if n < 1000:
        return n
    for divisor, suffix in ((1000, "K"), (1000000, "M"), (1000000000, "B")):
        value = int(round(n / float(divisor), 0))
        # If rounding produced >= 1000 units, the next tier is the
        # right representation (unless we are already at the top).
        if value < 1000 or suffix == "B":
            return "{0}{1}".format(value, suffix)

# Load the Porto taxi training set as a DataFrame via the spark-csv
# data source; the first line of the file is treated as the header.
TRAIN_CSV_PATH = 'file:///opt/GISData/TaxiPorto/train.csv'
df = sqlContext.read\
    .format('com.databricks.spark.csv')\
    .options(header='true')\
    .load(TRAIN_CSV_PATH)

In [65]:
def split_polyline(row):
    # A POLYLINE cell looks like "[[lon,lat],[lon,lat],...]": strip the
    # outer "[[" / "]]" and split on "],[" to get one string per point.
    return row[0][2:-2].split("],[")

# Explode every trajectory into individual coordinate strings and keep
# only the distinct points; cache since the RDD is reused below.
points = df.select('POLYLINE').flatMap(split_polyline).distinct()
points.cache()


Out[65]:
PythonRDD[209] at RDD at PythonRDD.scala:43

In [66]:
# Materialize the cached RDD and report how many distinct coordinate
# strings survive de-duplication (the raw dataset had ~83.4M points).
points.count()
# 83415287
# 17722273 without duplicates...


Out[66]:
17722273

In [85]:
# One-column (nullable string) schema used when writing the sampled
# coordinate strings back out as CSV.
schema = StructType([StructField('coord', StringType(), True)])
# Target number of points to draw from the 17,722,273 distinct points.
x = 16002000

In [86]:
# Sampling fraction: target size over the distinct-point count found
# above (17,722,273). Sample without replacement, fixed seed 42 so the
# draw is reproducible; the realized size n is only approximately x.
percentage = x / 17722273.0
sample = points.sample(False, percentage, 42)
n = sample.count()
print(n)

# Wrap each coordinate string in a Row and write the single-column
# DataFrame out as unquoted CSV under ./output.
sampled_rows = sample.map(lambda coord: Row(coord))
sampled_rows.toDF(schema).write\
    .format('com.databricks.spark.csv')\
    .option('quote', None)\
    .save('output')


16002249

In [87]:
import os  # bug fix: os was used below but never imported anywhere in the notebook

# Concatenate Spark's part files into a single CSV named after the
# sample size (e.g. L16M.csv), then remove the intermediate directory.
# NOTE(review): os.system runs through the shell; safe here because the
# command is built only from toTag(n), but subprocess would be sturdier.
command = "cat output/part-00* > L{0}.csv".format(toTag(n))
os.system(command)
os.system("rm -fR output/")


Out[87]:
0

In [ ]: