Example notebook to showcase sparkMeasure APIs for Python

References: https://github.com/LucaCanali/sparkMeasure
sparkmeasure Python docs: docs/Python_shell_and_Jupyter

Luca.Canali@cern.ch, July 2018

Dependencies:

- This notebook assumes you have an active Spark session called "spark"
- That the sparkMeasure jar is in the driver classpath
- That you have installed the sparkmeasure Python wrapper package

Here is some example code to help with the setup:

# Install the sparkmeasure Python wrapper package, if not already done:
pip install sparkmeasure

export PYSPARK_DRIVER_PYTHON=jupyter-notebook
export PYSPARK_DRIVER_PYTHON_OPTS="--ip=`hostname` --no-browser"
# run PySpark
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.14

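If you start PySpark from a standalone script rather than from bin/pyspark, the sparkMeasure package can also be attached when building the Spark session. A minimal sketch, assuming the same package coordinates as above (adjust the Scala and sparkMeasure versions to your environment):

# Sketch: attach sparkMeasure via spark.jars.packages when building the session
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("sparkmeasure-example")
         .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-measure_2.11:0.14")
         .getOrCreate())
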
In [1]:
# Load the Python API for sparkmeasure package
# and attach the sparkMeasure Listener for stagemetrics to the active Spark session

from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)

In [2]:
# Define cell and line magic to wrap the instrumentation
from IPython.core.magic import (register_line_magic, register_cell_magic, register_line_cell_magic)

@register_line_cell_magic
def sparkmeasure(line, cell=None):
    """Run and measure a Spark workload. Use: %sparkmeasure or %%sparkmeasure."""
    val = cell if cell is not None else line
    stagemetrics.begin()
    exec(val)  # exec (rather than eval) also handles multi-line cells and statements
    stagemetrics.end()
    stagemetrics.print_report()
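
The same magic also works in line form; for example (not executed here):

%sparkmeasure spark.sql("select count(*) from range(1000) cross join range(1000)").show()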

In [3]:
%%sparkmeasure
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()


+----------+
|  count(1)|
+----------+
|1000000000|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
sum(numTasks) => 17
elapsedTime => 12053 (12 s)
sum(stageDuration) => 11950 (12 s)
sum(executorRunTime) => 92685 (1.5 min)
sum(executorCpuTime) => 91134 (1.5 min)
sum(executorDeserializeTime) => 225 (0.2 s)
sum(executorDeserializeCpuTime) => 108 (0.1 s)
sum(resultSerializationTime) => 11 (11 ms)
sum(jvmGCTime) => 96 (96 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 6 (6 ms)
max(resultSize) => 18315 (17.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2000
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritten) => 472 (472 Bytes)
sum(shuffleRecordsWritten) => 8

In [4]:
# Print additional metrics from accumulables
stagemetrics.print_accumulables()


Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]

executorCpuTime => 91135 (1.5 min)
executorDeserializeCpuTime => 109 (0.1 s)
executorDeserializeTime => 225 (0.2 s)
executorRunTime => 92685 (1.5 min)
input.recordsRead => 2000
jvmGCTime => 96 (96 ms)
resultSerializationTime => 11 (11 ms)
resultSize => 30749 (30.0 KB)
shuffle.read.fetchWaitTime => 0 (0 ms)
shuffle.read.localBlocksFetched => 8
shuffle.read.localBytesRead => 472 (472 Bytes)
shuffle.read.recordsRead => 8
shuffle.read.remoteBlocksFetched => 0
shuffle.read.remoteBytesRead => 0 (0 Bytes)
shuffle.read.remoteBytesReadToDisk => 0 (0 Bytes)
shuffle.write.bytesWritten => 472 (472 Bytes)
shuffle.write.recordsWritten => 8
shuffle.write.writeTime => 6 (6 ms)

SQL Metrics and other non-internal metrics. Values grouped per accumulatorId and metric name.
Accid, Name => max(value) [group by accId, name]

    3, data size total => 119 (119 Bytes)
    4, duration total => 6 (6 ms)
    5, number of output rows => 1
    8, aggregate time total => 6 (6 ms)
   10, duration total => 92078 (1.5 min)
   11, number of output rows => 8
   14, aggregate time total => 92018 (1.5 min)
   16, number of output rows => 1000000000
   17, number of output rows => 1000000
   18, duration total => 92198 (1.5 min)
   19, number of output rows => 1000
   24, duration total => 8 (8 ms)
   25, number of output rows => 1000

In [5]:
# You can also explicitly wrap your Spark workload with stagemetrics instrumentation,
# as in this example
stagemetrics.begin()

spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()

stagemetrics.end()
# Print a summary report
stagemetrics.print_report()


+----------+
|  count(1)|
+----------+
|1000000000|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
sum(numTasks) => 17
elapsedTime => 13962 (14 s)
sum(stageDuration) => 13928 (14 s)
sum(executorRunTime) => 110345 (1.8 min)
sum(executorCpuTime) => 109816 (1.8 min)
sum(executorDeserializeTime) => 26 (26 ms)
sum(executorDeserializeCpuTime) => 19 (19 ms)
sum(resultSerializationTime) => 0 (0 ms)
sum(jvmGCTime) => 24 (24 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 1 (1 ms)
max(resultSize) => 18272 (17.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2000
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritten) => 472 (472 Bytes)
sum(shuffleRecordsWritten) => 8
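
Beyond the printed report, the collected metrics can also be exported for offline analysis. A minimal sketch based on the sparkmeasure Python docs referenced above; create_stagemetrics_DF and save_data may not be available in every sparkmeasure version, and the output path is just an example:

# Sketch: export the collected stage metrics as a Spark DataFrame and save them
metrics_df = stagemetrics.create_stagemetrics_DF("PerfStageMetrics")
metrics_df.orderBy("jobId", "stageId").show()
stagemetrics.save_data(metrics_df, "/tmp/stagemetrics_example")  # example path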

In [6]:
# Another, more compact, way to encapsulate code and instrumentation:
# runandmeasure executes the code string in the namespace passed as its first argument (hence locals())

stagemetrics.runandmeasure(locals(), """
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
""")


+----------+
|  count(1)|
+----------+
|1000000000|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
sum(numTasks) => 17
elapsedTime => 13432 (13 s)
sum(stageDuration) => 13394 (13 s)
sum(executorRunTime) => 106155 (1.8 min)
sum(executorCpuTime) => 105053 (1.8 min)
sum(executorDeserializeTime) => 75 (75 ms)
sum(executorDeserializeCpuTime) => 23 (23 ms)
sum(resultSerializationTime) => 0 (0 ms)
sum(jvmGCTime) => 0 (0 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 1 (1 ms)
max(resultSize) => 17928 (17.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2000
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritten) => 472 (472 Bytes)
sum(shuffleRecordsWritten) => 8

Example of metrics collection using TaskMetrics

Collecting Spark task metrics at the granularity of each task completion has additional overhead compared to collecting at stage completion level. Use this option only if you need data at this finer granularity, for example to study skew effects; otherwise prefer the stagemetrics aggregation shown above.


In [8]:
from sparkmeasure import TaskMetrics
taskmetrics = TaskMetrics(spark)

taskmetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
taskmetrics.end()
taskmetrics.print_report()


+----------+
|  count(1)|
+----------+
|1000000000|
+----------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark task metrics:
numtasks => 17
elapsedTime => 13406 (13 s)
sum(duration) => 105262 (1.8 min)
sum(schedulerDelay) => 67
sum(executorRunTime) => 105164 (1.8 min)
sum(executorCpuTime) => 104817 (1.7 min)
sum(executorDeserializeTime) => 31 (31 ms)
sum(executorDeserializeCpuTime) => 10 (10 ms)
sum(resultSerializationTime) => 0 (0 ms)
sum(jvmGCTime) => 0 (0 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 0 (0 ms)
sum(gettingResultTime) => 0 (0 ms)
max(resultSize) => 2241 (2.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2000
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 472 (472 Bytes)
sum(shuffleTotalBlocksFetched) => 8
sum(shuffleLocalBlocksFetched) => 8
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritten) => 472 (472 Bytes)
sum(shuffleRecordsWritten) => 8
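
When studying skew, it helps to look at the distribution of individual task measurements rather than only the sums above. A minimal sketch, again based on the sparkmeasure Python docs; the create_taskmetrics_DF method and the column names are assumptions to verify against the installed version:

# Sketch: inspect per-task metrics to spot stragglers / skew
task_df = taskmetrics.create_taskmetrics_DF("PerfTaskMetrics")
task_df.select("stageId", "duration", "executorRunTime") \
       .orderBy("duration", ascending=False).show(10)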

In [ ]: