Some boilerplate

Importing the library and setting up Spark


In [1]:
import spylon
import spylon.spark as sp

# Point spylon at a local Spark distribution and run locally with two worker threads
c = sp.SparkConfiguration()
c._spark_home = "/path/to/spark-1.6.2-bin-hadoop2.6"
c.master = ["local[2]"]

Tab Completion

Spark configuration properties can be tab-completed on the spylon SparkConfiguration object. Typing

c.conf.spark.sql.<TAB>

will bring up a list of valid properties that can be used.


In [2]:
# Tab completion for spark properties
c.conf.spark.sql.shuffle.partitions = 50
c.conf.spark.sql.shuffle.partitions


Out[2]:
spark.sql.shuffle.partitions <SET>: 50 200 Configures the number of partitions to use when shuffling data for joins or aggregations.
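
Other standard Spark properties can be set the same way before the contexts are created. A minimal sketch, assuming spylon exposes these properties through the same attribute-style access used for spark.sql.shuffle.partitions above:

# Assumed attribute-style access, mirroring the shuffle.partitions example above;
# spark.executor.memory and spark.sql.autoBroadcastJoinThreshold are standard Spark settings
c.conf.spark.executor.memory = "2g"
c.conf.spark.sql.autoBroadcastJoinThreshold = 20 * 1024 * 1024  # bytes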

In [3]:
(sc, sqlContext) = c.sql_context("MyApplicationName")
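
The returned sc and sqlContext behave like ordinary PySpark contexts (the examples below use sc directly). A quick sanity-check sketch, assuming the Spark 1.6 PySpark API:

# Sanity check: sc works like a regular PySpark SparkContext
print(sc.version)
print(sc.parallelize(list(range(100))).sum())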

Progress Bars!


In [9]:
import time

In [10]:
def delayed(seconds):
    """Return an identity function that sleeps for `seconds` before returning its argument."""
    def f(x):
        time.sleep(seconds)
        return x
    return f

In [11]:
# 15 elements across 5 partitions; delayed(1) makes each element take about a second
rdd = sc.parallelize(list(range(10)) + list(range(5)), 5).map(delayed(1))
reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
reduced.map(delayed(1)).collect()


Out[11]:
[(0, 2),
 (5, 1),
 (1, 2),
 (6, 1),
 (2, 2),
 (7, 1),
 (8, 1),
 (3, 2),
 (9, 1),
 (4, 2)]

Turning on progress bars is just a matter of calling a method that starts a progress-bar handler thread.


In [12]:
sp.start_spark_progress_bar_thread(sc, sleep_time=0.1)

In [13]:
# The same job as before, now reported by the progress bar handler
rdd = sc.parallelize(list(range(10)) + list(range(5)), 5).map(delayed(1))
reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
reduced.map(delayed(1)).collect()


[Stage 2:================>    (4 + 1 / 5 Dur: 08s]
[Stage 3:================>    (4 + 1 / 5 Dur: 05s]
Out[13]:
[(0, 2),
 (5, 1),
 (1, 2),
 (6, 1),
 (2, 2),
 (7, 1),
 (8, 1),
 (3, 2),
 (9, 1),
 (4, 2)]
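
Since the handler was started against the SparkContext, later jobs should also show progress bars, not just RDD actions. A minimal sketch triggering a DataFrame job with the sqlContext created earlier (Spark 1.6 DataFrame API; the column names are illustrative):

# Sketch: a DataFrame aggregation run after the handler is started
# should also display progress bars for its stages
df = sqlContext.createDataFrame([(i % 3, i) for i in range(100)], ["key", "value"])
df.groupBy("key").count().collect()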