Ce calepin traite d'un problème classique de recommandation par filtrage collaboratif en utilisant les ressources de la librairie MLlib de Spark avec l'API pyspark. Le problème général est décrit en introduction et dans une vignette de Wikistat. Il est appliqué aux données publiques du site GroupLens. L'objectif est de tester les méthodes et la procédure d'optimisation sur le plus petit jeu de données composé de 100k notes de 943 clients sur 1682 films où chaque client a au moins noté 20 films. Les jeux de données plus gros (1M, 10M, 20M notes) peuvent être utilisés pour "passer à l'échelle volume".
Ce calepin s'inspire des exemples de la documentation et d'un tutoriel de Jose A. Dianes. Le sujet a été traité lors d'un Spark Summit.
L'objectif est d'utiliser ces seules données pour proposer des recommandations. Les données initiales sont sous la forme d'une matrice très creuse (sparse) contenant des notes ou évaluations. Attention, les "0" de la matrice ne sont pas des notes mais des données manquantes, le film n'a pas encore été vu ou évalué.
Un algorithme satisfaisant à l'objectif de complétion de grande matrice creuse, et implémenté dans un logiciel libre d'accès est disponible dans la librairie softImpute de R. SOn utilisaiton est décrite dans un autre calepin. La version de NMF de MLlib de Spark autorise permet également la complétion.
En revanche,la version de NMF incluse dans la librairie Scikit-learn traite également des matrices creuses mais le critère (moindres carrés) optimisé considère les "0" comme des notes nulles, pas comme des données manquantes. Elle n'est pas adaptée au problème de complétion, contrairement à celle de MLliB. Il faudrait sans doute utiliser la librairie nonnegfac en Python de Kim et al. (2014); à tester!
Dans la première partie, le plus petit fichier est partagé en trois échantillons: apprentissage, validation et test; l'optimisation du rang de la factorisation (nombre de facteurs latents) est réalisée par minimisation de l'erreur estimée sur l'échantillon de validation.
Ensuite le plus gros fichier est utilisé pour évaluer l'impact de la taille de la base d'apprentissage.
Les données doivent être stockées à un emplacement accessibles de tous les noeuds du cluster pour permettre la construction de la base de données réparties (RDD). Dans une utilisation monoposte (standalone) de Spark, elles sont simplement chargées dans le répertoire courant.
In [ ]:
sc
In [ ]:
# Chargement des fichiers si ce n'est déjà fait
#Renseignez ici le dossier où vous souhaitez stocker le fichier téléchargé.
DATA_PATH=""
import urllib.request
# fichier réduit
f = urllib.request.urlretrieve("http://www.math.univ-toulouse.fr/~besse/Wikistat/data/ml-ratings100k.csv",DATA_PATH+"ml-ratings100k.csv")
Les données sont lues comme une seule ligne de texte avant d'être restructurées au bon format d'une matrice creuse à savoir une liste de triplets contenant les indices de ligne, de colonne et la note pour les seules valeurs renseignées.
In [ ]:
# Importer les données au format texte dans un RDD
small_ratings_raw_data = sc.textFile(DATA_PATH+"ml-ratings100k.csv")
# Identifier et afficher la première ligne
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
print(small_ratings_raw_data_header)
# Create RDD without header
all_lines = small_ratings_raw_data.filter(lambda l : l!=small_ratings_raw_data_header)
In [ ]:
# Séparer les champs (user, item, note) dans un nouveau RDD
from pyspark.sql import Row
split_lines = all_lines.map(lambda l : l.split(","))
ratingsRDD = split_lines.map(lambda p: Row(user=int(p[0]), item=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
# .cache() : le RDD est conservé en mémoire une fois traité
ratingsRDD.cache()
# Display the two first rows
ratingsRDD.take(2)
In [ ]:
# Convert RDD to DataFrame
ratingsDF = spark.createDataFrame(ratingsRDD)
ratingsDF.take(2)
Séparation aléatoire en trois échantillons apprentissage, validation et test. Le paramètre de rang est optimisé en minimisant l'estimaiton de l'erreur sur l'échantillon test. Cette stratégie, plutôt qu'ue validation croisée est plus adaptée à des données massives.
In [ ]:
tauxTrain=0.6
tauxVal=0.2
tauxTes=0.2
# Si le total est inférieur à 1, les données sont sous-échantillonnées.
(trainDF, validDF, testDF) = ratingsDF.randomSplit([tauxTrain, tauxVal, tauxTes])
# validation et test à prédire, sans les notes
validDF_P = validDF.select("user", "item")
testDF_P = testDF.select("user", "item")
In [ ]:
trainDF.take(2), validDF_P.take(2), testDF_P.take(2)
L'erreur d'imputation des données, donc de recommandation, est estimée sur l'échantillon de validation pour différentes valeurs (grille) du rang de la factorisation matricielle.
Il faudrait en principe aussi optimiser la valeur du paramètre de pénalisation pris à 0.1 par défaut.
Point important: l'erreur d'ajustement de la factorisation ne prend en compte que les valeurs listées dans la matrice creuses, pas les "0" qui sont des données manquantes.
In [ ]:
from pyspark.ml.recommendation import ALS
import math
import collections
# Initialisation du générateur
seed = 5
# Nombre max d'itérations (ALS)
maxIter = 10
# Régularisation L1; à optimiser également
regularization_parameter = 0.1
# Choix d'une grille pour les valeurs du rang à optimiser
ranks = [4, 8, 12]
#Initialisation variable
# création d'un dictionaire pour stocker l'erreur par rang testé
errors = collections.defaultdict(float)
tolerance = 0.02
min_error = float('inf')
best_rank = -1
best_iteration = -1
In [ ]:
from pyspark.ml.evaluation import RegressionEvaluator
for rank in ranks:
als = ALS( rank=rank, seed=seed, maxIter=maxIter,
regParam=regularization_parameter)
model = als.fit(trainDF)
# Prévision de l'échantillon de validation
predDF = model.transform(validDF).select("prediction","rating")
#Remove unpredicter row due to no-presence of user in the train dataset
pred_without_naDF = predDF.na.drop()
# Calcul du RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(pred_without_naDF)
print("Root-mean-square error for rank %d = "%rank + str(rmse))
errors[rank] = rmse
if rmse < min_error:
min_error = rmse
best_rank = rank
# Meilleure solution
print('Rang optimal: %s' % best_rank)
In [ ]:
# Quelques prévisions
pred_without_naDF.take(3)
Prévision finale de l'échantillon test.
In [ ]:
#On concatane la DataFrame Train et Validatin
trainValidDF = trainDF.union(validDF)
# On crée un model avec le nouveau Dataframe complété d'apprentissage et le rank fixé à la valeur optimal
als = ALS( rank=best_rank, seed=seed, maxIter=maxIter,
regParam=regularization_parameter)
model = als.fit(trainValidDF)
#Prediction sur la DataFrame Test
testDF = model.transform(testDF).select("prediction","rating")
#Remove unpredicter row due to no-presence of user in the trai dataset
pred_without_naDF = predDF.na.drop()
# Calcul du RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(pred_without_naDF)
print("Root-mean-square error for rank %d = "%best_rank + str(rmse))
MovieLens propose un plus gros fichier avec 20M de notes (138000 utilisateurs, 27000 films). Ce fichier est utilisé pour extraire un fichier test de deux millions de notes à reconstruire. Les paramètres précédemment optimisés, ils pourraient sans doute l'être mieux, sont appliqués pour une succesion d'estimation / prévision avec une taille croissante de l'échantillon d'apprentissage. Il aurait été plus élégant d'automatiser le travail dans une boucle mais lorsque les données sont les plus volumineuses des comportement mal contrôlés de Spark peuvent provoquer des plantages par défaut de mémoire.
Le fichier est prétraité de manière analogue.
In [ ]:
# Chargement des fichiers si ce n'est déjà fait
import urllib.request
# fichier complet mais compressé
f = urllib.request.urlretrieve("http://www.math.univ-toulouse.fr/~besse/Wikistat/data/ml-ratings20M.zip",DATA_PATH+"ml-ratings20M.zip")
In [ ]:
#Unzip downloaded file
import zipfile
zip_ref = zipfile.ZipFile(DATA_PATH+"ml-ratings20M.zip", 'r')
zip_ref.extractall(DATA_PATH)
zip_ref.close()
In [ ]:
# Importer les données au format texte dans un RDD
ratings_raw_data = sc.textFile(DATA_PATH+"ratings20M.csv")
# Identifier et afficher la première ligne
ratings_raw_data_header = ratings_raw_data.take(1)[0]
ratings_raw_data_header
# Create RDD without header
all_lines = ratings_raw_data.filter(lambda l : l!=ratings_raw_data_header)
In [ ]:
# Séparer les champs (user, item, note) dans un nouveau RDD
split_lines = all_lines.map(lambda l : l.split(","))
ratingsRDD = split_lines.map(lambda p: Row(user=int(p[0]), item=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
# Display the two first rows
ratingsRDD.take(2)
In [ ]:
# Convert RDD to DataFrame
ratingsDF = spark.createDataFrame(ratingsRDD)
ratingsDF.take(2)
Extraction de l'échantillon test et éventuellement sous-échantillonnage de l'échantillon d'apprentissage.
In [ ]:
tauxTest=0.1
# Si le total est inférieur à 1, les données sont sous-échantillonnées.
(trainTotDF, testDF) = ratingsDF.randomSplit([1-tauxTest, tauxTest])
In [ ]:
# Sous-échantillonnage de l'apprentissage permettant de
# tester pour des tailles croissantes de cet échantillon
tauxEch=0.2
(trainDF, DropData) = trainTotDF.randomSplit([tauxEch, 1-tauxEch])
In [ ]:
testDF.take(2), trainDF.take(2)
Le modèle est estimé en utilisant les valeurs des paramètres obtenues dans l'étape précédente.
In [ ]:
import time
time_start=time.time()
# Initialisation du générateur
seed = 5
# Nombre max d'itérations (ALS)
maxIter = 10
# Régularisation L1 (valeur par défaut)
regularization_parameter = 0.1
best_rank = 8
# Estimation pour chaque valeur de rang
als = ALS(rank=rank, seed=seed, maxIter=maxIter,
regParam=regularization_parameter)
model = als.fit(trainDF)
time_end=time.time()
time_als=(time_end - time_start)
print("ALS prend %d s" %(time_als))
In [ ]:
# Prévision de l'échantillon de validation
predDF = model.transform(testDF).select("prediction","rating")
#Remove unpredicter row due to no-presence of user in the train dataset
pred_without_naDF = predDF.na.drop()
# Calcul du RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(pred_without_naDF)
print("Root-mean-square error for rank %d = "%best_rank + str(rmse))
In [ ]:
trainDF.count()
Quelques résultats montrant l'évolution du temps de calcul et de l'erreur de prévision en fonction de la taille de l'échantillon d'apprentissage. Attention, il est probable que la valeur des paramètres optimaux dépendent de la taille de l'échantillon d'apprentissage.
Taille | Temps(s) | RMSE |
---|---|---|
217439 | 70 | 1.65 |
1029416 | 73 | 1.06 |
2059855 | 72 | 1.05 |
4119486 | 89 | 0.88 |
6176085 | 99 | 0.85 |
10301909 | 117 | 0.83 |
12361034 | 125 | 0.83 |
14414907 | 137 | 0.82 |
16474087 | 148 | 0.818 |
18538142 | 190 | 0.816 |
20596263 | 166 | 0.82 |