In [24]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark Feedforward neural network example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
In [25]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load training data
df = spark.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load("../data/WineData.csv")
df.show(5)
In [26]:
df.printSchema()
In [27]:
# Convert a string to float
def string_to_float(x):
    return float(x)

# Bin the numeric wine-quality score into three classes
def condition(r):
    if 0 <= r <= 4:
        label = "low"
    elif 4 < r <= 6:
        label = "medium"
    else:
        label = "high"
    return label
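A quick sanity check of the binning (a sketch, not in the original notebook; the sample scores are hypothetical):
print([condition(q) for q in [3, 5, 8]])   # expected: ['low', 'medium', 'high']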
In [28]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType
string_to_float_udf = udf(string_to_float, DoubleType())
quality_udf = udf(lambda x: condition(x), StringType())
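The same binning can also be expressed without a Python UDF, using Spark's native when/otherwise column expressions, which Catalyst can optimize. A minimal sketch, assuming "quality" is still numeric at this point (df_binned is an illustrative name):
from pyspark.sql.functions import when, col
# Equivalent binning as a native column expression; avoids Python serialization overhead
df_binned = df.withColumn(
    "quality",
    when(col("quality") <= 4, "low")
    .when(col("quality") <= 6, "medium")
    .otherwise("high"))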
In [29]:
# df = df.withColumn("quality", string_to_float_udf("quality")).withColumn("Cquality", quality_udf("quality"))
df = df.withColumn("quality", quality_udf("quality"))
In [35]:
df.printSchema()
In [31]:
df.show()
In [33]:
# Convert the data to (label, dense feature vector) rows
def transData(data):
    return data.rdd.map(lambda r: [r[-1], Vectors.dense(r[:-1])]).toDF(['label', 'features'])
In [11]:
from pyspark.ml.linalg import Vectors
data = transData(df)
data.show()
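An alternative to the RDD round-trip in transData is the VectorAssembler transformer. A sketch under the assumption (true for WineData.csv) that the last column is the quality label and the rest are features; data_alt is an illustrative name:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
# Assemble all feature columns into one dense "features" vector column
assembler = VectorAssembler(inputCols=df.columns[:-1], outputCol="features")
data_alt = assembler.transform(df).select(col("quality").alias("label"), "features")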
In [12]:
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
labelIndexer.transform(data).show(6)
In [13]:
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories=4).fit(data)
featureIndexer.transform(data).show(6)
In [14]:
data.printSchema()
In [15]:
# Split the data into train and test
(trainingData, testData) = data.randomSplit([0.6, 0.4])
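randomSplit samples randomly, so the split changes between runs; for a reproducible experiment a seed can be passed (a sketch; the seed value is arbitrary):
(trainingData, testData) = data.randomSplit([0.6, 0.4], seed=1234)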
In [15]:
data.show()
In [16]:
# Specify layers for the neural network:
# input layer of size 11 (features), two intermediate layers of sizes 5 and 4,
# and output layer of size 3 (the classes: low, medium, high)
layers = [11, 5, 4, 3]
# Create the trainer and set its parameters
FNN = MultilayerPerceptronClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                                     maxIter=100, layers=layers, blockSize=128, seed=1234)
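The output layer size must equal the number of distinct labels. A quick check (a sketch, not in the original notebook) confirming the three classes produced by the binning:
print(data.select("label").distinct().count())   # expected: 3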
In [17]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
In [18]:
# Chain the indexers and the network in a Pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, FNN, labelConverter])
In [19]:
# Train the model; this also runs the indexers.
model = pipeline.fit(trainingData)
In [20]:
# Make predictions.
predictions = model.transform(testData)
In [21]:
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)
In [23]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Prediction accuracy = %g, test error = %g" % (accuracy, 1.0 - accuracy))