Basic Data Ingestion, Aggregations, and Linear Models


In [1]:
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))


Attaching package: 'SparkR'

The following objects are masked from 'package:stats':

    cov, filter, lag, na.omit, predict, sd, var

The following objects are masked from 'package:base':

    colnames, colnames<-, intersect, rank, rbind, sample, subset,
    summary, table, transform

Initialize SparkContext, SQLContext, and HiveContext


In [2]:
sc <- sparkR.init(sparkJars="/usr/share/java/mysql-connector-java.jar", 
  sparkPackages="com.databricks:spark-csv_2.10:1.4.0")
sqlContext <- sparkRSQL.init(sc)
hiveContext <- sparkRHive.init(sc)


Launching java with spark-submit command /root/spark-1.6.1-bin-fluxcapacitor/bin/spark-submit --jars /usr/share/java/mysql-connector-java-5.1.28.jar --packages com.databricks:spark-csv_2.10:1.4.0 sparkr-shell /tmp/RtmpScARtp/backend_port4b7c16ad5660 

Read Movie Ratings From CSV


In [3]:
movieRatingsCsvDF <- read.df(sqlContext, 
  "/root/pipeline/datasets/movielens/ml-latest/ratings.csv", 
  "com.databricks.spark.csv", header="true") 
head(movieRatingsCsvDF)


Out[3]:
  userId movieId rating  timestamp
1      1      50    4.0 1329753504
2      1     296    4.0 1329753602
3      1     318    4.5 1329753494
4      1     527    4.5 1329753507
5      1     541    3.0 1329753607
6      1     608    4.0 1329753638
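With only header="true", spark-csv leaves its inferSchema option at its default of false, so every column of movieRatingsCsvDF, including rating, is most likely loaded as a string. That would explain why the ratings here print with a trailing ".0". Passing inferSchema = "true" as an extra option to read.df should ask spark-csv to infer numeric types at the cost of one extra pass over the file (not run in this notebook). The sketch below reproduces the distinction locally with base R's read.csv, not SparkR; the embedded CSV text is simply the first three rows shown above.

```r
# Local base-R analogue (not SparkR). The CSV text copies the first three
# rows of ratings.csv shown above so the example is self-contained.
csvText <- "userId,movieId,rating,timestamp
1,50,4.0,1329753504
1,296,4.0,1329753602
1,318,4.5,1329753494"

# Every column kept as a string, like spark-csv with inferSchema left at false
asStrings <- read.csv(text = csvText, colClasses = "character")

# Column types inferred by the reader, like inferSchema = "true"
inferred <- read.csv(text = csvText)

class(asStrings$rating)  # "character"
class(inferred$rating)   # "numeric"

# String ratings must be converted before any arithmetic
mean(as.numeric(asStrings$rating))
```

Either way the values are the same; only the column types differ, and numeric types matter once the data feeds aggregations or models.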

Read Movie Ratings From Hive


In [4]:
movieRatingsHiveDF <- sql(hiveContext, "SELECT * FROM movie_ratings")
head(movieRatingsHiveDF)

Show Only Ratings == 5


In [5]:
head(filter(movieRatingsHiveDF, movieRatingsHiveDF$rating == 5))


Out[5]:
  userId movieId rating  timestamp
1      1    3897      5 1329753716
2      1    4783      5 1329754027
3      1    4979      5 1329753751
4      1    4995      5 1329753888
5      1    6380      5 1329753988
6      1    7361      5 1329753448

Aggregate and Count By UserId


In [6]:
userIdCounts <- 
  summarize(groupBy(movieRatingsHiveDF, movieRatingsHiveDF$userId), 
  count = n(movieRatingsHiveDF$userId))
head(arrange(userIdCounts, desc(userIdCounts$count)))


Out[6]:
  userId count
1  92302  9269
2 142788  7515
3   8452  6779
4 165005  5679
5 221498  5644
6 216632  5601
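summarize(groupBy(...), count = n(...)) is SparkR's counterpart of a SQL GROUP BY userId with a COUNT, and arrange(..., desc(...)) then sorts users by that count. A minimal local sketch of the same group-count-sort logic in base R (not SparkR), on a hypothetical toy data frame rather than MovieLens data:

```r
# Hypothetical toy ratings, not taken from MovieLens
ratings <- data.frame(
  userId  = c(1L, 1L, 1L, 2L, 3L, 3L),
  movieId = c(50L, 296L, 318L, 50L, 296L, 318L),
  rating  = c(4.0, 4.0, 4.5, 3.0, 5.0, 4.0)
)

# Group by userId and count the ratings (rows) for each user
userIdCounts <- aggregate(movieId ~ userId, data = ratings, FUN = length)
names(userIdCounts)[2] <- "count"

# Sort by count, largest first, like arrange(..., desc(count))
userIdCounts <- userIdCounts[order(userIdCounts$count, decreasing = TRUE), ]
head(userIdCounts)
```

The Spark version does the same thing, but the grouping and counting run distributed across the cluster instead of in local memory.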

Train Linear Regression Model


In [7]:
linearRegressionModel <- glm(rating ~ userId + movieId, 
  data = movieRatingsHiveDF, family = "gaussian")
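With family = "gaussian" and its identity link, glm fits an ordinary least-squares linear regression, here rating ~ b0 + b1*userId + b2*movieId. Because userId and movieId enter as plain numbers, the model learns one slope per ID column rather than separate per-user or per-movie effects, which is consistent with the predictions further down all sitting close to 3.52. The sketch below is local base R, not SparkR, on hypothetical toy data; it checks that a gaussian glm returns the same coefficients as solving the normal equations (X'X) beta = X'y directly.

```r
# Hypothetical toy data (not MovieLens) with the same column names
ratings <- data.frame(
  userId  = c(1, 1, 1, 2, 2, 3, 3, 3),
  movieId = c(50, 296, 318, 50, 527, 296, 318, 541),
  rating  = c(4.0, 4.0, 4.5, 3.0, 3.5, 5.0, 4.0, 3.0)
)

# Gaussian family with the default identity link = ordinary least squares
fitGlm <- glm(rating ~ userId + movieId, data = ratings, family = "gaussian")

# Same coefficients from the normal equations: beta = (X'X)^-1 X'y
X <- model.matrix(rating ~ userId + movieId, data = ratings)
beta <- solve(crossprod(X), crossprod(X, ratings$rating))

print(cbind(glm = coef(fitGlm), normalEquations = drop(beta)))
```

To model per-user or per-movie effects instead, the IDs would have to be treated as categorical, which is a different (and much larger) model than the one trained here.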

Predict Using Trained Linear Regression Model


In [8]:
predictionsDF <- predict(linearRegressionModel, movieRatingsHiveDF)

Calculate Errors


In [9]:
errorsDF <- select(
    predictionsDF, predictionsDF$label, predictionsDF$prediction, 
    predictionsDF$userId, predictionsDF$movieId, 
    alias(predictionsDF$label - predictionsDF$prediction, "error"))
head(errorsDF)


Out[9]:
  label prediction userId movieId      error
1     4   3.522174      1      50  0.4778256
2     4   3.522209      1     296  0.4777909
3   4.5   3.522212      1     318  0.9777878
4   4.5   3.522242      1     527  0.9777584
5     3   3.522244      1     541 -0.5222436
6     4   3.522253      1     608  0.4777469
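The error column is the residual label - prediction, where label holds the original rating. A common one-number summary of such residuals is the root-mean-squared error, sqrt(mean(error^2)), alongside the mean absolute error. As a base-R sketch, here both are computed for only the six rows printed above, so the result describes that small sample, not the model's accuracy on the whole table:

```r
# The six rows printed above: true rating (label) and model prediction
label      <- c(4, 4, 4.5, 4.5, 3, 4)
prediction <- c(3.522174, 3.522209, 3.522212, 3.522242, 3.522244, 3.522253)

# Residual, matching the "error" column: label - prediction
error <- label - prediction

# Root-mean-squared error and mean absolute error over these rows only
rmse <- sqrt(mean(error^2))
mae  <- mean(abs(error))
round(c(rmse = rmse, mae = mae), 4)
```

Over the full errorsDF, the same RMSE could likely be computed inside Spark with something like summarize(errorsDF, rmse = sqrt(avg(errorsDF$error * errorsDF$error))), which avoids collecting the data into R; that call is untested here.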