In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
# Use the builder API — the recommended way to obtain a SparkSession.
# getOrCreate() reuses an already-running session, so this cell survives
# Restart-and-Run-All (the original manual SparkContext construction raises
# "SparkContext already exists" when re-executed in the same kernel).
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # keep `sc` bound for any cell that wants the raw context

Example data


In [3]:
import pandas as pd

# Toy dataset: two string categoricals (x1, x2), an integer categorical (x3),
# one continuous feature (x4), and two label columns (y1, y2).
example_data = {
    'x1': ['a', 'a', 'b', 'b', 'b', 'c'],
    'x2': ['apple', 'orange', 'orange', 'orange', 'peach', 'peach'],
    'x3': [1, 1, 2, 2, 2, 4],
    'x4': [2.4, 2.5, 3.5, 1.4, 2.1, 1.5],
    'y1': [1, 0, 1, 0, 0, 1],
    'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes'],
}
pdf = pd.DataFrame(example_data)

# Lift the pandas frame into a Spark DataFrame and display it.
df = spark.createDataFrame(pdf)
df.show()


+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  1| no|
|  b|orange|  2|1.4|  0|yes|
|  b| peach|  2|2.1|  0|yes|
|  c| peach|  4|1.5|  1|yes|
+---+------+---+---+---+---+

VectorAssembler

To fit an ML model in PySpark, we need to combine all feature columns into one single column of vectors: the featuresCol. The VectorAssembler can be used to combine multiple OneHotEncoder columns and other continuous variable columns into one single column.

The example below shows how to combine three OneHotEncoder columns and one numeric column into a featuresCol column.

StringIndex and OneHotEncode categorical columns


In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

In [5]:
categorical_cols = ['x1', 'x2', 'x3']

# Stage order matters: every StringIndexer must run before the OneHotEncoders
# that consume its 'idx_*' output column.
indexers = [StringIndexer(inputCol=col, outputCol='idx_' + col)
            for col in categorical_cols]
encoders = [OneHotEncoder(inputCol='idx_' + col, outputCol='ohe_' + col)
            for col in categorical_cols]

all_stages = indexers + encoders
all_stages


Out[5]:
[StringIndexer_4432aeb7f8b4c9e5fad5,
 StringIndexer_40329fc3347ca61a760a,
 StringIndexer_470783ab8b97d80f06b0,
 OneHotEncoder_44af91c8eb1417ffd215,
 OneHotEncoder_43319000e21f77b48397,
 OneHotEncoder_41de9c9da21d8f8985a3]

In [6]:
# Fit the index-then-encode pipeline on df, then apply it to the same frame.
pipeline_model = Pipeline(stages=all_stages).fit(df)
df_new = pipeline_model.transform(df)
df_new.show()


+---+------+---+---+---+---+------+------+------+-------------+-------------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|idx_x2|idx_x3|       ohe_x1|       ohe_x2|       ohe_x3|
+---+------+---+---+---+---+------+------+------+-------------+-------------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|   2.0|   1.0|(2,[1],[1.0])|    (2,[],[])|(2,[1],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|   0.0|   1.0|(2,[1],[1.0])|(2,[0],[1.0])|(2,[1],[1.0])|
|  b|orange|  2|3.5|  1| no|   0.0|   0.0|   0.0|(2,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|
|  b|orange|  2|1.4|  0|yes|   0.0|   0.0|   0.0|(2,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|   1.0|   0.0|(2,[0],[1.0])|(2,[1],[1.0])|(2,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|   1.0|   2.0|    (2,[],[])|(2,[1],[1.0])|    (2,[],[])|
+---+------+---+---+---+---+------+------+------+-------------+-------------+-------------+

Assemble feature columns into one single featuresCol with VectorAssembler


In [7]:
# Merge the three one-hot vectors and the numeric column x4 into a single
# vector column, then drop the intermediate index columns we no longer need.
assembler = VectorAssembler(
    inputCols=['ohe_x1', 'ohe_x2', 'ohe_x3', 'x4'],
    outputCol='featuresCol',
)
df_assembled = assembler.transform(df_new).drop('idx_x1', 'idx_x2', 'idx_x3')
df_assembled.show(truncate=False)


+---+------+---+---+---+---+-------------+-------------+-------------+-----------------------------+
|x1 |x2    |x3 |x4 |y1 |y2 |ohe_x1       |ohe_x2       |ohe_x3       |featuresCol                  |
+---+------+---+---+---+---+-------------+-------------+-------------+-----------------------------+
|a  |apple |1  |2.4|1  |yes|(2,[1],[1.0])|(2,[],[])    |(2,[1],[1.0])|(7,[1,5,6],[1.0,1.0,2.4])    |
|a  |orange|1  |2.5|0  |no |(2,[1],[1.0])|(2,[0],[1.0])|(2,[1],[1.0])|[0.0,1.0,1.0,0.0,0.0,1.0,2.5]|
|b  |orange|2  |3.5|1  |no |(2,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|[1.0,0.0,1.0,0.0,1.0,0.0,3.5]|
|b  |orange|2  |1.4|0  |yes|(2,[0],[1.0])|(2,[0],[1.0])|(2,[0],[1.0])|[1.0,0.0,1.0,0.0,1.0,0.0,1.4]|
|b  |peach |2  |2.1|0  |yes|(2,[0],[1.0])|(2,[1],[1.0])|(2,[0],[1.0])|[1.0,0.0,0.0,1.0,1.0,0.0,2.1]|
|c  |peach |4  |1.5|1  |yes|(2,[],[])    |(2,[1],[1.0])|(2,[],[])    |(7,[3,6],[1.0,1.5])          |
+---+------+---+---+---+---+-------------+-------------+-------------+-----------------------------+

Convert sparse vectors in featuresCol to dense vectors


In [8]:
from pyspark.sql.functions import udf
# Import the concrete type names instead of a wildcard: `import *` pollutes
# the notebook namespace and hides where names come from.
from pyspark.sql.types import StringType, ArrayType, DoubleType
from pyspark.ml.linalg import SparseVector, DenseVector

In [9]:
def dense_features_col(x):
    """Return the values of a Spark ML vector (sparse or dense) as a plain
    list of Python floats.

    Parameters
    ----------
    x : pyspark.ml.linalg.SparseVector or DenseVector
        A value from the assembled ``featuresCol`` column.
    """
    # Bug fix: the original returned x.toArray().dtype — the numpy dtype
    # (always float64), not the dense values this section promises.
    return x.toArray().tolist()

# ArrayType(DoubleType()) matches the list-of-floats return value; the
# original StringType() would have stringified the result.
dense_features_col_udf = udf(dense_features_col, returnType=ArrayType(DoubleType()))

In [10]:
# Peek at the raw featuresCol values — VectorAssembler stores each row as
# either a SparseVector or a DenseVector, as Out[10] shows.
df_assembled.rdd.map(lambda x: x['featuresCol']).take(4)


Out[10]:
[SparseVector(7, {1: 1.0, 5: 1.0, 6: 2.4}),
 DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 2.5]),
 DenseVector([1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 3.5]),
 DenseVector([1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.4])]

In [11]:
# Materialize every vector (sparse or dense) as a plain Python list of
# floats via toArray().
df_assembled.rdd.map(lambda x: list(x['featuresCol'].toArray())).take(5)


Out[11]:
[[0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 2.3999999999999999],
 [0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 2.5],
 [1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 3.5],
 [1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.3999999999999999],
 [1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 2.1000000000000001]]

In [ ]: