In [1]:
import numpy as np
In [5]:
def loadMovieLens(path='./data/ml-100k'):
    """Load the MovieLens 100k files stored under ``path``.

    Reads ``path + '/u.item'`` ('|'-separated; the first two fields are the
    movie id and the title) and ``path + '/u.data'`` (tab-separated
    ``user, movie id, rating, timestamp``).

    Args:
        path: directory containing ``u.item`` and ``u.data``.

    Returns:
        A tuple ``(movies, prefs, times)``:
          * movies: dict ``{movie id: title}``
          * prefs:  dict ``{user id: {title: rating as float}}``
          * times:  dict ``{user id: {title: timestamp as float}}``
        Ids are kept as the strings read from the files.

    Raises:
        KeyError: if ``u.data`` references a movie id missing from ``u.item``.
        ValueError: if a non-blank line does not have the expected fields.
    """
    # NOTE(review): files are opened with the platform default encoding;
    # presumably fine on Python 2, may need an explicit encoding on Python 3
    # -- confirm against the actual data files.

    # Movie titles, indexed by movie id.
    movies = {}
    with open(path + '/u.item') as item_file:
        for line in item_file:
            if not line.strip():
                continue  # tolerate blank / trailing lines
            (movie_id, title) = line.split('|')[0:2]
            movies[movie_id] = title

    prefs = {}  # user -> title -> rating
    times = {}  # user -> title -> timestamp
    with open(path + '/u.data') as data_file:
        for line in data_file:
            if not line.strip():
                continue  # tolerate blank / trailing lines
            (user, movieid, rating, ts) = line.split('\t')
            # Ratings are keyed by title, so two movie ids sharing the same
            # title end up in the same slot of a user's dictionary.
            title = movies[movieid]
            prefs.setdefault(user, {})[title] = float(rating)
            # float() ignores the trailing newline still attached to ts.
            times.setdefault(user, {})[title] = float(ts)
    return movies, prefs, times
def loadMovieLens1M(path='./data/ml-1m'):
    """Load the MovieLens 1M files stored under ``path``.

    Reads ``path + '/movies.dat'`` ('::'-separated; the first two fields are
    the movie id and the title) and ``path + '/ratings.dat'`` ('::'-separated
    ``user, movie id, rating, timestamp``).

    Args:
        path: directory containing ``movies.dat`` and ``ratings.dat``.

    Returns:
        A tuple ``(movies, prefs, times)``:
          * movies: dict ``{movie id: title}``
          * prefs:  dict ``{user id: {title: rating as float}}``
          * times:  dict ``{user id: {title: timestamp as float}}``
        Ids are kept as the strings read from the files.

    Raises:
        KeyError: if ``ratings.dat`` references a movie id missing from
            ``movies.dat``.
        ValueError: if a non-blank line does not have the expected fields.
    """
    # Movie titles, indexed by movie id.
    movies = {}
    with open(path + '/movies.dat') as movie_file:
        for line in movie_file:
            if not line.strip():
                continue  # tolerate blank / trailing lines
            movie_id, title = line.split('::')[0:2]
            movies[movie_id] = title

    prefs = {}  # user -> title -> rating
    times = {}  # user -> title -> timestamp
    with open(path + '/ratings.dat') as rating_file:
        for line in rating_file:
            if not line.strip():
                continue  # tolerate blank / trailing lines
            (user, movieid, rating, ts) = line.split('::')
            # Ratings are keyed by title, so two movie ids sharing the same
            # title end up in the same slot of a user's dictionary.
            title = movies[movieid]
            prefs.setdefault(user, {})[title] = float(rating)
            # float() ignores the trailing newline still attached to ts.
            times.setdefault(user, {})[title] = float(ts)
    return movies, prefs, times
In [6]:
def getTriplet(data):
    """Flatten ``{user: {item: rating}}`` into a list of ``[user, item, rating]``.

    Rows come out grouped by user, in the iteration order of the dictionaries.
    """
    return [
        [user, item, rating]
        for user, ratings in data.items()
        for item, rating in ratings.items()
    ]
def getDataByUsers(triplet):
    """Index rating triplets by user.

    Args:
        triplet: iterable of rows ``(user, item, rating)``; the rating may be
            a number or a numeric string (it is passed through ``float``).

    Returns:
        dict ``{user: {item: rating as float}}``. If the same (user, item)
        pair appears several times, the last rating wins.
    """
    dataByUsers = {}
    for t in triplet:
        # setdefault is one hash lookup; testing membership against .keys()
        # scans a list on Python 2, which made this loop quadratic.
        dataByUsers.setdefault(t[0], {})[t[1]] = float(t[2])
    return dataByUsers
def getDataByItems(triplet):
    """Index rating triplets by item.

    Args:
        triplet: iterable of rows ``(user, item, rating)``; the rating may be
            a number or a numeric string (it is passed through ``float``).

    Returns:
        dict ``{item: {user: rating as float}}``. If the same (user, item)
        pair appears several times, the last rating wins.
    """
    dataByItems = {}
    for t in triplet:
        # setdefault is one hash lookup; testing membership against .keys()
        # scans a list on Python 2, which made this loop quadratic.
        dataByItems.setdefault(t[1], {})[t[0]] = float(t[2])
    return dataByItems
def splitTrainTest(triplet, testProp, seed=None):
    """Randomly split the rating triplets into a train part and a test part.

    Args:
        triplet: sequence of ``[user, item, rating]`` rows.
        testProp: fraction of the rows that goes to the test part; the test
            size is ``int(testProp * len(triplet))``.
        seed: optional seed for a private ``numpy.random.RandomState`` so the
            split can be reproduced. When ``None`` (default) the global numpy
            random state is used.

    Returns:
        ``(train, test)`` as numpy arrays of rows. The rows go through a numpy
        array, so every column ends up with the array's common dtype; callers
        in this notebook convert the rating column back with ``float``.
    """
    if seed is None:
        perm = np.random.permutation(triplet)
    else:
        # Private generator: reproducible without touching the global state.
        perm = np.random.RandomState(seed).permutation(triplet)
    splitIndex = int(testProp * len(triplet))
    return perm[splitIndex:], perm[:splitIndex]
def deleteUnknowData(triplet_test, trainUsers, trainItems):
    """Drop the test rows whose user or item is unknown to the training set.

    A row ``(user, item, rating)`` is removed when ``user`` is not a key of
    ``trainUsers`` or ``item`` is not a key of ``trainItems``. The result is
    whatever ``np.delete`` returns for the remaining rows (axis 0).
    """
    unknown_rows = [
        row_idx
        for row_idx, row in enumerate(triplet_test)
        if row[0] not in trainUsers or row[1] not in trainItems
    ]
    return np.delete(triplet_test, unknown_rows, 0)
In [18]:
# Load MovieLens 100k and flatten the ratings into [user, item, rating] rows.
movies, data, ts = loadMovieLens()
triplet = getTriplet(data)

# Random split: 80% train / 20% test.
triplet_train, triplet_test = splitTrainTest(triplet, 0.2)

# Training ratings indexed by user and by item.
trainUsers, trainItems = getDataByUsers(triplet_train), getDataByItems(triplet_train)

# Keep only the test rows whose user and item both appear in training.
triplet_test = deleteUnknowData(triplet_test, trainUsers, trainItems)

# Filtered test ratings indexed by user and by item.
testUsers, testItems = getDataByUsers(triplet_test), getDataByItems(triplet_test)
In [8]:
class BLMeanUsers():
    """Baseline predictor: every rating of a user is predicted as that user's
    mean training rating.

    Attributes:
        Y: dict ``{user: mean rating}`` built by the last call to ``fit``.
    """

    def __init__(self):
        self.Y = {}

    def fit(self, trainUsers):
        """Compute the mean rating of each user.

        Args:
            trainUsers: dict ``{user: {item: rating}}``.

        Raises:
            ZeroDivisionError: if a user has an empty rating dictionary.
        """
        # Start from scratch so a refit does not keep users from an earlier
        # fit (same behaviour as BLMeanItems.fit).
        self.Y = {}
        for u, ratings in trainUsers.items():
            self.Y[u] = sum(ratings.values()) * 1.0 / len(ratings)

    def predict(self, triplet_test):
        """Return one prediction per row of ``triplet_test``.

        Args:
            triplet_test: sequence of rows whose first field is the user.

        Returns:
            1-D numpy array of the users' mean ratings.

        Raises:
            KeyError: if a row's user was not seen by ``fit``.
        """
        pred = np.zeros(len(triplet_test))
        for ind, c in enumerate(triplet_test):
            pred[ind] = self.Y[c[0]]
        return pred

    def loss(self, triplet_test):
        """Mean squared error between predictions and the rating column.

        Accepts a numpy array of rows as well as a plain list of
        ``[user, item, rating]`` rows; ratings may be numeric strings.
        """
        truth = np.array([c[2] for c in triplet_test], dtype=float)
        return ((self.predict(triplet_test) - truth) ** 2).mean()
In [9]:
# Per-user mean baseline: fit on the training ratings, report the test MSE.
model = BLMeanUsers()
model.fit(trainUsers)
# A single pre-formatted argument prints the same text whether `print` is the
# Python 2 statement or the Python 3 function.
print("erreur en test: %s" % model.loss(triplet_test))
In [10]:
class BLMeanItems():
    """Baseline predictor: every rating of an item is predicted as that item's
    mean training rating.

    Attributes:
        Y: dict ``{item: mean rating}`` built by the last call to ``fit``.
    """

    def __init__(self):
        self.Y = {}

    def fit(self, dataItems):
        """Compute the mean rating of each item.

        Args:
            dataItems: dict ``{item: {user: rating}}``.

        Raises:
            ZeroDivisionError: if an item has an empty rating dictionary.
        """
        self.Y = {}
        for i, ratings in dataItems.items():
            self.Y[i] = sum(ratings.values()) * 1.0 / len(ratings)

    def predict(self, triplet_test):
        """Return one prediction per row of ``triplet_test``.

        Args:
            triplet_test: sequence of rows whose second field is the item.

        Returns:
            1-D numpy array of the items' mean ratings.

        Raises:
            KeyError: if a row's item was not seen by ``fit``.
        """
        pred = np.zeros(len(triplet_test))
        for ind, c in enumerate(triplet_test):
            pred[ind] = self.Y[c[1]]
        return pred

    def loss(self, triplet_test):
        """Mean squared error between predictions and the rating column.

        Accepts a numpy array of rows as well as a plain list of
        ``[user, item, rating]`` rows; ratings may be numeric strings.
        """
        truth = np.array([c[2] for c in triplet_test], dtype=float)
        return ((self.predict(triplet_test) - truth) ** 2).mean()
In [11]:
# Per-item mean baseline: fit on the training ratings, report the test MSE.
model = BLMeanItems()
model.fit(trainItems)
# A single pre-formatted argument prints the same text whether `print` is the
# Python 2 statement or the Python 3 function.
print("erreur en test: %s" % model.loss(triplet_test))
In [12]:
class FactoMatrice():
    """Rating predictor based on a rank-``k`` factorisation trained by SGD.

    Each user ``u`` gets a row vector ``p[u]`` of shape (1, k) and each item
    ``i`` a column vector ``q[i]`` of shape (k, 1); the predicted rating is
    ``p[u].dot(q[i])``.

    Args:
        k: number of latent factors.
        epsilon: gradient step size.
        nbIter: number of stochastic updates performed by ``fit``.
        lamb: regularisation weight (shrinks the updated vectors by
            ``1 - lamb * epsilon`` at every step).
    """

    def __init__(self, k, epsilon=1e-3, nbIter=2000, lamb=0.5):
        self.k = k
        self.lamb = lamb
        self.epsilon = epsilon
        self.nbIter = nbIter

    def fit(self, trainUsers, trainItems, triplet):
        """Stochastic gradient descent with alternating user/item updates.

        Args:
            trainUsers: dict ``{user: {item: rating}}`` used to look up the
                target rating of the sampled pair.
            trainItems: accepted for interface symmetry; not read here.
            triplet: sequence of rows ``(user, item, ...)`` to sample from.

        Prints the mean squared error (without the regularisation term) of the
        samples seen since the previous report, at most five times per fit.
        """
        self.p = {}
        self.q = {}
        self.triplet = triplet
        # Random initial factors for every user / item present in the data.
        for row in triplet:
            u, i = row[0], row[1]
            if u not in self.p:
                self.p[u] = np.random.rand(1, self.k)  # row vector
            if i not in self.q:
                self.q[i] = np.random.rand(self.k, 1)  # column vector

        decay = 1 - self.lamb * self.epsilon
        step = self.epsilon * 2
        # Integer reporting period (about 5 reports per fit). The previous
        # float test `it % (nbIter * 0.2) == 0` missed every report after the
        # first whenever nbIter * 0.2 was not exactly representable
        # (e.g. nbIter = 15).
        report_every = max(1, (self.nbIter + 4) // 5)

        loss = []
        for it in range(self.nbIter):
            ind = np.random.randint(len(triplet))
            u = triplet[ind][0]
            i = triplet[ind][1]
            tmp = trainUsers[u][i] - self.p[u].dot(self.q[i])[0][0]
            self.p[u] = decay * self.p[u] + step * tmp * self.q[i].transpose()
            # Alternating update: q[i] is moved using the p[u] just updated.
            self.q[i] = decay * self.q[i] + step * tmp * self.p[u].transpose()
            loss.append(tmp * tmp)  # error without the regularisation term
            if it % report_every == 0:
                # Single pre-formatted argument: same text with the Python 2
                # print statement and the Python 3 print function.
                print("itération :  %s" % it)
                print("loss :  %s" % np.mean(loss))
                print("-------")
                loss = []

    def predict(self, triplet_test):
        """Return the predicted rating of every ``(user, item, ...)`` row.

        Raises:
            KeyError: if a user or an item was not seen by ``fit``.
        """
        pred = np.zeros(len(triplet_test))
        for ind, t in enumerate(triplet_test):
            pred[ind] = self.p[t[0]].dot(self.q[t[1]])[0][0]
        return pred

    def score(self, triplet_test):
        """Mean squared error between predictions and the rating column.

        Accepts a numpy array of rows as well as a plain list of
        ``[user, item, rating]`` rows; ratings may be numeric strings.
        """
        truth = np.array([t[2] for t in triplet_test], dtype=float)
        return ((self.predict(triplet_test) - truth) ** 2).mean()
In [13]:
# Matrix factorisation without biases: hyper-parameters, training, test MSE.
k = 10
epsilon = 7e-3
nbIter = 20*len(triplet_train)  # 20 stochastic updates per training triplet
lamb = 0.2
model = FactoMatrice(k, epsilon=epsilon, nbIter=nbIter,lamb=lamb)
model.fit(trainUsers, trainItems, triplet_train)
# A single pre-formatted argument prints the same text whether `print` is the
# Python 2 statement or the Python 3 function.
print("erreur en test: %s" % model.score(triplet_test))
In [14]:
class FactoMatriceBiais():
    """Rank-``k`` factorisation with a global, per-user and per-item bias.

    The predicted rating of user ``u`` for item ``i`` is
    ``mu + bu[u] + bi[i] + p[u].dot(q[i])`` where ``p[u]`` has shape (1, k)
    and ``q[i]`` has shape (k, 1).

    Args:
        k: number of latent factors.
        epsilon: gradient step size.
        nbIter: number of stochastic updates performed by ``fit``.
        lamb: regularisation weight (shrinks every updated parameter by
            ``1 - lamb * epsilon`` at each step).
    """

    def __init__(self, k, epsilon=1e-3, nbIter=2000, lamb=0.5):
        self.k = k
        self.lamb = lamb
        self.epsilon = epsilon
        self.nbIter = nbIter

    def fit(self, trainUsers, trainItems, triplet):
        """Stochastic gradient descent over factors and biases.

        Args:
            trainUsers: dict ``{user: {item: rating}}`` used to look up the
                target rating of the sampled pair.
            trainItems: accepted for interface symmetry; not read here.
            triplet: sequence of rows ``(user, item, ...)`` to sample from.

        Prints the mean squared error (without the regularisation term) of the
        samples seen since the previous report, at most five times per fit.
        """
        self.p = {}
        self.q = {}
        self.bu = {}  # user biases
        self.bi = {}  # item biases
        self.mu = np.random.random() * 2 - 1  # global bias, drawn in [-1, 1)
        # Random initial factors and biases for every user / item in the data.
        for row in triplet:
            u, i = row[0], row[1]
            if u not in self.p:
                self.p[u] = np.random.rand(1, self.k)  # row vector
                self.bu[u] = np.random.rand() * 2 - 1
            if i not in self.q:
                self.q[i] = np.random.rand(self.k, 1)  # column vector
                self.bi[i] = np.random.rand() * 2 - 1

        decay = 1 - self.lamb * self.epsilon
        step = self.epsilon * 2
        # Integer reporting period (about 5 reports per fit). The previous
        # float test `it % (nbIter * 0.2) == 0` missed every report after the
        # first whenever nbIter * 0.2 was not exactly representable
        # (e.g. nbIter = 15).
        report_every = max(1, (self.nbIter + 4) // 5)

        loss = []
        for it in range(self.nbIter):
            ind = np.random.randint(len(triplet))
            u = triplet[ind][0]
            i = triplet[ind][1]
            tmp = trainUsers[u][i] - (self.mu + self.bi[i] + self.bu[u] + self.p[u].dot(self.q[i])[0][0])
            self.p[u] = decay * self.p[u] + step * tmp * self.q[i].transpose()
            self.bu[u] = decay * self.bu[u] + step * tmp
            # q[i] is moved using the p[u] just updated above.
            self.q[i] = decay * self.q[i] + step * tmp * self.p[u].transpose()
            self.bi[i] = decay * self.bi[i] + step * tmp
            self.mu = decay * self.mu + step * tmp
            loss.append(tmp * tmp)  # error without the regularisation term
            if it % report_every == 0:
                # Single pre-formatted argument: same text with the Python 2
                # print statement and the Python 3 print function.
                print("itération :  %s" % it)
                print("loss :  %s" % np.mean(loss))
                print("-------")
                loss = []

    def predict(self, triplet_test):
        """Return the predicted rating of every ``(user, item, ...)`` row.

        Raises:
            KeyError: if a user or an item was not seen by ``fit``.
        """
        pred = np.zeros(len(triplet_test))
        for ind, t in enumerate(triplet_test):
            pred[ind] = self.mu + self.bu[t[0]] + self.bi[t[1]] + self.p[t[0]].dot(self.q[t[1]])[0][0]
        return pred

    def score(self, triplet_test):
        """Mean squared error between predictions and the rating column.

        Accepts a numpy array of rows as well as a plain list of
        ``[user, item, rating]`` rows; ratings may be numeric strings.
        """
        truth = np.array([t[2] for t in triplet_test], dtype=float)
        return ((self.predict(triplet_test) - truth) ** 2).mean()
In [19]:
# Number of training triplets (the training cells set nbIter to 20 times this).
len(triplet_train)
Out[19]:
In [15]:
# Matrix factorisation with biases: hyper-parameters, training, test MSE.
k = 10
epsilon = 7e-3
nbIter = 20*len(triplet_train)  # 20 stochastic updates per training triplet
lamb = 0.2
model = FactoMatriceBiais(k, epsilon=epsilon, nbIter=nbIter,lamb=lamb)
model.fit(trainUsers, trainItems, triplet_train)
# A single pre-formatted argument prints the same text whether `print` is the
# Python 2 statement or the Python 3 function.
print("erreur en test: %s" % model.score(triplet_test))
In [16]:
# Load MovieLens 1M and flatten the ratings into [user, item, rating] rows.
movies, data, ts = loadMovieLens1M()
triplet = getTriplet(data)

# Random split: 80% train / 20% test.
triplet_train, triplet_test = splitTrainTest(triplet, 0.2)

# Training ratings indexed by user and by item.
trainUsers, trainItems = getDataByUsers(triplet_train), getDataByItems(triplet_train)

# Keep only the test rows whose user and item both appear in training.
triplet_test = deleteUnknowData(triplet_test, trainUsers, trainItems)

# Filtered test ratings indexed by user and by item.
testUsers, testItems = getDataByUsers(triplet_test), getDataByItems(triplet_test)
In [ ]:
# Per-user mean baseline: fit on the training ratings, report the test MSE.
model = BLMeanUsers()
model.fit(trainUsers)
# A single pre-formatted argument prints the same text whether `print` is the
# Python 2 statement or the Python 3 function.
print("erreur en test: %s" % model.loss(triplet_test))
In [ ]:
# Per-item mean baseline: fit on the training ratings, report the test MSE.
model = BLMeanItems()
model.fit(trainItems)
# A single pre-formatted argument prints the same text whether `print` is the
# Python 2 statement or the Python 3 function.
print("erreur en test: %s" % model.loss(triplet_test))
In [ ]:
# Matrix factorisation without biases: hyper-parameters, training, test MSE.
k = 10
epsilon = 7e-3
nbIter = 20*len(triplet_train)  # 20 stochastic updates per training triplet
lamb = 0.2
model = FactoMatrice(k, epsilon=epsilon, nbIter=nbIter,lamb=lamb)
model.fit(trainUsers, trainItems, triplet_train)
# A single pre-formatted argument prints the same text whether `print` is the
# Python 2 statement or the Python 3 function.
print("erreur en test: %s" % model.score(triplet_test))
In [78]:
# Matrix factorisation with biases: hyper-parameters, training, test MSE.
k = 10
epsilon = 7e-3
nbIter = 20*len(triplet_train)  # 20 stochastic updates per training triplet
lamb = 0.2
model = FactoMatriceBiais(k, epsilon=epsilon, nbIter=nbIter,lamb=lamb)
model.fit(trainUsers, trainItems, triplet_train)
# A single pre-formatted argument prints the same text whether `print` is the
# Python 2 statement or the Python 3 function.
print("erreur en test: %s" % model.score(triplet_test))
In [ ]: