MovieLens Dataset (https://grouplens.org/datasets/movielens/100k/)
100,000 movie ratings (on a scale of 1 to 5) by 943 users for 1,682 different movies.
In [1]:
lines = sc.textFile("../data/ml-100k/u.data")
print(lines.first())
# user_id movie_id rating timestamp
Read the text file line by line. Split each line at '\t', parse the fields, and create Row objects holding user ID, movie ID and rating (the timestamp is not used).
In [2]:
from pyspark.sql import Row
# Split each line into its tab-separated fields.
ratings_split = lines.map(lambda s: s.split("\t"))
# Keep user ID, movie ID and rating; the timestamp (col[3]) is not used.
ratings = ratings_split.map(
    lambda col: Row(userid=int(col[0]), movieid=int(col[1]), rating=float(col[2]))
)
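To see what the parsing step does to a single record, here is a minimal plain-Python sketch that runs without Spark. The sample line, the `RatingRow` type and the `parse_rating` helper are illustrative assumptions, not part of the notebook.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row, so the sketch runs without Spark.
RatingRow = namedtuple("RatingRow", ["userid", "movieid", "rating"])

def parse_rating(line):
    """Split one tab-separated line and keep the first three fields."""
    col = line.split("\t")
    return RatingRow(userid=int(col[0]), movieid=int(col[1]), rating=float(col[2]))

# Made-up sample line in the u.data layout: user_id, movie_id, rating, timestamp.
sample = "7\t50\t5\t891350000"
row = parse_rating(sample)
print(row)
```

The same function could be passed to `lines.map(...)` in place of the two chained lambdas.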
Load titles and movie IDs. Split each line of this text file at '|'. Column 0 is the movie ID, column 1 is the movie title.
In [3]:
movies = sc.textFile("../data/ml-100k/u.item")
titles = movies.map(lambda s: s.split('|')).map(lambda line: (int(line[0]), line[1])).collectAsMap()
Show the first ten ratings.
In [4]:
first10 = ratings.map(lambda r: '{:3d}'.format(r.userid) + ": "+str(r.rating) + " "+titles[r.movieid]).take(10)
for s in first10: print(s)
In [5]:
from pyspark.ml.recommendation import ALS
Matrix factorization by ALS (Alternating Least Squares) using 10 iterations and rank 10, that is, 10 latent dimensions. Use 0.01 as the regularization parameter.
In [6]:
als = ALS(rank=10, maxIter=10, regParam=0.01, userCol="userid",
itemCol="movieid", ratingCol="rating")
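The idea behind ALS can be sketched in a few lines of NumPy: fix the item factors and solve a small regularized least-squares problem per user, then fix the user factors and do the same per item, and repeat. The following is a simplified single-machine sketch under stated assumptions (a made-up 4x4 rating matrix, rank 2, the hypothetical helper `als_sketch`). Spark's implementation is distributed and differs in details such as how the regularization is scaled.

```python
import numpy as np

def als_sketch(R, mask, rank=2, n_iter=30, reg=0.01, seed=0):
    """Alternating least squares using only the observed entries (mask == True)."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    I = np.eye(rank)
    for _ in range(n_iter):
        # Fix V and solve a small ridge regression for each user.
        for u in range(n_users):
            obs = mask[u]
            if obs.any():
                Vo = V[obs]
                U[u] = np.linalg.solve(Vo.T @ Vo + reg * I, Vo.T @ R[u, obs])
        # Fix U and solve the same kind of problem for each item.
        for i in range(n_items):
            obs = mask[:, i]
            if obs.any():
                Uo = U[obs]
                V[i] = np.linalg.solve(Uo.T @ Uo + reg * I, Uo.T @ R[obs, i])
    return U, V

# Made-up toy ratings; 0 marks a missing rating.
R = np.array([[5, 4, 0, 1],
              [4, 5, 1, 0],
              [1, 0, 5, 4],
              [0, 1, 4, 5]], dtype=float)
mask = R > 0
U, V = als_sketch(R, mask)
rmse = np.sqrt(np.mean((U @ V.T - R)[mask] ** 2))
print("RMSE on observed entries:", round(float(rmse), 3))
```

The product `U @ V.T` also fills in the missing entries, which is what makes the factorization usable for recommendations.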
Train model (fit).
In [7]:
ratingsDF = spark.createDataFrame(ratings)
model = als.fit(ratingsDF)
User factor matrix: latent features of user 10 (the first row returned).
In [8]:
model.userFactors.first()
Out[8]:
Item (movie) factor matrix: latent features of the movie with ID 10 (the first row returned).
In [9]:
model.itemFactors.first()
Out[9]:
Ratings provided by user 51.
In [10]:
user = 51
user_ratings = ratings.filter(lambda r: r["userid"]==user).map(
lambda r: str(r["rating"]) + ": " + titles[r["movieid"]]).collect()
for ur in user_ratings: print(ur)
Predicted ratings of this user for four arbitrarily chosen movies.
In [11]:
movies = [56, 176, 161, 179]
user_movies = spark.createDataFrame([(user, m) for m in movies],
["userid", "movieid"])
predictions = model.transform(user_movies).rdd.map(
lambda r: "User " +str(r["userid"]) + ": predicted rating " +
"{:5.3f}".format(r["prediction"]) +
" for " + titles[r["movieid"]]).collect()
for p in predictions: print(p)
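In a matrix factorization model, a predicted rating is the dot product of the user's latent vector and the movie's latent vector. The sketch below illustrates this with made-up 3-dimensional factors (the notebook's model uses rank 10); the dictionaries and the `predict` helper are hypothetical and only stand in for `model.userFactors` and `model.itemFactors`.

```python
import numpy as np

# Made-up latent vectors, keyed by user ID and movie ID.
user_factors = {51: np.array([0.9, 0.2, 1.1])}
item_factors = {56: np.array([1.5, 0.4, 1.8]),
                176: np.array([0.3, 2.0, 0.5])}

def predict(userid, movieid):
    """Predicted rating: dot product of user and movie latent vectors."""
    return float(np.dot(user_factors[userid], item_factors[movieid]))

for m in (56, 176):
    print("User 51, movie {}: predicted rating {:5.3f}".format(m, predict(51, m)))
```

A movie scores high for a user when both vectors have large values in the same latent dimensions.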
In [12]:
movie = 50
print(titles[movie])
In [13]:
import numpy as np
# Latent feature vector of the query movie, taken from the item factor matrix.
movie_features = model.itemFactors.rdd
query_movie = np.array(movie_features.filter(lambda r: r.id == movie).map(lambda r: r.features).first())
print(query_movie)
In [14]:
def cosine_similarity(a, b):
    return np.dot(a, b)/(np.linalg.norm(a)*np.linalg.norm(b))
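A quick sanity check of this function on hand-picked vectors (the vectors are illustrative, not taken from the model): cosine similarity depends only on direction, so it is 1 for vectors pointing the same way, 0 for orthogonal vectors, and -1 for opposite vectors.

```python
import numpy as np

def cosine_similarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# Same direction, different length.
same_direction = cosine_similarity([1.0, 2.0], [2.0, 4.0])
# Perpendicular vectors.
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 3.0])
# Opposite direction.
opposite = cosine_similarity([1.0, 2.0], [-1.0, -2.0])
print(same_direction, orthogonal, opposite)
```

Because the length of the vectors is normalized away, two movies can count as similar even if one has much larger factor values than the other.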
The ten movies most similar to "Star Wars", ranked by cosine similarity of their latent feature vectors.
In [15]:
top10 = movie_features.map(lambda r: (r.id, cosine_similarity(query_movie, r.features))
).sortBy(lambda r: -r[1]).map(lambda r: (titles[r[0]], r[1])).take(10)
for t, r in top10: print("{:.3}".format(r)+": "+t)
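With fewer than 2,000 movies, the item factors could also be collected to the driver and ranked with plain NumPy. The following is a sketch of that alternative under stated assumptions: the IDs, the 2-dimensional factors and the `top_k_similar` helper are made up for illustration.

```python
import numpy as np

# Made-up item factors: one row per movie, with hypothetical movie IDs.
ids = np.array([10, 20, 30, 40])
factors = np.array([[1.0, 0.0],
                    [0.9, 0.1],
                    [0.0, 1.0],
                    [-1.0, 0.0]])

def top_k_similar(query_id, k):
    """Return (id, cosine similarity) pairs, most similar first."""
    # Normalize every row to unit length, so a dot product is a cosine similarity.
    unit = factors / np.linalg.norm(factors, axis=1, keepdims=True)
    query = unit[ids == query_id][0]
    sims = unit @ query
    order = np.argsort(-sims)[:k]
    return [(int(ids[i]), float(sims[i])) for i in order]

top3 = top_k_similar(10, 3)
for movie_id, sim in top3:
    print("{:.3f}: movie {}".format(sim, movie_id))
```

Note that the query movie itself always comes first with similarity 1; the same holds for the Spark version above, so the first entry of the top ten is "Star Wars" itself.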