Il s'agit d'une version simplifiée du concours proposé par Cdiscount et paru sur le site datascience.net. Les données d'apprentissage sont accessibles sur demande auprès de Cdiscount mais les solutions de l'échantillon test du concours ne sont pas et ne seront pas rendues publiques. Un échantillon test est donc construit pour l'usage de ce tutoriel. L'objectif est de prévoir la catégorie d'un produit à partir de son descriptif (text mining). Seule la catégorie principale (1er niveau, 47 classes) est prédite au lieu des trois niveaux demandés dans le concours. L'objectif est plutôt de comparer les performances des méthodes et technologies en fonction de la taille de la base d'apprentissage ainsi que d'illustrer sur un exemple complexe le prétraitement de données textuelles.
Le jeux de données complet (15M produits) permet un test en vrai grandeur du passage à l'échelle volume des phases de préparation (munging), vectorisation (hashage, TF-IDF) et d'apprentissage en fonction de la technologie utilisée.
La synthèse des résultats obtenus est développée par Besse et al. 2016 (section 5).
Le contenu de ce calepin est sensiblement identique au calepin précédent : Part2-2-AIF-PysparkWorkflow-Cdiscount.ipynb Dans ce dernier, le résultat de chaque étape était détaillé afin d'aider a la compréhension de celles-ci.
Dans ce calepin, nous utilisons la fonction Pipeline de la librairie spark-ML afin de créer un modèle qui inclut directement toutes les étapes, du nettoyage de texte jusqu'a l'apprentissage d'un modèle de regression logistique.
In [1]:
sc
Out[1]:
In [2]:
# Importation des packages génériques et ceux
# des librairie ML et MLlib
##Nettoyage
import nltk
import re
##Liste
from numpy import array
##Temps
import time
##Row and Vector
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
##Hashage et vectorisation
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
##Regression logistique
from pyspark.ml.classification import LogisticRegression
##Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
##Random Forest
from pyspark.ml.classification import RandomForestClassifier
##Pour la création des DataFrames
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml import Pipeline
In [3]:
sqlContext = SQLContext(sc)
RowDF = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('data/cdiscount_train.csv')
RowDF.take(2)
Out[3]:
In [4]:
# Taux de sous-échantillonnage des données pour tester le programme de préparation
# sur un petit jeu de données
taux_donnees=[0.80,0.19,0.01]
dataTrain, DataTest, data_drop = RowDF.randomSplit(taux_donnees)
n_train = dataTrain.count()
n_test= DataTest.count()
print("DataTrain : size = %d, DataTest : size = %d"%(n_train, n_test))
In [5]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
class MyNltkStemmer(Transformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__(self, inputCol=None, outputCol=None):
super(MyNltkStemmer, self).__init__()
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, inputCol=None, outputCol=None):
kwargs = self._input_kwargs
return self._set(**kwargs)
def _transform(self, dataset):
STEMMER = nltk.stem.SnowballStemmer('french')
def clean_text(tokens):
tokens_stem = [ STEMMER.stem(token) for token in tokens]
return tokens_stem
udfCleanText = udf(lambda lt : clean_text(lt), ArrayType(StringType()))
out_col = self.getOutputCol()
in_col = dataset[self.getInputCol()]
return dataset.withColumn(out_col, udfCleanText(in_col))
In [6]:
import nltk
from pyspark.sql.types import ArrayType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml.feature import StringIndexer
# liste des mots à supprimer
STOPWORDS = set(nltk.corpus.stopwords.words('french'))
# Fonction tokenizer qui permet de remplacer un long texte par une liste de mot
regexTokenizer = RegexTokenizer(inputCol="Description", outputCol="tokenizedDescr", pattern="[^a-z_]",
minTokenLength=3, gaps=True)
#V1
# Fonction StopWordsRemover qui permet de supprimer des mots
#remover = StopWordsRemover(inputCol="tokenizedDescr", outputCol="cleanDescr", stopWords = list(STOPWORDS))
#V2
# Fonction StopWordsRemover qui permet de supprimer des mots
remover = StopWordsRemover(inputCol="tokenizedDescr", outputCol="stopTokenizedDescr", stopWords = list(STOPWORDS))
# Stemmer
stemmer = MyNltkStemmer(inputCol="stopTokenizedDescr", outputCol="cleanDescr")
# Indexer
indexer = StringIndexer(inputCol="Categorie1", outputCol="categoryIndex")
# Hasing
hashing_tf = HashingTF(inputCol="cleanDescr", outputCol='tf', numFeatures=10000)
# Inverse Document Frequency
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="tfidf")
#Logistic Regression
lr = LogisticRegression(maxIter=100, regParam=0.01, fitIntercept=False, tol=0.0001,
family = "multinomial", elasticNetParam=0.0, featuresCol="tfidf", labelCol="categoryIndex") #0 for L2 penalty, 1 for L1 penalty
# Creation du pipeline
pipeline = Pipeline(stages=[regexTokenizer, remover, stemmer, indexer, hashing_tf, idf, lr ])
Le paramètre de pénalisation (lasso) est pris par défaut sans optimisation.
In [ ]:
time_start = time.time()
# On applique toutes les étapes sur la DataFrame d'apprentissage.
model = pipeline.fit(dataTrain)
time_end=time.time()
time_lrm=(time_end - time_start)
print("LR prend %d s pour un echantillon d'apprentissage de taille : n = %d" %(time_lrm, n_train)) # (104s avec taux=1)
In [ ]:
predictionsDF = model.transform(DataTest)
labelsAndPredictions = predictionsDF.select("categoryIndex","prediction").collect()
nb_good_prediction = sum([r[0]==r[1] for r in labelsAndPredictions])
testErr = 1-nb_good_prediction/n_test
print('Test Error = , pour un echantillon test de taille n = %d' + str(testErr)) # (0.08 avec taux =1)
Taille M | Temps | Erreur |
---|---|---|
1.131 | 786 | 0.94 |