Compare SVD with R-SVD

This example shows how to load data such as GeoTiffs from HDFS into Spark and then explore it with Python packages such as rasterio.

Dependencies


In [1]:
#Add all dependencies to PYTHON_PATH
import sys
sys.path.append("/usr/lib/spark/python")
sys.path.append("/usr/lib/spark/python/lib/py4j-0.10.4-src.zip")
sys.path.append("/usr/lib/python3/dist-packages")
sys.path.append("/data/local/jupyterhub/modules/python")

#Define environment variables
import os
os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "ipython"

#Load PySpark to connect to a Spark cluster
from pyspark import SparkConf, SparkContext

from osgeo import gdal
import numpy as np
import matplotlib.pyplot as plt
from pylab import *
#To read GeoTiffs as a ByteArray
from io import BytesIO
from rasterio.io import MemoryFile

from numpy import exp, log
from numpy.random import standard_normal
from scipy.linalg import norm, qr, svd
from productsvd import qrproductsvd
from sklearn.utils.extmath import randomized_svd

#Needed by the support functions below for timestamped logging
import datetime

Connect to Spark


In [2]:
appName = "compare_SVD_RSVD"
#masterURL="spark://pheno0.phenovari-utwente.surf-hosted.nl:7077"
masterURL="spark://emma0.emma.nlesc.nl:7077"

#Stop any existing SparkContext before creating a new one
try:
    sc.stop()
except NameError:
    print("A new Spark Context will be created.")

sc = SparkContext(conf=SparkConf().setAppName(appName).setMaster(masterURL))


A new Spark Context will be created.
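
As mentioned in the introduction, GeoTiffs stored on HDFS can be opened with rasterio once they are read as byte arrays. The cell below is a hypothetical sketch (the HDFS path is an assumption) and is not used by the comparison that follows:

In [ ]:
#Hypothetical sketch: read a GeoTiff from HDFS as bytes and open it with rasterio
geotiffs = sc.binaryFiles("hdfs:///user/emma/geotiffs/")
path, data = geotiffs.first()
with MemoryFile(data) as memfile:
    with memfile.open() as dataset:
        print(path, dataset.shape)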

Configuration

This flag determines whether the support functions print log messages during execution.


In [3]:
debugMode = True

Support functions


In [4]:
def diff(first, second):
    #Return the elements of first that are not in second
    second = set(second)
    return [item for item in first if item not in second]

In [5]:
def dprint(msg):
    #Print a timestamped message when debugMode is enabled
    if debugMode:
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " | " + msg)

In [6]:
def progressBar(message, value, endvalue, bar_length = 20):
    #Render a timestamped, in-place progress bar when debugMode is enabled
    if debugMode:
        percent = float(value) / endvalue
        arrow = '-' * (int(round(percent * bar_length)) - 1) + '>'
        spaces = ' ' * (bar_length - len(arrow))
        sys.stdout.write("\r"
                         + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                         + " | "
                         + message
                         + ": [{0}] {1}%".format(arrow + spaces, int(round(percent * 100)))
                        )
        if value == endvalue:
            sys.stdout.write("\n")
        sys.stdout.flush()
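
A quick, hypothetical usage sketch of the two logging helpers above (not part of the original analysis):

In [ ]:
#Exercise the support functions on a dummy task
dprint("Starting a dummy task")
for i in range(1, 11):
    progressBar("Dummy task", i, 10)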

Read data

SVD


In [7]:
resultDirectory = "hdfs:///user/emma/svd/spark/BloomGridmetLeafGridmet3/"
#S.csv holds the singular values, stored as a diagonal matrix
EigenVal_path = "S.csv"
svd_eigenvalues = sc.textFile(resultDirectory + EigenVal_path)
#U.csv and V.csv hold the left and right singular vectors
EigenVector_path = "U.csv"
svd_U_vector = sc.textFile(resultDirectory + EigenVector_path)
EigenVector_path = "V.csv"
svd_V_vector = sc.textFile(resultDirectory + EigenVector_path)

In [8]:
print(svd_eigenvalues.map(lambda line: line.split(',')).collect())


[['1.2051734846237376E11', '0.0', '0.0'], ['0.0', '1.573150206334602E8', '0.0'], ['0.0', '0.0', '9.354714854448391E7']]
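
S.csv stores the singular values on the diagonal of a square matrix, so it is convenient to pull them into a flat NumPy vector before comparing them with the R-SVD values. A minimal sketch (svd_S_values is an illustrative name, not from the original notebook):

In [ ]:
#Keep only the diagonal entries of S.csv (the singular values)
svd_S = np.asarray(svd_eigenvalues.map(lambda line: line.split(',')).collect()).astype(float)
svd_S_values = np.diag(svd_S)
print(svd_S_values)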

R-SVD


In [9]:
resultDirectory = "hdfs:///user/emma/svd/BloomGridmetLeafGridmet/"
#s.csv holds the R-SVD singular values as a single row
EigenVal_path = "s.csv"
rsvd_eigenvalues = sc.textFile(resultDirectory + EigenVal_path)
EigenVector_path = "U.csv"
rsvd_U_vector = sc.textFile(resultDirectory + EigenVector_path)
EigenVector_path = "V.csv"
rsvd_V_vector = sc.textFile(resultDirectory + EigenVector_path)

In [ ]:
print(rsvd_eigenvalues.map(lambda line: line.split(',')).collect())


[['178024103475.1254', '119077846.9692295', '93428727.56734921', '72721591.82347083', '65841756.54620246', '59404710.40743541', '51631282.069282435', '49513903.9558733', '47108951.885175385', '38968643.61893467', '37964497.6647184', '30864611.430567198', '29702882.837573484', '28846459.46894772', '26868885.05725242', '25634563.017505467', '21866384.033159643', '21278468.657219097', '19666948.232925273', '19320181.569853082', '17596932.24841928', '17076063.727075674', '16459195.201867793', '15707948.425088547', '15403646.200111331', '14583482.312037304', '13950787.830484996', '13511287.22838203', '13102844.783520581', '12662716.742046334', '12304538.159589903', '11636398.91255648', '10780714.878158744', '10023492.753480507', '9844759.910824766', '9408355.309497282', '9022289.716617534']]
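
The three leading singular values of the two decompositions can now be compared directly. The sketch below assumes the svd_S_values vector from the cell above; rsvd_S_values is likewise an illustrative name:

In [ ]:
#Relative difference of the three leading singular values
rsvd_S_values = np.asarray(rsvd_eigenvalues.map(lambda line: line.split(',')).collect()).astype(float).flatten()
for pc in range(0, 3):
    rel = abs(svd_S_values[pc] - rsvd_S_values[pc]) / svd_S_values[pc]
    print("Component %d: relative difference %.3e" % (pc, rel))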

Validate Norms

U


In [ ]:
svd_U = svd_U_vector.map(lambda line: line.split(',')).collect()
svd_U_np = np.asarray(svd_U).astype(float)

rsvd_U = rsvd_U_vector.map(lambda line: line.split(',')).collect()
rsvd_U_np = np.asarray(rsvd_U).astype(float)
#The R-SVD output is stored row-wise (37 components x 483850 grid cells),
#so reshape and transpose it to match the column layout of svd_U_np
rsvd_U_np_T = np.reshape(rsvd_U_np, (37, 483850)).T

In [ ]:
#Compare the three leading left singular vectors
for pc in range(0, 3):
    print(norm(svd_U_np[:, pc] - rsvd_U_np_T[:, pc]))

V


In [ ]:
svd_V = svd_V_vector.map(lambda line: line.split(',')).collect()
svd_V_np = np.asarray(svd_V).astype(float)

rsvd_V = rsvd_V_vector.map(lambda line: line.split(',')).collect()
rsvd_V_np = np.asarray(rsvd_V).astype(float)
rsvd_V_np_T = np.reshape(rsvd_V_np, (37, 483850)).T

In [ ]:
#Compare the three leading right singular vectors
for pc in range(0, 3):
    print(norm(svd_V_np[:, pc] - rsvd_V_np_T[:, pc]))
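
Singular vectors are only determined up to sign, so a large norm above does not by itself indicate disagreement between the two methods. A sign-insensitive check, sketched here with the same variable names, takes the smaller of the two possible differences for each component:

In [ ]:
#Singular vectors are defined only up to sign: compare both u - v and u + v
for pc in range(0, 3):
    dU = min(norm(svd_U_np[:, pc] - rsvd_U_np_T[:, pc]),
             norm(svd_U_np[:, pc] + rsvd_U_np_T[:, pc]))
    dV = min(norm(svd_V_np[:, pc] - rsvd_V_np_T[:, pc]),
             norm(svd_V_np[:, pc] + rsvd_V_np_T[:, pc]))
    print("Component %d: U diff %e | V diff %e" % (pc, dU, dV))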
