This document is adapted from the official Databricks notebook "A Gentle Introduction to Apache Spark on Databricks" (https://docs.databricks.com/_static/notebooks/gentle-introduction-to-apache-spark.html).

First let's explore the previously mentioned SparkSession. We can access it via the spark variable. As explained, the SparkSession is the core location where Apache Spark related information is stored. For Spark 1.X the variables are sqlContext and sc.

In Spark 2.x the unified entry point is the SparkSession class, accessed through the spark variable; in Spark 1.x the entry point for Spark SQL was the SQLContext class (the sqlContext variable), alongside the SparkContext (the sc variable). Both of those variables are still available in 2.0.

Cells can be executed by hitting shift+enter while the cell is selected.


In [2]:
sqlContext
sc

In [3]:
## If you're on 2.X the spark session is made available with the variable below
#####

spark
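
Outside of Databricks the spark variable is not created for you. A minimal sketch of building the session yourself, assuming a local PySpark installation (the application name here is arbitrary):

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession; on Databricks this is already done for you.
spark = SparkSession.builder \
    .appName("gentle-introduction") \
    .getOrCreate()

# The 1.x-style entry point is still reachable from the session.
sc = spark.sparkContext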

We can use the SparkContext to access information, but we can also use it to parallelize a collection. Here we'll parallelize a small Python range, which returns a DataFrame.


In [5]:
firstDataFrame = spark.range(1000000) # generate a DataFrame with one million rows; its schema is created automatically

print(firstDataFrame)
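
The SparkContext can also parallelize a plain Python collection directly; that produces an RDD rather than a DataFrame. A small sketch:

# Distribute a local Python range across the cluster as an RDD.
rdd = sc.parallelize(range(10))
print(rdd.collect())   # brings the ten values back to the driver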

In [6]:
# An example of a transformation
# select the ID column values and multiply them by 2
# A transformation: run a SELECT expression over this DataFrame
secondDataFrame = firstDataFrame.selectExpr("(id * 2) as value")
# The "as value" alias shapes secondDataFrame's schema; the type of the result column follows from the type of the original id column and the expression in the select clause
print(secondDataFrame)
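
The same transformation can also be written with column expressions instead of a SQL expression string; a sketch of the equivalent formulation:

from pyspark.sql.functions import col

# Multiply the id column by 2 and alias the result, mirroring the selectExpr above.
secondDataFrameAlt = firstDataFrame.select((col("id") * 2).alias("value"))
print(secondDataFrameAlt)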

In [7]:
# an example of an action
# take the first 5 values that we have in our firstDataFrame
# An action: pull the first five records out of the DataFrame
print(firstDataFrame.take(5))
# take the first 5 values that we have in our secondDataFrame
print(secondDataFrame.take(5))
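
take is only one of many actions; a quick sketch of a few others that also force execution:

# count() returns the number of rows, first() returns a single Row object,
# and collect() would bring the entire DataFrame back to the driver (use with care).
print(firstDataFrame.count())
print(secondDataFrame.first())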

In [8]:
%fs ls /databricks-datasets/Rdatasets/data-001/datasets.csv

In [9]:
# Use diamonds.csv to demonstrate building a DataFrame instance from a CSV file
# See http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader for details on the reader API
dataPath = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"

# The first row of the CSV file is the header
# The file is read more than once so that column types can be detected automatically
# Define the load operation that builds a DataFrame from dataPath
diamonds = spark.read.option("header", "true").option("inferSchema", "true").csv(dataPath)
# Equivalent to: diamonds = spark.read.csv(dataPath, header=True, inferSchema=True)
print(diamonds)
# inferSchema means we will automatically figure out column types 
# at a cost of reading the data more than once
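
To see what inferSchema actually decided, we can inspect the resulting schema; a quick sketch:

# Print the inferred column names and types.
diamonds.printSchema()
print(diamonds.dtypes)   # the same information as a list of (name, type) pairs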

Now that we've loaded the data, we're going to perform computations on it. This provides a convenient tour of some of the basic functionality and some of the nice features that make running Spark on Databricks simple! In order to perform our computations, we need to understand more about the data. We can do this with the display function.

The display function gives a convenient way to look at the data in a DataFrame.


In [11]:
display(diamonds)
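
display is specific to Databricks notebooks; outside Databricks a similar quick look is available through the standard DataFrame methods, sketched here:

# Show the first few rows as a plain-text table instead of the rich display output.
diamonds.show(5)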

What makes display exceptional is the fact that we can very easily create some more sophisticated graphs by clicking the graphing icon that you can see below. Here's a plot that allows us to compare price, color, and cut.

Content rendered with display can be switched between different visualizations by clicking the chart icon at the lower left of the output.


In [13]:
display(diamonds)

Now that we've explored the data, let's return to understanding transformations and actions. I'm going to create several transformations and then an action. After that we will inspect exactly what's happening under the hood.

These transformations are simple: first we group by two variables, cut and color, and compute the average price. Then we inner join that result to the original dataset on the color column, and finally we select the average price as well as the carat from the new dataset.


In [15]:
# Equivalent to a SQL GROUP BY clause plus an AVG() in the SELECT list
df1 = diamonds.groupBy("cut", "color").avg("price") # a simple grouping
# Equivalent to a SQL JOIN ... ON clause plus a SELECT
# The how parameter sets the join type; it defaults to inner and accepts inner, outer, left_outer, right_outer, leftsemi
df2 = df1\
  .join(diamonds, on='color', how='inner')\
  .select("`avg(price)`", "carat")
# a simple join and selecting some columns
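
For reference, the same pipeline can be expressed in Spark SQL after registering diamonds as a temporary view; a sketch (the view name and the backtick-quoted avg(price) alias are choices made here, not part of the original notebook):

# Register the DataFrame so it can be queried with SQL.
diamonds.createOrReplaceTempView("diamonds")

df2_sql = spark.sql("""
  SELECT g.`avg(price)`, d.carat
  FROM (SELECT cut, color, avg(price) AS `avg(price)`
        FROM diamonds
        GROUP BY cut, color) g
  JOIN diamonds d ON g.color = d.color
""")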

These transformations are now complete in a sense but nothing has happened. As you'll see above we don't get any results back!

The reason is that these computations are lazy: Spark builds up the entire flow of data, from the source to the result the user asked for, before executing anything. This is an intelligent optimization for two key reasons. First, any calculation can be recomputed from the original source data, which lets Apache Spark handle failures that occur along the way and deal with stragglers. Second, Apache Spark can optimize the computation so that data and computation can be pipelined, as we mentioned above. Therefore, with each transformation Apache Spark creates a plan for how it will perform this work.

To get a sense for what this plan consists of, we can use the explain method. Remember that none of our computations have been executed yet, so all the explain method does is tell us the lineage for how to compute this exact dataset.


In [17]:
# Show the execution plan for computing this DataFrame
df2.explain()
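
explain also takes an optional extended argument; a sketch that prints the logical plans in addition to the physical plan:

# Print the parsed, analyzed and optimized logical plans as well as the physical plan.
df2.explain(True)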

Explaining the above results is outside the scope of this introductory tutorial, but please feel free to read through it. What you should take away is that Spark has generated a plan for how it hopes to execute the given query. Let's now run an action in order to execute the above plan.


In [19]:
# Count the rows in the DataFrame
df2.count()

Caching

One of the significant parts of Apache Spark is its ability to store things in memory during computation. This is a neat trick that you can use as a way to speed up access to commonly queried tables or pieces of data. This is also great for iterative algorithms that work over and over again on the same data. While many see this as a panacea for all speed issues, think of it much more like a tool that you can use. Other important concepts like data partitioning, clustering and bucketing can end up having a much greater effect on the execution of your job than caching, however, remember - these are all tools in your tool kit!

To cache a DataFrame or RDD, simply use the cache method.


In [21]:
df2.cache()
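
cache is shorthand for the more general persist method, which lets you pick a storage level explicitly; a sketch, shown here on df1 so we don't try to re-assign the storage level already chosen for df2 (the MEMORY_ONLY choice is just for illustration):

from pyspark import StorageLevel

# Persist df1 in memory only; other levels spill to disk or store data serialized.
df1.persist(StorageLevel.MEMORY_ONLY)

# Release the cached data once it is no longer needed.
df1.unpersist()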

Caching, like a transformation, is performed lazily. That means that it won't store the data in memory until you call an action on that dataset.

Here's a simple example. We've created our df2 DataFrame which is essentially a logical plan that tells us how to compute that exact DataFrame. We've told Apache Spark to cache that data after we compute it for the first time. So let's call a full scan of the data with a count twice. The first time, this will create the DataFrame, cache it in memory, then return the result. The second time, rather than recomputing that whole DataFrame, it will just hit the version that it has in memory.

Let's take a look at how we can discover this.


In [23]:
df2.count()

However, now that we've counted the data, we'll see that the explain output ends up being quite different.


In [25]:
df2.count()
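
To see how different the plan looks once the data is cached, we can ask for the explain output again; a sketch (after the counts above, the plan will typically read from the in-memory copy rather than recomputing from the CSV):

# The physical plan now typically shows an in-memory scan of the cached df2.
df2.explain()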