For this problem, we will use the MNIST dataset of handwritten digits: http://yann.lecun.com/exdb/mnist/
In [1]:
spark.sparkContext.uiWebUrl
Out[1]:
In [4]:
import matplotlib.pyplot as plt
import pandas as pd
%matplotlib inline
In [10]:
df_training = (spark
               .read
               .options(header=False, inferSchema=True)
               .csv("data/MNIST/mnist_train.csv"))
In [71]:
df_training.count()
Out[71]:
In [14]:
print("No of columns: ", len(df_training.columns), df_training.columns)
In [18]:
feature_culumns = ["_c" + str(i+1) for i in range(784)]
print(feature_culumns)
In [19]:
from pyspark.ml.feature import VectorAssembler
In [20]:
vectorizer = VectorAssembler(inputCols=feature_columns, outputCol="features")
training = (vectorizer
            .transform(df_training)
            .select("_c0", "features")
            .toDF("label", "features")
            .cache())
training.show()
In [26]:
a = training.first().features.toArray()
type(a)
Out[26]:
In [29]:
plt.imshow(a.reshape(28, 28), cmap="Greys")
Out[29]:
In [40]:
images = training.sample(False, 0.01, 1).take(25)
fig, _ = plt.subplots(5, 5, figsize=(10, 10))
for i, ax in enumerate(fig.axes):
    r = images[i]
    label = r.label
    features = r.features
    ax.imshow(features.toArray().reshape(28, 28), cmap="Greys")
    ax.set_title("True: " + str(label))
plt.tight_layout()
In [74]:
counts = training.groupBy("label").count()
In [76]:
counts_df = counts.rdd.map(lambda r: {"label": r['label'],
                                      "count": r['count']}).collect()
pd.DataFrame(counts_df).set_index("label").sort_index().plot.bar()
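Since the aggregate is tiny (ten rows), an equivalent and arguably simpler route is to convert it to pandas directly:
In [ ]:
counts.toPandas().set_index("label").sort_index().plot.bar()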
Out[76]:
In [55]:
df_testing = (spark
              .read
              .options(header=False, inferSchema=True)
              .csv("data/MNIST/mnist_test.csv"))
testing = (vectorizer
           .transform(df_testing)
           .select("_c0", "features")
           .toDF("label", "features")
           .cache())
In [56]:
from pyspark.ml.classification import LogisticRegression
In [57]:
lr = LogisticRegression(featuresCol="features",
                        labelCol="label",
                        regParam=0.1,
                        elasticNetParam=0.1,
                        maxIter=10000)
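Here `regParam` sets the overall regularization strength and `elasticNetParam` mixes the L2 (0.0) and L1 (1.0) penalties. If tuning were desired, a small grid search could be wrapped around the estimator; a sketch (the grid values are illustrative, and 3-fold CV over 60,000 rows is expensive):
In [ ]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.01, 0.1])
        .addGrid(lr.elasticNetParam, [0.0, 0.1])
        .build())
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=MulticlassClassificationEvaluator(
                        labelCol="label", predictionCol="prediction",
                        metricName="accuracy"),
                    numFolds=3)
# cv_model = cv.fit(training)  # uncomment to run; this is slow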
In [58]:
lr_model = lr.fit(training)
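Since the assembler and the classifier always run in sequence, they could also be chained into a single Pipeline fitted on the raw CSV DataFrame; a sketch (note the label column must then be referenced by its raw name `_c0`):
In [ ]:
from pyspark.ml import Pipeline

pipe_lr = LogisticRegression(featuresCol="features", labelCol="_c0",
                             regParam=0.1, elasticNetParam=0.1, maxIter=10000)
pipeline_model = Pipeline(stages=[vectorizer, pipe_lr]).fit(df_training)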
In [65]:
from pyspark.sql.functions import expr, avg
In [67]:
test_pred = lr_model.transform(testing).withColumn("matched", expr("label == prediction"))
test_pred.show()
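Besides `prediction`, the fitted model also appends `rawPrediction` and `probability` columns; inspecting the per-class probabilities for a few rows can be instructive:
In [ ]:
test_pred.select("label", "prediction", "probability").show(5, truncate=False)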
In [61]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
In [62]:
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="accuracy")
In [63]:
evaluator.evaluate(test_pred)
Out[63]:
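The same evaluator can report other metrics by overriding `metricName` at evaluate time, e.g. F1 or weighted precision:
In [ ]:
evaluator.evaluate(test_pred, {evaluator.metricName: "f1"})
evaluator.evaluate(test_pred, {evaluator.metricName: "weightedPrecision"})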
In [70]:
(test_pred
 .withColumn("matched", expr("cast(matched as int)"))
 .groupBy("label")
 .agg(avg("matched"))
 .orderBy("label")
 .show())
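The per-label accuracies above show which digits the model gets wrong most often; a full confusion matrix would make the specific confusions visible. A sketch (rows are true labels, pivoted columns are predictions):
In [ ]:
(test_pred
 .groupBy("label")
 .pivot("prediction")
 .count()
 .orderBy("label")
 .show())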
In [15]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
In [16]:
# layer sizes: 784 input pixels, two hidden layers (100 and 20 units), 10 output classes
layers = [784, 100, 20, 10]
perceptron = MultilayerPerceptronClassifier(maxIter=1000, layers=layers,
                                            blockSize=128, seed=1234)
perceptron_model = perceptron.fit(training)
In [ ]:
from time import time
In [ ]:
start_time = time()
perceptron_model = perceptron.fit(training)
test_pred = perceptron_model.transform(testing)
print("Accuracy:", evaluator.evaluate(test_pred))
print("Time taken: %d" % (time() - start_time))