SparkSQL Lab:

In this lab, you will write code to execute SQL queries in Spark, which makes your analytics simpler and faster.

During this lab we will cover:

Part 1: Linking with SparkSQL

Part 2: Loading data programmatically

Part 3: User-Defined Functions

Part 4: Caching for performance

Part 5: Your show time - How many authors are tagged as spam?

Reference: Spark's Python API documentation

Part 1: Linking with SparkSQL


In [ ]:
from pyspark.sql import HiveContext, Row
sqlContext = HiveContext(sc)

In [ ]:
sqlContext
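
If your Spark build lacks Hive support, a plain SQLContext is enough for the JSON part of this lab (a fallback sketch only; the Hive-table parts in 2b-5 still need HiveContext):


In [ ]:
# Fallback only -- skip this cell if the HiveContext above works
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)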

Part 2: Loading data programmatically

(2a) Read a local JSON file into a DataFrame

Now, try to read a JSON file from the Spark examples. Thanks to PIXNET HACKATHON 2015 for the hashed spam data.


In [ ]:
jsonfile = "file:///opt/spark-1.4.1-bin-hadoop2.6/examples/src/main/resources/people.json"
df = sqlContext.read.load(jsonfile, format="json")

Show time: Query the top 2 rows


In [ ]:
# TODO: Replace <FILL IN> with appropriate code
df.<FILL IN>
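
If you get stuck, one minimal answer is show(), which prints the first n rows (a hint only; alternatives such as head(2) or take(2) are also valid):


In [ ]:
# One possible answer: show the top 2 rows
df.show(2)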

In [ ]:
# Print df's schema
df.printSchema()

(2b) Read from Hive

Don't forget: Hive is configured by placing your hive-site.xml file in Spark's conf/ directory.


In [ ]:
sqlContext.sql("SHOW TABLES").show()

In this class, you will use pixnet_user_log_1000 for the remaining exercises.


In [ ]:
sqlContext.sql("SELECT *\ 
                FROM pixnet_user_log_1000").printSchema()

How many rows are in pixnet_user_log_1000, and how long does counting them take?


In [ ]:
from datetime import datetime
start_time = datetime.now()

df2 = sqlContext.sql("SELECT * \
                    FROM pixnet_user_log_1000")

end_time = datetime.now()

print df2.count()
print('Duration: {}'.format(end_time - start_time))

In [ ]:
df2.select('time').show(2)

Part 3: User-Defined Functions

In Part 3, you will create your first user-defined function (UDF) in Spark SQL with an elegant lambda.


In [ ]:
# Register this DataFrame as a temporary table using the given name
df2.registerTempTable("people")

# Create a UDF that returns the length of a string
# (length-function example from the user guide)
sqlContext.registerFunction("strLenPython", lambda x: len(x))

# Split function to parse the date out of an ISO timestamp
sqlContext.registerFunction("strDate", lambda x: x.split("T")[0])

# Apply the UDFs to the expected columns
results = sqlContext.sql("SELECT author, \
                                 strDate(time) AS dt, \
                                 strLenPython(action) AS lenAct \
                            FROM people")

In [ ]:
# Or you could use the DataFrame API directly
df2.select('author').show(3)

# Column-level UDFs need these imports:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

strLenPython = udf(lambda s: len(s), IntegerType())
df2.select(strLenPython(df2.action).alias('action_len')).show(2)

In [ ]:
# print top 5 results
results.show(5)

Part 4: Caching for performance

This part demonstrates in-memory caching with sqlContext.cacheTable. Contrast this with saving to persistent tables: saveAsTable writes the contents of a DataFrame to a data source as a metastore table, so it survives beyond the current session, while cacheTable only keeps the data in memory to speed up repeated queries.
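
For example, a minimal saveAsTable sketch (assumption: people_persisted is a table name you are free to create in your metastore):


In [ ]:
# Persist df2 as a metastore table; unlike cacheTable, this writes data out
df2.write.saveAsTable("people_persisted")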


In [19]:
sqlContext.cacheTable("people")

In [ ]:
start_time = datetime.now()

sqlContext.sql("SELECT * FROM people").count()

end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))
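
Note that cacheTable is lazy: the first action after it both reads the data and fills the cache. Re-run the same count to see the in-memory speed:


In [ ]:
# Second run should be noticeably faster, since 'people' is now cached
start_time = datetime.now()
sqlContext.sql("SELECT * FROM people").count()
end_time = datetime.now()
print('Duration (cached): {}'.format(end_time - start_time))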

In [ ]:
sqlContext.sql("SELECT   strDate(time) AS dt,\
                         count(distinct author) AS cnt \
                FROM     people \
                GROUP BY strDate(time)").show(5)
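
The same aggregation can also be written with the DataFrame API; a sketch using a column-level UDF and countDistinct from pyspark.sql.functions:


In [ ]:
# DataFrame-API equivalent of the GROUP BY query above
from pyspark.sql.functions import udf, countDistinct

strDateCol = udf(lambda x: x.split("T")[0])  # return type defaults to string
df2.select(strDateCol(df2.time).alias('dt'), df2.author) \
   .groupBy('dt') \
   .agg(countDistinct('author').alias('cnt')) \
   .show(5)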

In [ ]:
sqlContext.uncacheTable("people")

Part 5: Your show time - How many authors are tagged as spam in pixnet_user_log_1000?

Here are the two Hive tables you will need:

(1) author with action: pixnet_user_log_1000

(2) author with spam tag: pixnet_user_spam


In [ ]:
# TODO: Replace <FILL IN> with appropriate code
result = <FILL IN>
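
If you need a starting point, the sketch below assumes pixnet_user_spam has an author column that joins against the log; the column names here are assumptions, so check the schema first:


In [ ]:
# Inspect the spam table's schema before writing the join
sqlContext.sql("SELECT * FROM pixnet_user_spam").printSchema()

# One possible shape of the answer (column names assumed): count
# distinct authors appearing in both the log and the spam table
result = sqlContext.sql("SELECT COUNT(DISTINCT log.author) AS spam_authors \
                         FROM   pixnet_user_log_1000 log \
                         JOIN   pixnet_user_spam spam \
                           ON   log.author = spam.author")
result.show()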

Don't forget to stop sc.


In [23]:
sc.stop()