In [1]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Use a Python 3 kernel (not PySpark) so that you can configure the SparkSession in the notebook and include the spark-bigquery-connector, which is required to use the BigQuery Storage API.
In [2]:
# Check the Scala version; the spark-bigquery-connector jar must match it
!scala -version
In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName('1.1. BigQuery Storage & Spark DataFrames - Python') \
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar') \
    .getOrCreate()
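The connector jar must match the cluster's Scala version; the jar above targets Scala 2.11, while a Scala 2.12 cluster would use the matching build instead (for example gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar; path assumed here).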
In [4]:
# Enable eager evaluation so DataFrames render as tables in the notebook output
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
In [5]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
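As an optional sanity check (a minimal sketch using the DataFrame above), counting the rows confirms that only the single day selected by the filter was read:

# Number of rows read for the selected day (triggers a Spark job)
df_wiki_pageviews.count()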
Select the required columns and apply a filter using where(), which is an alias for filter(), then cache the DataFrame.
In [7]:
df_wiki_en = df_wiki_pageviews \
    .select("title", "wiki", "views") \
    .where("views > 1000 AND wiki in ('en', 'en.m')") \
    .cache()
df_wiki_en
Out[7]:
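Since where() is an alias for filter(), the same selection could equally be written with filter() and Column expressions; a minimal sketch (df_wiki_en_alt is a hypothetical name used only for illustration):

import pyspark.sql.functions as F

# Equivalent to the cell above: filter() with Column expressions instead of a SQL string
df_wiki_en_alt = df_wiki_pageviews \
    .select("title", "wiki", "views") \
    .filter((F.col("views") > 1000) & (F.col("wiki").isin("en", "en.m"))) \
    .cache()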
Group by title and order by page views to see the top pages
In [8]:
import pyspark.sql.functions as F
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Out[8]:
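With eager evaluation enabled the ordered DataFrame renders inline; to print only the top rows explicitly, one option (a sketch using the same df_wiki_en_totals) is:

# Show the 10 most viewed pages without relying on notebook rendering
df_wiki_en_totals.orderBy(F.desc('total_views')).show(10, truncate=False)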
Write the Spark DataFrame to a BigQuery table using the BigQuery Storage connector. The write will create the table if it does not exist, but the GCS bucket and BigQuery dataset must exist before running df.write; they can be created as sketched below.
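For example, the bucket and dataset could be created from a notebook cell with the Cloud SDK tools (the names are placeholders matching the variables in the next cell):

# Create the staging GCS bucket and the BigQuery dataset (placeholder names)
!gsutil mb gs://dataproc-bucket-name
!bq mk dataset_name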
In [9]:
# Update to your GCS bucket
gcs_bucket = 'dataproc-bucket-name'
# Update to the BigQuery dataset name you created
bq_dataset = 'dataset_name'
# Enter the BigQuery table name you want to create or overwrite.
# If the table does not exist it will be created when you run the write function
bq_table = 'wiki_total_pageviews'

# The connector stages the data in the temporary GCS bucket and then loads it into BigQuery
df_wiki_en_totals.write \
    .format("bigquery") \
    .option("table", "{}.{}".format(bq_dataset, bq_table)) \
    .option("temporaryGcsBucket", gcs_bucket) \
    .mode('overwrite') \
    .save()
Use the BigQuery magic to check that the data was written successfully. This runs the SQL query in BigQuery and returns the results.
In [10]:
%%bigquery
SELECT title, total_views
FROM dataset_name.wiki_total_pageviews
ORDER BY total_views DESC
LIMIT 10
Out[10]:
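The %%bigquery magic returns the query results as a pandas DataFrame; adding a variable name after the magic stores them for further use in Python (df_top_pages below is a hypothetical name):

%%bigquery df_top_pages
SELECT title, total_views
FROM dataset_name.wiki_total_pageviews
ORDER BY total_views DESC
LIMIT 10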