In [ ]:
import pyspark
from operator import add
import numpy as np
from pyspark.sql import SparkSession
Initialize a Spark instance-
In [ ]:
sc = pyspark.SparkContext(appName="spark-notebook")
ss = SparkSession(sc)
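In newer PySpark versions the SparkSession builder is the usual entry point; an optional, equivalent sketch (commented out here because the cell above already created sc and ss)-
In [ ]:
# alternative initialization via the builder API (creates or reuses a session)
# ss = SparkSession.builder.appName("spark-notebook").getOrCreate()
# sc = ss.sparkContext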
In [ ]:
myRDD = sc.textFile("file:///path/to/part3/numbers.txt", 10)
Get number of RDD partitions-
In [ ]:
myRDD.getNumPartitions()
In [ ]:
print(myRDD.take(20))  # get the first 20 values
We will define a square function for our map operation-
In [ ]:
def square(value):
    return int(value)**2
In [ ]:
newRDD = myRDD.map(square)
In [ ]:
print(newRDD.collect())  # get map results
In [ ]:
subRDD = newRDD.map(lambda x: (x, 1) if x % 2 == 0 else (x, 0))  # using a lambda function
The map above generates two types of key-value pairs: (x, 1) when x is even and (x, 0) when x is odd.
In [ ]:
print(subRDD.collect())
Now define a lambda function to generate (key, value) pairs where key = "even" or "odd" depending on the input and value = the input itself; assign the result to testRDD. One possible sketch follows the empty cell.
In [ ]:
# your code here
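A possible solution sketch (the string keys "even"/"odd" and applying the map to newRDD are assumptions; any choice that produces (key, value) pairs works the same way with reduceByKey)-
In [ ]:
# key = "even" or "odd" depending on the input, value = the input itself
testRDD = newRDD.map(lambda x: ("even", x) if x % 2 == 0 else ("odd", x))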
In [ ]:
print(testRDD.take(10))
There are two types of reduce operations: reduceByKey() and reduce(). Check the PySpark documentation for more details.
In [ ]:
reduced = testRDD.reduceByKey(add)
In [ ]:
print(reduced.collect())
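For comparison, reduce() aggregates every element of an RDD with a single commutative and associative function, without any grouping by key; a minimal sketch on newRDD-
In [ ]:
# sum all squared values in the RDD (no keys involved)
print(newRDD.reduce(add))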
In [ ]:
sc.stop()
In [ ]:
sc = pyspark.SparkContext(appName="spark-notebook")
In [ ]:
mat = np.array([])
with open("./numbers.txt", "r") as file:  # read numbers.txt into a flat numpy array
    for line in file:
        mat = np.hstack((mat, np.array(int(line))))
mymat = mat[:6]  # keep the first six values
In [ ]:
print(mymat)
In MLlib you can use dense or sparse vectors and matrices for computation. Create a sparse vector for MLlib-
In [ ]:
from pyspark.mllib.linalg import Vectors
sv = Vectors.sparse(6, [0, 1, 3, 4], [1, 2, 4, 5])  # size 6, non-zero values at indices 0, 1, 3 and 4
In [ ]:
print(type(sv))
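For comparison, a dense vector stores every entry explicitly; a minimal sketch with the same values as sv (zeros included)-
In [ ]:
# dense counterpart of sv: positions 2 and 5 hold explicit zeros
dv = Vectors.dense([1, 2, 0, 4, 5, 0])
print(dv)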
In [ ]:
from pyspark.mllib.regression import LabeledPoint
pos = LabeledPoint(1.0, mymat)  # label(Y) = 1 and data(X) = mymat
print(pos)
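A LabeledPoint can also wrap a sparse vector; for example, a hypothetical negative example (label 0) reusing sv from above-
In [ ]:
neg = LabeledPoint(0.0, sv)  # label(Y) = 0 and data(X) = the sparse vector sv
print(neg)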
In [ ]:
from pyspark.mllib.linalg import Matrix, Matrices
dm = Matrices.dense(2, 2, mat[7:11])  # 2x2 dense local matrix (values filled in column-major order)
print(dm)
A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive.
In [ ]:
newmat = np.reshape(mat[:6], (2,3)) # 2x3 matrix
print(newmat)
In [ ]:
from pyspark.mllib.linalg.distributed import RowMatrix
# Create an RDD from the rows of newmat
rows = sc.parallelize(newmat)
rowmat = RowMatrix(rows)
print(rowmat.numRows(), rowmat.numCols())
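A RowMatrix also supports distributed column-wise computations; as a small illustrative example (this API is available in the Python RowMatrix from Spark 2.0 onward), column summary statistics-
In [ ]:
stats = rowmat.computeColumnSummaryStatistics()  # distributed column-wise summary
print(stats.mean())  # per-column means of newmat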
In [ ]:
from pyspark.mllib.linalg.distributed import BlockMatrix
# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(2, 2, mat[7:11])),
                         ((1, 0), Matrices.dense(2, 2, mat[1:5]))])
# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 2, 2)  # 4x2 matrix with block A stacked on top of block B
print(mat)
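As noted above, converting a distributed matrix between formats can be expensive; BlockMatrix provides conversion helpers such as toLocalMatrix() and toIndexedRowMatrix(). A minimal sketch-
In [ ]:
print(mat.toLocalMatrix())          # collect all blocks into a local dense matrix (only safe for small matrices)
indexed = mat.toIndexedRowMatrix()  # convert to an IndexedRowMatrix (may shuffle data across the cluster)
print(indexed.numRows(), indexed.numCols())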
In [ ]:
sc.stop()