Movie recommder example

this example is a EclairJS (JavaScript) implementation of movie recommending.

This notebook requires the following additional setup

Create the spark context and globals


In [1]:
var SparkContext = require('eclairjs/SparkContext');
var SparkConf = require('eclairjs/SparkConf');
var sparkConf = new SparkConf()
    .set("spark.executor.memory", "10g")
    .set("spark.driver.memory", "6g")
    .setMaster("local[*]")
    .setAppName("movie_recommender");
var sc = new SparkContext(sparkConf);
var Tuple2 = require('eclairjs/Tuple2');
var Tuple3 = require('eclairjs/Tuple3');
var ALS = require('eclairjs/mllib/recommendation/ALS');
var Rating = require('eclairjs/mllib/recommendation/Rating');

Set the path to the movieLens Datasets


In [3]:
var pathToSmallDataset = '../data/mllib/ml-latest-small';
var pathToCompleteDataset = '../data/mllib/ml-latest-small';

load the small rating dataset


In [4]:
var small_ratings_raw_data = sc.textFile(pathToSmallDataset + '/ratings.csv');
var small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0];
var small_ratings_data = small_ratings_raw_data
    .filter(function(line, small_ratings_raw_data_header) {
        // filters out the header
        return line != small_ratings_raw_data_header;
    }, [small_ratings_raw_data_header])
    .map(function(line, Rating) {
        var tokens = line.split(",");
         return new Rating(tokens[0],tokens[1],tokens[2]);
    },[Rating])
    .cache();

JSON.stringify(small_ratings_data.take(3));


Out[4]:
[{"user":1,"product":16,"rating":4},{"user":1,"product":24,"rating":1.5},{"user":1,"product":32,"rating":4}]

load the small moving dataset


In [5]:
var small_movies_raw_data = sc.textFile(pathToSmallDataset + '/movies.csv');
var small_movies_raw_data_header = small_movies_raw_data.take(1)[0];
var small_movies_data = small_movies_raw_data
    .filter(function(line, small_movies_raw_data_header) {
        // filters out the header
        return line != small_movies_raw_data_header;
    }, [small_movies_raw_data_header])
    .map(function(line, Tuple2) {
        var fields = line.split(",");
        return new Tuple2(parseInt(fields[0]), fields[1]);
    }, [Tuple2])
    .cache();

var small_movies_titles = small_movies_data
    .mapToPair(function( tuple2, Tuple2) { // Tuple2
        return new Tuple2(tuple2[0], tuple2[1]);
    }, [Tuple2]);
JSON.stringify(small_movies_data.take(3));


Out[5]:
[{"0":1,"1":"Toy Story (1995)","length":2},{"0":2,"1":"Jumanji (1995)","length":2},{"0":3,"1":"Grumpier Old Men (1995)","length":2}]

Selecting ALS parameters using the small dataset

In order to determine the best ALS parameters, we will use the small dataset. We need first to split it into train, validation, and test datasets.


In [6]:
var seed = 0;
var split = small_ratings_data.randomSplit([0.6, 0.2, 0.2], seed)
var training_RDD = split[0];
var validation_RDD = split[1];
var test_RDD = split[2];

var  validation_for_predict_RDD = validation_RDD
    .map(function(rating, Tuple2) {
        return new Tuple2(rating.user(), rating.product());
    }, [Tuple2]);
JSON.stringify(validation_for_predict_RDD.take(3));


Out[6]:
[{"0":1,"1":50,"length":2},{"0":1,"1":161,"length":2},{"0":1,"1":356,"length":2}]

Proceed with the training phase.


In [7]:
seed = 5
var iterations = 10
var regularization_parameter = 0.1
var ranks = [4, 8, 12];
var errors = [0, 0, 0];
var err = 0
var tolerance = 0.02

var min_error = Number.POSITIVE_INFINITY
var best_rank = -1
var best_iteration = -1
var blocks = -1;
var lambda = regularization_parameter;

ranks.forEach(function(rank) {
    var model = ALS.train(training_RDD, rank, iterations, regularization_parameter, blocks, seed);
    var predictions = model.predict(validation_for_predict_RDD)
        .mapToPair(function(rating, Tuple2) {
            return new Tuple2(new Tuple2(rating.user(), rating.product()), rating.rating());
        }, [Tuple2]);

    var rates_and_preds = validation_RDD
        .mapToPair(function(rating, Tuple2) {
            return new Tuple2(new Tuple2(rating.user(), rating.product()), rating.rating());
        }, [Tuple2])
        .join(predictions);

    var t = rates_and_preds
        .mapToFloat(function(tuple) {
            var y =tuple._2()._1() - tuple._2()._2();
            return Math.pow(y, 2);
        });
    var error = Math.sqrt(t.mean());
    errors[err] = error;
    err += 1;
    if (error < min_error) {
        min_error = error;
        best_rank = rank;
    }

});

"The best model was trained with rank " +best_rank;


[Stage 32:>                                                         (0 + 0) / 8]
Out[7]:
The best model was trained with rank 4

To build our recommender model, we will use the complete dataset.


In [8]:
var complete_ratings_raw_data = 
    sc.textFile(pathToCompleteDataset + '/ratings.csv');
var complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0];

var complete_ratings_data = complete_ratings_raw_data
    .filter(function (line, complete_ratings_raw_data_header) {
        return line != complete_ratings_raw_data_header;
    }, [complete_ratings_raw_data_header])
    .map(function( line, Rating) {
        var fields = line.split(",");
        var userId = parseInt(fields[0]);
        var movieId = parseInt(fields[1]);
        var rating = parseFloat(fields[2]);
        return new Rating(userId, movieId, rating);
    }, [Rating])
    .cache();

JSON.stringify("There are recommendations in the complete dataset:  " + complete_ratings_data.count());


Out[8]:
"There are recommendations in the complete dataset:  105339"

We test on our testing set.


In [9]:
var splits2 = complete_ratings_data.randomSplit([0.7, 0.3], 0);
training_RDD = splits2[0];
test_RDD = splits2[1];

var complete_model = ALS.train(training_RDD, best_rank, iterations, regularization_parameter, blocks, seed);

var test_for_predict_RDD = test_RDD
    .map(function (rating, Tuple2) {
        return new Tuple2(rating.user(), rating.product());
    }, [Tuple2]);

var predictions = complete_model.predict(test_for_predict_RDD)
    .mapToPair(function( rating, Tuple2) {
        return new Tuple2(new Tuple2(rating.user(), rating.product()), rating.rating());
    }, [Tuple2]);

var rates_and_preds = test_RDD
    .mapToPair(function( rating, Tuple2) {
        return new Tuple2(new Tuple2(rating.user(), rating.product()), rating.rating());
    }, [Tuple2])
    .join(predictions);

var t = rates_and_preds
    .mapToFloat( function( x) {
        return Math.pow(x._2()._1() - x._2()._2(), 2);
    });
var error = Math.sqrt(t.mean());
JSON.stringify("For testing data the RMSE is " + error);


Out[9]:
"For testing data the RMSE is 0.9066538818812507"

How to make recommendations

So let's first load the movies complete file for later use.


In [10]:
var complete_movies_raw_data = 
    sc.textFile(pathToCompleteDataset + '/movies.csv');
var complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0];
var complete_movies_data = complete_movies_raw_data
    .filter(function(line, complete_movies_raw_data_header) {
        // filters out the header
        return line != complete_movies_raw_data_header;
    }, [complete_movies_raw_data_header])
    .map(function(line, Tuple2) {
        var fields = line.split(",");
        return new Tuple2(parseInt(fields[0]), fields[1]);
    }, [Tuple2]).cache();

var complete_movies_titles = complete_movies_data
    .mapToPair(function( tuple2, Tuple2) { // Tuple2
        return new Tuple2(tuple2._1(), tuple2._2());
    }, [Tuple2]);

JSON.stringify("There are movies in the complete dataset " + complete_movies_titles.count());


Out[10]:
"There are movies in the complete dataset 10329"

Give recommendations of movies

Another thing we want to do, is give recommendations of movies with a certain minimum number of ratings. For that, we need to count the number of ratings per movie.


In [11]:
var movie_ID_with_ratings_RDD = complete_ratings_data
    .mapToPair(function( rating, Tuple2) {
        return new Tuple2(rating.product(), rating.rating());
    }, [Tuple2])
    .groupByKey();

var movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD
    .mapToPair(function( ID_and_ratings_tuple, Tuple2) {
        var w = ID_and_ratings_tuple._2();
        var count = 0;
        var sum = 0;
        for (var i = 0; i < w.length; i++) {
            var r = w[i];
            sum += r;
            count++;
        }

        var avgRating = sum / count;
        return new Tuple2(ID_and_ratings_tuple._1(), new Tuple2(count, avgRating));

    }, [Tuple2]);

var movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD
    .mapToPair(function( ID_with_avg_ratings, Tuple2) {
        return new Tuple2(ID_with_avg_ratings._1(), ID_with_avg_ratings._2()._1()); // movieID, rating count
    }, [Tuple2]);

JSON.stringify("movie_ID_with_avg_ratings_RDD  " + movie_ID_with_avg_ratings_RDD.take(10));


Out[11]:
"movie_ID_with_avg_ratings_RDD  (1084,(45.0,3.966666666666667)),(32196,(2.0,4.0)),(1490,(1.0,0.5)),(91902,(1.0,3.0)),(68522,(1.0,5.0)),(3702,(43.0,3.5348837209302326)),(6754,(26.0,3.4423076923076925)),(68482,(1.0,2.5)),(6308,(4.0,3.0)),(91622,(3.0,3.3333333333333335))"

Rate some movies for the new user.


In [12]:
var new_user_ID = 0;

// The format of each line is (userID, movieID, rating)
var new_user_ratings = [
    new Rating(0, 260, 9), // Star Wars (1977)
    new Rating(0, 1, 8), // Toy Story (1995)
    new Rating(0, 16, 7), // Casino (1995)
    new Rating(0, 25, 8), // Leaving Las Vegas (1995)
    new Rating(0, 32, 9), // Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
    new Rating(0, 335, 4), // Flintstones, The (1994)
    new Rating(0, 379, 3), // Timecop (1994)
    new Rating(0, 296, 7), // Pulp Fiction (1994)
    new Rating(0, 858, 10), // Godfather, The (1972)
    new Rating(0, 50, 8) // Usual Suspects, The (1995)
];
var new_user_ratings_RDD = sc.parallelize(new_user_ratings);
JSON.stringify("New user ratings: " + new_user_ratings_RDD.take(10));


Out[12]:
"New user ratings: (0,260,9.0),(0,1,8.0),(0,16,7.0),(0,25,8.0),(0,32,9.0),(0,335,4.0),(0,379,3.0),(0,296,7.0),(0,858,10.0),(0,50,8.0)"

Add them to the data we will use to train our recommender model.


In [13]:
var complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD);


var new_ratings_model = 
    ALS.train(complete_data_with_new_ratings_RDD, best_rank, 
              iterations, regularization_parameter, blocks, seed);

/*
 Let's now get some recommendations
 */

//  get just movie IDs
var new_user_ratings_ids = [];
for (var i = 0; i < new_user_ratings.length; i++) {
    new_user_ratings_ids.push(new_user_ratings[i].product());
}

// keep just those not on the ID list
var new_user_unrated_movies_RDD = complete_movies_data.filter(function( tuple, new_user_ratings_ids) {
        if (new_user_ratings_ids.indexOf(tuple._1()) < 0) {
            return true;
        } else {
            return false;
        }
}, [new_user_ratings_ids])
.map(function( tuple, new_user_ID, Tuple2) {
        return new Tuple2(new_user_ID, tuple._1());
}, [new_user_ID, Tuple2]);
// Use the input RDD, new_user_unrated_movies_RDD, 
//with new_ratings_model.predictAll() to predict new ratings for the movies
var new_user_recommendations_RDD = new_ratings_model.predict(new_user_unrated_movies_RDD);

// Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
var new_user_recommendations_rating_RDD = new_user_recommendations_RDD.mapToPair( function( rating, Tuple2) {
        return new Tuple2(rating.product(), rating.rating());
}, [Tuple2]);

var new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_RDD
    .join(complete_movies_titles)
    .join(movie_rating_counts_RDD);

"new_user_recommendations_rating_title_and_count_RDD " + new_user_recommendations_rating_title_and_count_RDD.count();


Out[13]:
new_user_recommendations_rating_title_and_count_RDD 10315

Flatten the RDD

we need to flat this down a bit in order to have (Title, Rating, Ratings Count).


In [14]:
var new_user_recommendations_rating_title_and_count_RDD2 =
    new_user_recommendations_rating_title_and_count_RDD.map(function( t, Tuple3) {
        var x = new Tuple3(t._2()._1()._2(), t._2()._1()._1(), t._2()._2());
        return x;
}, [Tuple3]);
"count" + new_user_recommendations_rating_title_and_count_RDD2.count();
"new_user_recommendations_rating_title_and_count_RDD2" + 
        JSON.stringify(new_user_recommendations_rating_title_and_count_RDD2.take(3));


Out[14]:
new_user_recommendations_rating_title_and_count_RDD2[{"0":"Shackleton's Antarctic Adventure (2001)","1":7.553736917670094,"2":1,"length":3},{"0":"\"Call of Cthulhu","1":8.860291348428946,"2":2,"length":3},{"0":"RV (2006)","1":2.476218985783453,"2":4,"length":3}]

Get highest rated recommendations

Finally, get the highest rated recommendations for the new user, filtering out movies with less than 25 ratings.


In [15]:
var new_user_recommendations_rating_title_and_count_RDD2_filtered =
    new_user_recommendations_rating_title_and_count_RDD2.filter(function( tuple3) {
        if (tuple3._3() < 25) {
            return false;
        } else {
            return true;
        }
    });

/*
list top 25
 */

var top_movies = new_user_recommendations_rating_title_and_count_RDD2_filtered.takeOrdered(25,
   function(tuple3_a, tuple3_b){
        var aRate = tuple3_a._2();
        var bRate = tuple3_b._2();
        return aRate > bRate ? -1 : aRate == bRate? 0 : 1;

});

var str = "TOP recommended movies (with more than 25 reviews):\n\n";

for (var i = 0; i < top_movies.length; i++) {
    str += top_movies[i]._1() + " average rating " + 
    top_movies[i]._2() + " number of ratings " + top_movies[i]._3() + "\n";
}


[Stage 990:==============>                                          (2 + 6) / 8]
Out[15]:
TOP recommended movies (with more than 25 reviews):

Being There (1979) average rating 8.628200481841834 number of ratings 26
Annie Hall (1977) average rating 8.546276091351059 number of ratings 68
My Neighbor Totoro (Tonari no Totoro) (1988) average rating 8.519469203967729 number of ratings 28
Citizen Kane (1941) average rating 8.496776961299442 number of ratings 77
Cinema Paradiso (Nuovo cinema Paradiso) (1989) average rating 8.440391886714995 number of ratings 37
All About Eve (1950) average rating 8.42478972179922 number of ratings 27
Princess Mononoke (Mononoke-hime) (1997) average rating 8.42020502325419 number of ratings 52
Manhattan (1979) average rating 8.400647804793403 number of ratings 40
Ran (1985) average rating 8.369219421907811 number of ratings 31
Spirited Away (Sen to Chihiro no kamikakushi) (2001) average rating 8.367017216556984 number of ratings 72
Lone Star (1996) average rating 8.313256584773576 number of ratings 35
Monty Python and the Holy Grail (1975) average rating 8.310843186811322 number of ratings 154
Old Boy (2003) average rating 8.310831456431455 number of ratings 38
Band of Brothers (2001) average rating 8.294547580780362 number of ratings 26
One Flew Over the Cuckoo's Nest (1975) average rating 8.2834222602241 number of ratings 143
Goodfellas (1990) average rating 8.282319190627655 number of ratings 135
"Sting average rating 8.2696152337938 number of ratings 77
Fargo (1996) average rating 8.268114373240525 number of ratings 201
Rear Window (1954) average rating 8.26193105355311 number of ratings 74
Up (2009) average rating 8.256685003489332 number of ratings 62
Sunset Blvd. (a.k.a. Sunset Boulevard) (1950) average rating 8.245400319453282 number of ratings 29
Harold and Maude (1971) average rating 8.229893025284042 number of ratings 33
"Departed average rating 8.228682614639016 number of ratings 87
12 Angry Men (1957) average rating 8.214880043481715 number of ratings 63
Into the Wild (2007) average rating 8.212655662767194 number of ratings 32

Look up movie Title from ID


In [34]:
var DataTypes = require('eclairjs/sql/types/DataTypes');
var RowFactory = require('eclairjs/sql/RowFactory');
var SQLContext = require('eclairjs/sql/SQLContext');
var movieID = "1";
 //Generate the schema
var sqlContext = new SQLContext(sc);
var fields = [];
fields.push(DataTypes.createStructField("id", DataTypes.IntegerType, true));
fields.push(DataTypes.createStructField("title", DataTypes.StringType, true));
    
var schema = DataTypes.createStructType(fields);
var rowRDD = complete_movies_data.map(function (tuple2, RowFactory) {
    return RowFactory.create([tuple2._1(), tuple2._2()]);
}, [RowFactory]);

//Apply the schema to the RDD.
var complete_movies_titlesDF = sqlContext.createDataFrame(rowRDD, schema);
var col = complete_movies_titlesDF.col("id");
//var col2 = complete_movies_titlesDF.col("title");
var testCol = col.equalTo(movieID);
var result = complete_movies_titlesDF.filter(testCol).collect();

JSON.stringify(result);


Out[34]:
[{"values":[1,"Toy Story (1995)"],"schema":{"fields":[{"name":"id","dataType":"integer","nullable":true},{"name":"title","dataType":"string","nullable":true}]}}]

Lookup movie id from title


In [39]:
var movieTitle = "Toy Story";
//var col = complete_movies_titlesDF.col("id");
var col2 = complete_movies_titlesDF.col("title");
var testCol = col2.contains(movieTitle);
var result = complete_movies_titlesDF.filter(testCol).collect();

JSON.stringify(result);


Out[39]:
[{"values":[1,"Toy Story (1995)"],"schema":{"fields":[{"name":"id","dataType":"integer","nullable":true},{"name":"title","dataType":"string","nullable":true}]}},{"values":[3114,"Toy Story 2 (1999)"],"schema":{"fields":[{"name":"id","dataType":"integer","nullable":true},{"name":"title","dataType":"string","nullable":true}]}},{"values":[78499,"Toy Story 3 (2010)"],"schema":{"fields":[{"name":"id","dataType":"integer","nullable":true},{"name":"title","dataType":"string","nullable":true}]}}]

Predicted rating for a particular movie for a given user.


In [17]:
var my_movie = sc.parallelizePairs([new Tuple2(0, 500)]); // Quiz Show (1994)
var individual_movie_rating_RDD = new_ratings_model.predict(my_movie);

"Predicted rating for movie " + individual_movie_rating_RDD.take(1);


Out[17]:
Predicted rating for movie (0,500,5.7011845091042765)

In [ ]: