Traitement Naturel du Langage (NLP) : Catégorisation de Produits Cdiscount

Il s'agit d'une version simplifiée du concours proposé par Cdiscount et paru sur le site datascience.net. Les données d'apprentissage sont accessibles sur demande auprès de Cdiscount mais les solutions de l'échantillon test du concours ne sont pas et ne seront pas rendues publiques. Un échantillon test est donc construit pour l'usage de ce tutoriel. L'objectif est de prévoir la catégorie d'un produit à partir de son descriptif (text mining). Seule la catégorie principale (1er niveau, 47 classes) est prédite au lieu des trois niveaux demandés dans le concours. L'objectif est plutôt de comparer les performances des méthodes et technologies en fonction de la taille de la base d'apprentissage ainsi que d'illustrer sur un exemple complexe le prétraitement de données textuelles.

Le jeux de données complet (15M produits) permet un test en vrai grandeur du passage à l'échelle volume des phases de préparation (munging), vectorisation (hashage, TF-IDF) et d'apprentissage en fonction de la technologie utilisée.

La synthèse des résultats obtenus est développée par Besse et al. 2016 (section 5).

Partie 2-2 Catégorisation des Produits Cdiscount avec SparkML de et utilisation de Pipeline.

Le contenu de ce calepin est sensiblement identique au calepin précédent : Part2-2-AIF-PysparkWorkflow-Cdiscount.ipynb Dans ce dernier, le résultat de chaque étape était détaillé afin d'aider a la compréhension de celles-ci.

Dans ce calepin, nous utilisons la fonction Pipeline de la librairie spark-ML afin de créer un modèle qui inclut directement toutes les étapes, du nettoyage de texte jusqu'a l'apprentissage d'un modèle de regression logistique.


In [1]:
sc


Out[1]:

SparkContext

Spark UI

Version
v2.3.1
Master
local[*]
AppName
pyspark-shell

In [2]:
# Importation des packages génériques et ceux 
# des librairie ML et MLlib
##Nettoyage
import nltk
import re
##Liste
from numpy import array
##Temps
import time
##Row and Vector
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
##Hashage et vectorisation
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
##Regression logistique
from pyspark.ml.classification import LogisticRegression
##Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
##Random Forest
from pyspark.ml.classification import RandomForestClassifier 
##Pour la création des DataFrames
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml import Pipeline

Lecture des données


In [3]:
sqlContext = SQLContext(sc)
RowDF = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data/cdiscount_train.csv')
RowDF.take(2)


Out[3]:
[Row(Categorie1='INFORMATIQUE', Categorie2='CONNECTIQUE - ALIMENTATION', Categorie3='BATTERIE', Description='Batterie Acer Aspire One 751H-52Yr - Li-Ion 11.1V 5200mAh, 54Wh Noir, compatible batterie… Voir la présentation', Libelle='Batterie Acer Aspire One 751H-52Yr', Marque='AUCUNE'),
 Row(Categorie1='TELEPHONIE - GPS', Categorie2='ACCESSOIRE TELEPHONE', Categorie3='COQUE - BUMPER - FACADE TELEPHONE', Description='Coque rigide Bleu lagon pour ALCATEL OT / 6033 motif Drapeau Liberia + 3 Films - Coque rigide Ultra Fine Bleu lagon ORIGINALE de MUZZANO au motif Drapeau Liberia pour ALCATEL … Voir la présentation', Libelle='Coque rigide Bleu lagon pour ALCATEL OT / 6033 …', Marque='MUZZANO')]

Extraction sous-échantillon


In [4]:
# Taux de sous-échantillonnage des données pour tester le programme de préparation
# sur un petit jeu de données
taux_donnees=[0.80,0.19,0.01]
dataTrain, DataTest, data_drop = RowDF.randomSplit(taux_donnees)
n_train = dataTrain.count()
n_test= DataTest.count()
print("DataTrain : size = %d, DataTest : size = %d"%(n_train, n_test))


DataTrain : size = 799962, DataTest : size = 190050

Création du pipeline

Création d'un Transformer pour l'étape de stemming.

Dans le calepin précédent, nous avons définie une fonction stemmer à partir de la librairie nltk. Pour que celle-ci puisse être utilisé dans un Pipeline ML, nous devons en faire un objet transformers.


In [5]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType

class MyNltkStemmer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(MyNltkStemmer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        STEMMER = nltk.stem.SnowballStemmer('french')
        def clean_text(tokens):
            tokens_stem = [ STEMMER.stem(token) for token in tokens]
            return tokens_stem
        udfCleanText =  udf(lambda lt : clean_text(lt), ArrayType(StringType()))
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udfCleanText(in_col))

Définition des différentes étapes


In [6]:
import nltk
from pyspark.sql.types import ArrayType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml.feature import StringIndexer

# liste des mots à supprimer
STOPWORDS = set(nltk.corpus.stopwords.words('french'))
# Fonction tokenizer qui permet de remplacer un long texte par une liste de mot
regexTokenizer = RegexTokenizer(inputCol="Description", outputCol="tokenizedDescr", pattern="[^a-z_]",
                                minTokenLength=3, gaps=True)

#V1
# Fonction StopWordsRemover qui permet de supprimer des mots
#remover = StopWordsRemover(inputCol="tokenizedDescr", outputCol="cleanDescr", stopWords = list(STOPWORDS))

#V2
# Fonction StopWordsRemover qui permet de supprimer des mots
remover = StopWordsRemover(inputCol="tokenizedDescr", outputCol="stopTokenizedDescr", stopWords = list(STOPWORDS))
# Stemmer 
stemmer = MyNltkStemmer(inputCol="stopTokenizedDescr", outputCol="cleanDescr")

# Indexer
indexer = StringIndexer(inputCol="Categorie1", outputCol="categoryIndex")

# Hasing
hashing_tf = HashingTF(inputCol="cleanDescr", outputCol='tf', numFeatures=10000)

# Inverse Document Frequency
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="tfidf")

#Logistic Regression
lr = LogisticRegression(maxIter=100, regParam=0.01, fitIntercept=False, tol=0.0001,
            family = "multinomial", elasticNetParam=0.0, featuresCol="tfidf", labelCol="categoryIndex") #0 for L2 penalty, 1 for L1 penalty

# Creation du pipeline
pipeline = Pipeline(stages=[regexTokenizer, remover, stemmer, indexer, hashing_tf, idf, lr ])

Estimation du pipeline

Le paramètre de pénalisation (lasso) est pris par défaut sans optimisation.


In [ ]:
time_start = time.time()
# On applique toutes les étapes sur la DataFrame d'apprentissage.
model = pipeline.fit(dataTrain)
time_end=time.time()
time_lrm=(time_end - time_start)
print("LR prend %d s pour un echantillon d'apprentissage de taille : n = %d" %(time_lrm, n_train)) # (104s avec taux=1)

Estimation de l'erreur sur l'échantillon test


In [ ]:
predictionsDF = model.transform(DataTest)
labelsAndPredictions = predictionsDF.select("categoryIndex","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
testErr = 1-nb_good_prediction/n_test
print('Test Error = , pour un echantillon test de taille n = %d' + str(testErr)) # (0.08 avec taux =1)
Taille M Temps Erreur
1.131 786 0.94