In [1]:
import findspark
findspark.init()
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark import SparkContext, SQLContext
sc = SparkContext(appName="ALS-CF")
sqlCtx = SQLContext(sc)
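For reference, on Spark 2.x the usual entry point is SparkSession rather than SparkContext/SQLContext; an equivalent setup (a sketch of the alternative, not what this notebook ran) would be:
from pyspark.sql import SparkSession

# Alternative Spark 2.x setup: SparkSession wraps both the SparkContext and
# the DataFrame API, so a separate SQLContext is not needed.
spark = SparkSession.builder.appName("ALS-CF").getOrCreate()
# spark.createDataFrame(...) then plays the role of sqlCtx.createDataFrame(...)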
In [2]:
import pandas as pd
import numpy as np
import scipy as sp
from sklearn.metrics.pairwise import cosine_similarity
import operator
%matplotlib inline
In [3]:
from six.moves import cPickle as pickle
def make_pickle(file_name, data, force=False):
    import os
    # Ensure the pickle/ output directory exists before writing.
    if not os.path.exists("pickle"):
        os.makedirs("pickle")
    if os.path.exists(file_name) and not force:
        # You may override by setting force=True.
        print('%s already present - Skipping pickling.' % file_name)
    else:
        print('Pickling %s.' % file_name)
        try:
            with open(file_name, 'wb') as f:
                pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
        except Exception as e:
            print('Unable to save data to', file_name, ':', e)
    return file_name
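The notebook never shows how these files are read back. A minimal companion loader (load_pickle is a hypothetical helper, assuming the same pickle protocol and paths) could be:
def load_pickle(file_name):
    # Read back an object written by make_pickle.
    with open(file_name, 'rb') as f:
        return pickle.load(f)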
In [4]:
INPUT_DATA_PATH = 'input/'
df_test = pd.read_csv(INPUT_DATA_PATH + 'test.csv',dtype={'msno' : 'category',
'source_system_tab' : 'category',
'source_screen_name' : 'category',
'source_type' : 'category',
'song_id' : 'category'})
df_train = pd.read_csv(INPUT_DATA_PATH + 'train.csv',dtype={'msno' : 'category',
'source_system_tab' : 'category',
'source_screen_name' : 'category',
'source_type' : 'category',
'target' : np.uint8,
'song_id' : 'category'})
In [5]:
df_train = df_train[['msno', 'song_id', 'target']]
df_test = df_test[['msno', 'song_id']]
In [6]:
def create_lookup_tables(words):
    """
    Create lookup tables for vocabulary.
    :param words: Input list of words
    :return: A tuple of dicts (vocab_to_int, int_to_vocab): the first maps
             each word to an integer index, the second maps indices back to words.
    """
    from collections import Counter
    word_counts = Counter(words)
    sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    int_to_vocab = {ii: word for ii, word in enumerate(sorted_vocab)}
    vocab_to_int = {word: ii for ii, word in int_to_vocab.items()}
    return vocab_to_int, int_to_vocab
tot_user_ids = df_train['msno'].astype(object).unique()
userid_to_int, int_to_userid = create_lookup_tables(tot_user_ids)
tot_song_ids = df_train['song_id'].astype(object).unique()
songid_to_int, int_to_songid = create_lookup_tables(tot_song_ids)
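As a quick sanity check (not part of the original notebook), the two dictionaries should round-trip any known id, and their sizes give the vocabulary counts ALS will see:
# Round-trip check: id -> int -> id should return the original value.
sample_id = tot_user_ids[0]
assert int_to_userid[userid_to_int[sample_id]] == sample_id
print('users: %d, songs: %d' % (len(userid_to_int), len(songid_to_int)))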
In [7]:
df_train['user_no'] = [userid_to_int[user_id] for user_id in df_train['msno']]
df_train['song_no'] = [songid_to_int[song_id] for song_id in df_train['song_id']]
In [8]:
ratings = sqlCtx.createDataFrame(df_train)
(training, test) = ratings.randomSplit([0.8, 0.2])
print('train/test split')
In [9]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="user_no", itemCol="song_no", ratingCol="target",
coldStartStrategy="drop")
model = als.fit(training)
print('model fit')
In [10]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
print('predict')
evaluator = RegressionEvaluator(metricName="rmse", labelCol="target",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
In [11]:
# Generate top 10 song recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each song
songRecs = model.recommendForAllItems(10)
print('rcmd generate finish')
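The recommendations column returned by recommendForAllUsers is an array of (song_no, rating) structs; to inspect it as flat rows, one option (a sketch, not from the original notebook) is:
from pyspark.sql.functions import explode

# One row per (user, recommended song) pair.
flat_user_recs = (userRecs
                  .withColumn('rec', explode('recommendations'))
                  .select('user_no', 'rec.song_no', 'rec.rating'))
flat_user_recs.show(5)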
In [12]:
make_pickle('pickle/cf_result_df_train_pred', predictions.toPandas())
make_pickle('pickle/cf_result_user_recs', userRecs.toPandas())
make_pickle('pickle/cf_result_song_recs', songRecs.toPandas())
Out[12]:
'pickle/cf_result_song_recs'
In [13]:
def convert_userid_to_userno(user_id):
    # Map a raw user id to its integer index; ids unseen in training get -1.
    if user_id in userid_to_int:
        return userid_to_int[user_id]
    return -1

def convert_songid_to_songno(song_id):
    # Map a raw song id to its integer index; ids unseen in training get -1.
    if song_id in songid_to_int:
        return songid_to_int[song_id]
    return -1

df_test['user_no'] = [convert_userid_to_userno(user_id) for user_id in df_test['msno']]
df_test['song_no'] = [convert_songid_to_songno(song_id) for song_id in df_test['song_id']]
sp_df_test = sqlCtx.createDataFrame(df_test)
In [14]:
test_predictions = model.transform(sp_df_test)
df_test_pred = test_predictions.toPandas()
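Because unseen users and songs were mapped to -1 and the model was built with coldStartStrategy="drop", those test rows are dropped from df_test_pred. If a prediction for every test row were needed (e.g., for a submission file), one option is to merge the predictions back onto df_test and fill the gaps; this is a sketch, and the 0.5 fill value is an assumption, not from the notebook:
# Re-attach predictions to every test row; rows dropped by ALS become NaN,
# which is then filled with an assumed neutral score of 0.5.
pred_lookup = (df_test_pred[['user_no', 'song_no', 'prediction']]
               .drop_duplicates(['user_no', 'song_no']))
df_submit = df_test.merge(pred_lookup, on=['user_no', 'song_no'], how='left')
df_submit['prediction'] = df_submit['prediction'].fillna(0.5)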
In [15]:
make_pickle('pickle/cf_result_df_test_pred', df_test_pred)
Out[15]:
'pickle/cf_result_df_test_pred'
In [16]:
df_test_pred.head()
Out[16]: