In [1]:
import os
import sys
import urllib2
import collections
import matplotlib.pyplot as plt
import math
from time import time, sleep
%pylab inline
In [2]:
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError("Please set SPARK_HOME environment variable!")
# Add pyspark and the bundled py4j archive to the module search path.
sys.path.insert(0, os.path.join(spark_home, 'python'))
# BUG FIX: joining with the absolute path 'C:/spark/python/...' made
# os.path.join discard spark_home entirely (later absolute components
# replace earlier ones).  Join the archive path relative to SPARK_HOME.
sys.path.insert(0, os.path.join(spark_home, 'python', 'lib', 'py4j-0.9-src.zip'))
In [3]:
from pyspark.mllib.recommendation import ALS, Rating
from pyspark import SparkConf, SparkContext
In [4]:
# Run Spark locally on every available core, 2 GB per executor.
conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("MovieRecommendationsALS")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)
In [5]:
def loadMovieNames():
    """Download MovieLens u.item and return a {movieID: title} dict.

    Rows are pipe-delimited; field 0 is the movie ID, field 1 the title.
    Works on both Python 2 and 3: HTTP rows arrive as bytes and are decoded
    (ASCII, errors ignored) before splitting.
    """
    try:
        from urllib.request import urlopen  # Python 3
    except ImportError:
        from urllib2 import urlopen          # Python 2 fallback
    movieNames = {}
    for line in urlopen("https://raw.githubusercontent.com/psumank/DATA643/master/WK5/ml-100k/u.item"):
        fields = line.decode('ascii', 'ignore').split('|')
        movieNames[int(fields[0])] = fields[1]
    return movieNames
In [6]:
# print() with a single argument is valid in both Python 2 and 3;
# the old print statements were Python-2-only.
print("\nLoading movie names...")
nameDict = loadMovieNames()
print("\nLoading ratings data...")
# NOTE(review): hardcoded absolute Windows path -- consider making the data
# directory configurable.
data = sc.textFile("file:///C:/Users/p_sum/.ipynb_checkpoints/ml-100k/u.data")
In [7]:
# Field 2 of each whitespace-separated u.data row is the rating value.
ratings = data.map(lambda x: x.split()[2])
# countByValue() is an action -- it triggers actual evaluation of the lazy RDD.
rating_results = ratings.countByValue()
sortedResults = collections.OrderedDict(sorted(rating_results.items()))
# iteritems() was removed in Python 3; items() works in both versions.
for key, value in sortedResults.items():
    print("%s %i" % (key, value))
In [8]:
# Bar chart: number of ratings per star value.
ratPlot = plt.bar(range(len(sortedResults)), sortedResults.values(), align='center')
plt.xticks(range(len(sortedResults)), list(sortedResults.keys()))
ratPlot[3].set_color('g')  # highlight the fourth bar (rating value '4')
print("Ratings Histogram")
In [9]:
# (movieID, 1) for every rating row; field 1 of u.data is the movie ID.
movies = data.map(lambda x: (int(x.split()[1]), 1))
movieCounts = movies.reduceByKey(lambda x, y: x + y)
# Python 3 removed tuple parameters ('lambda (x, y)'), so unpack by index.
flipped = movieCounts.map(lambda xy: (xy[1], xy[0]))
sortedMovies = flipped.sortByKey(False)  # descending by rating count
sortedMoviesWithNames = sortedMovies.map(lambda cm: (nameDict[cm[1]], cm[0]))
In [10]:
# Pull the (title, count) pairs to the driver and keep the ten most rated.
results = sortedMoviesWithNames.collect()
subset = results[:10]
popular_movieNm = [str(title) for title, count in subset]
popularity_strength = [int(count) for title, count in subset]
In [11]:
# Horizontal bar chart of the ten most-rated movies.
popMovplot = plt.barh(range(len(subset)), popularity_strength, align='center')
plt.yticks(range(len(subset)), popular_movieNm)
popMovplot[0].set_color('g')  # highlight the single most popular movie
print("Most Popular Movies from the Dataset")
### Find similar movies for a given movie using cosine similarity
In [12]:
# Parse each u.data row into userID => (movieID, rating).
tokens = data.map(lambda line: line.split())
ratingsRDD = tokens.map(lambda f: (int(f[0]), (int(f[1]), float(f[2]))))
In [13]:
# Peek at the ten records with the lowest user IDs to sanity-check the parse.
ratingsRDD.takeOrdered(10, key=lambda record: record[0])
Out[13]:
In [14]:
# Quick look at a few parsed (userID, (movieID, rating)) records.
ratingsRDD.take(4)
Out[14]:
In [15]:
# Movies rated by same user. ==> [ user ID ==> ( (movieID, rating), (movieID, rating)) ]
# Self-join pairs every two ratings made by the same user -- including
# self-pairs and both (A,B)/(B,A) orderings, which the next cell removes.
userJoinedRatings = ratingsRDD.join(ratingsRDD)
userJoinedRatings.takeOrdered(10, key = lambda x: x[0])
Out[15]:
In [16]:
# Remove dups
def filterDups(userRatings):
    """Keep only one orientation of each self-joined rating pair.

    userRatings: (userID, ((movie1, rating1), (movie2, rating2))) as produced
    by the RDD self-join.  Requiring movie1 < movie2 drops the mirrored
    (B, A) duplicate of every (A, B) pair as well as (A, A) self-pairs.
    """
    # Python 3 removed tuple parameters in 'def', so unpack explicitly.
    (userID, ratings) = userRatings
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2
# Keep one orientation of each per-user movie pair (also drops self-pairs).
uniqueUserJoinedRatings = userJoinedRatings.filter(filterDups)
uniqueUserJoinedRatings.takeOrdered(10, key = lambda x: x[0])
Out[16]:
In [17]:
# Now key by (movie1, movie2) pairs ==> (movie1, movie2) => (rating1, rating2)
def makeMovieRatingPairs(userRatings):
    """Re-key a self-joined record by its movie pair.

    (user, ((movie1, r1), (movie2, r2))) -> ((movie1, movie2), (r1, r2))
    """
    # Python 3 removed tuple parameters in 'def', so unpack explicitly.
    (user, ratings) = userRatings
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))
# Re-key every de-duplicated record to (movie1, movie2) => (rating1, rating2).
moviePairs = uniqueUserJoinedRatings.map(makeMovieRatingPairs)
moviePairs.takeOrdered(10, key = lambda x: x[0])
Out[17]:
In [18]:
#collect all ratings for each movie pair and compute similarity. (movie1, movie2) = > (rating1, rating2), (rating1, rating2) ...
# groupByKey gathers every co-rating of a movie pair into one iterable:
# (movie1, movie2) => [(rating1, rating2), (rating1, rating2), ...]
moviePairRatings = moviePairs.groupByKey()
moviePairRatings.takeOrdered(10, key = lambda x: x[0])
Out[18]:
In [19]:
#Compute Similarity
def cosineSimilarity(ratingPairs):
numPairs = 0
sum_xx = sum_yy = sum_xy = 0
for ratingX, ratingY in ratingPairs:
sum_xx += ratingX * ratingX
sum_yy += ratingY * ratingY
sum_xy += ratingX * ratingY
numPairs += 1
numerator = sum_xy
denominator = sqrt(sum_xx) * sqrt(sum_yy)
score = 0
if (denominator):
score = (numerator / (float(denominator)))
return (score, numPairs)
# mapValues keeps the (movie1, movie2) keys and replaces each ratings
# iterable with its (score, numPairs) summary; cached because it is reused
# by the filtering and top-10 cells below.
moviePairSimilarities = moviePairRatings.mapValues(cosineSimilarity).cache()
moviePairSimilarities.takeOrdered(10, key = lambda x: x[0])
Out[19]:
In [20]:
scoreThreshold = 0.97
coOccurenceThreshold = 50
inputMovieID = 1  # Toy Story.
# Keep pairs that involve the input movie AND pass both quality thresholds.
# Python 3 removed tuple parameters ('lambda ((pair, sim))'), so unpack by
# indexing: pairSim[0] is the (movie1, movie2) pair, pairSim[1] is (score, n).
filteredResults = moviePairSimilarities.filter(lambda pairSim:
    (pairSim[0][0] == inputMovieID or pairSim[0][1] == inputMovieID)
    and pairSim[1][0] > scoreThreshold and pairSim[1][1] > coOccurenceThreshold)
In [21]:
# Top 10 by quality score: flip to (sim, pair) so sortByKey orders by score.
# Tuple-parameter lambdas are invalid in Python 3, so index instead.
results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])) \
    .sortByKey(ascending=False).take(10)
print("Top 10 similar movies for " + nameDict[inputMovieID])
for result in results:
    (sim, pair) = result
    # Display the movie in the pair that isn't the one we're looking at.
    similarMovieID = pair[0]
    if similarMovieID == inputMovieID:
        similarMovieID = pair[1]
    print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))
In [22]:
# Build MLlib Rating(user, product, rating) records; cached because ALS makes
# multiple passes over them.
# NOTE(review): this rebinds the name 'ratings' that earlier held the raw
# rating-value RDD -- consider a distinct name.
tokens = data.map(lambda l: l.split())
ratings = tokens.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))).cache()
ratings.take(3)
Out[22]:
In [23]:
nratings = ratings.count()
# Rating records behave like (user, product, rating) tuples, so keys() /
# values() on the RDD yield user IDs and movie IDs respectively.
nUsers = ratings.keys().distinct().count()
nMovies = ratings.values().distinct().count()
print("We have Got %d ratings from %d users on %d movies." % (nratings, nUsers, nMovies))
In [24]:
# Build the recommendation model using Alternating Least Squares
#Train a matrix factorization model given an RDD of ratings given by users to items, in the form of
#(userID, itemID, rating) pairs. We approximate the ratings matrix as the product of two lower-rank matrices
#of a given rank (number of features). To solve for these features, we run a given number of iterations of ALS.
#The level of parallelism is determined automatically based on the number of partitions in ratings.
start = time()
# ALS hyperparameters.
seed = 5  # was '5L' -- long literals are a SyntaxError in Python 3
iterations = 10
rank = 8  # number of latent features
# BUG FIX: 'seed' was defined but never passed to ALS.train, so runs were
# not reproducible; pass it through explicitly.
model = ALS.train(ratings, rank, iterations, seed=seed)
duration = time() - start
print("Model trained in %s seconds" % round(duration, 3))
In [25]:
# Let's recommend movies for user ID 2.
userID = 2
In [26]:
print("\nTop 10 recommendations:")
# recommendProducts returns Rating(user, product, score) records ordered by
# predicted score.
recommendations = model.recommendProducts(userID, 10)
for recommendation in recommendations:
    # recommendation[1] is the movie ID, recommendation[2] the predicted score.
    print(nameDict[int(recommendation[1])] + " score " + str(recommendation[2]))