In [1]:
# create entry points to Spark
try:
    sc.stop()  # stop any existing SparkContext before creating a new one
except Exception:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sparkContext=sc)

Merge and split columns

Sometimes we need to merge multiple columns of a DataFrame into one column, or split a single column into multiple columns. We can achieve this by converting the DataFrame to an RDD, applying a map function to reshape each row, and then converting the RDD back to a DataFrame.
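
The round trip itself is short. Here is a minimal, self-contained sketch with toy data (the names df, key, v1, and v2 are illustrative, not part of the data used below):

In [ ]:
# sketch of the pattern: DataFrame -> RDD -> map -> DataFrame
from pyspark.sql import Row
df = spark.createDataFrame([('a', 1, 2), ('b', 3, 4)], ['key', 'v1', 'v2'])
reshaped_rdd = df.rdd.map(lambda row: Row(key=row[0], rest=row[1:]))  # reshape each row
spark.createDataFrame(reshaped_rdd).show()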

Example DataFrame


In [2]:
mtcars = spark.read.csv(path='../../data/mtcars.csv',
                        sep=',',
                        encoding='UTF-8',
                        comment=None,
                        header=True, 
                        inferSchema=True)

In [3]:
mtcars.show(n=5)


+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows


In [4]:
# rename the unnamed first column (read in as '_c0') to 'model'
colnames = mtcars.columns
colnames[0] = 'model'
mtcars = mtcars.rdd.toDF(colnames)  # rebuild the DataFrame with the new column names
mtcars.show(5)


+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows
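
As a side note, the same rename can be done with the DataFrame API, without the round trip through the RDD. A one-line sketch (withColumnRenamed simply returns the DataFrame unchanged if no column named '_c0' is present):

In [ ]:
# alternative: rename the column read in as '_c0' directly on the DataFrame
mtcars = mtcars.withColumnRenamed('_c0', 'model')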

Merge multiple columns

We convert the DataFrame to an RDD and then apply a map function that merges the value columns into a single field and converts each element into a Row object.


In [5]:
from pyspark.sql import Row
# keep the model name in its own field and pack the remaining values into one field
mtcars_rdd = mtcars.rdd.map(lambda x: Row(model=x[0], values=x[1:]))
mtcars_rdd.take(5)


Out[5]:
[Row(model='Mazda RX4', values=(21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4)),
 Row(model='Mazda RX4 Wag', values=(21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4)),
 Row(model='Datsun 710', values=(22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1)),
 Row(model='Hornet 4 Drive', values=(21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1)),
 Row(model='Hornet Sportabout', values=(18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2))]

Then we create a new DataFrame from the resulting RDD.


In [6]:
mtcars_df = spark.createDataFrame(mtcars_rdd)
mtcars_df.show(5, truncate=False)


+-----------------+-----------------------------------------------------+
|model            |values                                               |
+-----------------+-----------------------------------------------------+
|Mazda RX4        |[21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4]  |
|Mazda RX4 Wag    |[21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4] |
|Datsun 710       |[22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1]  |
|Hornet 4 Drive   |[21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1]|
|Hornet Sportabout|[18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2] |
+-----------------+-----------------------------------------------------+
only showing top 5 rows
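
For comparison, a similar merge can be sketched with the DataFrame API using pyspark.sql.functions.array. Note that array() needs a single element type, so the integer columns are cast to double here, which differs slightly from the RDD approach above:

In [ ]:
# alternative sketch: merge the value columns into one array column
from pyspark.sql import functions as F
value_cols = [c for c in mtcars.columns if c != 'model']
mtcars_merged = mtcars.select(
    'model',
    F.array(*[F.col(c).cast('double') for c in value_cols]).alias('values'))
mtcars_merged.show(5, truncate=False)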

Split one column

We use the above DataFrame as our example data. Again, we need to convert the DataFrame to an RDD to achieve our goal.

Let's split the values column into two columns, x1 and x2: the first five values go into x1 and the remaining six values go into x2.


In [7]:
# put the first five values in x1 and the remaining six in x2
mtcars_rdd_2 = mtcars_df.rdd.map(lambda x: Row(model=x[0], x1=x[1][:5], x2=x[1][5:]))
# convert the RDD back to a DataFrame
mtcars_df_2 = spark.createDataFrame(mtcars_rdd_2)
mtcars_df_2.show(5, truncate=False)


+-----------------+---------------------------+--------------------------+
|model            |x1                         |x2                        |
+-----------------+---------------------------+--------------------------+
|Mazda RX4        |[21.0, 6, 160.0, 110, 3.9] |[2.62, 16.46, 0, 1, 4, 4] |
|Mazda RX4 Wag    |[21.0, 6, 160.0, 110, 3.9] |[2.875, 17.02, 0, 1, 4, 4]|
|Datsun 710       |[22.8, 4, 108.0, 93, 3.85] |[2.32, 18.61, 1, 1, 4, 1] |
|Hornet 4 Drive   |[21.4, 6, 258.0, 110, 3.08]|[3.215, 19.44, 1, 0, 3, 1]|
|Hornet Sportabout|[18.7, 8, 360.0, 175, 3.15]|[3.44, 17.02, 0, 0, 3, 2] |
+-----------------+---------------------------+--------------------------+
only showing top 5 rows
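
The same split can also be sketched with the DataFrame API, starting again from the original mtcars DataFrame rather than the merged one. As above, the integer columns are cast to double so that each array column has a single element type:

In [ ]:
# alternative sketch: build x1 and x2 directly from the original columns
from pyspark.sql import functions as F
value_cols = [c for c in mtcars.columns if c != 'model']
values = [F.col(c).cast('double') for c in value_cols]
mtcars_split = mtcars.select('model',
                             F.array(*values[:5]).alias('x1'),
                             F.array(*values[5:]).alias('x2'))
mtcars_split.show(5, truncate=False)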


In [ ]: