In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)

Example data


In [3]:
import pandas as pd

# Small example frame with string, integer, and float columns
pdf = pd.DataFrame({
    'x1': ['a', 'a', 'b', 'b', 'b', 'c'],
    'x2': ['apple', 'orange', 'orange', 'orange', 'peach', 'peach'],
    'x3': [1, 1, 2, 2, 2, 4],
    'x4': [2.4, 2.5, 3.5, 1.4, 2.1, 1.5],
    'y1': [1, 0, 1, 0, 0, 1],
    'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes']
})
df = spark.createDataFrame(pdf)
df.show()


+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  1| no|
|  b|orange|  2|1.4|  0|yes|
|  b| peach|  2|2.1|  0|yes|
|  c| peach|  4|1.5|  1|yes|
+---+------+---+---+---+---+

Pipeline

A Pipeline is a sequence of stages, each of which is an Estimator or a Transformer. An Estimator has a fit method and a Transformer has a transform method. When the pipeline is fitted, the stages run in order. An Estimator stage is fitted on the current data, which produces a Transformer (the fitted model), and that Transformer's transform method is then applied to produce the input for the next stage. A Transformer stage simply transforms the current data. The output of each stage must therefore be acceptable input for the next stage's fit or transform method.
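The fit/transform chaining described above can be sketched in plain Python. This is only a toy model of the idea: ToyIndexer, ToyIndexerModel, ToyScaler, fit_pipeline, and transform_pipeline are hypothetical names made up for illustration, not pyspark.ml classes, and Spark's actual implementation differs.

```python
# Toy model of the Estimator/Transformer contract (hypothetical classes, not pyspark.ml).

class ToyIndexerModel:
    """Transformer: maps each label to the index learned during fit."""
    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, rows):
        return [self.mapping[r] for r in rows]


class ToyIndexer:
    """Estimator: fit() learns a label -> index mapping and returns a Transformer."""
    def fit(self, rows):
        labels = sorted(set(rows))
        return ToyIndexerModel({label: i for i, label in enumerate(labels)})


class ToyScaler:
    """Transformer: needs no fitting, it multiplies every value by a factor."""
    def __init__(self, factor):
        self.factor = factor

    def transform(self, rows):
        return [r * self.factor for r in rows]


def fit_pipeline(stages, rows):
    """Fit stages in order, feeding each stage the previous stage's output."""
    fitted = []
    for stage in stages:
        # An Estimator is fitted first; a Transformer is used as it is.
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


fitted_stages = fit_pipeline([ToyIndexer(), ToyScaler(10)], ["a", "a", "b", "c"])
result = transform_pipeline(fitted_stages, ["c", "a", "b"])
print(result)  # [20, 0, 10]
```

The fitted pipeline holds only Transformers, which is why it can be applied to new data without refitting.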


In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

Example

We are going to use a pipeline to StringIndex columns x1, x2, y1, and y2, and then OneHotEncode the resulting indexed columns.


In [21]:
stringindex_stages = [StringIndexer(inputCol=c, outputCol='idx_' + c) for c in ['x1', 'x2', 'y1', 'y2']]
stringindex_stages


Out[21]:
[StringIndexer_4822a1cd65667da67acd,
 StringIndexer_458889022f08259328b3,
 StringIndexer_4cf4afdb4c495ecc227f,
 StringIndexer_4460a02263a04635288b]

In [22]:
onehotencode_stages = [OneHotEncoder(inputCol='idx_' + c, outputCol='ohe_' + c) for c in ['x1', 'x2', 'y1', 'y2']]
onehotencode_stages


Out[22]:
[OneHotEncoder_4435bac1095e2bed0952,
 OneHotEncoder_480db42beb32e2d6cb83,
 OneHotEncoder_498cbd54623097da2b6c,
 OneHotEncoder_460c95da4c6185ac4fe2]

Note that the outputCol of each StringIndexer stage is the same as the inputCol of the corresponding OneHotEncoder stage.

Elements in the stage list


In [23]:
all_stages = stringindex_stages + onehotencode_stages
[type(x) for x in all_stages]


Out[23]:
[pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.StringIndexer,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder,
 pyspark.ml.feature.OneHotEncoder]

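In the list above, pyspark.ml.feature.StringIndexer is an Estimator (it has a fit method) and pyspark.ml.feature.OneHotEncoder is a Transformer (it has a transform method). This holds for the Spark version used here; from Spark 3.0 onward, OneHotEncoder is itself an Estimator.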

Build and run pipeline


In [24]:
Pipeline(stages=all_stages).fit(df).transform(df).show()


+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|idx_x2|idx_y1|idx_y2|       ohe_x1|       ohe_x2|       ohe_y1|       ohe_y2|
+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|   2.0|   1.0|   0.0|(2,[1],[1.0])|    (2,[],[])|    (1,[],[])|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|   0.0|   0.0|   1.0|(2,[1],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|   0.0|   1.0|   1.0|(2,[0],[1.0])|(2,[0],[1.0])|    (1,[],[])|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|   0.0|   0.0|   0.0|(2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|   1.0|   0.0|   0.0|(2,[0],[1.0])|(2,[1],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|   1.0|   1.0|   0.0|    (2,[],[])|(2,[1],[1.0])|    (1,[],[])|(1,[0],[1.0])|
+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+
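To help read the output above, here is a plain-Python sketch of what the two steps do to column x1. It relies on two defaults: StringIndexer assigns indices by descending label frequency, and OneHotEncoder drops the last category, so that category becomes an all-zero vector. Spark prints the encoded values as sparse vectors in the form (size, [indices], [values]). The helper names frequency_index and one_hot_drop_last are hypothetical, and the sketch builds dense lists instead of Spark vectors.

```python
from collections import Counter


def frequency_index(values):
    """Map each label to an index, most frequent label first (ties are not handled specially)."""
    ordered = [label for label, _ in Counter(values).most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot_drop_last(index, n_categories):
    """Dense one-hot vector of length n_categories - 1; the last category is all zeros."""
    vec = [0.0] * (n_categories - 1)
    if index < n_categories - 1:
        vec[int(index)] = 1.0
    return vec


x1 = ['a', 'a', 'b', 'b', 'b', 'c']
mapping = frequency_index(x1)
print(mapping)  # {'b': 0.0, 'a': 1.0, 'c': 2.0}

encoded = {label: one_hot_drop_last(idx, len(mapping)) for label, idx in mapping.items()}
print(encoded)  # {'b': [1.0, 0.0], 'a': [0.0, 1.0], 'c': [0.0, 0.0]}
```

These dense vectors correspond to the ohe_x1 column: (2,[0],[1.0]) for b, (2,[1],[1.0]) for a, and (2,[],[]) for c.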

Reorder pipeline stages

In the example above, our strategy was to StringIndex all four columns and then OneHotEncode them. Since each OneHotEncoder stage depends only on the output of its corresponding StringIndexer stage, the stages can also be interleaved: [stringindexer on x1, onehotencoder on x1, stringindexer on x2, onehotencoder on x2, stringindexer on y1, onehotencoder on y1, stringindexer on y2, onehotencoder on y2].

Old stages


In [33]:
all_stages


Out[33]:
[StringIndexer_4822a1cd65667da67acd,
 StringIndexer_458889022f08259328b3,
 StringIndexer_4cf4afdb4c495ecc227f,
 StringIndexer_4460a02263a04635288b,
 OneHotEncoder_4435bac1095e2bed0952,
 OneHotEncoder_480db42beb32e2d6cb83,
 OneHotEncoder_498cbd54623097da2b6c,
 OneHotEncoder_460c95da4c6185ac4fe2]

New stages


In [35]:
new_all_stages = [all_stages[x] for x in [0,4,1,5,2,6,3,7]]
new_all_stages


Out[35]:
[StringIndexer_4822a1cd65667da67acd,
 OneHotEncoder_4435bac1095e2bed0952,
 StringIndexer_458889022f08259328b3,
 OneHotEncoder_480db42beb32e2d6cb83,
 StringIndexer_4cf4afdb4c495ecc227f,
 OneHotEncoder_498cbd54623097da2b6c,
 StringIndexer_4460a02263a04635288b,
 OneHotEncoder_460c95da4c6185ac4fe2]
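The position list [0,4,1,5,2,6,3,7] has to be rewritten whenever the number of columns changes. As an alternative sketch, the same order can be produced by interleaving the two lists with zip. Placeholder strings stand in for the stage objects here so the snippet runs without Spark; the names indexers and encoders are made up for illustration.

```python
# Placeholder names stand in for the StringIndexer and OneHotEncoder stage objects.
cols = ['x1', 'x2', 'y1', 'y2']
indexers = ['index_' + c for c in cols]
encoders = ['encode_' + c for c in cols]

# Pair each indexer with its encoder, then flatten the pairs into one list.
interleaved = [stage for pair in zip(indexers, encoders) for stage in pair]
print(interleaved)

# Same order as picking positions [0, 4, 1, 5, 2, 6, 3, 7] from the concatenated list.
concatenated = indexers + encoders
by_position = [concatenated[i] for i in [0, 4, 1, 5, 2, 6, 3, 7]]
```

Applied to the notebook's lists, the equivalent expression would be a comprehension over zip(stringindex_stages, onehotencode_stages).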

Build and run pipeline


In [36]:
Pipeline(stages=new_all_stages).fit(df).transform(df).show()


+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
| x1|    x2| x3| x4| y1| y2|idx_x1|       ohe_x1|idx_x2|       ohe_x2|idx_y1|       ohe_y1|idx_y2|       ohe_y2|
+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+
|  a| apple|  1|2.4|  1|yes|   1.0|(2,[1],[1.0])|   2.0|    (2,[],[])|   1.0|    (1,[],[])|   0.0|(1,[0],[1.0])|
|  a|orange|  1|2.5|  0| no|   1.0|(2,[1],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   1.0|    (1,[],[])|
|  b|orange|  2|3.5|  1| no|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   1.0|    (1,[],[])|   1.0|    (1,[],[])|
|  b|orange|  2|1.4|  0|yes|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  b| peach|  2|2.1|  0|yes|   0.0|(2,[0],[1.0])|   1.0|(2,[1],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|
|  c| peach|  4|1.5|  1|yes|   2.0|    (2,[],[])|   1.0|(2,[1],[1.0])|   1.0|    (1,[],[])|   0.0|(1,[0],[1.0])|
+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+

