Pour cette étude, nous allons implémenter l'algorithme DIMSUM décrit par Bosagh-Zadesh en 2012. Cet algorithme propose de trouver toutes les paires d'éléments semblables.
La littérature sur cet algorithme nous propose différentes versions du même algorithme, plus ou moins puissantes, plus pou moins flexibles. Il est intéressant de noter que Twitter l'a implémenté dans ses clusters et a rendu publique l'algorithme modifié qu'ils ont utilisé. C'est cet algorithme que nous allons implémenter [2].
Sans entrer dans les détails, le coeur de l'algorithme repose sur le calcul de la matrice $A^tA$ (A transposé x A), et la sélection des paires selon une probabilité donnée.
Pour l'implémentation, nous avons choisi de développer l'algorithme sous PIG, dans un cluster Azure. Le jeu de donnée de test sera une base composée des notes données par les utilisateurs à différents films. L'algorithme trouvera les paires de films équivalents.
Nous effectuerons notre étude sur un jeu de données composé de 1000208 notes de films basé sur 6040 utilisateurs et 3952 films différents.
Une fois les similarités calculées, nous effectuerons un test afin de récupérer les films similaires à un film qu'on passera en input.
[1] Bosagh-Zadeh, Reza and Goel, Ashish (2012), Dimension Independent Similarity Computation, arXiv:1206.2082 http://arxiv.org/abs/1206.2082
[2] https://blog.twitter.com/2014/all-pairs-similarity-via-dimsum
Mise en place des librairies et des modules de connexions
On code en dur afin d'éviter de retaper tout le temps les identifiants. Attention c'est une mauvaise pratique.
In [1]:
import pyensae
import os
blobstorage = "hdblobstorage" #blobhp["blob_storage"]
blobpassword = "jQIPVO/T54w8X49UPIbzAVvaNO3wmuUwI4/o9AJnCaPTHoCQnsaGBUkT4eIyi0BRQavgc/TAQMQwy8eu19CSBQ==" #blobhp["password1"]#
hadoop_server = "sparkclus2ensae"#blobhp["hadoop_server"]#
hadoop_password = "ENSAEspark1;"#blobhp["password2"]#
username = "imdd" #blobhp["username"]
On ouvre la connexion au cluster et au blob
In [22]:
#%blob_close
cl, bs = %hd_open
cl,bs
Out[22]:
On upload les fichiers qui contient tous les ratings des users.
In [118]:
%blob_up data/ratings_mean.csv hdblobstorage/imdd/ratings_mean.csv
Out[118]:
On vérifie que tous les fichiers sont présents dans le blob
In [141]:
#List files in blob storage
df=%blob_ls hdblobstorage/imdd/
df
Out[141]:
On code l'algorithme en PIG. La difficulté est que PIG gère très mal l'imbrication de FOREACH, absolument nécessaire à l'algorithme. Notre solution s'est portée sur la mise à plat totale des données. D'où le FLATTEN puis nous avons effectué un JOIN pour la deuxième boucle. Puis nous avons appliqué les règles définies par l'algorithme.
En sortie nous obtenons l'ensemble des paires semblables, avec leur mesure de similarité. On stocke dans un fichier pour l'exploiter ensuite avec un autre script PIG. Pour réduire le nombre de résultats, nous ne retiendrons que les paires de films qui ont une similarité de plus de 0.5 .
In [12]:
%%PIG_azure dimsum.pig
-- Macro de calcul des normes par colonne (movieID)
DEFINE computeMatrixNorms(cData,sqrt_gamma) RETURNS Matrix_Norms {
cData_grp = GROUP $cData BY MovieID;
-- On calcule la norme et le gamma sur la norme
$Matrix_Norms = FOREACH cData_grp {
tmp_out = FOREACH $cData GENERATE Rating*Rating;
out = SUM(tmp_out);
GENERATE group as MovieID, SQRT(out) as Norm, ($sqrt_gamma.$0/SQRT(out)>1?1:$sqrt_gamma.$0/SQRT(out)) as Prob_j;
}
}
cData = LOAD '$CONTAINER/imdd/ratings_mean.csv'
using PigStorage (',')
AS (UserID:int, MovieID:int, Rating:double) ;
-- On calcule le gamma
users = GROUP cData all ;
total= FOREACH users GENERATE MAX($1.UserID) as m, MAX($1.MovieID) as n;
sqrt_gamma = FOREACH total GENERATE SQRT(4*LOG(n)/0.7) as a;
-- On calcule la norme et le gamma sur la norme
Matrix_Norms = computeMatrixNorms(cData,sqrt_gamma);
-- On ajoute la colonne Norm et probabilite dans cData
C = JOIN cData BY MovieID,Matrix_Norms BY MovieID;
D = FOREACH C GENERATE cData::UserID as UserID_f,cData::MovieID as MovieID_f,cData::Rating as Rating_f,
Matrix_Norms::Norm as Norm_f,Matrix_Norms::Prob_j as Prob_j_f;
Matrix_data = GROUP D BY UserID_f;
FF = FOREACH Matrix_data GENERATE group as UID, FLATTEN(D.MovieID_f) as MV1;
-- Ajout des informations de MV1
FFF = JOIN FF BY (UID,MV1), D BY (UserID_f,MovieID_f);
-- Condition de validite premier IF
FFD = FILTER FFF BY RANDOM()<Prob_j_f;
-- Ajout de la seconde loop
GG = JOIN FFD BY UID, D BY UserID_f;
-- Cleaning du tableau
GGG = FOREACH GG GENERATE FFD::FF::UID as UserID,FFD::FF::MV1 as MV_1,FFD::D::Rating_f as Rating_1,FFD::D::Norm_f as Norm_1,
FFD::D::Prob_j_f as Proba_1,D::MovieID_f as MV_2,D::Rating_f as Rating_2,
D::Norm_f as Norm_2,D::Prob_j_f as Proba_2;
-- Ajout de la deuxieme boucle
-- Condition de validite second IF
GGD = FILTER GGG BY RANDOM()<Proba_2;
-- Generation des similarites
HH = FOREACH GGD{
val = Rating_1*Rating_2/(((sqrt_gamma.$0>Norm_1)?Norm_1:sqrt_gamma.$0)*((sqrt_gamma.$0>Norm_2)?Norm_2:sqrt_gamma.$0));
GENERATE MV_1,MV_2,val as VAL;
}
DESCRIBE HH;
-- Ajout d un filtre supplementaire pour reduire la taille des resultats
HHH = FILTER HH BY VAL > 0.5;
HHHH = DISTINCT HHH;
STORE GGD INTO '$CONTAINER/$PSEUDO/dom/matrix_all.txt' USING PigStorage(',');
STORE HHH INTO '$CONTAINER/$PSEUDO/dom/similarities.txt' USING PigStorage(',');
Dans la partie de code suivante, nous supprimons les fichiers générés par l'algorithme précédent pour pouvoir les regénérer une deuxième fois.
In [166]:
cl.delete_blob(bs, "hdblobstorage", 'imdd/dom/matrix_all.txt')
cl.delete_blob(bs, "hdblobstorage", 'imdd/dom/similarities.txt')
df = %blob_ls hdblobstorage/imdd/dom/matrix_all.txt/
df
for name in df["name"]:
cl.delete_blob(bs, "hdblobstorage", name)
df = %blob_ls hdblobstorage/imdd/dom/similarities.txt/
df
for name in df["name"]:
cl.delete_blob(bs, "hdblobstorage", name)
Upload du script dimsum.pig et lancement de son exécution :
In [28]:
jid = %hd_pig_submit dimsum.pig
jid
Out[28]:
In [29]:
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["completed"],st["status"]["jobComplete"],st["status"]["state"]
Out[29]:
In [10]:
df=%blob_ls hdblobstorage/imdd/
list(df["name"])
Out[10]:
L'algorithme DimSum ayant été bien exécuté , nous allons maintenant exploiter notre matrice de similarités à l'aide d'un autre script PIG qui se base sur le fichier de similarités généré par le script PIG vu auparavant.
A partir d'un id d'un film, qui existe dans notre base, nous nous attendrons à récupérer les ids des films dont la similarité calculée est maximale.
In [4]:
%%PIG_azure load_results.pig
cData = LOAD '$CONTAINER/$PSEUDO/dom/similarities.txt'
using PigStorage (',')
AS (MovieID1:int, MovieID2:int, sim:double) ;
filtered = FILTER cData BY MovieID1 == $MvID ;
ordered = ORDER filtered BY sim DESC;
ordered_limit = LIMIT ordered $size;
movies = FOREACH ordered_limit GENERATE MovieID2;
STORE movies INTO '$CONTAINER/imdd/dom/recom.txt' USING PigStorage(',');
Nous supprimons d'abord le fichier généré par la dernière exécution, ensuite nous lançons le script PIG afin de récupérer les ids des films similaires. Pour cet exemple, nous souhaitons récupérer les 20 films les plus proches à celui dont l'id est 1610.
In [8]:
if cl.exists(bs, cl.account_name, "$PSEUDO/imdd/dom/recom.txt"):
r = cl.delete_folder (bs, cl.account_name, "$PSEUDO/imdd/dom/recom.txt")
jid = cl.pig_submit(bs, blobstorage, "load_results.pig",params={"MvID":'1610',"size":"20"})
jid
Out[8]:
In [9]:
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
Out[9]:
Nous récupérons ensuite le fichier généré recom.txt, contenant les ids :
In [10]:
if os.path.exists("recom.txt"):os.remove("recom.txt")
%blob_downmerge /imdd/dom/recom.txt recom.txt
Out[10]:
Et nous affichons enfin les résultats :
In [11]:
with open('recom.txt', 'r') as f:
ids = f.read()
print(ids)