In [39]:
import numpy as np
from itertools import groupby, permutations, chain, islice
from operator import itemgetter, add
def get_random_element(x):
    """Pick a uniformly random element from the sequence x.

    Returns -1 when x is None or empty, signalling "no paired item
    available" to the downstream basket construction.
    """
    # Original only guarded against None; an empty sequence would make
    # np.random.randint(0) raise ValueError. Treat both as "no element".
    if x is None or len(x) == 0:
        return -1
    return x[np.random.randint(len(x))]
def split_every(n, iterable):
    """Lazily yield consecutive chunks of at most n items from iterable.

    The final chunk may be shorter than n; an empty iterable yields nothing.
    """
    source = iter(iterable)
    while True:
        chunk = list(islice(source, n))
        if not chunk:
            return
        yield chunk
Let's generate some random shopping lists:
In [40]:
# Parameters for the synthetic shopping data.
num_unique_items = 5000    # size of the item catalogue (ids 0..4999)
num_paired_items = 100     # number of (x, y) pairs forced to co-occur
num_bought_items = 250000  # number of individual purchases to simulate
Select some random pairs of items which will always be bought together
In [41]:
# Draw random (x, y) item-id pairs that will always be bought together.
test_givenpairs = np.random.randint(num_unique_items, size=(num_paired_items, 2))
# Group the pairs by their first item: x -> list of partner items y.
# groupby only groups consecutive equal keys, hence the sort on the same key first.
test_pair_dict = dict([(k, [itemgetter(1)(f) for f in v]) for k, v in groupby(sorted(test_givenpairs,key=itemgetter(0)), key = itemgetter(0))])
print "We generate example pairs which will always be bought together. Some examples:"
test_pair_dict.items()[0:5]
Out[41]:
Determine the items that are bought. By taking each random value modulo its index, we create an approximately exponential popularity distribution
In [42]:
# Taking each random draw modulo its (1-based) position biases item ids towards
# small values, giving an approximately exponential popularity distribution.
test_purchases_bought = [j % (i+1) for i, j in enumerate(np.random.randint(num_unique_items, size=(num_bought_items)))]
Now map each bought item to a random item which is, by our definition, bought together with it — if one is available
In [43]:
# item id -> list of paired partner ids, or None when the item has no pairing
vectorized_select_paired_items = np.vectorize(test_pair_dict.get)
# list of partners -> one random partner; None -> -1 (see get_random_element)
vectorized_select_random_item = np.vectorize(get_random_element)
test_purchases_added = (
    vectorized_select_random_item( # select one item random from the list of paired items
        vectorized_select_paired_items( # map them to the list of paired items
            test_purchases_bought # the items we bought
        )
    ))
# One row per purchase: (bought item, chosen partner or -1); shuffle the rows
# so paired purchases are spread over the generated baskets.
test_purchases_pairs = np.transpose(np.vstack((test_purchases_bought, test_purchases_added)))
np.random.shuffle(test_purchases_pairs)
Given these pairs, we unravel them to create one long list, which is then split into different shopping baskets
In [44]:
# Flatten the (item, partner) rows into one long sequence of item ids.
test_sequence = test_purchases_pairs.ravel()
# Chunk the sequence into groups of 6, drop placeholders via a > 0, deduplicate
# with np.unique, and keep only the baskets that remain non-empty.
# NOTE(review): the a > 0 filter also removes legitimate item id 0, not just
# the -1 "no partner" placeholders — confirm this is intended.
test_shopping_baskets = [nonempty for nonempty in [np.unique(a[a>0]).tolist() for a in [np.array(a) for a in list(split_every(6, test_sequence))]] if len(nonempty)>0]
print "Each few items are joined together to form a list. Because some items are removed, lists have variable length. Some examples:"
print test_shopping_baskets[0:5]
print "Total number of generated shopping lists = {}".format(len(test_shopping_baskets))
In [45]:
# Distribute the generated baskets as a Spark RDD (sc is the SparkContext,
# provided by the notebook environment).
shopping_baskets = sc.parallelize(test_shopping_baskets)
shopping_baskets_count = shopping_baskets.count()
print "There are in total {} shopping lists".format(shopping_baskets_count)
Getting item buy frequency. While the shopping lists do not have to fit in memory, a list of unique items should fit.
In [46]:
def check_uniqueness(t):
    """Validate that a transaction contains no duplicate items.

    Returns the transaction unchanged when all items are unique;
    raises ValueError otherwise.
    """
    # A set collapses duplicates, so a length mismatch means duplicates exist.
    if len(set(t)) != len(t):
        raise ValueError("Items in a transaction must be unique but got {}".format(t))
    # Original had an unreachable `return t` after the raise; removed.
    return t
# Per-item basket frequency: validate each basket, emit (item, 1) and sum.
# The unique-item dict is small enough to collect to the driver.
item_freq = dict(shopping_baskets.flatMap(check_uniqueness).map(lambda v: (v, 1L)).reduceByKey(add).collect())
In [47]:
# Quick sanity check of the collected frequencies.
print "The most popular item is bought in {} shopping baskets".format(max(item_freq.values()))
print "As example, the first few items with their frequencies:"
print item_freq.items()[0:5]
We use the following score: $$ score = \dfrac{\bigg(\dfrac{X\ and\ Y}{X}\bigg)}{\bigg(\dfrac{(not\ X)\ and\ Y}{not\ X}\bigg)}$$
In [48]:
from __future__ import division
def calculate_score(xy, xy_count):
    """Score how strongly item Y is associated with item X.

    xy       -- tuple (x_item, y_item) of item ids
    xy_count -- number of baskets containing both items

    Computes
        (X and Y / X) / ((not X) and Y / not X)
    i.e. the ratio between how often Y is bought with X versus without X.
    Reads the notebook globals item_freq and shopping_baskets_count.
    Returns (xy, score); the score is infinite when Y never occurs without X.
    """
    x_item, y_item = xy
    x = item_freq[x_item]              # baskets containing X
    y = item_freq[y_item]              # baskets containing Y
    notx = shopping_baskets_count - x  # baskets NOT containing X
    x_y = xy_count                     # baskets containing both X and Y
    notx_y = y - x_y                   # baskets containing Y but not X
    if notx_y == 0:
        # Y never appears without X: a "perfect pair".
        # np.inf instead of np.Inf: the capitalized alias was removed in NumPy 2.0.
        return (xy, np.inf)
    else:
        # Algebraically identical to (x_y/x) / (notx_y/notx).
        return (xy, (notx/x) * (x_y/notx_y))
def all_pairs(x):
    """Return every ordered pair of items from x, both (a, b) and (b, a).

    Equivalent to list(permutations(x, 2)): pairs are taken by position,
    so both orientations of each pair appear.
    """
    size = len(x)
    return [(x[i], x[j])
            for i in range(size)
            for j in range(size)
            if i != j]
def as_key_with_value(i):
    """Return a function that maps any value x to the key-value tuple (x, i)."""
    return lambda element: (element, i)
In [49]:
# For every basket emit all ordered item pairs, count each pair's occurrences
# across all baskets, then convert the counts into association scores.
pairs = shopping_baskets\
    .flatMap(all_pairs)\
    .map(as_key_with_value(1))\
    .reduceByKey(add)\
    .map(lambda x:calculate_score(*x))\
    .cache()
Now we have the score for every pair of products ever bought
In [50]:
# Materialize the cached pair scores and show a sample.
pairs_count = pairs.count()
print "There are in total {} pairs of bought products".format(pairs_count)
print "The first few pairs with their score:"
pairs.take(10)
Out[50]:
As context, let's get the histogram
In [61]:
# 10-bin histogram of the finite scores (infinite "perfect pair" scores excluded).
pairs.map(lambda k_v:k_v[1]).filter(lambda score: not np.isinf(score)).histogram(10)
Out[61]:
In [62]:
# Keep pairs whose score exceeds the hand-picked threshold of 250
# (chosen from the histogram above) and collect them to the driver.
frequent_pairs = pairs.filter(lambda k_v:k_v[1]>250).collect()
print "The number of frequent pairs = {}".format(len(frequent_pairs))
In [52]:
# Show a few of the collected frequent pairs.
frequent_pairs[0:5]
Out[52]:
Here we define some helper functions to keep the N highest-scoring co-occurring items
In [53]:
def aggregate_zero():
    """Neutral accumulator for aggregateByKey: a fresh empty list of (item, score) pairs."""
    return list()
def aggregate_seq(n):
    """Build a fold step that keeps only the n highest-scored (item, score) pairs.

    The returned function appends the new item to the accumulator, sorts it
    descending by score (stable), and truncates to the top n.
    """
    def sequenceadd(seq, item):
        seq.append(item)
        seq.sort(key=itemgetter(1), reverse=True)
        return seq[:n]
    return sequenceadd
def aggregate_combine(n):
    """Build a merge step that combines two top-n lists into a single top-n list.

    Concatenates the partial results, sorts descending by score (stable),
    and truncates to the n best.
    """
    def combine(seq1, seq2):
        merged = seq1 + seq2
        merged.sort(key=itemgetter(1), reverse=True)
        return merged[:n]
    return combine
Instead of hard thresholds, we can just find the most cross-sellable products for each product. Some examples:
In [54]:
# Re-key each scored pair ((x, y), score) as (x, (y, score)), then keep,
# per item x, the 5 partner items with the highest scores.
item_with_cross_sells = pairs\
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))\
    .aggregateByKey(aggregate_zero(), aggregate_seq(5), aggregate_combine(5)).cache()
In [55]:
# A few items with their best cross-sell candidates.
item_with_cross_sells.take(5)
Out[55]:
Let's find a perfect pair, one with score infinity
In [56]:
# A pair with infinite score: its Y item never occurs in a basket without X.
perfect_pair = pairs.filter(lambda x: np.isinf(x[1])).take(1)[0]
perfect_pair
Out[56]:
And show with which other items that occurs:
In [57]:
# All top cross-sell candidates kept for the X item of the perfect pair.
item_with_cross_sells.lookup(perfect_pair[0][0])
Out[57]: