In [1]:
import pandas as pd
import math
from collections import OrderedDict, namedtuple

The contents of this notebook are a version of what appears at http://c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/

While there are many scattered sources on the Internet for configuring an EMR cluster for use with Spark, I found the link above to be the most useful. Kudos to Anthony Shipman for writing the page (and for providing the spreadsheet, Apache Spark Config Cheatsheet - xlsx, linked from it). Note that if you are trying to configure Spark in cluster mode on EMR, refer to the "cluster" tab in that spreadsheet.

Why this notebook, then? I simply find this notebook version easier to use.


In [2]:
Resource = namedtuple('Resource', 'memory_per_node_GB cores_per_node')

# The first number is yarn.scheduler.maximum-allocation-mb 
#    from https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html#emr-hadoop-task-config-r3
# The second number is the cores per node that are available for Spark’s use. 
#    If using Yarn, this will be the number of cores per machine managed by Yarn Resource Manager.
#    from https://aws.amazon.com/ec2/instance-types/
basic_info = {'r5.2xlarge'  : Resource(57.344, 8),
              'r5.4xlarge'  : Resource(122.880, 16),
              'r5.8xlarge'  : Resource(253.952, 32),
              'r5.12xlarge' : Resource(385.024, 48),
              'r3.8xlarge'  : Resource(241.644, 64),
              'r3.4xlarge'  : Resource(116.736, 32),
              'r3.2xlarge'  : Resource(54.272, 16),
              'm2.4xlarge'  : Resource(61.440, 8),
              'd2.2xlarge'  : Resource(54.272, 16),
              'c3.2xlarge'  : Resource(11.520, 8),
              'c3.xlarge'   : Resource(5.632, 4),
              'm3.xlarge'   : Resource(11.520, 8),
              'r4.4xlarge'  : Resource(116.736, 16),
              'i3.2xlarge'  : Resource(0.85*61, 8)
             }
# FYI:
# The Amazon console for a cluster of r3.8xlarges shows 64 vCPUs in the hardware listing, but
# https://aws.amazon.com/ec2/instance-types/ shows 32 vCPUs for this instance type (WTF!).
# Using 64 instead of 32 allowed me to max out CPU usage (as verified via Ganglia),
# so for the vCPU counts in the dictionary above I use what the Amazon console reports.
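
As a quick sanity check, a lookup is just attribute access on the namedtuple; an illustrative example (not part of the original cheatsheet):

# Illustrative lookup against the table above
r3 = basic_info['r3.8xlarge']
print(r3.memory_per_node_GB, r3.cores_per_node)   # 241.644 64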

In [3]:
# recommended parameter settings

# The fraction of memory in each executor that will be reserved for spark.yarn.executor.memoryOverhead.
memory_overhead_coefficient = 0.1 

# The upper bound for executor memory.
# Each executor runs in its own JVM; above roughly 64 GB of heap, garbage collection issues can cause slowness.
executor_memory_upper_bound_GB = 64

# The upper bound for the number of cores per executor.
# More than 5 cores per executor can degrade HDFS I/O throughput.
# I believe this value can safely be increased if writing to a web-based "file system" such as S3,
# but significant increases to this limit are not recommended.
executor_core_upper_bound = 5

# Cores per machine to reserve for OS processes.
# Should be zero if only a percentage of the machine's cores was made available to Spark
# (i.e. entered in the cores_per_node input above).
os_reserved_cores_per_node = 1

# The amount of RAM per machine to reserve for OS processes.
# Should be zero if only a percentage of the machine's RAM was made available to Spark
# (i.e. entered in the memory_per_node_GB input above).
os_reserved_memory_per_node_GB = 1

# The level of parallelism per allocated core. This field is used to determine the spark.default.parallelism setting.
# The generally recommended setting for this value is double the number of cores.
parallelism_per_core = 2
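
To make the overhead split concrete: with the 0.1 coefficient, an executor given 21 GB in total (the figure this notebook arrives at below for r3.8xlarge workers) ends up with a 3 GB overhead reservation and an 18 GB heap. A quick worked example:

# Worked example of the memory_overhead_coefficient split (21 GB is an example value)
total_GB = 21
overhead_GB = math.ceil(total_GB * memory_overhead_coefficient)   # ceil(2.1) -> 3
heap_GB = total_GB - overhead_GB                                  # 18, becomes spark.executor.memory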

In [4]:
# INPUTS

instance_type_worker_node = 'r3.8xlarge'

# The number of worker machines in your cluster. This can be as low as one machine.
number_of_nodes = 8

# Input the value of yarn.scheduler.maximum-allocation-mb divided by 1000:
# http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html
#   This is the amount of RAM per node that is available for Spark's use.
#   If using Yarn, this will be the amount of RAM per machine managed by Yarn Resource Manager.
memory_per_node_GB = basic_info[instance_type_worker_node].memory_per_node_GB 

cores_per_node = basic_info[instance_type_worker_node].cores_per_node
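
The conversion the comment above describes is a straight divide-by-1000 of the raw YARN value in MB; for example, using the r5.2xlarge figure from the AWS docs (which is where the dictionary entry above comes from):

# Illustrative conversion from the raw YARN setting (MB) to GB
yarn_max_allocation_mb = 57344           # yarn.scheduler.maximum-allocation-mb for r5.2xlarge
print(yarn_max_allocation_mb / 1000.0)   # 57.344, matching basic_info['r5.2xlarge']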

In [5]:
# calculated values
available_memory_per_node_GB = memory_per_node_GB - os_reserved_memory_per_node_GB
available_memory_per_node_GB_flr = math.floor(available_memory_per_node_GB)
available_cores_per_node = cores_per_node - os_reserved_cores_per_node 
print('available_memory_per_node_GB = {0},\navailable_memory_per_node_GB_flr = {1},\navailable_cores_per_node = {2}'.format(available_memory_per_node_GB, available_memory_per_node_GB_flr, available_cores_per_node))


available_memory_per_node_GB = 240.644,
available_memory_per_node_GB_flr = 240,
available_cores_per_node = 63

In [6]:
look_up_df = pd.DataFrame.from_dict({'executors_per_node': range(1, 193)})

look_up_df['total_memory_per_executor_GB'] = look_up_df['executors_per_node'] \
    .map(lambda z: math.floor(available_memory_per_node_GB_flr / z))

look_up_df['overhead_memory_per_executor_GB'] = look_up_df['total_memory_per_executor_GB'] \
    .map(lambda z: math.ceil(z * memory_overhead_coefficient))

look_up_df['memory_per_executor_GB'] = look_up_df['total_memory_per_executor_GB'] - \
    look_up_df['overhead_memory_per_executor_GB']

look_up_df['cores_per_executor'] = look_up_df['executors_per_node'] \
    .map(lambda z: math.floor(available_cores_per_node / z))

look_up_df['unused_memory_per_node_GB'] = available_memory_per_node_GB - \
    (look_up_df['executors_per_node'] * look_up_df['total_memory_per_executor_GB'])

look_up_df['unused_cores_per_node'] = available_cores_per_node - \
    (look_up_df['executors_per_node'] * look_up_df['cores_per_executor'])
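
Since both numerators are plain integers, the two floor(.../z) columns above can equivalently be computed with pandas' vectorized floor division; a minimal, lambda-free sketch that produces the same values:

# Equivalent vectorized form of the two floor-division columns (same results)
look_up_df['total_memory_per_executor_GB'] = available_memory_per_node_GB_flr // look_up_df['executors_per_node']
look_up_df['cores_per_executor'] = available_cores_per_node // look_up_df['executors_per_node']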

In [7]:
selected_executors_per_node = None
for candidate_executors_per_node in look_up_df['executors_per_node']:
    myrow = look_up_df[look_up_df.executors_per_node == candidate_executors_per_node ]
    
    warnings = 0

    if ( myrow['total_memory_per_executor_GB'].values[0] > executor_memory_upper_bound_GB ):
        warnings += 1
        print('Warning - For {0} executors per node the Total Memory Per Executor exceeds the Executor Memory Upper Bound'.format(candidate_executors_per_node) )
    if ( myrow['cores_per_executor'].values[0] > executor_core_upper_bound ):
        warnings += 1
        print('Warning - For {0} executors per node the Cores Per Executor exceeds the Executor Core Upper Bound'.format(candidate_executors_per_node) )
    if ( myrow['unused_memory_per_node_GB'].values[0] > 1 ):
        print('Note - For {0} executors per node the unused memory per node is {1}'.format(candidate_executors_per_node, myrow['unused_memory_per_node_GB'].values[0]) )
    if ( myrow['unused_cores_per_node'].values[0] > 1 ):
        print('Note - For {0} executors per node the unused cores per node is {1}'.format(candidate_executors_per_node, myrow['unused_cores_per_node'].values[0]) )
    if warnings == 0:
        print('**** We recommend using {0} executors per node ****'.format(candidate_executors_per_node) )
        print('****  This will result in {0} executors across the cluster'.format( (candidate_executors_per_node * number_of_nodes) ) )
        print('****  This will result in {0} cores used across the cluster'.format( ( number_of_nodes * candidate_executors_per_node * myrow['cores_per_executor'].values[0]) ) )
        selected_executors_per_node = candidate_executors_per_node
        break


Warning - For 1 executors per node the Total Memory Per Executor exceeds the Executor Memory Upper Bound
Warning - For 1 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Warning - For 2 executors per node the Total Memory Per Executor exceeds the Executor Memory Upper Bound
Warning - For 2 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Warning - For 3 executors per node the Total Memory Per Executor exceeds the Executor Memory Upper Bound
Warning - For 3 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Warning - For 4 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Note - For 4 executors per node the unused cores per node is 3
Warning - For 5 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Note - For 5 executors per node the unused cores per node is 3
Warning - For 6 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Note - For 6 executors per node the unused cores per node is 3
Warning - For 7 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Note - For 7 executors per node the unused memory per node is 2.6440000000000055
Warning - For 8 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Note - For 8 executors per node the unused cores per node is 7
Warning - For 9 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Note - For 9 executors per node the unused memory per node is 6.6440000000000055
Warning - For 10 executors per node the Cores Per Executor exceeds the Executor Core Upper Bound
Note - For 10 executors per node the unused cores per node is 3
Note - For 11 executors per node the unused memory per node is 9.644000000000005
Note - For 11 executors per node the unused cores per node is 8
**** We recommend using 11 executors per node ****
****  This will result in 88 executors across the cluster
****  This will result in 440 cores used across the cluster
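
The same first-feasible choice can be read straight off the lookup table with a boolean filter instead of a loop; a minimal sketch using the bounds defined earlier:

# Vectorized equivalent of the selection loop above (sketch)
feasible = look_up_df[
    (look_up_df.total_memory_per_executor_GB <= executor_memory_upper_bound_GB) &
    (look_up_df.cores_per_executor <= executor_core_upper_bound)
]
selected_executors_per_node = int(feasible.executors_per_node.iloc[0])   # 11 for this cluster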

In [8]:
# Select the number of executors per node you would like to use.
# A good rule of thumb for selecting the optimal number of Executors Per Node
# is to pick the setting that minimizes Unused Memory Per Node and
# Unused Cores Per Node while keeping Total Memory Per Executor below the
# Executor Memory Upper Bound and Cores Per Executor below the
# Executor Core Upper Bound.

# I will use the recommendation printed by the previous cell;
# you can provide a different number if you prefer.
spark_executors_per_node = selected_executors_per_node

myrow = look_up_df[look_up_df.executors_per_node == spark_executors_per_node]

if ( myrow['total_memory_per_executor_GB'].values[0] > executor_memory_upper_bound_GB ):
    print('Warning - Total Memory Per Executor exceeds the Executor Memory Upper Bound' )
if ( myrow['cores_per_executor'].values[0] > executor_core_upper_bound ):
    print('Warning - Cores Per Executor exceeds the Executor Core Upper Bound' )
if ( myrow['unused_memory_per_node_GB'].values[0] > 1 ):
    print('Note - For {0} executors per node the unused memory per node is {1} GB'.format(selected_executors_per_node, myrow['unused_memory_per_node_GB'].values[0]) )
if ( myrow['unused_cores_per_node'].values[0] > 1 ):
    print('Note - For {0} executors per node the unused cores per node is {1}'.format(selected_executors_per_node, myrow['unused_cores_per_node'].values[0]) ) 
print('****  This will result in {0} executors across the workers in the cluster'.format( ( (selected_executors_per_node * number_of_nodes) - 1 ) ) )    
print('****  This will result in {0} tasks running across the workers in the cluster at one time'.format( ( (number_of_nodes * selected_executors_per_node - 1 ) * myrow['cores_per_executor'].values[0] ) ) )


Note - For 11 executors per node the unused memory per node is 9.644000000000005 GB
Note - For 11 executors per node the unused cores per node is 8
****  This will result in 87 executors across the workers in the cluster
****  This will result in 435 tasks running across the workers in the cluster at one time

In [9]:
# spark configs

# This is the total number of executors in your cluster. It should be equal
# (or close enough) to the number of executors shown in the 'Executors' tab
# of the Spark Console (look at the section below the summary).
# We subtract one to account for the driver: the driver will consume as many
# resources as we allocate to an individual executor, on one (and only one) of our nodes.
spark_executor_instances = (number_of_nodes * selected_executors_per_node) - 1

# spark.yarn.executor.memoryOverhead is specified in MB, hence the factor of 1024
spark_yarn_executor_memory_overhead = 1024 * myrow['overhead_memory_per_executor_GB'].values[0]

# Assuming tasks are available, Spark will run one task per core that is available to the executor
# verify this by looking at 'Executors' tab of the Spark Console (look below Summary section at Cores/ActiveTasks)
spark_cores_per_executor = myrow['cores_per_executor'].values[0]

# Default number of partitions for Spark RDDs, DataFrames, etc.:
# number of executors * cores per executor * parallelism per core
#('spark.scheduler.mode', 'FAIR')
spark_default_parallelism = spark_executor_instances * spark_cores_per_executor * parallelism_per_core

spark_configs = OrderedDict([
    ('spark.executor.instances', spark_executor_instances),
    ('spark.yarn.executor.memoryOverhead', spark_yarn_executor_memory_overhead),
    ('spark.executor.memory', myrow['memory_per_executor_GB'].values[0]),
    ('spark.yarn.driver.memoryOverhead', spark_yarn_executor_memory_overhead),
    ('spark.driver.memory', myrow['memory_per_executor_GB'].values[0]),
    ('spark.executor.cores', spark_cores_per_executor),
    ('spark.driver.cores', spark_cores_per_executor),
    ('spark.default.parallelism', spark_default_parallelism)
])
spark_configs


Out[9]:
OrderedDict([('spark.executor.instances', 87),
             ('spark.yarn.executor.memoryOverhead', 3072),
             ('spark.executor.memory', 18),
             ('spark.yarn.driver.memoryOverhead', 3072),
             ('spark.driver.memory', 18),
             ('spark.executor.cores', 5),
             ('spark.driver.cores', 5),
             ('spark.default.parallelism', 870)])
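
One thing to keep in mind when handing these values to Spark: the *.memory entries above are bare GB numbers and the memoryOverhead entries are in MB, so the memory settings need a unit suffix on the command line (e.g. 18g). A minimal sketch of a helper (my own, not part of the original cheatsheet) that renders the dict as spark-submit --conf flags:

# Hypothetical helper: format spark_configs as spark-submit --conf flags.
# Assumes the *.memory values are GB (so append 'g') and that the memoryOverhead
# values are MB, which is what YARN expects, so they pass through unchanged.
def to_submit_args(configs):
    parts = []
    for key, value in configs.items():
        if key.endswith('.memory'):
            value = '{0}g'.format(value)
        parts.append('--conf {0}={1}'.format(key, value))
    return ' '.join(parts)

print(to_submit_args(spark_configs))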

In [10]:
look_up_df[look_up_df.executors_per_node == spark_executors_per_node]


Out[10]:
    executors_per_node  total_memory_per_executor_GB  overhead_memory_per_executor_GB  memory_per_executor_GB  cores_per_executor  unused_memory_per_node_GB  unused_cores_per_node
10                  11                            21                                3                      18                   5                      9.644                      8
