In [1]:
import pyspark
from pyspark.mllib.linalg.distributed import MatrixEntry, CoordinateMatrix

In [2]:
import numpy as np
import pandas as pd


/usr/local/lib/python3.5/dist-packages/pandas/core/computation/__init__.py:18: UserWarning: The installed version of numexpr 2.4.3 is not supported in pandas and will be not be used
The minimum supported version is 2.4.6

  ver=ver, min_ver=_MIN_NUMEXPR_VERSION), UserWarning)

In [8]:
# X holds the lower triangle (diagonal included) of a symmetric 4x4 matrix.
X = np.array([
    [1, 0, 0, 0],
    [2, 1, 0, 0],
    [3, 4, 1, 0],
    [5, 6, 7, 1],
])
# X_r is the full symmetric matrix: mirror X across the diagonal and
# subtract the diagonal once so it is not counted twice.
X_r = X + X.T - np.diag(np.diag(X))
# Y is the 4x2 right-hand operand used in every product below.
Y = np.array([
    [1, 0],
    [0, 1],
    [0.5, 0.5],
    [0.5, 0.5],
])
print(Y)
print(X)
# Product of the triangular matrix alone, shown for comparison.
M = np.dot(X, Y)
print(M)


[[ 1.   0. ]
 [ 0.   1. ]
 [ 0.5  0.5]
 [ 0.5  0.5]]
[[1 0 0 0]
 [2 1 0 0]
 [3 4 1 0]
 [5 6 7 1]]
[[  1.    0. ]
 [  2.    1. ]
 [  3.5   4.5]
 [  9.   10. ]]

In [13]:
# Show the full symmetric matrix, a blank separator line, and the NumPy
# reference product that the Spark results are compared against.
print(X_r, "", X_r.dot(Y), sep="\n")


[[1 2 3 5]
 [2 1 4 6]
 [3 4 1 7]
 [5 6 7 1]]

[[  5.   6.]
 [  7.   6.]
 [  7.   8.]
 [  9.  10.]]

In [59]:
# `sc` is not created anywhere in this notebook. Reuse the SparkContext that is
# already running if there is one, otherwise start one, so that this cell does
# not depend on state injected by the kernel.
sc = pyspark.SparkContext.getOrCreate()

# One record per matrix row: (row_index, [(col_index, value), ...]).
matrix_x_r = sc.parallelize(list(enumerate(map(lambda x: list(enumerate(x)),X_r))))
# Flatten the row records into (row, col, value) triples and wrap each one in a
# MatrixEntry. No zero filter is applied here, so every cell of X_r is stored.
real_X_mat = CoordinateMatrix(
    matrix_x_r.flatMap(lambda x: [(x[0], *a) for a in x[1]]).map(lambda x: MatrixEntry(*x)),
    numRows=X_r.shape[0], numCols=X_r.shape[1])

In [66]:
# One record per row of the triangular matrix: (row_index, [(col_index, value), ...]).
matrix_x = sc.parallelize([(row_idx, list(enumerate(row))) for row_idx, row in enumerate(X)])
# Emit a MatrixEntry for every non-zero cell; zero cells are dropped so the
# coordinate representation only stores the populated triangle.
x_entries = matrix_x.flatMap(
    lambda rec: [MatrixEntry(rec[0], col_idx, val) for col_idx, val in rec[1] if val != 0.0])
X_mat = CoordinateMatrix(x_entries, numRows=4, numCols=4)
# Peek at the first few stored entries.
X_mat.entries.take(5)


Out[66]:
[MatrixEntry(0, 0, 1.0),
 MatrixEntry(1, 0, 2.0),
 MatrixEntry(1, 1, 1.0),
 MatrixEntry(2, 0, 3.0),
 MatrixEntry(2, 1, 4.0)]

In [64]:
# One record per row of Y: (row_index, [(col_index, value), ...]).
matrix_y = sc.parallelize([(row_idx, list(enumerate(row))) for row_idx, row in enumerate(Y)])
# Emit a MatrixEntry for every non-zero cell of Y; zero cells are not stored.
y_entries = matrix_y.flatMap(
    lambda rec: [MatrixEntry(rec[0], col_idx, val) for col_idx, val in rec[1] if val != 0.0])
Y_mat = CoordinateMatrix(y_entries, numRows=4, numCols=2)
# Y is tiny, so list every stored entry.
Y_mat.entries.collect()


Out[64]:
[MatrixEntry(0, 0, 1.0),
 MatrixEntry(1, 1, 1.0),
 MatrixEntry(2, 0, 0.5),
 MatrixEntry(2, 1, 0.5),
 MatrixEntry(3, 0, 0.5),
 MatrixEntry(3, 1, 0.5)]

In [107]:
def naive_multiplication(A:CoordinateMatrix, B:CoordinateMatrix, is_triangle=False):
    """
    Multiply two coordinate matrices with a join on their shared index.

    A is the left matrix
    B is the right matrix
    is_triangle: when True, every entry of A is mirrored across the diagonal
        before multiplying, so A may hold just one triangle of a symmetric
        matrix. Cells that end up with several contributions (the diagonal,
        or pairs already present in both orientations) are averaged.

    Returns an RDD of MatrixEntry for the product (not wrapped in a
    CoordinateMatrix). Only (i, j) cells that received at least one joined
    pair of entries are present in the result.
    """
    if is_triangle:
        left_rdd = (A.entries
                    # Emit each entry in both orientations so both triangles exist.
                    .flatMap(lambda x: [((x.j, x.i), x.value),((x.i, x.j), x.value)])
                    # Accumulate (sum, count) per cell.
                    .aggregateByKey(
                        zeroValue=(0.0,0.0),
                        seqFunc=lambda acc, value: (acc[0] + value, acc[1] + 1),
                        # Merge two partial (sum, count) pairs. This must use its own
                        # arguments; the previous version referenced undefined names
                        # and raised NameError whenever a key spanned partitions.
                        combFunc=lambda a, b: (a[0] + b[0], a[1] + b[1]))
                    # Average, so a diagonal entry emitted twice keeps its value.
                    .mapValues(lambda x: x[0]/x[1])
                    # Key by the first index of the pair; both orientations of every
                    # cell are present, so this is equivalent to keying by column.
                    .map(lambda x: (x[0][0], (x[0][1], x[1])))
                   )
    else:
        # Key A by its column index: (j, (i, value)).
        left_rdd = A.entries.map(lambda x: (x.j, (x.i, x.value)))
    # Key B by its row index: (i, (j, value)).
    right_rdd = B.entries.map(lambda x: (x.i, (x.j, x.value)))
    combined_rdd = (left_rdd
                    # Pair every A entry with every B entry sharing the inner index.
                    .join(right_rdd)
                    .map(lambda x: x[1])
                    # ((row of A, column of B), partial product).
                    .map(lambda x: ((x[0][0], x[1][0]), x[0][1]*x[1][1]))
                    # Sum the partial products of each output cell.
                    .reduceByKey(lambda x,y: x+y)
                    .map(lambda x: MatrixEntry(i=x[0][0], j=x[0][1], value=x[1]))
                   )
    return combined_rdd

In [108]:
# Multiply the fully stored symmetric matrix by Y with the join-based routine.
# The collected entries come back in no particular order.
real_product = naive_multiplication(A=real_X_mat, B=Y_mat)
real_product.collect()


Out[108]:
[MatrixEntry(2, 0, 7.0),
 MatrixEntry(3, 0, 9.0),
 MatrixEntry(0, 1, 6.0),
 MatrixEntry(0, 0, 5.0),
 MatrixEntry(3, 1, 10.0),
 MatrixEntry(1, 0, 7.0),
 MatrixEntry(1, 1, 6.0),
 MatrixEntry(2, 1, 8.0)]

In [109]:
# NumPy reference result for the product computed with Spark above.
print(np.dot(X_r, Y))


[[  5.   6.]
 [  7.   6.]
 [  7.   8.]
 [  9.  10.]]

In [110]:
# Pass only the stored lower triangle (X_mat) and let is_triangle=True mirror it
# across the diagonal; the entries should match the product of the full
# symmetric matrix X_r with Y.
triangle_product = naive_multiplication(X_mat,Y_mat,is_triangle=True)
triangle_product.collect()


Out[110]:
[MatrixEntry(2, 0, 7.0),
 MatrixEntry(3, 0, 9.0),
 MatrixEntry(0, 1, 6.0),
 MatrixEntry(0, 0, 5.0),
 MatrixEntry(3, 1, 10.0),
 MatrixEntry(1, 0, 7.0),
 MatrixEntry(1, 1, 6.0),
 MatrixEntry(2, 1, 8.0)]

In [102]:
# List every stored entry of the full symmetric matrix for visual inspection.
(real_X_mat
 .entries
 .collect())


Out[102]:
[MatrixEntry(0, 0, 1.0),
 MatrixEntry(0, 1, 2.0),
 MatrixEntry(0, 2, 3.0),
 MatrixEntry(0, 3, 5.0),
 MatrixEntry(1, 0, 2.0),
 MatrixEntry(1, 1, 1.0),
 MatrixEntry(1, 2, 4.0),
 MatrixEntry(1, 3, 6.0),
 MatrixEntry(2, 0, 3.0),
 MatrixEntry(2, 1, 4.0),
 MatrixEntry(2, 2, 1.0),
 MatrixEntry(2, 3, 7.0),
 MatrixEntry(3, 0, 5.0),
 MatrixEntry(3, 1, 6.0),
 MatrixEntry(3, 2, 7.0),
 MatrixEntry(3, 3, 1.0)]

In [ ]: