Lab 2: Faster and Cheaper Queries with Table Partitions and Clustering

Learning Objectives

  • Create a SQL query to analyze sales from our baseline table
  • Analyze the query execution plan for performance optimization opportunities
  • Find out how much data was discarded in full table scans
  • Analyze the same query against a partitioned table
  • Learn how to create partitioned tables with SQL DDL
  • Create partitioned tables for the entire dataset
  • Run benchmark queries and compare performance to our baseline

Analyze current architecture for partitions

Let's find the largest table and see the current architecture.

In a previous lab we created the query below:


In [3]:
%%bigquery
SELECT 
  dataset_id,
  table_id,
  -- Convert bytes to GB.
  ROUND(size_bytes/pow(10,9),2) as size_gb,
  -- Convert UNIX EPOCH to a timestamp.
  TIMESTAMP_MILLIS(creation_time) AS creation_time,
  TIMESTAMP_MILLIS(last_modified_time) as last_modified_time,
  row_count,
  CASE 
    WHEN type = 1 THEN 'table'
    WHEN type = 2 THEN 'view'
  ELSE NULL
  END AS type
FROM
  `dw-workshop.tpcds_2t_baseline.__TABLES__`
ORDER BY size_gb DESC
LIMIT 1


Out[3]:
dataset_id table_id size_gb creation_time last_modified_time row_count type
0 tpcds_2t_baseline store_sales 1545.13 2019-10-13 19:15:03.190000+00:00 2019-10-13 19:15:03.190000+00:00 5762820700 table

Our store_sales table is 1,545 GB and contains 5.7 Billion rows.

Create a standard sales report

Your manager has asked you to query the existing data warehouse tables and build a report that shows:

  • the top 10 sales ss_net_paid for all sales on or after 2000-01-01
  • include the name of the product
  • include the name and email of the customer and whether they are a preferred customer
  • exclude customers with a NULL ss_customer_sk
  • include the date and time of the order as a formatted timestamp

Note: Develop the query in the BigQuery console so you (1) get the benefit of the query validator as you type and (2) can view the Execution Plan after your query runs.


In [13]:
%%bigquery --verbose
SELECT
  PARSE_TIMESTAMP(
    "%Y-%m-%d %T %p",
    CONCAT(
      CAST(d_date AS STRING), ' ',
      CAST(t_hour AS STRING), ':',
      CAST(t_minute AS STRING), ':',
      CAST(t_second AS STRING), ' ',
      t_am_pm),
    "America/Los_Angeles") AS timestamp,
  s.ss_item_sk,
  i.i_product_name,
  s.ss_customer_sk,
  c.c_first_name,
  c.c_last_name,
  c.c_email_address,
  c.c_preferred_cust_flag,
  s.ss_quantity,
  s.ss_net_paid
FROM
  `dw-workshop.tpcds_2t_baseline.store_sales` AS s
  JOIN 
  `dw-workshop.tpcds_2t_baseline.date_dim` AS d
  ON s.ss_sold_date_sk = d.d_date_sk
  
  JOIN
  `dw-workshop.tpcds_2t_baseline.time_dim` AS t
  ON s.ss_sold_time_sk = t.t_time_sk
  
  JOIN
  `dw-workshop.tpcds_2t_baseline.item` AS i
  ON s.ss_item_sk = i.i_item_sk
  
  JOIN 
  `dw-workshop.tpcds_2t_baseline.customer` AS c
  ON s.ss_customer_sk = c.c_customer_sk
  
WHERE d_date >= '2000-01-01' AND ss_customer_sk IS NOT NULL
ORDER BY ss_net_paid DESC
LIMIT 10

This simple report took over 40 seconds end-to-end and processed more than 200 GB of data. Let's see where we can improve.

Gaining insight from the Query Execution Details (part 1: high level stats)

Learning how BigQuery processes your query under-the-hood is critical to understanding where you can improve performance.

After you executed the previous query, in the BigQuery console click on Execution details

Your query plan should be largely similar to ours below. Scan through the execution statistics and answer the questions that follow.

Slot time

As you can see above, your query took 27 seconds to process 5.7 Billion rows. So what does the 10hr 35min slot time metric mean?

Recall from our discussion in Lab 1 that inside the BigQuery service are lots of virtual machines that massively process your data and query logic in parallel. These workers, or "slots", work together to process a single query job really quickly.

The BigQuery engine fully manages the task of taking your query and farming it out to workers that fetch the raw data and process the work to be done.

So say the query took 27 seconds in total to run, but it represented 36,000 seconds (or 10 hours) worth of work. How many workers, at minimum, worked on it? 36,000 / 27 ≈ 1,333

And that's assuming each worker instantly had all the data it needed (no shuffling of data between workers) and was at full capacity for all 27 seconds!

The worker quota for an on-demand query is 2,000 slots at any one time, so we want to find ways to optimize and reduce the resources consumed.
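The back-of-the-envelope estimate above can be sketched in a few lines of Python (the 36,000-second figure is an approximation of the 10 hr 35 min slot time reported in the console; we round up, since a fractional worker can't exist):

```python
import math

def min_parallel_workers(slot_seconds: float, wall_seconds: float) -> int:
    """Lower bound on concurrently active slots: total slot time divided
    by wall-clock time, assuming every worker was busy the whole run."""
    return math.ceil(slot_seconds / wall_seconds)

# ~10 hours of slot time packed into a 27-second query:
print(min_parallel_workers(36_000, 27))  # → 1334 workers at minimum
```

In reality the true worker count is higher, because workers also spend time waiting on shuffled data rather than computing.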

Bytes shuffled

We had 382 GB of data shuffled. What does that mean?

First let's explore the architecture of BigQuery and how it can process PB+ datasets in seconds. The BigQuery team explains how the managed service is setup in this detailed blog post where the below diagram is sourced:

The actual Google services that the engine uses are

  • Dremel (the execution engine)
  • Jupiter (the petabit scale Google datacenter network)
  • Colossus (distributed clusters of storage)

Storage

Our store_sales table (1,545 GB and 5.7 Billion rows) isn't stored on just one server. BigQuery compresses each column of the data and stores the pieces across many commodity servers.

Compute

When it comes time to process a query like top sales after the year 2000, BigQuery starts up a fleet of workers to grab and process pieces of data. Since no single worker has a complete picture of all 5.7 Billion rows, the workers need to communicate with each other by passing data back and forth. This fast in-memory data shuffling, or repartitioning, can be time and resource intensive.

Below is an example diagram that shows the work-shuffle-work process for each worker:

As you can see in the Execution details, workers have a variety of tasks (waiting for data, reading it, performing computations, and writing data).

Gaining insight from the Query Execution Details (part 2: repartitioning / shuffling)

Here is the next part of our execution plan after the data is read from disk. Note the longer blue bars indicate more time spent by workers. Generally the average worker (avg) and the slowest worker (max) are aligned unless your dataset is unbalanced and skewed heavily to a few values (hotspots).

Here you can see time spent joining against other datasets and then many repartitions afterward to shuffle the billions of rows across workers for processing. You'll note that the input and output row counts for repartitions are the same, as they are purely a shuffling effort across workers.

What triggers repartitions?

The performance-expensive operations in our query were:

  1. looking at every record and comparing to see if it was before or after 2000-01-01
  2. sorting large volumes of data by timestamp
  3. computing a calculated field

How much data was unused?

BigQuery had to scan and compare all records in the dataset to see if it matched our date condition. What percent of records were ultimately thrown away (pre-2000)?


In [14]:
%%bigquery

WITH stats AS (

SELECT 
  COUNT(*) AS all_records,
  COUNTIF(d_date >= '2000-01-01') AS after_2000,
  COUNTIF(d_date < '2000-01-01') AS before_2000
  
FROM
  `dw-workshop.tpcds_2t_baseline.store_sales` AS s
  JOIN 
  `dw-workshop.tpcds_2t_baseline.date_dim` AS d
  ON s.ss_sold_date_sk = d.d_date_sk
)

SELECT
  format("%'d",all_records) AS all_records,
  format("%'d",after_2000) AS after_2000,
  format("%'d",before_2000) AS before_2000,
  ROUND(after_2000 / all_records,4) AS percent_dataset_used
FROM stats


Out[14]:
all_records after_2000 before_2000 percent_dataset_used
0 5,503,485,473 3,314,029,432 2,189,456,041 0.6022

We only ended up using 60% of our dataset for analysis.

Isn't there a faster way of eliminating the other 40% of records without having to check the date value of each row?

Yes! With date-partitioned tables.

Reducing data scanned with Partitioned tables

Partitioning automatically groups records into buckets based on a date or timestamp value, which enables fast filtering by date.
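If you want to see the buckets BigQuery maintains for a partitioned table, you can query the `INFORMATION_SCHEMA.PARTITIONS` view. Below is a sketch against the dataset and table names used in this lab:

```sql
-- Each row describes one date partition of the table.
SELECT
  partition_id,        -- e.g. '20000101' for the 2000-01-01 partition
  total_rows,
  ROUND(total_logical_bytes / POW(10, 9), 2) AS size_gb
FROM
  `dw-workshop.tpcds_2t_flat_part_clust.INFORMATION_SCHEMA.PARTITIONS`
WHERE
  table_name = 'partitioned_table'
ORDER BY partition_id
```

When a query filters on the partitioning column, BigQuery only reads the partitions whose dates match, which is where the savings come from.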

We've already created a new table called dw-workshop.tpcds_2t_flat_part_clust.partitioned_table which we will show you how to do in the next section.


In [16]:
%%bigquery --verbose
SELECT  
  timestamp,
  s.ss_item_sk,
  i.i_product_name,
  s.ss_customer_sk,
  c.c_first_name,
  c.c_last_name,
  c.c_email_address,
  c.c_preferred_cust_flag,
  s.ss_quantity,
  s.ss_net_paid
FROM
  `dw-workshop.tpcds_2t_flat_part_clust.partitioned_table` AS s
    
  /*  Date and time tables denormalized as part of the partitioned table
  JOIN 
  `dw-workshop.tpcds_2t_baseline.date_dim` AS d
  ON s.ss_sold_date_sk = d.d_date_sk
  
  JOIN
  `dw-workshop.tpcds_2t_baseline.time_dim` AS t
  ON s.ss_sold_time_sk = t.t_time_sk
  */  
  
  JOIN
  `dw-workshop.tpcds_2t_baseline.item` AS i
  ON s.ss_item_sk = i.i_item_sk
  
  JOIN 
  `dw-workshop.tpcds_2t_baseline.customer` AS c
  ON s.ss_customer_sk = c.c_customer_sk
  
WHERE DATE(timestamp) >= '2000-01-01' AND ss_customer_sk IS NOT NULL
ORDER BY ss_net_paid DESC
LIMIT 10


Executing query with job ID: 60bc2893-6e6d-4a0a-98d9-f076023530d9
Query executing: 20.22s
Query complete after 22.18s
Out[16]:
timestamp ss_item_sk i_product_name ss_customer_sk c_first_name c_last_name c_email_address c_preferred_cust_flag ss_quantity ss_net_paid
0 2002-12-07 19:59:26+00:00 43596 callyn stantipriese 437938 Emily Miles Emily.Miles@mJbptc1eoN.org N 100 19972
1 2001-01-24 03:01:01+00:00 17457 ationantieseationought 320021 Carlos Knight Carlos.Knight@KCOyPHE1rH.com N 100 19972
2 2001-07-21 15:35:26+00:00 36229 n stableablecallypri 5010096 Amy Lugo Amy.Lugo@teJtjEufGCvXiRuph.com Y 100 19972
3 2002-01-05 01:32:17+00:00 2748 eingeseationable 4719256 Andrew Henderson Andrew.Henderson@O9Bv4tNyUsDHAt7pyb.com N 100 19972
4 2000-07-14 02:05:37+00:00 26269 n stcallyablecallyable 6127654 Ronald Jung Ronald.Jung@Maoo95C.edu N 100 19972
5 2002-11-19 03:22:49+00:00 17574 eseationantiationought 7200746 Justin Black Justin.Black@iiTb4pzrv2QDa.org Y 100 19964
6 2002-09-13 00:01:11+00:00 15363 pricallypriantiought 2244656 Victor Baker Victor.Baker@tVs0Nva.edu Y 100 19964
7 2000-12-31 16:26:13+00:00 40169 n stcallyoughtbarese 5965669 Shane Martinez Shane.Martinez@rTIQV8rHUiBj.com N 100 19928
8 2001-04-06 17:44:42+00:00 21245 antieseableoughtable 1514229 Daniel Cannon Daniel.Cannon@Xe.org Y 100 19928
9 2000-09-26 16:34:52+00:00 45863 pricallyeingantiese 2433894 Joyce Robles Joyce.Robles@jjiqRB7uJl7.org Y 100 19928

Performance comparison

                  Original   Partitioned   Improvement
Query time        27 s       24.2 s        10% faster
Bytes processed   290 GB     144 GB        50% cheaper
Slot time         10 hr      7 hr          30% more efficient
Bytes shuffled    382 GB     293 GB        23% more efficient
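The improvement column can be reproduced from the other two columns with a quick calculation (numbers taken from the table above):

```python
def improvement(baseline: float, partitioned: float) -> str:
    """Percent reduction relative to the baseline value."""
    return f"{(baseline - partitioned) / baseline:.0%}"

print(improvement(27, 24.2))   # query time (seconds)  → 10%
print(improvement(290, 144))   # bytes processed (GB)  → 50%
print(improvement(10, 7))      # slot time (hours)     → 30%
print(improvement(382, 293))   # bytes shuffled (GB)   → 23%
```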

Creating Partitioned Tables

You can create partitioned tables in a number of ways:

  1. Using SQL DDL
  2. From the results of a query
  3. When you create a new table schema

For a full list check out the documentation.

Note that date-partitioned tables (partitioned on a user-specified date/time column) are different from ingestion-time partitioned tables, which are also available but not covered here.

Below is the example query to create the partitioned table we used earlier (no need to execute it).

CREATE OR REPLACE TABLE `dw-workshop.tpcds_2t_flat_part_clust.partitioned_table`
PARTITION BY DATE(timestamp) -- You define the column to partition on (it must be a date or time)
CLUSTER BY ss_net_paid -- Clustering is an added benefit for partitioned tables and explained later
OPTIONS (
  require_partition_filter=true -- You can mandate users must provide a WHERE clause when querying
  )
AS

SELECT
  PARSE_TIMESTAMP(
    "%Y-%m-%d %T %p",
    CONCAT(
      CAST(d_date AS STRING), ' ',
      CAST(t_hour AS STRING), ':',
      CAST(t_minute AS STRING), ':',
      CAST(t_second AS STRING), ' ',
      t_am_pm),
    "America/Los_Angeles") AS timestamp,
  s.*
FROM
  `dw-workshop.tpcds_2t_flat_part_clust.store_sales` AS s
  JOIN 
  `dw-workshop.tpcds_2t_flat_part_clust.date_dim` AS d
  ON s.ss_sold_date_sk = d.d_date_sk

  JOIN
  `dw-workshop.tpcds_2t_flat_part_clust.time_dim` AS t
  ON s.ss_sold_time_sk = t.t_time_sk

Converting the TPC-DS tables to Partitioned tables

We can quickly verify that none of the tables in our baseline dataset are partitioned or clustered by querying the INFORMATION_SCHEMA metadata views.


In [18]:
%%bigquery
SELECT * FROM 
 `dw-workshop.tpcds_2t_baseline.INFORMATION_SCHEMA.COLUMNS`
WHERE 
  is_partitioning_column = 'YES' OR clustering_ordinal_position IS NOT NULL


Out[18]:
table_catalog table_schema table_name column_name ordinal_position is_nullable data_type is_generated generation_expression is_stored is_hidden is_updatable is_system_defined is_partitioning_column clustering_ordinal_position

Create a new dataset to hold our partitioned tables

Let's leave the existing baseline dataset and tables as-is and create a new dataset titled tpcds_2t_flat_part_clust.


In [19]:
%%bash

## Create a BigQuery dataset for tpcds_2t_flat_part_clust if it doesn't exist
datasetexists=$(bq ls -d | grep -w tpcds_2t_flat_part_clust)

if [ -n "$datasetexists" ]; then
    echo -e "BigQuery dataset already exists, let's not recreate it."

else
    echo "Creating BigQuery dataset titled: tpcds_2t_flat_part_clust"
    
    bq --location=US mk --dataset \
        --description 'Partitioned and Clustered' \
        $PROJECT:tpcds_2t_flat_part_clust
    echo -e "\nHere are your current datasets:"
    bq ls
fi


BigQuery dataset already exists, let's not recreate it.

Create a new empty partitioned table

Let's pick one table to add partitioning and clustering to. It's easiest to add a partitioning column to a table that has existing date or timestamp columns. Here we will use the store table, a dimension table holding the name and address of each storefront for our business.

What to cluster on?

Finding a column to partition on is often the easy part. BigQuery also supports clustering on partitioned tables which can provide performance improvements for commonly filtered or sorted queries. The column(s) you specify are used to colocate related data.
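For example, once the store table below is clustered on s_zip, a query that filters on that column can skip storage blocks whose zip ranges don't match. An illustrative sketch (the zip value is made up):

```sql
-- Partition pruning on s_rec_start_date plus block pruning on the
-- clustering column s_zip means only a fraction of the table is read.
SELECT s_store_name, s_city, s_state
FROM `tpcds_2t_flat_part_clust.store`
WHERE s_rec_start_date >= '1997-01-01'  -- prunes partitions
  AND s_zip = '79275'                   -- prunes clustered blocks
```

A good rule of thumb is to cluster on the columns your analysts most often filter or aggregate on.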


In [22]:
%%bigquery
CREATE OR REPLACE TABLE tpcds_2t_flat_part_clust.store(
    s_store_sk                int64               NOT NULL,
    s_store_id                string              NOT NULL,
    s_rec_start_date          date                          ,
    s_rec_end_date            date                          ,
    s_closed_date_sk          int64                       ,
    s_store_name              string                   ,
    s_number_employees        int64                       ,
    s_floor_space             int64                       ,
    s_hours                   string                      ,
    s_manager                 string                   ,
    s_market_id               int64                       ,
    s_geography_class         string                  ,
    s_market_desc             string                  ,
    s_market_manager          string                   ,
    s_division_id             int64                       ,
    s_division_name           string                   ,
    s_company_id              int64                       ,
    s_company_name            string                   ,
    s_street_number           string                   ,
    s_street_name             string                   ,
    s_street_type             string                      ,
    s_suite_number            string                      ,
    s_city                    string                   ,
    s_county                  string                   ,
    s_state                   string                       ,
    s_zip                     string                      ,
    s_country                 string                   ,
    s_gmt_offset              numeric                  ,
    s_tax_precentage          numeric                  )

    -- Specify a date field to partition on and a field to cluster on:
    PARTITION BY s_rec_start_date
    CLUSTER BY s_zip;
    
    SELECT * FROM tpcds_2t_flat_part_clust.store LIMIT 0;


Out[22]:
s_store_sk s_store_id s_rec_start_date s_rec_end_date s_closed_date_sk s_store_name s_number_employees s_floor_space s_hours s_manager ... s_street_name s_street_type s_suite_number s_city s_county s_state s_zip s_country s_gmt_offset s_tax_precentage

0 rows × 29 columns

Now that you have the empty table, it's time to populate it with data. This can take a while; feel free to cancel the execution and continue with the lab. We'll use the BigQuery Data Transfer Service later to copy over the entire dataset in seconds.


In [23]:
%%bigquery
insert into tpcds_2t_flat_part_clust.store(s_store_sk, s_store_id, s_rec_start_date, s_rec_end_date, s_closed_date_sk,
    s_store_name, s_number_employees, s_floor_space, s_hours, s_manager, s_market_id, s_geography_class, s_market_desc,
    s_market_manager, s_division_id, s_division_name, s_company_id, s_company_name, s_street_number, s_street_name,
    s_street_type, s_suite_number, s_city, s_county, s_state, s_zip, s_country, s_gmt_offset, s_tax_precentage)
select s_store_sk, s_store_id, s_rec_start_date, s_rec_end_date, s_closed_date_sk,
    s_store_name, s_number_employees, s_floor_space, s_hours, s_manager, s_market_id, s_geography_class, s_market_desc,
    s_market_manager, s_division_id, s_division_name, s_company_id, s_company_name, s_street_number, s_street_name,
    s_street_type, s_suite_number, s_city, s_county, s_state, s_zip, s_country, s_gmt_offset, s_tax_precentage
from `dw-workshop.tpcds_2t_baseline.store`;

SELECT * FROM tpcds_2t_flat_part_clust.store LIMIT 5;


Out[23]:
s_store_sk s_store_id s_rec_start_date s_rec_end_date s_closed_date_sk s_store_name s_number_employees s_floor_space s_hours s_manager ... s_street_name s_street_type s_suite_number s_city s_county s_state s_zip s_country s_gmt_offset s_tax_precentage
0 26 AAAAAAAAKBAAAAAA None 2000-03-12 NaN cally NaN NaN 8AM-4PM None ... None Avenue Suite 50 None None GA None United States None None
1 96 AAAAAAAAOFAAAAAA 2001-03-13 None 2450893.0 cally 288.0 5453633.0 8AM-4PM Denis Shah ... View Wy Suite 340 Shiloh Franklin Parish LA 79275 United States -6 0.11
2 175 AAAAAAAAPKAAAAAA 1997-03-13 None NaN anti 212.0 8062566.0 8AM-4PM Paul Vela ... Third Railroad Ln Suite D Shady Grove Luce County MI 42812 United States -5 0.07
3 157 AAAAAAAANJAAAAAA 1997-03-13 None 2451023.0 ation 262.0 7226618.0 8AM-4PM James Washington ... 3rd Drive Suite 20 Midway Luce County MI 41904 United States -5 0.05
4 67 AAAAAAAADEAAAAAA 1997-03-13 None 2451034.0 ation 272.0 6504783.0 8AM-4PM Jim Meyer ... 12th Center Avenue Suite J Shiloh Luce County MI 49275 United States -5 0.02

5 rows × 29 columns

Compare performance

In the console UI, copy and paste the below queries and run them as one statement.

SELECT * 
FROM `dw-workshop.tpcds_2t_baseline.store`  
WHERE s_rec_start_date > '2010-01-01';


SELECT * 
FROM `dw-workshop.tpcds_2t_flat_part_clust.store`
WHERE s_rec_start_date > '2010-01-01';

The results should look like the below

Why did the first query do an entire table scan (59 KB; it's a small table) while the second query processed barely any data (117 bytes, or about 0.11 KB)?

It's because the second query filters on the partitioning column, so BigQuery immediately knows there is no partition for 2010 data (the dataset only goes up to 2003). Unlike the first query, it determines this without having to open individual records.

Although this table is quite small, the same benefit applies to any partitioned table no matter how large.

Running the 99 benchmark queries on partitioned tables

Below are the CREATE TABLE statements for the remaining tables in our new tpcds_2t_flat_part_clust dataset. Note how you can still use clustering on a table that does not have an existing field to partition on by simply adding an empty_date column of type date.

Run the below statement to finish creating the remaining 25 tables for our new partitioned dataset.


In [ ]:
%%bigquery
create table tpcds_2t_flat_part_clust.customer_address(
    ca_address_sk             int64               NOT NULL,
    ca_address_id             string              NOT NULL,
    ca_street_number          string                      ,
    ca_street_name            string                      ,
    ca_street_type            string                      ,
    ca_suite_number           string                      ,
    ca_city                   string                      ,
    ca_county                 string                      ,
    ca_state                  string                      ,
    ca_zip                    string                      ,
    ca_country                string                      ,
    ca_gmt_offset             numeric                     ,
    ca_location_type          string 					  ,
	empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY ca_address_sk;


create table tpcds_2t_flat_part_clust.customer_demographics(
    cd_demo_sk                int64               NOT NULL,
    cd_gender                 string                       ,
    cd_marital_status         string                       ,
    cd_education_status       string                      ,
    cd_purchase_estimate      int64                       ,
    cd_credit_rating          string                      ,
    cd_dep_count              int64                       ,
    cd_dep_employed_count     int64                       ,
    cd_dep_college_count      int64                       ,
    empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY cd_demo_sk;


create table tpcds_2t_flat_part_clust.date_dim(
    d_date_sk                 int64               NOT NULL,
    d_date_id                 string              NOT NULL,
    d_date                    date                          ,
    d_month_seq               int64                       ,
    d_week_seq                int64                       ,
    d_quarter_seq             int64                       ,
    d_year                    int64                       ,
    d_dow                     int64                       ,
    d_moy                     int64                       ,
    d_dom                     int64                       ,
    d_qoy                     int64                       ,
    d_fy_year                 int64                       ,
    d_fy_quarter_seq          int64                       ,
    d_fy_week_seq             int64                       ,
    d_day_name                string                       ,
    d_quarter_name            string                       ,
    d_holiday                 string                       ,
    d_weekend                 string                       ,
    d_following_holiday       string                       ,
    d_first_dom               int64                       ,
    d_last_dom                int64                       ,
    d_same_day_ly             int64                       ,
    d_same_day_lq             int64                       ,
    d_current_day             string                       ,
    d_current_week            string                       ,
    d_current_month           string                       ,
    d_current_quarter         string                       ,
    d_current_year            string                       ,
	empty_date                date                         )
	PARTITION BY empty_date                                   
	CLUSTER BY d_date_sk;                        

create table tpcds_2t_flat_part_clust.warehouse(
    w_warehouse_sk            int64               NOT NULL,
    w_warehouse_id            string              NOT NULL,
    w_warehouse_name          string                   ,
    w_warehouse_sq_ft         int64                       ,
    w_street_number           string                      ,
    w_street_name             string                   ,
    w_street_type             string                      ,
    w_suite_number            string                      ,
    w_city                    string                   ,
    w_county                  string                   ,
    w_state                   string                       ,
    w_zip                     string                      ,
    w_country                 string                     ,
    w_gmt_offset              numeric                    ,
    empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY w_warehouse_sk; 


create table tpcds_2t_flat_part_clust.ship_mode(
    sm_ship_mode_sk           int64               NOT NULL,
    sm_ship_mode_id           string              NOT NULL,
    sm_type                   string                      ,
    sm_code                   string                      ,
    sm_carrier                string                      ,
    sm_contract               string                      ,
    empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY sm_carrier; 


create table tpcds_2t_flat_part_clust.time_dim(
    t_time_sk                 int64               NOT NULL,
    t_time_id                 string              NOT NULL,
    t_time                    int64                       ,
    t_hour                    int64                       ,
    t_minute                  int64                       ,
    t_second                  int64                       ,
    t_am_pm                   string                       ,
    t_shift                   string                      ,
    t_sub_shift               string                      ,
    t_meal_time               string                      ,
	empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY t_time;


create table tpcds_2t_flat_part_clust.reason(
    r_reason_sk               int64               NOT NULL,
    r_reason_id               string              NOT NULL,
    r_reason_desc             string                     ,
	empty_date               date                       )
	PARTITION BY empty_date
	CLUSTER BY r_reason_sk;


create table tpcds_2t_flat_part_clust.income_band(
    ib_income_band_sk         int64               NOT NULL,
    ib_lower_bound            int64                       ,
    ib_upper_bound            int64                       ,
	empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY ib_lower_bound;


create table tpcds_2t_flat_part_clust.item(
    i_item_sk                 int64               NOT NULL,
    i_item_id                 string              NOT NULL,
    i_rec_start_date          date                          ,
    i_rec_end_date            date                          ,
    i_item_desc               string                  ,
    i_current_price           numeric                  ,
    i_wholesale_cost          numeric                  ,
    i_brand_id                int64                       ,
    i_brand                   string                      ,
    i_class_id                int64                       ,
    i_class                   string                      ,
    i_category_id             int64                       ,
    i_category                string                      ,
    i_manufact_id             int64                       ,
    i_manufact                string                      ,
    i_size                    string                      ,
    i_formulation             string                      ,
    i_color                   string                      ,
    i_units                   string                      ,
    i_container               string                      ,
    i_manager_id              int64                       ,
    i_product_name            string                      )
	PARTITION BY i_rec_start_date
	CLUSTER BY i_category;


create table tpcds_2t_flat_part_clust.store(
    s_store_sk                int64               NOT NULL,
    s_store_id                string              NOT NULL,
    s_rec_start_date          date                          ,
    s_rec_end_date            date                          ,
    s_closed_date_sk          int64                       ,
    s_store_name              string                   ,
    s_number_employees        int64                       ,
    s_floor_space             int64                       ,
    s_hours                   string                      ,
    s_manager                 string                   ,
    s_market_id               int64                       ,
    s_geography_class         string                  ,
    s_market_desc             string                  ,
    s_market_manager          string                   ,
    s_division_id             int64                       ,
    s_division_name           string                   ,
    s_company_id              int64                       ,
    s_company_name            string                   ,
    s_street_number           string                   ,
    s_street_name             string                   ,
    s_street_type             string                      ,
    s_suite_number            string                      ,
    s_city                    string                   ,
    s_county                  string                   ,
    s_state                   string                       ,
    s_zip                     string                      ,
    s_country                 string                   ,
    s_gmt_offset              numeric                  ,
    s_tax_precentage          numeric                  )
	PARTITION BY s_rec_start_date
	CLUSTER BY s_zip;


create table tpcds_2t_flat_part_clust.call_center(
    cc_call_center_sk         int64               NOT NULL,
    cc_call_center_id         string              NOT NULL,
    cc_rec_start_date         date                          ,
    cc_rec_end_date           date                          ,
    cc_closed_date_sk         int64                       ,
    cc_open_date_sk           int64                       ,
    cc_name                   string                   ,
    cc_class                  string                   ,
    cc_employees              int64                       ,
    cc_sq_ft                  int64                       ,
    cc_hours                  string                      ,
    cc_manager                string                   ,
    cc_mkt_id                 int64                       ,
    cc_mkt_class              string                     ,
    cc_mkt_desc               string                  ,
    cc_market_manager         string                   ,
    cc_division               int64                       ,
    cc_division_name          string                   ,
    cc_company                int64                       ,
    cc_company_name           string                      ,
    cc_street_number          string                      ,
    cc_street_name            string                   ,
    cc_street_type            string                      ,
    cc_suite_number           string                      ,
    cc_city                   string                   ,
    cc_county                 string                   ,
    cc_state                  string                       ,
    cc_zip                    string                      ,
    cc_country                string                   ,
    cc_gmt_offset             numeric                  ,
    cc_tax_percentage         numeric                  )
	PARTITION BY cc_rec_start_date
	CLUSTER BY cc_county;


create table tpcds_2t_flat_part_clust.customer(
    c_customer_sk             int64               NOT NULL,
    c_customer_id             string              NOT NULL,
    c_current_cdemo_sk        int64                       ,
    c_current_hdemo_sk        int64                       ,
    c_current_addr_sk         int64                       ,
    c_first_shipto_date_sk    int64                       ,
    c_first_sales_date_sk     int64                       ,
    c_salutation              string                      ,
    c_first_name              string                      ,
    c_last_name               string                      ,
    c_preferred_cust_flag     string                       ,
    c_birth_day               int64                       ,
    c_birth_month             int64                       ,
    c_birth_year              int64                       ,
    c_birth_country           string                      ,
    c_login                   string                      ,
    c_email_address           string                      ,
    c_last_review_date_sk     int64                       ,
	empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY c_customer_sk;


create table tpcds_2t_flat_part_clust.web_site(
    web_site_sk               int64               NOT NULL,
    web_site_id               string              NOT NULL,
    web_rec_start_date        date                          ,
    web_rec_end_date          date                          ,
    web_name                  string                   ,
    web_open_date_sk          int64                       ,
    web_close_date_sk         int64                       ,
    web_class                 string                   ,
    web_manager               string                   ,
    web_mkt_id                int64                       ,
    web_mkt_class             string                   ,
    web_mkt_desc              string                  ,
    web_market_manager        string                   ,
    web_company_id            int64                       ,
    web_company_name          string                      ,
    web_street_number         string                      ,
    web_street_name           string                   ,
    web_street_type           string                      ,
    web_suite_number          string                      ,
    web_city                  string                   ,
    web_county                string                   ,
    web_state                 string                       ,
    web_zip                   string                      ,
    web_country               string                   ,
    web_gmt_offset            numeric                  ,
    web_tax_percentage        numeric                  )
	PARTITION BY web_rec_start_date
	CLUSTER BY web_site_sk;


create table tpcds_2t_flat_part_clust.store_returns(
    sr_returned_date_sk       int64                       ,
    sr_return_time_sk         int64                       ,
    sr_item_sk                int64               NOT NULL,
    sr_customer_sk            int64                       ,
    sr_cdemo_sk               int64                       ,
    sr_hdemo_sk               int64                       ,
    sr_addr_sk                int64                       ,
    sr_store_sk               int64                       ,
    sr_reason_sk              int64                       ,
    sr_ticket_number          int64               NOT NULL,
    sr_return_quantity        int64                       ,
    sr_return_amt             numeric                 ,
    sr_return_tax             numeric                  ,
    sr_return_amt_inc_tax     numeric                  ,
    sr_fee                    numeric                  ,
    sr_return_ship_cost       numeric                  ,
    sr_refunded_cash          numeric                  ,
    sr_reversed_charge        numeric                  ,
    sr_store_credit           numeric                 ,
    sr_net_loss               numeric                 ,
    empty_date                date                    )
	PARTITION BY empty_date
	CLUSTER BY sr_ticket_number;
	
	
create table tpcds_2t_flat_part_clust.household_demographics(
    hd_demo_sk                int64               NOT NULL,
    hd_income_band_sk         int64                       ,
    hd_buy_potential          string                      ,
    hd_dep_count              int64                       ,
    hd_vehicle_count          int64                       ,
    empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY hd_buy_potential;


create table tpcds_2t_flat_part_clust.web_page(
    wp_web_page_sk            int64               NOT NULL,
    wp_web_page_id            string              NOT NULL,
    wp_rec_start_date         date                          ,
    wp_rec_end_date           date                          ,
    wp_creation_date_sk       int64                       ,
    wp_access_date_sk         int64                       ,
    wp_autogen_flag           string                       ,
    wp_customer_sk            int64                       ,
    wp_url                    string                      ,
    wp_type                   string                      ,
    wp_char_count             int64                       ,
    wp_link_count             int64                       ,
    wp_image_count            int64                       ,
    wp_max_ad_count           int64                       )
	PARTITION BY wp_rec_start_date
	CLUSTER BY wp_web_page_sk;


create table tpcds_2t_flat_part_clust.promotion(
    p_promo_sk                int64               NOT NULL,
    p_promo_id                string              NOT NULL,
    p_start_date_sk           int64                       ,
    p_end_date_sk             int64                       ,
    p_item_sk                 int64                       ,
    p_cost                    numeric                ,
    p_response_target         int64                       ,
    p_promo_name              string                      ,
    p_channel_dmail           string                       ,
    p_channel_email           string                       ,
    p_channel_catalog         string                       ,
    p_channel_tv              string                       ,
    p_channel_radio           string                       ,
    p_channel_press           string                       ,
    p_channel_event           string                       ,
    p_channel_demo            string                       ,
    p_channel_details         string                      ,
    p_purpose                 string                      ,
    p_discount_active         string                      ,
	empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY p_promo_sk;

create table tpcds_2t_flat_part_clust.catalog_page(
    cp_catalog_page_sk        int64               NOT NULL,
    cp_catalog_page_id        string              NOT NULL,
    cp_start_date_sk          int64                       ,
    cp_end_date_sk            int64                       ,
    cp_department             string                      ,
    cp_catalog_number         int64                       ,
    cp_catalog_page_number    int64                       ,
    cp_description            string                      ,
    cp_type                   string                      ,
	empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY cp_catalog_page_sk;


create table tpcds_2t_flat_part_clust.inventory(
    inv_date_sk               int64               NOT NULL,
    inv_item_sk               int64               NOT NULL,
    inv_warehouse_sk          int64               NOT NULL,
    inv_quantity_on_hand      int64                       ,
	empty_date                date                        )
	PARTITION BY empty_date
	CLUSTER BY inv_item_sk;


create table tpcds_2t_flat_part_clust.catalog_returns(
    cr_returned_date_sk       int64                       ,
    cr_returned_time_sk       int64                       ,
    cr_item_sk                int64               NOT NULL,
    cr_refunded_customer_sk   int64                       ,
    cr_refunded_cdemo_sk      int64                       ,
    cr_refunded_hdemo_sk      int64                       ,
    cr_refunded_addr_sk       int64                       ,
    cr_returning_customer_sk  int64                       ,
    cr_returning_cdemo_sk     int64                       ,
    cr_returning_hdemo_sk     int64                       ,
    cr_returning_addr_sk      int64                       ,
    cr_call_center_sk         int64                       ,
    cr_catalog_page_sk        int64                       ,
    cr_ship_mode_sk           int64                       ,
    cr_warehouse_sk           int64                       ,
    cr_reason_sk              int64                       ,
    cr_order_number           int64               NOT NULL,
    cr_return_quantity        int64                       ,
    cr_return_amount          numeric                  ,
    cr_return_tax             numeric                  ,
    cr_return_amt_inc_tax     numeric                  ,
    cr_fee                    numeric                  ,
    cr_return_ship_cost       numeric                  ,
    cr_refunded_cash          numeric                  ,
    cr_reversed_charge        numeric                  ,
    cr_store_credit           numeric                  ,
    cr_net_loss               numeric                  ,
	empty_date                date                      )
	PARTITION BY empty_date
	CLUSTER BY cr_item_sk;


create table tpcds_2t_flat_part_clust.web_returns(
    wr_returned_date_sk       int64                       ,
    wr_returned_time_sk       int64                       ,
    wr_item_sk                int64               NOT NULL,
    wr_refunded_customer_sk   int64                       ,
    wr_refunded_cdemo_sk      int64                       ,
    wr_refunded_hdemo_sk      int64                       ,
    wr_refunded_addr_sk       int64                       ,
    wr_returning_customer_sk  int64                       ,
    wr_returning_cdemo_sk     int64                       ,
    wr_returning_hdemo_sk     int64                       ,
    wr_returning_addr_sk      int64                       ,
    wr_web_page_sk            int64                       ,
    wr_reason_sk              int64                       ,
    wr_order_number           int64               NOT NULL,
    wr_return_quantity        int64                       ,
    wr_return_amt             numeric                  ,
    wr_return_tax             numeric                 ,
    wr_return_amt_inc_tax     numeric                  ,
    wr_fee                    numeric                  ,
    wr_return_ship_cost       numeric                  ,
    wr_refunded_cash          numeric                  ,
    wr_reversed_charge        numeric                  ,
    wr_account_credit         numeric                 ,
    wr_net_loss               numeric                  ,
	empty_date                date                      )
	PARTITION BY empty_date
	CLUSTER BY wr_web_page_sk;


create table tpcds_2t_flat_part_clust.web_sales(
    ws_sold_date_sk           int64                       ,
    ws_sold_time_sk           int64                       ,
    ws_ship_date_sk           int64                       ,
    ws_item_sk                int64               NOT NULL,
    ws_bill_customer_sk       int64                       ,
    ws_bill_cdemo_sk          int64                       ,
    ws_bill_hdemo_sk          int64                       ,
    ws_bill_addr_sk           int64                       ,
    ws_ship_customer_sk       int64                       ,
    ws_ship_cdemo_sk          int64                       ,
    ws_ship_hdemo_sk          int64                       ,
    ws_ship_addr_sk           int64                       ,
    ws_web_page_sk            int64                       ,
    ws_web_site_sk            int64                       ,
    ws_ship_mode_sk           int64                       ,
    ws_warehouse_sk           int64                       ,
    ws_promo_sk               int64                       ,
    ws_order_number           int64               NOT NULL,
    ws_quantity               int64                       ,
    ws_wholesale_cost         numeric                  ,
    ws_list_price             numeric                  ,
    ws_sales_price            numeric                  ,
    ws_ext_discount_amt       numeric                  ,
    ws_ext_sales_price        numeric                  ,
    ws_ext_wholesale_cost     numeric                  ,
    ws_ext_list_price         numeric                  ,
    ws_ext_tax                numeric                  ,
    ws_coupon_amt             numeric                  ,
    ws_ext_ship_cost          numeric                  ,
    ws_net_paid               numeric                  ,
    ws_net_paid_inc_tax       numeric                  ,
    ws_net_paid_inc_ship      numeric                  ,
    ws_net_paid_inc_ship_tax  numeric                  ,
    ws_net_profit             numeric                  ,
	empty_date                date                     )
	PARTITION BY empty_date
	CLUSTER BY ws_item_sk;


create table tpcds_2t_flat_part_clust.catalog_sales(
    cs_sold_date_sk           int64                       ,
    cs_sold_time_sk           int64                       ,
    cs_ship_date_sk           int64                       ,
    cs_bill_customer_sk       int64                       ,
    cs_bill_cdemo_sk          int64                       ,
    cs_bill_hdemo_sk          int64                       ,
    cs_bill_addr_sk           int64                       ,
    cs_ship_customer_sk       int64                       ,
    cs_ship_cdemo_sk          int64                       ,
    cs_ship_hdemo_sk          int64                       ,
    cs_ship_addr_sk           int64                       ,
    cs_call_center_sk         int64                       ,
    cs_catalog_page_sk        int64                       ,
    cs_ship_mode_sk           int64                       ,
    cs_warehouse_sk           int64                       ,
    cs_item_sk                int64               NOT NULL,
    cs_promo_sk               int64                       ,
    cs_order_number           int64               NOT NULL,
    cs_quantity               int64                       ,
    cs_wholesale_cost         numeric                  ,
    cs_list_price             numeric                  ,
    cs_sales_price            numeric                  ,
    cs_ext_discount_amt       numeric                 ,
    cs_ext_sales_price        numeric                  ,
    cs_ext_wholesale_cost     numeric                 ,
    cs_ext_list_price         numeric                  ,
    cs_ext_tax                numeric                  ,
    cs_coupon_amt             numeric                  ,
    cs_ext_ship_cost          numeric                  ,
    cs_net_paid               numeric                  ,
    cs_net_paid_inc_tax       numeric                 ,
    cs_net_paid_inc_ship      numeric                 ,
    cs_net_paid_inc_ship_tax  numeric                  ,
    cs_net_profit             numeric                  ,
	empty_date                date                     )
	PARTITION BY empty_date
	CLUSTER BY cs_item_sk;


create table tpcds_2t_flat_part_clust.store_sales(
    ss_sold_date_sk           int64                       ,
    ss_sold_time_sk           int64                       ,
    ss_item_sk                int64               NOT NULL,
    ss_customer_sk            int64                       ,
    ss_cdemo_sk               int64                       ,
    ss_hdemo_sk               int64                       ,
    ss_addr_sk                int64                       ,
    ss_store_sk               int64                       ,
    ss_promo_sk               int64                       ,
    ss_ticket_number          int64               NOT NULL,
    ss_quantity               int64                       ,
    ss_wholesale_cost         numeric                  ,
    ss_list_price             numeric                  ,
    ss_sales_price            numeric                  ,
    ss_ext_discount_amt       numeric                  ,
    ss_ext_sales_price        numeric                  ,
    ss_ext_wholesale_cost     numeric                  ,
    ss_ext_list_price         numeric                  ,
    ss_ext_tax                numeric                  ,
    ss_coupon_amt             numeric                  ,
    ss_net_paid               numeric                  ,
    ss_net_paid_inc_tax       numeric                  ,
    ss_net_profit             numeric                  ,
	empty_date                date                     )
	PARTITION BY empty_date
	CLUSTER BY ss_item_sk;

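Before loading any data, we can sanity-check that each table was created with the intended partitioning and clustering columns. One way to do this is a quick sketch against BigQuery's INFORMATION_SCHEMA (the column names below are BigQuery's standard metadata columns, not lab-specific fields):


In [ ]:
%%bigquery
SELECT
  table_name,
  column_name,
  is_partitioning_column,
  clustering_ordinal_position
FROM tpcds_2t_flat_part_clust.INFORMATION_SCHEMA.COLUMNS
WHERE is_partitioning_column = 'YES'
  OR clustering_ordinal_position IS NOT NULL
ORDER BY table_name, clustering_ordinal_position

Each table should report exactly one partitioning column and one clustering column, matching the DDL above.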
Ingesting data into partitioned tables

We could simply INSERT the billions of rows from our baseline dataset into our new partitioned tables, but that would take quite a while (a few hours). Instead, we'll use the new BigQuery Dataset Copy API (beta) to populate the tables from an existing copy in the dw-workshop project.
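For reference, the slow DML path we're avoiding would look something like the sketch below for store_sales (it assumes the baseline table has the same columns, in the same order, minus empty_date; repeated across billions of rows and two dozen tables, this is what takes hours):


In [ ]:
%%bigquery
INSERT INTO tpcds_2t_flat_part_clust.store_sales
SELECT
  *,
  CAST(NULL AS date) AS empty_date  -- rows land in the NULL partition
FROM `dw-workshop.tpcds_2t_baseline.store_sales`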

Use the BigQuery Data Transfer Service to copy an existing dataset

  1. Enable the BigQuery Data Transfer Service API
  2. Navigate to the BigQuery console and the existing dw-workshop dataset
  3. Click Copy Dataset
  4. In the pop-up, choose your project name and the newly created dataset name from the previous step

BE SURE TO CHOOSE THE NEW DATASET NAME tpcds_2t_flat_part_clust IN THE DROP-DOWN OR YOU WILL OVERWRITE YOUR BASELINE TABLES

  5. Click Copy

  6. Wait for the transfer to complete

Verify you now have the baseline data in your project

Run the query below and confirm you see data. Note that if you omit the project ID ahead of the dataset name in the FROM clause, BigQuery assumes your default project.


In [21]:
%%bigquery
SELECT COUNT(*) AS store_transaction_count
FROM tpcds_2t_flat_part_clust.store_sales


Out[21]:
store_transaction_count
0 5762820700

Running a few benchmark queries with a shell script


In [ ]:
%%bash
# runs the SQL queries from the TPCDS benchmark 

# Pull the current Google Cloud Platform project name
export PROJECT=$(gcloud config list project --format "value(core.project)")

BQ_DATASET="tpcds_2t_flat_part_clust" # let's benchmark our new dataset
QUERY_FILE_PATH="/home/jupyter/$PROJECT/02_add_partition_and_clustering/solution/sql/full_performance_benchmark.sql" #sample_benchmark.sql
IFS=";"

# create perf table to keep track of run times for all 99 queries
printf "\033[32;1m Housekeeping tasks... \033[0m\n\n";
printf "Creating a reporting table perf to track how fast each query runs...";
perf_table_ddl="CREATE TABLE IF NOT EXISTS $BQ_DATASET.perf(performance_test_num int64, query_num int64, elapsed_time_sec int64, ran_on int64)"
bq rm -f $BQ_DATASET.perf
bq query --nouse_legacy_sql "$perf_table_ddl"

start=$(date +%s)
index=0
for select_stmt in $(<$QUERY_FILE_PATH) 
do 
  # run the test until you hit a line with the string 'END OF BENCHMARK' in the file
  if [[ "$select_stmt" == *'END OF BENCHMARK'* ]]; then
    break
  fi

  printf "\n\033[32;1m Let's benchmark this query... \033[0m\n";
  printf '%s\n' "$select_stmt";
  
  SECONDS=0;
  bq query --use_cache=false --nouse_legacy_sql "$select_stmt" # critical to turn the cache off for this test
  duration=$SECONDS

  # get the current timestamp (seconds since epoch)
  ran_on=$(date +%s)

  index=$((index+1))

  printf "\n\033[32;1m Here's how long it took... \033[0m\n\n";
  echo "Query $index ran in $(($duration / 60)) minutes and $(($duration % 60)) seconds."

  printf "\n\033[32;1m Writing to our benchmark table... \033[0m\n\n";
  insert_stmt="insert into $BQ_DATASET.perf(performance_test_num, query_num, elapsed_time_sec, ran_on) values($start, $index, $duration, $ran_on)"
  printf '%s\n' "$insert_stmt"
  bq query --nouse_legacy_sql "$insert_stmt"
done

end=$(date +%s)

printf "Benchmark test complete in %s seconds\n" "$((end - start))"

Benchmarking all 99 queries


In [10]:
%%bigquery
SELECT * FROM `dw-workshop.tpcds_2t_flat_part_clust.perf` # public table
WHERE 
 # Let's only pull the results from our most recent test
 performance_test_num = (SELECT MAX(performance_test_num) FROM `dw-workshop.tpcds_2t_flat_part_clust.perf`)
ORDER BY ran_on


Out[10]:
performance_test_num query_num elapsed_time_sec ran_on
0 1571036796 1 4 1571036800
1 1571036796 2 7 1571036810
2 1571036796 3 67 1571036881
3 1571036796 4 6 1571036891
4 1571036796 5 6 1571036900
... ... ... ... ...
95 1571036796 96 8 1571040738
96 1571036796 97 25 1571040767
97 1571036796 98 25 1571040795
98 1571036796 99 25 1571040823
99 1571036796 100 1 1571040828

100 rows × 4 columns


In [11]:
%%bigquery
SELECT
  TIMESTAMP_SECONDS(MAX(performance_test_num)) AS test_date,
  MAX(performance_test_num) AS latest_performance_test_num,
  COUNT(DISTINCT query_num) AS count_queries_benchmarked,
  SUM(elapsed_time_sec) AS total_time_sec,
  MIN(elapsed_time_sec) AS fastest_query_time_sec,
  MAX(elapsed_time_sec) AS slowest_query_time_sec
FROM
  `dw-workshop.tpcds_2t_flat_part_clust.perf`
WHERE
  performance_test_num = (SELECT MAX(performance_test_num) FROM `dw-workshop.tpcds_2t_flat_part_clust.perf`)


Out[11]:
test_date latest_performance_test_num count_queries_benchmarked total_time_sec fastest_query_time_sec slowest_query_time_sec
0 2019-10-14 07:06:36+00:00 1571036796 100 3680 1 166

Results

The total time for the benchmark queries on our newly partitioned and clustered dataset is 3,680 seconds, or about 61 minutes. That's roughly 23% faster than the 79-minute baseline: (79 - 61) / 79 ≈ 0.23.
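That headline number can be reproduced directly from the two perf tables. A minimal sketch (it assumes the baseline run logged to `dw-workshop.tpcds_2t_baseline.perf`, the table used in the comparison query later in this lab, and filters each table to its most recent test run):


In [ ]:
%%bigquery
SELECT
  base_sec,
  part_sec,
  ROUND((base_sec - part_sec) / base_sec * 100, 1) AS pct_improvement
FROM (
  SELECT
    (SELECT SUM(elapsed_time_sec)
     FROM `dw-workshop.tpcds_2t_baseline.perf`
     WHERE performance_test_num =
       (SELECT MAX(performance_test_num) FROM `dw-workshop.tpcds_2t_baseline.perf`)) AS base_sec,
    (SELECT SUM(elapsed_time_sec)
     FROM `dw-workshop.tpcds_2t_flat_part_clust.perf`
     WHERE performance_test_num =
       (SELECT MAX(performance_test_num) FROM `dw-workshop.tpcds_2t_flat_part_clust.perf`)) AS part_sec
)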

Compare vs baseline

Using SQL, we can compare our benchmark tests pretty easily.


In [ ]:
%%bigquery
# TODO write where clause filters to pull latest performance from each table (and debug why they keep getting truncated)
WITH 
add_part AS (
SELECT * FROM `dw-workshop.tpcds_2t_flat_part_clust.perf`)

, base AS (
SELECT * FROM `dw-workshop.tpcds_2t_baseline.perf` )

SELECT 
  base.query_num,
  base.elapsed_time_sec AS elapsed_time_sec_base,
  add_part.elapsed_time_sec AS elapsed_time_sec_add_part,
  add_part.elapsed_time_sec - base.elapsed_time_sec AS delta
FROM base JOIN add_part USING(query_num)
ORDER BY delta

Final Activity: Create a Data Studio report or an ipynb visualization showing the performance differences. Which queries saw the most improvement?