ml-latest-small
dataset of 100k ratings, 9k movies and 700 users. Note: set up the index mappings before loading data (a sketch follows below).
Using Spark 1.6.1
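The mappings themselves are not shown in this notebook. Below is a minimal sketch of how they might be created with the elasticsearch Python client (the same client used at the end of the notebook). The field names match the DataFrames written later, but the types and formats here are assumptions, not the actual mappings used.
In [ ]:
from elasticsearch import Elasticsearch

es = Elasticsearch()

# Sketch of the "demo" index with mappings for the three document types written below.
# The types (epoch_millis timestamps, not_analyzed ids, a date for release_date) are
# assumptions chosen to match the data, not the notebook's real mapping definitions.
create_index = {
    "settings": {"number_of_shards": 1},
    "mappings": {
        "ratings": {
            "properties": {
                "userId": {"type": "string", "index": "not_analyzed"},
                "movieId": {"type": "string", "index": "not_analyzed"},
                "rating": {"type": "double"},
                "timestamp": {"type": "date", "format": "epoch_millis"}
            }
        },
        "users": {
            "properties": {
                "userId": {"type": "string", "index": "not_analyzed"},
                "name": {"type": "string"}
            }
        },
        "movies": {
            "properties": {
                "movieId": {"type": "string", "index": "not_analyzed"},
                "title": {"type": "string"},
                "genres": {"type": "string", "index": "not_analyzed"},
                "release_date": {"type": "date", "format": "yyyy-MM-dd"},
                "popularity": {"type": "double"},
                "image_url": {"type": "string", "index": "not_analyzed"}
            }
        }
    }
}

es.indices.create(index="demo", body=create_index)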
In [24]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *
# convert epoch seconds to milliseconds so Elasticsearch can parse the field as a date
ms_ts = udf(lambda x: int(x) * 1000, LongType())
In [ ]:
import csv
from pyspark.sql.types import *
with open("data/ml-latest-small/ratings.csv") as f:
    reader = csv.reader(f)
    cols = reader.next()
    ratings = [l for l in reader]
# cast ratings to doubles, convert timestamps to milliseconds, and write to the demo/ratings index
ratings_df = sqlContext.createDataFrame(ratings, cols) \
    .select("userId", "movieId", col("rating").cast(DoubleType()), ms_ts("timestamp").alias("timestamp"))
ratings_df.write.format("org.elasticsearch.spark.sql").save("demo/ratings")
In [30]:
import names
# define UDF to create random user names
random_name = udf(lambda x: names.get_full_name(), StringType())
In [31]:
# use the userId as the Elasticsearch document id via es.mapping.id
users = ratings_df.select("userId").distinct().select("userId", random_name("userId").alias("name"))
users.write.format("org.elasticsearch.spark.sql").option("es.mapping.id", "userId").save("demo/users")
In [8]:
with open("data/ml-latest-small/movies.csv") as f:
    reader = csv.reader(f)
    cols = reader.next()
    raw_movies = sqlContext.createDataFrame([l for l in reader], cols)
with open("data/ml-latest-small/links.csv") as f:
    reader = csv.reader(f)
    cols = reader.next()
    link_data = sqlContext.createDataFrame([l for l in reader], cols)
# join movies with their TMDB ids from links.csv
movie_data = raw_movies.join(link_data, raw_movies.movieId == link_data.movieId) \
    .select(raw_movies.movieId, raw_movies.title, raw_movies.genres, link_data.tmdbId)
num_movies = movie_data.count()
movie_data.show(5)
data = movie_data.collect()
In [9]:
import tmdbsimple as tmdb
tmdb.API_KEY = 'YOUR_KEY'
# base URL for TMDB poster images
IMAGE_URL = 'https://image.tmdb.org/t/p/w500'
import csv
from requests import HTTPError
In [ ]:
enriched = []
i = 0
for row in data:
    try:
        # look up movie metadata from the TMDB API
        m = tmdb.Movies(row.tmdbId).info()
        poster_url = IMAGE_URL + m['poster_path'] if 'poster_path' in m and m['poster_path'] is not None else ""
        movie = {
            "movieId": row.movieId,
            "title": m['title'],
            "originalTitle": row.title,
            "genres": row.genres,
            "overview": m['overview'],
            "release_date": m['release_date'],
            "popularity": m['popularity'],
            "original_language": m['original_language'],
            "image_url": poster_url
        }
        enriched.append(movie)
    except HTTPError as e:
        # fall back to the raw MovieLens data if the TMDB lookup fails
        print "Encountered error: %s for movieId=%s title=%s" % (e, row.movieId, row.title)
        movie = {
            "movieId": row.movieId,
            "title": row.title,
            "originalTitle": row.title,
            "genres": row.genres,
            "overview": "",
            "release_date": "",
            "popularity": 0,
            "original_language": "",
            "image_url": ""
        }
        enriched.append(movie)
    i += 1
    if i % 1 == 0: print "Enriched movie %s of %s" % (i, num_movies)
In [52]:
from elasticsearch import Elasticsearch
es = Elasticsearch()
for m in enriched:
    # drop empty release_date values so Elasticsearch date parsing doesn't fail
    if 'release_date' in m and m['release_date'] == "": m.pop('release_date')
    es.index("demo", "movies", id=m['movieId'], body=m)