Generate Recommendations with Spark

Using Alternating Least Squares (ALS) from Spark ML
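
ALS factorizes the sparse user-item rating matrix into low-rank user and item factor matrices, so that a predicted rating is the dot product of a user's and an item's latent factor vectors. Training alternates between solving for the user factors with the item factors held fixed and vice versa; each half-step is an ordinary least-squares problem.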

Configure Spark


In [1]:
import os

master = '--master local[1]'
#master = '--master spark://apachespark-master-2-1-0:7077'
conf = '--conf spark.cores.max=1 --conf spark.executor.memory=1024m'
packages = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1'
jars = '--jars /root/lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar'
py_files = '--py-files /root/lib/jpmml.py'

os.environ['PYSPARK_SUBMIT_ARGS'] = master \
  + ' ' + conf \
  + ' ' + packages \
  + ' ' + jars \
  + ' ' + py_files \
  + ' ' + 'pyspark-shell'

print(os.environ['PYSPARK_SUBMIT_ARGS'])


--master local[1] --conf spark.cores.max=1 --conf spark.executor.memory=1024m --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1 --jars /root/lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar --py-files /root/lib/jpmml.py pyspark-shell

Create Spark Session


In [2]:
from pyspark.sql import SparkSession

spark_session = SparkSession.builder.getOrCreate()
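
PYSPARK_SUBMIT_ARGS is read when the first SparkSession is created, so the environment variable above must be set before this cell runs. As a quick sanity check (a sketch; it just reads the configuration back), you can confirm the executor memory setting took effect:


In [ ]:
print(spark_session.sparkContext.getConf().get('spark.executor.memory'))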

Load and Split Data


In [3]:
df = spark_session.read.format('csv') \
      .options(header='true', inferSchema='true') \
      .load('s3a://datapalooza/movielens/ml-latest/ratings-sm.csv')


(df_train, df_test) = df.randomSplit([0.8, 0.2])
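
randomSplit shuffles the rows before splitting, so df_train and df_test differ from run to run. For a reproducible split (a sketch; the seed value is arbitrary), pass an explicit seed:


In [ ]:
(df_train, df_test) = df.randomSplit([0.8, 0.2], seed=42)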

In [4]:
df_train.take(5)


Out[4]:
[Row(userId=1, movieId=50, rating=4.0, timestamp=1329753504),
 Row(userId=1, movieId=318, rating=4.5, timestamp=1329753494),
 Row(userId=1, movieId=527, rating=4.5, timestamp=1329753507),
 Row(userId=1, movieId=750, rating=3.0, timestamp=1329753525),
 Row(userId=1, movieId=858, rating=4.5, timestamp=1329753498)]

In [5]:
df_test.take(5)


Out[5]:
[Row(userId=1, movieId=296, rating=4.0, timestamp=1329753602),
 Row(userId=1, movieId=541, rating=3.0, timestamp=1329753607),
 Row(userId=1, movieId=608, rating=4.0, timestamp=1329753638),
 Row(userId=1, movieId=2375, rating=3.5, timestamp=1329753221),
 Row(userId=1, movieId=2858, rating=4.5, timestamp=1329753561)]

Train Model


In [6]:
%%time

from pyspark.ml.recommendation import ALS

# 10 latent factors (the ALS default rank), 5 ALS iterations
als = ALS(rank=10, maxIter=5, regParam=0.01,
          userCol="userId", itemCol="movieId", ratingCol="rating",
          nonnegative=True)

model = als.fit(df_train)


CPU times: user 108 ms, sys: 16 ms, total: 124 ms
Wall time: 12.7 s
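
Rank and regularization were fixed above; they can instead be tuned with cross-validation. A minimal sketch (not part of the original run; on Spark versions before 2.2.0 the NaN issue noted below affects it too):


In [ ]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 20]) \
    .addGrid(als.regParam, [0.01, 0.1]) \
    .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

# 3-fold cross-validation over the 6-point parameter grid
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid,
                    evaluator=evaluator, numFolds=3)
cv_model = cv.fit(df_train)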

Validate Model


In [ ]:
from pyspark.ml.evaluation import RegressionEvaluator

# Note: this step fails when run against hdfs-namenode
predictions = model.transform(df_test)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

# Note: this evaluates to "nan" before Spark 2.2.0 because test rows with unseen users or items get NaN predictions; see https://issues.apache.org/jira/browse/SPARK-14489
print("Root-mean-square error = " + str(rmse))

Show User Factor Matrix


In [ ]:
model.userFactors.take(5)

Show Item Factor Matrix


In [ ]:
model.itemFactors.take(5)
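
Each factor row pairs an id with a features array of length rank; a predicted rating is the dot product of the matching user and item vectors. A hand-rolled sketch (assuming userId 1 and movieId 50 both appear in df_train):


In [ ]:
import numpy as np

# Reconstruct one prediction directly from the latent factors
u = model.userFactors.filter('id = 1').first()['features']
v = model.itemFactors.filter('id = 50').first()['features']
print(float(np.dot(u, v)))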

Predict Ratings


In [ ]:
%%time

from pyspark.sql import Row

predict_rows = [Row(userId=1, movieId=1),
                Row(userId=1, movieId=2)]

df_predict_request = \
  spark_session.createDataFrame(predict_rows)

df_predict_result = model.transform(df_predict_request)

print(df_predict_result.collect())
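
Scoring hand-built (userId, movieId) pairs works on any version; on Spark 2.2.0+ the model can also emit top-N recommendations directly, as in this sketch:


In [ ]:
# Spark 2.2.0+: top 10 movies per user and top 10 users per movie
user_recs = model.recommendForAllUsers(10)
movie_recs = model.recommendForAllItems(10)
user_recs.show(5, truncate=False)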
