In this notebook the reader finds code to read a GeoTiff file, single- or multi-band, from HDFS. Each GeoTiff is read as a byte array and kept in memory using MemoryFile from the rasterio Python package. SciPy is then used to compute the SVD of the matrix product of two phenology products.
With this example the user can load GeoTiffs from HDFS and then explore the full feature set of Python packages such as rasterio.
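For reference, a minimal sketch of that pattern (assuming an already running SparkContext `sc`; the HDFS path is taken from the tests below and stands in for any directory of GeoTiffs) reads a single file into memory and feeds the resulting array to SciPy, here just one band as a stand-in for the product of two data sets computed later in runTest:

    #Minimal sketch only; sc must already exist and the path is a placeholder example
    from rasterio.io import MemoryFile
    from scipy.linalg import svd

    path = "hdfs:///user/hadoop/spring-index/BloomGridmet/"
    name, content = sc.binaryFiles(path).first()    #(file name, raw bytes) of the first GeoTiff found
    with MemoryFile(bytes(content)) as memfile:
        with memfile.open() as dataset:
            band = dataset.read(1)                  #first band as a 2-D numpy array
    U, s, Vt = svd(band, full_matrices = False)     #hand the in-memory array to SciPy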
In [1]:
#Add all dependencies to PYTHON_PATH
import sys
sys.path.append("/usr/lib/spark/python")
sys.path.append("/usr/lib/spark/python/lib/py4j-0.10.4-src.zip")
sys.path.append("/usr/lib/python3/dist-packages")
sys.path.append("/data/local/jupyterhub/modules/python")
#Define environment variables
import os
os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "ipython"
import subprocess
#Load PySpark to connect to a Spark cluster
from pyspark import SparkConf, SparkContext
from hdfs import InsecureClient
from tempfile import TemporaryFile
#from osgeo import gdal
#To read GeoTiffs as a ByteArray
from io import BytesIO
from rasterio.io import MemoryFile
import numpy as np
import pandas
import datetime
import matplotlib.pyplot as plt
import rasterio
from rasterio import plot
from os import listdir
from os.path import isfile, join
from numpy import exp, log
from numpy.random import standard_normal
from scipy.linalg import norm, qr, svd
from lowrankproduct import lowrankproduct
from sklearn.utils.extmath import randomized_svd
In [11]:
appName = "plot_GeoTiff"
masterURL="spark://pheno0.phenovari-utwente.surf-hosted.nl:7077"
#Stop any existing Spark context before creating a new one
try:
    sc.stop()
except NameError:
    print("A new Spark Context will be created.")
sc = SparkContext(conf = SparkConf().setAppName(appName).setMaster(masterURL))
conf = sc.getConf()
In [2]:
debugMode = True
In [3]:
def dprint(msg):
    if (debugMode):
        print(msg)
In [4]:
def get_hdfs_client():
    return InsecureClient("http://pheno0.phenovari-utwente.surf-hosted.nl:50070", user="pheno",
                          root="/")
In [5]:
def getDataSet(directoryPath):
    dprint("-------------------------------")
    dprint("Running getDataSet(directoryPath)")
    dprint("Start time: " + str(datetime.datetime.now()))
    dprint("-------------------------------")
    dprint("directoryPath: " + directoryPath)
    dprint("-------------------------------")
    files = sc.binaryFiles(directoryPath)
    fileList = files.keys().collect()
    dprint("Found files: " + str(fileList))
    dataArray = []
    for f in fileList:
        data = files.lookup(f)
        dataByteArray = bytearray(data[0])
        memfile = MemoryFile(dataByteArray)
        dataset = memfile.open()
        #relevantBand = np.uint8(dataset.read()[0])
        relevantBand = np.array(dataset.read()[0])
        memfile.close()
        dprint("relevantBand.shape: " + str(relevantBand.shape))
        flattenedDataSet = relevantBand.flatten()
        dprint("flattenedDataSet.shape: " + str(flattenedDataSet.shape))
        dataArray.append(flattenedDataSet)
    #Each flattened band is appended as a row; the transpose makes every GeoTiff a column of the DataFrame
    dataSet = pandas.DataFrame(dataArray).T
    maxDimension = max(dataSet.shape)
    minDimension = min(dataSet.shape)
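    #Keep the original pixel positions, then drop rows (pixels) with too many missing values:
    #dropna keeps rows with at least minDimension non-NaN entries, and dataSetIndex records
    #which pixel positions survive so results can later be mapped back onto the full grid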
    dataSetWithIndex = dataSet.reset_index()
    dataSetWithoutNan = dataSetWithIndex.dropna(axis = 0, thresh = minDimension)
    dataSetIndex = dataSetWithoutNan.index
    dataSetWithoutIndex = np.array(dataSetWithoutNan.drop("index", axis = 1))
    dprint("-------------------------------")
    dprint("End time: " + str(datetime.datetime.now()))
    dprint("Ending getDataSet(directoryPath)")
    dprint("-------------------------------")
    return dataSetWithoutIndex, dataSetIndex, maxDimension
In [6]:
def normDifferenceUpToSign(vector1, vector2): # Necessary because the algorithm sometimes returns the negative of the expected vector
    normDifference = norm(vector1 - vector2)
    if normDifference > 1:
        normDifference = norm(vector1 + vector2)
    return normDifference
In [7]:
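#Write the i-th mode (left singular vector, singular value, right singular vector) to a local text file
#and copy it to HDFS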
def writeMode(resultDir, fileName, i, U, s, V):
    inFile = "/tmp/" + fileName
    outFile = resultDir + fileName
    decompositionFile = open(inFile, "w")
    U.T[i].tofile(decompositionFile, sep = ",")
    decompositionFile.close()
    decompositionFile = open(inFile, "a")
    decompositionFile.write("\n")
    s[i].tofile(decompositionFile, sep = ",")
    decompositionFile.write("\n")
    V.T[i].tofile(decompositionFile, sep = ",")
    decompositionFile.close()
    #Upload to HDFS
    subprocess.run(['hadoop', 'dfs', '-copyFromLocal', '-f', inFile, outFile])
    #Remove from /tmp/
    subprocess.run(['rm', '-fr', inFile])
In [8]:
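#Write a matrix of singular vectors (or the vector of singular values) to a local CSV file and copy it to HDFS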
def writeCSV(resultDir, fileName, res):
    inFile = "/tmp/" + fileName
    outFile = resultDir + fileName
    decompositionFile = open(inFile, "w")
    res.T.tofile(decompositionFile, sep = ",")
    decompositionFile.close()
    #Upload to HDFS
    subprocess.run(['hadoop', 'dfs', '-copyFromLocal', '-f', inFile, outFile])
    #Remove from /tmp/
    subprocess.run(['rm', '-fr', inFile])
In [9]:
def runTest(dataDirectory1, dataDirectory2, resultDir):
    dprint("-------------------------------")
    dprint("Running test")
    dprint("Start time: " + str(datetime.datetime.now()))
    dprint("-------------------------------")
    dataSet1, dataSetIndex1, maxDimension1 = getDataSet(dataDirectory1)
    dataSet2, dataSetIndex2, maxDimension2 = getDataSet(dataDirectory2)
    dprint("dataSet1.shape: " + str(dataSet1.shape))
    dprint("dataSet2.shape: " + str(dataSet2.shape))
    maxDimension = max(max(dataSet1.shape), max(dataSet2.shape))
    minDimension = min(min(dataSet1.shape), min(dataSet2.shape))
    doFullSVD = maxDimension <= 33000
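    #lowrankproduct (local module) is assumed to return orthonormal bases lowRankQ1, lowRankQ2 and the small
    #core matrix lowRankQ1.T @ (dataSet1 @ dataSet2.T) @ lowRankQ2, so the SVD of the full product can be
    #recovered below without ever forming the full pixel-by-pixel matrix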
    lowRankQ1, lowRankQ2, lowRankProduct = lowrankproduct(dataSet1, dataSet2, p = 0, i = 2, ifgram = False, iffast = True)
    lowRankU, lowRankS, lowRankVt = svd(lowRankProduct, full_matrices = False)
    lowRankV = lowRankVt.T
    lowRankU2 = lowRankQ1 @ lowRankU
    lowRankV2 = lowRankQ2 @ lowRankV
    new_index1 = pandas.Index(range(maxDimension1), name = "index")
    new_index2 = pandas.Index(range(maxDimension2), name = "index")
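    #Label the rows with the original pixel positions and reindex to the full grid, so dropped
    #(NaN) pixels reappear as NaN rows in the "WithNan" outputs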
    lowRankU3 = np.array(pandas.DataFrame(lowRankU2, index = dataSetIndex1).reindex(new_index1))
    lowRankV3 = np.array(pandas.DataFrame(lowRankV2, index = dataSetIndex2).reindex(new_index2))
dprint("lowRankU.shape: " + str(lowRankU.shape))
dprint("lowRankS.shape: " + str(lowRankS.shape))
dprint("lowRankV.shape: " + str(lowRankV.shape))
dprint("lowRankU2.shape: " + str(lowRankU2.shape))
dprint("lowRankV2.shape: " + str(lowRankV2.shape))
dprint("lowRankU3.shape: " + str(lowRankU3.shape))
dprint("lowRankV3.shape: " + str(lowRankV3.shape))
dprint("Singular values of low-rank product: ")
dprint(lowRankS)
dprint("lowRankU2.T[0][:minDimension]: ")
dprint(lowRankU2.T[0][:minDimension])
dprint("lowRankV2.T[0][:minDimension]: ")
dprint(lowRankV2.T[0][:minDimension])
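    #For products small enough to form explicitly, also compute an SVD of the full matrix
    #as a cross-check against the low-rank reconstruction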
    if doFullSVD:
        fullProduct = dataSet1 @ dataSet2.T
        dprint("fullProduct shape " + str(fullProduct.shape))
        dprint("ncomponents " + str(minDimension))
        fullU, fullS, fullVt = randomized_svd(fullProduct, n_components = minDimension)
        fullV = fullVt.T
        dprint("fullU.shape: " + str(fullU.shape))
        dprint("fullS.shape: " + str(fullS.shape))
        dprint("fullV.shape: " + str(fullV.shape))
        dprint("Singular values of full product: ")
        dprint(fullS)
        dprint("fullU.T[0][:minDimension]: ")
        dprint(fullU.T[0][:minDimension])
        dprint("fullV.T[0][:minDimension]: ")
        dprint(fullV.T[0][:minDimension])
    for i in range(len(lowRankS)):
        iString = str(i + 1).zfill(2)
        if doFullSVD:
            u = fullU.T[i]
            v = fullV.T[i]
        else:
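            #When the full product is too large to form, recover the comparison vectors from the
            #SVD identities u_i = (A @ B.T) @ v_i / s_i and v_i = (B @ A.T) @ u_i / s_i,
            #applied without materialising A @ B.T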
            u = dataSet1 @ (dataSet2.T @ lowRankV2.T[i]) / lowRankS[i]
            v = dataSet2 @ (dataSet1.T @ lowRankU2.T[i]) / lowRankS[i]
        dprint("Norm difference u" + iString + ": " + str(normDifferenceUpToSign(lowRankU2.T[i], u)))
        dprint("Norm difference v" + iString + ": " + str(normDifferenceUpToSign(lowRankV2.T[i], v)))
        writeMode(resultDir, "ModeWithoutNan" + iString + ".txt", i, lowRankU2, lowRankS, lowRankV2)
        writeMode(resultDir, "ModeWithNan" + iString + ".txt", i, lowRankU3, lowRankS, lowRankV3)
    #lowRankU2.T.tofile(resultDirectory + "/U.csv", sep = ",")
    writeCSV(resultDir, "U.csv", lowRankU2)
    #lowRankS.tofile(resultDirectory + "/s.csv", sep = ",")
    writeCSV(resultDir, "s.csv", lowRankS)
    #lowRankV2.T.tofile(resultDirectory + "/V.csv", sep = ",")
    writeCSV(resultDir, "V.csv", lowRankV2)
    dprint("-------------------------------")
    dprint("Ending test")
    dprint("End time: " + str(datetime.datetime.now()))
    dprint("-------------------------------")
In [10]:
print("-------------------------------")
print("Running test 1")
print("Start time: " + str(datetime.datetime.now()))
print("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomGridmet/"
dataDirectory2 = "hdfs:///user/hadoop/spring-index/LeafGridmet/"
resultDirectory = "hdfs:///user/pheno/svd/BloomGridmetLeafGridmet/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runTest(dataDirectory1, dataDirectory2, resultDirectory)
print("-------------------------------")
print("Ending test 1")
print("End time: " + str(datetime.datetime.now()))
print("-------------------------------")
In [63]:
print("-------------------------------")
print("Running test 2")
print("Start time: " + str(datetime.datetime.now()))
print("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLow/"
dataDirectory2 = "hdfs:///user/hadoop/spring-index/LeafFinalLow/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLowLeafFinalLow/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runTest(dataDirectory1, dataDirectory2, resultDirectory)
print("-------------------------------")
print("Ending test 2")
print("End time: " + str(datetime.datetime.now()))
print("-------------------------------")
In [61]:
print("-------------------------------")
print("Running test 3")
print("Start time: " + str(datetime.datetime.now()))
print("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLow/"
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLow/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLowSOSTLow/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runTest(dataDirectory1, dataDirectory2, resultDirectory)
print("-------------------------------")
print("Ending test 3")
print("End time: " + str(datetime.datetime.now()))
print("-------------------------------")
In [62]:
print("-------------------------------")
print("Running test 4")
print("Start time: " + str(datetime.datetime.now()))
print("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/LeafFinalLow/"
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLow/"
resultDirectory = "hdfs:///user/pheno/svd/LeafFinalLowSOSTLow/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runTest(dataDirectory1, dataDirectory2, resultDirectory)
print("-------------------------------")
print("Ending test 4")
print("End time: " + str(datetime.datetime.now()))
print("-------------------------------")
In [67]:
print("-------------------------------")
print("Running test 5")
print("Start time: " + str(datetime.datetime.now()))
print("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLowPR/"
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLowPR/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLowPRSOSTLowPR/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runTest(dataDirectory1, dataDirectory2, resultDirectory)
print("-------------------------------")
print("Ending test 5")
print("End time: " + str(datetime.datetime.now()))
print("-------------------------------")
In [13]:
print("-------------------------------")
print("Running test 6")
print("Start time: " + str(datetime.datetime.now()))
print("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinal/"
dataDirectory2 = "hdfs:///user/hadoop/spring-index/LeafFinal/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLeafFinal/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runTest(dataDirectory1, dataDirectory2, resultDirectory)
print("-------------------------------")
print("Ending test 6")
print("End time: " + str(datetime.datetime.now()))
print("-------------------------------")
In [5]:
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLow/"
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLow/"
resultDirectory = "hdfs:///user/pheno/svd/BloomFinalLowSOSTLow/"
dataSet1, dataSetIndex1, maxDimension1 = getDataSet(dataDirectory1)
dataSet2, dataSetIndex2, maxDimension2 = getDataSet(dataDirectory2)
print("dataSet1.shape: " + str(dataSet1.shape))
print("dataSet2.shape: " + str(dataSet2.shape))
In [6]:
fullProduct = dataSet1 @ dataSet2.T
In [7]:
fullProduct.shape
Out[7]:
In [8]:
minDimension = min(min(dataSet1.shape), min(dataSet2.shape))
randU, randS, randVt = randomized_svd(fullProduct, n_components=minDimension)
In [ ]:
#normU, normS, normVt = svd(fullProduct, full_matrices = True)
normU, normS, normVt = svd(fullProduct, full_matrices = False)