IPython magic functions for PySpark

Examples of shortcuts for executing SQL in Spark


In [1]:
#
# IPython magic functions to use with PySpark and Spark SQL
# The following code provides example shortcuts that simplify running SQL in PySpark
# The defined functions are:
#
# %sql <statement>          - return a Spark DataFrame for lazy evaluation of the SQL
# %sql_show <statement>     - run the SQL statement and show up to max_show_lines (50) lines
# %sql_display <statement>  - run the SQL statement and display the results as an HTML table
#                           - implemented by converting the result to Pandas; displays up to max_show_lines (50) rows
# %sql_explain <statement>  - display the execution plan of the SQL statement
#
# Use: %<magic> for line magic or %%<magic> for cell magic.
#
# Author: Luca.Canali@cern.ch
# September 2016
#

from IPython.core.magic import register_line_cell_magic

# Configuration parameters
max_show_lines = 50         # Limit on the number of lines to show with %sql_show and %sql_display
detailed_explain = True     # Set to False if you want to see only the physical plan when running explain


@register_line_cell_magic
def sql(line, cell=None):
    "Return a Spark DataFrame for lazy evaluation of the sql. Use: %sql or %%sql"
    val = cell if cell is not None else line 
    return spark.sql(val)

@register_line_cell_magic
def sql_show(line, cell=None):
    "Execute sql and show the first max_show_lines lines. Use: %sql_show or %%sql_show"
    val = cell if cell is not None else line 
    return spark.sql(val).show(max_show_lines) 

@register_line_cell_magic
def sql_display(line, cell=None):
    """Execute sql and convert results to Pandas DataFrame for pretty display or further processing.
    Use: %sql_display or %%sql_display"""
    val = cell if cell is not None else line 
    return spark.sql(val).limit(max_show_lines).toPandas() 

@register_line_cell_magic
def sql_explain(line, cell=None):
    "Display the execution plan of the sql. Use: %sql_explain or %%sql_explain"
    val = cell if cell is not None else line 
    return spark.sql(val).explain(detailed_explain)
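
# The line-vs-cell dispatch shared by all four magics above boils down to one
# rule: if a cell body is present it takes precedence, otherwise the line text
# is used. A minimal Spark-free sketch of that rule (the helper name sql_text
# is hypothetical, used here only for illustration):

```python
def sql_text(line, cell=None):
    """Mirror the dispatch used by the magics above:
    the cell body, when present, wins over the line text."""
    return cell if cell is not None else line

# Line-magic style: only the line text is available
print(sql_text("select * from employee"))

# Cell-magic style: the multi-line cell body is used instead
print(sql_text("ignored line", "select id, name\nfrom employee"))
```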

Define test tables


In [2]:
# Define test data and register it as tables 
# This is a classic example of employee and department relational tables
# Test data will be used in the examples later in this notebook

from pyspark.sql import Row

Employee = Row("id", "name", "email", "manager_id", "dep_id")
df_emp = spark.createDataFrame([
        Employee(1234, 'John', 'john@mail.com', 1236, 10),
        Employee(1235, 'Mike', 'mike@mail.com', 1237, 10),
        Employee(1236, 'Pat', 'pat@mail.com', 1237, 20),
        Employee(1237, 'Claire', 'claire@mail.com', None, 20),
        Employee(1238, 'Jim', 'jim@mail.com', 1236, 30)
        ])

df_emp.createOrReplaceTempView("employee")

Department = Row("dep_id", "dep_name")
df_dep = spark.createDataFrame([
        Department(10, 'Engineering'),
        Department(20, 'Head Quarter'),
        Department(30, 'Human resources')
        ])

df_dep.createOrReplaceTempView("department")
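
# pyspark.sql.Row, used above as a column template, behaves much like
# collections.namedtuple: calling Row("id", "name", ...) returns a factory
# whose instances carry named fields. A pure-Python analogue of one employee
# row (no Spark session needed):

```python
from collections import namedtuple

# namedtuple plays the same role here as pyspark.sql.Row above:
# a factory for records with named fields
Employee = namedtuple("Employee", ["id", "name", "email", "manager_id", "dep_id"])

emp = Employee(1234, "John", "john@mail.com", 1236, 10)
print(emp.name, emp.dep_id)
```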

Examples of how to use the %sql magic functions with Spark

Use %sql to run SQL and return a DataFrame for lazy evaluation


In [3]:
# Example of line magic, a shortcut to run SQL in PySpark
# PySpark uses lazy evaluation, so the query is not executed in this example

df = %sql select * from employee
df


Out[3]:
DataFrame[id: bigint, name: string, email: string, manager_id: bigint, dep_id: bigint]

Use %sql_show to run SQL and show the top lines of the result set


In [4]:
# Example of line magic: the SQL is executed and the result is displayed
# The maximum number of displayed lines is configurable (max_show_lines)

%sql_show select * from employee


+----+------+---------------+----------+------+
|  id|  name|          email|manager_id|dep_id|
+----+------+---------------+----------+------+
|1234|  John|  john@mail.com|      1236|    10|
|1235|  Mike|  mike@mail.com|      1237|    10|
|1236|   Pat|   pat@mail.com|      1237|    20|
|1237|Claire|claire@mail.com|      null|    20|
|1238|   Jim|   jim@mail.com|      1236|    30|
+----+------+---------------+----------+------+

Example of cell magic to run SQL spanning multiple lines


In [8]:
%%sql_show 
select emp.id, emp.name, emp.email, emp.manager_id, dep.dep_name 
from employee emp, department dep 
where emp.dep_id=dep.dep_id


+----+------+---------------+----------+---------------+
|  id|  name|          email|manager_id|       dep_name|
+----+------+---------------+----------+---------------+
|1234|  John|  john@mail.com|      1236|    Engineering|
|1235|  Mike|  mike@mail.com|      1237|    Engineering|
|1238|   Jim|   jim@mail.com|      1236|Human resources|
|1236|   Pat|   pat@mail.com|      1237|   Head Quarter|
|1237|Claire|claire@mail.com|      null|   Head Quarter|
+----+------+---------------+----------+---------------+

Use %sql_display to run SQL and display the results as an HTML table

Example of cell magic that runs SQL and converts the result to a Pandas DataFrame, which Jupyter notebooks render as an HTML table
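
Under the hood, %sql_display is simply spark.sql(...).limit(max_show_lines).toPandas(); Jupyter then renders the resulting Pandas DataFrame through its HTML representation. A small sketch of the rendering step alone, assuming Pandas is available and using a hand-built DataFrame in place of a Spark result:

```python
import pandas as pd

# Stand-in for the result of spark.sql(...).limit(max_show_lines).toPandas()
pdf = pd.DataFrame(
    [(1234, "John", "john@mail.com"), (1235, "Mike", "mike@mail.com")],
    columns=["id", "name", "email"],
)

# Jupyter renders a Pandas DataFrame via its HTML representation;
# to_html() produces the same kind of markup explicitly
html = pdf.to_html()
```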


In [10]:
%%sql_display 
select emp.id, emp.name, emp.email, emp2.name as manager_name, dep.dep_name 
from employee emp 
     left outer join employee emp2 on emp2.id=emp.manager_id
     join department dep on emp.dep_id=dep.dep_id


Out[10]:
id name email manager_name dep_name
0 1234 John john@mail.com Pat Engineering
1 1235 Mike mike@mail.com Claire Engineering
2 1238 Jim jim@mail.com Pat Human resources
3 1237 Claire claire@mail.com None Head Quarter
4 1236 Pat pat@mail.com Claire Head Quarter

Use %sql_explain to display the execution plan


In [11]:
%%sql_explain
select emp.id, emp.name, emp.email, emp2.name as manager_name, dep.dep_name 
from employee emp 
     left outer join employee emp2 on emp2.id=emp.manager_id
     join department dep on emp.dep_id=dep.dep_id


== Parsed Logical Plan ==
'Project ['emp.id, 'emp.name, 'emp.email, 'emp2.name AS manager_name#68, 'dep.dep_name]
+- 'Join Inner, ('emp.dep_id = 'dep.dep_id)
   :- 'Join LeftOuter, ('emp2.id = 'emp.manager_id)
   :  :- 'UnresolvedRelation `employee`, emp
   :  +- 'UnresolvedRelation `employee`, emp2
   +- 'UnresolvedRelation `department`, dep

== Analyzed Logical Plan ==
id: bigint, name: string, email: string, manager_name: string, dep_name: string
Project [id#0L, name#1, email#2, name#80 AS manager_name#68, dep_name#13]
+- Join Inner, (dep_id#4L = dep_id#12L)
   :- Join LeftOuter, (id#79L = manager_id#3L)
   :  :- SubqueryAlias emp
   :  :  +- SubqueryAlias employee
   :  :     +- LogicalRDD [id#0L, name#1, email#2, manager_id#3L, dep_id#4L]
   :  +- SubqueryAlias emp2
   :     +- SubqueryAlias employee
   :        +- LogicalRDD [id#79L, name#80, email#81, manager_id#82L, dep_id#83L]
   +- SubqueryAlias dep
      +- SubqueryAlias department
         +- LogicalRDD [dep_id#12L, dep_name#13]

== Optimized Logical Plan ==
Project [id#0L, name#1, email#2, name#80 AS manager_name#68, dep_name#13]
+- Join Inner, (dep_id#4L = dep_id#12L)
   :- Project [id#0L, name#1, email#2, dep_id#4L, name#80]
   :  +- Join LeftOuter, (id#79L = manager_id#3L)
   :     :- Filter isnotnull(dep_id#4L)
   :     :  +- LogicalRDD [id#0L, name#1, email#2, manager_id#3L, dep_id#4L]
   :     +- Project [id#79L, name#80]
   :        +- LogicalRDD [id#79L, name#80, email#81, manager_id#82L, dep_id#83L]
   +- Filter isnotnull(dep_id#12L)
      +- LogicalRDD [dep_id#12L, dep_name#13]

== Physical Plan ==
*Project [id#0L, name#1, email#2, name#80 AS manager_name#68, dep_name#13]
+- *SortMergeJoin [dep_id#4L], [dep_id#12L], Inner
   :- *Sort [dep_id#4L ASC], false, 0
   :  +- Exchange hashpartitioning(dep_id#4L, 200)
   :     +- *Project [id#0L, name#1, email#2, dep_id#4L, name#80]
   :        +- SortMergeJoin [manager_id#3L], [id#79L], LeftOuter
   :           :- *Sort [manager_id#3L ASC], false, 0
   :           :  +- Exchange hashpartitioning(manager_id#3L, 200)
   :           :     +- *Filter isnotnull(dep_id#4L)
   :           :        +- Scan ExistingRDD[id#0L,name#1,email#2,manager_id#3L,dep_id#4L]
   :           +- *Sort [id#79L ASC], false, 0
   :              +- Exchange hashpartitioning(id#79L, 200)
   :                 +- *Project [id#79L, name#80]
   :                    +- Scan ExistingRDD[id#79L,name#80,email#81,manager_id#82L,dep_id#83L]
   +- *Sort [dep_id#12L ASC], false, 0
      +- Exchange hashpartitioning(dep_id#12L, 200)
         +- *Filter isnotnull(dep_id#12L)
            +- Scan ExistingRDD[dep_id#12L,dep_name#13]

In [ ]: