Music Recommender System using Apache Spark and Python

Estimated time: 8hrs

Description

For this project, you are to create a recommender system that will recommend new musical artists to a user based on their listening history. Suggesting different songs or musical artists to a user is important to many music streaming services, such as Pandora and Spotify. In addition, this type of recommender system could also be used as a means of suggesting TV shows or movies to a user (e.g., Netflix).

To create this system you will be using Spark and the collaborative filtering technique. The instructions for completing this project will be laid out entirely in this file. You will have to implement any missing code as well as answer any questions.

Submission Instructions:

  • Add all of your updates to this IPython file and do not clear any of the output you get from running your code.
  • Upload this file to Moodle.

Datasets

You will be using some publicly available song data from audioscrobbler. However, we modified the original data files so that the code will run in a reasonable time on a single machine. The reduced data files have been suffixed with _small.txt and contain only the information relevant to the top 50 most prolific users (those with the highest artist play counts).

The original data file user_artist_data.txt contained about 141,000 unique users and 1.6 million unique artists. About 24.2 million plays of artists by users are recorded, along with their play counts.

Note that when plays are scrobbled, the client application submits the name of the artist being played. This name could be misspelled or nonstandard, and this may only be detected later. For example, "The Smiths", "Smiths, The", and "the smiths" may appear as distinct artist IDs in the data set, even though they clearly refer to the same artist. So, the data set includes artist_alias.txt, which maps artist IDs that are known misspellings or variants to the canonical ID of that artist.

The artist_data.txt file then provides a map from the canonical artist ID to the name of the artist.
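For intuition, resolving an alias is just a dictionary lookup; IDs with no alias entry are already canonical. A minimal sketch (the IDs below are made up for illustration):

#Hypothetical alias entries: variant/misspelled ID -> canonical ID
artist_alias = {"1027859": "1252408", "2044218": "1252408"}

def canonical(artist_id):
    #Fall back to the original ID when no alias entry exists
    return artist_alias.get(artist_id, artist_id)

print canonical("1027859")   # 1252408 (variant resolved)
print canonical("1252408")   # 1252408 (already canonical)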

Necessary Package Imports


In [1]:
from pyspark.mllib.recommendation import *
import random
from operator import *

Loading data

Load the three datasets into RDDs and name them artistData, artistAlias, and userArtistData. View the README, or the files themselves, to see how this data is formatted. Some of the files are tab-delimited while others are space-delimited. Make sure that your userArtistData RDD contains only the canonical artist IDs.


In [2]:
#Loading data into RDD
artistData = sc.textFile("artist_data_small.txt")
artistAlias = sc.textFile("artist_alias_small.txt")
userArtistData = sc.textFile("user_artist_data_small.txt")

alias_data = artistAlias.collect()
user_data = userArtistData.collect()
artist_canonical_dict = {}

#Building the alias -> canonical artist ID dictionary (tab-delimited file)
for line in alias_data:
    artist_record = line.split("\t")
    artist_canonical_dict[artist_record[0]] = artist_record[1]

#Map a raw "user artist count" line to (userID, canonicalArtistID, playCount)
def canonicalArtistID(line):
    line = line.split(" ")
    
    if line[1] in artist_canonical_dict:
        return (int(line[0]), int(artist_canonical_dict[line[1]]), int(line[2]))
    else:
        return (int(line[0]), int(line[1]), int(line[2]))

#Replacing aliased artist IDs with their canonical IDs
userArtistData = userArtistData.map(canonicalArtistID)

#Creating allArtists dataset to be used later during model evaluation process
allArtists = userArtistData.map(lambda x:(x[1])).collect()
allArtists = list(set(allArtists))

Data Exploration

In the blank below, write some code that will find each user's total play count. Find the three users with the highest total play counts (sum of all play counts) and print the user ID, the total play count, and the mean play count (the average number of times a user played an artist). Your output should look as follows:

User 1059637 has a total play count of 674412 and a mean play count of 1878.
User 2064012 has a total play count of 548427 and a mean play count of 9455.
User 2069337 has a total play count of 393515 and a mean play count of 1519.

In [3]:
user_play_count = {}
user_count_number = {}

#Accumulating total plays and number of artist records per user
for line in user_data:
    user_record = line.split()
    if user_record[0] in user_play_count:
        user_play_count[user_record[0]] += int(user_record[2])
        user_count_number[user_record[0]] += 1
    else:
        user_play_count[user_record[0]] = int(user_record[2])
        user_count_number[user_record[0]] = 1

#Printing the three users with the highest total play counts
top = 0
maximum = 2

for word, count in sorted(user_play_count.iteritems(), key=lambda (k,v): (v,k), reverse=True):
    if top > maximum:
        break
    print 'User ' + word + ' has a total play count of ' + str(count) + ' and a mean play count of ' + str(count / user_count_number[word])
    top += 1


User 1059637 has a total play count of 674412 and a mean play count of 1878
User 2064012 has a total play count of 548427 and a mean play count of 9455
User 2069337 has a total play count of 393515 and a mean play count of 1519
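For reference, the same totals can be computed without collecting everything to the driver. A sketch using RDD aggregation (it assumes userArtistData has already been mapped to (user, artist, count) tuples as above):

#Aggregating (total plays, record count) per user with reduceByKey
userTotals = userArtistData.map(lambda x: (x[0], (x[2], 1))) \
                           .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

#Taking the three largest totals and printing them in the same format
for user, (total, records) in userTotals.top(3, key=lambda x: x[1][0]):
    print 'User ' + str(user) + ' has a total play count of ' + str(total) + ' and a mean play count of ' + str(total / records)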

Splitting Data for Testing

Use the randomSplit function to divide the data (userArtistData) into:

  • A training set, trainData, that will be used to train the model. This set should constitute 40% of the data.
  • A validation set, validationData, used to perform parameter tuning. This set should constitute 40% of the data.
  • A test set, testData, used for a final evaluation of the model. This set should constitute 20% of the data.

Use a random seed value of 13. Since these datasets will be used repeatedly, you will probably want to persist them in memory using the cache function.

In addition, print out the first 3 elements of each set as well as their sizes; if you created these sets correctly, your output should look as follows:

[(1059637, 1000049, 1), (1059637, 1000056, 1), (1059637, 1000113, 5)]
[(1059637, 1000010, 238), (1059637, 1000062, 11), (1059637, 1000112, 423)]
[(1059637, 1000094, 1), (1059637, 1000130, 19129), (1059637, 1000139, 4)]
19817
19633
10031

In [4]:
#Splitting the data into training, validation and test sets
trainData, validationData, testData = userArtistData.randomSplit([4, 4, 2], 13)

print trainData.take(3)
print validationData.take(3)
print testData.take(3)
print trainData.count()
print validationData.count()
print testData.count()

#Creating Rating objects and caching each set for repeated use
trainData = trainData.map(lambda l: Rating(*l)).cache()
validationData = validationData.map(lambda l: Rating(*l)).cache()
testData = testData.map(lambda l: Rating(*l)).cache()


[(1059637, 1000049, 1), (1059637, 1000056, 1), (1059637, 1000113, 5)]
[(1059637, 1000010, 238), (1059637, 1000062, 11), (1059637, 1000112, 423)]
[(1059637, 1000094, 1), (1059637, 1000130, 19129), (1059637, 1000139, 4)]
19817
19633
10031

The Recommender Model

For this project, we will train the model with implicit feedback. You can read more about this on the collaborative filtering page: http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html. The function you will be using has a few tunable parameters that affect how the model is built. Therefore, to get the best model, we will do a small parameter sweep and choose the model that performs best on the validation set.

Therefore, we must first devise a way to evaluate models. Once we have a method for evaluation, we can run a parameter sweep, evaluate each combination of parameters on the validation data, and choose the optimal set of parameters. The parameters then can be used to make predictions on the test data.
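Concretely, the training call we will sweep has this shape (the keyword values shown are MLlib's defaults, except for seed; treat this as a sketch):

#Implicit-feedback ALS; rank is the parameter swept in Model Construction below
model = ALS.trainImplicit(trainData, rank=10, iterations=5, lambda_=0.01, alpha=0.01, seed=345)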

Model Evaluation

Although there may be several ways to evaluate a model, we will use a simple method here. Suppose we have a model and a dataset of true artist plays for a set of users. The model can be used to predict the top X artist recommendations for a user, and these recommendations can be compared to the artists that the user actually listened to (here, X is the number of artists the user has in the dataset of true artist plays). Then, the fraction of overlap between the model's top X predictions and the X artists that the user actually listened to can be calculated. This process is repeated for all users and the average value is returned.

For example, suppose a model predicted [1,2,4,8] as the top X=4 artists for a user, and that the user actually listened to the artists [1,3,7,8]. Then, for this user, the model would have a score of 2/4 = 0.5. To get the overall score, this is computed for all users and the average is returned.
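As a quick sanity check, the example above can be computed with plain Python sets:

#Overlap score for one user: size of (top-X predictions intersected with true artists) / X
predicted = [1, 2, 4, 8]
actual = [1, 3, 7, 8]
score = float(len(set(predicted) & set(actual))) / len(actual)
print score   # 0.5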

NOTE: when using the model to predict the top-X artists for a user, do not include the artists listed with that user in the training data.

Name your function modelEval and have it take a model (the output of ALS.trainImplicit) and a dataset as input. For parameter tuning, the dataset parameter should be set to the validation data (validationData). After parameter tuning, the model can be evaluated on the test data (testData).


In [5]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from collections import defaultdict

#Model evaluation function
def modelEval(model, dataset):
    global trainData
    global allArtists
    
    #Collecting the artists each user already has in the training data
    userArtists = defaultdict(list)
    for data in trainData.collect():
        userArtists[data[0]].append(data[1])
    
    cvList = []
    
    #For each user, keeping only the artists NOT seen in training
    for key in userArtists.keys():
        userArtists[key] = list(set(allArtists) - set(userArtists[key]))
        for artist in userArtists[key]:
            cvList.append((key, artist))
    
    #Creating the (user, nonTrainArtist) RDD
    cvData = sc.parallelize(cvList)
    
    #True plays per user from the evaluation dataset
    userOriginal = dataset.map(lambda x: (x.user, (x.product, x.rating))).groupByKey().collect()
    
    #Predicting a rating for every (user, nonTrainArtist) pair
    predictions = model.predictAll(cvData)
    userPredictions = predictions.map(lambda x: (x.user, (x.product, x.rating))).groupByKey().collect()
    
    original = {}
    predicted = {}
    
    #Sorting each user's true and predicted artists by rating, descending
    for line in userOriginal:
        original[line[0]] = sorted(line[1], key=lambda x: x[1], reverse=True)
    
    for line in userPredictions:
        predicted[line[0]] = sorted(line[1], key=lambda x: x[1], reverse=True)
    
    similarity = []
    
    for user, _ in userOriginal:
        org = original[user]
        
        #Top X predicted artists, where X is the number the user actually played
        topPredicted = set(item[0] for item in predicted[user][0:len(org)])
        actual = set(value[0] for value in org)
        
        #Fraction of overlap for this user
        similarity.append(float(len(topPredicted & actual)) / len(org))
    
    #Using model.rank rather than a global rank variable from the tuning loop
    print "The model score for rank " + str(model.rank) + " is " + str(float(sum(similarity)) / len(similarity))

Model Construction

Now we can build the best model possible using the validation set and the modelEval function. Although there are a few parameters we could optimize, for the sake of time we will just try a few different values for the rank parameter (leave everything else at its default value, except set seed=345). Loop through the values [2, 10, 20] and figure out which one produces the highest score based on your model evaluation function.

Note: this procedure may take several minutes to run.

For each rank value, print out the output of the modelEval function for that model. Your output should look as follows:

The model score for rank 2 is 0.090431
The model score for rank 10 is 0.095294
The model score for rank 20 is 0.090248

In [6]:
#Model evaluation through different rank parameters
rank_list = [2, 10, 20]

for rank in rank_list:
    model = ALS.trainImplicit(trainData, rank, seed=345)
    modelEval(model,validationData)


The model score for rank 2 is 0.0909391661474
The model score for rank 10 is 0.0957125879247
The model score for rank 20 is 0.09047041725

Now, using bestModel, we will check the results over the test data. Your result should be ~0.0507.


In [7]:
bestModel = ALS.trainImplicit(trainData, rank=10, seed=345)
modelEval(bestModel, testData)


The model score for rank 10 is 0.0512321818394

Trying Some Artist Recommendations

Using the best model above, predict the top 5 artists for user 1059637 using the recommendProducts function. Map the results (integer IDs) to the real artist names using artistData, resolving any aliases via artistAlias. Print the results. The output should look as follows:

Artist 0: Brand New
Artist 1: Taking Back Sunday
Artist 2: Evanescence
Artist 3: Elliott Smith
Artist 4: blink-182

In [8]:
ratings = bestModel.recommendProducts(1059637, 5)

In [9]:
import re
artist_data = artistData.collect()

artist_names_dict = {}

#Parsing "artistID<whitespace>artistName" lines into an ID -> name dictionary
for line in artist_data:
    pattern = re.match(r'(\d+)\s+(.*)', line)
    artist_names_dict[pattern.group(1)] = pattern.group(2)

for i in range(0, 5):
    artist_id = str(ratings[i].product)
    #Resolving aliased IDs to the canonical ID before the name lookup
    artist_id = artist_canonical_dict.get(artist_id, artist_id)
    print "Artist " + str(i) + ": " + artist_names_dict[artist_id]


Artist 0: Brand New
Artist 1: Taking Back Sunday
Artist 2: Evanescence
Artist 3: Elliott Smith
Artist 4: blink-182
