Traitement Naturel du Langage (NLP) : Catégorisation de Produits Cdiscount

Il s'agit d'une version simplifiée du concours proposé par Cdiscount et paru sur le site datascience.net. Les données d'apprentissage sont accessibles sur demande auprès de Cdiscount mais les solutions de l'échantillon test du concours ne sont pas et ne seront pas rendues publiques. Un échantillon test est donc construit pour l'usage de ce tutoriel. L'objectif est de prévoir la catégorie d'un produit à partir de son descriptif (text mining). Seule la catégorie principale (1er niveau, 47 classes) est prédite au lieu des trois niveaux demandés dans le concours. L'objectif est plutôt de comparer les performances des méthodes et technologies en fonction de la taille de la base d'apprentissage ainsi que d'illustrer sur un exemple complexe le prétraitement de données textuelles.

Le jeux de données complet (15M produits) permet un test en vrai grandeur du passage à l'échelle volume des phases de préparation (munging), vectorisation (hashage, TF-IDF) et d'apprentissage en fonction de la technologie utilisée.

La synthèse des résultats obtenus est développée par Besse et al. 2016 (section 5).

Partie 2-2 Catégorisation des Produits Cdiscount avec SparkML de

Le principal objectif est de comparer les performances: temps de calcul, qualité des résultats, des principales technologies; ici PySpark avec la librairie SparkML. Il s'agit d'un problème de fouille de texte qui enchaîne nécessairement plusieurs étapes et le choix de la meilleure stratégie est fonction de l'étape:

  • Spark pour la préparation des données: nettoyage, racinisaiton
  • Python Scikit-learn pour la transformaiton suivante (TF-IDF) et l'apprentissage notamment avec la régresison logistique qui conduit aux meilleurs résultats.

L'objectif est ici de comparer les performances des méthodes et technologies en fonction de la taille de la base d'apprentissage. La stratégie de sous ou sur échantillonnage des catégories qui permet d'améliorer la prévision n'a pas été mise en oeuvre.

  • L'exemple est présenté avec la possibilité de sous-échantillonner afin de réduire les temps de calcul.
  • L'échantillon réduit peut encore l'être puis, après "nettoyage", séparé en 2 parties: apprentissage et test.
  • Les données textuelles de l'échantillon d'apprentissage sont, "racinisées", "hashées", "vectorisées" avant modélisation.
  • Les mêmes transformations, notamment (hashage et TF-IDF) évaluées sur l'échantillon d'apprentissage sont appliquées à l'échantillon test.
  • Un seul modèle est estimé par régression logistique "multimodal", plus précisément et implicitement, un modèle par classe.
  • Différents paramètres: de vectorisation (hashage, TF-IDF), paramètres de la régression logistique (pénalisation L1) pourraient encore être optimisés.

Librairies


In [ ]:
sc.version

In [ ]:
# Importation des packages génériques et ceux 
# des librairie ML et MLlib
##Nettoyage
import nltk
import re
##Liste
from numpy import array
import pandas as pd
##Temps
import time
##Row and Vector
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
##Hashage et vectorisation
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
##Regression logistique
from pyspark.ml.classification import LogisticRegression
##Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
##Random Forest
from pyspark.ml.classification import RandomForestClassifier 
##Pour la création des DataFrames
from pyspark.sql import SQLContext
from pyspark.sql.types import *

Nettoyage des données

Lecture des données

"dezippé" le fichier data/cdiscount_train.csv.zip avant d'éxecuter la cellule suivante.


In [ ]:
sqlContext = SQLContext(sc)
RowDF = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data/cdiscount_train.csv')
RowDF.take(2)

Extraction sous-échantillon


In [ ]:
# Taux de sous-échantillonnage des données pour tester le programme de préparation
# sur un petit jeu de données
taux_donnees=0.99
dataEchDF,data_drop = RowDF.randomSplit([taux_donnees,1-taux_donnees])

Les instructions précédentes ne sont pas exécutées (exécution paresseuse ou lazy). Elles ne le sont que lors d'un traitement explicite.


In [ ]:
ts=time.time()
size = dataEchDF.count()
te=time.time()
rt_count = te-ts
print("Comptage prend %d s, pour une taille de %d" %(rt_count, size)) #63s
taux Taille M Temps
0.01 0.157 51
0.1 1.57 49
0.4 6.29 53
0.8 12.6 60
1 15.7 64

Nettoyage

Afin de limiter la dimension de l'espace des variables ou features tout en conservant les informations essentielles, il est nécessaire de nettoyer les données en appliquant plusieurs étapes:

  • Tokenizer: Les textes de descrpitions sont transformés en liste de mots. Chaque mot est écrit en minuscule, les termes numériques, de ponctuation et autres symboles sont supprimés.
  • StopWord: 155 mots-courants, et donc non informatif, de la langue française sont supprimés (STOPWORDS). Ex: le, la, du, alors, etc...
  • Stemmer: Chaque mot est "racinisé". La racinisation transforme un mot en son radical ou sa racine. Par exemple, les mots: cheval, chevaux, chevalier, chevalerie, chevaucher sont tous remplacés par "cheva".

La librairie pyspark.ml ne supporte actuellement pas de fonction de stemming pour pyspark. Pour cette étape nous faisons donc appel à la librairie nltk de python et la fonction nltk.stem.SnowballStemmer.


In [ ]:
import nltk
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import udf,col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

# liste des mots à supprimer
STOPWORDS = set(nltk.corpus.stopwords.words('french'))
# Fonction tokenizer qui permet de remplacer un long texte par une liste de mot
regexTokenizer = RegexTokenizer(inputCol="Description", outputCol="tokenizedDescr", pattern="[^a-z_]",
                                minTokenLength=3, gaps=True)
dataTokenized = regexTokenizer.transform(dataEchDF)

# Fonction StopWordsRemover qui permet de supprimer des mots
remover = StopWordsRemover(inputCol="tokenizedDescr", outputCol="tokenizedRemovedDescr", stopWords = list(STOPWORDS))
dataTokenizedRemoved = remover.transform(dataTokenized)

# La fonction de stemming de spark existe aujourd'hui qu'en scala et ne possède pas de wrapper en python.
# Cette étape doit donc être effectué en utilisant des librairies python et notamment nltk
STEMMER = nltk.stem.SnowballStemmer('french')

def clean_text(tokens):
    tokens_stem = [ STEMMER.stem(token) for token in tokens]
    return tokens_stem
udfCleanText =  udf(lambda lt : clean_text(lt), ArrayType(StringType()))
dataClean = dataTokenizedRemoved.withColumn("cleanDescr", udfCleanText(col('tokenizedRemovedDescr')))

In [ ]:
dataClean.take(2)

In [ ]:
# Create a new rdd with the resulte of the clean_text function on the description
ts=time.time()
size = dataClean.count()
te=time.time()
rt = te-ts
print("Nettoyage prend %d s, pour la taille %d" %(rt, size)) #64s

Préparation des données

Construction des Labels

Convertir les 47 labels d'origine (Catégorie 1), qui ne sont pas dans un format acceptable (string) par les fonctions de sparkML, en des entiers numérotés de 0 a 46 nécessaires lors de l'application de la régression logistique. Cette étape peut être facilement réalisé a l'aide de la fonction StringIndexer.


In [ ]:
ts=time.time()
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Categorie1", outputCol="categoryIndex")
dataCleanindexed = indexer.fit(dataClean).transform(dataClean)
te=time.time()
rt = te-ts
print("Index car prend %d s, taille %d" %(rt, size)) #67

Création des DataFrame train and Test

Une fois extrait l'échantillon test, l'échantillon d'apprentissage peut être sous échantillonné.


In [ ]:
tauxEch=0.8
(trainTotDF, testDF) = dataCleanindexed.randomSplit([tauxEch, 1-tauxEch])
n_test = trainTotDF.count()
print(n_test)  # 314441

Sous-échantillonnage

Si toutes les données sont préparées, il est possible de sous-échantillonner l'échantillon d'apprentissage pour étudier l'impact de sa taille sur l'erreur de prévision.


In [ ]:
tauxApp=0.9
(trainDF, testDF)=trainTotDF.randomSplit([tauxApp, 1-tauxApp])
n_train=trainDF.count()
print(n_train)  # 1131577

4 Construction des features (TF-IDF)

4.1 Introduction

La vectorisation, c'est à dire la construction des features à partir de la liste des mots se fait en 2 étapes:

  • Le hashage permet de réduire l'espace des variables (taille du dictionnaire) en un nombre limité et fixé a priori n_hash de features. Il repose sur la définition d'une hash function, $h$ qui à un indice $j$ défini dans l'espace des entiers naturels, renvoie un indice $i=h(j)$ dans dans l'espace réduit (1 à n_hash) des features. Ainsi le poids de l'indice $i$, du nouvel espace, est l'association de tous les poids d'indice $j$ tels que $i=h(j)$ de l'espace originale. Ici, les poids sont associés d'après la méthode décrit par Weinberger et al. (2009).

N.B. $h$ n'est pas généré aléatoirement. Ainsi pour un même fichier d'apprentissage (ou de test) et pour un même entier n_hash, le résultat de la fonction de hashage est identique

  • Le TF-IDF. Le TF-IDF permet de faire ressortir l'importance relative de chaque mot $m$ (ou couples de mots consécutifs) dans un texte-produit ou un descriptif $d$, par rapport à la liste entière des produits. La fonction $TF(m,d)$ compte le nombre d'occurences du mot $m$ dans le descriptif $d$. La fonction $IDF(m)$ mesure l'importance du terme dans l'ensemble des documents ou descriptifs en donnant plus de poids aux termes les moins fréquents car considérés comme les plus discriminants (motivation analogue à celle de la métrique du chi2 en anamlyse des correspondance). $IDF(m,l)=\log\frac{D}{f(m)}$ où $D$ est le nombre de documents, la taille de l'échantillon d'apprentissage, et $f(m)$ le nombre de documents ou descriptifs contenant le mot $m$. La nouvelle variable ou features est $V_m(l)=TF(m,l)\times IDF(m,l)$.

  • Comme pour les transformations des variables quantitatives (centrage, réduction), la même transformation c'est-à-dire les mêmes pondérations, est calculée sur l'achantillon d'apprentissage et appliquée à celui de test.

4.2 Traitement

Dans le traitement qui suit, étape de hashage et calcul de $TF$ sont associés dans la même fonction.

Les transformations hashage et tf-idf calculées sur l'apprentissage sont appliquées à l'échantillon test.


In [ ]:
trainDF.take(2)

In [ ]:
ts=time.time()
# Term Frequency
hashing_tf = HashingTF(inputCol="cleanDescr", outputCol='tf', numFeatures=10000)
trainTfDF = hashing_tf.transform(trainDF)
# Inverse Document Frequency
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="tfidf")
idf_model = idf.fit(trainTfDF) 
trainTfIdfDF = idf_model.transform(trainTfDF)
# application à l'échantillon tesy
testTfDF = hashing_tf.transform(testDF)
testTfIdfDF = idf_model.transform(testTfDF)
te=time.time()
rt = te-ts
print("Hashage et vectorisation prennent %d s, taille %d" %(rt, n_train))  #456 s, taille 1131577

4 Modélisation et test

Le calepin python a clairement montré la meilleure performance de la régression logistique dans ce problème. C'est d'ailleurs ce modèle ou une pyramide de logistiques qui est industriellement implémentée.

4.1 Estimation du modèle logistique

Le paramètre de pénalisation (lasso) est pris par défaut sans optimisation.


In [ ]:
### Configuraiton des paramètres de la méthode
time_start=time.time()
lr = LogisticRegression(maxIter=100, regParam=0.01, fitIntercept=False, tol=0.0001,
            family = "multinomial", elasticNetParam=0.0, featuresCol="tfidf", labelCol="categoryIndex") #0 for L2 penalty, 1 for L1 penalty

### Génération du modèle
model_lr = lr.fit(trainTfIdfDF)
 
time_end=time.time()
time_lrm=(time_end - time_start)
print("LR prend %d s" %(time_lrm)) # (104s avec taux=1)

4.2 Estimation de l'erreur sur l'échantillon test


In [ ]:
predictionsDF = model_lr.transform(testTfIdfDF)
labelsAndPredictions = predictionsDF.select("categoryIndex","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
testErr = 1-nb_good_prediction/n_test
print('Test Error = ' + str(testErr)) # (0.08 avec taux =1)
Taille M Temps Erreur
1.131 786 0.94

In [ ]: