In this notebook, the reader will find code to load GeoTiff files, single- or multi-band, from HDFS. The GeoTiffs are read as byte arrays and held in memory using MemoryFile from the rasterio Python package. Subsequently, a statistical analysis is performed on each pair of datasets: the Python module productsvd is used to determine the SVD of the product of the two phenology datasets.
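The following is a sketch of the underlying computation, assuming qrproductsvd implements the standard QR-based product SVD (as the reference implementation qrproductsvdRG further down suggests). With A and B the masked (pixels × time) matrices of the two datasets:

\begin{align}
A &= Q_A R_A, \qquad B = Q_B R_B \\
R_A R_B^{\mathsf{T}} &= U_C S V_C^{\mathsf{T}} \\
A B^{\mathsf{T}} &= (Q_A U_C)\, S\, (V_C^{\mathsf{T}} Q_B^{\mathsf{T}}) = U S V^{\mathsf{T}}
\end{align}

The columns of U and V are then spatial modes of the cross product A Bᵀ, with each singular value weighting one pair of modes; validateNorms below checks the identities u_i = A Bᵀ v_i / s_i and v_i = B Aᵀ u_i / s_i, and plotSingularValues reports s_i² / Σ s_j² as the fraction of variance explained per mode.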
In [75]:
#Add all dependencies to PYTHON_PATH
import sys
sys.path.append("/usr/lib/spark/python")
sys.path.append("/usr/lib/spark/python/lib/py4j-0.10.4-src.zip")
sys.path.append("/usr/lib/python3/dist-packages")
sys.path.append("/data/local/jupyterhub/modules/python")
#Define environment variables
import os
os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "ipython"
import subprocess
#Load PySpark to connect to a Spark cluster
from pyspark import SparkConf, SparkContext
from hdfs import InsecureClient
from tempfile import TemporaryFile
#from osgeo import gdal
#To read GeoTiffs as a ByteArray
from io import BytesIO
from rasterio.io import MemoryFile
import numpy as np
import pandas
import datetime
import matplotlib.pyplot as plt
import rasterio
from rasterio import plot
from os import listdir
from os.path import isfile, join
from numpy import exp, log
from numpy.random import standard_normal
import scipy.linalg
from productsvd import qrproductsvd
from sklearn.utils.extmath import randomized_svd
In [76]:
debugMode = True
maxModes = 26
In [77]:
appName = "plot_GeoTiff"
masterURL = "spark://pheno0.phenovari-utwente.surf-hosted.nl:7077"
#Stop any existing Spark context before creating a new one
try:
sc.stop()
except NameError:
print("A new Spark Context will be created.")
sc = SparkContext(conf = SparkConf().setAppName(appName).setMaster(masterURL))
conf = sc.getConf()
In [78]:
def dprint(msg):
if (debugMode):
print(str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + " | " + msg)
In [79]:
def progressBar(message, value, endvalue, bar_length = 20):
if (debugMode):
percent = float(value) / endvalue
arrow = '-' * int(round(percent * bar_length)-1) + '>'
spaces = ' ' * (bar_length - len(arrow))
sys.stdout.write("\r"
+ str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
+ " | "
+ message
+ ": [{0}] {1}%".format(arrow + spaces, int(round(percent * 100)))
)
if value == endvalue:
sys.stdout.write("\n")
sys.stdout.flush()
In [80]:
def get_hdfs_client():
    return InsecureClient("http://emma0.emma.nlesc.nl:50070", user="pheno",
                          root="/")
In [81]:
def getDataSet(directoryPath, bandNum):
dprint("Running getDataSet(directoryPath)")
files = sc.binaryFiles(directoryPath + "/*.tif")
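    #files is an RDD of (path, bytes) pairs; only the paths are collected here, and each file's contents are fetched with lookup() in the loop below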
fileList = files.keys().collect()
dprint("Number of files: " + str(len(fileList)))
dataSet = []
plotShapes = []
flattenedShapes = []
for i, f in enumerate(sorted(fileList)):
print(f)
#progressBar("Reading files", i + 1, len(fileList))
data = files.lookup(f)
dataByteArray = bytearray(data[0])
memfile = MemoryFile(dataByteArray)
dataset = memfile.open()
relevantBand = np.array(dataset.read()[bandNum])
memfile.close()
plotShapes.append(relevantBand.shape)
flattenedDataSet = relevantBand.flatten()
flattenedShapes.append(flattenedDataSet.shape)
dataSet.append(flattenedDataSet)
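    #Stack the flattened rasters and transpose, so rows are pixels and columns are time steps (one column per GeoTiff)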
dataSet = np.array(dataSet).T
dprint("dataSet.shape: " + str(dataSet.shape))
dprint("Ending getDataSet(directoryPath)")
return dataSet
In [82]:
def getMask(filePath):
dprint("Running getMask(filePath)")
mask_data = sc.binaryFiles(filePath).take(1)
mask_byteArray = bytearray(mask_data[0][1])
mask_memfile = MemoryFile(mask_byteArray)
mask_dataset = mask_memfile.open()
maskTransform = mask_dataset.transform
mask_data = np.array(mask_dataset.read()[0])
mask_memfile.close()
dprint("mask_data.shape: " + str(mask_data.shape))
dprint("Ending getMask(filePath)")
return mask_data, maskTransform
In [83]:
def filterDataSet(dataSet, maskData):
dprint("Running filterDataSet(dataSet, maskIndex)")
maskIndex = np.nonzero(np.nan_to_num(maskData.flatten()))[0]
dataSetFiltered = dataSet[maskIndex]
dprint("dataSetFiltered.shape: " + str(dataSetFiltered.shape))
dprint("Ending filterDataSet(dataSet, maskIndex)")
return dataSetFiltered
In [84]:
def validateNorms(dataSet1, dataSet2, U, s, V):
dprint("Running validateNorms(dataSet1, dataSet2, U, s, V)")
length = len(s)
norms = []
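    #For the SVD of dataSet1 @ dataSet2.T, every singular triple should satisfy u_i = A (B.T v_i) / s_i and v_i = B (A.T u_i) / s_i; the norms below measure the deviation from these identities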
for i in range(length):
progressBar("Validating norms", i + 1, length)
u = dataSet1 @ (dataSet2.T @ V.T[i]) / s[i]
v = dataSet2 @ (dataSet1.T @ U.T[i]) / s[i]
norms.append(scipy.linalg.norm(U.T[i] - u))
norms.append(scipy.linalg.norm(V.T[i] - v))
dprint("Largest norm difference: " + str(max(norms)))
dprint("Ending validateNorms(dataSet1, dataSet2, U, s, V)")
In [85]:
def writeCSVs(resultDirectory, U, s, V):
dprint("Running writeCSV(resultDirectory, U, s, V)")
for i, vectorData in enumerate([U, s, V]):
progressBar("Writing CSV", i + 1, 3)
fileName = ["U", "s", "V"][i] + ".csv"
inFile = "/tmp/" + fileName
outFile = resultDirectory + fileName
#decompositionFile = open(inFile, "w")
#vectorData.T.tofile(decompositionFile, sep = ",")
#decompositionFile.close()
#np.savetxt(inFile, vectorData.T, fmt='%.12f', delimiter=',')
np.savetxt(inFile, vectorData.T, delimiter=',')
#Upload to HDFS
subprocess.run(['hadoop', 'dfs', '-copyFromLocal', '-f', inFile, outFile])
#Remove from /tmp/
subprocess.run(['rm', '-fr', inFile])
dprint("Ending writeCSV(resultDirectory, U, s, V)")
In [86]:
def plotSingularValues(resultDirectory, s):
dprint("Running plotSingularValues(resultDirectory, s)")
fileName = "s.pdf"
inFile = "/tmp/" + fileName
outFile = resultDirectory + fileName
x = range(len(s))
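    #Each squared singular value divided by the total sum of squares gives the fraction of variance explained by that mode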
total = s.T @ s
cumulativeValue = 0
valueList = []
cumulativeList = []
for i in x:
value = np.square(s[i]) / total
valueList.append(value)
cumulativeValue = cumulativeValue + value
cumulativeList.append(cumulativeValue)
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax1.plot(x, valueList, "g^")
ax2.plot(x, cumulativeList, "ro")
ax1.set_xlabel("Singular values")
ax1.set_ylabel("Variance explained", color = "g")
ax2.set_ylabel("Cumulative variance explained", color = "r")
plt.savefig(inFile)
plt.clf()
#Upload to HDFS
subprocess.run(['hadoop', 'dfs', '-copyFromLocal', '-f', inFile, outFile])
#Remove from /tmp/
subprocess.run(['rm', '-fr', inFile])
dprint("Ending plotSingularValues(resultDirectory, s)")
In [87]:
def writeModes(resultDirectory, U, s, V):
dprint("Running writeModes(resultDirectory, U, s, V)")
for i in range(len(s)):
progressBar("Writing modes", i + 1, len(s))
fileName = "Mode" + str(i + 1).zfill(2) + ".txt"
inFile = "/tmp/" + fileName
outFile = resultDirectory + fileName
decompositionFile = open(inFile, "w")
U.T[i].tofile(decompositionFile, sep = ",")
decompositionFile.close()
decompositionFile = open(inFile, "a")
decompositionFile.write("\n")
s[i].tofile(decompositionFile, sep = ",")
decompositionFile.write("\n")
V.T[i].tofile(decompositionFile, sep = ",")
decompositionFile.close()
#Upload to HDFS
subprocess.run(['hadoop', 'dfs', '-copyFromLocal', '-f', inFile, outFile])
#Remove from /tmp/
subprocess.run(['rm', '-fr', inFile])
dprint("Ending writeModes(resultDirectory, U, s, V)")
In [88]:
def plotModes(resultDirectory, U, s, V, maskData, maskTransform):
dprint("Running plotModes(resultDirectory, U, s, V, maskData, maskTransform)")
plotTemplate = np.full(maskData.shape[0] * maskData.shape[1], np.nan, dtype=np.float64)
maskIndex = np.nonzero(np.nan_to_num(maskData.flatten()))[0]
for i in range(min(maxModes, len(s))):
progressBar("Plotting modes", i + 1, min(maxModes, len(s)))
for vectorData, vectorName in zip([U, V], ["U", "V"]):
data = np.copy(plotTemplate)
np.put(data, maskIndex, vectorData.T[i])
            data = np.reshape(data, maskData.shape)
fileName = "Mode" + vectorName + str(i + 1).zfill(2) + ".tif"
inFile = "/tmp/" + fileName
outFile = resultDirectory + fileName
rasterioPlot = rasterio.open(inFile, "w", driver = "GTiff", width = data.shape[1], height = data.shape[0], count = 1, dtype = data.dtype, crs = "EPSG:4326", transform = maskTransform) #, compress="deflate")
rasterioPlot.write(data, 1)
rasterioPlot.close()
#Upload to HDFS
subprocess.run(['hadoop', 'dfs', '-copyFromLocal', '-f', inFile, outFile])
#Remove from /tmp/
subprocess.run(['rm', '-fr', inFile])
dprint("Ending plotModes(resultDirectory, U, s, V, maskData, maskTransform)")
In [89]:
import scipy.linalg
import numpy as np
from numpy import linalg as LA
from sklearn.decomposition import PCA
def qrproductsvdRG(A, B):
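    #Reference/debug variant of the QR-based product SVD; note that the QR reduction is currently bypassed and the SVD is taken of the full product A @ B.T (see the commented-out lines)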
QA, RA = scipy.linalg.qr(A, mode = "economic")
dprint("QB.shape: " + str(QA.shape))
dprint("RB.shape: " + str(RA.shape))
QB, RB = scipy.linalg.qr(B, mode = "economic")
dprint("QB.shape: " + str(QB.shape))
dprint("RB.shape: " + str(RB.shape))
#C = RA @ RB.T
C = A @ B.T
dprint("C.shape: " + str(C.shape))
#UC, s, VCt = scipy.linalg.svd(C, full_matrices = False)
U, s, Vt = scipy.linalg.svd(C, full_matrices = True)
#U = QA @ UC
#Vt = VCt @ QB.T
return U, s, Vt
In [90]:
def runAnalysis(dataDirectory1, dataDirectory2, bandNum1, bandNum2, maskFile, resultDirectory):
dprint("Running runAnalysis(dataDirectory1, dataDirectory2, maskFile, resultDirectory)")
dataSet1 = getDataSet(dataDirectory1, bandNum1)
dataSet2 = getDataSet(dataDirectory2, bandNum2)
    if (dataSet2.shape[1] == 26 and dataSet1.shape[1] != 26): #Hack: keep 26 columns of dataSet1 (indices 9-34) so its time dimension matches the 26-step SOS dataset
dataSet1 = dataSet1[:, 9:35]
maskData, maskTransform = getMask(maskFile)
dataSetFiltered1 = filterDataSet(dataSet1, maskData)
dataSetFiltered2 = filterDataSet(dataSet2, maskData)
U, s, Vt = qrproductsvd(dataSetFiltered1, dataSetFiltered2)
V = Vt.T
dprint("U.shape: " + str(U.shape))
dprint("s.shape: " + str(s.shape))
dprint("V.shape: " + str(V.shape))
dprint("Singular values of product: ")
dprint(str(s))
validateNorms(dataSetFiltered1, dataSetFiltered2, U, s, V)
plotSingularValues(resultDirectory, s)
#writeModes(resultDirectory, U, s, V)
plotModes(resultDirectory, U, s, V, maskData, maskTransform)
writeCSVs(resultDirectory, U, s, V)
dprint("Ending runAnalysis(dataDirectory1, dataDirectory2, maskFile, resultDirectory)")
In [57]:
dprint("-------------------------------")
dprint("Running analysis 0")
dprint("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomGridmet/"
bandNum1 = 3
dataDirectory2 = "hdfs:///user/hadoop/spring-index/LeafGridmet/"
bandNum2 = 3
maskFile = "hdfs:///user/hadoop/usa_state_masks/california_4km.tif"
resultDirectory = "hdfs:///user/emma/svd/BloomGridmetLeafGridmetCali/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runAnalysis(dataDirectory1, dataDirectory2, bandNum1, bandNum2, maskFile, resultDirectory)
dprint("-------------------------------")
dprint("Ending analysis 0")
dprint("-------------------------------")
In [58]:
dprint("-------------------------------")
dprint("Running analysis 1")
dprint("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomGridmet/"
bandNum1 = 3
dataDirectory2 = "hdfs:///user/hadoop/spring-index/LeafGridmet/"
bandNum2 = 3
maskFile = "hdfs:///user/hadoop/usa_mask_gridmet.tif"
resultDirectory = "hdfs:///user/emma/svd/BloomGridmetLeafGridmet/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runAnalysis(dataDirectory1, dataDirectory2, bandNum1, bandNum2, maskFile, resultDirectory)
dprint("-------------------------------")
dprint("Ending analysis 1")
dprint("-------------------------------")
In [91]:
dprint("-------------------------------")
dprint("Running analysis 2")
dprint("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomGridmet/"
bandNum1 = 3
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOST4Km/"
bandNum2 = 0
maskFile = "hdfs:///user/hadoop/usa_mask_gridmet.tif"
resultDirectory = "hdfs:///user/emma/svd/BloomGridmetSOST4Km/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runAnalysis(dataDirectory1, dataDirectory2, bandNum1, bandNum2, maskFile, resultDirectory)
dprint("-------------------------------")
dprint("Ending analysis 2")
dprint("-------------------------------")
In [ ]:
dprint("-------------------------------")
dprint("Running analysis 3")
dprint("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/LeafGridmet/"
bandNum1 = 3
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOST4Km/"
bandNum2 = 0
maskFile = "hdfs:///user/hadoop/usa_mask_gridmet.tif"
resultDirectory = "hdfs:///user/emma/svd/LeafGridmetSOST4Km/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runAnalysis(dataDirectory1, dataDirectory2, bandNum1, bandNum2, maskFile, resultDirectory)
dprint("-------------------------------")
dprint("Ending analysis 3")
dprint("-------------------------------")
In [35]:
dprint("-------------------------------")
dprint("Running analysis 4")
dprint("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLowPR/"
bandNum1 = 0
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLowPR/"
bandNum2 = 0
maskFile = "hdfs:///user/hadoop/spring-index/BloomFinalLowPR/1989.tif"
resultDirectory = "hdfs:///user/emma/svd/BloomFinalLowPRSOSTLowPR/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runAnalysis(dataDirectory1, dataDirectory2, bandNum1, bandNum2, maskFile, resultDirectory)
dprint("-------------------------------")
dprint("Ending analysis 4")
dprint("-------------------------------")
In [36]:
dprint("-------------------------------")
dprint("Running analysis 5")
dprint("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/LeafFinalLowPR/"
bandNum1 = 0
dataDirectory2 = "hdfs:///user/hadoop/avhrr/SOSTLowPR/"
bandNum2 = 0
maskFile = "hdfs:///user/hadoop/spring-index/LeafFinalLowPR/1989.tif"
resultDirectory = "hdfs:///user/emma/svd/LeafFinalLowPRSOSTLowPR/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runAnalysis(dataDirectory1, dataDirectory2, bandNum1, bandNum2, maskFile, resultDirectory)
dprint("-------------------------------")
dprint("Ending analysis 5")
dprint("-------------------------------")
In [39]:
dprint("-------------------------------")
dprint("Running analysis 6")
dprint("-------------------------------")
dataDirectory1 = "hdfs:///user/hadoop/spring-index/BloomFinalLowPR/"
bandNum1 = 0
dataDirectory2 = "hdfs:///user/hadoop/spring-index/LeafFinalLowPR/"
bandNum2 = 0
maskFile = "hdfs:///user/hadoop/spring-index/BloomFinalLowPR/1989.tif"
resultDirectory = "hdfs:///user/emma/svd/BloomFinalLowPRLeafFinalLowPR/"
#Create Result dir
subprocess.run(['hadoop', 'dfs', '-mkdir', resultDirectory])
runAnalysis(dataDirectory1, dataDirectory2, bandNum1, bandNum2, maskFile, resultDirectory)
dprint("-------------------------------")
dprint("Ending analysis 6")
dprint("-------------------------------")
End of Notebook