In this advanced lab you will learn how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:
- Watches for new data to arrive in a Cloud Storage bucket (the DAG is triggered externally by a Cloud Function when a file lands in the bucket)
- Runs a Dataflow job that processes the delimited file and loads the results into BigQuery
- Moves the input file to a success or failure bucket depending on the outcome of the Dataflow job
First, create a Cloud Composer environment if you don't have one already, either from the Environments page in the GCP Console or from the command line (a gcloud sketch follows below).
The environment creation process is complete when the green checkmark displays to the left of the environment name on the Environments page in the GCP Console. It can take up to 20 minutes for the environment to finish the setup process, so move on to the next section - Create Cloud Storage buckets and BigQuery dataset - while you wait.
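If you prefer the command line, here is a minimal sketch of the equivalent gcloud command. It assumes an environment name of composer (the name the later cells in this lab use) and default environment settings; adjust the region to match yours.
In [ ]:
%%bash
# Sketch only: creates a Cloud Composer environment named "composer" with default settings.
# Replace us-central1 with your region. Creation can take up to 20 minutes.
gcloud composer environments create composer --location us-central1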
In [12]:
import os
PROJECT = 'your-project-id' # REPLACE WITH YOUR PROJECT ID
REGION = 'us-central1' # REPLACE WITH YOUR REGION e.g. us-central1
# do not change these
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION
In [ ]:
%%bash
## create GCS buckets
exists=$(gsutil ls -d | grep -w gs://${PROJECT}_input/)
if [ -n "$exists" ]; then
    echo "Skipping the creation of input bucket."
else
    echo "Creating input bucket."
    gsutil mb -l ${REGION} gs://${PROJECT}_input
    echo "Loading sample data for later"
    gsutil cp resources/usa_names.csv gs://${PROJECT}_input
fi

exists=$(gsutil ls -d | grep -w gs://${PROJECT}_output/)
if [ -n "$exists" ]; then
    echo "Skipping the creation of output bucket."
else
    echo "Creating output bucket."
    gsutil mb -l ${REGION} gs://${PROJECT}_output
fi
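To confirm both buckets exist and the sample file landed in the input bucket, you can list them; this is just a sanity check and nothing later depends on it.
In [ ]:
%%bash
# The input bucket should contain usa_names.csv; the output bucket will be empty for now.
gsutil ls gs://${PROJECT}_input/
gsutil ls gs://${PROJECT}_output/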
Next, we'll create a BigQuery dataset and table to act as the data sink for the data ingested from GCS (a command-line sketch follows the schema below).
For the schema, click Edit as text and paste in the schema below:
state: STRING,
gender: STRING,
year: STRING,
name: STRING,
number: STRING,
created_date: STRING,
filename: STRING,
load_dt: DATE
Click Create Table
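If you'd rather script this step, here is a sketch using the bq CLI. It assumes the dataset is named ml_pipeline and the table ingest_table, matching the bq_output_table value (ml_pipeline.ingest_table) set later in this lab.
In [ ]:
%%bash
# Sketch: create the dataset and an empty table with the schema shown above.
bq mk --dataset ${PROJECT}:ml_pipeline
bq mk --table ${PROJECT}:ml_pipeline.ingest_table \
  state:STRING,gender:STRING,year:STRING,name:STRING,number:STRING,created_date:STRING,filename:STRING,load_dt:DATE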
While your Cloud Composer environment is building, let's discuss the sample DAG file you'll be using in this lab.
Airflow is a platform to programmatically author, schedule, and monitor workflows. You use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks; the Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.
Cloud Composer workflows are made up of DAGs (Directed Acyclic Graphs). The code shown in simple_load_dag.py is the workflow code, also referred to as the DAG.
Open the file now to see how it is built; next we'll take a detailed look at some of its key components.
To orchestrate all the workflow tasks, the DAG imports the following operators:
In [14]:
%%writefile simple_load_dag.py
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A simple Airflow DAG that is triggered externally by a Cloud Function when a
file lands in a GCS bucket.
Once triggered the DAG performs the following steps:
1. Triggers a Google Cloud Dataflow job with the input file information received
from the Cloud Function trigger.
2. Upon completion of the Dataflow job, the input file is moved to a
gs://<target-bucket>/<success|failure>/YYYY-MM-DD/ location based on the
status of the previous step.
"""
import datetime
import logging
import os
from airflow import configuration
from airflow import models
from airflow.contrib.hooks import gcs_hook
from airflow.contrib.operators import dataflow_operator
from airflow.operators import python_operator
from airflow.utils.trigger_rule import TriggerRule
# We set the start_date of the DAG to the previous date. This will
# make the DAG immediately available for scheduling.
YESTERDAY = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())
# We define some variables that we will use in the DAG tasks.
SUCCESS_TAG = 'success'
FAILURE_TAG = 'failure'
# An Airflow variable called gcp_completion_bucket is required.
# This variable will contain the name of the bucket to move the processed
# file to.
# We only ingest the file with the exact name usa_names.csv from the input
# bucket (adjust the filename, or use a wildcard, as needed).
INPUT_BUCKET_CSV = 'gs://'+models.Variable.get('gcp_input_location')+'/usa_names.csv'
# TODO: Populate the models.Variable.get() with the actual variable name for your output bucket
COMPLETION_BUCKET = 'gs://'+models.Variable.get('gcp_completion_bu____')
DS_TAG = '{{ ds }}'
DATAFLOW_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 'dataflow', 'process_delimited.py')
# The following additional Airflow variables should be set:
# gcp_project: Google Cloud Platform project id.
# gcp_temp_location: Google Cloud Storage location to use for Dataflow temp location.
DEFAULT_DAG_ARGS = {
    'start_date': YESTERDAY,
    'retries': 2,
    # TODO: Populate the models.Variable.get() with the variable name for your GCP Project
    'project_id': models.Variable.get('gcp_pro____'),
    'dataflow_default_options': {
        'project': models.Variable.get('gcp_pro____'),
        # TODO: Populate the models.Variable.get() with the variable name for temp location
        'temp_location': 'gs://'+models.Variable.get('gcp_temp_l_______'),
        'runner': 'DataflowRunner'
    }
}
def move_to_completion_bucket(target_bucket, target_infix, **kwargs):
    """A utility method to move an object to a target location in GCS."""
    # Here we establish a connection hook to GoogleCloudStorage.
    # Google Cloud Composer automatically provides a google_cloud_storage_default
    # connection id that is used by this hook.
    conn = gcs_hook.GoogleCloudStorageHook()
    # The external trigger (Google Cloud Function) that initiates this DAG
    # provides a dag_run.conf dictionary describing the GCS object that
    # triggered it. In this simplified lab we instead read the source bucket
    # from an Airflow variable and use the known sample file name; the object
    # name is the path within the bucket, i.e. just 'usa_names.csv'.
    source_bucket = models.Variable.get('gcp_input_location')
    source_object = 'usa_names.csv'
    completion_ds = kwargs['ds']
    target_object = os.path.join(target_infix, completion_ds, source_object)
    logging.info('Copying %s to %s',
                 os.path.join(source_bucket, source_object),
                 os.path.join(target_bucket, target_object))
    conn.copy(source_bucket, source_object, target_bucket, target_object)
    logging.info('Deleting %s',
                 os.path.join(source_bucket, source_object))
    conn.delete(source_bucket, source_object)
# Setting schedule_interval to None as this DAG is externally triggered by a Cloud Function.
# The following Airflow variables should be set for this DAG to function:
# bq_output_table: BigQuery table that should be used as the target for
# Dataflow in <dataset>.<tablename> format.
# e.g. lake.usa_names
# input_field_names: Comma separated field names for the delimited input file.
# e.g. state,gender,year,name,number,created_date
# TODO: Name the DAG id GcsToBigQueryTriggered
with models.DAG(dag_id='GcsToBigQueryTr_______',
                description='A DAG triggered by an external Cloud Function',
                schedule_interval=None, default_args=DEFAULT_DAG_ARGS) as dag:
    # Args required for the Dataflow job.
    job_args = {
        'input': INPUT_BUCKET_CSV,
        # TODO: Populate the models.Variable.get() with the variable name for BQ table
        'output': models.Variable.get('bq_output_t____'),
        # TODO: Populate the models.Variable.get() with the variable name for input field names
        'fields': models.Variable.get('input_field_n____'),
        'load_dt': DS_TAG
    }
    # Main Dataflow task that will process and load the input delimited file.
    # TODO: Specify the type of operator we need to call to invoke DataFlow
    dataflow_task = dataflow_operator.DataFlowPythonOp_______(
        task_id="process-delimited-and-push",
        py_file=DATAFLOW_FILE,
        options=job_args)
    # Here we create two conditional tasks, one of which will be executed
    # based on whether the dataflow_task was a success or a failure.
    success_move_task = python_operator.PythonOperator(
        task_id='success-move-to-completion',
        python_callable=move_to_completion_bucket,
        # A success_tag is used to move the input file
        # to a success prefixed folder.
        op_args=[models.Variable.get('gcp_completion_bucket'), SUCCESS_TAG],
        provide_context=True,
        trigger_rule=TriggerRule.ALL_SUCCESS)
    failure_move_task = python_operator.PythonOperator(
        task_id='failure-move-to-completion',
        python_callable=move_to_completion_bucket,
        # A failure_tag is used to move the input file
        # to a failure prefixed folder.
        op_args=[models.Variable.get('gcp_completion_bucket'), FAILURE_TAG],
        provide_context=True,
        trigger_rule=TriggerRule.ALL_FAILED)
    # The success_move_task and failure_move_task are both downstream from the
    # dataflow_task.
    dataflow_task >> success_move_task
    dataflow_task >> failure_move_task
Now that you have a completed DAG, it's time to copy it to your Cloud Composer environment and finish the setup of your workflow.
Our DAG relies on variables to pass in values like the GCP Project. We can set these in the Admin UI.
Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you'll set the following six Airflow variables used by the DAG we will deploy.
In [ ]:
## Run this to display which key-value pairs to input
import pandas as pd
pd.DataFrame([
('gcp_project', PROJECT),
('gcp_input_location', PROJECT + '_input'),
('gcp_temp_location', PROJECT + '_output/tmp'),
('gcp_completion_bucket', PROJECT + '_output'),
('input_field_names', 'state,gender,year,name,number,created_date'),
('bq_output_table', 'ml_pipeline.ingest_table')
], columns = ['Key', 'Value'])
The next gcloud composer command executes the Airflow CLI sub-command variables; gcloud passes everything after the -- through to that sub-command.
To set the six variables, run the gcloud composer command once for each row of the table above. For example, to set the variable gcp_project you could do this:
In [ ]:
%%bash
gcloud composer environments run ENVIRONMENT_NAME \
--location ${REGION} variables -- \
--set gcp_project ${PROJECT}
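Rather than running the command six times by hand, you can loop over the key-value pairs from the table above. This sketch assumes your environment is named composer; substitute your environment name if it differs.
In [ ]:
%%bash
# Set all six Airflow variables used by the DAG.
for kv in \
  "gcp_project ${PROJECT}" \
  "gcp_input_location ${PROJECT}_input" \
  "gcp_temp_location ${PROJECT}_output/tmp" \
  "gcp_completion_bucket ${PROJECT}_output" \
  "input_field_names state,gender,year,name,number,created_date" \
  "bq_output_table ml_pipeline.ingest_table"
do
  gcloud composer environments run composer \
    --location ${REGION} variables -- --set ${kv}
done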
In [ ]:
AIRFLOW_BUCKET = 'us-central1-composer-21587538-bucket' # REPLACE WITH AIRFLOW BUCKET NAME
os.environ['AIRFLOW_BUCKET'] = AIRFLOW_BUCKET
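If you're not sure of your environment's DAGs bucket, you can look it up with gcloud; this sketch again assumes the environment is named composer. The printed value is the gs://.../dags prefix, and the bucket portion of it is what goes into AIRFLOW_BUCKET above.
In [ ]:
%%bash
# Print the DAGs prefix, e.g. gs://us-central1-composer-xxxxxxxx-bucket/dags
gcloud composer environments describe composer \
  --location ${REGION} --format="get(config.dagGcsPrefix)"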
In [ ]:
%%bash
gsutil cp simple_load_dag.py gs://${AIRFLOW_BUCKET}/dags # overwrite DAG file if it exists
gsutil cp -r dataflow/process_delimited.py gs://${AIRFLOW_BUCKET}/dags/dataflow/ # copy the Dataflow job to be run
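After copying the files, you can confirm that Airflow has picked up the DAG; it may take a few minutes for the scheduler to parse the new file. This sketch assumes an Airflow 1.x environment named composer.
In [ ]:
%%bash
# List the DAGs known to the Airflow environment; GcsToBigQueryTriggered should appear.
gcloud composer environments run composer \
  --location ${REGION} list_dags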
We will be following this reference guide to set up our Cloud Function. Run the next cell to retrieve the IAP client ID of your Airflow web server; the printed value is what you will paste into the CLIENT_ID constant in index.js below.
In [ ]:
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse
# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)
project_id = 'your-project-id'     # REPLACE WITH YOUR PROJECT ID
location = 'us-central1'           # REPLACE WITH YOUR REGION
composer_environment = 'composer'  # REPLACE WITH YOUR COMPOSER ENVIRONMENT NAME
environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()
airflow_uri = environment_data['config']['airflowUri']
# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']
# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string['client_id'][0])
Important: when configuring the Cloud Function's Cloud Storage trigger, be sure to select the input bucket and not the output bucket, to avoid an endless triggering loop.
Complete the four required constants defined below in the index.js code (PROJECT_ID, CLIENT_ID, WEBSERVER_ID, and DAG_NAME) and paste it into the Cloud Function editor (the JS code will not run in this notebook). Also paste the package.json shown in the second cell below into the function's package.json so the node-fetch and form-data dependencies are installed.
In [ ]:
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.
  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'GcsToBigQueryTriggered';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }
  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');

  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }
  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });
  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
In [ ]:
{
  "name": "nodejs-docs-samples-functions-composer-storage-trigger",
  "version": "0.0.1",
  "dependencies": {
    "form-data": "^2.3.2",
    "node-fetch": "^2.2.0"
  },
  "engines": {
    "node": ">=8.0.0"
  },
  "private": true,
  "license": "Apache-2.0",
  "author": "Google Inc.",
  "repository": {
    "type": "git",
    "url": "https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git"
  },
  "devDependencies": {
    "@google-cloud/nodejs-repo-tools": "^3.3.0",
    "mocha": "^6.0.0",
    "proxyquire": "^2.1.0",
    "sinon": "^7.2.7"
  },
  "scripts": {
    "test": "mocha test/*.test.js --timeout=20000"
  }
}
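If you prefer to deploy from the command line instead of the Cloud Function editor, here is a sketch of an equivalent gcloud deployment. It assumes index.js and package.json from the two cells above are saved in the current directory, a function name of triggerDag (matching the exported entry point), and the Node.js 8 runtime; the trigger is the input bucket, per the note above.
In [ ]:
%%bash
# Sketch: deploy the function, triggered when an object is finalized in the input bucket.
gcloud functions deploy triggerDag \
  --region ${REGION} \
  --runtime nodejs8 \
  --entry-point triggerDag \
  --trigger-resource ${PROJECT}_input \
  --trigger-event google.storage.object.finalize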