SVD Python

In this NoteBook the reader finds code to read a GeoTiff file, single- or multi-band, from HDFS. It reads the GeoTiff as a ByteArray and then stores the GeoTiff in memory using MemFile from RasterIO python package. Then scipy is used to determine the SVD of a matrix multiplication between two phenology products.

With this example the user can load GeoTiffs from HDFS and then explore all the features of Python packages such as rasterio.


In [1]:
#Add all dependencies to PYTHON_PATH
import sys

#Define environment variables
import os
os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "ipython"

import subprocess

#Load PySpark to connect to a Spark cluster
from pyspark import SparkConf, SparkContext
from hdfs import InsecureClient
from tempfile import TemporaryFile

#from osgeo import gdal
#To read GeoTiffs as a ByteArray
from io import BytesIO
from import MemoryFile

import numpy as np
import pandas
import datetime
import matplotlib.pyplot as plt
import rasterio
from rasterio import plot
from os import listdir
from os.path import isfile, join
from numpy import exp, log
from numpy.random import standard_normal
from scipy.linalg import norm, qr, svd
from lowrankproduct import lowrankproduct
from sklearn.utils.extmath import randomized_svd

Connect to Spark

In [11]:
appName = "plot_GeoTiff"

#A context needs to be created if it does not already exist
except NameError:
    print("A new Spark Context will be created.")

sc = SparkContext(conf = SparkConf().setAppName(appName).setMaster(masterURL))
conf = sc.getConf()

A new Spark Context will be created.
In [2]:
debugMode = True

Support functions

In [3]:
def dprint(msg):
    if (debugMode):

In [4]:
def get_hdfs_client():
    return InsecureClient("", user="pheno",

Read GeoTiffs

In [5]:
def getDataSet(directoryPath):
    dprint("Running getDataSet(directoryPath)")
    dprint("Start time: " + str(
    dprint("directoryPath: " + directoryPath)
    files = sc.binaryFiles(directoryPath)
    fileList = files.keys().collect()
    dprint("Found files: " + str(fileList))
    dataArray = []
    for f in fileList:
        data = files.lookup(f)
        dataByteArray = bytearray(data[0])
        memfile = MemoryFile(dataByteArray)
        dataset =
        #relevantBand = np.uint8([0])
        relevantBand = np.array([0])
        dprint("relevantBand.shape: " + str(relevantBand.shape))
        flattenedDataSet = relevantBand.flatten()
        dprint("flattenedDataSet.shape: " + str(flattenedDataSet.shape))
    #Pandas appends a vectors as a column to a DataFrame
    dataSet = pandas.DataFrame(dataArray).T
    maxDimension = max(dataSet.shape)
    minDimension = min(dataSet.shape)
    dataSetWithIndex = dataSet.reset_index()
    dataSetWithoutNan = dataSetWithIndex.dropna(axis = 0, thresh = minDimension)
    dataSetIndex = dataSetWithoutNan.index
    dataSetWithoutIndex = np.array(dataSetWithoutNan.drop("index", axis = 1))
    dprint("End time: " + str(
    dprint("Ending getDataSet(directoryPath)")
    return dataSetWithoutIndex, dataSetIndex, maxDimension

In [6]:
def normDifferenceUpToSign(vector1, vector2): # Necesarry because algorithm sometimes gives back the negative of the expected result
    normDifference = norm(vector1 - vector2)
    if normDifference > 1:
            normDifference = norm(vector1 + vector2)
    return normDifference

In [7]:
def writeMode(resultDir, fileName, i, U, s, V): 
    inFile = "/tmp/" + fileName
    outFile = resultDir + fileName
    decompositionFile = open(inFile, "w")
    U.T[i].tofile(decompositionFile, sep = ",")
    decompositionFile = open(inFile, "a")
    s[i].tofile(decompositionFile, sep = ",")
    V.T[i].tofile(decompositionFile, sep = ",")
    #Upload to HDFS['hadoop', 'dfs', '-copyFromLocal', '-f', inFile, outFile])  

    #Remove from /tmp/['rm', '-fr', inFile])

In [8]:
def writeCSV(resultDir, fileName, res):
    inFile = "/tmp/" + fileName
    outFile = resultDir + fileName
    decompositionFile = open(inFile, "w")
    res.T.tofile(decompositionFile, sep = ",")
    #Upload to HDFS['hadoop', 'dfs', '-copyFromLocal', '-f', inFile, outFile])  

    #Remove from /tmp/['rm', '-fr', inFile])

In [9]:
def runTest(dataDirectory1, dataDirectory2, resultDir):
    dprint("Running test")
    dprint("Start time: " + str(

    dataSet1, dataSetIndex1, maxDimension1 = getDataSet(dataDirectory1)
    dataSet2, dataSetIndex2, maxDimension2 = getDataSet(dataDirectory2)
    dprint("dataSet1.shape: " + str(dataSet1.shape))
    dprint("dataSet2.shape: " + str(dataSet2.shape))
    maxDimension = max(max(dataSet1.shape), max(dataSet2.shape))
    minDimension = min(min(dataSet1.shape), min(dataSet2.shape))
    doFullSVD = maxDimension <= 33000
    lowRankQ1, lowRankQ2, lowRankProduct = lowrankproduct(dataSet1, dataSet2, p = 0, i = 2, ifgram = False, iffast = True)
    lowRankU, lowRankS, lowRankVt = svd(lowRankProduct, full_matrices = False)
    lowRankV = lowRankVt.T
    lowRankU2 = lowRankQ1 @ lowRankU
    lowRankV2 = lowRankQ2 @ lowRankV
    new_index1 = pandas.Index(range(maxDimension1), name = "index")
    new_index2 = pandas.Index(range(maxDimension2), name = "index")
    lowRankU3 = np.array(pandas.DataFrame(lowRankU2).reindex(dataSetIndex1).reindex(new_index1))
    lowRankV3 = np.array(pandas.DataFrame(lowRankV2).reindex(dataSetIndex2).reindex(new_index2))
    dprint("lowRankU.shape: " + str(lowRankU.shape))
    dprint("lowRankS.shape: " + str(lowRankS.shape))
    dprint("lowRankV.shape: " + str(lowRankV.shape))
    dprint("lowRankU2.shape: " + str(lowRankU2.shape))
    dprint("lowRankV2.shape: " + str(lowRankV2.shape))
    dprint("lowRankU3.shape: " + str(lowRankU3.shape))
    dprint("lowRankV3.shape: " + str(lowRankV3.shape))
    dprint("Singular values of low-rank product: ")
    dprint("lowRankU2.T[0][:minDimension]: ")
    dprint("lowRankV2.T[0][:minDimension]: ")
    if doFullSVD:
        fullProduct = dataSet1 @ dataSet2.T
        dprint("fullProduct shape " + str(fullProduct.shape))
        dprint("ncomponents " + str(minDimension))
        fullU, fullS, fullVt = randomized_svd(fullProduct, n_components = minDimension)
        fullV = fullVt.T
        dprint("fullU.shape: " + str(fullU.shape))
        dprint("fullS.shape: " + str(fullS.shape))
        dprint("fullV.shape: " + str(fullV.shape))
        dprint("Singular values of full product: ")
        dprint("fullU.T[0][:minDimension]: ")
        dprint("fullV.T[0][:minDimension]: ")
    for i in range(len(lowRankS)):
        iString = str(i + 1).zfill(2)
        if doFullSVD:
            u = fullU.T[i]
            v = fullV.T[i]
            u = dataSet1 @ (dataSet2.T @ lowRankV2.T[i]) / lowRankS[i]
            v = dataSet2 @ (dataSet1.T @ lowRankU2.T[i]) / lowRankS[i]
        dprint("Norm difference u" + iString + ": " + str(normDifferenceUpToSign(lowRankU2.T[i], u)))
        dprint("Norm difference v" + iString + ": " + str(normDifferenceUpToSign(lowRankV2.T[i], v)))
        writeMode(resultDir, "ModeWithoutNan" + iString + ".txt", i, lowRankU2, lowRankS, lowRankV2)
        writeMode(resultDir, "ModeWithNan" + iString + ".txt", i, lowRankU3, lowRankS, lowRankV3)
    #lowRankU2.T.tofile(resultDirectory + "/U.csv", sep = ",")
    writeCSV(resultDir, "U.csv", lowRankU2)
    #lowRankS.tofile(resultDirectory + "/s.csv", sep = ",")
    writeCSV(resultDir, "s.csv", lowRankS)
    #lowRankV2.T.tofile(resultDirectory + "/V.csv", sep = ",")
    writeCSV(resultDir, "V.csv", lowRankV2)

    dprint("Ending test")
    dprint("End time: " + str(


Test 1

In [10]:
print("Running test 1")
print("Start time: " + str(

dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomGridmet/"
dataDirectory2 = "hdfs:///user/hadoop/spring-index/LeafGridmet/"
resultDirectory = "hdfs:///user/pheno/svd/BloomGridmetLeafGridmet/"

#Create Result dir['hadoop', 'dfs', '-mkdir', resultDirectory])

runTest(dataDirectory1, dataDirectory2, resultDirectory)

print("Ending test 1")
print("End time: " + str(

Running test 1
Start time: 2017-11-03 12:01:43.049522
Running test
Start time: 2017-11-03 12:01:46.405227
Running getDataSet(directoryPath)
Start time: 2017-11-03 12:01:46.405997
directoryPath: hdfs:///user/hadoop/spring-index/BloomGridmet/
Test 2

In [63]:
print("Running test 2")
print("Start time: " + str(

dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLow/"
dataDirectory2 = "hdfs:///user/hadoop/spring-index/LeafFinalLow/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLowLeafFinalLow/"

#Create Result dir['hadoop', 'dfs', '-mkdir', resultDirectory])

runTest(dataDirectory1, dataDirectory2, resultDirectory)

print("Ending test 2")
print("End time: " + str(

Running test 2
Start time: 2017-10-27 13:16:20.092836
Running lowrankproduct(dataSet1, dataSet2, p, i, ifgram, iffast)
Start time: 2017-10-27 13:16:55.997545
dataSet1.shape: (31089, 26)
dataSet2.shape: (31089, 26)
p: 0
i: 2
ifgram: False
iffast: True
k: 26
l: 26
lowRankQ1.shape: (31089, 26)
lowRankQ2.shape: (31089, 26)
B1.shape: (26, 26)
B2.shape: (26, 26)
lowRankProduct.shape: (26, 26)
Ending lowrankproductsvd(dataSet1, dataSet2, p, i, ifgram, iffast)
End time: 2017-10-27 13:17:06.911020
Ending test 2
End time: 2017-10-27 13:19:24.926910

Test 3

In [61]:
print("Running test 3")
print("Start time: " + str(

dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLow/"
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLow/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLowSOSTLow/"

#Create Result dir['hadoop', 'dfs', '-mkdir', resultDirectory])

runTest(dataDirectory1, dataDirectory2, resultDirectory)

print("Ending test 3")
print("End time: " + str(

Running test 3
Start time: 2017-10-27 13:06:04.778482
Running lowrankproduct(dataSet1, dataSet2, p, i, ifgram, iffast)
Start time: 2017-10-27 13:06:41.716855
dataSet1.shape: (31089, 26)
dataSet2.shape: (31524, 26)
p: 0
i: 2
ifgram: False
iffast: True
k: 26
l: 26
lowRankQ1.shape: (31089, 26)
lowRankQ2.shape: (31524, 26)
B1.shape: (26, 26)
B2.shape: (26, 26)
lowRankProduct.shape: (26, 26)
Ending lowrankproductsvd(dataSet1, dataSet2, p, i, ifgram, iffast)
End time: 2017-10-27 13:06:51.317871
Ending test 3
End time: 2017-10-27 13:09:08.199955

Test 4

In [62]:
print("Running test 4")
print("Start time: " + str(

dataDirectory1 = "hdfs:///user/hadoop/spring-index/LeafFinalLow/"
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLow/"
resultDirectory = "hdfs:///user/pheno/svd/LeafFinalLowSOSTLow/"

#Create Result dir['hadoop', 'dfs', '-mkdir', resultDirectory])

runTest(dataDirectory1, dataDirectory2, resultDirectory)

print("Ending test 4")
print("End time: " + str(

Running test 4
Start time: 2017-10-27 13:11:26.834135
Running lowrankproduct(dataSet1, dataSet2, p, i, ifgram, iffast)
Start time: 2017-10-27 13:12:03.052125
dataSet1.shape: (31089, 26)
dataSet2.shape: (31524, 26)
p: 0
i: 2
ifgram: False
iffast: True
k: 26
l: 26
lowRankQ1.shape: (31089, 26)
lowRankQ2.shape: (31524, 26)
B1.shape: (26, 26)
B2.shape: (26, 26)
lowRankProduct.shape: (26, 26)
Ending lowrankproductsvd(dataSet1, dataSet2, p, i, ifgram, iffast)
End time: 2017-10-27 13:12:14.168737
Ending test 4
End time: 2017-10-27 13:14:30.913878

Test 5

In [67]:
print("Running test 5")
print("Start time: " + str(

dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLowPR/"
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLowPR/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLowPRSOSTLowPR/"

#Create Result dir['hadoop', 'dfs', '-mkdir', resultDirectory])

runTest(dataDirectory1, dataDirectory2, resultDirectory)

print("Ending test 5")
print("End time: " + str(

Running test 5
Start time: 2017-10-27 16:46:10.109739
Test 6

In [13]:
print("Running test 6")
print("Start time: " + str(

dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinal/"
dataDirectory2 = "hdfs:///user/hadoop/spring-index/LeafFinal/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLeafFinal/"

#Create Result dir['hadoop', 'dfs', '-mkdir', resultDirectory])

runTest(dataDirectory1, dataDirectory2, resultDirectory)

print("Ending test 6")
print("End time: " + str(

Running test 6
Start time: 2017-10-30 12:48:04.816080
Running test
Start time: 2017-10-30 12:48:06.428073
Running getDataSet(directoryPath)
Start time: 2017-10-30 12:48:06.428889
directoryPath: hdfs:///user/hadoop/spring-index/BloomFinal/
Found files: ['hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://']
End time: 2017-10-30 13:18:27.284805
Ending getDataSet(directoryPath)
Running getDataSet(directoryPath)
Start time: 2017-10-30 13:18:27.488375
directoryPath: hdfs:///user/hadoop/spring-index/LeafFinal/
Found files: ['hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://', 'hdfs://']
End time: 2017-10-30 13:50:15.884963
Ending getDataSet(directoryPath)
dataSet1.shape: (11819120, 36)
dataSet2.shape: (11819636, 36)
Running lowrankproduct(dataSet1, dataSet2, p, i, ifgram, iffast)
Start time: 2017-10-30 13:50:15.975175
dataSet1.shape: (11819120, 36)
dataSet2.shape: (11819636, 36)
p: 0
i: 2
ifgram: False
iffast: True
k: 36
l: 36
SVD test ground

In [5]:
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLow/"
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLow/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLowSOSTLow/"

dataSet1, dataSetIndex1, maxDimension1 = getDataSet(dataDirectory1)
dataSet2, dataSetIndex2, maxDimension2 = getDataSet(dataDirectory2)

print("dataSet1.shape: " + str(dataSet1.shape))
print("dataSet2.shape: " + str(dataSet2.shape))

(31524, 26)
(31524, 26)
dataSet1.shape: (31089, 26)
dataSet2.shape: (31524, 26)

In [6]:
fullProduct = dataSet1 @ dataSet2.T

In [7]:

(31089, 31524)

In [8]:
minDimension = min(min(dataSet1.shape), min(dataSet2.shape))
randU, randS, randVt = randomized_svd(fullProduct, n_components=minDimension)

In [ ]:
#normU, normS, normVt = svd(fullProduct, full_matrices = True)
normU, normS, normVt = svd(fullProduct, full_matrices = False)

In [ ]: