IMPLEMENTATION DE L'ALGORITHME DIMSUM SOUS PIG

David DUONG PRUNIER - Ismail MACHRAOUI

MASTERE SPECIALISE DATA SCIENCE

Pour cette étude, nous allons implémenter l'algorithme DIMSUM décrit par Bosagh-Zadesh en 2012. Cet algorithme propose de trouver toutes les paires d'éléments semblables. La littérature sur cet algorithme nous propose différentes versions du même algorithme, plus ou moins puissantes, plus pou moins flexibles. Il est intéressant de noter que Twitter l'a implémenté dans ses clusters et a rendu publique l'algorithme modifié qu'ils ont utilisé. C'est cet algorithme que nous allons implémenter [2].
Sans entrer dans les détails, le coeur de l'algorithme repose sur le calcul de la matrice $A^tA$ (A transposé x A), et la sélection des paires selon une probabilité donnée. Pour l'implémentation, nous avons choisi de développer l'algorithme sous PIG, dans un cluster Azure. Le jeu de donnée de test sera une base composée des notes données par les utilisateurs à différents films. L'algorithme trouvera les paires de films équivalents.

Nous effectuerons notre étude sur un jeu de données composé de 1000208 notes de films basé sur 6040 utilisateurs et 3952 films différents.

Une fois les similarités calculées, nous effectuerons un test afin de récupérer les films similaires à un film qu'on passera en input.

[1] Bosagh-Zadeh, Reza and Goel, Ashish (2012), Dimension Independent Similarity Computation, arXiv:1206.2082 http://arxiv.org/abs/1206.2082

[2] https://blog.twitter.com/2014/all-pairs-similarity-via-dimsum

ETAPE PAR ETAPE

Mise en place des librairies et des modules de connexions

On code en dur afin d'éviter de retaper tout le temps les identifiants. Attention c'est une mauvaise pratique.


In [1]:
import pyensae
import os
blobstorage = "hdblobstorage" #blobhp["blob_storage"]
blobpassword = "jQIPVO/T54w8X49UPIbzAVvaNO3wmuUwI4/o9AJnCaPTHoCQnsaGBUkT4eIyi0BRQavgc/TAQMQwy8eu19CSBQ==" #blobhp["password1"]#
hadoop_server = "sparkclus2ensae"#blobhp["hadoop_server"]#
hadoop_password = "ENSAEspark1;"#blobhp["password2"]#
username = "imdd" #blobhp["username"]

On ouvre la connexion au cluster et au blob


In [22]:
#%blob_close
cl, bs = %hd_open 
cl,bs


Out[22]:
(<pyensae.remote.azure_connection.AzureClient at 0x6f1128>,
 <azure.storage.blob.blobservice.BlobService at 0x6f1320>)

On upload les fichiers qui contient tous les ratings des users.


In [118]:
%blob_up data/ratings_mean.csv hdblobstorage/imdd/ratings_mean.csv


Out[118]:
'imdd/ratings_mean.csv'

On vérifie que tous les fichiers sont présents dans le blob


In [141]:
#List files in blob storage
df=%blob_ls hdblobstorage/imdd/
df


Out[141]:
name last_modified content_type content_length blob_type
0 imdd/ConfLongDemo_JSI.txt Sun, 17 Jan 2016 14:44:35 GMT application/octet-stream 21546346 BlockBlob
1 imdd/ConfLongDemo_JSI_small.txt Sun, 17 Jan 2016 16:47:23 GMT application/octet-stream 638 BlockBlob
2 imdd/dom Thu, 28 Jan 2016 17:21:25 GMT 0 BlockBlob
3 imdd/exp_data_final_short.csv Mon, 25 Jan 2016 22:48:10 GMT application/octet-stream 1704019 BlockBlob
4 imdd/exp_original.csv Mon, 25 Jan 2016 23:16:54 GMT application/octet-stream 12553678 BlockBlob
5 imdd/exp_original_medium.csv Thu, 28 Jan 2016 07:24:52 GMT application/octet-stream 5086 BlockBlob
6 imdd/exp_original_short.csv Wed, 27 Jan 2016 22:10:26 GMT application/octet-stream 1785 BlockBlob
7 imdd/numpyudf.py Sun, 17 Jan 2016 16:38:30 GMT application/octet-stream 702 BlockBlob
8 imdd/ratings_mean.csv Thu, 28 Jan 2016 22:47:18 GMT application/octet-stream 18589813 BlockBlob
9 imdd/ratings_mean_short.csv Thu, 28 Jan 2016 12:51:01 GMT application/octet-stream 2802 BlockBlob
10 imdd/scripts/pig/exptestnumpy.pig Thu, 28 Jan 2016 23:39:40 GMT application/octet-stream 2367 BlockBlob
11 imdd/scripts/pig/exptestnumpy.pig.log Thu, 28 Jan 2016 23:40:31 GMT 0 BlockBlob
12 imdd/scripts/pig/exptestnumpy.pig.log/exit Thu, 28 Jan 2016 23:40:31 GMT application/octet-stream 3 BlockBlob
13 imdd/scripts/pig/exptestnumpy.pig.log/stderr Thu, 28 Jan 2016 23:40:21 GMT application/octet-stream 6502 BlockBlob
14 imdd/scripts/pig/exptestnumpy.pig.log/stdout Thu, 28 Jan 2016 23:40:21 GMT application/octet-stream 136 BlockBlob
15 imdd/testcpython.py Fri, 15 Jan 2016 21:36:32 GMT application/octet-stream 486 BlockBlob

On code l'algorithme en PIG. La difficulté est que PIG gère très mal l'imbrication de FOREACH, absolument nécessaire à l'algorithme. Notre solution s'est portée sur la mise à plat totale des données. D'où le FLATTEN puis nous avons effectué un JOIN pour la deuxième boucle. Puis nous avons appliqué les règles définies par l'algorithme.

En sortie nous obtenons l'ensemble des paires semblables, avec leur mesure de similarité. On stocke dans un fichier pour l'exploiter ensuite avec un autre script PIG. Pour réduire le nombre de résultats, nous ne retiendrons que les paires de films qui ont une similarité de plus de 0.5 .


In [12]:
%%PIG_azure dimsum.pig

-- Macro de calcul des normes par colonne (movieID)
DEFINE computeMatrixNorms(cData,sqrt_gamma) RETURNS Matrix_Norms {
    cData_grp = GROUP $cData BY MovieID;
    -- On calcule la norme et le gamma sur la norme
    $Matrix_Norms = FOREACH cData_grp {
        tmp_out = FOREACH $cData GENERATE Rating*Rating;
        out = SUM(tmp_out);
        GENERATE group as MovieID, SQRT(out) as Norm, ($sqrt_gamma.$0/SQRT(out)>1?1:$sqrt_gamma.$0/SQRT(out)) as Prob_j;
    }
}

cData =  LOAD '$CONTAINER/imdd/ratings_mean.csv'
        using PigStorage (',')
        AS (UserID:int, MovieID:int, Rating:double) ;
        
-- On calcule le gamma
users = GROUP cData all ;
total= FOREACH users GENERATE MAX($1.UserID) as m, MAX($1.MovieID) as n;
sqrt_gamma = FOREACH total GENERATE SQRT(4*LOG(n)/0.7) as a;
                

-- On calcule la norme et le gamma sur la norme
Matrix_Norms = computeMatrixNorms(cData,sqrt_gamma);


-- On ajoute la colonne Norm et probabilite dans cData
C = JOIN cData BY MovieID,Matrix_Norms BY MovieID;
D = FOREACH C GENERATE cData::UserID as UserID_f,cData::MovieID as MovieID_f,cData::Rating as Rating_f,
                        Matrix_Norms::Norm as Norm_f,Matrix_Norms::Prob_j as Prob_j_f;

Matrix_data = GROUP D BY UserID_f;
FF = FOREACH Matrix_data GENERATE group as UID, FLATTEN(D.MovieID_f) as MV1;

-- Ajout des informations de MV1
FFF = JOIN FF BY (UID,MV1), D BY (UserID_f,MovieID_f); 

-- Condition de validite premier IF
FFD = FILTER FFF BY RANDOM()<Prob_j_f;

-- Ajout de la seconde loop
GG = JOIN FFD BY UID, D BY UserID_f;

-- Cleaning du tableau
GGG = FOREACH GG GENERATE FFD::FF::UID as UserID,FFD::FF::MV1 as MV_1,FFD::D::Rating_f as Rating_1,FFD::D::Norm_f as Norm_1,
                FFD::D::Prob_j_f as Proba_1,D::MovieID_f as MV_2,D::Rating_f as Rating_2,
                D::Norm_f as Norm_2,D::Prob_j_f as Proba_2;
                                
-- Ajout de la deuxieme boucle
-- Condition de validite second IF
GGD = FILTER GGG BY RANDOM()<Proba_2;

-- Generation des similarites
HH = FOREACH GGD{
    val = Rating_1*Rating_2/(((sqrt_gamma.$0>Norm_1)?Norm_1:sqrt_gamma.$0)*((sqrt_gamma.$0>Norm_2)?Norm_2:sqrt_gamma.$0));
    GENERATE MV_1,MV_2,val as VAL;
}
DESCRIBE HH;
-- Ajout d un filtre supplementaire pour reduire la taille des resultats
HHH = FILTER HH BY VAL > 0.5;
HHHH = DISTINCT HHH;

STORE GGD INTO '$CONTAINER/$PSEUDO/dom/matrix_all.txt' USING PigStorage(',');
STORE HHH INTO '$CONTAINER/$PSEUDO/dom/similarities.txt' USING PigStorage(',');

Dans la partie de code suivante, nous supprimons les fichiers générés par l'algorithme précédent pour pouvoir les regénérer une deuxième fois.


In [166]:
cl.delete_blob(bs, "hdblobstorage", 'imdd/dom/matrix_all.txt')
cl.delete_blob(bs, "hdblobstorage", 'imdd/dom/similarities.txt')
df = %blob_ls hdblobstorage/imdd/dom/matrix_all.txt/
df
for name in df["name"]:
    cl.delete_blob(bs, "hdblobstorage", name)
df = %blob_ls hdblobstorage/imdd/dom/similarities.txt/
df
for name in df["name"]:
    cl.delete_blob(bs, "hdblobstorage", name)

Upload du script dimsum.pig et lancement de son exécution :


In [28]:
jid = %hd_pig_submit dimsum.pig
jid


Out[28]:
{'id': 'job_1452664005967_1065'}

In [29]:
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["completed"],st["status"]["jobComplete"],st["status"]["state"]


Out[29]:
('job_1452664005967_1065', '100% complete', 'done', True, 'SUCCEEDED')

In [10]:
df=%blob_ls hdblobstorage/imdd/
list(df["name"])


Out[10]:
['imdd/ConfLongDemo_JSI.txt',
 'imdd/ConfLongDemo_JSI_small.txt',
 'imdd/dom',
 'imdd/dom/matrix_all.txt',
 'imdd/dom/matrix_all.txt/_SUCCESS',
 'imdd/dom/matrix_all.txt/part-r-00000',
 'imdd/dom/matrix_all11.txt',
 'imdd/dom/matrix_all11.txt/_SUCCESS',
 'imdd/dom/matrix_all11.txt/part-r-00000',
 'imdd/dom/matrix_all_f.txt',
 'imdd/dom/matrix_all_f.txt/_SUCCESS',
 'imdd/dom/matrix_all_f.txt/part-r-00000',
 'imdd/dom/recom.txt',
 'imdd/dom/recom.txt/_SUCCESS',
 'imdd/dom/recom.txt/part-r-00000',
 'imdd/dom/similarities.txt',
 'imdd/dom/similarities.txt/_SUCCESS',
 'imdd/dom/similarities.txt/part-m-00000',
 'imdd/dom/similarities11.txt',
 'imdd/dom/similarities11.txt/_SUCCESS',
 'imdd/dom/similarities11.txt/part-m-00000',
 'imdd/dom/similarities_f.txt',
 'imdd/dom/similarities_f.txt/_SUCCESS',
 'imdd/dom/similarities_f.txt/part-m-00000',
 'imdd/dom/similarities_f.txt/part-m-00001',
 'imdd/dom/similarities_f.txt/part-m-00002',
 'imdd/dom/similarities_f.txt/part-m-00003',
 'imdd/dom/similarities_f.txt/part-m-00004',
 'imdd/dom/similarities_f.txt/part-m-00005',
 'imdd/exp_data_final_short.csv',
 'imdd/exp_original.csv',
 'imdd/exp_original_medium.csv',
 'imdd/exp_original_short.csv',
 'imdd/numpyudf.py',
 'imdd/ratings_mean.csv',
 'imdd/ratings_mean_short.csv',
 'imdd/scripts/pig/dimsum.pig',
 'imdd/scripts/pig/dimsum.pig.log',
 'imdd/scripts/pig/dimsum.pig.log/exit',
 'imdd/scripts/pig/dimsum.pig.log/stderr',
 'imdd/scripts/pig/dimsum.pig.log/stdout',
 'imdd/scripts/pig/exptestnumpy.pig',
 'imdd/scripts/pig/exptestnumpy.pig.log',
 'imdd/scripts/pig/exptestnumpy.pig.log/exit',
 'imdd/scripts/pig/exptestnumpy.pig.log/stderr',
 'imdd/scripts/pig/exptestnumpy.pig.log/stdout',
 'imdd/scripts/pig/exptestnumpy_f.pig',
 'imdd/scripts/pig/exptestnumpy_f.pig.log',
 'imdd/scripts/pig/exptestnumpy_f.pig.log/exit',
 'imdd/scripts/pig/exptestnumpy_f.pig.log/stderr',
 'imdd/scripts/pig/exptestnumpy_f.pig.log/stdout',
 'imdd/scripts/pig/load_results.pig',
 'imdd/scripts/pig/load_results.pig.log',
 'imdd/scripts/pig/load_results.pig.log/exit',
 'imdd/scripts/pig/load_results.pig.log/stderr',
 'imdd/scripts/pig/load_results.pig.log/stdout',
 'imdd/testcpython.py']

Test

L'algorithme DimSum ayant été bien exécuté , nous allons maintenant exploiter notre matrice de similarités à l'aide d'un autre script PIG qui se base sur le fichier de similarités généré par le script PIG vu auparavant.

A partir d'un id d'un film, qui existe dans notre base, nous nous attendrons à récupérer les ids des films dont la similarité calculée est maximale.


In [4]:
%%PIG_azure load_results.pig


cData =  LOAD '$CONTAINER/$PSEUDO/dom/similarities.txt'
        using PigStorage (',')
        AS (MovieID1:int, MovieID2:int, sim:double) ;

filtered = FILTER cData BY MovieID1 == $MvID ;
ordered = ORDER filtered BY sim DESC;
ordered_limit = LIMIT ordered $size;
movies = FOREACH ordered_limit GENERATE MovieID2;
STORE movies INTO '$CONTAINER/imdd/dom/recom.txt' USING PigStorage(',');

Nous supprimons d'abord le fichier généré par la dernière exécution, ensuite nous lançons le script PIG afin de récupérer les ids des films similaires. Pour cet exemple, nous souhaitons récupérer les 20 films les plus proches à celui dont l'id est 1610.


In [8]:
if cl.exists(bs, cl.account_name, "$PSEUDO/imdd/dom/recom.txt"):
    r = cl.delete_folder (bs, cl.account_name, "$PSEUDO/imdd/dom/recom.txt")
jid = cl.pig_submit(bs, blobstorage, "load_results.pig",params={"MvID":'1610',"size":"20"})
jid


Out[8]:
{'id': 'job_1452664005967_0843'}

In [9]:
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])


Out[9]:
('job_1452664005967_0843', None, 'done', True, 'SUCCEEDED')

Nous récupérons ensuite le fichier généré recom.txt, contenant les ids :


In [10]:
if os.path.exists("recom.txt"):os.remove("recom.txt")
%blob_downmerge /imdd/dom/recom.txt recom.txt


Out[10]:
'recom.txt'

Et nous affichons enfin les résultats :


In [11]:
with open('recom.txt', 'r') as f:
    ids = f.read()
    print(ids)


3255
3147
380
3095
3071
3068
3035
3030
2943
982
2858
368
920
356
349
318
2571
2501
2396
2353