Important: Before running this notebook, ensure you have installed spark-cloudant 1.6.4 by running the notebook: Step 05 - Install Spark Cloudant
Next, add your Cloudant credentials below, delete the hash before the 'echo' command, and run the cell to save your credentials.
In [ ]:
! # echo '{ "username": "changeme", "password": "changeme", "host": "changeme", "port": 443, "url": "changeme" }' > cloudant_credentials.json
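Optionally, run this quick sanity check (a convenience, not part of the original flow): it fails fast if the credentials file is missing, malformed, or still contains 'changeme' placeholders.
In [ ]:
# optional sanity check for the credentials file created above
import json
with open('cloudant_credentials.json') as f:
    creds = json.load(f)
for field in ('username', 'password', 'host'):
    assert creds.get(field) and creds[field] != 'changeme', "please set '%s'" % field
print('cloudant_credentials.json looks OK')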
In [ ]:
! python -c 'import cloudant' || pip install cloudant --user
In [ ]:
# utility method for timestamps
import time
def ts():
    return time.strftime("%Y-%m-%d %H:%M:%S %Z")
In [ ]:
# utility methods for logging
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger("CloudantRecommender")

def info(*args):
    # sends output to notebook
    print(args)
    # sends output to kernel log file (stringify so py4j can pass it to log4j)
    LOGGER.info(str(args))

def error(*args):
    # sends output to notebook
    print(args)
    # sends output to kernel log file (stringify so py4j can pass it to log4j)
    LOGGER.error(str(args))
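To confirm the logging wiring, a call to info() should appear both in the notebook output and in the kernel log file (the same file the grep cells at the end of this notebook search). A minimal check:
In [ ]:
# quick logging check: output should show below and in the kernel log
info("CloudantRecommender logging check", ts())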
In [ ]:
# utility class for holding cloudant connection details
import json
def set_attr_if_exists(obj, data, k):
    try:
        setattr(obj, k, data[k])
    except KeyError:
        # key not present in the json file - leave the attribute as-is
        pass

class CloudantConfig:
    def __init__(self, database, json_file=None, host=None, username=None, password=None):
        self.database = database
        self.host = None
        self.username = None
        self.password = None

        if json_file:
            with open(json_file) as data_file:
                data = json.load(data_file)
                set_attr_if_exists(self, data, 'host')
                set_attr_if_exists(self, data, 'username')
                set_attr_if_exists(self, data, 'password')

        # override json attributes if provided
        if host: self.host = host
        if username: self.username = username
        if password: self.password = password
In [ ]:
sourceDB = CloudantConfig(
    json_file='cloudant_credentials.json',
    database="ratingdb"
)
In [ ]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import json

# we use the cloudant python library to save the recommendations
from cloudant.client import Cloudant
from cloudant.adapters import Replay429Adapter
class CloudantMovieRecommender:

    def __init__(self, sc):
        self.sc = sc

    def train(self, sourceDB):
        info("Starting load from Cloudant: ", ts())

        dfReader = sqlContext.read.format("com.cloudant.spark")
        dfReader.option("cloudant.host", sourceDB.host)
        dfReader.option("schemaSampleSize", -1)

        if sourceDB.username:
            dfReader.option("cloudant.username", sourceDB.username)
        if sourceDB.password:
            dfReader.option("cloudant.password", sourceDB.password)

        df = dfReader.load(sourceDB.database).cache()
        from pyspark.sql.functions import udf
        from pyspark.sql.types import IntegerType, StringType, StructType, StructField

        # the Cloudant userId may be a string type
        def get_cloudant_user_id(data):
            (user_id, _) = data.split('/')
            return user_id.replace('user_', '')

        # extract the user id from the cloudant record
        cloudant_user_id = udf(lambda _id: get_cloudant_user_id(_id), StringType())
        df = df.withColumn("cloudant_user_id", cloudant_user_id(df['_id']))

        # the udf is declared IntegerType, so return an int rather than a long
        def get_movie_id(data):
            (_, movie_id) = data.split('/')
            return int(movie_id.replace('movie_', ''))

        # extract the movie id from the cloudant record
        movie_id = udf(lambda _id: get_movie_id(_id), IntegerType())
        df = df.withColumn("movie_id", movie_id(df['_id']))
        # The Cloudant user Id is a string type (uuid), but ALS requires an int type
        # so we create a new integer user ID field here. The generated recommendations
        # will have the new user ID, so we will need to map it back to the original
        # Cloudant user Id when we have finished.
        df_user_ids = df.select("cloudant_user_id").distinct().cache()

        # create a new schema with the als_user_id field prepended
        newSchema = StructType([StructField("als_user_id", IntegerType(), False)]
                               + df_user_ids.schema.fields)

        # zip with the index, map each row to a dict that includes the new field
        df_user_ids = df_user_ids.rdd.zipWithIndex() \
            .map(lambda (row, idx): dict(row.asDict().items() + [("als_user_id", idx)])) \
            .toDF(newSchema)

        # add the new user id (als_user_id) to the dataframe
        df = df.join(df_user_ids, df.cloudant_user_id == df_user_ids.cloudant_user_id)

        # save the dataframe as we will need to map back from the
        # new user id to the original Cloudant user id
        self.df_user_ids = df_user_ids.cache()

        info("Finished load from Cloudant: ", ts())
        info("Found", df.count(), "records in Cloudant")

        # convert cloudant docs into Rating objects
        ratings = df.map(lambda row: Rating(row.als_user_id, row.movie_id, float(row.rating)))

        rank = 50
        numIterations = 20
        lambdaParam = 0.1

        info("Starting train model: ", ts())
        self.model = ALS.train(ratings, rank, numIterations, lambdaParam)
        info("Finished train model: ", ts())
    def get_top_recommendations(self):
        info("Starting get_top_recommendations: ", ts())
        df = self.model.recommendProductsForUsers(10).toDF()
        df.cache()
        info("Finished get_top_recommendations: ", ts())
        return df
    def del_old_recommendationdbs(self, cloudant_client, db_name_prefix):
        dbs_to_del = cloudant_client.all_dbs()

        # only delete dbs we are using for recommendations
        dbs_to_del = [db for db in dbs_to_del if db.startswith(db_name_prefix + '_')]

        # ensure the list is in timestamp order
        dbs_to_del.sort()

        # keep the last 5 dbs and delete the rest
        for db in dbs_to_del[:-5]:
            cloudant_client.delete_database(db)
            info("Deleted old recommendations db", db)
    def update_meta_document(self, cloudant_client, meta_db_name, latest_db_name):
        meta_db = cloudant_client[meta_db_name]

        from datetime import datetime
        # use a name other than `ts` to avoid shadowing the ts() utility
        timestamp = datetime.utcnow().isoformat()

        try:
            # update doc if exists
            meta_doc = meta_db['recommendation_metadata']
            meta_doc['latest_db'] = latest_db_name
            meta_doc['timestamp_utc'] = timestamp
            meta_doc.save()
            info("Updated recommendationdb metadata record with latest_db", latest_db_name, meta_doc)
        except KeyError:
            # create a new doc
            data = {
                '_id': 'recommendation_metadata',
                'latest_db': latest_db_name,
                'timestamp_utc': timestamp,
            }
            meta_doc = meta_db.create_document(data)
            meta_doc.save()
            if meta_doc.exists():
                info("Saved recommendationdb metadata record", str(data))

        # save product features to enable later generation of Vt
        # see: http://stackoverflow.com/questions/41537470/als-model-how-to-generate-full-u-vt-v
        pf = self.model.productFeatures().sortByKey()
        pf_keys = json.dumps(pf.keys().collect())
        pf_vals = json.dumps(pf.map(lambda x: list(x[1])).collect())

        # pf_keys/pf_vals can exceed the 1 MB Cloudant document size limit,
        # so we save them as attachments rather than document fields
        meta_doc.put_attachment(
            attachment='product_feature_keys',
            content_type='application/json',
            data=pf_keys
        )
        meta_doc.put_attachment(
            attachment='product_feature_vals',
            content_type='application/json',
            data=pf_vals
        )
    def create_recommendationdb(self, cloudant_client, destDB):
        # create a timestamped database to hold this run's recommendations
        import time
        db_name = destDB.database + '_' + str(int(time.time()))
        db = cloudant_client.create_database(db_name)
        info("Created new recommendations db", db_name)
        return db
    def save_recommendations(self, destDB):
        df = self.get_top_recommendations()

        cloudant_client = Cloudant(
            destDB.username,
            destDB.password,
            account=destDB.username,
            adapter=Replay429Adapter(retries=10, initialBackoff=1)
        )
        cloudant_client.connect()

        self.del_old_recommendationdbs(cloudant_client, destDB.database)
        recommendations_db = self.create_recommendationdb(cloudant_client, destDB)

        # map the generated als_user_id back to the original Cloudant user id
        user_id_map = {row[0]: row[1] for row in self.df_user_ids.collect()}

        # reformat data for saving
        docs = df.map(lambda x: {'_id': str(user_id_map[x[0]]), 'recommendations': x[1]}).collect()

        # we could hit cloudant resource limits if trying to save all docs at once
        # so we save them in smaller sized chunks
        for i in range(0, len(docs), 100):
            chunk = docs[i:i + 100]
            recommendations_db.bulk_docs(chunk)  # TODO check for errors saving the chunk
            info("Saved recommendations chunk", i, ts())

        self.update_meta_document(cloudant_client, destDB.database, recommendations_db.database_name)
        info("Saved recommendations to: ", recommendations_db.database_name, ts())
        cloudant_client.disconnect()
Now the code to start the recommender ...
In [ ]:
sourceDB = CloudantConfig(
    json_file='cloudant_credentials.json',
    database="ratingdb"
)
destDB = CloudantConfig(
    json_file='cloudant_credentials.json',
    database="recommendationdb"
)

import traceback
try:
    movieRecommender = CloudantMovieRecommender(sc)
    movieRecommender.train(sourceDB)
    movieRecommender.save_recommendations(destDB)
except Exception as e:
    error(str(e), traceback.format_exc(), ts())
    raise e
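To spot-check the results, you can follow the metadata document to the latest recommendations database and print a few documents. This is a sketch using the python-cloudant client; it assumes the run above completed and wrote at least one recommendations document.
In [ ]:
# sketch: read back the latest recommendations via the metadata document
client = Cloudant(destDB.username, destDB.password, account=destDB.username)
client.connect()

meta_doc = client[destDB.database]['recommendation_metadata']
print('latest recommendations db: ' + meta_doc['latest_db'])

# print a few recommendation documents (ids are the original Cloudant user ids)
latest_db = client[meta_doc['latest_db']]
for i, doc in enumerate(latest_db):
    print(doc)
    if i >= 2:
        break

client.disconnect()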
After you have successfully run the notebook interactively, you can schedule it to run on a timer, for example hourly.
Timer jobs run against a specific saved version of a notebook. If you haven't saved a version when you first create the schedule, one will be saved for you.
Note that if you make changes to your notebook, you need to save a new version and re-schedule it, selecting the new version in the schedule configuration form.
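Separately, the product features saved as attachments by update_meta_document can be read back later if you need the full Vt matrix (see the Stack Overflow link in the code). A minimal sketch using python-cloudant's get_attachment, assuming the attachments were written by the run above:
In [ ]:
# sketch: rebuild Vt from the product-feature attachments saved above
import json
import numpy as np

client = Cloudant(destDB.username, destDB.password, account=destDB.username)
client.connect()

meta_doc = client[destDB.database]['recommendation_metadata']
pf_keys = json.loads(meta_doc.get_attachment('product_feature_keys', attachment_type='text'))
pf_vals = json.loads(meta_doc.get_attachment('product_feature_vals', attachment_type='text'))

# one row per movie id in pf_keys; transpose to get Vt
V = np.matrix(np.asarray(pf_vals))
Vt = V.T
print(Vt.shape)

client.disconnect()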
In [ ]:
# Get the local time on the cluster
! date
In [ ]:
# dump the latest kernel log
! cat $(ls -1 $HOME/logs/notebook/*pyspark* | sort -r | head -1)
In [ ]:
# look for our log output in the latest kernel log file
! grep 'CloudantRecommender' $(ls -1 $HOME/logs/notebook/*pyspark* | sort -r | head -1)
In [ ]:
# look for our log output in all kernel log files
! grep 'CloudantRecommender' $HOME/logs/notebook/*pyspark*
In [ ]:
! ls -l $HOME/logs/notebook/*pyspark*