This example shows how to load GeoTiffs from HDFS and then explore the features of Python packages such as rasterio.
In [1]:
#Add all dependencies to the Python module search path (sys.path)
import sys
sys.path.append("/usr/lib/spark/python")
sys.path.append("/usr/lib/spark/python/lib/py4j-0.10.4-src.zip")
sys.path.append("/usr/lib/python3/dist-packages")
sys.path.append("/data/local/jupyterhub/modules/python")
#Define environment variables
import os
os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "ipython"
#Load PySpark to connect to a Spark cluster
from pyspark import SparkConf, SparkContext
from osgeo import gdal
import numpy as np
#datetime is needed for the timestamps in dprint and progressBar below
import datetime
import matplotlib.pyplot as plt
from pylab import *
#To read GeoTiffs as a ByteArray
from io import BytesIO
from rasterio.io import MemoryFile
from numpy import exp, log
from numpy.random import standard_normal
from scipy.linalg import norm, qr, svd
from productsvd import qrproductsvd
from sklearn.utils.extmath import randomized_svd
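The BytesIO and MemoryFile imports above are what make it possible to open a GeoTiff stored on HDFS with rasterio without writing it to local disk. A minimal sketch of that step (not part of the original notebook; the HDFS path is a placeholder and the SparkContext sc is only created in the next cell):
In [ ]:
#Read raw GeoTiff bytes from HDFS and open them in memory with rasterio
tiles = sc.binaryFiles("hdfs:///user/emma/geotiffs/*.tif")
first_path, payload = tiles.first()
with MemoryFile(BytesIO(payload)) as memfile:
    with memfile.open() as dataset:
        band1 = dataset.read(1)
        print(first_path, band1.shape)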
In [2]:
appName = "compare_SVD_RSVD"
#masterURL="spark://pheno0.phenovari-utwente.surf-hosted.nl:7077"
masterURL="spark://emma0.emma.nlesc.nl:7077"
#A context needs to be created if it does not already exist
try:
    sc.stop()
except NameError:
    print("A new Spark Context will be created.")
sc = SparkContext(conf = SparkConf().setAppName(appName).setMaster(masterURL))
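A quick smoke test (hypothetical, not in the original notebook) to confirm the new context is usable:
In [ ]:
print(sc.version)
print(sc.parallelize(range(10)).sum())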
In [3]:
debugMode = True
In [4]:
def diff(first, second):
    second = set(second)
    return [item for item in first if item not in second]
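diff returns the items of first that do not occur in second, preserving their order; for example (illustrative values):
In [ ]:
print(diff([2011, 2012, 2013, 2014], [2012, 2014]))
#[2011, 2013]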
In [5]:
def dprint(msg):
    if (debugMode):
        print(str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + " | " + msg)
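A hypothetical call, which prints the current timestamp followed by " | " and the message whenever debugMode is True:
In [ ]:
dprint("Loaded SVD results from HDFS")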
In [6]:
def progressBar(message, value, endvalue, bar_length = 20):
    if (debugMode):
        percent = float(value) / endvalue
        arrow = '-' * int(round(percent * bar_length)-1) + '>'
        spaces = ' ' * (bar_length - len(arrow))
        sys.stdout.write("\r"
            + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
            + " | "
            + message
            + ": [{0}] {1}%".format(arrow + spaces, int(round(percent * 100)))
        )
        if value == endvalue:
            sys.stdout.write("\n")
        sys.stdout.flush()
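A hypothetical use of progressBar, which rewrites the same line on each call and only prints a newline at the final step:
In [ ]:
n = 50
for i in range(1, n + 1):
    progressBar("Reading vectors", i, n)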
In [7]:
resultDirectory = "hdfs:///user/emma/svd/spark/BloomGridmetLeafGridmet3/"
EigenVal_path = "S.csv"
svd_eigenvalues = sc.textFile(resultDirectory + EigenVal_path)
EigenVector_path = "U.csv"
svd_U_vector = sc.textFile(resultDirectory + EigenVector_path)
EigenVector_path = "V.csv"
svd_V_vector = sc.textFile(resultDirectory + EigenVector_path)
In [8]:
print(str(svd_eigenvalues.map(lambda line: (line.split(','))).collect()))
In [9]:
resultDirectory = "hdfs:///user/emma/svd/BloomGridmetLeafGridmet/"
EigenVal_path = "s.csv"
rsvd_eigenvalues = sc.textFile(resultDirectory + EigenVal_path)
EigenVector_path = "U.csv"
rsvd_U_vector = sc.textFile(resultDirectory + EigenVector_path)
EigenVector_path = "V.csv"
rsvd_V_vector = sc.textFile(resultDirectory + EigenVector_path)
In [ ]:
print(str(rsvd_eigenvalues.map(lambda line: (line.split(','))).collect()))
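The cells below compare the U and V factors column by column; a similar sketch for the singular values themselves (an assumption, not in the original notebook: it presumes S.csv and s.csv hold one value per line, in the same order):
In [ ]:
svd_s_np = np.asarray(svd_eigenvalues.map(lambda line: line.split(',')).collect()).astype(float).ravel()
rsvd_s_np = np.asarray(rsvd_eigenvalues.map(lambda line: line.split(',')).collect()).astype(float).ravel()
k = min(svd_s_np.size, rsvd_s_np.size)
print(norm(svd_s_np[:k] - rsvd_s_np[:k]))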
In [ ]:
svd_U = svd_U_vector.map(lambda line: (line.split(','))).collect()
svd_U_np = np.asarray(svd_U).astype(float)
rsvd_U = rsvd_U_vector.map(lambda line: (line.split(','))).collect()
rsvd_U_np = np.asarray(rsvd_U).astype(float)
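#The RSVD U factor appears to be stored as 37 rows of length 483850; reshape and transpose so each column is one component, matching the SVD layout (inferred from the code, not stated in the original)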
rsvd_U_np_T = np.reshape(rsvd_U_np,(37,483850)).T
In [ ]:
for pc in range(0,3):
    print(norm(svd_U_np[:,pc] - rsvd_U_np_T[:,pc]))
In [ ]:
svd_V = svd_V_vector.map(lambda line: (line.split(','))).collect()
svd_V_np = np.asarray(svd_V).astype(float)
rsvd_V = rsvd_V_vector.map(lambda line: (line.split(','))).collect()
rsvd_V_np = np.asarray(rsvd_V).astype(float)
rsvd_V_np_T = np.reshape(rsvd_V_np,(37,483850)).T
In [ ]:
for pc in range(0,3):
    print(norm(svd_V_np[:,pc] - rsvd_V_np_T[:,pc]))
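Singular vectors are only determined up to a sign flip, so a large column-wise norm does not by itself mean the two factorizations disagree. A hedged refinement (not in the original notebook) would also check the sign-flipped difference:
In [ ]:
for pc in range(0,3):
    d = min(norm(svd_V_np[:,pc] - rsvd_V_np_T[:,pc]),
            norm(svd_V_np[:,pc] + rsvd_V_np_T[:,pc]))
    print(d)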
In [ ]: