In [1]:
from pyspark import SparkConf
from pyspark import SparkContext

In [2]:
conf = SparkConf()
conf.setMaster('spark://ip-172-31-9-200:7077')
conf.setAppName('spark_analytics_chpt_3')
conf.set("spark.executor.memory", "10g")
sc = SparkContext(conf=conf)

In [3]:
raw_user_artist_data = sc.textFile('user_artist_data.txt', 12)
raw_artist_data = sc.textFile('artist_data.txt')
raw_artist_alias = sc.textFile('artist_alias.txt')

In [4]:
raw_user_artist_data.take(5)


Out[4]:
[u'1000002 1 55',
 u'1000002 1000006 33',
 u'1000002 1000007 8',
 u'1000002 1000009 144',
 u'1000002 1000010 314']

In [5]:
raw_artist_data.take(5)


Out[5]:
[u'1134999\t06Crazy Life',
 u'6821360\tPang Nakarin',
 u'10113088\tTerfel, Bartoli- Mozart: Don',
 u'10151459\tThe Flaming Sidebur',
 u'6826647\tBodenstandig 3000']

In [6]:
raw_artist_alias.take(5)


Out[6]:
[u'1092764\t1000311',
 u'1095122\t1000557',
 u'6708070\t1007267',
 u'10088054\t1042317',
 u'1195917\t1042317']

In [7]:
raw_user_artist_data.map(lambda x: int(x.split(' ')[0])).stats()


Out[7]:
(count: 24296858, mean: 1947573.26535, stdev: 496000.544975, max: 2443548.0, min: 90.0)

In [8]:
raw_user_artist_data.map(lambda x: int(x.split(' ')[1])).stats()


Out[8]:
(count: 24296858, mean: 1718704.09376, stdev: 2539389.04017, max: 10794401.0, min: 1.0)

In [9]:
def clean_artist_data(s):
    if '\t' in s:
        try:
            artist_id = int(s.split('\t')[0])
            artist = s.split('\t')[1].strip()
            return [artist_id, artist]
        except:
            return [0, None]
    else:
        return [0, None]

In [10]:
artist_by_id = raw_artist_data.map(clean_artist_data)

In [11]:
artist_by_id.take(5)


Out[11]:
[[1134999, u'06Crazy Life'],
 [6821360, u'Pang Nakarin'],
 [10113088, u'Terfel, Bartoli- Mozart: Don'],
 [10151459, u'The Flaming Sidebur'],
 [6826647, u'Bodenstandig 3000']]

In [12]:
def clean_artist_alias(s):
    if '\t' in s:
        try:
            bad = int(s.split('\t')[0])
            good = int(s.split('\t')[1])
            return [bad, good]
        except:
            return [0, 0]
    else:
        return [0, 0]

In [13]:
artist_alias = raw_artist_alias.map(clean_artist_alias) \
                               .collectAsMap()

In [14]:
from pyspark.mllib.recommendation import Rating

def generate_training_data(line):
    user_id, artist_id, count = [int(y) for y in line.split(' ')]
    final_artist_id = artist_alias.get(artist_id, artist_id)  # Get canonical artist ID if it exists
    return Rating(user_id, final_artist_id, count)

In [15]:
training_data = raw_user_artist_data.map(generate_training_data).cache()

In [16]:
training_data.take(10)


Out[16]:
[Rating(user=1000002, product=1, rating=55.0),
 Rating(user=1000002, product=1000006, rating=33.0),
 Rating(user=1000002, product=1000007, rating=8.0),
 Rating(user=1000002, product=1000009, rating=144.0),
 Rating(user=1000002, product=1000010, rating=314.0),
 Rating(user=1000002, product=1000013, rating=8.0),
 Rating(user=1000002, product=1000014, rating=42.0),
 Rating(user=1000002, product=1000017, rating=69.0),
 Rating(user=1000002, product=1000024, rating=329.0),
 Rating(user=1000002, product=1000025, rating=1.0)]

In [17]:
from pyspark.mllib.recommendation import ALS

In [18]:
model = ALS.trainImplicit(training_data, rank=10, iterations=5, lambda_=0.01, alpha=1.0)

In [19]:
model.userFeatures().mapValues(lambda x: str(x).split(',')).first()


Out[19]:
(120,
 ["array('d'",
  ' [-0.08928482979536057',
  ' -0.18168248236179352',
  ' 0.04558441787958145',
  ' 0.20511680841445923',
  ' -0.1506977677345276',
  ' 0.051154762506484985',
  ' 0.23714230954647064',
  ' -0.12505264580249786',
  ' -0.15268318355083466',
  ' 0.053666628897190094])'])

Check the recommendations


In [20]:
raw_artists_for_user = raw_user_artist_data.map(lambda x: x.split(' ')) \
                                           .filter(lambda x: x[0] == '2093760') \
                                           .map(lambda x: int(x[1]))

In [21]:
existing_products = set(raw_artists_for_user.collect())

In [22]:
artist_by_id.filter(lambda x: x[0] in existing_products).collect()


Out[22]:
[[1180, u'David Gray'],
 [378, u'Blackalicious'],
 [813, u'Jurassic 5'],
 [1255340, u'The Saw Doctors'],
 [942, u'Xzibit']]

Make some recommendations


In [23]:
recommendations = model.recommendProducts(2093760, 5)

In [24]:
recommendations


Out[24]:
[Rating(user=2093760, product=1001819, rating=0.02756638741437349),
 Rating(user=2093760, product=2814, rating=0.02745091514581488),
 Rating(user=2093760, product=1300642, rating=0.027437118521924767),
 Rating(user=2093760, product=1811, rating=0.027202111323623254),
 Rating(user=2093760, product=1003249, rating=0.027083081810742995)]

In [25]:
recommended_product_ids = set([x[1] for x in recommendations])

In [26]:
recommended_product_ids


Out[26]:
{1811, 2814, 1001819, 1003249, 1300642}

In [27]:
artist_by_id.filter(lambda x: x[0] in recommended_product_ids).collect()


Out[27]:
[[2814, u'50 Cent'],
 [1811, u'Dr. Dre'],
 [1003249, u'Ludacris'],
 [1001819, u'2Pac'],
 [1300642, u'The Game']]