# Getting started: Create a SQL Context from the Spark Context, sc, which is predefined in every notebook

In [20]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
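
If Hive support is compiled into your Spark build, a HiveContext (a superset of SQLContext in Spark 1.x, adding window functions and HiveQL) can be created the same way. A quick sketch:

In [ ]:
#optional: HiveContext is a superset of SQLContext in Spark 1.x
#(this assumes the Spark build includes Hive support)
from pyspark.sql import HiveContext
hiveContext = HiveContext(sc)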

The SQL Context queries Dataframes, not RDDs, so RDDs must first be converted to Dataframes (examples appear later in this notebook).

A data file on World Bank projects will be downloaded from GitHub, after removing any previous copy that may exist.


In [2]:
!rm world_bank* -f
!wget https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/world_bank.json.gz


--2016-03-30 16:20:21--  https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/world_bank.json.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 23.235.47.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|23.235.47.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 446287 (436K) [application/octet-stream]
Saving to: 'world_bank.json.gz'

100%[======================================>] 446,287     --.-K/s   in 0.04s   

2016-03-30 16:20:22 (11.7 MB/s) - 'world_bank.json.gz' saved [446287/446287]

A Dataframe will be created using the sqlContext to read the file. Many other formats are supported as well, including text and Parquet.


In [3]:
example1_df = sqlContext.read.json("world_bank.json.gz")
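
The same reader interface handles the other formats mentioned above. A quick sketch (the paths below are hypothetical placeholders, not files shipped with this notebook):

In [ ]:
#other formats use the same DataFrameReader; these paths are placeholders
parquet_df = sqlContext.read.parquet("some_data.parquet")  #columnar Parquet files
text_df = sqlContext.read.text("some_data.txt")            #one string column named "value"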

Spark SQL can infer the schema of JSON data, discovering the structure of the data automatically.


In [4]:
example1_df.printSchema()


root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- approvalfy: string (nullable = true)
 |-- board_approval_month: string (nullable = true)
 |-- boardapprovaldate: string (nullable = true)
 |-- borrower: string (nullable = true)
 |-- closingdate: string (nullable = true)
 |-- country_namecode: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- countryname: string (nullable = true)
 |-- countryshortname: string (nullable = true)
 |-- docty: string (nullable = true)
 |-- envassesmentcategorycode: string (nullable = true)
 |-- grantamt: long (nullable = true)
 |-- ibrdcommamt: long (nullable = true)
 |-- id: string (nullable = true)
 |-- idacommamt: long (nullable = true)
 |-- impagency: string (nullable = true)
 |-- lendinginstr: string (nullable = true)
 |-- lendinginstrtype: string (nullable = true)
 |-- lendprojectcost: long (nullable = true)
 |-- majorsector_percent: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Percent: long (nullable = true)
 |-- mjsector_namecode: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- mjtheme: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- mjtheme_namecode: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- mjthemecode: string (nullable = true)
 |-- prodline: string (nullable = true)
 |-- prodlinetext: string (nullable = true)
 |-- productlinetype: string (nullable = true)
 |-- project_abstract: struct (nullable = true)
 |    |-- cdata: string (nullable = true)
 |-- project_name: string (nullable = true)
 |-- projectdocs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- DocDate: string (nullable = true)
 |    |    |-- DocType: string (nullable = true)
 |    |    |-- DocTypeDesc: string (nullable = true)
 |    |    |-- DocURL: string (nullable = true)
 |    |    |-- EntityID: string (nullable = true)
 |-- projectfinancialtype: string (nullable = true)
 |-- projectstatusdisplay: string (nullable = true)
 |-- regionname: string (nullable = true)
 |-- sector: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Name: string (nullable = true)
 |-- sector1: struct (nullable = true)
 |    |-- Name: string (nullable = true)
 |    |-- Percent: long (nullable = true)
 |-- sector2: struct (nullable = true)
 |    |-- Name: string (nullable = true)
 |    |-- Percent: long (nullable = true)
 |-- sector3: struct (nullable = true)
 |    |-- Name: string (nullable = true)
 |    |-- Percent: long (nullable = true)
 |-- sector4: struct (nullable = true)
 |    |-- Name: string (nullable = true)
 |    |-- Percent: long (nullable = true)
 |-- sector_namecode: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- sectorcode: string (nullable = true)
 |-- source: string (nullable = true)
 |-- status: string (nullable = true)
 |-- supplementprojectflg: string (nullable = true)
 |-- theme1: struct (nullable = true)
 |    |-- Name: string (nullable = true)
 |    |-- Percent: long (nullable = true)
 |-- theme_namecode: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- themecode: string (nullable = true)
 |-- totalamt: long (nullable = true)
 |-- totalcommamt: long (nullable = true)
 |-- url: string (nullable = true)


Let's take a look at the first two rows of data.


In [5]:
for row in example1_df.take(2):
    print row
    print "*" * 20


Row(_id=Row($oid=u'52b213b38594d8a2be17c780'), approvalfy=u'1999', board_approval_month=u'November', boardapprovaldate=u'2013-11-12T00:00:00Z', borrower=u'FEDERAL DEMOCRATIC REPUBLIC OF ETHIOPIA', closingdate=u'2018-07-07T00:00:00Z', country_namecode=u'Federal Democratic Republic of Ethiopia!$!ET', countrycode=u'ET', countryname=u'Federal Democratic Republic of Ethiopia', countryshortname=u'Ethiopia', docty=u'Project Information Document,Indigenous Peoples Plan,Project Information Document', envassesmentcategorycode=u'C', grantamt=0, ibrdcommamt=0, id=u'P129828', idacommamt=130000000, impagency=u'MINISTRY OF EDUCATION', lendinginstr=u'Investment Project Financing', lendinginstrtype=u'IN', lendprojectcost=550000000, majorsector_percent=[Row(Name=u'Education', Percent=46), Row(Name=u'Education', Percent=26), Row(Name=u'Public Administration, Law, and Justice', Percent=16), Row(Name=u'Education', Percent=12)], mjsector_namecode=[Row(code=u'EX', name=u'Education'), Row(code=u'EX', name=u'Education'), Row(code=u'BX', name=u'Public Administration, Law, and Justice'), Row(code=u'EX', name=u'Education')], mjtheme=[u'Human development'], mjtheme_namecode=[Row(code=u'8', name=u'Human development'), Row(code=u'11', name=u'')], mjthemecode=u'8,11', prodline=u'PE', prodlinetext=u'IBRD/IDA', productlinetype=u'L', project_abstract=Row(cdata=u'The development objective of the Second Phase of General Education Quality Improvement Project for Ethiopia is to improve learning conditions in primary and secondary schools and strengthen institutions at different levels of educational administration. The project has six components. The first component is curriculum, textbooks, assessment, examinations, and inspection. This component will support improvement of learning conditions in grades KG-12 by providing increased access to teaching and learning materials and through improvements to the curriculum by assessing the strengths and weaknesses of the current curriculum. This component has following four sub-components: (i) curriculum reform and implementation; (ii) teaching and learning materials; (iii) assessment and examinations; and (iv) inspection. The second component is teacher development program (TDP). This component will support improvements in learning conditions in both primary and secondary schools by advancing the quality of teaching in general education through: (a) enhancing the training of pre-service teachers in teacher education institutions; and (b) improving the quality of in-service teacher training. This component has following three sub-components: (i) pre-service teacher training; (ii) in-service teacher training; and (iii) licensing and relicensing of teachers and school leaders. The third component is school improvement plan. This component will support the strengthening of school planning in order to improve learning outcomes, and to partly fund the school improvement plans through school grants. It has following two sub-components: (i) school improvement plan; and (ii) school grants. The fourth component is management and capacity building, including education management information systems (EMIS). This component will support management and capacity building aspect of the project. This component has following three sub-components: (i) capacity building for education planning and management; (ii) capacity building for school planning and management; and (iii) EMIS. 
The fifth component is improving the quality of learning and teaching in secondary schools and universities through the use of information and communications technology (ICT). It has following five sub-components: (i) national policy and institution for ICT in general education; (ii) national ICT infrastructure improvement plan for general education; (iii) develop an integrated monitoring, evaluation, and learning system specifically for the ICT component; (iv) teacher professional development in the use of ICT; and (v) provision of limited number of e-Braille display readers with the possibility to scale up to all secondary education schools based on the successful implementation and usage of the readers. The sixth component is program coordination, monitoring and evaluation, and communication. It will support institutional strengthening by developing capacities in all aspects of program coordination, monitoring and evaluation; a new sub-component on communications will support information sharing for better management and accountability. It has following three sub-components: (i) program coordination; (ii) monitoring and evaluation (M and E); and (iii) communication.'), project_name=u'Ethiopia General Education Quality Improvement Project II', projectdocs=[Row(DocDate=u'28-AUG-2013', DocType=u'PID', DocTypeDesc=u'Project Information Document (PID),  Vol.', DocURL=u'http://www-wds.worldbank.org/servlet/WDSServlet?pcont=details&eid=090224b081e545fb_1_0', EntityID=u'090224b081e545fb_1_0'), Row(DocDate=u'01-JUL-2013', DocType=u'IP', DocTypeDesc=u'Indigenous Peoples Plan (IP),  Vol.1 of 1', DocURL=u'http://www-wds.worldbank.org/servlet/WDSServlet?pcont=details&eid=000442464_20130920111729', EntityID=u'000442464_20130920111729'), Row(DocDate=u'22-NOV-2012', DocType=u'PID', DocTypeDesc=u'Project Information Document (PID),  Vol.', DocURL=u'http://www-wds.worldbank.org/servlet/WDSServlet?pcont=details&eid=090224b0817b19e2_1_0', EntityID=u'090224b0817b19e2_1_0')], projectfinancialtype=u'IDA', projectstatusdisplay=u'Active', regionname=u'Africa', sector=[Row(Name=u'Primary education'), Row(Name=u'Secondary education'), Row(Name=u'Public administration- Other social services'), Row(Name=u'Tertiary education')], sector1=Row(Name=u'Primary education', Percent=46), sector2=Row(Name=u'Secondary education', Percent=26), sector3=Row(Name=u'Public administration- Other social services', Percent=16), sector4=Row(Name=u'Tertiary education', Percent=12), sector_namecode=[Row(code=u'EP', name=u'Primary education'), Row(code=u'ES', name=u'Secondary education'), Row(code=u'BS', name=u'Public administration- Other social services'), Row(code=u'ET', name=u'Tertiary education')], sectorcode=u'ET,BS,ES,EP', source=u'IBRD', status=u'Active', supplementprojectflg=u'N', theme1=Row(Name=u'Education for all', Percent=100), theme_namecode=[Row(code=u'65', name=u'Education for all')], themecode=u'65', totalamt=130000000, totalcommamt=130000000, url=u'http://www.worldbank.org/projects/P129828/ethiopia-general-education-quality-improvement-project-ii?lang=en')
********************
Row(_id=Row($oid=u'52b213b38594d8a2be17c781'), approvalfy=u'2015', board_approval_month=u'November', boardapprovaldate=u'2013-11-04T00:00:00Z', borrower=u'GOVERNMENT OF TUNISIA', closingdate=None, country_namecode=u'Republic of Tunisia!$!TN', countrycode=u'TN', countryname=u'Republic of Tunisia', countryshortname=u'Tunisia', docty=u'Project Information Document,Integrated Safeguards Data Sheet,Integrated Safeguards Data Sheet,Project Information Document,Integrated Safeguards Data Sheet,Project Information Document', envassesmentcategorycode=u'C', grantamt=4700000, ibrdcommamt=0, id=u'P144674', idacommamt=0, impagency=u'MINISTRY OF FINANCE', lendinginstr=u'Specific Investment Loan', lendinginstrtype=u'IN', lendprojectcost=5700000, majorsector_percent=[Row(Name=u'Public Administration, Law, and Justice', Percent=70), Row(Name=u'Public Administration, Law, and Justice', Percent=30)], mjsector_namecode=[Row(code=u'BX', name=u'Public Administration, Law, and Justice'), Row(code=u'BX', name=u'Public Administration, Law, and Justice')], mjtheme=[u'Economic management', u'Social protection and risk management'], mjtheme_namecode=[Row(code=u'1', name=u'Economic management'), Row(code=u'6', name=u'Social protection and risk management')], mjthemecode=u'1,6', prodline=u'RE', prodlinetext=u'Recipient Executed Activities', productlinetype=u'L', project_abstract=None, project_name=u'TN: DTF Social Protection Reforms Support', projectdocs=[Row(DocDate=u'29-MAR-2013', DocType=u'PID', DocTypeDesc=u'Project Information Document (PID),  Vol.1 of 1', DocURL=u'http://www-wds.worldbank.org/servlet/WDSServlet?pcont=details&eid=000333037_20131024115616', EntityID=u'000333037_20131024115616'), Row(DocDate=u'29-MAR-2013', DocType=u'ISDS', DocTypeDesc=u'Integrated Safeguards Data Sheet (ISDS),  Vol.1 of 1', DocURL=u'http://www-wds.worldbank.org/servlet/WDSServlet?pcont=details&eid=000356161_20131024151611', EntityID=u'000356161_20131024151611'), Row(DocDate=u'29-MAR-2013', DocType=u'ISDS', DocTypeDesc=u'Integrated Safeguards Data Sheet (ISDS),  Vol.1 of 1', DocURL=u'http://www-wds.worldbank.org/servlet/WDSServlet?pcont=details&eid=000442464_20131031112136', EntityID=u'000442464_20131031112136'), Row(DocDate=u'29-MAR-2013', DocType=u'PID', DocTypeDesc=u'Project Information Document (PID),  Vol.1 of 1', DocURL=u'http://www-wds.worldbank.org/servlet/WDSServlet?pcont=details&eid=000333037_20131031105716', EntityID=u'000333037_20131031105716'), Row(DocDate=u'16-JAN-2013', DocType=u'ISDS', DocTypeDesc=u'Integrated Safeguards Data Sheet (ISDS),  Vol.1 of 1', DocURL=u'http://www-wds.worldbank.org/servlet/WDSServlet?pcont=details&eid=000356161_20130305113209', EntityID=u'000356161_20130305113209'), Row(DocDate=u'16-JAN-2013', DocType=u'PID', DocTypeDesc=u'Project Information Document (PID),  Vol.1 of 1', DocURL=u'http://www-wds.worldbank.org/servlet/WDSServlet?pcont=details&eid=000356161_20130305113716', EntityID=u'000356161_20130305113716')], projectfinancialtype=u'OTHER', projectstatusdisplay=u'Active', regionname=u'Middle East and North Africa', sector=[Row(Name=u'Public administration- Other social services'), Row(Name=u'General public administration sector')], sector1=Row(Name=u'Public administration- Other social services', Percent=70), sector2=Row(Name=u'General public administration sector', Percent=30), sector3=None, sector4=None, sector_namecode=[Row(code=u'BS', name=u'Public administration- Other social services'), Row(code=u'BZ', name=u'General public administration sector')], sectorcode=u'BZ,BS', 
source=u'IBRD', status=u'Active', supplementprojectflg=u'N', theme1=Row(Name=u'Other economic management', Percent=30), theme_namecode=[Row(code=u'24', name=u'Other economic management'), Row(code=u'54', name=u'Social safety nets')], themecode=u'54,24', totalamt=0, totalcommamt=4700000, url=u'http://www.worldbank.org/projects/P144674?lang=en')
********************

Now let's register a table, which is a pointer to the Dataframe and allows data access via Spark SQL.


In [6]:
#Simply use the Dataframe Object to create the table:
example1_df.registerTempTable("world_bank")

In [7]:
#now that the table is registered we can execute sql commands 
#NOTE that the returned object is another Dataframe:

temp_df =  sqlContext.sql("select * from world_bank limit 2")

print type(temp_df)
print "*" * 20
print temp_df


<class 'pyspark.sql.dataframe.DataFrame'>
********************
DataFrame[_id: struct<$oid:string>, approvalfy: string, board_approval_month: string, boardapprovaldate: string, borrower: string, closingdate: string, country_namecode: string, countrycode: string, countryname: string, countryshortname: string, docty: string, envassesmentcategorycode: string, grantamt: bigint, ibrdcommamt: bigint, id: string, idacommamt: bigint, impagency: string, lendinginstr: string, lendinginstrtype: string, lendprojectcost: bigint, majorsector_percent: array<struct<Name:string,Percent:bigint>>, mjsector_namecode: array<struct<code:string,name:string>>, mjtheme: array<string>, mjtheme_namecode: array<struct<code:string,name:string>>, mjthemecode: string, prodline: string, prodlinetext: string, productlinetype: string, project_abstract: struct<cdata:string>, project_name: string, projectdocs: array<struct<DocDate:string,DocType:string,DocTypeDesc:string,DocURL:string,EntityID:string>>, projectfinancialtype: string, projectstatusdisplay: string, regionname: string, sector: array<struct<Name:string>>, sector1: struct<Name:string,Percent:bigint>, sector2: struct<Name:string,Percent:bigint>, sector3: struct<Name:string,Percent:bigint>, sector4: struct<Name:string,Percent:bigint>, sector_namecode: array<struct<code:string,name:string>>, sectorcode: string, source: string, status: string, supplementprojectflg: string, theme1: struct<Name:string,Percent:bigint>, theme_namecode: array<struct<code:string,name:string>>, themecode: string, totalamt: bigint, totalcommamt: bigint, url: string]

In [8]:
#one nice feature of notebooks with Python is that we can display results in a table via Pandas
sqlContext.sql("select id, borrower from world_bank limit 2").toPandas()


Out[8]:
id borrower
0 P129828 FEDERAL DEMOCRATIC REPUBLIC OF ETHIOPIA
1 P144674 GOVERNMENT OF TUNISIA

In [9]:
#Here is a simple group by example:

query = """
select
    regionname ,
    count(*) as project_count
from world_bank
group by regionname 
order by count(*) desc
"""

sqlContext.sql(query).toPandas()


Out[9]:
regionname project_count
0 Africa 152
1 East Asia and Pacific 100
2 Europe and Central Asia 74
3 South Asia 65
4 Middle East and North Africa 54
5 Latin America and Caribbean 53
6 Other 2

In [10]:
#subselect works as well:

query = """

select * from
    (select
        regionname ,
        count(*) as project_count
    from world_bank
    group by regionname 
    order by count(*) desc) table_alias
limit 2
"""

sqlContext.sql(query).toPandas()


Out[10]:
regionname project_count
0 Africa 152
1 East Asia and Pacific 100

Simple example of adding a schema (headers) to an RDD and using it as a Dataframe

In the example below, a simple RDD is created with random data in two columns plus an ID column.


In [11]:
import random

#first let's create a simple RDD

#create a Python list of lists for our example
data_e2 = []
for x in range(1,6):
    random_int = int(random.random() * 10)
    #note: ^ is Python's bitwise XOR operator, not exponentiation (use ** to square)
    data_e2.append([x, random_int, random_int^2])

#create the RDD with the random list of lists
rdd_example2 = sc.parallelize(data_e2)
print rdd_example2.collect()


[[1, 1, 3], [2, 1, 3], [3, 3, 1], [4, 0, 2], [5, 6, 4]]

In [12]:
from pyspark.sql.types import *

#now we can assign some header information

# The schema is encoded in a string.
schemaString = "ID VAL1 VAL2"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaExample = sqlContext.createDataFrame(rdd_example2, schema)

# Register the DataFrame as a table.
schemaExample.registerTempTable("example2")

# Pull the data
print schemaExample.collect()


[Row(ID=u'1', VAL1=u'1', VAL2=u'3'), Row(ID=u'2', VAL1=u'1', VAL2=u'3'), Row(ID=u'3', VAL1=u'3', VAL2=u'1'), Row(ID=u'4', VAL1=u'0', VAL2=u'2'), Row(ID=u'5', VAL1=u'6', VAL2=u'4')]

In [13]:
#In Dataframes we can reference the column names, for example:

for row in schemaExample.take(2):
    print row.ID, row.VAL1, row.VAL2


1 1 3
2 1 3

In [14]:
#Again a simple sql example:

sqlContext.sql("select * from example2").toPandas()


Out[14]:
ID VAL1 VAL2
0 1 1 3
1 2 1 3
2 3 3 1
3 4 0 2
4 5 6 4

Another Example of creating a Dataframe from an RDD


In [15]:
#Remember this RDD:
print type(rdd_example2)
print rdd_example2.collect()


<class 'pyspark.rdd.RDD'>
[[1, 1, 3], [2, 1, 3], [3, 3, 1], [4, 0, 2], [5, 6, 4]]

In [16]:
#we can use Row inside a map to name the columns, then use that to create the Dataframe
from pyspark.sql import Row

rdd_example3 = rdd_example2.map(lambda x: Row(id=x[0], val1=x[1], val2=x[2]))

print rdd_example3.collect()


[Row(id=1, val1=1, val2=3), Row(id=2, val1=1, val2=3), Row(id=3, val1=3, val2=1), Row(id=4, val1=0, val2=2), Row(id=5, val1=6, val2=4)]

In [17]:
#now we can convert rdd_example3 to a Dataframe

df_example3 = rdd_example3.toDF()
df_example3.registerTempTable("df_example3")

print type(df_example3)


<class 'pyspark.sql.dataframe.DataFrame'>
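
As a shortcut, an RDD of plain lists can also be converted by passing column names straight to toDF. A sketch (df_example5 is just an illustrative name):

In [ ]:
#shortcut (sketch): pass column names directly to toDF on the raw RDD
df_example5 = rdd_example2.toDF(["id", "val1", "val2"])
print df_example5.collect()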

In [18]:
#now a simple SQL statement
sqlContext.sql("select * from df_example3").toPandas()


Out[18]:
id val1 val2
0 1 1 3
1 2 1 3
2 3 3 1
3 4 0 2
4 5 6 4

Joins are supported; here is a simple example with our two new tables.

We can join example2 and df_example3 on ID. Note that the SQL attempt below fails with an AnalysisException because the column reference cannot be resolved; a Python alternative and a possible workaround follow.


In [19]:
query = """
select
    *
from
    example2 e2
inner join df_example3 e3 on
    e2.id = e3.id
"""

print sqlContext.sql(query).toPandas()


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-19-1f4835386318> in <module>()
      8 """
      9 
---> 10 print sqlContext.sql(query).toPandas()

/root/spark-1.6.0-bin-fluxcapacitor/python/pyspark/sql/context.pyc in sql(self, sqlQuery)
    581         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    582         """
--> 583         return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
    584 
    585     @since(1.0)

/root/spark-1.6.0-bin-fluxcapacitor/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/root/spark-1.6.0-bin-fluxcapacitor/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     49                                              e.java_exception.getStackTrace()))
     50             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 51                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     52             if s.startswith('java.lang.IllegalArgumentException: '):
     53                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u"cannot resolve 'e2.id' given input columns VAL1, ID, val2, val1, id, VAL2;"

In [ ]:
#Alternatively you can join within Python as well

df_example4 = df_example3.join(schemaExample, schemaExample["ID"] == df_example3["id"])

for row in df_example4.take(5):
    print row
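
If you would rather stay in SQL, one possible workaround (a sketch, not part of the original notebook; whether it is needed depends on the Spark version's name resolution) is to rename the clashing column on one side before registering the table:

In [ ]:
#sketch of a workaround for the AnalysisException above: rename the clashing
#column so the join keys resolve unambiguously
renamed_e2 = schemaExample.withColumnRenamed("ID", "e2_id")
renamed_e2.registerTempTable("example2_renamed")
sqlContext.sql("""
select *
from example2_renamed e2
inner join df_example3 e3 on e2.e2_id = e3.id
""").toPandas()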

One of the more powerful features is the ability to create functions and use them in SQL. Here is a simple example.


In [ ]:
#first we create a Python function:

def simple_function(v):
    return int(v * 10)

#test the function
print simple_function(3)

In [ ]:
#now we can register the function for use in SQL
sqlContext.registerFunction("simple_function", simple_function)
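
By default registerFunction treats the return value as a string; the return type can be declared explicitly instead. A sketch reusing the same function under a new, illustrative name:

In [ ]:
#registering with an explicit return type so SQL sees an integer column
from pyspark.sql.types import IntegerType
sqlContext.registerFunction("simple_function_int", simple_function, IntegerType())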

In [ ]:
#now we can apply the function in a SQL statement
query = """
select
    ID,
    VAL1,
    VAL2,
    simple_function(VAL1) as s_VAL1,
    simple_function(VAL2) as s_VAL2
from
 example2
"""
sqlContext.sql(query).toPandas()

In [ ]:
#note that VAL1 and VAL2 are strings (the schema declared StringType), so we cast them to int as well
query = """
select
    ID,
    VAL1,
    VAL2,
    simple_function(cast(VAL1 as int)) as s_VAL1,
    simple_function(cast(VAL2 as int)) as s_VAL2
from
 example2
"""
sqlContext.sql(query).toPandas()

Pandas Example

Pandas is a common abstraction for working with data in Python.

We can turn Pandas Dataframes into Spark Dataframes; the advantages of this include scale and the ability to run SQL statements against the data.


In [ ]:
#import pandas library
import pandas as pd
print pd

First, let's grab some UFO data to play with


In [ ]:
!rm SIGHTINGS.csv -f
!wget https://www.quandl.com/api/v3/datasets/NUFORC/SIGHTINGS.csv

In [ ]:
#using the CSV file we just downloaded, we can create a Pandas Dataframe:
pandas_df = pd.read_csv("SIGHTINGS.csv")
pandas_df.head()

In [ ]:
#now convert to Spark Dataframe
spark_df = sqlContext.createDataFrame(pandas_df)

In [ ]:
#explore the first two rows:

for row in spark_df.take(2):
    print row

In [ ]:
#register the Spark Dataframe as a table
spark_df.registerTempTable("ufo_sightings")

In [ ]:
#now a SQL statement
print sqlContext.sql("select * from ufo_sightings limit 10").collect()

Visualizing the Data

Here are some simple ways to create charts using Pandas output

In order to display charts in the notebook we need to tell matplotlib to render inline; at this point we import the supporting libraries as well.


In [ ]:
%matplotlib inline 
import matplotlib.pyplot as plt, numpy as np

Pandas can call a "plot" function to create charts. Since most charts are created from aggregates, the record set should be small enough to store in Pandas.

We can take our UFO data from before and create a Pandas Dataframe from the Spark Dataframe


In [ ]:
ufos_df = spark_df.toPandas()

To plot we call the "plot" method and specify the type, the x and y axis columns, and optionally the size of the chart.

Many more details can be found here: http://pandas.pydata.org/pandas-docs/stable/visualization.html


In [ ]:
ufos_df.plot(kind='bar', x='Reports', y='Count', figsize=(12, 5))

This doesn't look good: there are too many observations. Let's check how many:


In [ ]:
print sqlContext.sql("select count(*) from ufo_sightings limit 10").collect()

Ideally we could just group by year. There are many ways we could solve that:

1) parse the Reports column in SQL and output the year, then group on it
2) create a simple Python function to parse the year and call it via SQL (see the sketch just below)
3) use map against the Dataframe and append a new column "year", as shown further below
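
A sketch of option 2 (the helper name year_from_report is hypothetical, not part of the original notebook; Reports and Count are the column names used elsewhere in this notebook):

In [ ]:
#sketch of option 2: register a small parser and call it from SQL
from pyspark.sql.types import IntegerType

def year_from_report(report):
    return int(report[0:4])

sqlContext.registerFunction("year_from_report", year_from_report, IntegerType())
sqlContext.sql("select year_from_report(Reports) as year, Count from ufo_sightings limit 5").collect()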

The example below (option 3) takes the existing data for each row and appends a new column "year" by taking the first four characters of the Reports column.

Reports looks like this for example: 2016-01-31


In [ ]:
#map over the rows appending a year column, then convert back to a Dataframe
ufos_df = spark_df.map(lambda x: Row(**dict(x.asDict(), year=int(x.Reports[0:4])))).toDF()

Quick check to verify we get the expected results


In [ ]:
print ufos_df.take(5)

Register the new Dataframe as a table "ufo_withyear"


In [ ]:
ufos_df.registerTempTable("ufo_withyear")

Now we can group by year, order by year and filter to the last 66 years


In [ ]:
query = """
select 
    sum(count) as count, 
    year 
from ufo_withyear
where year > 1950
group by year
order by year
"""
pandas_ufos_withyears = sqlContext.sql(query).toPandas()
pandas_ufos_withyears.plot(kind='bar', x='year', y='count', figsize=(12, 5))
