Google Cloud Datalab, with the pipeline subcommand, enables productionizing (i.e. scheduling and orchestrating) notebooks that perform ETL with BigQuery and Google Cloud Storage (GCS). It uses Apache Airflow (https://airflow.apache.org/start.html) as the underlying technology for orchestration and scheduling.
Disclaimer: This is still in the experimental stage.
Set up a Google Cloud Composer environment using the instructions at https://cloud.google.com/composer/docs/quickstart, and specify 'datalab' as a Python dependency using the instructions at https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies. The examples in the cells below assume that a Composer environment is available.
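As a rough sketch (the exact gcloud flag and argument format should be verified against the Composer documentation linked above), adding the 'datalab' dependency to an existing environment, using the environment name and location configured further below, looks something like:
# Sketch only -- verify the --update-pypi-package flag and its argument format
# against the Composer docs linked above before uncommenting and running.
# !gcloud composer environments update rajivpb-composer-next \
#     --location us-central1 --update-pypi-package datalab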
Alternatively, you can set up your own VM with Airflow running as a long-running process. Run the "Airflow Setup" notebook (under samples/contrib/pipeline/); it sets up a GCE VM with the Airflow scheduler and the dashboard webserver.
The pipeline subcommand in the cells below requires either the Composer setup or the Airflow setup for the pipelines to be deployed successfully.
In [1]:
import datetime
import google.datalab.bigquery as bq
import google.datalab.contrib.bigquery.commands
import google.datalab.contrib.pipeline.airflow
import google.datalab.contrib.pipeline.composer
import google.datalab.kernel
import google.datalab.storage as storage
from google.datalab import Context
project = Context.default().project_id
# Composer variables (change these as per your preference)
environment = 'rajivpb-composer-next'
location = 'us-central1'
# Airflow setup variables
vm_name = 'datalab-airflow'
gcs_dag_bucket_name = project + '-' + vm_name
gcs_dag_file_path = 'dags'
# Set up a GCS bucket and a BQ dataset
bucket_name = project + '-bq_pipeline'
bucket = storage.Bucket(bucket_name)
bucket.create()
print(bucket.exists())
dataset_name = 'bq_pipeline'
dataset = bq.Dataset(dataset_name)
dataset.create()
print(dataset.exists())
# Start and end timestamps for our pipelines.
start = datetime.datetime.now()
formatted_start = start.strftime('%Y%m%dT%H%M%S')
end = start + datetime.timedelta(minutes=5)
In [2]:
%bq pipeline -h
The pipeline subcommand deploys and orchestrates an ETL pipeline. It supports specifying either an existing BQ table or a GCS path (with an accompanying schema) as the data input, executing a transformation with BigQuery SQL, and writing the results to an output (again, either a BQ table or a GCS path). The pipeline can be executed on a schedule, and parameters can be specified to templatize or customize it.
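Schematically, and distilled from the examples that follow in this notebook (angle-bracketed values are placeholders, not literal syntax), a %%bq pipeline cell body looks like this:
%%bq pipeline --name <pipeline_name> -e <composer_environment> -l <location>
input:                 # a 'load:' or 'extract:' section can be used instead, as shown later
  table: <BQ table>    # or: path: <GCS path>, together with 'schema:' and 'csv:' sections
transformation:
  query: <name of a query defined with %%bq query in an earlier cell>
output:
  table: <BQ table>    # or: path: <GCS path>
  mode: overwrite
parameters:            # optional; merged with the SQL query parameters
  - name: <param_name>
    type: STRING
    value: <param_value>
schedule:
  start: <timestamp>
  end: <timestamp>
  interval: '@once'
  catchup: True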
In [3]:
github_archive = 'githubarchive.month.201802'
In [4]:
%%bq query --name my_pull_request_events
SELECT id, created_at, repo.name FROM input
WHERE actor.login = 'rajivpb' AND type = 'PullRequestEvent'
In [5]:
# We designate the following 'output' for our pipeline.
results_table = project + '.' + dataset_name + '.' + 'pr_events_' + formatted_start
# The pipeline name is made unique by suffixing a timestamp
pipeline_name = 'github_once_' + formatted_start
In [6]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
input:
  table: $github_archive
transformation:
  query: my_pull_request_events
output:
  table: $results_table
  mode: overwrite
schedule:
  start: $start
  end: $end
  interval: '@once'
  catchup: True
Out[6]:
When the above cell is run, a pipeline is deployed and the results of the query are written into the BQ results table (i.e. $results_table). It could take 5-10 minutes after the cell is executed for the results table to show up. Below, we'll see additional examples showing alternative ways of specifying the source, the supported source types, and ways of customizing the pipeline.
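Before looking at those, a quick way to confirm the first pipeline's output from within the notebook is to poll for the results table (assuming Table.exists() in google.datalab.bigquery behaves like the Dataset.exists() call used in the setup cell):
# Check whether the pipeline's output table exists yet; expect False for the
# first several minutes after the pipeline cell above is run.
print(bq.Table(results_table).exists())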
The parameters section provides the ability to customize the inputs and outputs of the pipeline. These parameters are merged with the SQL query parameters into a list, and are specified in the cell body (along the same lines as the %bq execute command, for example).
In addition to parameters that users can define, a set of built-in mapping keys (such as _ts_year, _ts_month, and _ts_nodash, used below) is available for formatting strings; these are designed to capture common scenarios around parameterizing the pipeline with the execution timestamp.
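The snippet below only illustrates how such %(key)s placeholders get substituted (standard Python %-style string formatting); the year and month values are hypothetical stand-ins for whatever the execution timestamp happens to be at run time.
# Illustration only: the mapping keys are ordinary Python %-style format keys.
# The values here are made up; in a real run they are bound from the
# pipeline's execution timestamp.
template = 'githubarchive.month.%(_ts_year)s%(_ts_month)s'
print(template % {'_ts_year': '2018', '_ts_month': '02'})
# prints: githubarchive.month.201802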
In [7]:
# The source/input is formatted with the built-in mapping keys _ts_year and
# _ts_month and these are evaluated (or "bound") at the time of pipeline
# execution. This could be at some point in the future, or at some point in the
# "past" in cases where a backfill job is being executed.
github_archive_current_month = 'githubarchive.month.%(_ts_year)s%(_ts_month)s'
# The destination/output is formatted with additional user-defined parameters
# 'project', 'dataset_name', and 'user'. These are evaluated/bound at the time
# of execution of the %bq pipeline cell.
results_table = '%(project)s.%(dataset_name)s.%(user)s_pr_events_%(_ts_nodash)s'
pipeline_name = 'github_parameterized_' + formatted_start
In [8]:
%%bq query --name my_pull_request_events
SELECT id, created_at, repo.name FROM input
WHERE actor.login = @user AND type = 'PullRequestEvent'
In [9]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
input:
  table: $github_archive_current_month
transformation:
  query: my_pull_request_events
output:
  table: $results_table
  mode: overwrite
parameters:
  - name: user
    type: STRING
    value: 'rajivpb'
  - name: project
    type: STRING
    value: $project
  - name: dataset_name
    type: STRING
    value: $dataset_name
schedule:
  start: $start
  end: $end
  interval: '@once'
  catchup: True
Out[9]:
In [10]:
gcs_input_path = 'gs://cloud-datalab-samples/cars.csv'
gcs_output_path = 'gs://%(bucket_name)s/all_makes_%(_ts_nodash)s.csv'
pipeline_name = 'gcs_to_gcs_transform_' + formatted_start
In [11]:
%%bq query --name all_makes
SELECT Make FROM input
In [12]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
input:
  path: $gcs_input_path
  schema:
    - name: Year
      type: INTEGER
    - name: Make
      type: STRING
    - name: Model
      type: STRING
    - name: Description
      type: STRING
    - name: Price
      type: FLOAT
  csv:
    skip: 1
transformation:
  query: all_makes
output:
  path: $gcs_output_path
parameters:
  - name: bucket_name
    type: STRING
    value: $bucket_name
schedule:
  start: $start
  end: $end
  interval: '@once'
  catchup: True
Out[12]:
In [13]:
bq_load_results_table = '%(project)s.%(dataset_name)s.cars_load'
pipeline_name = 'load_gcs_to_bq_' + formatted_start
In [14]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
load:
  path: $gcs_input_path
  schema:
    - name: Year
      type: INTEGER
    - name: Make
      type: STRING
    - name: Model
      type: STRING
    - name: Description
      type: STRING
    - name: Price
      type: FLOAT
  csv:
    skip: 1
  table: $bq_load_results_table
  mode: overwrite
parameters:
  - name: project
    type: STRING
    value: $project
  - name: dataset_name
    type: STRING
    value: $dataset_name
schedule:
  start: $start
  end: $end
  interval: '@once'
  catchup: True
Out[14]:
Similar to load, the pipeline subcommand can also perform the equivalent of the %bq extract command. To illustrate, we extract the data in the table produced by the 'load' pipeline and write it to a GCS file.
If you "Ran All Cells" in this notebook, this pipeline may be deployed at the same time as the previous load pipeline, in which case the source table is not yet ready. Hence we set retries to 3, with a delay of 90 seconds, so that the pipeline succeeds once the table is eventually created.
In [15]:
gcs_extract_path = 'gs://%(bucket_name)s/cars_extract_%(_ts_nodash)s.csv'
pipeline_name = 'extract_bq_to_gcs_' + formatted_start
In [16]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
extract:
  table: $bq_load_results_table
  path: $gcs_extract_path
  format: csv
  csv:
    delimiter: '#'
parameters:
  - name: bucket_name
    type: STRING
    value: $bucket_name
  - name: project
    type: STRING
    value: $project
  - name: dataset_name
    type: STRING
    value: $dataset_name
schedule:
  start: $start
  interval: '@once'
  catchup: True
  retries: 3
  retry_delay_seconds: 90
Out[16]:
In [17]:
# You will see two files named all_makes_<timestamp> and cars_extract_<timestamp>
# under the bucket:
!gsutil ls gs://$bucket_name
# You will see three tables named cars_load, pr_events_<timestamp> and
# <user>_pr_events_<timestamp> under the BigQuery dataset:
!bq ls $dataset_name
In [ ]:
# Delete the contents of the GCS bucket, the GCS bucket itself, and the BQ
# dataset. Uncomment the lines below and execute.
#!gsutil rm -r gs://$bucket_name
#!bq rm -r -f $dataset_name
In [ ]:
# If you chose the Airflow VM (in the Setup), this will stop the VM. Uncomment the
# line below and execute.
#!gcloud compute instances stop $vm_name --zone us-central1-b --quiet
In [ ]:
# This just verifies that cleanup actually worked. Run this after running the
# cleanup cells above.
# Should show two error messages like "BucketNotFoundException: 404 gs://..."
!gsutil ls gs://$bucket_name
!gsutil ls gs://$gcs_dag_bucket_name/dags
# Should show an error message like "BigQuery error in ls operation: Not found ..."
!bq ls $dataset_name