In [2]:
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.linalg.distributed import RowMatrix
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import time
from collections import defaultdict
from pyspark.sql import functions as sfunc
from pyspark.sql import types as stypes
import math
import sys
from operator import itemgetter
import operator
import random
In [3]:
schema = (stypes.StructType()
          .add("fv", stypes.StringType())
          .add("sku", stypes.StringType())
          .add("score", stypes.FloatType()))
train_df = spark.read.csv('gs://lbanor/pyspark/train_query*.gz', header=True, schema=schema)
train_df.createOrReplaceTempView('test1')
In [17]:
train_df.head(3)
Out[17]:
In [188]:
print(train_df.rdd.filter(lambda x: x.sku == 'FI911SHF89UBM-50').take(3))
In [82]:
# query = """
# SELECT
# sku,
# ROW_NUMBER() OVER (ORDER BY SUM(1)) -1 idx
# FROM test1
# GROUP BY 1
# """
# skus_rdd = spark.sql(query).rdd
In [4]:
query_statistics = """
SELECT
    sku,
    SQRT(10 * LOG(COUNT(sku) OVER()) / {threshold}) / SQRT(SUM(score * score)) p,
    IF(SQRT(10 * LOG(COUNT(sku) OVER()) / {threshold}) > SQRT(SUM(score * score)),
       SQRT(SUM(score * score)),
       SQRT(10 * LOG(COUNT(sku) OVER()) / {threshold})) q -- implements min(gamma, ||c||)
FROM test1
GROUP BY 1
"""
In [8]:
skus_stats = spark.sql(query_statistics.format(threshold=0.1))
In [9]:
print(skus_stats.rdd.filter(lambda x: x.sku == 'FI911SHF89UBM-50').take(3))
In [178]:
skus_stats.take(3)
Out[178]:
In [194]:
print(skus_stats.rdd.filter(lambda x: x.sku == 'FI911SHF89UBM-50').take(3))
In [ ]:
# query_statistics = """
# SELECT
# sku,
# {gamma} / SQRT(SUM(score * score)) p,
# IF({gamma} > SQRT(SUM(score * score)), SQRT(SUM(score * score)), {gamma}) q
# FROM test1
# GROUP BY 1
# """
In [60]:
# def get_gamma(threshold, numCols):
# return math.sqrt(10 * math.log(numCols) / threshold) if threshold > 10e-6 else math.inf
In [76]:
# gamma_b = sc.broadcast(get_gamma(10e-2, numCols))  # get_gamma also takes numCols
# print(gamma_b.value)
In [77]:
# skus_stats = spark.sql(query_statistics.format(gamma=gamma_b.value))
In [78]:
# skus_stats.head(2)
Out[78]:
In [10]:
pq_b = sc.broadcast({row.sku: [row.p, row.q] for row in skus_stats.collect()})  # sku -> [sampling prob p, scaling q]
In [11]:
pq_b.value['FI911SHF89UBM-50']
Out[11]:
In [157]:
#skus_idx_b = sc.broadcast({sku: idx for idx, sku in enumerate(pq_b.value.keys())})
In [158]:
#idx_skus_b = sc.broadcast({value: key for key, value in skus_idx_b.value.items()})
In [53]:
# d = {row.sku: row.idx for row in skus_rdd.collect()}
# db = sc.broadcast(d)
# id_ = {value: key for key, value in d.items()}
# id_b = sc.broadcast(id_)
In [159]:
#numCols = sc.broadcast(len(idx_skus_b.value))
In [57]:
# p = [0] * numCols.value
# for row in skus_stats
In [55]:
#p = {row.sku: gamma_b.value / row.norm for row in skus_stats.collect()} # if 0 happens as the ``norm`` we expected an Exception to be raised.
#p_b = sc.broadcast(p)
In [34]:
#q = {row.sku: gamma_b.value / row.norm for row in skus_stats.collect()}
In [35]:
#numCols.value
Out[35]:
In [31]:
#skus_s['NI531SRM74IHX']
Out[31]:
In [12]:
query_users_items = """
SELECT
    data
FROM (
    SELECT
        fv,
        COLLECT_LIST(STRUCT(sku, score)) data
    FROM test1
    GROUP BY 1
)
WHERE SIZE(data) BETWEEN 2 AND 200 -- keep only baskets with 2 to 200 items
"""
t0 = time.time()
users = spark.sql(query_users_items)
users_rdd = users.rdd
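For reference, the same basket query sketched with the DataFrame API (using the sfunc alias imported earlier; users_alt is a hypothetical name, and the result should match the SQL version):
In [ ]:
# Sketch: DataFrame-API equivalent of query_users_items.
users_alt = (train_df
             .groupBy('fv')
             .agg(sfunc.collect_list(sfunc.struct('sku', 'score')).alias('data'))
             .where(sfunc.size('data').between(2, 200))
             .select('data'))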
In [148]:
users.head(2)
Out[148]:
In [13]:
def map_cosines(row):
    # DIMSUM-style emit: column i is sampled with probability p_i, and each
    # score is rescaled by q_i = min(gamma, ||c_i||) before the product.
    for i in range(len(row)):
        value_i = row[i].score / pq_b.value[row[i].sku][1]
        if random.random() < pq_b.value[row[i].sku][0]:
            for j in range(i + 1, len(row)):
                value_j = row[j].score / pq_b.value[row[j].sku][1]
                # sample on column j's probability, not i's
                if random.random() < pq_b.value[row[j].sku][0]:
                    yield ((row[i].sku, row[j].sku), value_i * value_j)
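A quick local sanity check of the same sampling logic on in-memory data; FakeRow, pq_local and map_cosines_local are made-up illustration helpers. With p = 1 the pair is always emitted:
In [ ]:
# Local sanity check; no Spark required.
from collections import namedtuple

FakeRow = namedtuple('FakeRow', ['sku', 'score'])
pq_local = {'A': [1.0, 2.0], 'B': [1.0, 3.0]}  # p = 1 -> always emit

def map_cosines_local(row, pq):
    for i in range(len(row)):
        value_i = row[i].score / pq[row[i].sku][1]
        if random.random() < pq[row[i].sku][0]:
            for j in range(i + 1, len(row)):
                value_j = row[j].score / pq[row[j].sku][1]
                if random.random() < pq[row[j].sku][0]:
                    yield ((row[i].sku, row[j].sku), value_i * value_j)

list(map_cosines_local([FakeRow('A', 2.0), FakeRow('B', 3.0)], pq_local))
# expected: [(('A', 'B'), 1.0)] since (2.0 / 2.0) * (3.0 / 3.0) == 1.0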
In [14]:
users2 = users.rdd.flatMap(lambda row: map_cosines(row.data))
In [150]:
users2.take(2)
Out[150]:
In [15]:
final = users2.reduceByKey(operator.add)
In [16]:
t0 = time.time()
print(final.take(3))
print(time.time() - t0)
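A natural follow-up (a sketch; the cut-off of 10 is arbitrary) is to pull the pairs with the largest estimated similarity:
In [ ]:
# Sketch: top-10 pairs by estimated cosine similarity.
top_pairs = final.takeOrdered(10, key=lambda kv: -kv[1])
for (sku_i, sku_j), sim in top_pairs:
    print(sku_i, sku_j, sim)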
In [18]:
import numpy as np
In [20]:
a = np.random.randn(12288, 150)  # a.shape == (12288, 150)
b = np.random.randn(150, 45)     # b.shape == (150, 45)
c = np.dot(a, b)                 # matrix product: c.shape == (12288, 45)
In [21]:
c.shape
Out[21]:
In [39]:
b = np.random.randn(4, 1)
In [40]:
b
Out[40]:
In [41]:
b[3]
Out[41]:
In [42]:
a = np.random.randn(3, 3)
b = np.random.randn(3, 1)
c = a * b  # elementwise with broadcasting: result has shape (3, 3)
In [44]:
a
Out[44]:
In [45]:
b
Out[45]:
In [46]:
c
Out[46]:
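To make the contrast explicit (a minimal sketch): a * b broadcasts b across a's columns and keeps shape (3, 3), while np.dot(a, b) contracts the shared dimension and yields (3, 1).
In [ ]:
# Broadcasting vs. matrix product on the same shapes.
a = np.random.randn(3, 3)
b = np.random.randn(3, 1)
print((a * b).shape)       # (3, 3): elementwise, b broadcast along axis 1
print(np.dot(a, b).shape)  # (3, 1): matrix-vector product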