In [1]:
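# create entry points to spark
try:
    sc.stop()  # stop a SparkContext left over from an earlier run
except NameError:
    pass  # no SparkContext has been created yet in this session
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sparkContext=sc)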

Example dataset


In [2]:
mtcars = spark.read.csv('../../data/mtcars.csv', inferSchema=True, header=True)
# correct first column name
mtcars = mtcars.withColumnRenamed('_c0', 'model')
mtcars.show(5)


+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows

Select rows by index

First, we need to add an index to each row. The zipWithIndex function pairs each RDD element with its position and returns an RDD of (element, index) tuples.


In [3]:
mtcars.rdd.zipWithIndex().take(5)


Out[3]:
[(Row(model='Mazda RX4', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.62, qsec=16.46, vs=0, am=1, gear=4, carb=4),
  0),
 (Row(model='Mazda RX4 Wag', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.875, qsec=17.02, vs=0, am=1, gear=4, carb=4),
  1),
 (Row(model='Datsun 710', mpg=22.8, cyl=4, disp=108.0, hp=93, drat=3.85, wt=2.32, qsec=18.61, vs=1, am=1, gear=4, carb=1),
  2),
 (Row(model='Hornet 4 Drive', mpg=21.4, cyl=6, disp=258.0, hp=110, drat=3.08, wt=3.215, qsec=19.44, vs=1, am=0, gear=3, carb=1),
  3),
 (Row(model='Hornet Sportabout', mpg=18.7, cyl=8, disp=360.0, hp=175, drat=3.15, wt=3.44, qsec=17.02, vs=0, am=0, gear=3, carb=2),
  4)]

Now we can apply the map function to modify the structure of each element. Assume x is an element of the above RDD. x has two parts: x[0] is a Row object, and x[1] is the index, which is an integer. We want to merge these two values into a single list, with the index as the first element.


In [4]:
mtcars.rdd.zipWithIndex().map(lambda x: [x[1]] + list(x[0])).take(5)


Out[4]:
[[0, 'Mazda RX4', 21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4],
 [1, 'Mazda RX4 Wag', 21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4],
 [2, 'Datsun 710', 22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1],
 [3, 'Hornet 4 Drive', 21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1],
 [4, 'Hornet Sportabout', 18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2]]
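
To make the reshaping step easier to follow, here is a minimal plain-Python sketch of the same idea. It is only an analogue: the list `rows` and the use of `enumerate` are stand-ins (assumptions for illustration) for the RDD and `zipWithIndex`, and each tuple holds just the first three fields of the rows shown above.

```python
# Plain-Python stand-in for rdd.zipWithIndex().map(...); no Spark required.
# Each tuple mimics the first few fields (model, mpg, cyl) of a Row shown above.
rows = [
    ('Mazda RX4', 21.0, 6),
    ('Mazda RX4 Wag', 21.0, 6),
    ('Datsun 710', 22.8, 4),
]

# zipWithIndex pairs each element with its position: (element, index)
zipped = [(row, i) for i, row in enumerate(rows)]

# The map step: x[0] is the row, x[1] is the index; put the index first
indexed = [[x[1]] + list(x[0]) for x in zipped]

print(indexed)
```

Wrapping the index in a one-element list (`[x[1]]`) is what allows it to be concatenated with `list(x[0])` using `+`.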

Let's add column names and save the result.


In [5]:
header = ['index'] + mtcars.columns
mtcars_df = mtcars.rdd.zipWithIndex().map(lambda x: [x[1]] + list(x[0])).toDF(header)

In [6]:
mtcars_df.show(5)


+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    0|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    1|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    2|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|    3|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|    4|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows

After we obtain the index column, we can apply the pyspark.sql.DataFrame.filter function to select rows of the DataFrame. The filter function takes a column of types.BooleanType as input.

Select specific rows


In [7]:
mtcars_df.filter(mtcars_df.index.isin([1,2,4,6,9])).show()


+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    1|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    2|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|    4|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|    6|       Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|    9|         Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+

Select rows between a range


In [8]:
mtcars_df.filter(mtcars_df.index.between(5, 10)).show()


+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|index|     model| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|    5|   Valiant|18.1|  6|225.0|105|2.76|3.46|20.22|  1|  0|   3|   1|
|    6|Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|  0|  0|   3|   4|
|    7| Merc 240D|24.4|  4|146.7| 62|3.69|3.19| 20.0|  1|  0|   4|   2|
|    8|  Merc 230|22.8|  4|140.8| 95|3.92|3.15| 22.9|  1|  0|   4|   2|
|    9|  Merc 280|19.2|  6|167.6|123|3.92|3.44| 18.3|  1|  0|   4|   4|
|   10| Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|  1|  0|   4|   4|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+

Select rows by a cutoff index


In [9]:
mtcars_df.filter(mtcars_df.index < 9).show()


+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    0|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    1|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    2|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|    3|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|    4|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|    5|          Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|    6|       Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|    7|        Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|    8|         Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+


In [10]:
mtcars_df.filter(mtcars_df.index >= 14).show()


+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|   14| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|   3|   4|
|   15|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|   3|   4|
|   16|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|   3|   4|
|   17|           Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|   18|        Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
|   19|     Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|
|   20|      Toyota Corona|21.5|  4|120.1| 97| 3.7|2.465|20.01|  1|  0|   3|   1|
|   21|   Dodge Challenger|15.5|  8|318.0|150|2.76| 3.52|16.87|  0|  0|   3|   2|
|   22|        AMC Javelin|15.2|  8|304.0|150|3.15|3.435| 17.3|  0|  0|   3|   2|
|   23|         Camaro Z28|13.3|  8|350.0|245|3.73| 3.84|15.41|  0|  0|   3|   4|
|   24|   Pontiac Firebird|19.2|  8|400.0|175|3.08|3.845|17.05|  0|  0|   3|   2|
|   25|          Fiat X1-9|27.3|  4| 79.0| 66|4.08|1.935| 18.9|  1|  1|   4|   1|
|   26|      Porsche 914-2|26.0|  4|120.3| 91|4.43| 2.14| 16.7|  0|  1|   5|   2|
|   27|       Lotus Europa|30.4|  4| 95.1|113|3.77|1.513| 16.9|  1|  1|   5|   2|
|   28|     Ford Pantera L|15.8|  8|351.0|264|4.22| 3.17| 14.5|  0|  1|   5|   4|
|   29|       Ferrari Dino|19.7|  6|145.0|175|3.62| 2.77| 15.5|  0|  1|   5|   6|
|   30|      Maserati Bora|15.0|  8|301.0|335|3.54| 3.57| 14.6|  0|  1|   5|   8|
|   31|         Volvo 142E|21.4|  4|121.0|109|4.11| 2.78| 18.6|  1|  1|   4|   2|
+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
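
As a quick check on what each index predicate selects, the following plain-Python sketch mirrors the filters above on a bare list of index values. It assumes the indices run from 0 to 31, as the outputs above suggest, and it is only an analogue of the Column methods, not Spark code. Note in particular that between(5, 10) includes both endpoints, as the output of In [8] shows.

```python
# Index values 0..31, matching the index column shown in the outputs above
indices = list(range(32))

# isin([1, 2, 4, 6, 9]): keep indices that appear in the given collection
in_list = [i for i in indices if i in {1, 2, 4, 6, 9}]

# between(5, 10): lower and upper bounds are both inclusive
in_range = [i for i in indices if 5 <= i <= 10]

# index < 9 and index >= 14: plain comparisons against a cutoff
below_cutoff = [i for i in indices if i < 9]
at_or_above_cutoff = [i for i in indices if i >= 14]

print(in_list, in_range, below_cutoff, at_or_above_cutoff, sep='\n')
```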

Select rows by logical criteria

Example 1: select rows when cyl = 4


In [11]:
mtcars_df.filter(mtcars_df.cyl == 4).show()


+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|         model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    2|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|    7|     Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|    8|      Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|   17|      Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|   18|   Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
|   19|Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|
|   20| Toyota Corona|21.5|  4|120.1| 97| 3.7|2.465|20.01|  1|  0|   3|   1|
|   25|     Fiat X1-9|27.3|  4| 79.0| 66|4.08|1.935| 18.9|  1|  1|   4|   1|
|   26| Porsche 914-2|26.0|  4|120.3| 91|4.43| 2.14| 16.7|  0|  1|   5|   2|
|   27|  Lotus Europa|30.4|  4| 95.1|113|3.77|1.513| 16.9|  1|  1|   5|   2|
|   31|    Volvo 142E|21.4|  4|121.0|109|4.11| 2.78| 18.6|  1|  1|   4|   2|
+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+

Example 2: select rows when vs = 1 and am = 1

When the filtering is based on multiple conditions (e.g., vs = 1 and am = 1), we use the conditions to build a new flag column that is 1 where all conditions hold and null otherwise. We then filter the DataFrame by the new column.


In [12]:
from pyspark.sql import functions as F

Warning: when combining multiple conditions inside the **`when()`** function, each condition has to be wrapped in its own pair of parentheses, because Python's `&` operator binds more tightly than `==`.


In [13]:
filtering_column = F.when((mtcars_df.vs == 1) & (mtcars_df.am == 1), 1).name('filter_col')
filtering_column


Out[13]:
Column<b'CASE WHEN ((vs = 1) AND (am = 1)) THEN 1 END AS `filter_col`'>
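
The need for parentheses comes from Python's operator precedence, not from Spark. The sketch below shows the effect with plain integers, using the vs and am values of the first mtcars row (vs = 0, am = 1) and a hypothetical test "vs == 0 and am == 1". With Column objects the unparenthesized form does not silently give a wrong answer; it typically fails with an error, because the chained comparison forces a Column to be converted to a bool.

```python
# vs and am values from the first row of mtcars shown above (Mazda RX4)
vs, am = 0, 1

# Intended test: "vs == 0 and am == 1", with each comparison parenthesized
with_parens = (vs == 0) & (am == 1)

# Without parentheses, & is evaluated first, so Python reads this as the
# chained comparison  vs == (0 & am) == 1, which means something else
without_parens = vs == 0 & am == 1

print(with_parens)     # True
print(without_parens)  # False
```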

Now we need to add the new column to the original DataFrame. This can be done by applying the select() function to select all original columns as well as the new filtering column.


In [14]:
# look up each Column by name instead of building code strings for eval()
all_original_columns = [mtcars_df[c] for c in mtcars_df.columns]
all_original_columns


Out[14]:
[Column<b'index'>,
 Column<b'model'>,
 Column<b'mpg'>,
 Column<b'cyl'>,
 Column<b'disp'>,
 Column<b'hp'>,
 Column<b'drat'>,
 Column<b'wt'>,
 Column<b'qsec'>,
 Column<b'vs'>,
 Column<b'am'>,
 Column<b'gear'>,
 Column<b'carb'>]

In [15]:
all_columns = all_original_columns + [filtering_column]
all_columns


Out[15]:
[Column<b'index'>,
 Column<b'model'>,
 Column<b'mpg'>,
 Column<b'cyl'>,
 Column<b'disp'>,
 Column<b'hp'>,
 Column<b'drat'>,
 Column<b'wt'>,
 Column<b'qsec'>,
 Column<b'vs'>,
 Column<b'am'>,
 Column<b'gear'>,
 Column<b'carb'>,
 Column<b'CASE WHEN ((vs = 1) AND (am = 1)) THEN 1 END AS `filter_col`'>]

In [16]:
new_mtcars_df = mtcars_df.select(all_columns)
new_mtcars_df.show()


+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----------+
|index|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|filter_col|
+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----------+
|    0|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|      null|
|    1|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|      null|
|    2|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|         1|
|    3|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|      null|
|    4|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|      null|
|    5|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|      null|
|    6|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|      null|
|    7|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|      null|
|    8|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|      null|
|    9|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|      null|
|   10|          Merc 280C|17.8|  6|167.6|123|3.92| 3.44| 18.9|  1|  0|   4|   4|      null|
|   11|         Merc 450SE|16.4|  8|275.8|180|3.07| 4.07| 17.4|  0|  0|   3|   3|      null|
|   12|         Merc 450SL|17.3|  8|275.8|180|3.07| 3.73| 17.6|  0|  0|   3|   3|      null|
|   13|        Merc 450SLC|15.2|  8|275.8|180|3.07| 3.78| 18.0|  0|  0|   3|   3|      null|
|   14| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|   3|   4|      null|
|   15|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|   3|   4|      null|
|   16|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|   3|   4|      null|
|   17|           Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|         1|
|   18|        Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|         1|
|   19|     Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|         1|
+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----------+
only showing top 20 rows

Now we can filter the DataFrame by the requested conditions. After we filter the DataFrame, we can drop the filtering column.


In [17]:
new_mtcars_df.filter(new_mtcars_df.filter_col == 1).drop('filter_col').show()


+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|         model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    2|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   17|      Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|   18|   Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
|   19|Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|
|   25|     Fiat X1-9|27.3|  4| 79.0| 66|4.08|1.935| 18.9|  1|  1|   4|   1|
|   27|  Lotus Europa|30.4|  4| 95.1|113|3.77|1.513| 16.9|  1|  1|   5|   2|
|   31|    Volvo 142E|21.4|  4|121.0|109|4.11| 2.78| 18.6|  1|  1|   4|   2|
+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
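
The flag-then-filter steps above can be sketched in plain Python to show what each stage does. This is only an analogue under stated assumptions: the rows are dicts holding a few values copied from the outputs above, and the helper `flag` is a hypothetical stand-in for the `when()` expression. In PySpark itself, filter() also accepts the combined condition directly, e.g. `mtcars_df.filter((mtcars_df.vs == 1) & (mtcars_df.am == 1))`, which avoids the temporary column.

```python
# A few rows copied from the outputs above (index, model, vs, am only)
rows = [
    {'index': 0, 'model': 'Mazda RX4', 'vs': 0, 'am': 1},
    {'index': 2, 'model': 'Datsun 710', 'vs': 1, 'am': 1},
    {'index': 3, 'model': 'Hornet 4 Drive', 'vs': 1, 'am': 0},
    {'index': 4, 'model': 'Hornet Sportabout', 'vs': 0, 'am': 0},
    {'index': 17, 'model': 'Fiat 128', 'vs': 1, 'am': 1},
]

def flag(row):
    """Mimic CASE WHEN vs = 1 AND am = 1 THEN 1 END (no ELSE, so None otherwise)."""
    return 1 if (row['vs'] == 1 and row['am'] == 1) else None

# Two-step approach: add the flag column, filter on it, then drop it
flagged = [dict(row, filter_col=flag(row)) for row in rows]
two_step = [
    {k: v for k, v in row.items() if k != 'filter_col'}
    for row in flagged
    if row['filter_col'] == 1
]

# One-step approach: test the combined condition directly
one_step = [row for row in rows if row['vs'] == 1 and row['am'] == 1]

print([row['model'] for row in two_step])
```

Both approaches keep the same rows; the two-step version is mainly useful when the flag column is also wanted for inspection, as in the output of In [16].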