In [1]:
# create entry points to spark
try:
    sc.stop()
except:
    pass

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sparkContext=sc)
In [2]:
mtcars = spark.read.csv(path='../../data/mtcars.csv',
                        sep=',',
                        encoding='UTF-8',
                        comment=None,
                        header=True,
                        inferSchema=True)
In [3]:
mtcars.rdd.take(2)
Out[3]:
With an RDD object, we can apply mapping functions such as map, mapValues, flatMap, and flatMapValues, along with the many other methods that RDDs provide.
In [4]:
mtcars_map = mtcars.rdd.map(lambda x: (x['_c0'], x['mpg']))
mtcars_map.take(5)
Out[4]:
In [5]:
mtcars_mapvalues = mtcars_map.mapValues(lambda x: [x, x * 10])
mtcars_mapvalues.take(5)
Out[5]:
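flatMap and flatMapValues, mentioned above, behave like map and mapValues but flatten their results, so one input record can produce several output records. Below is a minimal sketch using the mtcars_map pairs from the previous cell; the variable name mtcars_flat is just illustrative.
# flatMapValues emits one (key, value) record per element of the returned list,
# rather than one record whose value is the whole list
mtcars_flat = mtcars_map.flatMapValues(lambda x: [x, x * 10])
mtcars_flat.take(5)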
In [6]:
rdd_raw = sc.textFile('../../data/mtcars.csv')
rdd_raw.take(5)
Out[6]:
In [7]:
header = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] == 'mpg').collect()[0]
header[0] = 'model'
header
Out[7]:
In [8]:
rdd = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] != 'mpg')
rdd.take(2)
Out[8]:
First we define a function that takes a list of column names and a list of values and creates a Row of key-value pairs. Since the keys of a Row object are variable names, we can't simply pass a dictionary to Row(). Instead, we can treat the dictionary as a set of keyword arguments and use ** to unpack it.
See the example below.
In [9]:
from pyspark.sql import Row
my_dict = dict(zip(['a', 'b', 'c'], range(1, 4)))
Row(**my_dict)
Out[9]:
In [10]:
def list_to_row(keys, values):
    row_dict = dict(zip(keys, values))
    return Row(**row_dict)
In [11]:
rdd_rows = rdd.map(lambda x: list_to_row(header, x))
rdd_rows.take(3)
Out[11]:
Now we can convert the RDD to a DataFrame.
In [12]:
df = spark.createDataFrame(rdd_rows)
df.show(5)
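Since every field here came from splitting raw text lines, all columns in this DataFrame are strings, unlike the inferSchema read at the start. A minimal sketch of casting a column back to a numeric type (casting mpg to 'double' is just one illustrative choice):
# values parsed from text are strings, so cast numeric columns explicitly
df_typed = df.withColumn('mpg', df['mpg'].cast('double'))
df_typed.printSchema()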