In [1]:
# create entry points to Spark
try:
    sc.stop()  # stop any SparkContext left over from a previous run
except:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sparkContext=sc)
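In PySpark 2.x and later, the `SparkSession` builder is the more common entry point. Below is a minimal alternative sketch; the application name is arbitrary.

# alternative setup: the builder creates or reuses a SparkSession,
# and the SparkContext is then available as spark.sparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('row-filtering').getOrCreate()
sc = spark.sparkContext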
In [2]:
mtcars = spark.read.csv('../../data/mtcars.csv', inferSchema=True, header=True)
# correct first column name
mtcars = mtcars.withColumnRenamed('_c0', 'model')
mtcars.show(5)
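The DataFrame has no built-in row index, but we can build one from the underlying RDD with `zipWithIndex()`, which pairs each Row with its position.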
In [3]:
mtcars.rdd.zipWithIndex().take(5)
Out[3]:
Now we can apply the map function to modify the structure of each element. Each element x from the RDD above has two parts: x[0] is a Row object and x[1] is the index, an integer. We want to merge these two values into a single list with the index as its first element.
In [4]:
mtcars.rdd.zipWithIndex().map(lambda x: [x[1]] + list(x[0])).take(5)
Out[4]:
Let's add column names and save the result.
In [5]:
header = ['index'] + mtcars.columns
mtcars_df = mtcars.rdd.zipWithIndex().map(lambda x: [x[1]] + list(x[0])).toDF(header)
In [6]:
mtcars_df.show(5)
After we obtain the index column, we can apply the `pyspark.sql.DataFrame.filter` function to select rows of the DataFrame. The `filter` function takes a column of `BooleanType` as input.
In [7]:
mtcars_df.filter(mtcars_df.index.isin([1,2,4,6,9])).show()
In [8]:
mtcars_df.filter(mtcars_df.index.between(5, 10)).show()
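Note that `between(5, 10)` is inclusive of both endpoints, so rows with index 5 and 10 are both returned.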
In [9]:
mtcars_df.filter(mtcars_df.index < 9).show()
In [10]:
mtcars_df.filter(mtcars_df.index >= 14).show()
Example 1: select rows when cyl = 4
In [11]:
mtcars_df.filter(mtcars_df.cyl == 4).show()
Example 2: select rows when vs = 1 and am = 1
When the filtering is based on multiple conditions (e.g., vs = 1 and am = 1), one approach is to use the conditions to build a new BooleanType column and then filter the DataFrame by that column.
In [12]:
from pyspark.sql import functions as F
Warning: when passing multiple conditions to the **`when()`** function, each condition has to be wrapped in its own pair of parentheses, because the `&` operator binds more tightly than comparisons such as `==`.
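To see why, here is a minimal sketch of the pitfall (the variable names are illustrative): without the parentheses, Python parses the expression as a chained comparison on Column objects, and PySpark raises a `ValueError` when it tries to convert a Column to a boolean.

# correct: each comparison is wrapped in parentheses before combining with &
good = F.when((mtcars_df.vs == 1) & (mtcars_df.am == 1), 1)

# incorrect: parsed as mtcars_df.vs == (1 & mtcars_df.am) == 1,
# a chained comparison that raises ValueError; kept commented out
# bad = F.when(mtcars_df.vs == 1 & mtcars_df.am == 1, 1)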
In [13]:
filtering_column = F.when((mtcars_df.vs == 1) & (mtcars_df.am == 1), 1).alias('filter_col')
filtering_column
Out[13]:
Now we need to add the new column to the original DataFrame. This can be done by applying the `select()` function to select all of the original columns as well as the new filtering column.
In [14]:
all_original_columns = [mtcars_df[c] for c in mtcars_df.columns]  # Column objects for every existing column
all_original_columns
Out[14]:
In [15]:
all_columns = all_original_columns + [filtering_column]
all_columns
Out[15]:
In [16]:
new_mtcars_df = mtcars_df.select(all_columns)
new_mtcars_df.show()
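As a side note, `withColumn()` appends a named column directly, which avoids enumerating the original columns. A minimal equivalent sketch:

# withColumn() returns a new DataFrame with the filtering column appended
new_mtcars_df = mtcars_df.withColumn(
    'filter_col',
    F.when((mtcars_df.vs == 1) & (mtcars_df.am == 1), 1)
)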
Now we can filter the DataFrame by the requested conditions and then drop the filtering column.
In [17]:
new_mtcars_df.filter(new_mtcars_df.filter_col == 1).drop('filter_col').show()
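Finally, because `filter()` accepts any BooleanType column, the helper column can be skipped entirely for a case like this. A minimal equivalent sketch:

# filter directly on the combined boolean expression
mtcars_df.filter((mtcars_df.vs == 1) & (mtcars_df.am == 1)).show()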