In [1]:
# findspark.init() locates the local Spark installation and makes the
# `pyspark` package importable, so it has to run before the pyspark imports.
import findspark
findspark.init()

from pyspark.mllib.recommendation import ALS, Rating
from pyspark import SparkContext
# Single SparkContext (master "local", application name "test") used by the
# data-loading cells below.
sc = SparkContext("local", "test")

In [37]:
# Paths to the comma-separated rating files (one "user,product,rating" record
# per line) for the train / validation / test splits.
TRAIN_FILE = "./data/ratings-train.dat"
VALIDATION_FILE = "./data/ratings-validation.dat"
TEST_FILE = "./data/ratings-test.dat"

# Smaller 1000-record samples for quick experiments; uncomment these (and
# comment out the paths above) to iterate faster.
# TRAIN_FILE = "./data-test/ratings-train-1000.dat"
# VALIDATION_FILE = "./data-test/ratings-validation-1000.dat"
# TEST_FILE = "./data-test/ratings-test-1000.dat"

In [38]:
def prepare_data(data):
    """Parse raw text lines into ``Rating`` records.

    Args:
        data: RDD-like object (anything exposing ``map``) whose elements are
            comma-separated strings of the form ``user,product,rating``.

    Returns:
        The result of mapping every line to
        ``Rating(int(user), int(product), float(rating))``. Fields after the
        third one are ignored.
    """
    def parse_line(line):
        # Only the first three comma-separated fields are used.
        fields = line.split(',')
        user_id = int(fields[0])
        product_id = int(fields[1])
        score = float(fields[2])
        return Rating(user_id, product_id, score)

    return data.map(parse_line)

In [39]:
# Load the training split: read TRAIN_FILE as an RDD of text lines, then parse
# each line into a Rating via prepare_data.
ratings_train_text = sc.textFile(TRAIN_FILE)
ratings_train = prepare_data(ratings_train_text)

In [40]:
# Load the validation split: read VALIDATION_FILE as an RDD of text lines, then
# parse each line into a Rating via prepare_data.
ratings_validation_text = sc.textFile(VALIDATION_FILE)
ratings_validation = prepare_data(ratings_validation_text)

In [41]:
# Load the test split: read TEST_FILE as an RDD of text lines, then parse each
# line into a Rating. The parsed RDD must be built from the *test* text, not
# the validation text.
ratings_test_text = sc.textFile(TEST_FILE)
ratings_test = prepare_data(ratings_test_text)

# Peek at the first parsed record to confirm the file loads and parses.
ratings_test.first()


In [42]:
# Sanity check: display the first three parsed training ratings.
ratings_train.take(3)


Out[42]:
[Rating(user=36955, product=21, rating=3.0),
 Rating(user=36955, product=47, rating=5.0),
 Rating(user=36955, product=1079, rating=3.0)]

In [43]:
def prepare_validation(validation):
    """Strip ratings down to the (user, product) pairs to be predicted.

    Args:
        validation: RDD-like object (anything exposing ``map``) whose elements
            are indexable records with the user at index 0 and the product at
            index 1.

    Returns:
        The result of mapping every record to a ``(user, product)`` tuple.
    """
    def user_product_pair(record):
        user, product = record[0], record[1]
        return (user, product)

    return validation.map(user_product_pair)

In [44]:
# Train on the training split, evaluate on the validation split.
def train_evaluate_als(train, validation, rank, iterations_num, lambda_val, seed=None):
    """Train an ALS model and return its mean squared error on ``validation``.

    Args:
        train: RDD of ``Rating`` records used to fit the model.
        validation: RDD of ``Rating`` records (held out from training) whose
            actual ratings are compared with the model's predictions.
        rank: Number of latent factors.
        iterations_num: Number of ALS iterations.
        lambda_val: Regularization parameter.
        seed: Optional random seed forwarded to ``ALS.train`` so runs can be
            reproduced; ``None`` keeps the library default.

    Returns:
        The mean of the squared differences between actual and predicted
        ratings over the (user, product) pairs for which a prediction exists.
    """
    model = ALS.train(
        train, rank, iterations=iterations_num, lambda_=lambda_val, seed=seed
    )

    # Predict a rating for every (user, product) pair of the validation set and
    # key the predictions by that pair so they can be joined.
    predictions = (
        model.predictAll(prepare_validation(validation))
        .map(lambda r: ((r[0], r[1]), r[2]))
    )

    # The actual ratings must come from the validation set (the same pairs that
    # were predicted); joining against the training set would not measure
    # validation error. The inner join keeps only pairs that have a prediction.
    actual = validation.map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = actual.join(predictions)

    # Each joined element is ((user, product), (actual, predicted)).
    return rates_and_preds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()

In [45]:
# Hyperparameter grid explored by the search below.
ITERATIONS = 10                          # ALS iterations used for every run
ranks = list(range(10, 51, 10))          # latent-factor counts: 10, 20, ..., 50
lambda_values = [0.01, 0.1, 1.0, 10.0]   # regularization strengths

In [46]:
def report_mse_results(rank, lambda_value, mse):
    """Print one result line for a (rank, lambda) combination.

    Args:
        rank: Rank used for the run (formatted as an integer).
        lambda_value: Regularization value (formatted with two decimals).
        mse: Mean squared error obtained (formatted with ``str``).
    """
    template = "Rank=%d, Lambda=%0.2f, MSE=%s"
    line = template % (rank, lambda_value, mse)
    print(line)

In [47]:
def evaluate_parameters(train, validation, ranks, lambda_values):
    """Grid-search ALS hyperparameters and print the MSE of every combination.

    For each rank in ``ranks`` and each value in ``lambda_values`` a model is
    trained with ``ITERATIONS`` iterations via ``train_evaluate_als`` and the
    resulting MSE is printed with ``report_mse_results``.

    Args:
        train: Training ratings passed to ``train_evaluate_als``.
        validation: Validation ratings passed to ``train_evaluate_als``.
        ranks: Iterable of rank values to try.
        lambda_values: Iterable of regularization values to try.
    """
    for rank in ranks:
        for lambda_value in lambda_values:
            # Use the datasets passed in by the caller, not notebook globals,
            # so the function evaluates whatever splits it is given.
            mse = train_evaluate_als(train, validation, rank, ITERATIONS, lambda_value)
            report_mse_results(rank, lambda_value, mse)

In [ ]:
# Run the grid search over every (rank, lambda) combination; one result line
# is printed per combination.
evaluate_parameters(ratings_train, ratings_validation, ranks, lambda_values)

In [ ]:


In [ ]:


In [ ]:


In [ ]:


In [ ]: