In [1]:
import pyspark
from pyspark.mllib.linalg.distributed import MatrixEntry, CoordinateMatrix
In [2]:
import numpy as np
import pandas as pd
In [8]:
# Toy inputs for the distributed-multiplication experiment.
#   X   : lower-triangular 4x4 matrix
#   X_r : its symmetric completion -- X mirrored across the diagonal, with the
#         diagonal counted once (X + X^T - diag(X))
#   Y   : 4x2 right-hand operand
#   M   : dense reference product X @ Y
X = np.array([[1, 0, 0, 0], [2, 1, 0, 0], [3, 4, 1, 0], [5, 6, 7, 1]])
X_r = X + X.T - np.diag(np.diag(X))
Y = np.array([[1, 0], [0, 1], [0.5, 0.5], [0.5, 0.5]])
M = X @ Y
print(Y, X, M, sep="\n")
In [13]:
# Dense NumPy reference: the symmetric matrix, a blank line, then X_r @ Y,
# for comparison with the distributed products computed below.
print(X_r, X_r.dot(Y), sep="\n\n")
In [59]:
# Distribute the dense symmetric matrix X_r as a CoordinateMatrix.
# NOTE(review): `sc` is never created in this notebook; presumably it is the
# SparkContext injected by the pyspark kernel/shell -- confirm.
# matrix_x_r holds one record per row: (row_index, [(col_index, value), ...]).
matrix_x_r = sc.parallelize(
    [(row_idx, list(enumerate(row))) for row_idx, row in enumerate(X_r)]
)
real_X_mat = CoordinateMatrix(
    matrix_x_r
    # (row, [(col, val), ...]) -> one (row, col, val) tuple per cell
    .flatMap(lambda row: [(row[0], col, val) for col, val in row[1]])
    .map(lambda entry: MatrixEntry(*entry)),
    # Derive dimensions from the source array instead of hard-coding them.
    numRows=X_r.shape[0],
    numCols=X_r.shape[1],
)
In [66]:
# Distribute the lower-triangular X as a sparse CoordinateMatrix, keeping only
# its non-zero entries (the zero upper triangle is dropped).
# NOTE(review): `sc` is presumably the SparkContext provided by the pyspark
# kernel/shell -- it is not created in this notebook.
# matrix_x holds one record per row: (row_index, [(col_index, value), ...]).
matrix_x = sc.parallelize(
    [(row_idx, list(enumerate(row))) for row_idx, row in enumerate(X)]
)
X_mat = CoordinateMatrix(
    matrix_x
    .flatMap(lambda row: [(row[0], col, val) for col, val in row[1]])
    .filter(lambda entry: entry[2] != 0.0)  # keep non-zero values only
    .map(lambda entry: MatrixEntry(*entry)),
    # Derive dimensions from the source array instead of hard-coding them.
    numRows=X.shape[0],
    numCols=X.shape[1],
)
X_mat.entries.take(5)
Out[66]:
In [64]:
# Distribute the right operand Y as a sparse CoordinateMatrix (non-zeros only).
# NOTE(review): `sc` is presumably the SparkContext provided by the pyspark
# kernel/shell -- it is not created in this notebook.
# matrix_y holds one record per row: (row_index, [(col_index, value), ...]).
matrix_y = sc.parallelize(
    [(row_idx, list(enumerate(row))) for row_idx, row in enumerate(Y)]
)
Y_mat = CoordinateMatrix(
    matrix_y
    .flatMap(lambda row: [(row[0], col, val) for col, val in row[1]])
    .filter(lambda entry: entry[2] != 0.0)  # keep non-zero values only
    .map(lambda entry: MatrixEntry(*entry)),
    # Derive dimensions from the source array instead of hard-coding them.
    numRows=Y.shape[0],
    numCols=Y.shape[1],
)
Y_mat.entries.collect()
Out[64]:
In [107]:
def naive_multiplication(A: CoordinateMatrix, B: CoordinateMatrix, is_triangle: bool = False):
    """Multiply two distributed sparse matrices with a join-based algorithm.

    Every stored entry A[i, k] is keyed by its column k and every stored entry
    B[k, j] by its row k. Joining on k pairs up the terms A[i, k] * B[k, j],
    and summing those terms per (i, j) gives the product A @ B.

    Parameters
    ----------
    A : CoordinateMatrix
        The left matrix.
    B : CoordinateMatrix
        The right matrix.
    is_triangle : bool, default False
        If True, ``A`` is treated as one triangle of a symmetric matrix: each
        entry is mirrored across the diagonal before multiplying, so the
        product is taken against the full symmetric matrix.

    Returns
    -------
    RDD of MatrixEntry
        One entry per (i, j) position that received at least one product
        term. This is an RDD, not a CoordinateMatrix.

    Notes
    -----
    Dimension compatibility of ``A`` and ``B`` is not checked.
    """
    if is_triangle:
        # Emit every entry at both (col, row) and its mirror (row, col).
        # A diagonal entry lands on the same key twice, so values are
        # averaged per key via (sum, count) rather than summed. This also
        # tolerates input that already stores both halves of the matrix.
        left_rdd = (
            A.entries
            .flatMap(lambda e: [((e.j, e.i), e.value), ((e.i, e.j), e.value)])
            .aggregateByKey(
                zeroValue=(0.0, 0.0),
                # Fold one value into a partition-local (sum, count).
                seqFunc=lambda acc, v: (acc[0] + v, acc[1] + 1),
                # Merge two (sum, count) accumulators from different
                # partitions -- must use its own arguments.
                combFunc=lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
            )
            .mapValues(lambda acc: acc[0] / acc[1])
            # ((col, row), value) -> (col, (row, value)), matching the
            # column-keyed layout of the non-triangle branch below.
            .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
        )
    else:
        # Key A by column: (k, (i, A[i, k])).
        left_rdd = A.entries.map(lambda e: (e.j, (e.i, e.value)))
    # Key B by row: (k, (j, B[k, j])).
    right_rdd = B.entries.map(lambda e: (e.i, (e.j, e.value)))
    product_rdd = (
        left_rdd
        .join(right_rdd)
        # (k, ((i, a_ik), (j, b_kj))) -> ((i, j), a_ik * b_kj)
        .map(lambda kv: ((kv[1][0][0], kv[1][1][0]), kv[1][0][1] * kv[1][1][1]))
        # Sum all partial products that land on the same output cell.
        .reduceByKey(lambda s, t: s + t)
        .map(lambda kv: MatrixEntry(i=kv[0][0], j=kv[0][1], value=kv[1]))
    )
    return product_rdd
In [108]:
# Distributed product of the full (dense) symmetric matrix with Y; the result
# is compared against the NumPy value X_r.dot(Y).
real_product = naive_multiplication(A=real_X_mat, B=Y_mat, is_triangle=False)
real_product.collect()
Out[108]:
In [109]:
# Dense NumPy reference for the distributed product above.
print(np.dot(X_r, Y))
In [110]:
# Same product computed from the lower triangle only: with is_triangle=True,
# naive_multiplication mirrors X_mat's entries across the diagonal, so the
# result should match X_r.dot(Y) as well.
triangle_product = naive_multiplication(X_mat, Y_mat, is_triangle=True)
triangle_product.collect()
Out[110]:
In [102]:
# Sanity check: bring every (i, j, value) entry of the dense symmetric matrix
# back to the driver for inspection.
real_X_mat.entries.collect()
Out[102]:
In [ ]: