In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
In [2]:
# Create (or reuse) a SparkSession via the builder — the idiomatic Spark 2+
# entry point. getOrCreate() makes this cell safe to re-run: it will not raise
# "SparkContext already exists" the way constructing SparkContext directly does.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # keep `sc` in scope for any RDD-level API use below
In [3]:
import pandas as pd

# Toy dataset: two string categoricals (x1, x2), one integer-coded categorical
# (x3), one continuous feature (x4) and two label columns (y1, y2).
records = [
    ('a', 'apple',  1, 2.4, 1, 'yes'),
    ('a', 'orange', 1, 2.5, 0, 'no'),
    ('b', 'orange', 2, 3.5, 1, 'no'),
    ('b', 'orange', 2, 1.4, 0, 'yes'),
    ('b', 'peach',  2, 2.1, 0, 'yes'),
    ('c', 'peach',  4, 1.5, 1, 'yes'),
]
pdf = pd.DataFrame(records, columns=['x1', 'x2', 'x3', 'x4', 'y1', 'y2'])
df = spark.createDataFrame(pdf)
df.show()
To fit an ML model in pyspark, we need to combine all feature columns into one single column of vectors: the featuresCol. The VectorAssembler
can be used to combine multiple OneHotEncoder
columns and other continuous-variable columns into one single column.
The example below shows how to combine three OneHotEncoder columns and one numeric column into a featuresCol column.
In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
In [5]:
categorical_cols = ['x1', 'x2', 'x3']

# Pipeline stages: first index every categorical column to an 'idx_*' column,
# then one-hot encode each index into an 'ohe_*' vector column.
indexers = [StringIndexer(inputCol=col, outputCol='idx_' + col) for col in categorical_cols]
encoders = [OneHotEncoder(inputCol='idx_' + col, outputCol='ohe_' + col) for col in categorical_cols]
all_stages = indexers + encoders
all_stages
Out[5]:
In [6]:
# Fit the indexing/encoding pipeline on df, then apply it — kept as explicit
# steps so the fitted PipelineModel could be reused or inspected if needed.
pipeline = Pipeline(stages=all_stages)
pipeline_model = pipeline.fit(df)
df_new = pipeline_model.transform(df)
df_new.show()
In [7]:
# Merge the three one-hot vector columns plus the raw numeric x4 into a single
# vector column, then drop the intermediate index columns — they were only
# inputs to the encoders and are not needed downstream.
assembler = VectorAssembler(
    inputCols=['ohe_x1', 'ohe_x2', 'ohe_x3', 'x4'],
    outputCol='featuresCol',
)
df_assembled = assembler.transform(df_new).drop('idx_x1', 'idx_x2', 'idx_x3')
df_assembled.show(truncate=False)
In [8]:
from pyspark.sql.functions import udf
# Import StringType by name instead of `from pyspark.sql.types import *` —
# the wildcard pollutes the namespace and hides where names come from;
# StringType is the only name this notebook uses from that module.
from pyspark.sql.types import StringType
from pyspark.ml.linalg import SparseVector, DenseVector
In [9]:
def dense_features_col(x):
    """Return the numpy dtype of a Vector's dense representation, as a string.

    `x` is a pyspark.ml.linalg Vector (Sparse or Dense); its ``toArray()``
    yields a numpy ndarray whose dtype is reported (e.g. ``'float64'``).
    """
    # str(...) matters here: a udf declared with StringType() must return a
    # Python str — returning the raw numpy.dtype object would not match the
    # declared type and the udf would produce NULLs in the result column.
    return str(x.toArray().dtype)

dense_features_col_udf = udf(dense_features_col, returnType=StringType())
In [10]:
# Pull the assembled vectors out of the DataFrame; each element is a
# SparseVector or DenseVector depending on the row's contents.
df_assembled.rdd.map(lambda row: row['featuresCol']).take(4)
Out[10]:
In [11]:
# Same vectors, densified: toArray() converts each vector to a numpy array,
# and list() turns that into plain Python floats for display.
df_assembled.rdd.map(lambda row: list(row['featuresCol'].toArray())).take(5)
Out[11]:
In [ ]: