This demonstrates using SystemML for product recommendation using Poisson Nonnegative Matrix Factorization (PNMF), with the PNMF algorithm written using R-like syntax. This includes the following steps:
In [ ]:
# Print pip's metadata for the systemml package (confirms it is installed)
!pip show systemml
In [ ]:
# IPython magics: load the autoreload extension in mode 2 and render
# matplotlib output inline in the notebook.
%load_ext autoreload
%autoreload 2
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from systemml import MLContext, dml # pip install systemml
# Default figure size applied to every plot in this notebook
plt.rcParams['figure.figsize'] = (10, 6)
In [ ]:
%%sh
# Stop at the first failing command so a failed download is never unpacked
set -e
# Download dataset
# -f: fail on HTTP errors instead of saving the server's error page as the .gz
# -L: follow redirects
curl -fLO http://snap.stanford.edu/data/amazon0601.txt.gz
# -f: overwrite a previously extracted amazon0601.txt so the cell can be re-run
gunzip -f amazon0601.txt.gz
In [ ]:
# Load data
import pyspark.sql.functions as F

dataPath = "amazon0601.txt"

# Parse every non-comment line ("<prod_i>\t<prod_j>") into a
# (prod_i, prod_j, 1.0) triple.
edge_rows = (
    sc.textFile(dataPath)
    .filter(lambda line: not line.startswith("#"))
    .map(lambda line: line.split("\t"))
    .map(lambda pair: (int(pair[0]), int(pair[1]), 1.0))
)

X_train = (
    edge_rows.toDF(("prod_i", "prod_j", "x_ij"))
    .filter("prod_i < 500 AND prod_j < 500")  # Filter for memory constraints
    .cache()
)

# Largest product ID seen in each column, fetched with one aggregation.
max_prod_i, max_prod_j = X_train.agg(F.max("prod_i"), F.max("prod_j")).first()
numProducts = max(max_prod_i, max_prod_j) + 1  # 0-based indexing
print("Total number of products: {}".format(numProducts))
In [ ]:
# Create SystemML MLContext from the notebook-provided `sc`
# (`sc` is not defined in this file; presumably the SparkContext).
ml = MLContext(sc)
In [ ]:
# Define PNMF kernel in SystemML's DSL using the R-like syntax for PNMF.
# The script reads X, rank and max_iter without defining them, so they must be
# supplied as inputs. It produces the factor matrices W and H and `losses`,
# a max_iter x 1 matrix holding the loss computed after each iteration.
# The optional $size argument (default -1, meaning disabled) truncates V to
# its leading size x size block.
pnmf = """
# data & args
X = X+1 # change product IDs to be 1-based, rather than 0-based
V = table(X[,1], X[,2])
size = ifdef($size, -1)
if(size > -1) {
V = V[1:size,1:size]
}
n = nrow(V)
m = ncol(V)
range = 0.01
W = Rand(rows=n, cols=rank, min=0, max=range, pdf="uniform")
H = Rand(rows=rank, cols=m, min=0, max=range, pdf="uniform")
losses = matrix(0, rows=max_iter, cols=1)
# run PNMF
i=1
while(i <= max_iter) {
# update params
H = (H * (t(W) %*% (V/(W%*%H))))/t(colSums(W))
W = (W * ((V/(W%*%H)) %*% t(H)))/t(rowSums(H))
# compute loss
losses[i,] = -1 * (sum(V*log(W%*%H)) - as.scalar(colSums(W)%*%rowSums(H)))
i = i + 1;
}
"""
In [ ]:
# Run the PNMF script on SystemML with Spark:
# bind the training data and hyper-parameters, then request the outputs.
script = (
    dml(pnmf)
    .input(X=X_train, max_iter=100, rank=10)
    .output("W", "H", "losses")
)
pnmf_results = ml.execute(script)
W, H, losses = pnmf_results.get("W", "H", "losses")
In [ ]:
# Plot training loss over time
# Order the loss rows by their "__INDEX" column, then collect the first two
# fields of every row as (x, y) pairs.
loss_df = losses.toDF().sort("__INDEX")
xy = loss_df.rdd.map(lambda row: (row[0], row[1])).collect()
x, y = zip(*xy)
plt.plot(x, y)
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('PNMF Training Loss')