In [2]:
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.linalg.distributed import RowMatrix
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import time
from collections import defaultdict
from pyspark.sql import functions as sfunc
from pyspark.sql import types as stypes
import math
import sys
from operator import itemgetter
import operator
import random

In [3]:
schema = (stypes.StructType()
          .add("fv", stypes.StringType())
          .add("sku", stypes.StringType())
          .add("score", stypes.FloatType()))
train_df = spark.read.csv('gs://lbanor/pyspark/train_query*.gz', header=True, schema=schema)
train_df.createOrReplaceTempView('test1')

In [17]:
train_df.head(3)


Out[17]:
[Row(fv='6094795238635852694', sku='BR049APM25PCS', score=0.5),
 Row(fv='7454424246364596889', sku='TR763APF11DLC', score=0.5),
 Row(fv='5798933384203870548', sku='AN778SHF35NNG', score=0.5)]

In [188]:
print(train_df.rdd.filter(lambda x: x.sku == 'FI911SHF89UBM-50').take(3))


[Row(fv='1005105267406228429', sku='FI911SHF89UBM-50', score=5.0)]

In [82]:
# query = """
# SELECT
#   sku,
#   ROW_NUMBER() OVER (ORDER BY SUM(1)) -1 idx
# FROM test1
# GROUP BY 1
# """
# skus_rdd = spark.sql(query).rdd

In [4]:
query_statistics = """
SELECT
  sku,
  SQRT(10 * LOG(COUNT(sku) OVER()) / {threshold}) / SQRT(SUM(score * score)) p, -- p = gamma / ||c||, the sampling probability
  IF(SQRT(10 * LOG(COUNT(sku) OVER()) / {threshold}) > SQRT(SUM(score * score)), SQRT(SUM(score * score)), SQRT(10 * LOG(COUNT(sku) OVER()) / {threshold})) q -- q = min(gamma, ||c||)
FROM test1
GROUP BY 1
"""

In [8]:
skus_stats = spark.sql(query_statistics.format(threshold=0.1))

In [9]:
print(skus_stats.rdd.filter(lambda x: x.sku == 'FI911SHF89UBM-50').take(3))


[Row(sku='FI911SHF89UBM-50', p=7.132311576894841, q=5.0)]

In [178]:
skus_stats.take(3)


Out[178]:
[Row(sku='PO140ACU06DDD', p=2.4697175158107982, q=14.439529078193651),
 Row(sku='PO140ACU76FVN', p=35.661557884474206, q=1.0),
 Row(sku='JU082SHF02WWZ', p=3.790780833876121, q=9.40744386111339)]

In [194]:
print(skus_stats.rdd.filter(lambda x: x.sku == 'FI911SHF89UBM-50').take(3))


[]

In [ ]:
# query_statistics = """
# SELECT
#   sku,
#   {gamma} / SQRT(SUM(score * score)) p,
#   IF({gamma} > SQRT(SUM(score * score)), SQRT(SUM(score * score)), {gamma}) q
# FROM test1
# GROUP BY 1
# """

In [60]:
# def get_gamma(threshold, numCols):
#     return math.sqrt(10 * math.log(numCols) / threshold) if threshold > 10e-6 else math.inf

In [76]:
# gamma_b = sc.broadcast(get_gamma(10e-2))
# print(gamma_b.value)


35.57234899487128

In [77]:
# skus_stats = spark.sql(query_statistics.format(gamma=gamma_b.value))

In [78]:
# skus_stats.head(2)


Out[78]:
[Row(sku='NI531SRM74IHX', p=2.8758539658272255, q=12.36931687685298),
 Row(sku='MO578SHF45QNE', p=0.5225157525775272, q=35.57234899487128)]

In [10]:
# Broadcast {sku: [p, q]} so each executor can look up the sampling probability
# p and the normalizer q locally, without a join.
pq_b = sc.broadcast({row.sku: [row.p, row.q] for row in skus_stats.collect()})

In [11]:
pq_b.value['FI911SHF89UBM-50']


Out[11]:
[7.132311576894841, 5.0]
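
Here p (= gamma / ||c||) is well above 1 because this sku has a single interaction with score 5.0, so its norm is 5.0, far below gamma; q is therefore just the norm, and the sku is never down-sampled by the mapper defined further below. A quick check (sketch):

p, q = pq_b.value['FI911SHF89UBM-50']
print(p * q)   # ~35.66, i.e. gamma, since q equals the norm here (||c|| < gamma)
print(p > 1)   # True: this sku always passes the random.random() < p test in the mapper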

In [157]:
#skus_idx_b = sc.broadcast({sku: idx for idx, sku in enumerate(pq_b.value.keys())})

In [158]:
#idx_skus_b = sc.broadcast({value: key for key, value in skus_idx_b.value.items()})

In [53]:
# d = {row.sku: row.idx for row in skus_rdd.collect()}
# db = sc.broadcast(d)

# id_ = {value: key for key, value in d.items()}
# id_b = sc.broadcast(id_)

In [159]:
#numCols = sc.broadcast(len(idx_skus_b.value))

In [57]:
# p = [0] * numCols.value
# for row in skus_stats

In [55]:
#p = {row.sku: gamma_b.value / row.norm for row in skus_stats.collect()}  # if the norm is ever 0 we expect an exception to be raised.
#p_b = sc.broadcast(p)

In [34]:
#q = {row.sku: gamma_b.value / row.norm for row in skus_stats.collect()}

In [35]:
#numCols.value


Out[35]:
312988

In [31]:
#skus_s['NI531SRM74IHX']


Out[31]:
12.36931687685298

In [12]:
query_users_items = """
SELECT
  data
FROM (
  SELECT
    fv,
    COLLECT_LIST(STRUCT(sku, score)) data
  FROM test1
  GROUP BY 1
)
WHERE SIZE(data) BETWEEN 2 AND 200
"""

t0 = time.time()
users = spark.sql(query_users_items)
users_rdd = users.rdd
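
The query groups the training rows into one list of (sku, score) structs per user fingerprint (fv) and keeps only users with between 2 and 200 interactions: a single item yields no pairs, and very long sessions blow up the O(k²) pair generation in the mapper below. A rough RDD equivalent of the same grouping, as a sketch only (the SQL above is what is actually used):

users_sketch = (train_df.rdd
                .map(lambda r: (r.fv, [(r.sku, r.score)]))
                .reduceByKey(operator.add)                  # concatenate (sku, score) pairs per user
                .values()                                   # keep only the list, like SELECT data
                .filter(lambda data: 2 <= len(data) <= 200))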

In [148]:
users.head(2)


Out[148]:
[Row(data=[Row(sku='CO796SCF87LXG', score=0.5), Row(sku='CO796SCM72JGT', score=0.5), Row(sku='CO796SCM23HHW', score=0.5)]),
 Row(data=[Row(sku='HA723APF18CPL', score=0.5), Row(sku='CO515APF44YPR', score=0.5), Row(sku='LA906APF69OQC', score=0.5), Row(sku='TU142APF19BPC', score=0.5), Row(sku='CO515APF27DIA', score=0.5), Row(sku='GA753APF40NJR', score=0.5), Row(sku='GA753APF41NJQ', score=1.0)])]

In [13]:
def map_cosines(row):
    # DIMSUM-style mapper: for every pair of skus that co-occur in one user's
    # list, emit ((sku_i, sku_j), score_i/q_i * score_j/q_j), keeping each sku
    # with probability p = gamma / ||c|| so that popular skus are down-sampled.
    for i in range(len(row)):
        value_i = row[i].score / pq_b.value[row[i].sku][1]
        if random.random() < pq_b.value[row[i].sku][0]:
            for j in range(i + 1, len(row)):
                value_j = row[j].score / pq_b.value[row[j].sku][1]
                if random.random() < pq_b.value[row[j].sku][0]:
                    yield ((row[i].sku, row[j].sku), value_i * value_j)
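
Each emitted contribution is score_i/q_i * score_j/q_j, and a pair survives with probability roughly min(1, p_i) * min(1, p_j); summed over all users by the reduceByKey below, the total approximates the cosine similarity between the two sku columns. A tiny hypothetical example of a single contribution, with made-up numbers:

# Hypothetical: skus A and B each appear once, in the same session, with score 0.5,
# so ||c_A|| = ||c_B|| = 0.5 < gamma and q_A = q_B = 0.5; p_A, p_B > 1, so the pair
# is always emitted, and its value is already the exact cosine similarity (1.0).
score_a, score_b = 0.5, 0.5
q_a, q_b = 0.5, 0.5
print((score_a / q_a) * (score_b / q_b))   # 1.0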

In [14]:
users2 = users.rdd.flatMap(lambda row: map_cosines(row.data))

In [150]:
users2.take(2)


Out[150]:
[(('CO796SCM72JGT', 'CO796SCM23HHW'), 0.0002015811797719921),
 (('HA723APF18CPL', 'CO515APF44YPR'), 0.031234752377721216)]

In [15]:
final = users2.reduceByKey(operator.add)

In [16]:
t0 = time.time()
print(final.take(3))
print(time.time() - t0)


[(('VI618SHF35NCY-51', 'LU773ACF56ILV'), 0.029501220638256383), (('FI911APF72ZHF', 'KA952APF52DNB'), 0.015504341823651058), (('FA865ACF45CCS', 'QU097ACF14BCMN'), 0.7071067811865475)]
363.733115196228
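
As a sanity check of the estimator (a sketch on a hypothetical toy matrix, not part of the pipeline above): when no pair is dropped (all p >= 1) and every norm is below gamma (so q = ||c||), the summed contributions equal the exact cosine similarity, which can be verified against scikit-learn:

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# Toy user x sku matrix (3 users, 2 skus); rows are users, columns are skus.
m = np.array([[0.5, 0.5],
              [1.0, 0.0],
              [0.5, 1.0]])
norms = np.sqrt((m ** 2).sum(axis=0))

# Sum over users of score_i/||c_i|| * score_j/||c_j|| equals cosine(c_i, c_j).
estimate = sum((row[0] / norms[0]) * (row[1] / norms[1]) for row in m)
exact = cosine_similarity(m.T)[0, 1]
print(estimate, exact)   # both ~0.5477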

In [ ]:


In [18]:
import numpy as np

In [20]:
a = np.random.randn(12288, 150) # a.shape = (12288, 150)
b = np.random.randn(150, 45) # b.shape = (150, 45)
c = np.dot(a,b)

In [21]:
c.shape


Out[21]:
(12288, 45)

In [39]:
b = np.random.randn(4, 1)

In [40]:
b


Out[40]:
array([[ 0.22988676],
       [-0.77589895],
       [-0.77754825],
       [-0.06151452]])

In [41]:
b[3]


Out[41]:
array([-0.06151452])

In [42]:
a = np.random.randn(3, 3)
b = np.random.randn(3, 1)
c = a*b

In [44]:
a


Out[44]:
array([[-0.01004274, -0.45400667, -1.97007744],
       [-0.54591752, -0.59968557,  1.47375852],
       [ 0.33738485,  1.00607007,  0.69213239]])

In [45]:
b


Out[45]:
array([[ 0.42442128],
       [-0.8827092 ],
       [-0.5387125 ]])

In [46]:
c


Out[46]:
array([[-0.00426235, -0.19269009, -0.83614278],
       [ 0.48188642,  0.52934797, -1.30090021],
       [-0.18175343, -0.54198252, -0.37286037]])
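
Unlike np.dot above, the `*` in c = a*b is an element-wise product with broadcasting: the (3, 1) column b is stretched across the three columns of a. A quick check of that equivalence (sketch):

a = np.random.randn(3, 3)
b = np.random.randn(3, 1)

# Broadcasting repeats b along axis 1 before the element-wise product.
assert np.allclose(a * b, a * np.tile(b, (1, 3)))

# np.dot is the matrix product and follows the (m, n) x (n, p) -> (m, p) shape rule.
print(np.dot(a, b).shape)   # (3, 1)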
