In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
In [2]:
sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)
In [3]:
import pandas as pd
pdf = pd.DataFrame({
    'x1': ['a', 'a', 'b', 'b', 'b', 'c'],
    'x2': ['apple', 'orange', 'orange', 'orange', 'peach', 'peach'],
    'x3': [1, 1, 2, 2, 2, 4],
    'x4': [2.4, 2.5, 3.5, 1.4, 2.1, 1.5],
    'y1': [1, 0, 1, 0, 0, 1],
    'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes']
})
df = spark.createDataFrame(pdf)
df.show()
A Pipeline is a sequence of stages, each of which is either an Estimator or a Transformer. An Estimator has a fit method and a Transformer has a transform method, so a pipeline can be viewed as a sequence of fit and transform calls. When a stage is an Estimator, its fit method is applied to the input data and returns a fitted model, which is itself a Transformer. That model's transform method is then applied to the data and outputs the transformed data. The data output by one stage must be acceptable input to the next stage's fit or transform method.
In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
In [21]:
stringindex_stages = [StringIndexer(inputCol=c, outputCol='idx_' + c) for c in ['x1', 'x2', 'y1', 'y2']]
stringindex_stages
Out[21]:
In [22]:
onehotencode_stages = [OneHotEncoder(inputCol='idx_' + c, outputCol='ohe_' + c) for c in ['x1', 'x2', 'y1', 'y2']]
onehotencode_stages
Out[22]:
Note that the outputCol of each StringIndexer stage is the inputCol of the corresponding OneHotEncoder stage (for example, 'idx_x1'). This shared column name is what connects the two stages.
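To make the column wiring concrete, here is a small pure-Python check. check_wiring is a hypothetical helper, and each stage is modeled as a (name, inputCol, outputCol) tuple rather than a real PySpark object. It walks the stages in order and reports every stage whose inputCol has not been produced by the time that stage runs.

```python
def check_wiring(initial_columns, stages):
    """Hypothetical helper: return (name, inputCol) for every stage whose
    input column does not exist yet when the stage runs."""
    available = set(initial_columns)
    problems = []
    for name, in_col, out_col in stages:
        if in_col not in available:
            problems.append((name, in_col))
        available.add(out_col)  # later stages can consume this column
    return problems


df_columns = ['x1', 'x2', 'x3', 'x4', 'y1', 'y2']
cols = ['x1', 'x2', 'y1', 'y2']
index_stages = [('StringIndexer', c, 'idx_' + c) for c in cols]
encode_stages = [('OneHotEncoder', 'idx_' + c, 'ohe_' + c) for c in cols]

print(check_wiring(df_columns, index_stages + encode_stages))  # []
print(check_wiring(df_columns, encode_stages + index_stages))  # four OneHotEncoder problems
```

Putting the encoders first fails because no idx_* column exists yet when they run.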
In [23]:
all_stages = stringindex_stages + onehotencode_stages
[type(x) for x in all_stages]
Out[23]:
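In the list above, pyspark.ml.feature.StringIndexer is an Estimator (it has a fit method) and pyspark.ml.feature.OneHotEncoder is a Transformer (it has a transform method). This is the Spark 2.x behavior; since Spark 3.0, OneHotEncoder is itself an Estimator whose fit method returns a OneHotEncoderModel, so the printed types depend on the Spark version.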
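What StringIndexer learns in fit is a label-to-index mapping, which its fitted model then applies in transform. The sketch below emulates that in pure Python, assuming the default stringOrderType='frequencyDesc' (the most frequent label gets 0.0) and assuming ties are broken alphabetically, as the Spark 3.x documentation describes; other versions may order ties differently. Numeric inputs are indexed by their string form, mirroring how StringIndexer casts numeric columns such as y1. The function names are hypothetical.

```python
from collections import Counter


def fit_string_indexer(values):
    """Emulate StringIndexer.fit with stringOrderType='frequencyDesc'.

    Labels are ordered by descending frequency; ties are broken
    alphabetically (an assumption matching the Spark 3.x docs).
    """
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_string_indexer(mapping, values):
    """Emulate the fitted model's transform; unseen labels raise KeyError."""
    return [mapping[str(v)] for v in values]


x2 = ['apple', 'orange', 'orange', 'orange', 'peach', 'peach']
mapping = fit_string_indexer(x2)
print(mapping)                                # {'orange': 0.0, 'peach': 1.0, 'apple': 2.0}
print(transform_string_indexer(mapping, x2))  # [2.0, 0.0, 0.0, 0.0, 1.0, 1.0]
```

Raising on unseen labels loosely corresponds to StringIndexer's default handleInvalid='error'.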
In [24]:
Pipeline(stages=all_stages).fit(df).transform(df).show()
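The ohe_* columns produced by this pipeline hold sparse vectors. A pure-Python sketch of the encoding, assuming OneHotEncoder's default dropLast=True: a column with k categories becomes a vector of length k - 1, and the last category maps to the all-zeros vector. The (size, indices, values) triple returned here mirrors how Spark prints a SparseVector, e.g. (2,[0],[1.0]); one_hot is a hypothetical name.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return (size, indices, values) like a Spark SparseVector."""
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    return (size, [], [])  # dropped last category -> all zeros


# idx_x2 values for the x2 column (orange=0.0, peach=1.0, apple=2.0).
idx_x2 = [2.0, 0.0, 0.0, 0.0, 1.0, 1.0]
print([one_hot(v, 3) for v in idx_x2])
# [(2, [], []), (2, [0], [1.0]), (2, [0], [1.0]), (2, [0], [1.0]), (2, [1], [1.0]), (2, [1], [1.0])]
```

Dropping the last category keeps the encoded columns linearly independent, which is why it is the default.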
In the example above, our strategy is to index all four columns with StringIndexer first and then one-hot encode them with OneHotEncoder. Since each OneHotEncoder stage depends only on the output of its corresponding StringIndexer stage, the stages could equally be ordered as [StringIndexer on x1, OneHotEncoder on x1, StringIndexer on x2, OneHotEncoder on x2, StringIndexer on y1, OneHotEncoder on y1, StringIndexer on y2, OneHotEncoder on y2].
In [33]:
all_stages
Out[33]:
In [35]:
new_all_stages = [all_stages[x] for x in [0,4,1,5,2,6,3,7]]
new_all_stages
Out[35]:
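Rather than hard-coding [0,4,1,5,2,6,3,7], the permutation can be derived from the number of columns. In this sketch, plain strings stand in for the stage objects in all_stages so it runs without Spark.

```python
cols = ['x1', 'x2', 'y1', 'y2']
n = len(cols)

# Stand-ins for the notebook's all_stages: n StringIndexer stages
# followed by n OneHotEncoder stages, in column order.
all_stages = [f'StringIndexer({c})' for c in cols] + [f'OneHotEncoder({c})' for c in cols]

# Stage k's encoder sits at position k + n, so pair each k with k + n.
order = [i for k in range(n) for i in (k, k + n)]
print(order)  # [0, 4, 1, 5, 2, 6, 3, 7]

new_all_stages = [all_stages[i] for i in order]
print(new_all_stages)
```

With the real stage objects, the interleaved list can also be built directly, e.g. [s for c in ['x1', 'x2', 'y1', 'y2'] for s in (StringIndexer(inputCol=c, outputCol='idx_' + c), OneHotEncoder(inputCol='idx_' + c, outputCol='ohe_' + c))].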
In [36]:
Pipeline(stages=new_all_stages).fit(df).transform(df).show()