Présentation du problème de reconnaissance de caractères manuscrits (MNIST DataBase à partir d’images numérisées. L’objectif est de comparer les performances (qualité de prévision, temps d'exécution) en fonction de latechnologie, ici Spark et la librairie MLlib, et en fonction de la taille de l'échantillon. a principale limitation de Spark concerne la mémoire; répartie sur chaque noeud elle est vite insuffisante lorsque des grands modèles (beaoucp d'arbres d'une forêt aléatoire) doivent être archivés sur chacun de ceux-ci.
L'objectif général est la construction d'un meilleur modèle de reconnaissance de chiffres manuscrits. Ce problème est ancien (zipcodes) et sert souvent de base pour la comparaison de méthodes et d'algorithmes d'apprentissage. Le site de Yann Le Cun: MNIST DataBase, est à la source des données étudiées, il décrit précisément le problème et les modes d'acquisition. Il tenait à jour la liste des publications proposant des solutions avec la qualité de prévision obtenue. Ce problème a également été proposé comme sujet d'un concours Kaggle mais sur un sous-ensemble des données.
De façon très schématique, plusieurs stratégies sont développées dans une vaste littérature sur ces données.
L'objectif de cet atelier est de comparer sur des données relativement volumineuses les performances de différents environnements technologiques et librairies. Une dernière question est abordée, elle concerne l'influence de la taille de l'échantillon d'apprentissage sur le temps d'exécution ainsi que sur la qualité des prévisions.
Analyse des données avec Spark, noter les temps d'exécution, la précision estimée sur l'échantillon test.
Les données peuvent être préalablement téléchargées ou directement lues. Ce sont celles originales du site MNIST DataBase mais préalablement converties au format .csv, certes plus volumineux mais plus facile à lire. Attention le fichier mnist_train.zip
présent dans le dépôt est compressé.
In [ ]:
sc
In [ ]:
# Importation des packages
import time
from numpy import array
# Répertoire courant ou répertoire accessible de tous les "workers" du cluster
DATA_PATH=""
Les données sont déjà partagée en une partie apprentissage et une test utilisée pour les comparaisons entre méthodes dans les publications. Ce sont bien les données du site MNIST mais transformée au format .csv pour en faciliter la lecture.
Elles doivent être stockées à un emplacement accessibles de tous les noeuds du cluster pour permettre la construction de la base de données réparties (RDD).
Dans une utilisation monoposte (standalone) de Spark, elles sont simplement chargées dans le répertoire courant.
In [ ]:
# Chargement des fichiers
import urllib.request
f = urllib.request.urlretrieve("https://www.math.univ-toulouse.fr/~besse/Wikistat/data/mnist_train.csv",DATA_PATH+"mnist_train.csv")
f = urllib.request.urlretrieve("https://www.math.univ-toulouse.fr/~besse/Wikistat/data/mnist_test.csv",DATA_PATH+"mnist_test.csv")
In [ ]:
# Transformation du fichier texte en RDD de valeurs
## Données d'apprentissage
# Transformation ou étape map de séparation des champs
trainRDD = sc.textFile(DATA_PATH+"mnist_train.csv").map(lambda l: [float(x) for x in l.split(',')])
# Action
trainRDD.count() # taille de l'échantillon
#test
Pour pouvoir être intérprété par les différentes méthodes de classification de la librairie SparkML, les données doivent être converties en objet DataFrame.
Pour plus d'information sur l'utilisation de ces DataFrames, reportez vous aux calepins 1-Intro-PySpark/Cal3-PySpark-SQL.ipynb et 1-Intro-PySpark/Cal4-PySpark-Statelem&Pipeline-SparkML.ipynb
In [ ]:
# Transformation du de la RDD en DataFrame
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
#Cette fonction va permettre de transformer chaque ligne de la RDD en une "Row" pyspark.sql.
def list_to_Row(l):
#Creation d'un vecteur sparse pour les features
features = Vectors.sparse(784,dict([(i,v) for i,v in enumerate(l[:-1]) if v!=0]))
row = Row(label = l[-1], features= features)
return row
trainDF = trainRDD.map(list_to_Row).toDF()
In [ ]:
# Exemple de ligne
trainDF.take(1)[0]
In [ ]:
## Même chose pour les données de test
testRDD = sc.textFile(DATA_PATH+'mnist_test.csv').map(lambda l: [float(x) for x in l.split(',')])
testRDD.count() # taille de l'échantillon
In [ ]:
testDF = testRDD.map(list_to_Row).toDF()
testDF.take(1)
Extraction d'un sous-échantillon d'apprentissage pour tester les programmes sur des données plus petites. Itérer cette démarche permet d'étudier l'évolution de l'erreur de prévision en fonction de la taille de l'échantillon d'apprentissage.
In [ ]:
tauxEch=0.1 # tester pour des tailles croissantes d'échantillon d'apprentissage
(trainData, DropDatal) = trainRDD.randomSplit([tauxEch, 1-tauxEch])
trainData.count()
Les méthodes de classifications de la librairie SparkML suivent le même shéma d'utilisation.
Il faut dans un premier temps crée un objets Estimators pour configurer les paramètres de la méthode. Dans un second temps on réalise l'apprentissage en appliquant la fonction fit de l'Estimators sur la DataFrame d'apprentissage. Cette commande créé un objet différent, le Transformers qui permettra de réaliser les prédictions.
Par défaut les différentes méthodes considère que les noms des colonnes correspondants aux variables et au prédicants du jeux d'apprentissage sont respectivement "features" et "label". Tandis que les prédictions seront automatiquement assigné à une colonne de nom "prediction". Il est conseillé de garder cette terminiologie, mais ces attributs par défaut peuvent être modifié en spécifiant les paramètres featuresCol, labelCol et predictionCol de chaque méthode.
Exemple d'utilisation pour expliciter la syntaxe mais sans grand intérêt pour ces données qui ne satisfont pas à des frontières de discrimination linéaires. L'algorithme permettant de réaliser une regression logistique multinomial est l'algorithme softmax.
In [ ]:
### Logistic Regression
from pyspark.ml.classification import LogisticRegression
### Configuraiton des paramètres de la méthode
time_start=time.time()
lr = LogisticRegression(maxIter=100, regParam=0.01, fitIntercept=False, tol=0.0001,
family = "multinomial", elasticNetParam=0.0) #0 for L2 penalty, 1 for L1 penalty
### Génération du modèle
model_lr = lr.fit(trainDF)
time_end=time.time()
time_lrm=(time_end - time_start)
print("LR prend %d s" %(time_lrm)) # (104s avec taux=1)
In [ ]:
predictionsRDD = model_lr.transform(testDF)
labelsAndPredictions = predictionsRDD.select("label","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
nb_test = testDF.count()
testErr = 1-nb_good_prediction/nb_test
print('Test Error = ' + str(testErr)) # (0.08 avec taux =1)
LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. Currently, only binary classification is supported. Support for multiclass model summaries will be added in the future.
Même chose pour un arbre de discrimination. Comme pour l'implémentation de scikit-learn, les arbres ne peuvent être optimisés par un élagage basé sur une pénalisation de la complexité. Ce paramètre n'est pas présent, seule la profondeur max ou le nombre minimal d'observations par feuille peut contrôler la complexité. Noter l'apparition d'un nouveau paramètre: maxBins qui, schématiquement, rend qualitative ordinale à maxBins classes toute variable quantitative. D'autre part, il n'y a pas de représentation graphique. Cette implémentation d'arbre est issue d'un projet Google pour adapter cet algorithme aux contraintes mapreduce de données sous Hadoop. Elle vaut surtout pour permettre de construire une implémentation des forêts aléatoires.
In [ ]:
### Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
### Configuraiton des paramètres de la méthode
time_start=time.time()
dt = DecisionTreeClassifier(impurity='gini',maxDepth=5,maxBins=32, minInstancesPerNode=1,
minInfoGain=0.0)
### Génération du modèle
model_dt = dt.fit(trainDF)
time_end=time.time()
time_dt=(time_end - time_start)
print("DT takes %d s" %(time_dt))
In [ ]:
predictionsRDD = model_dt.transform(testDF)
labelsAndPredictions = predictionsRDD.select("label","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
nb_test = testDF.count()
testErr = 1-nb_good_prediction/nb_test
print('Test Error = ' + str(testErr))
Les $k$-nn ne sont pas "scalables" et donc pas présents. Voici la syntaxe et les paramètres associés à l'algorithme des forêts aléatoires. Parmi ceux "classiques" se trouvent numTrees, featureSubsetStrategy, impurity, maxdepth et en plus maxbins comme pour les arbres. Les valeurs du paramètres maxDepth est critique pour la qualité de la prévision. en principe, il n'est pas contraint, un arbre peut se déployer sans "limite" mais face à des données massives cela peut provoquer des plantages intempestifs.
In [ ]:
### Random Forest
from pyspark.ml.classification import RandomForestClassifier
### Configuraiton des paramètres de la méthode
time_start=time.time()
rf = RandomForestClassifier(numTrees = 2, impurity='gini', maxDepth=12,
maxBins=32, seed=None)
### Génération du modèle
model_rf = rf.fit(trainDF)
time_end=time.time()
time_rf=(time_end - time_start)
print("RF takes %d s" %(time_rf))#
Erreur sur l'échantillon test
In [ ]:
predictionsRDD = model_rf.transform(testDF)
labelsAndPredictions = predictionsRDD.select("label","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
nb_test = testDF.count()
testErr = 1-nb_good_prediction/nb_test
print('Test Error = ' + str(testErr))
Même traitement sur la totalité de l'échantillon
100 arbres, sélection automatique, maxDepth=9
maxBins | Temps | Erreur |
---|---|---|
32 | 259 | 0.067 |
64 | 264 | 0.068 |
128 | 490 | 0.065 |
100 arbres, sélection automatique, maxBins=32
maxDepth | Temps | Erreur |
---|---|---|
4 | 55 | 0.21 |
9 | 259 | 0.067 |
18 | 983 | 0.035 |
Le nombre de variables tirées à chaque noeud n'a pas été optimisé.
Le paramètre maxBins ne semble pas trop influencer la précision du modèle, au contriare de la profondeur maximum des arbres. Avec une profondeur suffisante, on retrouve (presque) les résultats classiques des forêts aléatoires sur ces données.
COmparer les résultats obtenus pour les trois environnements.
In [ ]: