Il s'agit d'une version simplifiée du concours proposé par Cdiscount et paru sur le site datascience.net. Les données d'apprentissage sont accessibles sur demande auprès de Cdiscount mais les solutions de l'échantillon test du concours ne sont pas et ne seront pas rendues publiques. Un échantillon test est donc construit pour l'usage de ce tutoriel. L'objectif est de prévoir la catégorie d'un produit à partir de son descriptif (text mining). Seule la catégorie principale (1er niveau, 47 classes) est prédite au lieu des trois niveaux demandés dans le concours. L'objectif est plutôt de comparer les performances des méthodes et technologies en fonction de la taille de la base d'apprentissage ainsi que d'illustrer sur un exemple complexe le prétraitement de données textuelles.
Le jeux de données complet (15M produits) permet un test en vrai grandeur du passage à l'échelle volume des phases de préparation (munging), vectorisation (hashage, TF-IDF) et d'apprentissage en fonction de la technologie utilisée.
La synthèse des résultats obtenus est développée par Besse et al. 2016 (section 5).
Le principal objectif est de comparer les performances: temps de calcul, qualité des résultats, des principales technologies; ici PySpark avec la librairie SparkML. Il s'agit d'un problème de fouille de texte qui enchaîne nécessairement plusieurs étapes et le choix de la meilleure stratégie est fonction de l'étape:
L'objectif est ici de comparer les performances des méthodes et technologies en fonction de la taille de la base d'apprentissage. La stratégie de sous ou sur échantillonnage des catégories qui permet d'améliorer la prévision n'a pas été mise en oeuvre.
In [ ]:
sc.version
In [ ]:
# Importation des packages génériques et ceux
# des librairie ML et MLlib
##Nettoyage
import nltk
import re
##Liste
from numpy import array
import pandas as pd
##Temps
import time
##Row and Vector
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
##Hashage et vectorisation
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
##Regression logistique
from pyspark.ml.classification import LogisticRegression
##Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
##Random Forest
from pyspark.ml.classification import RandomForestClassifier
##Pour la création des DataFrames
from pyspark.sql import SQLContext
from pyspark.sql.types import *
In [ ]:
sqlContext = SQLContext(sc)
RowDF = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data/cdiscount_train.csv')
RowDF.take(2)
In [ ]:
# Taux de sous-échantillonnage des données pour tester le programme de préparation
# sur un petit jeu de données
taux_donnees=0.99
dataEchDF,data_drop = RowDF.randomSplit([taux_donnees,1-taux_donnees])
Les instructions précédentes ne sont pas exécutées (exécution paresseuse ou lazy). Elles ne le sont que lors d'un traitement explicite.
In [ ]:
ts=time.time()
size = dataEchDF.count()
te=time.time()
rt_count = te-ts
print("Comptage prend %d s, pour une taille de %d" %(rt_count, size)) #63s
taux | Taille M | Temps |
---|---|---|
0.01 | 0.157 | 51 |
0.1 | 1.57 | 49 |
0.4 | 6.29 | 53 |
0.8 | 12.6 | 60 |
1 | 15.7 | 64 |
Afin de limiter la dimension de l'espace des variables ou features tout en conservant les informations essentielles, il est nécessaire de nettoyer les données en appliquant plusieurs étapes:
La librairie pyspark.ml ne supporte actuellement pas de fonction de stemming pour pyspark. Pour cette étape nous faisons donc appel à la librairie nltk de python et la fonction nltk.stem.SnowballStemmer.
In [ ]:
import nltk
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import udf,col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
# liste des mots à supprimer
STOPWORDS = set(nltk.corpus.stopwords.words('french'))
# Fonction tokenizer qui permet de remplacer un long texte par une liste de mot
regexTokenizer = RegexTokenizer(inputCol="Description", outputCol="tokenizedDescr", pattern="[^a-z_]",
minTokenLength=3, gaps=True)
dataTokenized = regexTokenizer.transform(dataEchDF)
# Fonction StopWordsRemover qui permet de supprimer des mots
remover = StopWordsRemover(inputCol="tokenizedDescr", outputCol="tokenizedRemovedDescr", stopWords = list(STOPWORDS))
dataTokenizedRemoved = remover.transform(dataTokenized)
# La fonction de stemming de spark existe aujourd'hui qu'en scala et ne possède pas de wrapper en python.
# Cette étape doit donc être effectué en utilisant des librairies python et notamment nltk
STEMMER = nltk.stem.SnowballStemmer('french')
def clean_text(tokens):
tokens_stem = [ STEMMER.stem(token) for token in tokens]
return tokens_stem
udfCleanText = udf(lambda lt : clean_text(lt), ArrayType(StringType()))
dataClean = dataTokenizedRemoved.withColumn("cleanDescr", udfCleanText(col('tokenizedRemovedDescr')))
In [ ]:
dataClean.take(2)
In [ ]:
# Create a new rdd with the resulte of the clean_text function on the description
ts=time.time()
size = dataClean.count()
te=time.time()
rt = te-ts
print("Nettoyage prend %d s, pour la taille %d" %(rt, size)) #64s
Convertir les 47 labels d'origine (Catégorie 1), qui ne sont pas dans un format acceptable (string) par les fonctions de sparkML, en des entiers numérotés de 0 a 46 nécessaires lors de l'application de la régression logistique. Cette étape peut être facilement réalisé a l'aide de la fonction StringIndexer.
In [ ]:
ts=time.time()
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Categorie1", outputCol="categoryIndex")
dataCleanindexed = indexer.fit(dataClean).transform(dataClean)
te=time.time()
rt = te-ts
print("Index car prend %d s, taille %d" %(rt, size)) #67
Une fois extrait l'échantillon test, l'échantillon d'apprentissage peut être sous échantillonné.
In [ ]:
tauxEch=0.8
(trainTotDF, testDF) = dataCleanindexed.randomSplit([tauxEch, 1-tauxEch])
n_test = trainTotDF.count()
print(n_test) # 314441
Si toutes les données sont préparées, il est possible de sous-échantillonner l'échantillon d'apprentissage pour étudier l'impact de sa taille sur l'erreur de prévision.
In [ ]:
tauxApp=0.9
(trainDF, testDF)=trainTotDF.randomSplit([tauxApp, 1-tauxApp])
n_train=trainDF.count()
print(n_train) # 1131577
La vectorisation, c'est à dire la construction des features à partir de la liste des mots se fait en 2 étapes:
N.B. $h$ n'est pas généré aléatoirement. Ainsi pour un même fichier d'apprentissage (ou de test) et pour un même entier n_hash, le résultat de la fonction de hashage est identique
Le TF-IDF. Le TF-IDF permet de faire ressortir l'importance relative de chaque mot $m$ (ou couples de mots consécutifs) dans un texte-produit ou un descriptif $d$, par rapport à la liste entière des produits. La fonction $TF(m,d)$ compte le nombre d'occurences du mot $m$ dans le descriptif $d$. La fonction $IDF(m)$ mesure l'importance du terme dans l'ensemble des documents ou descriptifs en donnant plus de poids aux termes les moins fréquents car considérés comme les plus discriminants (motivation analogue à celle de la métrique du chi2 en anamlyse des correspondance). $IDF(m,l)=\log\frac{D}{f(m)}$ où $D$ est le nombre de documents, la taille de l'échantillon d'apprentissage, et $f(m)$ le nombre de documents ou descriptifs contenant le mot $m$. La nouvelle variable ou features est $V_m(l)=TF(m,l)\times IDF(m,l)$.
Comme pour les transformations des variables quantitatives (centrage, réduction), la même transformation c'est-à-dire les mêmes pondérations, est calculée sur l'achantillon d'apprentissage et appliquée à celui de test.
Dans le traitement qui suit, étape de hashage et calcul de $TF$ sont associés dans la même fonction.
Les transformations hashage et tf-idf calculées sur l'apprentissage sont appliquées à l'échantillon test.
In [ ]:
trainDF.take(2)
In [ ]:
ts=time.time()
# Term Frequency
hashing_tf = HashingTF(inputCol="cleanDescr", outputCol='tf', numFeatures=10000)
trainTfDF = hashing_tf.transform(trainDF)
# Inverse Document Frequency
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="tfidf")
idf_model = idf.fit(trainTfDF)
trainTfIdfDF = idf_model.transform(trainTfDF)
# application à l'échantillon tesy
testTfDF = hashing_tf.transform(testDF)
testTfIdfDF = idf_model.transform(testTfDF)
te=time.time()
rt = te-ts
print("Hashage et vectorisation prennent %d s, taille %d" %(rt, n_train)) #456 s, taille 1131577
Le calepin python a clairement montré la meilleure performance de la régression logistique dans ce problème. C'est d'ailleurs ce modèle ou une pyramide de logistiques qui est industriellement implémentée.
Le paramètre de pénalisation (lasso) est pris par défaut sans optimisation.
In [ ]:
### Configuraiton des paramètres de la méthode
time_start=time.time()
lr = LogisticRegression(maxIter=100, regParam=0.01, fitIntercept=False, tol=0.0001,
family = "multinomial", elasticNetParam=0.0, featuresCol="tfidf", labelCol="categoryIndex") #0 for L2 penalty, 1 for L1 penalty
### Génération du modèle
model_lr = lr.fit(trainTfIdfDF)
time_end=time.time()
time_lrm=(time_end - time_start)
print("LR prend %d s" %(time_lrm)) # (104s avec taux=1)
In [ ]:
predictionsDF = model_lr.transform(testTfIdfDF)
labelsAndPredictions = predictionsDF.select("categoryIndex","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
testErr = 1-nb_good_prediction/n_test
print('Test Error = ' + str(testErr)) # (0.08 avec taux =1)
Taille M | Temps | Erreur |
---|---|---|
1.131 | 786 | 0.94 |
In [ ]: