In [1]:
from pyspark import SparkConf
from pyspark import SparkContext
In [2]:
# Build the Spark configuration as one fluent chain (each setter hands back the
# same SparkConf object), then start the context that every later cell uses.
conf = (
    SparkConf()
    .setMaster('spark://ip-172-31-9-200:7077')
    .setAppName('spark_analytics_chpt_3')
    .set("spark.executor.memory", "10g")
)
sc = SparkContext(conf=conf)
In [3]:
# Load the three raw text inputs as RDDs of lines. The user/artist play-count
# file is requested with at least 12 partitions; the two smaller inputs use the
# default partitioning.
raw_user_artist_data = sc.textFile('user_artist_data.txt', minPartitions=12)
raw_artist_data = sc.textFile('artist_data.txt')
raw_artist_alias = sc.textFile('artist_alias.txt')
In [4]:
raw_user_artist_data.take(5)
Out[4]:
In [5]:
raw_artist_data.take(5)
Out[5]:
In [6]:
raw_artist_alias.take(5)
Out[6]:
In [7]:
# Summary statistics over the first space-separated field of every line
# (the user ID, going by the field order unpacked in generate_training_data).
raw_user_artist_data.map(lambda x: int(x.split(' ')[0])).stats()
Out[7]:
In [8]:
# Summary statistics over the second space-separated field of every line
# (the artist ID, going by the field order unpacked in generate_training_data).
raw_user_artist_data.map(lambda x: int(x.split(' ')[1])).stats()
Out[8]:
In [9]:
def clean_artist_data(s):
    """Parse one tab-separated artist line into ``[artist_id, name]``.

    Args:
        s: A raw text line of the form ``"<artist id>\\t<artist name>"``.
            Any fields after the second tab-separated field are ignored.

    Returns:
        ``[artist_id, name]`` with ``artist_id`` as an int and ``name``
        stripped of surrounding whitespace. If the line has no tab, or the
        first field is not a valid integer, the sentinel ``[0, None]`` is
        returned so the caller's ``map`` still yields one row per input line.
    """
    # Guard clause: a line without a tab cannot hold both fields.
    if '\t' not in s:
        return [0, None]

    fields = s.split('\t')
    try:
        artist_id = int(fields[0])
    except ValueError:
        # Only a malformed ID is expected here. Catching ValueError alone
        # (rather than a bare ``except:``) lets KeyboardInterrupt, SystemExit
        # and genuine programming errors propagate.
        return [0, None]
    return [artist_id, fields[1].strip()]
In [10]:
# RDD of [artist_id, name] lists, one per input line; lines that
# clean_artist_data cannot parse show up as [0, None].
artist_by_id = raw_artist_data.map(clean_artist_data)
In [11]:
artist_by_id.take(5)
Out[11]:
In [12]:
def clean_artist_alias(s):
    """Parse one tab-separated alias line into ``[bad_id, good_id]``.

    Args:
        s: A raw text line of the form ``"<bad artist id>\\t<good artist id>"``.
            Any fields after the second tab-separated field are ignored.

    Returns:
        ``[bad_id, good_id]`` as ints. If the line has no tab, or either of
        the first two fields is not a valid integer (for example an empty
        field), the sentinel ``[0, 0]`` is returned so the caller's ``map``
        still yields one pair per input line.
    """
    # Guard clause: a line without a tab cannot hold both IDs.
    if '\t' not in s:
        return [0, 0]

    fields = s.split('\t')
    try:
        return [int(fields[0]), int(fields[1])]
    except ValueError:
        # Only malformed IDs are expected here. Catching ValueError alone
        # (rather than a bare ``except:``) lets KeyboardInterrupt, SystemExit
        # and genuine programming errors propagate.
        return [0, 0]
In [13]:
# Parse every alias line and collect the bad_id -> good_id pairs on the driver
# as a plain dict. Lines that fail to parse all collapse into the 0 -> 0 entry.
artist_alias = (
    raw_artist_alias
    .map(clean_artist_alias)
    .collectAsMap()
)
In [14]:
from pyspark.mllib.recommendation import Rating
def generate_training_data(line):
    """Turn one ``"<user> <artist> <count>"`` line into a ``Rating``.

    The three space-separated fields are parsed as ints. The artist ID is
    replaced by its entry in the module-level ``artist_alias`` dict when one
    exists, and is used unchanged otherwise.

    Raises:
        ValueError: if the line does not hold exactly three integer fields.
    """
    user_id, raw_artist_id, play_count = map(int, line.split(' '))
    # Resolve the alias to the canonical artist ID when one is known.
    if raw_artist_id in artist_alias:
        canonical_artist_id = artist_alias[raw_artist_id]
    else:
        canonical_artist_id = raw_artist_id
    return Rating(user_id, canonical_artist_id, play_count)
In [15]:
# Convert every play-count line into a Rating and mark the RDD as cached so
# the later actions on it (take, model training) can reuse the parsed data.
training_data = raw_user_artist_data.map(generate_training_data).cache()
In [16]:
training_data.take(10)
Out[16]:
In [17]:
from pyspark.mllib.recommendation import ALS
In [18]:
# Train an implicit-feedback ALS model on the play counts. ALS starts from
# randomly initialised factors, so a fixed seed is passed to keep the factors
# (and the recommendations derived from them) repeatable between runs.
model = ALS.trainImplicit(
    training_data,
    rank=10,
    iterations=5,
    lambda_=0.01,
    alpha=1.0,
    seed=42,
)
In [19]:
# Peek at the first key/value pair of the trained user factors. The value is
# stringified and split on commas -- presumably only to make the displayed
# output easier to read; nothing downstream uses this result.
model.userFeatures().mapValues(lambda x: str(x).split(',')).first()
Out[19]:
Check the recommendations: first, look at the artists user 2093760 has already played
In [20]:
# Artist IDs (as ints) taken from every raw play-count line whose user field
# equals '2093760'. The user comparison is done on the unparsed string field.
raw_artists_for_user = (
    raw_user_artist_data
    .map(lambda line: line.split(' '))
    .filter(lambda fields: fields[0] == '2093760')
    .map(lambda fields: int(fields[1]))
)
In [21]:
# Bring this user's artist IDs back to the driver and de-duplicate them into a
# set, which the artist_by_id filter in the next cell tests membership against.
existing_products = set(raw_artists_for_user.collect())
In [22]:
# Fetch the [artist_id, name] rows for the artists this user has already played.
artist_by_id.filter(lambda x: x[0] in existing_products).collect()
Out[22]:
Make some recommendations
In [23]:
# Ask the trained model for 5 recommended products (artists) for user 2093760,
# the same user whose listening history was inspected above.
recommendations = model.recommendProducts(2093760, 5)
In [24]:
recommendations
Out[24]:
In [25]:
# Element 1 of each recommendation is the product (artist) ID; gather the
# distinct IDs so they can be matched against artist_by_id.
recommended_product_ids = {rec[1] for rec in recommendations}
In [26]:
recommended_product_ids
Out[26]:
In [27]:
# Fetch the [artist_id, name] rows for the recommended artist IDs so the
# recommendations can be read by name.
artist_by_id.filter(lambda x: x[0] in recommended_product_ids).collect()
Out[27]: