In [1]:
import os
# spark-submit flags for the pyspark driver. They must be in the environment before
# the SparkSession below is created. To run on the standalone cluster, swap `master`
# for the commented-out URL.
master = '--master local[1]'
#master = '--master spark://apachespark-master-2-1-0:7077'
conf = '--conf spark.cores.max=1 --conf spark.executor.memory=1024m'
packages = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1'
jars = '--jars /root/lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar'
py_files = '--py-files /root/lib/jpmml.py'
# Space-separate every flag group, with the 'pyspark-shell' token appended last.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    master,
    conf,
    packages,
    jars,
    py_files,
    'pyspark-shell',
])
print(os.environ['PYSPARK_SUBMIT_ARGS'])
In [2]:
from pyspark.sql import SparkSession
# Return the already-running session if there is one, otherwise start a new one.
spark_session = (
    SparkSession
    .builder
    .getOrCreate()
)
In [3]:
# Load the MovieLens ratings sample from S3 (the s3a:// scheme relies on the
# hadoop-aws package passed via --packages) and split it 80/20 into train/test.
# Without a seed, randomSplit draws a different split on every run, so the RMSE
# reported later would not be reproducible. A fixed seed makes the split repeatable,
# provided the input partitioning is the same.
SPLIT_SEED = 42
df = spark_session.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .load('s3a://datapalooza/movielens/ml-latest/ratings-sm.csv')
(df_train, df_test) = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
In [4]:
df_train.head(5)  # first five training rows (head(n) is equivalent to take(n))
Out[4]:
In [5]:
df_test.head(5)  # first five held-out rows (head(n) is equivalent to take(n))
Out[5]:
In [6]:
%%time
from pyspark.ml.recommendation import ALS
rank = 10
numIterations = 10
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True)
model = als.fit(df_train)
In [ ]:
from pyspark.ml.evaluation import RegressionEvaluator
# NOTE(review): the original note here said "Fail for hdfs-namenode"; the failure is
# not described further. Confirm before relying on this cell in that environment.
predictions = model.transform(df_test)
# Test rows whose user or movie never appears in df_train have no learned factors,
# so ALS predicts NaN for them, and a single NaN turns the whole RMSE into NaN
# (SPARK-14489). Spark 2.2.0 added ALS(coldStartStrategy="drop") for this. Dropping
# NaN/null predictions here gives a finite RMSE on older versions too.
predictions_scored = predictions.dropna(subset=["prediction"])
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions_scored)
print("Root-mean-square error = " + str(rmse))
In [ ]:
model.userFactors.head(5)  # first five learned user factor vectors
In [ ]:
model.itemFactors.head(5)  # first five learned movie factor vectors
In [ ]:
%%time
from pyspark.sql import Row
predict_dict = [Row(userId=1, movieId=1),
Row(userId=1, movieId=2)]
df_predict_request = \
spark_session.createDataFrame(predict_dict)
df_predict_result = model.transform(df_predict_request)
print(df_predict_result.collect())
In [ ]:
In [ ]: