In [20]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
In [2]:
!rm world_bank* -f
!wget https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/world_bank.json.gz
In [3]:
example1_df = sqlContext.read.json("world_bank.json.gz")
In [4]:
example1_df.printSchema()
In [5]:
for row in example1_df.take(2):
    print row
    print "*" * 20
In [6]:
#Simply use the Dataframe Object to create the table:
example1_df.registerTempTable("world_bank")
In [7]:
#now that the table is registered we can execute sql commands
#NOTE that the returned object is another Dataframe:
temp_df = sqlContext.sql("select * from world_bank limit 2")
print type(temp_df)
print "*" * 20
print temp_df
In [8]:
#one nice feature of the notebooks and Python is that we can display the results in a table via Pandas
sqlContext.sql("select id, borrower from world_bank limit 2").toPandas()
Out[8]:
In [9]:
#Here is a simple group by example:
query = """
select
regionname ,
count(*) as project_count
from world_bank
group by regionname
order by count(*) desc
"""
sqlContext.sql(query).toPandas()
Out[9]:
In [10]:
#subselect works as well:
query = """
select * from
(select
regionname ,
count(*) as project_count
from world_bank
group by regionname
order by count(*) desc) table_alias
limit 2
"""
sqlContext.sql(query).toPandas()
Out[10]:
In the example below, a simple RDD is created with random data in two columns and an ID column.
In [11]:
import random
#first let's create a simple RDD
#create a Python list of lists for our example
data_e2 = []
for x in range(1,6):
    random_int = int(random.random() * 10)
    #use ** (exponent) rather than ^ (bitwise XOR) so the third column is the square of the second
    data_e2.append([x, random_int, random_int**2])
#create the RDD with the random list of lists
rdd_example2 = sc.parallelize(data_e2)
print rdd_example2.collect()
In [12]:
from pyspark.sql.types import *
#now we can assign some header information
# The schema is encoded in a string.
schemaString = "ID VAL1 VAL2"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaExample = sqlContext.createDataFrame(rdd_example2, schema)
# Register the DataFrame as a table.
schemaExample.registerTempTable("example2")
# Pull the data
print schemaExample.collect()
In [13]:
#In Dataframes we can reference the column names, for example:
for row in schemaExample.take(2):
    print row.ID, row.VAL1, row.VAL2
In [14]:
#Again a simple sql example:
sqlContext.sql("select * from example2").toPandas()
Out[14]:
In [15]:
#Remember this RDD:
print type(rdd_example2)
print rdd_example2.collect()
In [16]:
#we can use Row to specify the name of the columns with a Map, then use that to create the Dataframe
from pyspark.sql import Row
rdd_example3 = rdd_example2.map(lambda x: Row(id=x[0], val1=x[1], val2=x[2]))
print rdd_example3.collect()
In [17]:
#now we can convert rdd_example3 to a Dataframe
df_example3 = rdd_example3.toDF()
df_example3.registerTempTable("df_example3")
print type(df_example3)
In [18]:
#now a simple SQL statement
sqlContext.sql("select * from df_example3").toPandas()
Out[18]:
In [19]:
query = """
select
*
from
example2 e2
inner join df_example3 e3 on
e2.id = e3.id
"""
print sqlContext.sql(query).toPandas()
In [ ]:
#Alternatively you can join within Python as well
df_example4 = df_example3.join(schemaExample, schemaExample["id"] == df_example3["ID"] )
for row in df_example4.take(5):
    print row
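Because both Dataframes contribute an id column, it can help to select the columns you want explicitly after the join. A minimal sketch that references columns through the parent Dataframes (df_example5 is just a new name used for this sketch):
In [ ]:
#pick columns from each side of the join by referencing the parent Dataframes
df_example5 = df_example4.select(df_example3["id"], df_example3["val1"], schemaExample["VAL1"], schemaExample["VAL2"])
for row in df_example5.take(5):
    print row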
In [ ]:
#first we create a Python function:
def simple_function(v):
return int(v * 10)
#test the function
print simple_function(3)
In [ ]:
#now we can register the function for use in SQL
sqlContext.registerFunction("simple_function", simple_function)
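By default registerFunction treats the return value as a string. If you want integers back, a return type can be passed explicitly; a minimal sketch (the name simple_function_int is just an illustrative alias, and this assumes the pyspark.sql.types import from earlier is still in scope):
In [ ]:
#register the same Python function again, this time declaring an integer return type
sqlContext.registerFunction("simple_function_int", simple_function, IntegerType())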
In [ ]:
#now we can apply the function in a SQL statement
query = """
select
ID,
VAL1,
VAL2,
simple_function(VAL1) as s_VAL1,
simple_function(VAL2) as s_VAL2
from
example2
"""
sqlContext.sql(query).toPandas()
In [ ]:
#note that VAL1 and VAL2 look like strings; we can cast them as well
query = """
select
ID,
VAL1,
VAL2,
simple_function(cast(VAL1 as int)) as s_VAL1,
simple_function(cast(VAL2 as int)) as s_VAL2
from
example2
"""
sqlContext.sql(query).toPandas()
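An alternative to casting in SQL is to declare numeric column types when the schema is built. A minimal sketch reusing rdd_example2 and schemaString from earlier (the table name example2_int is just illustrative):
In [ ]:
#same RDD as example2, but with an integer schema so the VAL columns come through as numbers
int_fields = [StructField(field_name, IntegerType(), True) for field_name in schemaString.split()]
int_schema = StructType(int_fields)
schemaExampleInt = sqlContext.createDataFrame(rdd_example2, int_schema)
schemaExampleInt.registerTempTable("example2_int")
print schemaExampleInt.collect()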
In [ ]:
#import pandas library
import pandas as pd
print pd
First, let's grab some UFO data to play with
In [ ]:
!rm SIGHTINGS.csv -f
!wget https://www.quandl.com/api/v3/datasets/NUFORC/SIGHTINGS.csv
In [ ]:
#using the CSV file from earlier, we can create a Pandas Dataframe:
pandas_df = pd.read_csv("SIGHTINGS.csv")
pandas_df.head()
In [ ]:
#now convert to Spark Dataframe
spark_df = sqlContext.createDataFrame(pandas_df)
In [ ]:
#explore the first two rows:
for row in spark_df.take(2):
    print row
In [ ]:
#register the Spark Dataframe as a table
spark_df.registerTempTable("ufo_sightings")
In [ ]:
#now a SQL statement
print sqlContext.sql("select * from ufo_sightings limit 10").collect()
In order to display charts in the notebook, we need to tell matplotlib to render inline; at this point we import the supporting libraries as well.
In [ ]:
%matplotlib inline
import matplotlib.pyplot as plt, numpy as np
Pandas can call a "plot" function to create charts. Since most charts are created from aggregates, the record set should be small enough to store in Pandas.
We can take our UFO data from before and create a Pandas Dataframe from the Spark Dataframe.
In [ ]:
ufos_df = spark_df.toPandas()
To plot, we call the "plot" method and specify the chart type, the x and y axis columns, and optionally the size of the chart.
Many more details can be found here: http://pandas.pydata.org/pandas-docs/stable/visualization.html
In [ ]:
ufos_df.plot(kind='bar', x='Reports', y='Count', figsize=(12, 5))
This doesn't look good; there are too many observations. Let's check how many:
In [ ]:
print sqlContext.sql("select count(*) from ufo_sightings limit 10").collect()
We can fix this in a few ways:
1) parse the Reports column in SQL and output the year, then group on it (a quick sketch of this follows the list)
2) create a simple Python function to parse the year and call it via SQL
3) as shown below: use map against the Dataframe and append a new column "year"
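Here is a rough sketch of option 1, assuming the SQL dialect in use supports substr (the Reports and Count column names come from the CSV above):
In [ ]:
#option 1 sketch: pull the year out of the Reports string directly in SQL and group on it
query = """
select
substr(Reports, 1, 4) as year,
sum(count) as count
from ufo_sightings
group by substr(Reports, 1, 4)
"""
sqlContext.sql(query).toPandas()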
Option 3 is shown below: the example takes the existing data for each row and appends a new column "year" by taking the first four characters from the Reports column.
Reports looks like this for example: 2016-01-31
In [ ]:
#map over the Dataframe rows, appending a "year" column parsed from Reports, then convert the result back to a Dataframe
ufos_df = spark_df.map(lambda x: Row(**dict(x.asDict(), year=int(x.Reports[0:4])))).toDF()
Quick check to verify we get the expected results
In [ ]:
print ufos_df.take(5)
Register the new Dataframe as a table "ufo_withyear"
In [ ]:
ufos_df.registerTempTable("ufo_withyear")
Now we can group by year, order by year and filter to the last 66 years
In [ ]:
query = """
select
sum(count) as count,
year
from ufo_withyear
where year > 1950
group by year
order by year
"""
pandas_ufos_withyears = sqlContext.sql(query).toPandas()
pandas_ufos_withyears.plot(kind='bar', x='year', y='count', figsize=(12, 5))
In [ ]: