In [1]:
import os
import sys
import numpy as np
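# Point Python at the local Spark install so pyspark and its bundled py4j can be imported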
spark_home = '/opt/spark-1.3.0-bin-hadoop2.4'
os.environ['SPARK_HOME'] = spark_home
sys.path.append(os.path.join(spark_home, 'python'))
sys.path.append(os.path.join(spark_home, 'bin'))
sys.path.append(os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
from pyspark import SparkContext, SparkFiles, SparkConf
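# Input NetCDF file and chunking parameters: how many timesteps go into each mean,
# how many Spark partitions to use, and how large each lat/lon tile is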
datafile_path = '/media/bitbucket/pr_amon_BCSD_rcp26_r1i1p1_CONUS_bcc-csm1-1_202101-202512.nc'
parameter = 'pr'
timesteps = 12
partitions = 8
grid_chunk_size = 2000
spark_config = SparkConf()
spark_config.set('spark.driver.memory', '10g')
spark_config.set('spark.akka.frameSize', 32)
spark_config.set('spark.executor.memory', '4g')
spark_config.set('spark.driver.maxResultSize', '4g')
spark_config.set('spark.shuffle.memoryFraction', 0.6)
spark_config.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
spark_config.set('spark.kryoserializer.buffer.max.mb', 1024)
sc = SparkContext('spark://ulex:7077', 'n_timesteps_mean', conf=spark_config)
In [2]:
from netCDF4 import Dataset
data = Dataset(datafile_path)
pr = data.variables[parameter]
# Get the number of timesteps
num_timesteps = data.variables['time'].size
# Get number of locations per timestep
shape = pr[0].shape
num_grid_points = pr[0].size
# Break timesteps into n size chunks
timestep_chunks = []
for x in xrange(0, num_timesteps, timesteps):
    if x + timesteps < num_timesteps:
        timestep_chunks.append((x, x + timesteps))
    else:
        timestep_chunks.append((x, num_timesteps))
# Break locations into chunks
grid_chunks = []
for lat in xrange(0, shape[0], grid_chunk_size):
    for lon in xrange(0, shape[1], grid_chunk_size):
        grid_chunks.append((lat, lon))
In [3]:
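# Distribute the list of grid chunks across the cluster as an RDD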
grid_chunks = sc.parallelize(grid_chunks, partitions)
In [4]:
def calculate_means(grid_chunk):
    # Reopen the dataset on the worker; netCDF4 Dataset handles can't be pickled
    data = Dataset(datafile_path)
    pr = data.variables[parameter]

    (lat, lon) = grid_chunk

    values = []
    for timestep_range in timestep_chunks:
        (start_timesteps, end_timesteps) = timestep_range
        # Mean over the time axis for this lat/lon tile
        mean = np.mean(pr[start_timesteps:end_timesteps,
                          lat:lat+grid_chunk_size,
                          lon:lon+grid_chunk_size], axis=0)
        values.append(mean)

    return values
In [5]:
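# map() is lazy; nothing is computed until collect() is called below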
means = grid_chunks.map(calculate_means)
In [6]:
means = means.collect()
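# Stitch the per-chunk means back together into one full-grid array per timestep window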
timestep_means = [np.ma.empty(shape) for x in range(len(timestep_chunks))]
i = 0
for lat in xrange(0, shape[0], grid_chunk_size):
    for lon in xrange(0, shape[1], grid_chunk_size):
        for j in range(len(timestep_chunks)):
            chunk = means[i][j]
            timestep_means[j][lat:lat+chunk.shape[0], lon:lon+chunk.shape[1]] = chunk
        i += 1

for m in timestep_means:
    print(m[~m.mask])
In [6]:
sc.stop()
In [3]:
import os
import re
import sys
import zipfile
import StringIO
import numpy as np
spark_home = '/opt/spark-1.3.0-bin-hadoop2.4'
os.environ['SPARK_HOME'] = spark_home
sys.path.append(os.path.join(spark_home, 'python'))
sys.path.append(os.path.join(spark_home, 'bin'))
sys.path.append(os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
from pyspark import SparkContext
sc = SparkContext('spark://ulex:7077', 'srtm')
In [4]:
# SRTM .hgt samples are big-endian 16-bit signed integers
srtm_dtype = np.dtype('>i2')
filename_regex = re.compile(r'([NSEW]\d+[NSEW]\d+).*')

# Convert the raw bytes of a 3 arc-second .hgt file into a 1201x1201 elevation array
def read_array(data):
    hgt_2darray = np.flipud(np.fromstring(data, dtype=srtm_dtype).reshape(1201, 1201))
    return hgt_2darray
# Process a single zipped SRTM tile
def process_file(file):
    (name, content) = file
    filename = os.path.basename(name)
    srtm_name = filename.split('.')[0]
    match = filename_regex.match(srtm_name)

    # Skip anything that doesn't match the SRTM naming convention
    if not match:
        return

    hgt_file = '%s.hgt' % match.group(1)

    stream = StringIO.StringIO(content)
    try:
        with zipfile.ZipFile(stream, 'r') as zipfd:
            hgt_data = zipfd.read(hgt_file)
            data = read_array(hgt_data)

            # Count and sum all valid samples, skipping voids (-32768)
            samples = 0
            sum = 0
            for x in np.nditer(data):
                if x != -32768:
                    samples += 1
                    sum += x

            # Add the local results to the global accumulators
            num_samples_acc.add(samples)
            sum_acc.add(sum)
    except zipfile.BadZipfile:
        # Skip anything that's not a zip
        pass
In [5]:
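# Accumulators gather the total sample count and elevation sum across all workers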
num_samples_acc = sc.accumulator(0)
sum_acc = sc.accumulator(0)
In [6]:
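# binaryFiles yields (path, bytes) pairs, one per zipped tile in the directory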
data_files = '/media/bitbucket/srtm/version2_1/SRTM3/North_America'
data = sc.binaryFiles(data_files)
In [7]:
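# Run process_file on every tile; results come back only through the accumulators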
data.foreach(process_file)
In [8]:
# Mean elevation (metres) over all non-void samples; use float() to avoid integer division
float(sum_acc.value) / num_samples_acc.value
Out[8]:
TODO: Add lessons learned, etc.