This notebook demonstrates how to implement a WALS matrix factorization approach to collaborative filtering. Unlike wals.ipynb, this notebook uses TensorFlow Transform to carry out the preprocessing, so the mapping of visitorId/contentId to enumerated user and item indices and the scaling of the ratings are automated as part of the pipeline.
Apache Beam only works in Python 2 at the moment, so we are going to switch to the Python 2 kernel. In the menu above, click the kernel dropdown arrow and select python2.
Only specific combinations of TensorFlow, Apache Beam, and TensorFlow Transform are supported, which is why we pin the versions below. After running the following cell, reset the notebook's kernel.
In [ ]:
%%bash
source activate py2env
pip uninstall -y google-cloud-dataflow
conda install -y pytz==2018.4
pip install apache-beam[gcp] tensorflow_transform==0.8.0
In [3]:
%%bash
pip freeze | grep -e 'flow\|beam'
Now reset the notebook's session kernel!
In [4]:
import os
PROJECT = "cloud-training-demos" # REPLACE WITH YOUR PROJECT ID
BUCKET = "cloud-training-demos-ml" # REPLACE WITH YOUR BUCKET NAME
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1
# Do not change these
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION
os.environ["TFVERSION"] = "1.13"
In [5]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION
In [6]:
import tensorflow as tf
print tf.__version__
For collaborative filtering, we don't need to know anything about either the users or the content. Essentially, all we need to know is the userId, the itemId, and the rating that the particular user gave the particular item.
In this case, we are working with newspaper articles. The company doesn't ask its users to rate the articles, but we can use the time spent on the page as a proxy for a rating.
Normally, we would also add a time filter to this ("latest 7 days"), but our dataset is itself limited to a few days.
In [ ]:
from google.cloud import bigquery
bq = bigquery.Client(project = PROJECT)
query = """
WITH CTE_visitor_page_content AS (
SELECT
# Schema: https://support.google.com/analytics/answer/3437719?hl=en
    # For a completely unique visit-session ID, we combine fullVisitorId and visitNumber:
CONCAT(fullVisitorID,'-',CAST(visitNumber AS STRING)) AS visitorId,
(SELECT MAX(IF(index=10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS latestContentId,
(LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY hits.time ASC) - hits.time) AS session_duration
FROM
`cloud-training-demos.GA360_test.ga_sessions_sample`,
UNNEST(hits) AS hits
WHERE
# only include hits on pages
hits.type = "PAGE"
GROUP BY
fullVisitorId,
visitNumber,
latestContentId,
hits.time )
-- Aggregate web stats
SELECT
visitorId,
latestContentId as contentId,
SUM(session_duration) AS session_duration
FROM
CTE_visitor_page_content
WHERE
latestContentId IS NOT NULL
GROUP BY
visitorId,
latestContentId
HAVING
session_duration > 0
"""
df = bq.query(query).to_dataframe()
df.head()
In [ ]:
from google.cloud import bigquery
bq = bigquery.Client(project = PROJECT)
df = bq.query(query + " LIMIT 100").to_dataframe()
df.head()
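Before building the full pipeline, it helps to see how the session duration will be turned into a pseudo-rating: we scale it so that the median duration maps to 0.3 and cap the result at 1.0, which is what preprocess_tft does below with TensorFlow Transform. Here is the same idea as a plain pandas/numpy sketch (illustrative only; the median constant mirrors the hard-coded value used later in preprocess_tft):
In [ ]:
import numpy as np
# Illustration only: the real scaling happens inside preprocess_tft with TensorFlow Transform.
def duration_to_rating(session_duration, median_duration):
    # Scale time-on-page so that the median maps to 0.3, and cap at 1.0.
    return np.minimum(0.3 * session_duration / median_duration, 1.0)
df["rating"] = duration_to_rating(df["session_duration"], 57937.0)
df.head()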
In [ ]:
%%writefile requirements.txt
tensorflow-transform==0.8.0
In [ ]:
import datetime
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
from tensorflow_transform.beam import impl as beam_impl
def preprocess_tft(rowdict):
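    # tft.string_to_int computes a vocabulary over the full dataset during the
    # analyze phase, maps each string to its integer index, and writes the
    # vocabulary to the named asset file (vocab_users / vocab_items); those
    # files are read back later in batch_predict to recover the original IDs.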
median = 57937 #tft.quantiles(rowdict["session_duration"], 11, epsilon=0.001)[5]
result = {
"userId" : tft.string_to_int(rowdict["visitorId"], vocab_filename="vocab_users"),
"itemId" : tft.string_to_int(rowdict["contentId"], vocab_filename="vocab_items"),
"rating" : 0.3 * rowdict["session_duration"] / median
}
# cap the rating at 1.0
result["rating"] = tf.where(condition = tf.less(x = result["rating"], y = tf.ones(shape = tf.shape(input = result["rating"]))),
x = result["rating"],
y = tf.ones(shape = tf.shape(input = result["rating"])))
return result
def preprocess(query, in_test_mode):
import os
import os.path
import tempfile
import tensorflow as tf
from apache_beam.io import tfrecordio
from tensorflow_transform.coders import example_proto_coder
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
def write_count(a, outdir, basename):
filename = os.path.join(outdir, basename)
(a
| "{}_1".format(basename) >> beam.Map(lambda x: (1, 1))
| "{}_2".format(basename) >> beam.combiners.Count.PerKey()
| "{}_3".format(basename) >> beam.Map(lambda (k, v): v)
| "{}_write".format(basename) >> beam.io.WriteToText(file_path_prefix=filename, num_shards=1))
def to_tfrecord(key_vlist, indexCol):
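        # Converts one grouped entry, e.g. (itemId, [all rows for that item]), into
        # the ragged record WALS will read: the group key, the indices along the
        # other dimension (userId or itemId), and the corresponding ratings.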
(key, vlist) = key_vlist
return {
"key": [key],
"indices": [value[indexCol] for value in vlist],
"values": [value["rating"] for value in vlist]
}
job_name = "preprocess-wals-features" + "-" + datetime.datetime.now().strftime("%y%m%d-%H%M%S")
if in_test_mode:
import shutil
print "Launching local job ... hang on"
OUTPUT_DIR = "./preproc_tft"
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
else:
print "Launching Dataflow job {} ... hang on".format(job_name)
OUTPUT_DIR = "gs://{0}/wals/preproc_tft/".format(BUCKET)
import subprocess
subprocess.call("gsutil rm -r {}".format(OUTPUT_DIR).split())
options = {
"staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
"temp_location": os.path.join(OUTPUT_DIR, "tmp"),
"job_name": job_name,
"project": PROJECT,
"max_num_workers": 16,
"teardown_policy": "TEARDOWN_ALWAYS",
"save_main_session": False,
"requirements_file": "requirements.txt"
}
opts = beam.pipeline.PipelineOptions(flags=[], **options)
if in_test_mode:
RUNNER = "DirectRunner"
else:
RUNNER = "DataflowRunner"
# Set up metadata
raw_data_schema = {
colname : dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
for colname in "visitorId,contentId".split(",")
}
raw_data_schema.update({
colname : dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())
for colname in "session_duration".split(",")
})
raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))
# Run Beam
with beam.Pipeline(RUNNER, options=opts) as p:
with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, "tmp")):
# read raw data
selquery = query
if in_test_mode:
selquery = selquery + " LIMIT 100"
raw_data = (p
| "read" >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True)))
# analyze and transform
raw_dataset = (raw_data, raw_data_metadata)
transformed_dataset, transform_fn = (
raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
transformed_data, transformed_metadata = transformed_dataset
_ = (transform_fn
| "WriteTransformFn" >>
transform_fn_io.WriteTransformFn(os.path.join(OUTPUT_DIR, "transform_fn")))
# do a group-by to create users_for_item and items_for_user
users_for_item = (transformed_data
| "map_items" >> beam.Map(lambda x : (x["itemId"], x))
| "group_items" >> beam.GroupByKey()
| "totfr_items" >> beam.Map(lambda item_userlist : to_tfrecord(item_userlist, "userId")))
items_for_user = (transformed_data
| "map_users" >> beam.Map(lambda x : (x["userId"], x))
| "group_users" >> beam.GroupByKey()
| "totfr_users" >> beam.Map(lambda item_userlist : to_tfrecord(item_userlist, "itemId")))
output_schema = {
"key" : dataset_schema.ColumnSchema(tf.int64, [1], dataset_schema.FixedColumnRepresentation()),
"indices": dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.ListColumnRepresentation()),
"values": dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.ListColumnRepresentation())
}
_ = users_for_item | "users_for_item" >> tfrecordio.WriteToTFRecord(
os.path.join(OUTPUT_DIR, "users_for_item"),
coder = example_proto_coder.ExampleProtoCoder(
dataset_schema.Schema(output_schema)))
_ = items_for_user | "items_for_user" >> tfrecordio.WriteToTFRecord(
os.path.join(OUTPUT_DIR, "items_for_user"),
coder = example_proto_coder.ExampleProtoCoder(
dataset_schema.Schema(output_schema)))
write_count(users_for_item, OUTPUT_DIR, "nitems")
write_count(items_for_user, OUTPUT_DIR, "nusers")
preprocess(query, in_test_mode=False)
In [ ]:
%%bash
gsutil ls gs://${BUCKET}/wals/preproc_tft/
In [ ]:
%%bash
gsutil ls gs://${BUCKET}/wals/preproc_tft/transform_fn/transform_fn/
gsutil ls gs://${BUCKET}/wals/preproc_tft/transform_fn/transform_fn/assets/
To summarize, we created the following data files from the BigQuery result set: users_for_item and items_for_user TFRecord files (one ragged key/indices/values record per item and per user, respectively), the transform_fn directory whose assets include the vocab_users and vocab_items vocabulary files, and the nitems and nusers count files.
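As a quick, optional sanity check, you can peek at one record of the users_for_item TFRecords directly; the file pattern and the key/indices/values feature names come from the preprocessing code above (this assumes TensorFlow 1.x APIs such as tf.python_io.tf_record_iterator):
In [ ]:
import tensorflow as tf
# Grab the first users_for_item shard and print a single serialized Example.
path = tf.gfile.Glob("gs://{}/wals/preproc_tft/users_for_item-*".format(BUCKET))[0]
for record in tf.python_io.tf_record_iterator(path):
    example = tf.train.Example.FromString(record)
    print(example)  # key = item index, indices = user indices, values = ratings
    break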
Once you have the dataset, do matrix factorization with WALS using the WALSMatrixFactorization estimator in tf.contrib. This is an Estimator model, so it should be relatively familiar.
As usual, we write an input_fn to provide the data to the model and then create the Estimator to do train_and_evaluate. Because WALSMatrixFactorization is in contrib and hasn't been moved over to tf.estimator yet, we use tf.contrib.learn.Experiment to handle the training loop.
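To recall what WALS itself is doing under the hood, here is a toy, unweighted alternating-least-squares sketch in plain numpy. It is only an illustration of the idea (alternately solving a ridge regression for the user factors and for the item factors); it is not what WALSMatrixFactorization implements, which additionally handles sparse inputs, observation weights, and distributed training:
In [ ]:
import numpy as np
def toy_als(ratings, n_embeds=10, reg=0.1, n_iters=20):
    # Toy alternating least squares on a small, dense ratings matrix.
    n_users, n_items = ratings.shape
    U = np.random.randn(n_users, n_embeds) * 0.1
    V = np.random.randn(n_items, n_embeds) * 0.1
    eye = reg * np.eye(n_embeds)
    for _ in range(n_iters):
        # With V fixed, each user row is a regularized least-squares solve; then the reverse.
        U = np.linalg.solve(V.T.dot(V) + eye, V.T.dot(ratings.T)).T
        V = np.linalg.solve(U.T.dot(U) + eye, U.T.dot(ratings)).T
    return U, V
# Predicted ratings are then U.dot(V.T); the "weighted" part of WALS comes from
# weighting observed entries differently from unobserved ones.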
In [ ]:
import os
import tensorflow as tf
from tensorflow.python.lib.io import file_io
from tensorflow.contrib.factorization import WALSMatrixFactorization
def read_dataset(mode, args):
def decode_example(protos, vocab_size):
features = {
"key": tf.FixedLenFeature(shape = [1], dtype = tf.int64),
"indices": tf.VarLenFeature(dtype = tf.int64),
"values": tf.VarLenFeature(dtype = tf.float32)}
parsed_features = tf.parse_single_example(serialized = protos, features = features)
values = tf.sparse_merge(sp_ids = parsed_features["indices"], sp_values = parsed_features["values"], vocab_size = vocab_size)
# Save key to remap after batching
# This is a temporary workaround to assign correct row numbers in each batch.
# You can ignore details of this part and remap_keys().
key = parsed_features["key"]
decoded_sparse_tensor = tf.SparseTensor(indices = tf.concat(values = [values.indices, [key]], axis = 0),
values = tf.concat(values = [values.values, [0.0]], axis = 0),
dense_shape = values.dense_shape)
return decoded_sparse_tensor
def remap_keys(sparse_tensor):
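        # After dataset.batch(), the row part of each SparseTensor index is just the
        # batch-local row number. The extra entry appended in decode_example carries
        # the true (global) user/item id in its column position, so we use it here to
        # rewrite the row indices from batch-local numbers to global ids.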
# Current indices of our SparseTensor that we need to fix
bad_indices = sparse_tensor.indices # shape = (current_batch_size * (number_of_items/users[i] + 1), 2)
# Current values of our SparseTensor that we need to fix
bad_values = sparse_tensor.values # shape = (current_batch_size * (number_of_items/users[i] + 1),)
# Since batch is ordered, the last value for a batch index is the user
        # Find where the batch index changes to extract the user rows
# 1 where user, else 0
user_mask = tf.concat(values = [bad_indices[1:,0] - bad_indices[:-1,0], tf.constant(value = [1], dtype = tf.int64)], axis = 0) # shape = (current_batch_size * (number_of_items/users[i] + 1), 2)
# Mask out the user rows from the values
good_values = tf.boolean_mask(tensor = bad_values, mask = tf.equal(x = user_mask, y = 0)) # shape = (current_batch_size * number_of_items/users[i],)
item_indices = tf.boolean_mask(tensor = bad_indices, mask = tf.equal(x = user_mask, y = 0)) # shape = (current_batch_size * number_of_items/users[i],)
user_indices = tf.boolean_mask(tensor = bad_indices, mask = tf.equal(x = user_mask, y = 1))[:, 1] # shape = (current_batch_size,)
good_user_indices = tf.gather(params = user_indices, indices = item_indices[:,0]) # shape = (current_batch_size * number_of_items/users[i],)
# User and item indices are rank 1, need to make rank 1 to concat
good_user_indices_expanded = tf.expand_dims(input = good_user_indices, axis = -1) # shape = (current_batch_size * number_of_items/users[i], 1)
good_item_indices_expanded = tf.expand_dims(input = item_indices[:, 1], axis = -1) # shape = (current_batch_size * number_of_items/users[i], 1)
good_indices = tf.concat(values = [good_user_indices_expanded, good_item_indices_expanded], axis = 1) # shape = (current_batch_size * number_of_items/users[i], 2)
remapped_sparse_tensor = tf.SparseTensor(indices = good_indices, values = good_values, dense_shape = sparse_tensor.dense_shape)
return remapped_sparse_tensor
def parse_tfrecords(filename, vocab_size):
if mode == tf.estimator.ModeKeys.TRAIN:
num_epochs = None # indefinitely
else:
num_epochs = 1 # end-of-input after this
files = tf.gfile.Glob(filename = os.path.join(args["input_path"], filename))
# Create dataset from file list
dataset = tf.data.TFRecordDataset(files)
dataset = dataset.map(map_func = lambda x: decode_example(x, vocab_size))
dataset = dataset.repeat(count = num_epochs)
dataset = dataset.batch(batch_size = args["batch_size"])
dataset = dataset.map(map_func = lambda x: remap_keys(x))
return dataset.make_one_shot_iterator().get_next()
def _input_fn():
features = {
WALSMatrixFactorization.INPUT_ROWS: parse_tfrecords('items_for_user-*-of-*', args['nitems']),
WALSMatrixFactorization.INPUT_COLS: parse_tfrecords('users_for_item-*-of-*', args['nusers']),
WALSMatrixFactorization.PROJECT_ROW: tf.constant(True)
}
return features, None
return _input_fn
In [ ]:
def find_top_k(user, item_factors, k):
all_items = tf.matmul(a = tf.expand_dims(input = user, axis = 0), b = tf.transpose(a = item_factors))
topk = tf.nn.top_k(input = all_items, k = k)
return tf.cast(x = topk.indices, dtype = tf.int64)
def batch_predict(args):
import numpy as np
# Read vocabulary into Python list for quick index-ed lookup
def create_lookup(filename):
from tensorflow.python.lib.io import file_io
dirname = os.path.join(args["input_path"], "transform_fn/transform_fn/assets/")
with file_io.FileIO(os.path.join(dirname, filename), mode = "r") as ifp:
return [x.rstrip() for x in ifp]
originalItemIds = create_lookup("vocab_items")
originalUserIds = create_lookup("vocab_users")
with tf.Session() as sess:
estimator = tf.contrib.factorization.WALSMatrixFactorization(
num_rows = args["nusers"],
num_cols = args["nitems"],
embedding_dimension = args["n_embeds"],
model_dir = args["output_dir"])
        # For in-vocab (training-time) users, the row factors are already in the checkpoint
user_factors = tf.convert_to_tensor(value = estimator.get_row_factors()[0]) # (nusers, nembeds)
        # We have to assume the catalog doesn't change, so col_factors are read in from the checkpoint as well
item_factors = tf.convert_to_tensor(value = estimator.get_col_factors()[0])# (nitems, nembeds)
# For each user, find the top K items
topk = tf.squeeze(input = tf.map_fn(fn = lambda user: find_top_k(user, item_factors, args["topk"]), elems = user_factors, dtype = tf.int64))
with file_io.FileIO(os.path.join(args["output_dir"], "batch_pred.txt"), mode = 'w') as f:
for userId, best_items_for_user in enumerate(topk.eval()):
f.write(originalUserIds[userId] + "\t") # write userId \t item1,item2,item3...
f.write(','.join(originalItemIds[itemId] for itemId in best_items_for_user) + '\n')
def train_and_evaluate(args):
train_steps = int(0.5 + (1.0 * args["num_epochs"] * args["nusers"]) / args["batch_size"])
steps_in_epoch = int(0.5 + args["nusers"] / args["batch_size"])
print("Will train for {} steps, evaluating once every {} steps".format(train_steps, steps_in_epoch))
def experiment_fn(output_dir):
return tf.contrib.learn.Experiment(
tf.contrib.factorization.WALSMatrixFactorization(
num_rows = args["nusers"],
num_cols = args["nitems"],
embedding_dimension = args["n_embeds"],
model_dir = args["output_dir"]),
train_input_fn = read_dataset(tf.estimator.ModeKeys.TRAIN, args),
eval_input_fn = read_dataset(tf.estimator.ModeKeys.EVAL, args),
train_steps = train_steps,
eval_steps = 1,
min_eval_frequency = steps_in_epoch
)
from tensorflow.contrib.learn.python.learn import learn_runner
learn_runner.run(experiment_fn = experiment_fn, output_dir = args["output_dir"])
batch_predict(args)
In [ ]:
import shutil
shutil.rmtree(path = "wals_trained", ignore_errors=True)
train_and_evaluate({
"output_dir": "wals_trained",
"input_path": "gs://{}/wals/preproc_tft".format(BUCKET),
"num_epochs": 0.05,
"nitems": 5668,
"nusers": 82802,
"batch_size": 512,
"n_embeds": 10,
"topk": 3
})
In [ ]:
!ls wals_trained
In [ ]:
!head wals_trained/batch_pred.txt
In [ ]:
%%bash
NITEMS=$(gsutil cat gs://${BUCKET}/wals/preproc_tft/nitems-00000-of-00001)
NUSERS=$(gsutil cat gs://${BUCKET}/wals/preproc_tft/nusers-00000-of-00001)
rm -rf wals_trained
export PYTHONPATH=${PYTHONPATH}:${PWD}/wals_tft
python -m trainer.task \
--output_dir=${PWD}/wals_trained \
--input_path=gs://${BUCKET}/wals/preproc_tft \
--num_epochs=0.01 --nitems=$NITEMS --nusers=$NUSERS \
--job-dir=./tmp
In [ ]:
%%bash
OUTDIR=gs://${BUCKET}/wals_tft/model_trained
JOBNAME=wals_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
NITEMS=$(gsutil cat gs://${BUCKET}/wals/preproc_tft/nitems-00000-of-00001)
NUSERS=$(gsutil cat gs://${BUCKET}/wals/preproc_tft/nusers-00000-of-00001)
gcloud ml-engine jobs submit training $JOBNAME \
--region=$REGION \
--module-name=trainer.task \
--package-path=${PWD}/wals_tft/trainer \
--job-dir=$OUTDIR \
--staging-bucket=gs://$BUCKET \
--scale-tier=BASIC_GPU \
--runtime-version=1.6 \
-- \
--output_dir=$OUTDIR \
--input_path=gs://${BUCKET}/wals/preproc_tft \
--num_epochs=10 --nitems=$NITEMS --nusers=$NUSERS
This took 10 minutes for me.
In [ ]:
def get_factors(args):
with tf.Session() as sess:
estimator = tf.contrib.factorization.WALSMatrixFactorization(
num_rows = args["nusers"],
num_cols = args["nitems"],
embedding_dimension = args["n_embeds"],
model_dir = args["output_dir"])
row_factors = estimator.get_row_factors()[0]
col_factors = estimator.get_col_factors()[0]
return row_factors, col_factors
In [ ]:
args = {
"output_dir": "gs://{}/wals_tft/model_trained".format(BUCKET),
"nitems": 5668,
"nusers": 82802,
"n_embeds": 10
}
user_embeddings, item_embeddings = get_factors(args)
print user_embeddings[:3]
print item_embeddings[:3]
You can visualize the embedding vectors using dimensionality reduction techniques such as PCA.
In [ ]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.decomposition import PCA
pca = PCA(n_components = 3)
pca.fit(user_embeddings)
user_embeddings_pca = pca.transform(user_embeddings)
fig = plt.figure(figsize = (8,8))
ax = fig.add_subplot(111, projection = "3d")
xs, ys, zs = user_embeddings_pca[::150].T
ax.scatter(xs, ys, zs)
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.