# In [1]:
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
# In [2]:
# Start a Spark context with the MySQL JDBC driver jar and the
# spark-csv package, then build SQL and Hive contexts on top of it.
jdbc_driver_jar <- "/usr/share/java/mysql-connector-java.jar"
csv_reader_pkg <- "com.databricks:spark-csv_2.10:1.4.0"
sc <- sparkR.init(sparkJars = jdbc_driver_jar,
                  sparkPackages = csv_reader_pkg)
sqlContext <- sparkRSQL.init(sc)
hiveContext <- sparkRHive.init(sc)
# In [3]:
# Load the MovieLens ratings CSV into a Spark DataFrame using the
# spark-csv data source; the file's first row is a header.
ratings_csv_path <- "/root/pipeline/datasets/movielens/ml-latest/ratings.csv"
movieRatingsCsvDF <- read.df(
  sqlContext,
  ratings_csv_path,
  source = "com.databricks.spark.csv",
  header = "true"
)
head(movieRatingsCsvDF)
# Out[3]:
# In [4]:
# Query the pre-registered Hive table of movie ratings into a DataFrame.
movieRatingsHiveDF <- sql(hiveContext, "SELECT * FROM movie_ratings")
# BUG FIX: the original called head(results), but `results` is never
# defined anywhere in this script — preview the DataFrame just created.
head(movieRatingsHiveDF)
# In [5]:
# Preview only the five-star ratings.
fiveStarRatings <- filter(movieRatingsHiveDF, movieRatingsHiveDF$rating == 5)
head(fiveStarRatings)
# Out[5]:
# In [6]:
# Count ratings per user, then show the most active users first.
ratingsByUser <- groupBy(movieRatingsHiveDF, movieRatingsHiveDF$userId)
userIdCounts <- summarize(ratingsByUser,
                          count = n(movieRatingsHiveDF$userId))
head(arrange(userIdCounts, desc(userIdCounts$count)))
# Out[6]:
# In [7]:
# Fit a Gaussian (ordinary linear) regression of rating on userId and
# movieId using SparkR's distributed glm.
ratingFormula <- rating ~ userId + movieId
linearRegressionModel <- glm(
  ratingFormula,
  data = movieRatingsHiveDF,
  family = "gaussian"
)
# In [8]:
# Score the training data with the fitted model. Downstream code reads
# `label` and `prediction` columns from the result (see the error
# computation below), so predict() here yields a DataFrame carrying both.
predictionsDF <- predict(linearRegressionModel, movieRatingsHiveDF)
# In [9]:
# Per-row residual: actual rating (label) minus the model's prediction.
residualCol <- alias(predictionsDF$label - predictionsDF$prediction, "error")
errorsDF <- select(
  predictionsDF,
  predictionsDF$label,
  predictionsDF$prediction,
  predictionsDF$userId,
  predictionsDF$movieId,
  residualCol
)
head(errorsDF)
# Out[9]: