The Wide & Deep learning model, proposed by Google in 2016, is a mixed DNN-linear model. Google has used wide and deep learning for app recommendation in the Google Play store.
In this tutorial, we use the Recommender API of Analytics Zoo to build a combined wide linear model and deep neural network, known as the Wide&Deep model, and use the BigDL optimizer to train it. The Wide&Deep model combines the strengths of memorization and generalization. It is useful for generic large-scale regression and classification problems with sparse input features (e.g., categorical features with a large number of possible feature values).
Import the necessary libraries.
In [1]:
from zoo.models.recommendation import *
from zoo.models.recommendation.utils import *
from zoo.common.nncontext import init_nncontext

import os
import sys
import datetime as dt
import numpy as np

import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
%pylab inline
Initialize the NN context; it returns a SparkContext with an optimized configuration for BigDL performance.
In [2]:
sc = init_nncontext("WideAndDeep Example")
Download and read the MovieLens 1M rating data, and inspect its dimensions.
In [3]:
from bigdl.dataset import movielens

movielens_data = movielens.get_id_ratings("/tmp/movielens/")
min_user_id = np.min(movielens_data[:, 0])
max_user_id = np.max(movielens_data[:, 0])
min_movie_id = np.min(movielens_data[:, 1])
max_movie_id = np.max(movielens_data[:, 1])
rating_labels = np.unique(movielens_data[:, 2])
print(movielens_data.shape)
print(min_user_id, max_user_id, min_movie_id, max_movie_id, rating_labels)
Transform the ratings into a DataFrame, and read the user and item data into DataFrames. Transform the labels to be zero-based, since the original ratings start from 1.
In [4]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row

sqlContext = SQLContext(sc)

Rating = Row("userId", "itemId", "label")
User = Row("userId", "gender", "age", "occupation")
Item = Row("itemId", "title", "genres")

ratings = sc.parallelize(movielens_data)\
    .map(lambda l: (int(l[0]), int(l[1]), int(l[2]) - 1))\
    .map(lambda r: Rating(*r))
ratingDF = sqlContext.createDataFrame(ratings)

users = sc.textFile("/tmp/movielens/ml-1m/users.dat")\
    .map(lambda l: l.split("::")[0:4])\
    .map(lambda l: (int(l[0]), l[1], int(l[2]), int(l[3])))\
    .map(lambda r: User(*r))
userDF = sqlContext.createDataFrame(users)

items = sc.textFile("/tmp/movielens/ml-1m/movies.dat")\
    .map(lambda l: l.split("::")[0:3])\
    .map(lambda l: (int(l[0]), l[1], l[2].split('|')[0]))\
    .map(lambda r: Item(*r))
itemDF = sqlContext.createDataFrame(items)
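Optionally, peek at the parsed DataFrames to verify the schema and a few rows (an illustrative check, not required for the rest of the tutorial):
# Quick look at the parsed user and item data.
userDF.printSchema()
itemDF.show(3, truncate=False)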
Join the data together and transform it. For example, gender is encoded as a categorical feature, and age and gender are combined into a crossed feature.
In [5]:
from pyspark.sql.functions import col, udf

gender_udf = udf(lambda gender: categorical_from_vocab_list(gender, ["F", "M"], start=1))
bucket_cross_udf = udf(lambda feature1, feature2: hash_bucket(str(feature1) + "_" + str(feature2), bucket_size=100))
genres_list = ["Crime", "Romance", "Thriller", "Adventure", "Drama", "Children's",
               "War", "Documentary", "Fantasy", "Mystery", "Musical", "Animation", "Film-Noir", "Horror",
               "Western", "Comedy", "Action", "Sci-Fi"]
genres_udf = udf(lambda genres: categorical_from_vocab_list(genres, genres_list, start=1))

allDF = ratingDF.join(userDF, ["userId"]).join(itemDF, ["itemId"]) \
    .withColumn("gender", gender_udf(col("gender")).cast("int")) \
    .withColumn("age-gender", bucket_cross_udf(col("age"), col("gender")).cast("int")) \
    .withColumn("genres", genres_udf(col("genres")).cast("int"))
allDF.show(5)
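To make the feature engineering above concrete, here is a rough pure-Python sketch of what the two Analytics Zoo helpers are assumed to do: categorical_from_vocab_list maps a value to its offset index in a vocabulary, and hash_bucket hashes a string into a fixed number of buckets. The stand-in names below (vocab_index, bucket) are hypothetical; check the Analytics Zoo source for the exact semantics.
# Hypothetical stand-ins that approximate the assumed behavior of the helpers
# used above; they are NOT the library implementation.
def vocab_index(value, vocab, start=1):
    # Map a categorical value to its index in the vocabulary, offset by `start`.
    return vocab.index(value) + start if value in vocab else start - 1

def bucket(value, bucket_size=100, start=0):
    # Hash an arbitrary value into one of `bucket_size` buckets.
    return (hash(str(value)) % bucket_size) + start

print(vocab_index("F", ["F", "M"], start=1))   # -> 1
print(bucket("25_1", bucket_size=100))         # a bucket id in [0, 100)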
Specify the feature information shared by the WideAndDeep model and its feature generation. Here, we use occupation and gender for the wide base part, the age-gender cross as the wide cross part, genres and gender as indicator columns, userId and itemId for embeddings, and age as a continuous feature.
In [6]:
bucket_size = 100
column_info = ColumnFeatureInfo(
    wide_base_cols=["occupation", "gender"],
    wide_base_dims=[21, 3],
    wide_cross_cols=["age-gender"],
    wide_cross_dims=[bucket_size],
    indicator_cols=["genres", "gender"],
    indicator_dims=[19, 3],
    embed_cols=["userId", "itemId"],
    embed_in_dims=[max_user_id, max_movie_id],
    embed_out_dims=[64, 64],
    continuous_cols=["age"])
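As a quick, optional sanity check (an illustrative addition, not part of the original tutorial), you can verify that the largest encoded value in each categorical column does not exceed its declared dimension:
# The maxima below should stay within wide_base_dims, indicator_dims and wide_cross_dims.
allDF.selectExpr("max(occupation)", "max(gender)", "max(genres)", "max(`age-gender`)").show()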
Transform the data into an RDD of Sample. We use the BigDL optimizer directly to train the model, which requires the data to be provided as an RDD of Sample. A Sample is a BigDL data structure that can be constructed from two NumPy arrays, the feature and the label; the API is Sample.from_ndarray(feature, label). The Wide&Deep model needs two input tensors: a SparseTensor for the wide part and a DenseTensor for the deep part. The to_user_item_feature utility converts each DataFrame row into a user-item pair feature that carries such a Sample.
In [7]:
rdds = allDF.rdd.map(lambda row: to_user_item_feature(row, column_info))
trainPairFeatureRdds, valPairFeatureRdds = rdds.randomSplit([0.8, 0.2], seed=1)
valPairFeatureRdds.persist()
train_data = trainPairFeatureRdds.map(lambda pair_feature: pair_feature.sample)
test_data = valPairFeatureRdds.map(lambda pair_feature: pair_feature.sample)
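For reference, a Sample can also be constructed directly from NumPy arrays with Sample.from_ndarray (a minimal toy sketch; in this tutorial the real conversion, including the SparseTensor input for the wide part, is handled by to_user_item_feature above):
from bigdl.util.common import Sample

# Toy example only: a small dense feature vector and a scalar label.
toy_sample = Sample.from_ndarray(np.array([0.1, 0.2, 0.3]), np.array([2.0]))
print(toy_sample)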
In Analytics Zoo, it is simple to build a Wide&Deep model by calling the WideAndDeep API. You need to specify the class number and model type, as well as the column information of the features according to your data. You can also change other default parameters of the network, such as the hidden layers. The model can be fed into a BigDL Optimizer or an Analytics Zoo NNClassifier. Please refer to the documentation for more details. In this example, we demonstrate how to use the BigDL optimizer.
In [8]:
wide_n_deep = WideAndDeep(5, column_info, "wide_n_deep")
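The same API can also build a wide-only or deep-only variant by changing the model type (assuming model_type accepts "wide" and "deep" in addition to "wide_n_deep", as described in the Recommender API documentation); these variants are not used later in this tutorial.
# Illustrative variants only; the tutorial trains the combined model above.
wide_only = WideAndDeep(5, column_info, "wide")
deep_only = WideAndDeep(5, column_info, "deep")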
In [9]:
wide_n_deep.compile(optimizer="adam",
                    loss="sparse_categorical_crossentropy",
                    metrics=['accuracy'])
In [10]:
tmp_log_dir = create_tmp_path()
wide_n_deep.set_tensorboard(tmp_log_dir, "training_wideanddeep")
Train the network. Wait some time until it finishes. Voila! You've got a trained model.
In [11]:
%%time
# Start the training process.
wide_n_deep.fit(train_data,
                batch_size=8000,
                nb_epoch=10,
                validation_data=test_data)
print("Optimization Done.")
Zoo models make inferences on the given data using the model.predict(val_rdd) API, which returns an RDD of prediction results. predict_class returns the predicted labels.
In [12]:
results = wide_n_deep.predict(test_data)
results.take(5)
results_class = wide_n_deep.predict_class(test_data)
results_class.take(5)
In Analytics Zoo, Recommender provides 3 unique APIs to predict user-item pairs and make recommendations for users or items given candidates. Predict for user-item pairs:
In [13]:
userItemPairPrediction = wide_n_deep.predict_user_item_pair(valPairFeatureRdds)
for result in userItemPairPrediction.take(5): print(result)
Recommend 3 items for each user given candidates in the feature RDDs
In [14]:
userRecs = wide_n_deep.recommend_for_user(valPairFeatureRdds, 3)
for result in userRecs.take(5): print(result)
Recommend 3 users for each item given candidates in the feature RDDs
In [15]:
itemRecs = wide_n_deep.recommend_for_item(valPairFeatureRdds, 3)
for result in itemRecs.take(5): print(result)
In [16]:
# Retrieve the train and validation summary objects and read the loss data into ndarrays.
train_loss = np.array(wide_n_deep.get_train_summary("Loss"))
val_loss = np.array(wide_n_deep.get_validation_summary("Loss"))

# Plot the train and validation loss curves.
# Each event record is a tuple of (iteration_count, value, timestamp).
plt.figure(figsize=(12, 6))
plt.plot(train_loss[:, 0], train_loss[:, 1], label='train loss')
plt.plot(val_loss[:, 0], val_loss[:, 1], label='val loss', color='green')
plt.scatter(val_loss[:, 0], val_loss[:, 1], color='green')
plt.legend()
plt.xlim(0, train_loss.shape[0] + 10)
plt.grid(True)
plt.title("loss")
Plot the top-1 accuracy on the validation set.
In [17]:
plt.figure(figsize=(12, 6))
top1 = np.array(wide_n_deep.get_validation_summary("Top1Accuracy"))
plt.plot(top1[:, 0], top1[:, 1], label='top1')
plt.title("top1 accuracy")
plt.grid(True)
plt.legend()
plt.xlim(0, train_loss.shape[0] + 10)
In [18]:
valPairFeatureRdds.unpersist()
In [19]:
sc.stop()