Working with NetCDF files - Calculating yearly averages for climate set datasets

First setup our configuration and Spark context

In [1]:
import sys
import StringIO
import zipfile
import re
import os
import sys
import numpy as np

spark_home = '/opt/spark-1.3.0-bin-hadoop2.4'
os.environ['SPARK_HOME'] = spark_home

sys.path.append(os.path.join(spark_home, 'python'))
sys.path.append(os.path.join(spark_home, 'bin'))
sys.path.append(os.path.join(spark_home, 'python/lib/'))

from pyspark import SparkContext, SparkFiles, SparkConf

datafile_path = '/media/bitbucket/'
parameter = 'pr'
timesteps = 12
partitions = 8
grid_chunk_size = 2000

spark_config = SparkConf();
spark_config.set('spark.driver.memory', '10g')
spark_config.set('spark.akka.frameSize', 32)
spark_config.set('spark.executor.memory', '4g')
spark_config.set('spark.driver.maxResultSize', '4g')
spark_config.set('spark.shuffle.memoryFraction', 0.6)
spark_config.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
spark_config.set('spark.kryoserializer.buffer.max.mb', 1024)

sc = SparkContext('spark://ulex:7077', 'n_timesteps_mean', conf=spark_config)

Extract timestep chunks and grid chunks from NetCDF file

In [2]:
from netCDF4 import Dataset

data = Dataset(datafile_path)
pr = data.variables[parameter]

# Get the number of timesteps
num_timesteps = data.variables['time'].size

# Get number of locations per timestep
shape = pr[0].shape
num_grid_points = pr[0].size

# Break timesteps into n size chunks
timestep_chunks = []
for x in xrange(0, num_timesteps, timesteps):
    if x + timesteps < num_timesteps:
        timestep_chunks.append((x, x + timesteps))
        timestep_chunks.append((x, num_timesteps))

# Break locations into chunks
grid_chunks = []
for lat in xrange(0, shape[0], grid_chunk_size):
    for lon in xrange(0, shape[1], grid_chunk_size):
        grid_chunks.append((lat, lon))

Now parallelize the grid chunks across the cluster

In [3]:
grid_chunks = sc.parallelize(grid_chunks, partitions)

Define function to calculate the mean for a given grid chunk

In [4]:
def calculate_means(grid_chunk):

    data = Dataset(datafile_path)
    pr = data.variables[parameter]

    (lat, lon) = grid_chunk

    values = []
    for timestep_range in timestep_chunks:
        (start_timesteps, end_timesteps) = timestep_range

        mean = np.mean(pr[start_timesteps:end_timesteps,
                          lon:lon+grid_chunk_size], axis=0)

    return values

Calculate the yearly means for each chunk

In [5]:
means =

Finally collect the chunks and recreate the grid

In [6]:
means = means.collect()

timestep_means = [ for x in range(len(timestep_chunks))]

i = 0
for lat in xrange(0, shape[0], grid_chunk_size):
    for lon in xrange(0, shape[1], grid_chunk_size):
        for j in range(len(timestep_chunks)):
            chunk = means[i][j]
            timestep_means[j][lat:lat+chunk.shape[0], lon:lon+chunk.shape[1]] = chunk

        i += 1

for m in timestep_means:

[2.957010049916183e-05 2.9542837485981483e-05 2.951173276718085e-05 ...,
 1.878620605566539e-05 1.884244678270382e-05 1.8830543316047017e-05]
[2.8391329882045586e-05 2.8350747015792876e-05 2.825960594539841e-05 ...,
 1.850248736445792e-05 1.8515632594547544e-05 1.8503381094584864e-05]
[3.375462013840055e-05 3.367043245816603e-05 3.360415818557764e-05 ...,
 1.7519595227592315e-05 1.751458694343455e-05 1.7499485693406314e-05]
[2.4524049270742882e-05 2.4500026484020054e-05 2.444009199583282e-05 ...,
 1.7043377738445997e-05 1.706103648757562e-05 1.7049323408476386e-05]
[2.909944790493076e-05 2.9074484094356496e-05 2.903968803972627e-05 ...,
 2.3176815981666248e-05 2.3199606706233073e-05 2.3192110044571262e-05]

Shutdown context ( we can only have one running at once )

In [6]:

Using cluster wide accumulators - Calculate average elevation in North America using Shuttle Radar Topography Mission (SRTM) data

Setup our Spark context

In [3]:
spark_home = '/opt/spark-1.3.0-bin-hadoop2.4'
os.environ['SPARK_HOME'] = spark_home

sys.path.append(os.path.join(spark_home, 'python'))
sys.path.append(os.path.join(spark_home, 'bin'))
sys.path.append(os.path.join(spark_home, 'python/lib/'))

from pyspark import SparkContext

sc = SparkContext('spark://ulex:7077', 'srtm')

First define the functions that will run on each node in the cluster

In [4]:
srtm_dtype = np.dtype('>i2')
filename_regex = re.compile('([NSEW]\d+[NSEW]\d+).*')

# Function to array
def read_array(data):
    hgt_2darray = np.flipud(np.fromstring(data, dtype=srtm_dtype).reshape(1201, 1201))

    return hgt_2darray

# Function to process a HGT file
def process_file(file):
    (name, content) = file

    filename = os.path.basename(name)
    srtm_name = filename.split('.')[0]
    match = filename_regex.match(srtm_name)

    # Skip anything that doesn't match
    if not match:

    hgt_file = '%s.hgt' %

    stream = StringIO.StringIO(content)
        with zipfile.ZipFile(stream, 'r') as zipfd:
            hgt_data =
            data = read_array(hgt_data)
            samples = 0
            sum = 0
            for x in np.nditer(data):
                if x != -32768:
                    samples += 1
                    sum += x

            # Add the the local results to the global accumulators
    except zipfile.BadZipfile:
        # Skip anything thats not a zip

Now setup our global accumulators

In [5]:
num_samples_acc = sc.accumulator(0)
sum_acc = sc.accumulator(0)

Load the data files accross the cluster

In [6]:
data_files = '/media/bitbucket/srtm/version2_1/SRTM3/North_America'
data = sc.binaryFiles(data_files)

Process the files accross the cluster

In [7]:

Finally calculate the mean using the global accumulators

In [8]:
sum_acc.value / num_samples_acc.value


