Data: Movie Ratings and Recommendation
An independent movie company is looking to invest in a new movie project. With limited financing, the company wants to analyze audience reactions, particularly toward various movie genres, in order to identify a beneficial movie project to focus on. The company relies on data collected from a publicly available recommendation service by MovieLens. This dataset contains 27,000,000 ratings and 1,100,000 tag applications applied to 58,000 movies by 280,000 users. It includes tag genome data with 14 million relevance scores across 1,100 tags and was last updated in September 2018.
From this dataset, several analyses are possible, including the following:
These types of analyses, which are somewhat open-ended, demand the ability to process large amounts of data in a relatively short amount of time for decision-support purposes. In these situations, the size of the data typically makes analysis on a single machine impossible and analysis using a remote storage system impractical. For the remainder of the lessons, we will learn how HDFS provides the basis for storing massive amounts of data and for the programming approach used to analyze these data.
$ hdfs dfs -ls -h /repository/movielens
$ hdfs dfs -ls /repository/movielens
$ hdfs dfs -cat /repository/movielens/README.txt
$ hdfs dfs -cat /repository/movielens/links.csv \
2>/dev/null | head -n 5
$ hdfs dfs -cat /repository/movielens/movies.csv \
2>/dev/null | head -n 5
$ hdfs dfs -cat /repository/movielens/ratings.csv \
2>/dev/null | head -n 5
$ hdfs dfs -cat /repository/movielens/tags.csv \
2>/dev/null | head -n 5
To write a MapReduce program, you have to be able to identify the (Key, Value) pairs needed to produce the required results. This is the reducing phase. From this (Key, Value) pair format, you can then work backward to develop the mapping phase.
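Before writing the real mapper and reducer, it may help to see the whole flow in miniature. The sketch below is a minimal, in-memory simulation of the map, shuffle/sort, and reduce phases for the average-rating problem; the three input records are hypothetical stand-ins for lines of ratings.csv, not part of the assignment code.

#!/usr/bin/env python
# A minimal simulation of the three MapReduce phases; the records below
# are hypothetical stand-ins for lines of ratings.csv
from itertools import groupby

records = ["1,31,2.5,1260759144", "2,31,3.0,1260759179", "2,50,4.0,1260759182"]

# Map phase: emit (movieID, rating) pairs
mapped = []
for rec in records:
    fields = rec.split(",")
    mapped.append((fields[1], float(fields[2])))

# Shuffle/sort phase: Hadoop sorts mapper output by key before reducing
mapped.sort()

# Reduce phase: each key's values arrive together, so we can average them
for movieID, group in groupby(mapped, key=lambda kv: kv[0]):
    ratings = [r for _, r in group]
    print("%s\t%s" % (movieID, sum(ratings) / len(ratings)))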
Run the following:
$ cp -R /local/repository/codes .
The first mapper extracts the movie ID and rating from each line of ratings.csv and emits them as a tab-separated (Key, Value) pair:
codes/avgRatingMapper01.py
#!/usr/bin/env python
import sys

# Read rating records (userID,movieID,rating,timestamp) from standard input
for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    movieID = ratingInfo[1]
    rating = ratingInfo[2]
    # Emit a tab-separated (movieID, rating) pair
    print ("%s\t%s" % (movieID, rating))
$ hdfs dfs -cat /repository/movielens/ratings.csv \
2>/dev/null | head -n 5 | python ./codes/avgRatingMapper01.py
The first mapper blindly passes every line through, including the CSV header, whose rating field is not a number. The second version converts the rating to a float and uses try/except to skip any line where the conversion fails:
codes/avgRatingMapper02.py
#!/usr/bin/env python
import sys

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        movieID = ratingInfo[1]
        # Lines whose rating field is not numeric (e.g., the CSV header)
        # raise ValueError and are skipped
        rating = float(ratingInfo[2])
        print ("%s\t%s" % (movieID, rating))
    except ValueError:
        continue
$ hdfs dfs -cat /repository/movielens/ratings.csv \
2>/dev/null | head -n 5 | python ./codes/avgRatingMapper02.py
Getting an additional file
$ mkdir movielens
$ hdfs dfs -get /repository/movielens/movies.csv movielens/movies.csv
Movie IDs alone are not very informative. The third mapper joins each rating with the movie's title and genres by loading the movies.csv file just downloaded from HDFS into a dictionary keyed by movie ID:
codes/avgRatingMapper03.py
#!/usr/bin/env python
import sys
import csv

# Load movies.csv into a dictionary keyed by movie ID
movieFile = "./movielens/movies.csv"
movieList = {}
with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        # Look up the title and genres for this rating's movie ID
        movieTitle = movieList[ratingInfo[1]]["title"]
        movieGenre = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        print ("%s\t%s\t%s" % (movieTitle, rating, movieGenre))
    except ValueError:
        continue
$ hdfs dfs -cat /repository/movielens/ratings.csv \
2>/dev/null | head -n 5 | python ./codes/avgRatingMapper03.py
The reducer receives the mapper's (title, rating, genre) lines sorted by key. Because identical keys arrive adjacently, it can accumulate a running sum and count per movie and emit the average whenever the key changes:
codes/avgRatingReducer01.py
#!/usr/bin/env python
import sys

current_movie = None
current_genre = None
current_rating_sum = 0
current_rating_count = 0

for line in sys.stdin:
    line = line.strip()
    movie, rating, genre = line.split("\t", 2)
    try:
        rating = float(rating)
    except ValueError:
        continue
    if current_movie == movie:
        # Same movie as the previous line: keep accumulating
        current_rating_sum += rating
        current_rating_count += 1
    else:
        # The key changed: emit the average for the movie just finished
        if current_movie:
            rating_average = current_rating_sum / current_rating_count
            print ("%s\t%s\t%s" % (current_movie, rating_average, current_genre))
        current_movie = movie
        current_genre = genre
        current_rating_sum = rating
        current_rating_count = 1

# Emit the average for the final movie in the input
if current_movie:
    rating_average = current_rating_sum / current_rating_count
    print ("%s\t%s\t%s" % (current_movie, rating_average, current_genre))
$ hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
| head -n 5 \
| python ./codes/avgRatingMapper03.py \
| sort \
| python ./codes/avgRatingReducer01.py
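Note the sort between the mapper and the reducer: the reducer groups only adjacent identical keys, mirroring how Hadoop delivers sorted mapper output to each reducer. The sketch below, using hypothetical in-memory pairs, shows why unsorted input breaks this adjacency-based grouping.

#!/usr/bin/env python
# Hypothetical (movie, rating) pairs, in the unsorted order a mapper
# might emit them
pairs = [("MovieA", 4.0), ("MovieB", 3.0), ("MovieA", 5.0)]

def adjacency_reduce(pairs):
    # Group adjacent identical keys, as avgRatingReducer01.py does
    results, current, ratings = [], None, []
    for movie, rating in pairs:
        if movie != current:
            if current is not None:
                results.append((current, sum(ratings) / len(ratings)))
            current, ratings = movie, []
        ratings.append(rating)
    if current is not None:
        results.append((current, sum(ratings) / len(ratings)))
    return results

print(adjacency_reduce(pairs))          # MovieA appears twice: wrong
print(adjacency_reduce(sorted(pairs)))  # one average per movie: right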
To spot-check the computation, first extract the mapped ratings for a single title (here, anything matching Matrix):
$ hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
| head -n 2000 \
| python ./codes/avgRatingMapper03.py \
| grep Matrix
$ hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
| head -n 2000 \
| python ./codes/avgRatingMapper03.py \
| grep Matrix \
| sort \
| python ./codes/avgRatingReducer01.py
In [ ]:
# Manual calculation check via python
(4.0+1.0+5.0)/3
Now submit the job to the cluster with Hadoop Streaming:
$ yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/movielens/ratings.csv \
-output intro-to-hadoop/output-movielens-01 \
-file ./codes/avgRatingMapper03.py \
-mapper avgRatingMapper03.py \
-file ./codes/avgRatingReducer01.py \
-reducer avgRatingReducer01.py
Go back to the first few lines of the previous output and look for the INFO line Submitted application application_xxxx_xxxx. Running yarn's logs command with the provided application ID is a straightforward way to access all available log information for that application. The syntax to view a yarn log is:
$ yarn logs -applicationId APPLICATION_ID
In [ ]:
# Run the yarn view log command here
# Do not run this command inside a notebook; the output will likely crash the browser
#$ yarn logs -applicationId application_1476193845089_0123
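If you captured the job's console output in a file, a short sketch like the one below can extract the application ID for you; the file name job.log is hypothetical.

#!/usr/bin/env python
# Extract the YARN application ID from saved job output;
# "job.log" is a hypothetical file name
import re

with open("job.log") as f:
    match = re.search(r"application_\d+_\d+", f.read())
if match:
    print(match.group(0))  # e.g., application_1505269880969_0056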
However, this information is often massive, as it contains the aggregated logs from all tasks (map and reduce) of the job, which can number in the hundreds. The example below demonstrates this problem by displaying all the available information of a single-task MapReduce job. In this example, the log of a container has three types of log (LogType): stderr, stdout, and syslog.
One approach to reduce the amount of output is to filter out all non-essential lines (lines containing INFO):
$ yarn logs -applicationId application_1505269880969_0056 | grep -v INFO
Can we refine the information further? We can list only the summary line of each container:
$ yarn logs -applicationId APPLICATION_ID | grep '^Container:'
$ yarn logs -applicationId application_1505269880969_0056 | grep '^Container:'
Looking at the previous report, we can pick out each container's identifying information:
Container: container_XXXXXX on YYYY.palmetto.clemson.edu_ZZZZZ
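Here container_XXXXXX is the container ID, YYYY is the node host name, and ZZZZZ is the node port, which are exactly the values the detailed query below needs. A small sketch to split such a line apart; the sample line is hypothetical, built from the values used later in this lesson:

#!/usr/bin/env python
# Split a "Container: ... on host_port" summary line into the pieces
# the detailed yarn logs query needs; the sample line is hypothetical
import re

line = ("Container: container_e30_1505269880969_0056_01_000012 "
        "on dsci035.palmetto.clemson.edu_45454")
m = re.match(r"Container: (container_\S+) on (.+)_(\d+)$", line)
if m:
    container_id, node_host, node_port = m.groups()
    print(container_id)  # pass to -containerId
    print(node_host)     # pass to --nodeAddress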
To request yarn to provide a more detailed log at container level, we run:
$ yarn logs -applicationId APPLICATION_ID -containerId CONTAINER_ID --nodeAddress NODE_ADDRESS \
| grep -v INFO
$ yarn logs -applicationId application_1505269880969_0056 \
-containerId container_e30_1505269880969_0056_01_000012 \
--nodeAddress dsci035.palmetto.clemson.edu \
| grep -v INFO
This error message gives us some insight into the mechanics of Hadoop MapReduce: the map tasks run on remote worker nodes, where the relative path ./movielens/movies.csv used by avgRatingMapper03.py does not exist, so the mapper fails as soon as it tries to open the file.
The fourth mapper addresses this in two ways: the path is changed to ./movies.csv, the task's working directory, and the file itself is shipped to every node with an additional -file flag at submission time.
codes/avgRatingMapper04.py
#!/usr/bin/env python
import sys
import csv

# movies.csv is shipped with the job via -file, so it appears in the
# task's working directory on every node
movieFile = "./movies.csv"
movieList = {}
with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        movieTitle = movieList[ratingInfo[1]]["title"]
        movieGenre = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        print ("%s\t%s\t%s" % (movieTitle, rating, movieGenre))
    except ValueError:
        continue
$ yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/movielens/ratings.csv \
-output intro-to-hadoop/output-movielens-01 \
-file ./codes/avgRatingMapper04.py \
-mapper avgRatingMapper04.py \
-file ./codes/avgRatingReducer01.py \
-reducer avgRatingReducer01.py \
-file ./movielens/movies.csv
This second submission fails because the output directory intro-to-hadoop/output-movielens-01 already exists from the previous attempt; Hadoop will not overwrite an existing output directory, so we resubmit with a new output path:
$ yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/movielens/ratings.csv \
-output intro-to-hadoop/output-movielens-02 \
-file ./codes/avgRatingMapper04.py \
-mapper avgRatingMapper04.py \
-file ./codes/avgRatingReducer01.py \
-reducer avgRatingReducer01.py \
-file ./movielens/movies.csv
$ hdfs dfs -ls intro-to-hadoop/output-movielens-02
$ hdfs dfs -cat intro-to-hadoop/output-movielens-02/part-00000 \
2>/dev/null | head -n 20
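Each line of the result holds a movie title, its average rating, and its genres. To move toward the company's genre question, one possibility is to aggregate these averages by genre locally. The sketch below assumes you have copied part-00000 out of HDFS (e.g., with hdfs dfs -get) into the current directory, and that genres are separated by | as in movies.csv; it is a post-processing idea, not part of the assignment.

#!/usr/bin/env python
# A minimal local sketch: aggregate the per-movie averages by genre.
# Assumes part-00000 was copied from HDFS to the current directory.
genreSum = {}
genreCount = {}
with open("part-00000") as infile:
    for line in infile:
        title, avgRating, genres = line.strip().split("\t", 2)
        # a movie may list several genres separated by "|"
        for genre in genres.split("|"):
            genreSum[genre] = genreSum.get(genre, 0.0) + float(avgRating)
            genreCount[genre] = genreCount.get(genre, 0) + 1
# Note: this averages the per-movie averages, weighting every movie equally
for genre in sorted(genreSum, key=lambda g: genreSum[g] / genreCount[g], reverse=True):
    print("%s\t%s" % (genre, genreSum[genre] / genreCount[genre]))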
As a challenge, modify the mapper so that it emits only the records that satisfy a condition of your choosing, then fill in the blanks in the submission command that follows.
codes/avgRatingMapper04challenge.py
#!/usr/bin/env python
import sys
import csv

movieFile = "./movies.csv"
movieList = {}
with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        movieTitle = movieList[ratingInfo[1]]["title"]
        movieGenre = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        # Fill in the blank: only emit records matching your condition
        if _________:
            print ("%s\t%s\t%s" % (movieTitle, rating, movieGenre))
    except ValueError:
        continue
$ yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/movielens/ratings.csv \
-output intro-to-hadoop/output-movielens-challenge \
-file ____________ \
-mapper ___________ \
-file ./codes/avgRatingReducer01.py \
-reducer avgRatingReducer01.py \
-file ./movielens/movies.csv
In [ ]: