References:
https://github.com/LucaCanali/sparkMeasure
sparkmeasure Python docs: docs/Python_shell_and_Jupyter
Luca.Canali@cern.ch, July 2018
Dependencies:
- This notebook assumes you have an active Spark session called "spark"
- That you have the sparkmeasure jar in the driver classpath
- That you have installed the sparkmeasure Python wrapper package
Here is some example code to help with that:
# Install the sparkmeasure Python wrapper package if not already done:
pip install sparkmeasure
export PYSPARK_DRIVER_PYTHON=jupyter-notebook
export PYSPARK_DRIVER_PYTHON_OPTS="--ip=`hostname` --no-browser"
# run PySpark
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.14
In [1]:
# Load the Python API for sparkmeasure package
# and attach the sparkMeasure Listener for stagemetrics to the active Spark session
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
In [2]:
# Define cell and line magic to wrap the instrumentation
from IPython.core.magic import (register_line_magic, register_cell_magic, register_line_cell_magic)
@register_line_cell_magic
def sparkmeasure(line, cell=None):
    "run and measure spark workload. Use: %sparkmeasure or %%sparkmeasure"
    val = cell if cell is not None else line
    stagemetrics.begin()
    exec(val)  # exec handles both a single statement (line magic) and multi-line cell bodies
    stagemetrics.end()
    stagemetrics.print_report()
In [3]:
%%sparkmeasure
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
In [4]:
# Print additional metrics from accumulables
stagemetrics.print_accumulables()
In [5]:
# You can also explicitly wrap your Spark workload with stagemetrics instrumentation,
# as in this example
stagemetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
stagemetrics.end()
# Print a summary report
stagemetrics.print_report()
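Beyond the printed report, the collected stage metrics can also be post-processed. The following is only a sketch, assuming your sparkmeasure version provides create_stagemetrics_DF to return the collected stage metrics as a Spark DataFrame:
# Collect the stage metrics into a Spark DataFrame for further analysis
metrics_df = stagemetrics.create_stagemetrics_DF("PerfStageMetrics")
metrics_df.printSchema()
metrics_df.show()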
In [6]:
# Another way to encapsulate code and instrumentation in a compact form
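# runandmeasure wraps begin()/end() around the code string, executes it in the
# supplied namespace (locals() here, so that "spark" resolves) and prints the report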
stagemetrics.runandmeasure(locals(), """
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
""")
Collecting Spark task metrics at the granularity of each task completion has additional overhead compared to collecting at stage completion level. Use task-level collection only if you need data at this finer granularity, for example to study effects such as task skew; otherwise prefer the stagemetrics aggregation.
In [8]:
from sparkmeasure import TaskMetrics
taskmetrics = TaskMetrics(spark)
taskmetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
taskmetrics.end()
taskmetrics.print_report()
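TaskMetrics mirrors the StageMetrics API. As a sketch, assuming your sparkmeasure version also exposes runandmeasure on TaskMetrics, the compact form works the same way:
taskmetrics.runandmeasure(locals(), """
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
""")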