First principle of optimizing Hadoop workflow: Reduce data movement in the shuffle phase
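Shuffling is the all-to-all exchange in which every mapper's output is sorted, partitioned, and copied across the network to the reducers; for many jobs it is the dominant cost. The cells below shrink the shuffled data step by step: first by joining movies.csv in the reducer instead of the mapper, so only compact movieId/rating pairs cross the network; then by keying on genre and pre-aggregating per-genre sums and counts inside the mapper; and finally by adding a combiner.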
In [ ]:
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-02
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/movielens/ratings.csv \
-output intro-to-hadoop/output-movielens-02 \
-file ./codes/avgRatingMapper04.py \
-mapper avgRatingMapper04.py \
-file ./codes/avgRatingReducer01.py \
-reducer avgRatingReducer01.py \
-file ./movielens/movies.csv
In [ ]:
%%writefile codes/avgRatingReducer02.py
#!/usr/bin/env python
import sys
import csv

# Build a movieId -> {title, genre} lookup from the copy of movies.csv
# shipped to each node via the -file option
movieFile = "./movies.csv"
movieList = {}
with open(movieFile, mode='r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

current_movie = None
current_rating_sum = 0
current_rating_count = 0

# Streaming reducers receive their input sorted by key, so all ratings
# for one movie arrive as a contiguous run of lines
for line in sys.stdin:
    line = line.strip()
    movie, rating = line.split("\t", 1)
    try:
        rating = float(rating)
    except ValueError:
        continue
    if current_movie == movie:
        current_rating_sum += rating
        current_rating_count += 1
    else:
        if current_movie:
            rating_average = current_rating_sum / current_rating_count
            movieTitle = movieList[current_movie]["title"]
            movieGenres = movieList[current_movie]["genre"]
            print("%s\t%s\t%s" % (movieTitle, rating_average, movieGenres))
        current_movie = movie
        current_rating_sum = rating
        current_rating_count = 1

# Emit the final movie once stdin is exhausted
if current_movie == movie:
    rating_average = current_rating_sum / current_rating_count
    movieTitle = movieList[current_movie]["title"]
    movieGenres = movieList[current_movie]["genre"]
    print("%s\t%s\t%s" % (movieTitle, rating_average, movieGenres))
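This reducer takes over the movies.csv lookup, so the mapper paired with it below only has to emit compact movieId/rating pairs and the shuffle no longer carries titles or genre strings. The pipeline can be smoke-tested locally before submitting; a minimal sketch, assuming avgRatingMapper02.py emits tab-separated movieId/rating pairs and that a copy of movies.csv sits in the current working directory (the reducer opens ./movies.csv):
In [ ]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
| head -n 10 \
| python ./codes/avgRatingMapper02.py \
| sort \
| python ./codes/avgRatingReducer02.py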
In [ ]:
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-03
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/movielens/ratings.csv \
-output intro-to-hadoop/output-movielens-03 \
-file ./codes/avgRatingMapper02.py \
-mapper avgRatingMapper02.py \
-file ./codes/avgRatingReducer02.py \
-reducer avgRatingReducer02.py \
-file ./movielens/movies.csv
In [ ]:
!hdfs dfs -ls intro-to-hadoop/output-movielens-02
!hdfs dfs -ls intro-to-hadoop/output-movielens-03
In [ ]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-03/part-00000 \
2>/dev/null | head -n 10
How does the number of shuffle bytes in this example compare to the previous example?
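(The figure to compare is the `Reduce shuffle bytes` counter, reported under `Map-Reduce Framework` in the counter summary each streaming job prints on completion.)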
In [ ]:
%%writefile codes/avgGenreMapper01.py
#!/usr/bin/env python
import sys
import csv

# for nonHDFS run
movieFile = "./movielens/movies.csv"
# for HDFS run
#movieFile = "./movies.csv"

# Build a movieId -> {title, genre} lookup
movieList = {}
with open(movieFile, mode='r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

# Each rating line (userId,movieId,rating,timestamp) is expanded into
# one (genre, rating) pair per genre of the rated movie
for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        genreList = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        for genre in genreList.split("|"):
            print("%s\t%s" % (genre, rating))
    except (ValueError, KeyError):
        # ValueError skips the CSV header; KeyError guards against
        # movieIds missing from movies.csv
        continue
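This version performs the join on the map side and keys the output by genre rather than by movie. Note the shuffle cost: every rating is replicated once per genre of its movie, so a film tagged with four genres contributes four shuffled records.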
In [ ]:
%%writefile codes/avgGenreReducer01.py
#!/usr/bin/env python
import sys

current_genre = None
current_rating_sum = 0
current_rating_count = 0

# Input arrives sorted by genre, one tab-separated (genre, rating) pair per line
for line in sys.stdin:
    line = line.strip()
    genre, rating = line.split("\t", 1)
    if current_genre == genre:
        try:
            current_rating_sum += float(rating)
            current_rating_count += 1
        except ValueError:
            continue
    else:
        if current_genre:
            rating_average = current_rating_sum / current_rating_count
            print("%s\t%s" % (current_genre, rating_average))
        current_genre = genre
        try:
            current_rating_sum = float(rating)
            current_rating_count = 1
        except ValueError:
            continue

# Emit the final genre once stdin is exhausted
if current_genre == genre:
    rating_average = current_rating_sum / current_rating_count
    print("%s\t%s" % (current_genre, rating_average))
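The reducer can be exercised locally with a few hand-written lines; a minimal sketch, where the printf output mimics the sorted, tab-separated records avgGenreMapper01.py would produce:
In [ ]:
!printf "Adventure\t3.5\nAdventure\t4.5\nComedy\t2.0\n" \
| python ./codes/avgGenreReducer01.py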
In [ ]:
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-04
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/movielens/ratings.csv \
-output intro-to-hadoop/output-movielens-04 \
-file ./codes/avgGenreMapper01.py \
-mapper avgGenreMapper01.py \
-file ./codes/avgGenreReducer01.py \
-reducer avgGenreReducer01.py \
-file ./movielens/movies.csv
In [ ]:
!hdfs dfs -ls intro-to-hadoop/output-movielens-04
In [ ]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-04/part-00000
In [ ]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
| head -n 10
In [ ]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
| head -n 10 \
| python ./codes/avgGenreMapper01.py
In [ ]:
%%writefile codes/avgGenreMapper02.py
#!/usr/bin/env python
import sys
import csv
import json

# for nonHDFS run
# movieFile = "./movielens/movies.csv"
# for HDFS run
movieFile = "./movies.csv"

movieList = {}
genreList = {}
with open(movieFile, mode='r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

# In-mapper aggregation: accumulate per-genre sums and counts locally
# instead of emitting one record per rating
for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        genres = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        for genre in genres.split("|"):
            if genre in genreList:
                genreList[genre]["total_rating"] += rating
                genreList[genre]["total_count"] += 1
            else:
                genreList[genre] = {}
                genreList[genre]["total_rating"] = rating
                genreList[genre]["total_count"] = 1
    except (ValueError, KeyError):
        continue

# Emit one JSON-encoded partial aggregate per genre seen by this mapper
for genre in genreList:
    print("%s\t%s" % (genre, json.dumps(genreList[genre])))
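The optimization is in the final loop: instead of one shuffled record per (rating, genre) pair, each mapper emits at most one JSON-encoded partial aggregate per genre, no matter how many ratings it processed.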
In [ ]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
| head -n 10 \
| python ./codes/avgGenreMapper02.py
In [ ]:
%%writefile codes/avgGenreReducer02.py
#!/usr/bin/env python
import sys
import json

current_genre = None
current_rating_sum = 0
current_rating_count = 0

# Each input line now carries a JSON partial aggregate rather than a raw rating
for line in sys.stdin:
    line = line.strip()
    genre, ratingString = line.split("\t", 1)
    ratingInfo = json.loads(ratingString)
    if current_genre == genre:
        current_rating_sum += ratingInfo["total_rating"]
        current_rating_count += ratingInfo["total_count"]
    else:
        if current_genre:
            rating_average = current_rating_sum / current_rating_count
            print("%s\t%s" % (current_genre, rating_average))
        current_genre = genre
        current_rating_sum = ratingInfo["total_rating"]
        current_rating_count = ratingInfo["total_count"]

# Emit the final genre once stdin is exhausted
if current_genre == genre:
    rating_average = current_rating_sum / current_rating_count
    print("%s\t%s" % (current_genre, rating_average))
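Because the reducer sums the partial totals and divides only once, the result equals the true mean: sum of all total_rating values over sum of all total_count values. Averaging the per-mapper averages instead would weight every mapper equally and give the wrong answer.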
In [ ]:
!hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null \
| head -n 10 \
| python ./codes/avgGenreMapper02.py \
| sort \
| python ./codes/avgGenreReducer02.py
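In this local dry run, `sort` stands in for Hadoop's shuffle-and-sort phase, delivering the mapper's output to the reducer grouped by key just as the framework would.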
In [ ]:
# make sure that the path to movies.csv is correct inside avgGenreMapper02.py
!hdfs dfs -rm -R intro-to-hadoop/output-movielens-05
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/movielens/ratings.csv \
-output intro-to-hadoop/output-movielens-05 \
-file ./codes/avgGenreMapper02.py \
-mapper avgGenreMapper02.py \
-file ./codes/avgGenreReducer02.py \
-reducer avgGenreReducer02.py \
-file ./movielens/movies.csv
In [ ]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-05/part-00000
In [ ]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-04/part-00000
How does the number of shuffle bytes differ between the two jobs (output-movielens-04 versus output-movielens-05)?
In [4]:
!hdfs dfs -ls /repository/
In [5]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/complete-shakespeare.txt \
-output intro-to-hadoop/output-wordcount-01 \
-file ./codes/wordcountMapper.py \
-mapper wordcountMapper.py \
-file ./codes/wordcountReducer.py \
-reducer wordcountReducer.py
In [6]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/complete-shakespeare.txt \
-output intro-to-hadoop/output-wordcount-02 \
-file ./codes/wordcountMapper.py \
-mapper wordcountMapper.py \
-file ./codes/wordcountReducer.py \
-reducer wordcountReducer.py \
-combiner wordcountReducer.py
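The second wordcount job adds `-combiner`, which applies the reduce logic to each mapper's local output before the shuffle. Word count's reduce step (summing counts) is associative and commutative, so the reducer script can safely double as the combiner. The genre-averaging job needs a dedicated combiner, written next, because its reducer's output format (an average) differs from its input format (partial sums and counts) and therefore cannot be re-reduced.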
In [7]:
%%writefile codes/avgGenreCombiner.py
#!/usr/bin/env python
import sys
import json

genreList = {}

# Merge the JSON partial aggregates produced by avgGenreMapper02.py;
# the output format must match the mapper's, since the framework may
# run the combiner zero or more times before the reducer
for line in sys.stdin:
    line = line.strip()
    genre, ratingString = line.split("\t", 1)
    ratingInfo = json.loads(ratingString)
    if genre in genreList:
        genreList[genre]["total_rating"] += ratingInfo["total_rating"]
        genreList[genre]["total_count"] += ratingInfo["total_count"]
    else:
        genreList[genre] = {}
        genreList[genre]["total_rating"] = ratingInfo["total_rating"]
        genreList[genre]["total_count"] = ratingInfo["total_count"]

for genre in genreList:
    print("%s\t%s" % (genre, json.dumps(genreList[genre])))
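The combiner can also be checked locally; a minimal sketch, with two hand-written partial records for the same genre standing in for mapper output (expected result: a single merged record with total_rating 10.0 and total_count 3):
In [ ]:
!printf 'Comedy\t{"total_rating": 7.0, "total_count": 2}\nComedy\t{"total_rating": 3.0, "total_count": 1}\n' \
| python ./codes/avgGenreCombiner.py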
In [8]:
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-06
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
-input /repository/movielens/ratings.csv \
-output intro-to-hadoop/output-movielens-06 \
-file ./codes/avgGenreMapper02.py \
-mapper avgGenreMapper02.py \
-file ./codes/avgGenreReducer02.py \
-reducer avgGenreReducer02.py \
-file ./codes/avgGenreCombiner.py \
-combiner avgGenreCombiner.py \
-file ./movielens/movies.csv
How does the number of shuffle bytes for this job (output-movielens-06) compare with the job without a combiner (output-movielens-05)?
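As before, the `Reduce shuffle bytes` counter in each job summary gives the answer; since avgGenreMapper02.py already pre-aggregates, the counters reveal how much (or how little) the combiner adds on top of in-mapper aggregation.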