Setup (Windows): use winutils.exe to grant write permissions on the \tmp\hive scratch directory that Spark's Hive support uses, then install findspark:
winutils.exe chmod 777 \tmp\hive
conda install -c conda-forge findspark
In [1]:
import sys, os, shutil
import findspark
# use findspark to locate and initialize pyspark before importing pyspark
findspark.init()
import pyspark
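If SPARK_HOME is not set in the environment, findspark.init() also accepts the installation path directly; a minimal sketch, with a placeholder path you would replace with your own Spark directory:
findspark.init("C:/spark/spark-2.x.x-bin-hadoop2.7")  # placeholder path, adjust to your installation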
In [2]:
print("Python Version:", sys.version)
print("Spark Version:", pyspark.__version__)
Adapted from https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py
In [3]:
from random import random
from operator import add
from pyspark.sql import SparkSession
def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

spark = SparkSession \
    .builder \
    .appName("PythonPi") \
    .getOrCreate()
partitions = 10
num_samples = 10000
count = spark.sparkContext.parallelize(range(1, num_samples + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / num_samples))
spark.stop()
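For reference, the same Monte Carlo idea without Spark: each sample is a uniform point in the square [-1, 1] x [-1, 1], and the fraction that lands inside the unit circle approximates pi/4. A small local sketch, not part of the original example:
from random import random

def estimate_pi_locally(num_samples=10000):
    # count points falling inside the unit circle
    hits = 0
    for _ in range(num_samples):
        x = random() * 2 - 1
        y = random() * 2 - 1
        hits += 1 if x ** 2 + y ** 2 <= 1 else 0
    return 4.0 * hits / num_samples

print("Local estimate:", estimate_pi_locally())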
Adapted from https://github.com/apache/spark/blob/master/examples/src/main/python/ml/decision_tree_classification_example.py
In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("DecisionTreeBinaryClassificationExample")\
    .getOrCreate()
data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = \
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3], 1234)
# Create a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
treeModel = model.stages[2]
# summary only
print(treeModel.toDebugString)
spark.stop()
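As a quick illustration of what the label indexer above produces, here is a toy StringIndexer sketch (not part of the original example; it starts its own session since the previous one was stopped). The most frequent value receives index 0.0.
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StringIndexerToyExample").getOrCreate()
toy = spark.createDataFrame([("a",), ("b",), ("a",), ("c",)], ["label"])
StringIndexer(inputCol="label", outputCol="indexedLabel").fit(toy).transform(toy).show()
spark.stop()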
In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("DecisionTreeMulticlassClassificationExample")\
    .getOrCreate()
data = spark.read.format("libsvm").load("data/sample_multiclass_classification_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = \
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4], 1234)
# Create a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
treeModel = model.stages[2]
# summary only
print(treeModel.toDebugString)
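# (Illustrative extra, not in the original source: a quick confusion matrix,
#  counting (indexedLabel, prediction) pairs in the test predictions.)
predictions.groupBy("indexedLabel", "prediction").count()\
    .orderBy("indexedLabel", "prediction").show()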
spark.stop()
Adapted from https://github.com/apache/spark/blob/master/examples/src/main/python/ml/multilayer_perceptron_classification.py
In [6]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("multilayer_perceptron_classification_example")\
    .getOrCreate()
# Load training data
data = spark.read.format("libsvm").load("data/sample_multiclass_classification_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# specify layers for the neural network:
# input layer of size 4 (features), two intermediate layers of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]
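# (Side note, not in the original: the input and output sizes could also be derived
#  from the data, e.g. train.first()["features"].size for the input layer and
#  train.select("label").distinct().count() for the output layer.)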
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# train the model
model = trainer.fit(train)
# compute accuracy on the test set
predictions = model.transform(test)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
spark.stop()
Using the mushroom dataset from https://archive.ics.uci.edu/ml/datasets/Mushroom
This example describes the basic workflow, consisting of the following phases: load the CSV data into a DataFrame, index the label and the categorical feature columns, assemble the features into a single vector, split the data into training and test sets, train a decision tree in a Pipeline, and evaluate the predictions.
using https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#module-pyspark.sql
In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("DecisionTreeBinaryClassificationExampleOnMushrooms")\
    .getOrCreate()
using https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#module-pyspark.sql
In [8]:
# returns a DataFrame
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("data/mushrooms.csv")
using https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame
In [9]:
print("Number of data rows:", df.count())
In [10]:
df.printSchema() # or data.dtypes or df.columns
In [11]:
df.describe("class", "cap-shape", "cap-surface", "cap-color").show()
In [12]:
df.first() # or df.head(1) or df.show(1)
Preparing the label and features for the classifier is a three-step process:
(a) Transform the label
(b) Transform the categorical features
(c) Assemble the features into a single vector column
using https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#module-pyspark.ml.feature
In [13]:
labelIndexer = StringIndexer(inputCol="class", outputCol="indexedLabel")
In [14]:
# The first column ("class") is the label; every remaining column is a categorical feature.
categorical_columns = df.columns[1:]
# In Spark 2.x, StringIndexer handles one column at a time, so build one indexer per categorical column.
featureIndexers = [StringIndexer(inputCol=col, outputCol='stringindexed_' + col) for col in categorical_columns]
In [15]:
inputFeatures = ['stringindexed_' + col for col in categorical_columns]
assembler = VectorAssembler(
    inputCols=inputFeatures,
    outputCol="indexedFeatures")
using https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit
In [16]:
(trainingData, testData) = df.randomSplit([0.7, 0.3], 1234)
In [17]:
clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
Chain indexers and model in a Pipeline
using https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#pyspark.ml.Pipeline
In [18]:
pipeline = Pipeline(stages=[labelIndexer] + featureIndexers + [assembler, clf])
using https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#pyspark.ml.Pipeline.fit
In [19]:
model = pipeline.fit(trainingData)
(1) Perform predictions against the test data by using transform
using https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#pyspark.ml.PipelineModel.transform
In [20]:
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "indexedFeatures").show(5)
(2) Compute accuracy against the test data using Evaluator
In [21]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
(3) Display trained decision tree using toDebugString
In [22]:
treeModel = model.stages[-1] # last stage in Pipeline
# summary only
print(treeModel.toDebugString)
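Beyond the debug string, the fitted tree also exposes featureImportances; pairing them with the assembler's input columns shows which mushroom attributes the splits rely on most. An optional sketch, not in the original:
importances = treeModel.featureImportances.toArray()
for name, score in sorted(zip(inputFeatures, importances), key=lambda pair: -pair[1])[:5]:
    print(name, round(score, 4))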
using https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html#pyspark.sql.SparkSession.stop
In [23]:
spark.stop()