Joeri R. Hermans
Department of Data Science & Knowledge Engineering
Maastricht University, The Netherlands
This notebook will show you how to parse a collection of NumPy files straight from HDFS into a Spark DataFrame.
In the following sections, we set up the cluster properties.
In [1]:
%matplotlib inline
import numpy as np
import os
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.storagelevel import StorageLevel
# Use the Databricks Avro reader.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-avro_2.11:3.2.0 pyspark-shell'
In [2]:
# Modify these variables according to your needs.
application_name = "Distributed Numpy Parsing"
using_spark_2 = False
local = False
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_processes = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 20
    num_processes = 1

# This variable is derived from the number of cores and executors,
# and will be used to assign the number of model trainers.
num_workers = num_executors * num_processes

print("Number of desired executors: " + str(num_executors))
print("Number of desired processes / executor: " + str(num_processes))
print("Total number of workers: " + str(num_workers))
In [3]:
# Do not change anything here.
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_processes`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.executor.memory", "5g")
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.max", "2000")
conf.set("spark.executor.heartbeatInterval", "6000s")
conf.set("spark.network.timeout", "10000000s")
conf.set("spark.shuffle.spill", "true")
conf.set("spark.driver.memory", "10g")
conf.set("spark.driver.maxResultSize", "10g")
# Check if the user is running Spark 2.0+.
if using_spark_2:
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.config(conf=conf) \
                     .appName(application_name) \
                     .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports.
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(sc)

# Check if we are using Spark 2.0.
if using_spark_2:
    reader = sc
else:
    reader = sqlContext
In [4]:
# Define the command to be executed; it lists all the NumPy files in the specified directory.
cmd = "hdfs dfs -ls /user/jhermans/data/cms/RelValWjet_Pt_3000_3500_13_GEN-SIM-RECO_evt3150/*.npy | awk '{print $NF}'"
# Fetch the output of the command, and construct a list.
output = os.popen(cmd).read()
file_paths = [path for path in output.split("\n") if path]
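Shelling out to the hdfs CLI works, but since pydoop is used later in this notebook anyway, the listing can also be done directly from Python. A minimal sketch, assuming the same directory layout as above:

import pydoop.hdfs as hdfs

# List the directory via pydoop and keep only the NumPy files.
directory = "/user/jhermans/data/cms/RelValWjet_Pt_3000_3500_13_GEN-SIM-RECO_evt3150"
file_paths = [p for p in hdfs.ls(directory) if p.endswith(".npy")]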
In [5]:
rows = []
for path in file_paths:
    row = Row(**{'path': path})
    rows.append(row)
Now we are able to create the Spark DataFrame. Note that for Spark 2.0+, you would use spark instead of sqlContext.
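For reference, a minimal sketch of the Spark 2.0+ equivalent; in the Spark 2 branch above, sc is already bound to a SparkSession:

# Spark 2.0+ variant: the SparkSession replaces SQLContext.
df = sc.createDataFrame(rows)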
In [6]:
df = sqlContext.createDataFrame(rows)
# Repartition the dataset for increased parallelism.
df = df.repartition(20)
print("Number of paths to be parsed: " + str(df.count()))
df.printSchema()
In [7]:
# Example content of the dataframe.
df.take(1)
Out[7]:
In [8]:
# Development cell, this will be executed in the lambdas.
import pydoop.hdfs as hdfs

with hdfs.open(file_paths[0]) as f:
    data = np.load(f)

# Obtain the fields (columns) of your NumPy data.
fields = []
for k in data.dtype.fields:
    fields.append(k)

print("Number of columns: " + str(len(data.dtype.fields)))
print("First five columns: ")
i = 0
for k in data.dtype.fields:
    print(k)
    i += 1
    if i == 5:
        break
Now that we have a working prototype, let's construct a Spark mapper which fetches the data from HDFS in a distributed manner. Note that if you would like to adjust the data in any way after reading, you can do so by modifying the lambda function, or by executing another map once the data has been read (see the sketch after the next cell).
In [13]:
def parse(iterator):
    rows = []
    # MODIFY TO YOUR NEEDS IF NECESSARY
    for row in iterator:
        path = row['path']
        # Load the file from HDFS.
        with hdfs.open(path) as f:
            data = np.load(f)
        # Add all rows in the current path. The loop variable is named
        # 'field' to avoid shadowing the file handle 'f' above.
        for r in data:
            d = {}
            for field in fields:
                d[field] = r[field].item()
            rows.append(Row(**d))
    return iter(rows)
# Apply the lambda function.
dataset = df.rdd.mapPartitions(parse).toDF()
dataset.printSchema()
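As noted above, any post-processing can be expressed as an additional transformation on the parsed DataFrame. A minimal sketch follows; the column name 'pt' is a hypothetical example and the output path is an assumption. The write uses the spark-avro package loaded through PYSPARK_SUBMIT_ARGS at the top of this notebook:

from pyspark.sql.functions import col

# Hypothetical post-processing step: keep only the rows with a positive 'pt'.
# Replace 'pt' with a column that actually exists in your NumPy files.
filtered = dataset.filter(col('pt') > 0.0)

# Persist the result to HDFS as Avro (example output path).
filtered.write.format("com.databricks.spark.avro") \
        .save("/user/jhermans/data/cms/parsed.avro")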