Create SparkContext, SparkSession instances


In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [2]:
# Create the session through SparkSession.builder — the public, documented
# entry point — rather than calling the SparkSession constructor directly.
# The SparkContext is taken from the session so the RDD API (sc.textFile
# below) remains available under the same name.
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
sc = spark.sparkContext

Read tabular data


In [3]:
mtcars = spark.read.csv('../../data/mtcars.csv', header=True, inferSchema=True)

In [4]:
mtcars.show(3)


+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|   Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 3 rows

Rename individual column


In [5]:
# Rename the auto-generated first column '_c0' (the row labels from the CSV).
mtcars_renamed = mtcars.withColumnRenamed('_c0', 'rown_ames')
mtcars = mtcars_renamed
mtcars.show(n=3)


+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    rown_ames| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|   Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 3 rows

Rename multiple columns


In [6]:
# Build the replacement column names by prefixing each existing name with 'x_'.
new_col_names = ['x_{0}'.format(col) for col in mtcars.columns]
new_col_names


Out[6]:
['x_rown_ames',
 'x_mpg',
 'x_cyl',
 'x_disp',
 'x_hp',
 'x_drat',
 'x_wt',
 'x_qsec',
 'x_vs',
 'x_am',
 'x_gear',
 'x_carb']

In [7]:
# Apply the new names with DataFrame.toDF(*cols) directly. The original
# mtcars.rdd.toDF(new_col_names) round-trips through the RDD API, which
# discards the DataFrame's plan and re-infers the schema — toDF on the
# DataFrame renames the columns in place without that cost.
mtcars = mtcars.toDF(*new_col_names)
mtcars.show(3)


+-------------+-----+-----+------+----+------+-----+------+----+----+------+------+
|  x_rown_ames|x_mpg|x_cyl|x_disp|x_hp|x_drat| x_wt|x_qsec|x_vs|x_am|x_gear|x_carb|
+-------------+-----+-----+------+----+------+-----+------+----+----+------+------+
|    Mazda RX4| 21.0|    6| 160.0| 110|   3.9| 2.62| 16.46|   0|   1|     4|     4|
|Mazda RX4 Wag| 21.0|    6| 160.0| 110|   3.9|2.875| 17.02|   0|   1|     4|     4|
|   Datsun 710| 22.8|    4| 108.0|  93|  3.85| 2.32| 18.61|   1|   1|     4|     1|
+-------------+-----+-----+------+----+------+-----+------+----+----+------+------+
only showing top 3 rows

Read non-tabular data


In [8]:
# Read the non-tabular text file as an RDD of strings, one element per line.
twitter = sc.textFile('../../data/twitter.txt')
# Inspect the first 5 lines.
twitter.take(5)


Out[8]:
['Fresh install of XP on new computer. Sweet relief! fuck vista\t1018769417\t1.0',
 'Well. Now I know where to go when I want my knives. #ChiChevySXSW http://post.ly/RvDl\t10284216536\t1.0',
 '"Literally six weeks before I can take off ""SSC Chair"" off my email. Its like the torturous 4th mile before everything stops hurting."\t10298589026\t1.0',
 'Mitsubishi i MiEV - Wikipedia, the free encyclopedia - http://goo.gl/xipe Cutest car ever!\t109017669432377344\t1.0',
 "'Cheap Eats in SLP' - http://t.co/4w8gRp7\t109642968603963392\t1.0"]

Export data


In [9]:
from pyspark.sql import DataFrameWriter

Before we write the data to a file, we need to coalesce the data into one single partition. Otherwise, there will be multiple output files.


In [10]:
mtcars = mtcars.coalesce(numPartitions=1)

In [12]:
mtcars.write.csv('data/saved-mtcars', header=True)

In [13]:
twitter = twitter.coalesce(numPartitions=1)

In [14]:
twitter.saveAsTextFile('data/saved-twitter')

In [ ]: