This example is an EclairJS (JavaScript) implementation of a movie recommender.
In [1]:
// EclairJS modules used throughout the notebook.
var SparkConf = require('eclairjs/SparkConf');
var SparkContext = require('eclairjs/SparkContext');
var Tuple2 = require('eclairjs/Tuple2');
var Tuple3 = require('eclairjs/Tuple3');
var ALS = require('eclairjs/mllib/recommendation/ALS');
var Rating = require('eclairjs/mllib/recommendation/Rating');

// Spark configuration: local master using all cores, with explicit
// driver and executor memory settings.
var sparkConf = new SparkConf()
    .setAppName("movie_recommender")
    .setMaster("local[*]")
    .set("spark.driver.memory", "6g")
    .set("spark.executor.memory", "10g");

// Single SparkContext shared by every cell below.
var sc = new SparkContext(sparkConf);
In [3]:
// Directory that holds the MovieLens data sets used by this notebook.
var datasetRoot = '../data/mllib';
// Small data set: used for the rank search.
var pathToSmallDataset = datasetRoot + '/ml-latest-small';
// "Complete" data set: used to train the final model.
// NOTE(review): this resolves to the same directory as the small data set --
// presumably so the example runs without the full download; confirm.
var pathToCompleteDataset = datasetRoot + '/ml-latest-small';
In [4]:
// Read the raw ratings CSV; the first line is kept aside as the header.
var small_ratings_raw_data = sc.textFile(pathToSmallDataset + '/ratings.csv');
var small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0];

// Build an RDD of Rating objects from every non-header line.
// Values listed in the trailing array of filter()/map() are handed to the
// callback as extra parameters after the element itself.
var small_ratings_data = small_ratings_raw_data
    .filter(function(line, small_ratings_raw_data_header) {
        // filters out the header
        return line != small_ratings_raw_data_header;
    }, [small_ratings_raw_data_header])
    .map(function(line, Rating) {
        var tokens = line.split(",");
        // Convert the CSV tokens to numbers before building the Rating, the
        // same way the complete data set is parsed further down, instead of
        // passing raw strings to the constructor.
        var userId = parseInt(tokens[0], 10);
        var movieId = parseInt(tokens[1], 10);
        var rating = parseFloat(tokens[2]);
        return new Rating(userId, movieId, rating);
    }, [Rating])
    .cache();

// Show the first three parsed ratings.
JSON.stringify(small_ratings_data.take(3));
Out[4]:
In [5]:
// Read the raw movies CSV; the first line is kept aside as the header.
var small_movies_raw_data = sc.textFile(pathToSmallDataset + '/movies.csv');
var small_movies_raw_data_header = small_movies_raw_data.take(1)[0];

// (movieID, title) for every non-header line.
var small_movies_data = small_movies_raw_data
    .filter(function(line, small_movies_raw_data_header) {
        // filters out the header
        return line != small_movies_raw_data_header;
    }, [small_movies_raw_data_header])
    .map(function(line, Tuple2) {
        // Assumes the MovieLens layout movieId,title,genres, where a title
        // that contains commas is wrapped in double quotes. A plain
        // split(",") would cut such a title at its first comma, so take
        // everything between the first and the last comma instead.
        var firstComma = line.indexOf(",");
        var lastComma = line.lastIndexOf(",");
        var title = lastComma > firstComma
            ? line.substring(firstComma + 1, lastComma)
            : line.substring(firstComma + 1);
        // Strip the surrounding CSV quotes and un-escape doubled quotes.
        if (title.length > 1 && title.charAt(0) === '"' && title.charAt(title.length - 1) === '"') {
            title = title.substring(1, title.length - 1).replace(/""/g, '"');
        }
        return new Tuple2(parseInt(line.substring(0, firstComma), 10), title);
    }, [Tuple2])
    .cache();

// Pair-RDD view of the same (movieID, title) data.
var small_movies_titles = small_movies_data
    .mapToPair(function(tuple2, Tuple2) {
        // Read the tuple through its _1()/_2() accessors, as every other
        // cell does, rather than by array index.
        return new Tuple2(tuple2._1(), tuple2._2());
    }, [Tuple2]);

// Show the first three parsed movies.
JSON.stringify(small_movies_data.take(3));
Out[5]:
In [6]:
// Split the small ratings set 60/20/20 into training, validation and test
// RDDs, using a fixed seed.
var seed = 0;
var split = small_ratings_data.randomSplit([0.6, 0.2, 0.2], seed);
var training_RDD = split[0];
var validation_RDD = split[1];
var test_RDD = split[2];

// (user, product) pairs of the validation set, i.e. the validation ratings
// with the rating value dropped; used as input to model.predict().
var validation_for_predict_RDD = validation_RDD.map(function(r, Tuple2) {
    var user = r.user();
    var product = r.product();
    return new Tuple2(user, product);
}, [Tuple2]);

// Show the first three pairs.
JSON.stringify(validation_for_predict_RDD.take(3));
Out[6]:
In [7]:
// Parameters for the ALS rank search.
seed = 5;
var iterations = 10;
var regularization_parameter = 0.1;
var ranks = [4, 8, 12];                     // candidate ranks
var errors = [0, 0, 0];                     // RMSE for each entry of `ranks`
var err = 0;                                // next slot to fill in `errors`
var tolerance = 0.02;
var min_error = Number.POSITIVE_INFINITY;   // lowest RMSE seen so far
var best_rank = -1;                         // rank that produced min_error
var best_iteration = -1;
var blocks = -1;
var lambda = regularization_parameter;

// Train one model per candidate rank, score it on the validation set with
// RMSE, and remember the rank with the lowest error.
ranks.forEach(function(rank) {
    var model = ALS.train(training_RDD, rank, iterations, regularization_parameter, blocks, seed);

    // ((user, product), predicted rating)
    var predicted = model.predict(validation_for_predict_RDD)
        .mapToPair(function(r, Tuple2) {
            var key = new Tuple2(r.user(), r.product());
            return new Tuple2(key, r.rating());
        }, [Tuple2]);

    // ((user, product), (actual rating, predicted rating))
    var actual_and_predicted = validation_RDD
        .mapToPair(function(r, Tuple2) {
            var key = new Tuple2(r.user(), r.product());
            return new Tuple2(key, r.rating());
        }, [Tuple2])
        .join(predicted);

    // Squared difference between actual and predicted rating.
    var squared_errors = actual_and_predicted.mapToFloat(function(entry) {
        var pair = entry._2();
        var diff = pair._1() - pair._2();
        return Math.pow(diff, 2);
    });

    var rmse = Math.sqrt(squared_errors.mean());
    errors[err++] = rmse;
    if (rmse < min_error) {
        min_error = rmse;
        best_rank = rank;
    }
});

"The best model was trained with rank " +best_rank;
Out[7]:
In [8]:
// Read the raw ratings CSV of the complete data set; the first line is kept
// aside as the header.
var complete_ratings_raw_data =
    sc.textFile(pathToCompleteDataset + '/ratings.csv');
var complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0];

// Build an RDD of Rating objects from every non-header line.
var complete_ratings_data = complete_ratings_raw_data
    .filter(function(line, complete_ratings_raw_data_header) {
        // filters out the header
        return line != complete_ratings_raw_data_header;
    }, [complete_ratings_raw_data_header])
    .map(function(line, Rating) {
        var fields = line.split(",");
        // Always pass the radix so the ids are parsed as decimal.
        var userId = parseInt(fields[0], 10);
        var movieId = parseInt(fields[1], 10);
        var rating = parseFloat(fields[2]);
        return new Rating(userId, movieId, rating);
    }, [Rating])
    .cache();

// Report how many ratings were loaded.
JSON.stringify("There are recommendations in the complete dataset: " + complete_ratings_data.count());
Out[8]:
In [9]:
// Split the complete ratings 70/30 into training and test sets (seed 0) and
// train a model with the rank chosen by the rank search.
var splits2 = complete_ratings_data.randomSplit([0.7, 0.3], 0);
training_RDD = splits2[0];
test_RDD = splits2[1];
var complete_model = ALS.train(training_RDD, best_rank, iterations, regularization_parameter, blocks, seed);

// (user, product) pairs of the test set, used as input to predict().
var test_for_predict_RDD = test_RDD.map(function(r, Tuple2) {
    var user = r.user();
    var product = r.product();
    return new Tuple2(user, product);
}, [Tuple2]);

// ((user, product), predicted rating)
var predictions = complete_model.predict(test_for_predict_RDD)
    .mapToPair(function(r, Tuple2) {
        var key = new Tuple2(r.user(), r.product());
        return new Tuple2(key, r.rating());
    }, [Tuple2]);

// ((user, product), (actual rating, predicted rating))
var rates_and_preds = test_RDD
    .mapToPair(function(r, Tuple2) {
        var key = new Tuple2(r.user(), r.product());
        return new Tuple2(key, r.rating());
    }, [Tuple2])
    .join(predictions);

// Squared error per joined entry; the RMSE is the square root of their mean.
var t = rates_and_preds.mapToFloat(function(entry) {
    var pair = entry._2();
    return Math.pow(pair._1() - pair._2(), 2);
});
var error = Math.sqrt(t.mean());

JSON.stringify("For testing data the RMSE is " + error);
Out[9]:
In [10]:
// Read the raw movies CSV of the complete data set; the first line is kept
// aside as the header.
var complete_movies_raw_data =
    sc.textFile(pathToCompleteDataset + '/movies.csv');
var complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0];

// (movieID, title) for every non-header line.
var complete_movies_data = complete_movies_raw_data
    .filter(function(line, complete_movies_raw_data_header) {
        // filters out the header
        return line != complete_movies_raw_data_header;
    }, [complete_movies_raw_data_header])
    .map(function(line, Tuple2) {
        // Assumes the MovieLens layout movieId,title,genres, where a title
        // that contains commas is wrapped in double quotes. A plain
        // split(",") would cut such a title at its first comma, so take
        // everything between the first and the last comma instead.
        var firstComma = line.indexOf(",");
        var lastComma = line.lastIndexOf(",");
        var title = lastComma > firstComma
            ? line.substring(firstComma + 1, lastComma)
            : line.substring(firstComma + 1);
        // Strip the surrounding CSV quotes and un-escape doubled quotes.
        if (title.length > 1 && title.charAt(0) === '"' && title.charAt(title.length - 1) === '"') {
            title = title.substring(1, title.length - 1).replace(/""/g, '"');
        }
        return new Tuple2(parseInt(line.substring(0, firstComma), 10), title);
    }, [Tuple2]).cache();

// Pair-RDD view of the same (movieID, title) data, used for joins later on.
var complete_movies_titles = complete_movies_data
    .mapToPair(function(tuple2, Tuple2) {
        return new Tuple2(tuple2._1(), tuple2._2());
    }, [Tuple2]);

// Report how many movies were loaded.
JSON.stringify("There are movies in the complete dataset " + complete_movies_titles.count());
Out[10]:
In [11]:
// Group every rating value under its movie ID.
var movie_ID_with_ratings_RDD = complete_ratings_data
    .mapToPair(function(r, Tuple2) {
        return new Tuple2(r.product(), r.rating());
    }, [Tuple2])
    .groupByKey();

// (movieID, (number of ratings, average rating))
var movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD
    .mapToPair(function(ID_and_ratings_tuple, Tuple2) {
        var ratings = ID_and_ratings_tuple._2();
        var total = 0;
        var n = 0;
        // Accumulate the sum and the count in a single pass.
        for (var k = 0; k < ratings.length; k++) {
            total += ratings[k];
            n++;
        }
        var stats = new Tuple2(n, total / n);
        return new Tuple2(ID_and_ratings_tuple._1(), stats);
    }, [Tuple2]);

// (movieID, number of ratings)
var movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD
    .mapToPair(function(ID_with_avg_ratings, Tuple2) {
        var stats = ID_with_avg_ratings._2();
        return new Tuple2(ID_with_avg_ratings._1(), stats._1());
    }, [Tuple2]);

// Show the first ten (movieID, (count, average)) entries.
JSON.stringify("movie_ID_with_avg_ratings_RDD " + movie_ID_with_avg_ratings_RDD.take(10));
Out[11]:
In [12]:
// ID assigned to the new user whose ratings are added to the data set.
var new_user_ID = 0;

// Ratings supplied by the new user, one Rating(userID, movieID, rating) each.
// NOTE(review): these values go up to 10 -- confirm they are on the same
// scale as the ratings loaded from ratings.csv.
var new_user_ratings = [
    [260, 9],  // Star Wars (1977)
    [1, 8],    // Toy Story (1995)
    [16, 7],   // Casino (1995)
    [25, 8],   // Leaving Las Vegas (1995)
    [32, 9],   // Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
    [335, 4],  // Flintstones, The (1994)
    [379, 3],  // Timecop (1994)
    [296, 7],  // Pulp Fiction (1994)
    [858, 10], // Godfather, The (1972)
    [50, 8]    // Usual Suspects, The (1995)
].map(function(entry) {
    // entry is [movieID, rating]
    return new Rating(new_user_ID, entry[0], entry[1]);
});

// Distribute the ratings as an RDD so they can be unioned with the data set.
var new_user_ratings_RDD = sc.parallelize(new_user_ratings);
JSON.stringify("New user ratings: " + new_user_ratings_RDD.take(10));
Out[12]:
In [13]:
// Retrain on the complete ratings plus the new user's ratings.
var complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD);
var new_ratings_model = ALS.train(
    complete_data_with_new_ratings_RDD,
    best_rank,
    iterations,
    regularization_parameter,
    blocks,
    seed
);

/*
 Let's now get some recommendations
 */

// IDs of the movies the new user has already rated.
var new_user_ratings_ids = [];
for (var i = 0; i < new_user_ratings.length; i++) {
    var rated = new_user_ratings[i];
    new_user_ratings_ids.push(rated.product());
}

// (new_user_ID, movieID) for every movie whose ID is not in that list.
var new_user_unrated_movies_RDD = complete_movies_data
    .filter(function(tuple, new_user_ratings_ids) {
        return new_user_ratings_ids.indexOf(tuple._1()) < 0;
    }, [new_user_ratings_ids])
    .map(function(tuple, new_user_ID, Tuple2) {
        return new Tuple2(new_user_ID, tuple._1());
    }, [new_user_ID, Tuple2]);

// Ask the retrained model to predict a rating for each unrated movie.
var new_user_recommendations_RDD = new_ratings_model.predict(new_user_unrated_movies_RDD);

// (movieID, predicted rating)
var new_user_recommendations_rating_RDD = new_user_recommendations_RDD
    .mapToPair(function(r, Tuple2) {
        return new Tuple2(r.product(), r.rating());
    }, [Tuple2]);

// (movieID, ((predicted rating, title), number of ratings))
var new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_RDD
    .join(complete_movies_titles)
    .join(movie_rating_counts_RDD);

"new_user_recommendations_rating_title_and_count_RDD " + new_user_recommendations_rating_title_and_count_RDD.count();
Out[13]:
In [14]:
// Flatten each joined entry (movieID, ((predicted rating, title), count))
// into a Tuple3 of (title, predicted rating, count).
var new_user_recommendations_rating_title_and_count_RDD2 =
    new_user_recommendations_rating_title_and_count_RDD.map(function(entry, Tuple3) {
        var joined = entry._2();
        var rating_and_title = joined._1();
        return new Tuple3(rating_and_title._2(), rating_and_title._1(), joined._2());
    }, [Tuple3]);

"count" + new_user_recommendations_rating_title_and_count_RDD2.count();

// Show the first three flattened entries.
"new_user_recommendations_rating_title_and_count_RDD2" +
    JSON.stringify(new_user_recommendations_rating_title_and_count_RDD2.take(3));
Out[14]:
In [15]:
// Keep only movies with at least 25 ratings. Each element is a Tuple3 of
// (title, predicted rating, number of ratings).
var new_user_recommendations_rating_title_and_count_RDD2_filtered =
    new_user_recommendations_rating_title_and_count_RDD2.filter(function(tuple3) {
        return tuple3._3() >= 25;
    });

/*
 list top 25
 */
// The comparator orders by predicted rating, highest first, so takeOrdered
// returns the 25 best-rated entries.
var top_movies = new_user_recommendations_rating_title_and_count_RDD2_filtered.takeOrdered(25,
    function(tuple3_a, tuple3_b) {
        var aRate = tuple3_a._2();
        var bRate = tuple3_b._2();
        return aRate > bRate ? -1 : aRate == bRate ? 0 : 1;
    });

// Build the report. The heading states the threshold the filter really
// applies (>= 25), and the second field is labelled as the model's predicted
// rating, which is what _2() holds, rather than as an average.
var str = "TOP recommended movies (with at least 25 reviews):\n\n";
for (var i = 0; i < top_movies.length; i++) {
    str += top_movies[i]._1() + " predicted rating " +
        top_movies[i]._2() + " number of ratings " + top_movies[i]._3() + "\n";
}
// Make the report the explicit value of the cell.
str;
Out[15]:
In [34]:
// Spark SQL modules needed to expose the movie list as a DataFrame.
var DataTypes = require('eclairjs/sql/types/DataTypes');
var RowFactory = require('eclairjs/sql/RowFactory');
var SQLContext = require('eclairjs/sql/SQLContext');

// ID of the movie to look up.
var movieID = "1";

// Schema: nullable integer "id" and nullable string "title".
var sqlContext = new SQLContext(sc);
var fields = [
    DataTypes.createStructField("id", DataTypes.IntegerType, true),
    DataTypes.createStructField("title", DataTypes.StringType, true)
];
var schema = DataTypes.createStructType(fields);

// One Row per (movieID, title) tuple.
var rowRDD = complete_movies_data.map(function(movie, RowFactory) {
    var values = [movie._1(), movie._2()];
    return RowFactory.create(values);
}, [RowFactory]);

// Apply the schema to the RDD.
var complete_movies_titlesDF = sqlContext.createDataFrame(rowRDD, schema);

// Select the rows whose id equals movieID and show them.
var col = complete_movies_titlesDF.col("id");
var testCol = col.equalTo(movieID);
var result = complete_movies_titlesDF.filter(testCol).collect();
JSON.stringify(result);
Out[34]:
In [39]:
// Text to search for in the "title" column.
var movieTitle = "Toy Story";

// Select the rows whose title contains movieTitle and show them.
var col2 = complete_movies_titlesDF.col("title");
var testCol = col2.contains(movieTitle);
var result = complete_movies_titlesDF
    .filter(testCol)
    .collect();
JSON.stringify(result);
Out[39]:
In [17]:
// Predict the new user's (user 0) rating for a single movie, ID 500.
var my_movie = sc.parallelizePairs([
    new Tuple2(0, 500) // Quiz Show (1994)
]);
var individual_movie_rating_RDD = new_ratings_model.predict(my_movie);

"Predicted rating for movie " + individual_movie_rating_RDD.take(1);
Out[17]:
In [ ]: