In [168]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import pandas as pd
import numpy as np
import sys
import itertools
from math import sqrt
from operator import add
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
In [13]:
data_path = "wiki-edits.json.gz"
In [14]:
#read in the data
wiki_edits = sqlCtx.read.json(data_path)
In [188]:
#quick view of the data we are working with
wiki_edits.take(5)
Out[188]:
In [16]:
wiki_edits.printSchema()
In [17]:
wiki_edits.registerTempTable('wiki')
In [21]:
#simple query to confirm that anonymous (IP-address) edits are the rows with a null user_id
sqlCtx.sql("select * from wiki where user_id is null").take(10)
Out[21]:
In [20]:
#total number of revisions in our file
wiki_edits.count()
Out[20]:
In [38]:
users = sqlCtx.sql("select user_id, count(1) as rev_count, count(distinct(article_id)) as article_num \
from wiki where user_id is not null group by user_id")
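In [ ]:
#(not in the original run: a minimal sketch of the same aggregation written with the
#DataFrame API instead of SQL, assuming pyspark.sql.functions is available)
from pyspark.sql import functions as F
users_df_api = wiki_edits.where(wiki_edits.user_id.isNotNull()) \
    .groupBy("user_id") \
    .agg(F.count(F.lit(1)).alias("rev_count"), F.countDistinct("article_id").alias("article_num"))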
In [39]:
users_local = users.collect()
In [40]:
users_local[:10]
Out[40]:
In [42]:
users_local_df = pd.DataFrame(users_local, columns=["user_id", "rev_count", "article_num"])
In [43]:
users_local_df.head()
Out[43]:
In [44]:
print max(users_local_df.rev_count), min(users_local_df.rev_count), np.mean(users_local_df.rev_count)
print max(users_local_df.article_num), min(users_local_df.article_num), np.mean(users_local_df.article_num)
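In [ ]:
#(sketch, not in the original run: pandas can produce the same summary statistics in one call)
print users_local_df[["rev_count", "article_num"]].describe()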
In [73]:
#count the number of articles represented (even those outside of namespace=0)
sqlCtx.sql("select distinct(article_id) from wiki").count()
Out[73]:
In [167]:
#count the number of users represented
sqlCtx.sql("select distinct(user_id) from wiki where user_id is not null").count()
Out[167]:
In [183]:
article_raw = sqlCtx.sql("select user_id, article_id, count(1) as article_count from wiki \
where user_id is not null and article_namespace=0 group by user_id, article_id")
In [184]:
#Some users edit the same article many times; exploring this uncapped data is where the
#5-edit ceiling used later was developed. The code below does NOT apply the
#.map(lambda (u, a, a_c): (u, a, min(a_c, 5))) cap.
article_user_local = article_raw.collect()
In [185]:
article_user_local[:10]
Out[185]:
In [186]:
article_user_df = pd.DataFrame(article_user_local, columns=["user_id", "article", "art_count"])
In [187]:
print min(article_user_df.art_count), max(article_user_df.art_count), np.mean(article_user_df.art_count)
In [140]:
len(article_user_df)
Out[140]:
In [149]:
len(article_user_df[article_user_df.art_count>5])
Out[149]:
In [150]:
#a very small proportion (~5.8%) of user-article pairs have more than 5 edits
24154/413484.0*100
Out[150]:
In [146]:
#restrict to pairs with fewer than 5 edits so the histogram below is readable
c = article_user_df[article_user_df.art_count<5]
In [127]:
np.mean(article_user_df.art_count)
Out[127]:
In [148]:
plt.hist(c.art_count)
Out[148]:
In [169]:
#train/test split on the timestamp: revisions whose timestamp ends in '9Z' are held out
#for test, and the per-pair edit count is capped at 5 per the exploration above
article_user_train = sqlCtx.sql("select user_id, article_id, count(1) as article_count from wiki \
where user_id is not null and article_namespace=0 and timestamp not like '%9Z' group by user_id, article_id")\
.map(lambda (u, a, a_c): (u, a, min(a_c, 5)))
In [170]:
article_user_test = sqlCtx.sql("select user_id, article_id, count(1) as article_count from wiki \
where user_id is not null and article_namespace=0 and timestamp like '%9Z' group by user_id, article_id")\
.map(lambda (u, a, a_c): (u, a, min(a_c, 5)))
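In [ ]:
#(sketch: ALS.train also accepts the Rating class imported above, so the tuples could be
#wrapped explicitly like this)
article_user_train_r = article_user_train.map(lambda (u, a, a_c): Rating(int(u), int(a), float(a_c)))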
In [171]:
print article_user_train.count()
print article_user_test.count()
In [87]:
#checking the size of the timestamp-based split: the test set is ~17.3% of the user-article pairs
77077/444373.0*100
Out[87]:
In [162]:
model = ALS.train(article_user_train, rank = 10, iterations = 5)
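In [ ]:
#(sketch, assuming this Spark version's ALS.trainImplicit: edit counts are implicit feedback
#rather than explicit ratings, so trainImplicit -- which treats the count as a confidence
#weight -- may fit this data better than ALS.train)
model_implicit = ALS.trainImplicit(article_user_train, rank=10, iterations=5, alpha=0.01)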
In [96]:
##from MovieLensALS.py
def computeRmse(model, data):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
        .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
        .values()
    n = data.count()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))
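In [ ]:
#(sketch, not part of the original run: a small grid search over ALS hyperparameters using
#the itertools import above, scored with computeRmse on the held-out test set; the candidate
#ranks and iteration counts are arbitrary choices)
best_rmse, best_params = float("inf"), None
for rank, iters in itertools.product([5, 10, 20], [5, 10]):
    m = ALS.train(article_user_train, rank=rank, iterations=iters)
    rmse = computeRmse(m, article_user_test)
    if rmse < best_rmse:
        best_rmse, best_params = rmse, (rank, iters)
print best_rmse, best_params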
In [90]:
#using the edit count as the "rating"
computeRmse(model, article_user_test)
Out[90]:
In [163]:
#using the edit count as the "rating" with a ceiling of 5 edits
computeRmse(model, article_user_test)
Out[163]:
In [97]:
#using a boolean rating: edited / not edited
#this may more accurately capture whether a user will interact with an article at all
computeRmse(model, article_user_test)
Out[97]:
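In [ ]:
#(hypothetical sketch -- the cell that built the boolean variant scored above was not kept:
#replace the capped count with a constant 1.0 "edited" rating on both sides of the split,
#then retrain)
article_user_train_bool = article_user_train.map(lambda (u, a, a_c): (u, a, 1.0))
article_user_test_bool = article_user_test.map(lambda (u, a, a_c): (u, a, 1.0))
model_bool = ALS.train(article_user_train_bool, rank=10, iterations=5)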
In [173]:
predictions = model.predictAll(article_user_test.map(lambda x: (x[0], x[1])))
In [174]:
#View of what the predictions look like
predictions.take(10)
Out[174]:
In [175]:
predictions.count()
Out[175]:
In [176]:
article_user_test.count()
Out[176]:
In [182]:
#coverage: the fraction of test pairs the model can score at all -- predictAll returns no
#prediction for pairs whose user or article never appeared in training
coverage = predictions.count()/float(article_user_test.count())*100
print coverage
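In [ ]:
#(sketch, assuming a Spark version whose MatrixFactorizationModel exposes recommendProducts:
#top-10 article recommendations for a single user; the user id 12345 is a placeholder)
for rec in model.recommendProducts(12345, 10):
    print rec.product, rec.rating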