Introduction to Spark SQL via PySpark


Goals:

  • Get familiar with the basics of Spark SQL and PySpark
  • Learn to create a SparkSession
  • Verify that Jupyter can talk to the Spark Master

What is Spark SQL?

  • It is a Spark module that leverages Spark's functional programming APIs to allow SQL-style relational processing of (semi-)structured data.
  • Spark SQL provides Spark with more information about the structure of both the data and the computation being performed.

What is PySpark?

PySpark is the Python API for Spark.

How do I start using PySpark and Spark SQL?

  • Start by importing the PySpark SQL SparkSession class and creating a SparkSession instance.
  • The SparkSession class is considered the entry point to programming Spark with the Dataset and DataFrame API.
  • A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read parquet files.

Import SparkSession Class


In [1]:
from pyspark.sql import SparkSession

What is a SparkSession?

  • It represents the driver process that controls a Spark application.
  • A SparkSession instance is responsible for executing the driver program’s commands (code) across executors (in a cluster) to complete a given task.
  • You can have multiple SparkSessions in a single Spark application; they all share the same underlying SparkContext.

How do I create a SparkSession?

  • You can use the SparkSession class attribute called builder.
  • The class attribute builder exposes, among others, the following methods:
    • appName: Sets a name for the application
    • master: Sets the URL for the Spark master (local or a Spark standalone cluster)
    • enableHiveSupport: Enables Hive support, including connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions.
    • getOrCreate: Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.

Create a SparkSession instance

  • Define a spark variable
  • Pass values to the appName and master functions
    • For the master function, we are going to use HELK's Spark Master container (helk-spark-master)

In [2]:
spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

Check the SparkSession variable


In [3]:
spark


Out[3]:

SparkSession - hive

SparkContext

Spark UI

Version: v2.4.0
Master: spark://helk-spark-master:7077
AppName: Python Spark SQL basic example

What is a Dataframe?

  • In Spark, a dataframe is the most common Structured API, and it is used to represent data in a table format with rows and columns.
  • Think of a dataframe as a spreadsheet with headers. The difference is that a Spark dataframe can be distributed across several computers, either because of its size or to spread the computation for faster analysis.
  • The list of a dataframe's column names with their respective data types is called the schema.

Is a Spark Dataframe the same as a Python Pandas Dataframe?

  • A Pandas dataframe sits on one computer, whereas a Spark dataframe, once again, can be distributed across several computers.
  • PySpark allows the conversion from Pandas dataframes to Spark dataframes (and back), as sketched below.
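A minimal sketch of the round trip, assuming the Pandas library is available in the Jupyter environment and using the spark session created above (the variable names are just illustrative):

import pandas as pd

# Small Pandas dataframe that lives entirely on the driver machine
pandas_df = pd.DataFrame({'name': ['Pedro', 'Clementine'], 'age': [3, 8]})

# Distribute it as a Spark dataframe
spark_pets_df = spark.createDataFrame(pandas_df)

# Collect the distributed rows back to the driver as a Pandas dataframe
pandas_again = spark_pets_df.toPandas()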

Create your first Dataframe

Let's create our first dataframe by using the range and toDF functions.

  • One column named numbers
  • 10 rows containing numbers from 0-9

range(start, end=None, step=1, numPartitions=None)

  • Create a DataFrame with single pyspark.sql.types.LongType column named id, containing elements in a range from start to end (exclusive) with step value step.

toDF(*cols)

  • Returns a new DataFrame with the specified column names

In [4]:
first_df = spark.range(10).toDF("numbers")

In [5]:
first_df.show()


+-------+
|numbers|
+-------+
|      0|
|      1|
|      2|
|      3|
|      4|
|      5|
|      6|
|      7|
|      8|
|      9|
+-------+
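The call above only used the end argument of range; it also accepts explicit start, end, step, and numPartitions values, as in the signature shown earlier. A minimal sketch (values chosen arbitrarily for illustration):

# Numbers 0, 2, 4, 6, 8 spread across 2 partitions
even_df = spark.range(0, 10, 2, numPartitions=2).toDF("even_numbers")
even_df.show()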

Create another Dataframe

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

  • Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
  • When schema is a list of column names, the type of each column will be inferred from data.
  • When schema is None, it will try to infer the schema (column names and types) from data, which should be an RDD of Row, or namedtuple, or dict.

In [6]:
dog_data = [['Pedro', 'Doberman', 3], ['Clementine', 'Golden Retriever', 8], ['Norah', 'Great Dane', 6],
            ['Mabel', 'Austrailian Shepherd', 1], ['Bear', 'Maltese', 4], ['Bill', 'Great Dane', 10]]
dog_df = spark.createDataFrame(dog_data, ['name', 'breed', 'age'])

In [7]:
dog_df.show()


+----------+--------------------+---+
|      name|               breed|age|
+----------+--------------------+---+
|     Pedro|            Doberman|  3|
|Clementine|    Golden Retriever|  8|
|     Norah|          Great Dane|  6|
|     Mabel|Austrailian Shepherd|  1|
|      Bear|             Maltese|  4|
|      Bill|          Great Dane| 10|
+----------+--------------------+---+

Check the Dataframe schema

  • We are going to apply a concept called schema inference, which lets Spark take its best guess at figuring out the schema.
  • Spark reads part of the dataframe and then tries to parse the types of data in each row.
  • You can also define a strict schema when you read in data, which does not let Spark guess; this is recommended for production use cases (see the sketch after this list).
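A minimal sketch of defining a strict schema, reusing the dog_data list created earlier; the StructType and StructField classes come from pyspark.sql.types:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Explicit schema: Spark will not guess column names or types
dog_schema = StructType([
    StructField("name", StringType(), True),
    StructField("breed", StringType(), True),
    StructField("age", LongType(), True)
])

strict_dog_df = spark.createDataFrame(dog_data, schema=dog_schema)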

schema

  • Returns the schema of this DataFrame as a pyspark.sql.types.StructType.

In [8]:
dog_df.schema


Out[8]:
StructType(List(StructField(name,StringType,true),StructField(breed,StringType,true),StructField(age,LongType,true)))

printSchema()

  • Prints out the schema in the tree format

In [9]:
dog_df.printSchema()


root
 |-- name: string (nullable = true)
 |-- breed: string (nullable = true)
 |-- age: long (nullable = true)

Access Dataframe Columns

select(*cols)

  • Projects a set of expressions and returns a new DataFrame.

Access the Dataframe's columns by attribute (df.name):


In [10]:
dog_df.select("name").show()


+----------+
|      name|
+----------+
|     Pedro|
|Clementine|
|     Norah|
|     Mabel|
|      Bear|
|      Bill|
+----------+

Access the Dataframe's columns by indexing (df['name']).

  • According to Spark's documentation, the indexing form is the recommended one because it is future-proof and won’t break with column names that are also attributes on the DataFrame class.

In [11]:
dog_df.select(dog_df["name"]).show()


+----------+
|      name|
+----------+
|     Pedro|
|Clementine|
|     Norah|
|     Mabel|
|      Bear|
|      Bill|
+----------+
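Since select projects expressions, you can also project several columns at once or compute new ones. A minimal sketch (the age_in_human_years alias is just an illustration):

from pyspark.sql.functions import col

# Project two existing columns plus a computed expression
dog_df.select(col("name"), col("breed"), (col("age") * 7).alias("age_in_human_years")).show()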

Filter Dataframe

filter(condition)

  • Filters rows using the given condition.

Select dogs that are older than 4 years


In [12]:
dog_df.filter(dog_df["age"] > 4).show()


+----------+----------------+---+
|      name|           breed|age|
+----------+----------------+---+
|Clementine|Golden Retriever|  8|
|     Norah|      Great Dane|  6|
|      Bill|      Great Dane| 10|
+----------+----------------+---+

Group Dataframe

groupBy(*cols)

  • Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

Group dogs and count them by their age


In [13]:
dog_df.groupBy(dog_df["age"]).count().show()


+---+-----+
|age|count|
+---+-----+
|  6|    1|
|  1|    1|
| 10|    1|
|  3|    1|
|  8|    1|
|  4|    1|
+---+-----+
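count is only one of the aggregates GroupedData offers; functions such as avg, min, and max can be applied in the same way through agg. A minimal sketch grouping by breed instead of age:

import pyspark.sql.functions as F

# Average and maximum age per breed
dog_df.groupBy("breed").agg(F.avg("age").alias("avg_age"), F.max("age").alias("max_age")).show()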

Run SQL queries on your Dataframe

createOrReplaceTempView(name)

  • Creates or replaces a local temporary view with this DataFrame.
  • The lifetime of this temporary table is tied to the SparkSession that was used to create this DataFrame.

Register the current Dataframe as a SQL temporary view


In [14]:
dog_df.createOrReplaceTempView("dogs")

sql_dog_df = spark.sql("SELECT * FROM dogs")
sql_dog_df.show()


+----------+--------------------+---+
|      name|               breed|age|
+----------+--------------------+---+
|     Pedro|            Doberman|  3|
|Clementine|    Golden Retriever|  8|
|     Norah|          Great Dane|  6|
|     Mabel|Austrailian Shepherd|  1|
|      Bear|             Maltese|  4|
|      Bill|          Great Dane| 10|
+----------+--------------------+---+


In [15]:
sql_dog_df = spark.sql("SELECT * FROM dogs WHERE name='Pedro'")
sql_dog_df.show()


+-----+--------+---+
| name|   breed|age|
+-----+--------+---+
|Pedro|Doberman|  3|
+-----+--------+---+