In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
# create a SparkContext and wrap it in a SparkSession
sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)
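
The same session can also be obtained with the builder pattern, which creates (or reuses) the underlying SparkContext for you:

# alternative setup: getOrCreate() returns an existing session or builds a new one
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext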

Example data


In [3]:
import pandas as pd
pdf = pd.DataFrame({
        'x1': ['a','a','b','b', 'b', 'c'],
        'x2': ['apple', 'orange', 'orange','orange', 'peach', 'peach'],
        'x3': [1, 1, 2, 2, 2, 4],
        'x4': [2.4, 2.5, 3.5, 1.4, 2.1, 1.5],
        'y1': [1, 0, 1, 0, 0, 1],
        'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes']
    })
df = spark.createDataFrame(pdf)
df.show()


+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  1| no|
|  b|orange|  2|1.4|  0|yes|
|  b| peach|  2|2.1|  0|yes|
|  c| peach|  4|1.5|  1|yes|
+---+------+---+---+---+---+

StringIndexer

StringIndexer maps a string column to an index column that Spark will treat as a categorical column. The indices start at 0 and are ordered by label frequency, so the most frequent label receives index 0. If the input column is numeric, it is first cast to string and then indexed by StringIndexer.

There are three steps to using StringIndexer:

  1. Build the indexer: specify the input column and output column names.
  2. Fit the indexer to your data: this learns the label-to-index mapping.
  3. Transform the data: call transform to execute the indexing.

Example: StringIndex column "x1"


In [4]:
from pyspark.ml.feature import StringIndexer

# build indexer
string_indexer = StringIndexer(inputCol='x1', outputCol='indexed_x1')

# learn the model
string_indexer_model = string_indexer.fit(df)

# transform the data
df_stringindexer = string_indexer_model.transform(df)

# resulting df
df_stringindexer.show()


+---+------+---+---+---+---+----------+
| x1|    x2| x3| x4| y1| y2|indexed_x1|
+---+------+---+---+---+---+----------+
|  a| apple|  1|2.4|  1|yes|       1.0|
|  a|orange|  1|2.5|  0| no|       1.0|
|  b|orange|  2|3.5|  1| no|       0.0|
|  b|orange|  2|1.4|  0|yes|       0.0|
|  b| peach|  2|2.1|  0|yes|       0.0|
|  c| peach|  4|1.5|  1|yes|       2.0|
+---+------+---+---+---+---+----------+

From the result above, we can see that ('a', 'b', 'c') in column x1 are converted to (1.0, 0.0, 2.0). The indices are ordered by frequency: 'b' appears most often in x1, so it receives index 0.0.
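
As noted earlier, StringIndexer also accepts a numeric column, which is cast to string before indexing. A minimal sketch on the numeric column x3 (the output name indexed_x3 is just an illustrative choice):

# index the numeric column x3; it is cast to string first
# 2 appears most often in x3, so it receives index 0.0
StringIndexer(inputCol='x3', outputCol='indexed_x3').fit(df).transform(df).show()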

OneHotEncoder

OneHotEncoder converts each category of a StringIndexed column to a sparse vector. Each sparse vector has at most one active element, which indicates the category index.
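
Before encoding, it helps to recall how Spark prints a sparse vector: (size, [indices of the active elements], [values]). A quick illustration:

from pyspark.ml.linalg import SparseVector

# a length-3 vector whose only active element is a 1.0 at position 1
print(SparseVector(3, [1], [1.0]))   # (3,[1],[1.0])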


In [7]:
df_ohe = df.select('x1')
df_ohe.show()


+---+
| x1|
+---+
|  a|
|  a|
|  b|
|  b|
|  b|
|  c|
+---+

StringIndex column 'x1'


In [14]:
df_x1_indexed = (StringIndexer(inputCol='x1', outputCol='indexed_x1')
                 .fit(df_ohe)
                 .transform(df_ohe))
df_x1_indexed.show()


+---+----------+
| x1|indexed_x1|
+---+----------+
|  a|       1.0|
|  a|       1.0|
|  b|       0.0|
|  b|       0.0|
|  b|       0.0|
|  c|       2.0|
+---+----------+

'x1' has three categories, 'a', 'b' and 'c', which correspond to string indices 1.0, 0.0 and 2.0, respectively.
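
The fitted StringIndexerModel exposes this mapping through its labels attribute; the position of a label in the list is the index it was assigned:

# most frequent label first: ['b', 'a', 'c'], i.e. b -> 0.0, a -> 1.0, c -> 2.0
indexer_model = StringIndexer(inputCol='x1', outputCol='indexed_x1').fit(df_ohe)
indexer_model.labels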

Mapping string indices to sparse vectors

  • Encoding format: 'string index': [vector size, [position of the string index in the index list], [1.0]]

Here the list of string indices is [0.0, 1.0, 2.0], so each sparse vector has size 3. The mappings between string indices and sparse vectors are therefore:

  • 0.0: [3, [0], [1.0]]
  • 1.0: [3, [1], [1.0]]
  • 2.0: [3, [2], [1.0]]

After we convert all sparse vectors to dense vectors, we get:


In [43]:
from pyspark.ml.linalg import SparseVector
import numpy as np

# densify each one-hot sparse vector and stack them into a matrix
x = [SparseVector(3, {0: 1.0}).toArray(),
     SparseVector(3, {1: 1.0}).toArray(),
     SparseVector(3, {2: 1.0}).toArray()]
np.array(x)


Out[43]:
array([[ 1.,  0.,  0.],
       [ 0.,  1.,  0.],
       [ 0.,  0.,  1.]])

The resulting matrix is exactly the indicator (dummy-variable) matrix we would use to represent a categorical variable in a statistics class.

One more step to go

OneHotEncoder drops the last category by default. Each encoded vector then has size 2, covering string indices [0.0, 1.0], and the mappings between string indices and sparse vectors become:

  • 0.0: [2, [0], [1.0]]
  • 1.0: [2, [1], [1.0]]
  • 2.0: [2, [], []]

The last category is represented by a sparse vector with no active elements (i.e., all of its elements are 0).
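
A quick check that such a vector densifies to all zeros:

from pyspark.ml.linalg import SparseVector

# an empty sparse vector of size 2 has no active elements
SparseVector(2, [], []).toArray()   # array([0., 0.])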

Verify

OneHotEncode column 'indexed_x1'


In [46]:
from pyspark.ml.feature import OneHotEncoder

In [47]:
OneHotEncoder(inputCol='indexed_x1', outputCol='encoded_x1').transform(df_x1_indexed).show()


+---+----------+-------------+
| x1|indexed_x1|   encoded_x1|
+---+----------+-------------+
|  a|       1.0|(2,[1],[1.0])|
|  a|       1.0|(2,[1],[1.0])|
|  b|       0.0|(2,[0],[1.0])|
|  b|       0.0|(2,[0],[1.0])|
|  b|       0.0|(2,[0],[1.0])|
|  c|       2.0|    (2,[],[])|
+---+----------+-------------+

Specify not to drop the last category

If we choose not to drop the last category (dropLast=False), each category maps to a sparse vector with one active element, as expected.


In [49]:
OneHotEncoder(dropLast=False, inputCol='indexed_x1', outputCol='encoded_x1').transform(df_x1_indexed).show()


+---+----------+-------------+
| x1|indexed_x1|   encoded_x1|
+---+----------+-------------+
|  a|       1.0|(3,[1],[1.0])|
|  a|       1.0|(3,[1],[1.0])|
|  b|       0.0|(3,[0],[1.0])|
|  b|       0.0|(3,[0],[1.0])|
|  b|       0.0|(3,[0],[1.0])|
|  c|       2.0|(3,[2],[1.0])|
+---+----------+-------------+
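
Note that in Spark 3.x OneHotEncoder became an Estimator, so it has to be fit before it can transform; a minimal sketch of the same encoding under the 3.x API:

# Spark 3.x: fit the encoder first, then transform
encoder = OneHotEncoder(inputCols=['indexed_x1'], outputCols=['encoded_x1'], dropLast=False)
encoder.fit(df_x1_indexed).transform(df_x1_indexed).show()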