Résumé

Présentation du problème de reconnaissance de caractères manuscrits (MNIST DataBase à partir d’images numérisées. L’objectif est de comparer les performances (qualité de prévision, temps d'exécution) en fonction de latechnologie, ici Spark et la librairie MLlib, et en fonction de la taille de l'échantillon. a principale limitation de Spark concerne la mémoire; répartie sur chaque noeud elle est vite insuffisante lorsque des grands modèles (beaoucp d'arbres d'une forêt aléatoire) doivent être archivés sur chacun de ceux-ci.

1 Introduction

1.1 Objetif

L'objectif général est la construction d'un meilleur modèle de reconnaissance de chiffres manuscrits. Ce problème est ancien (zipcodes) et sert souvent de base pour la comparaison de méthodes et d'algorithmes d'apprentissage. Le site de Yann Le Cun: MNIST DataBase, est à la source des données étudiées, il décrit précisément le problème et les modes d'acquisition. Il tenait à jour la liste des publications proposant des solutions avec la qualité de prévision obtenue. Ce problème a également été proposé comme sujet d'un concours Kaggle mais sur un sous-ensemble des données.

De façon très schématique, plusieurs stratégies sont développées dans une vaste littérature sur ces données.

  • Utiliser une méthode classique (k-nn, random forest...) sans trop raffiner mais avec des temps d'apprentissage rapide conduit à un taux d'erreur autour de 3\%.
  • Ajouter ou intégrer un pré-traitement des données permettant de recaler les images par des distorsions plus ou moins complexes.
  • Construire une mesure de distance adaptée au problème, par exemple invariante par rotation, translation, puis l'intégrer dans une technique d'apprentissage classique comme les $k$ plus proches voisins.
  • Utiliser une méthode plus flexibles (réseau de neurones épais) avec une optimisation fine des paramètres.

L'objectif de cet atelier est de comparer sur des données relativement volumineuses les performances de différents environnements technologiques et librairies. Une dernière question est abordée, elle concerne l'influence de la taille de l'échantillon d'apprentissage sur le temps d'exécution ainsi que sur la qualité des prévisions.

Analyse des données avec Spark, noter les temps d'exécution, la précision estimée sur l'échantillon test.

1.2 Lecture des données d'apprentissage et de test

Les données peuvent être préalablement téléchargées ou directement lues. Ce sont celles originales du site MNIST DataBase mais préalablement converties au format .csv, certes plus volumineux mais plus facile à lire. Attention le fichier mnist_train.zip présent dans le dépôt est compressé.


In [ ]:
sc

In [ ]:
# Importation des packages
import time
from numpy import array
# Répertoire courant ou répertoire accessible de tous les "workers" du cluster
DATA_PATH=""

Gestion des données

Importation et transformation des données au format RDD

Les données sont déjà partagée en une partie apprentissage et une test utilisée pour les comparaisons entre méthodes dans les publications. Ce sont bien les données du site MNIST mais transformée au format .csv pour en faciliter la lecture.

Elles doivent être stockées à un emplacement accessibles de tous les noeuds du cluster pour permettre la construction de la base de données réparties (RDD).

Dans une utilisation monoposte (standalone) de Spark, elles sont simplement chargées dans le répertoire courant.


In [ ]:
# Chargement des fichiers
import urllib.request
f = urllib.request.urlretrieve("https://www.math.univ-toulouse.fr/~besse/Wikistat/data/mnist_train.csv",DATA_PATH+"mnist_train.csv")
f = urllib.request.urlretrieve("https://www.math.univ-toulouse.fr/~besse/Wikistat/data/mnist_test.csv",DATA_PATH+"mnist_test.csv")

In [ ]:
# Transformation du fichier texte en RDD de valeurs
## Données d'apprentissage
# Transformation ou étape map de séparation des champs
trainRDD = sc.textFile(DATA_PATH+"mnist_train.csv").map(lambda l: [float(x) for x in l.split(',')])
# Action
trainRDD.count() # taille de l'échantillon
#test

Conversion des données au format DataFrame

Pour pouvoir être intérprété par les différentes méthodes de classification de la librairie SparkML, les données doivent être converties en objet DataFrame.

Pour plus d'information sur l'utilisation de ces DataFrames, reportez vous aux calepins 1-Intro-PySpark/Cal3-PySpark-SQL.ipynb et 1-Intro-PySpark/Cal4-PySpark-Statelem&Pipeline-SparkML.ipynb


In [ ]:
# Transformation du de la RDD en DataFrame

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

#Cette fonction va permettre de transformer chaque ligne de la RDD en une "Row" pyspark.sql. 

def list_to_Row(l):    
    #Creation d'un vecteur sparse pour les features
    features = Vectors.sparse(784,dict([(i,v) for i,v in enumerate(l[:-1]) if v!=0]))
    row = Row(label = l[-1], features= features)
    return row

trainDF = trainRDD.map(list_to_Row).toDF()

In [ ]:
# Exemple de ligne
trainDF.take(1)[0]

In [ ]:
## Même chose pour les données de test
testRDD = sc.textFile(DATA_PATH+'mnist_test.csv').map(lambda l: [float(x) for x in l.split(',')])
testRDD.count() # taille de l'échantillon

In [ ]:
testDF = testRDD.map(list_to_Row).toDF()
testDF.take(1)

Sous-échantillon d'apprentissage

Extraction d'un sous-échantillon d'apprentissage pour tester les programmes sur des données plus petites. Itérer cette démarche permet d'étudier l'évolution de l'erreur de prévision en fonction de la taille de l'échantillon d'apprentissage.


In [ ]:
tauxEch=0.1 # tester pour des tailles croissantes d'échantillon d'apprentissage
(trainData, DropDatal) = trainRDD.randomSplit([tauxEch, 1-tauxEch])
trainData.count()

Méthode de classification

Les méthodes de classifications de la librairie SparkML suivent le même shéma d'utilisation.

Il faut dans un premier temps crée un objets Estimators pour configurer les paramètres de la méthode. Dans un second temps on réalise l'apprentissage en appliquant la fonction fit de l'Estimators sur la DataFrame d'apprentissage. Cette commande créé un objet différent, le Transformers qui permettra de réaliser les prédictions.

Par défaut les différentes méthodes considère que les noms des colonnes correspondants aux variables et au prédicants du jeux d'apprentissage sont respectivement "features" et "label". Tandis que les prédictions seront automatiquement assigné à une colonne de nom "prediction". Il est conseillé de garder cette terminiologie, mais ces attributs par défaut peuvent être modifié en spécifiant les paramètres featuresCol, labelCol et predictionCol de chaque méthode.

Régression logistique

Exemple d'utilisation pour expliciter la syntaxe mais sans grand intérêt pour ces données qui ne satisfont pas à des frontières de discrimination linéaires. L'algorithme permettant de réaliser une regression logistique multinomial est l'algorithme softmax.


In [ ]:
### Logistic Regression
from pyspark.ml.classification import LogisticRegression

### Configuraiton des paramètres de la méthode
time_start=time.time()
lr = LogisticRegression(maxIter=100, regParam=0.01, fitIntercept=False, tol=0.0001,
            family = "multinomial", elasticNetParam=0.0) #0 for L2 penalty, 1 for L1 penalty

### Génération du modèle
model_lr = lr.fit(trainDF)
 
time_end=time.time()
time_lrm=(time_end - time_start)
print("LR prend %d s" %(time_lrm)) # (104s avec taux=1)

Erreur sur l'échantillon test


In [ ]:
predictionsRDD = model_lr.transform(testDF)
labelsAndPredictions = predictionsRDD.select("label","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
nb_test = testDF.count()
testErr = 1-nb_good_prediction/nb_test
print('Test Error = ' + str(testErr)) # (0.08 avec taux =1)

LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. Currently, only binary classification is supported. Support for multiclass model summaries will be added in the future.

Arbre binaire de décision

Même chose pour un arbre de discrimination. Comme pour l'implémentation de scikit-learn, les arbres ne peuvent être optimisés par un élagage basé sur une pénalisation de la complexité. Ce paramètre n'est pas présent, seule la profondeur max ou le nombre minimal d'observations par feuille peut contrôler la complexité. Noter l'apparition d'un nouveau paramètre: maxBins qui, schématiquement, rend qualitative ordinale à maxBins classes toute variable quantitative. D'autre part, il n'y a pas de représentation graphique. Cette implémentation d'arbre est issue d'un projet Google pour adapter cet algorithme aux contraintes mapreduce de données sous Hadoop. Elle vaut surtout pour permettre de construire une implémentation des forêts aléatoires.


In [ ]:
### Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier

### Configuraiton des paramètres de la méthode
time_start=time.time()
dt = DecisionTreeClassifier(impurity='gini',maxDepth=5,maxBins=32, minInstancesPerNode=1,
                            minInfoGain=0.0)

### Génération du modèle
model_dt = dt.fit(trainDF)

time_end=time.time()
time_dt=(time_end - time_start)
print("DT takes %d s" %(time_dt))

Erreur sur l'échantillon test


In [ ]:
predictionsRDD = model_dt.transform(testDF)
labelsAndPredictions = predictionsRDD.select("label","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
nb_test = testDF.count()
testErr = 1-nb_good_prediction/nb_test
print('Test Error = ' + str(testErr))

Random Forest

Les $k$-nn ne sont pas "scalables" et donc pas présents. Voici la syntaxe et les paramètres associés à l'algorithme des forêts aléatoires. Parmi ceux "classiques" se trouvent numTrees, featureSubsetStrategy, impurity, maxdepth et en plus maxbins comme pour les arbres. Les valeurs du paramètres maxDepth est critique pour la qualité de la prévision. en principe, il n'est pas contraint, un arbre peut se déployer sans "limite" mais face à des données massives cela peut provoquer des plantages intempestifs.


In [ ]:
### Random Forest
from pyspark.ml.classification import RandomForestClassifier

### Configuraiton des paramètres de la méthode
time_start=time.time()
rf = RandomForestClassifier(numTrees = 2, impurity='gini', maxDepth=12,
                            maxBins=32, seed=None)

### Génération du modèle
model_rf = rf.fit(trainDF)

time_end=time.time()
time_rf=(time_end - time_start)
print("RF takes %d s" %(time_rf))#

Erreur sur l'échantillon test

Erreur sur l'échantillon test


In [ ]:
predictionsRDD = model_rf.transform(testDF)
labelsAndPredictions = predictionsRDD.select("label","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
nb_test = testDF.count()
testErr = 1-nb_good_prediction/nb_test
print('Test Error = ' + str(testErr))

Même traitement sur la totalité de l'échantillon

3 Quelques résultats

100 arbres, sélection automatique, maxDepth=9

maxBins Temps Erreur
32 259 0.067
64 264 0.068
128 490 0.065

100 arbres, sélection automatique, maxBins=32

maxDepth Temps Erreur
4 55 0.21
9 259 0.067
18 983 0.035

Le nombre de variables tirées à chaque noeud n'a pas été optimisé.

Le paramètre maxBins ne semble pas trop influencer la précision du modèle, au contriare de la profondeur maximum des arbres. Avec une profondeur suffisante, on retrouve (presque) les résultats classiques des forêts aléatoires sur ces données.

COmparer les résultats obtenus pour les trois environnements.


In [ ]: