In [1]:
import findspark
findspark.init()

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark import SparkContext, SQLContext

# Start a local Spark context + SQL context used by the ALS cells below.
# NOTE(review): appName="Pi" looks like a leftover from a Spark example;
# consider a descriptive name (e.g. "kkbox-als-cf").
sc = SparkContext(appName="Pi")
sqlCtx = SQLContext(sc)

In [2]:
import pandas as pd
import numpy as np
import scipy as sp
from sklearn.metrics.pairwise import cosine_similarity
import operator

# NOTE(review): scipy, cosine_similarity and operator are not used in any
# cell visible in this notebook — confirm they are needed or remove them.
%matplotlib inline

In [3]:
from six.moves import cPickle as pickle
def make_pickle(file_name, data, force=False):
    """Serialize `data` to `file_name` with pickle.

    Skips writing when the file already exists, unless `force=True`.

    :param file_name: destination path of the pickle file
    :param data: any picklable object
    :param force: overwrite an existing file when True
    :return: `file_name`, unchanged (convenient for chaining/logging)
    """
    import os
    # Create the destination directory derived from the target path itself
    # (the original hard-coded "pickle", so any other destination directory
    # silently failed inside the except below). exist_ok makes this idempotent.
    target_dir = os.path.dirname(file_name)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    if os.path.exists(file_name) and not force:
        # You may override by setting force=True.
        print('%s already present - Skipping pickling.' % file_name)
    else:
        print('Pickling %s.' % file_name)
        try:
            with open(file_name, 'wb') as f:
                pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
        except Exception as e:
            # Best-effort: report and fall through; caller still gets the path.
            print('Unable to save data to', file_name, ':', e)

    return file_name

In [4]:
# Paths are relative to the notebook's working directory.
INPUT_DATA_PATH = 'input/'
# Read the test split; 'category' dtype keeps the high-cardinality hashed-id
# and string columns memory-efficient.
df_test = pd.read_csv(INPUT_DATA_PATH + 'test.csv',dtype={'msno' : 'category',
                                                'source_system_tab' : 'category',
                                                'source_screen_name' : 'category',
                                                'source_type' : 'category',
                                                'song_id' : 'category'})

# Training split; 'target' is a 0/1 label so uint8 is sufficient.
df_train = pd.read_csv(INPUT_DATA_PATH + 'train.csv',dtype={'msno' : 'category',
                                                 'source_system_tab' : 'category',
                                                  'source_screen_name' : 'category',
                                                  'source_type' : 'category',
                                                  'target' : np.uint8,
                                                  'song_id' : 'category'})

In [5]:
# Keep only the columns needed for collaborative filtering:
# user id (msno), song id, and (train only) the 0/1 target label.
df_train = df_train[['msno', 'song_id', 'target']]
df_test = df_test[['msno', 'song_id']]

In [6]:
def create_lookup_tables(words):
    """
    Create lookup tables for a vocabulary.

    Words are ranked by descending frequency (ties keep first-seen order),
    so the most common word maps to integer 0.

    :param words: Input iterable of words
    :return: A tuple of dicts: (vocab_to_int, int_to_vocab)
    """
    from collections import Counter
    frequency = Counter(words)
    ranked = sorted(frequency, key=frequency.get, reverse=True)

    # Build both directions of the mapping in a single pass.
    vocab_to_int = {}
    int_to_vocab = {}
    for index, token in enumerate(ranked):
        vocab_to_int[token] = index
        int_to_vocab[index] = token

    return vocab_to_int, int_to_vocab

# Build integer encodings for every user and song seen in training;
# Spark ALS requires numeric user/item columns. astype(object) drops the
# categorical dtype so unique() returns plain hashable values.
tot_user_ids = df_train['msno'].astype(object).unique()
userid_to_int, int_to_userid = create_lookup_tables(tot_user_ids)
tot_song_ids = df_train['song_id'].astype(object).unique()
songid_to_int, int_to_songid = create_lookup_tables(tot_song_ids)

In [7]:
# Attach the integer-encoded ids as new columns. Every key is guaranteed
# present because the lookup tables were built from these same columns.
df_train['user_no'] = [userid_to_int[user_id] for user_id in df_train['msno']]
df_train['song_no'] = [songid_to_int[song_id] for song_id in df_train['song_id']]

In [8]:
# Convert the pandas training frame to a Spark DataFrame and hold out 20%
# for evaluation. randomSplit uses a random seed by default, which made the
# split (and the RMSE below) non-reproducible — pin the seed.
ratings = sqlCtx.createDataFrame(df_train)
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)
print('train/test split')


train/test split

In [9]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
# The binary 'target' column is treated as an explicit rating here
# (implicitPrefs is left at its default of False).
als = ALS(maxIter=5, regParam=0.01, userCol="user_no", itemCol="song_no", ratingCol="target",
          coldStartStrategy="drop")
model = als.fit(training)
print('model fit')


model fit

In [10]:
# Evaluate the model by computing the RMSE on the test data
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
print('predict')

# RMSE over the held-out split; rows whose user/item was unseen in training
# were dropped by coldStartStrategy="drop" above, so no NaNs reach the metric.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="target",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


predict
Root-mean-square error = 0.4763473531509393

In [11]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
print('rcmd generate finish')


rcmd generate finish

In [12]:
# Persist results so downstream analysis can reuse them without re-training.
# NOTE: toPandas() collects the full DataFrames to the driver.
make_pickle('pickle/cf_result_df_train_pred', predictions.toPandas())
make_pickle('pickle/cf_result_user_recs', userRecs.toPandas())
make_pickle('pickle/cf_result_song_recs', movieRecs.toPandas())


Pickling pickle/cf_result_df_train_pred.
pickle/cf_result_user_recs already present - Skipping pickling.
pickle/cf_result_song_recs already present - Skipping pickling.
Out[12]:
'pickle/cf_result_song_recs'

In [13]:
def convert_userid_to_userno(user_id):
    """Map a raw user id (msno) to its training-time integer index.

    :param user_id: raw user id string
    :return: integer index, or -1 for users unseen during training
    """
    # dict.get with a default replaces the membership-test-then-index
    # double lookup of the original.
    return userid_to_int.get(user_id, -1)

def convert_songid_to_songno(song_id):
    """Map a raw song id to its training-time integer index.

    :param song_id: raw song id string
    :return: integer index, or -1 for songs unseen during training
    """
    # dict.get with a default replaces the membership-test-then-index
    # double lookup of the original.
    return songid_to_int.get(song_id, -1)

# Encode the test set with the training-time lookup tables; ids never seen
# in training are mapped to -1 (not present in the fitted model).
df_test['user_no'] = [convert_userid_to_userno(user_id) for user_id in df_test['msno']]
df_test['song_no'] = [convert_songid_to_songno(song_id) for song_id in df_test['song_id']]
sp_df_test = sqlCtx.createDataFrame(df_test)

In [14]:
# Score the submission test set with the trained ALS model and pull the
# predictions back to the driver as a pandas DataFrame.
test_predictions = model.transform(sp_df_test)
df_test_pred = test_predictions.toPandas()

In [15]:
# Persist the test-set predictions for later submission building.
make_pickle('pickle/cf_result_df_test_pred', df_test_pred)


Pickling pickle/cf_result_df_test_pred.
Out[15]:
'pickle/cf_result_df_test_pred'

In [16]:
# Quick sanity check of the prediction frame (first 5 rows).
df_test_pred.head()


Out[16]:
msno song_id user_no song_no prediction
0 UYsDRcm4UqNQVN+IrTfJX1MDsFCiGu/5EZT6MKb3B4o= cy10N2j2sdY/X4BDUcMu2Iumfz7pV3tqE5iEaup2yGI= 6658 148 0.682439
1 feaQJ9P/e04zHIiRjxpb9oqcrTrGO6Pei5iqswNAYTc= cy10N2j2sdY/X4BDUcMu2Iumfz7pV3tqE5iEaup2yGI= 7554 148 0.512002
2 RYjTxubYo8PnNricR8ep8ZlR/I9jiHmkWnVywlNbPZ4= cy10N2j2sdY/X4BDUcMu2Iumfz7pV3tqE5iEaup2yGI= 9900 148 0.302709
3 E47lQ6gQZfojmYN3St9RL5t5pE2tcwNbS7z/hVhR9AE= cy10N2j2sdY/X4BDUcMu2Iumfz7pV3tqE5iEaup2yGI= 19553 148 0.408393
4 mdc75mqmDTdotiOVa9DVyNPQ+wivAJTczw1g9/rj+EU= cy10N2j2sdY/X4BDUcMu2Iumfz7pV3tqE5iEaup2yGI= 23271 148 0.852764

In [ ]: