Distributed Numpy Parsing

Joeri R. Hermans
Department of Data Science & Knowledge Engineering
Maastricht University, The Netherlands

This notebook will show you how to parse a collection of Numpy files straight from HDFS into a Spark Dataframe.

Cluster Configuration

In the following sections, we set up the cluster properties.


In [1]:
%matplotlib inline

import numpy as np

import os

from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.sql.types import *

from pyspark.sql import Row

from pyspark.storagelevel import StorageLevel

# Use the DataBricks AVRO reader.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-avro_2.11:3.2.0 pyspark-shell'

In [2]:
# Modify these variables according to your needs.
application_name = "Distributed Numpy Parsing"
using_spark_2 = False
local = False

if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_processes = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 20
    num_processes = 1

# The total number of workers is derived from the number of executors
# and the number of processes (cores) per executor.
num_workers = num_executors * num_processes

print("Number of desired executors: " + str(num_executors))
print("Number of desired processes / executor: " + str(num_processes))
print("Total number of workers: " + str(num_workers))


Number of desired executors: 20
Number of desired processes / executor: 1
Total number of workers: 20

In [3]:
# Do not change anything here.
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", str(num_processes))
conf.set("spark.executor.instances", str(num_executors))
conf.set("spark.executor.memory", "5g")
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.max", "2000")
conf.set("spark.executor.heartbeatInterval", "6000s")
conf.set("spark.network.timeout", "10000000s")
conf.set("spark.shuffle.spill", "true")
conf.set("spark.driver.memory", "10g")
conf.set("spark.driver.maxResultSize", "10g")

# Check if the user is running Spark 2.0 +
if using_spark_2:
    # SparkSession is only available from Spark 2.0 onwards.
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.config(conf=conf) \
                     .appName(application_name) \
                     .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)

# Check if we are using Spark 2.0
if using_spark_2:
    reader = sc
else:
    reader = sqlContext

Obtaining the required file-paths

We first obtain a list of file paths (*.npy). We then map these paths with a custom function which reads all the data into a dataframe.


In [4]:
# Define the command to execute. It lists all the Numpy files in the specified directory.
cmd = "hdfs dfs -ls /user/jhermans/data/cms/RelValWjet_Pt_3000_3500_13_GEN-SIM-RECO_evt3150/*.npy | awk '{print $NF}'"
# Fetch the output of the command, and construct a list.
output = os.popen(cmd).read()
file_paths = output.split("\n")
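Because the command output ends with a newline, splitting on "\n" leaves an empty string as the last element of the list. The sketch below shows one way to filter the listing in Python. The function name extract_npy_paths and the sample listing are hypothetical, and the approach assumes that paths contain no spaces.

```python
def extract_npy_paths(ls_output):
    """Return the .npy paths found in the text printed by `hdfs dfs -ls`."""
    paths = []
    for line in ls_output.splitlines():
        columns = line.split()
        # Skip blank lines and anything that is not a .npy entry,
        # such as a "Found N items" header.
        if columns and columns[-1].endswith(".npy"):
            paths.append(columns[-1])
    return paths


# Hypothetical listing, shortened to two entries for illustration.
sample_output = (
    "Found 2 items\n"
    "-rw-r--r--   3 user group   1024 2017-01-01 12:00 /data/example/trackparams0.npy\n"
    "-rw-r--r--   3 user group   2048 2017-01-01 12:00 /data/example/trackparams1.npy\n"
)
file_paths_clean = extract_npy_paths(sample_output)
print(file_paths_clean)
```

The same function also accepts the output of the awk pipeline above, since it only looks at the last column of every line.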

Creating a Spark Dataframe from the specified list

Before we convert the list to a Spark Dataframe, we first need to specify the schema. We do this by converting every element in the list to a Spark row. Afterwards, Spark will be able to automatically infer the schema of the dataframe.


In [5]:
# Wrap every path in a Row so that Spark can infer a single string column.
rows = [Row(path=path) for path in file_paths]

Now we are able to create the Spark DataFrame. Note: for Spark 2.0, call createDataFrame on the SparkSession (the reader variable defined above) instead of on sqlContext.


In [6]:
df = sqlContext.createDataFrame(rows)
# Repartition the dataset for increased parallelism.
df = df.repartition(20)

print("Number of paths to be parsed: " + str(df.count()))
df.printSchema()


Number of paths to be parsed: 393
root
 |-- path: string (nullable = true)
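To get a feeling for the workload per partition, the arithmetic can be sketched as follows. This assumes that repartition spreads the rows evenly, which Spark approximates but does not guarantee exactly. The helper partition_sizes is hypothetical and not part of Spark.

```python
def partition_sizes(num_items, num_partitions):
    """Sizes of the partitions when items are spread as evenly as possible."""
    base, remainder = divmod(num_items, num_partitions)
    # The first `remainder` partitions receive one extra item.
    return [base + 1] * remainder + [base] * (num_partitions - remainder)


# 393 paths over 20 partitions, the values used in this notebook.
sizes = partition_sizes(393, 20)
print("Largest partition: " + str(max(sizes)))
print("Smallest partition: " + str(min(sizes)))
```

Under this assumption every task reads 19 or 20 files, so with 20 executors no executor should sit idle while another works through a much longer list.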


In [7]:
# Example content of the dataframe.
df.take(1)


Out[7]:
[Row(path=u'/user/jhermans/data/cms/RelValWjet_Pt_3000_3500_13_GEN-SIM-RECO_evt3150/trackparams220.npy')]

Parsing your Numpy files

This is a straightforward operation: we map all the file paths with a custom function which reads the Numpy files from HDFS.


In [8]:
# Development cell: the same logic will later be executed inside the Spark mapper.

import pydoop.hdfs as hdfs

with hdfs.open(file_paths[0]) as f:
    data = np.load(f)

# Obtain the fields (columns) of your numpy data.
fields = list(data.dtype.fields)

print("Number of columns: " + str(len(fields)))

print("First five columns: ")
for k in fields[:5]:
    print(k)


Number of columns: 190
First five columns: 
sis_25_x
normalizedChi2
sis_25_z
sis_25_y
sis_48_x

Now that we have a working prototype, let's construct a Spark mapper which fetches the data from HDFS in a distributed manner. Note that if you would like to adjust the data in any way after reading, you can do so by modifying the parse function, or by executing another map after the data has been read.


In [13]:
def parse(iterator):
    rows = []
    
    # MODIFY TO YOUR NEEDS IF NECESSARY
    for row in iterator:
        path = row['path']
        # Load the file from HDFS.
        with hdfs.open(path) as f:
            data = np.load(f)
        # Add all rows in current path.
        for r in data:
            d = {}
            # Convert numpy scalars to native Python values.
            for field in fields:
                d[field] = r[field].item()
            rows.append(Row(**d))
        
    return iter(rows)

# Apply the parse function to every partition.
dataset = df.rdd.mapPartitions(parse).toDF()
dataset.printSchema()


root
 |-- TrackId: long (nullable = true)
 |-- charge: long (nullable = true)
 |-- chi2: double (nullable = true)
 |-- d0: double (nullable = true)
 |-- dsz: double (nullable = true)
 |-- dxy: double (nullable = true)
 |-- dz: double (nullable = true)
 |-- eta: double (nullable = true)
 |-- evt: long (nullable = true)
 |-- lambda: double (nullable = true)
 |-- lumi: long (nullable = true)
 |-- ndof: double (nullable = true)
 |-- normalizedChi2: double (nullable = true)
 |-- p: double (nullable = true)
 |-- phi: double (nullable = true)
 |-- pix_0_x: double (nullable = true)
 |-- pix_0_y: double (nullable = true)
 |-- pix_0_z: double (nullable = true)
 |-- pix_1_x: double (nullable = true)
 |-- pix_1_y: double (nullable = true)
 |-- pix_1_z: double (nullable = true)
 |-- pix_2_x: double (nullable = true)
 |-- pix_2_y: double (nullable = true)
 |-- pix_2_z: double (nullable = true)
 |-- pix_3_x: double (nullable = true)
 |-- pix_3_y: double (nullable = true)
 |-- pix_3_z: double (nullable = true)
 |-- pix_4_x: double (nullable = true)
 |-- pix_4_y: double (nullable = true)
 |-- pix_4_z: double (nullable = true)
 |-- pt: double (nullable = true)
 |-- px: double (nullable = true)
 |-- py: double (nullable = true)
 |-- pz: double (nullable = true)
 |-- qoverp: double (nullable = true)
 |-- run: long (nullable = true)
 |-- sis_0_x: double (nullable = true)
 |-- sis_0_y: double (nullable = true)
 |-- sis_0_z: double (nullable = true)
 |-- sis_10_x: double (nullable = true)
 |-- sis_10_y: double (nullable = true)
 |-- sis_10_z: double (nullable = true)
 |-- sis_11_x: double (nullable = true)
 |-- sis_11_y: double (nullable = true)
 |-- sis_11_z: double (nullable = true)
 |-- sis_12_x: double (nullable = true)
 |-- sis_12_y: double (nullable = true)
 |-- sis_12_z: double (nullable = true)
 |-- sis_13_x: double (nullable = true)
 |-- sis_13_y: double (nullable = true)
 |-- sis_13_z: double (nullable = true)
 |-- sis_14_x: double (nullable = true)
 |-- sis_14_y: double (nullable = true)
 |-- sis_14_z: double (nullable = true)
 |-- sis_15_x: double (nullable = true)
 |-- sis_15_y: double (nullable = true)
 |-- sis_15_z: double (nullable = true)
 |-- sis_16_x: double (nullable = true)
 |-- sis_16_y: double (nullable = true)
 |-- sis_16_z: double (nullable = true)
 |-- sis_17_x: double (nullable = true)
 |-- sis_17_y: double (nullable = true)
 |-- sis_17_z: double (nullable = true)
 |-- sis_18_x: double (nullable = true)
 |-- sis_18_y: double (nullable = true)
 |-- sis_18_z: double (nullable = true)
 |-- sis_19_x: double (nullable = true)
 |-- sis_19_y: double (nullable = true)
 |-- sis_19_z: double (nullable = true)
 |-- sis_1_x: double (nullable = true)
 |-- sis_1_y: double (nullable = true)
 |-- sis_1_z: double (nullable = true)
 |-- sis_20_x: double (nullable = true)
 |-- sis_20_y: double (nullable = true)
 |-- sis_20_z: double (nullable = true)
 |-- sis_21_x: double (nullable = true)
 |-- sis_21_y: double (nullable = true)
 |-- sis_21_z: double (nullable = true)
 |-- sis_22_x: double (nullable = true)
 |-- sis_22_y: double (nullable = true)
 |-- sis_22_z: double (nullable = true)
 |-- sis_23_x: double (nullable = true)
 |-- sis_23_y: double (nullable = true)
 |-- sis_23_z: double (nullable = true)
 |-- sis_24_x: double (nullable = true)
 |-- sis_24_y: double (nullable = true)
 |-- sis_24_z: double (nullable = true)
 |-- sis_25_x: double (nullable = true)
 |-- sis_25_y: double (nullable = true)
 |-- sis_25_z: double (nullable = true)
 |-- sis_26_x: double (nullable = true)
 |-- sis_26_y: double (nullable = true)
 |-- sis_26_z: double (nullable = true)
 |-- sis_27_x: double (nullable = true)
 |-- sis_27_y: double (nullable = true)
 |-- sis_27_z: double (nullable = true)
 |-- sis_28_x: double (nullable = true)
 |-- sis_28_y: double (nullable = true)
 |-- sis_28_z: double (nullable = true)
 |-- sis_29_x: double (nullable = true)
 |-- sis_29_y: double (nullable = true)
 |-- sis_29_z: double (nullable = true)
 |-- sis_2_x: double (nullable = true)
 |-- sis_2_y: double (nullable = true)
 |-- sis_2_z: double (nullable = true)
 |-- sis_30_x: double (nullable = true)
 |-- sis_30_y: double (nullable = true)
 |-- sis_30_z: double (nullable = true)
 |-- sis_31_x: double (nullable = true)
 |-- sis_31_y: double (nullable = true)
 |-- sis_31_z: double (nullable = true)
 |-- sis_32_x: double (nullable = true)
 |-- sis_32_y: double (nullable = true)
 |-- sis_32_z: double (nullable = true)
 |-- sis_33_x: double (nullable = true)
 |-- sis_33_y: double (nullable = true)
 |-- sis_33_z: double (nullable = true)
 |-- sis_34_x: double (nullable = true)
 |-- sis_34_y: double (nullable = true)
 |-- sis_34_z: double (nullable = true)
 |-- sis_35_x: double (nullable = true)
 |-- sis_35_y: double (nullable = true)
 |-- sis_35_z: double (nullable = true)
 |-- sis_36_x: double (nullable = true)
 |-- sis_36_y: double (nullable = true)
 |-- sis_36_z: double (nullable = true)
 |-- sis_37_x: double (nullable = true)
 |-- sis_37_y: double (nullable = true)
 |-- sis_37_z: double (nullable = true)
 |-- sis_38_x: double (nullable = true)
 |-- sis_38_y: double (nullable = true)
 |-- sis_38_z: double (nullable = true)
 |-- sis_39_x: double (nullable = true)
 |-- sis_39_y: double (nullable = true)
 |-- sis_39_z: double (nullable = true)
 |-- sis_3_x: double (nullable = true)
 |-- sis_3_y: double (nullable = true)
 |-- sis_3_z: double (nullable = true)
 |-- sis_40_x: double (nullable = true)
 |-- sis_40_y: double (nullable = true)
 |-- sis_40_z: double (nullable = true)
 |-- sis_41_x: double (nullable = true)
 |-- sis_41_y: double (nullable = true)
 |-- sis_41_z: double (nullable = true)
 |-- sis_42_x: double (nullable = true)
 |-- sis_42_y: double (nullable = true)
 |-- sis_42_z: double (nullable = true)
 |-- sis_43_x: double (nullable = true)
 |-- sis_43_y: double (nullable = true)
 |-- sis_43_z: double (nullable = true)
 |-- sis_44_x: double (nullable = true)
 |-- sis_44_y: double (nullable = true)
 |-- sis_44_z: double (nullable = true)
 |-- sis_45_x: double (nullable = true)
 |-- sis_45_y: double (nullable = true)
 |-- sis_45_z: double (nullable = true)
 |-- sis_46_x: double (nullable = true)
 |-- sis_46_y: double (nullable = true)
 |-- sis_46_z: double (nullable = true)
 |-- sis_47_x: double (nullable = true)
 |-- sis_47_y: double (nullable = true)
 |-- sis_47_z: double (nullable = true)
 |-- sis_48_x: double (nullable = true)
 |-- sis_48_y: double (nullable = true)
 |-- sis_48_z: double (nullable = true)
 |-- sis_49_x: double (nullable = true)
 |-- sis_49_y: double (nullable = true)
 |-- sis_49_z: double (nullable = true)
 |-- sis_4_x: double (nullable = true)
 |-- sis_4_y: double (nullable = true)
 |-- sis_4_z: double (nullable = true)
 |-- sis_5_x: double (nullable = true)
 |-- sis_5_y: double (nullable = true)
 |-- sis_5_z: double (nullable = true)
 |-- sis_6_x: double (nullable = true)
 |-- sis_6_y: double (nullable = true)
 |-- sis_6_z: double (nullable = true)
 |-- sis_7_x: double (nullable = true)
 |-- sis_7_y: double (nullable = true)
 |-- sis_7_z: double (nullable = true)
 |-- sis_8_x: double (nullable = true)
 |-- sis_8_y: double (nullable = true)
 |-- sis_8_z: double (nullable = true)
 |-- sis_9_x: double (nullable = true)
 |-- sis_9_y: double (nullable = true)
 |-- sis_9_z: double (nullable = true)
 |-- theta: double (nullable = true)
 |-- vx: double (nullable = true)
 |-- vy: double (nullable = true)
 |-- vz: double (nullable = true)
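The parse function calls .item() on every value, which converts numpy scalars into native Python int and float objects before they are placed in a Row. The sketch below isolates that conversion step so it can be tested locally without a cluster. The helper names record_to_dict and records_to_dicts and the two-column sample data are hypothetical. The generator variant is one possible design, not the approach used in the notebook: it yields one record at a time instead of collecting all rows of a partition in a list.

```python
import numpy as np


def record_to_dict(record, field_names):
    """Convert one structured-array record into a dict of native Python values."""
    # .item() turns numpy scalars (numpy.int64, numpy.float64) into int and float.
    return {name: record[name].item() for name in field_names}


def records_to_dicts(data, field_names):
    """Lazily convert every record of a structured array."""
    for record in data:
        yield record_to_dict(record, field_names)


# Hypothetical two-column data standing in for one loaded .npy file.
data = np.array([(7, 0.5), (8, 1.5)], dtype=[("TrackId", "i8"), ("chi2", "f8")])
converted = list(records_to_dicts(data, data.dtype.names))
print(converted)
```

Any adjustment of the data after reading, such as dropping or renaming fields, could be applied to these dictionaries before they are wrapped in Row objects.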