Triggering a Cloud Composer Pipeline with a Google Cloud Function

In this advanced lab you will learn how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:

  • Watches for new CSV data to be uploaded to a Cloud Storage bucket
  • Is triggered by a Cloud Function call when a new file is detected
  • Finds the input file that triggered the workflow and runs a Cloud Dataflow job to transform the data and output it to BigQuery
  • Moves the original input file to a separate Cloud Storage bucket for storing processed files

Part One: Create Cloud Composer environment and workflow

First, create a Cloud Composer environment if you don't have one already by doing the following:

  1. In the Navigation menu under Big Data, select Composer
  2. Select Create
  3. Set the following parameters:
    • Name: mlcomposer
    • Location: us-central1
    • Other values at defaults
  4. Select Create

The environment creation process is complete when a green checkmark displays to the left of the environment name on the Environments page in the GCP Console. Setup can take up to 20 minutes, so move on to the next section, Create Cloud Storage buckets and BigQuery dataset, while you wait.
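
If you prefer the command line, a roughly equivalent environment can be created with gcloud. This is a sketch only: the name and location match the console steps above and every other setting is left at its default.

In [ ]:
%%bash
# Sketch: create the Composer environment from the CLI instead of the console.
# Name and location match the console steps; all other settings use defaults.
gcloud composer environments create mlcomposer \
  --location us-central1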

Set environment variables


In [12]:
import os
PROJECT = 'your-project-id' # REPLACE WITH YOUR PROJECT ID
REGION = 'us-central1' # REPLACE WITH YOUR REGION e.g. us-central1

# do not change these
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

Create Cloud Storage buckets

Create two Cloud Storage buckets in your project:

  • project-id_input
  • project-id_output

Run the below to automatically create the buckets and load some sample data:


In [ ]:
%%bash
## create GCS buckets
exists=$(gsutil ls -d | grep -w gs://${PROJECT}_input/)
if [ -n "$exists" ]; then
   echo "Skipping the creation of input bucket."
else
   echo "Creating input bucket."
   gsutil mb -l ${REGION} gs://${PROJECT}_input
   echo "Loading sample data for later"
   gsutil cp resources/usa_names.csv gs://${PROJECT}_input
fi

exists=$(gsutil ls -d | grep -w gs://${PROJECT}_output/)
if [ -n "$exists" ]; then
   echo "Skipping the creation of output bucket."
else
   echo "Creating output bucket."
   gsutil mb -l ${REGION} gs://${PROJECT}_output
fi

Create BigQuery Destination Dataset and Table

Next, we'll create a data sink in BigQuery to store the ingested data from GCS. You can follow the console steps below, or use the bq CLI sketch that follows them.

Create a new Dataset

  1. In the Navigation menu, select BigQuery
  2. Then click on your qwiklabs project ID
  3. Click Create Dataset
  4. Name your dataset ml_pipeline and leave other values at defaults
  5. Click Create Dataset

Create a new empty table

  1. Click on the newly created dataset
  2. Click Create Table
  3. For Destination Table name specify ingest_table
  4. For schema click Edit as Text and paste in the below schema

    state: STRING,
    gender: STRING,
    year: STRING,
    name: STRING,
    number: STRING,
    created_date: STRING,
    filename: STRING,
    load_dt: DATE

  5. Click Create Table
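
If you prefer the command line, the same dataset and empty table can be created with the bq tool. This sketch assumes the dataset name, table name, and schema exactly as described in the console steps above.

In [ ]:
%%bash
# Sketch: create the BigQuery dataset and empty ingest table from the CLI.
bq mk --dataset ${PROJECT}:ml_pipeline
bq mk --table ${PROJECT}:ml_pipeline.ingest_table \
  state:STRING,gender:STRING,year:STRING,name:STRING,number:STRING,created_date:STRING,filename:STRING,load_dt:DATE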

Review of Airflow concepts

While your Cloud Composer environment is building, let’s discuss the sample file you’ll be using in this lab.

Airflow is a platform to programmatically author, schedule, and monitor workflows.

Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.

Core concepts

  • DAG - A Directed Acyclic Graph is a collection of tasks, organised to reflect their relationships and dependencies.
  • Operator - The description of a single task; it is usually atomic. For example, the BashOperator is used to execute a bash command.
  • Task - A parameterised instance of an Operator; a node in the DAG.
  • Task Instance - A specific run of a task; characterised as: a DAG, a Task, and a point in time. It has an indicative state: running, success, failed, skipped, …

    The rest of the Airflow concepts can be found in the Airflow documentation.

Complete the DAG file

Cloud Composer workflows are composed of DAGs (Directed Acyclic Graphs). The code shown in simple_load_dag.py is the workflow code, also referred to as the DAG.

Open the file now to see how it is built; below is a detailed look at some of its key components.

To orchestrate all the workflow tasks, the DAG imports the following operators:

  • DataFlowPythonOperator
  • PythonOperator

    Action: **Complete the # TODOs in the simple_load_dag.py DAG file below** while you wait for your Composer environment to be set up.

In [14]:
%%writefile simple_load_dag.py
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A simple Airflow DAG that is triggered externally by a Cloud Function when a
file lands in a GCS bucket.
Once triggered the DAG performs the following steps:
1. Triggers a Google Cloud Dataflow job with the input file information received
   from the Cloud Function trigger.
2. Upon completion of the Dataflow job, the input file is moved to a
   gs://<target-bucket>/<success|failure>/YYYY-MM-DD/ location based on the
   status of the previous step.
"""

import datetime
import logging
import os

from airflow import configuration
from airflow import models
from airflow.contrib.hooks import gcs_hook
from airflow.contrib.operators import dataflow_operator
from airflow.operators import python_operator
from airflow.utils.trigger_rule import TriggerRule

# We set the start_date of the DAG to the previous date. This will
# make the DAG immediately available for scheduling.
YESTERDAY = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

# We define some variables that we will use in the DAG tasks.
SUCCESS_TAG = 'success'
FAILURE_TAG = 'failure'

# An Airflow variable called gcp_completion_bucket is required.
# This variable will contain the name of the bucket to move the processed
# file to.

# We are only looking for files with the exact name usa_names.csv
# (adjust this filter, or use wildcards, as needed).
INPUT_BUCKET_CSV = 'gs://'+models.Variable.get('gcp_input_location')+'/usa_names.csv'

# TODO: Populate the models.Variable.get() with the actual variable name for your output bucket
COMPLETION_BUCKET = 'gs://'+models.Variable.get('gcp_completion_bu____')
DS_TAG = '{{ ds }}'
DATAFLOW_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 'dataflow', 'process_delimited.py')

# The following additional Airflow variables should be set:
# gcp_project:         Google Cloud Platform project id.
# gcp_temp_location:   Google Cloud Storage location to use for Dataflow temp location.
DEFAULT_DAG_ARGS = {
    'start_date': YESTERDAY,
    'retries': 2,

    # TODO: Populate the models.Variable.get() with the variable name for your GCP Project
    'project_id': models.Variable.get('gcp_pro____'),
    'dataflow_default_options': {
        'project': models.Variable.get('gcp_pro____'),

        # TODO: Populate the models.Variable.get() with the variable name for temp location
        'temp_location': 'gs://'+models.Variable.get('gcp_temp_l_______'),
        'runner': 'DataflowRunner'
    }
}


def move_to_completion_bucket(target_bucket, target_infix, **kwargs):
    """A utility method to move an object to a target location in GCS."""
    # Here we establish a connection hook to GoogleCloudStorage.
    # Google Cloud Composer automatically provides a google_cloud_storage_default
    # connection id that is used by this hook.
    conn = gcs_hook.GoogleCloudStorageHook()

    # The external trigger (Google Cloud Function) that initiates this DAG
    # passes a dag_run.conf dictionary with event attributes describing the
    # GCS object that triggered this DAG. For simplicity, this lab instead
    # reads the input bucket from the gcp_input_location Airflow variable
    # and assumes the fixed object name usa_names.csv.
    source_bucket = models.Variable.get('gcp_input_location')
    source_object = 'usa_names.csv'
    completion_ds = kwargs['ds']

    target_object = os.path.join(target_infix, completion_ds, source_object)

    logging.info('Copying %s to %s',
                 os.path.join(source_bucket, source_object),
                 os.path.join(target_bucket, target_object))
    conn.copy(source_bucket, source_object, target_bucket, target_object)

    logging.info('Deleting %s',
                 os.path.join(source_bucket, source_object))
    conn.delete(source_bucket, source_object)


# Setting schedule_interval to None as this DAG is externally triggered by a Cloud Function.
# The following Airflow variables should be set for this DAG to function:
# bq_output_table: BigQuery table that should be used as the target for
#                  Dataflow in <dataset>.<tablename> format.
#                  e.g. lake.usa_names
# input_field_names: Comma separated field names for the delimited input file.
#                  e.g. state,gender,year,name,number,created_date

# TODO: Name the DAG id GcsToBigQueryTriggered
with models.DAG(dag_id='GcsToBigQueryTr_______',
                description='A DAG triggered by an external Cloud Function',
                schedule_interval=None, default_args=DEFAULT_DAG_ARGS) as dag:
    # Args required for the Dataflow job.
    job_args = {
        'input': INPUT_BUCKET_CSV,

        # TODO: Populate the models.Variable.get() with the variable name for BQ table
        'output': models.Variable.get('bq_output_t____'),

        # TODO: Populate the models.Variable.get() with the variable name for input field names
        'fields': models.Variable.get('input_field_n____'),
        'load_dt': DS_TAG
    }

    # Main Dataflow task that will process and load the input delimited file.
    # TODO: Specify the type of operator we need to call to invoke DataFlow
    dataflow_task = dataflow_operator.DataFlowPythonOp_______(
        task_id="process-delimited-and-push",
        py_file=DATAFLOW_FILE,
        options=job_args)

    # Here we create two conditional tasks, one of which will be executed
    # based on whether the dataflow_task was a success or a failure.
    success_move_task = python_operator.PythonOperator(task_id='success-move-to-completion',
                                                       python_callable=move_to_completion_bucket,
                                                       # A success_tag is used to move
                                                       # the input file to a success
                                                       # prefixed folder.
                                                       op_args=[models.Variable.get('gcp_completion_bucket'), SUCCESS_TAG],
                                                       provide_context=True,
                                                       trigger_rule=TriggerRule.ALL_SUCCESS)

    failure_move_task = python_operator.PythonOperator(task_id='failure-move-to-completion',
                                                       python_callable=move_to_completion_bucket,
                                                       # A failure_tag is used to move
                                                       # the input file to a failure
                                                       # prefixed folder.
                                                       op_args=[models.Variable.get('gcp_completion_bucket'), FAILURE_TAG],
                                                       provide_context=True,
                                                       trigger_rule=TriggerRule.ALL_FAILED)

    # The success_move_task and failure_move_task are both downstream from the
    # dataflow_task.
    dataflow_task >> success_move_task
    dataflow_task >> failure_move_task


Overwriting simple_load_dag.py

Viewing environment information

Now that you have a completed DAG, it's time to copy it to your Cloud Composer environment and finish the setup of your workflow.

  1. Go back to Composer to check on the status of your environment.
  2. Once your environment has been created, click the name of the environment to see its details.

    The Environment details page provides information such as the Airflow web UI URL, the Google Kubernetes Engine cluster ID, and the name of the Cloud Storage bucket connected to the DAGs folder.

    Cloud Composer uses Cloud Storage to store Apache Airflow DAGs, also known as workflows. Each environment has an associated Cloud Storage bucket. Cloud Composer schedules only the DAGs in the Cloud Storage bucket.
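
The same details are also available from the command line. The sketch below assumes the environment is named mlcomposer as in Part One; config.dagGcsPrefix is the gs:// path of the DAGs folder and config.airflowUri is the Airflow web UI URL.

In [ ]:
%%bash
# Sketch: view environment details from the CLI (assumes the environment
# created in Part One is named mlcomposer).
gcloud composer environments describe mlcomposer \
  --location ${REGION} \
  --format="yaml(config.airflowUri,config.dagGcsPrefix,config.gkeCluster)"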

Setting Airflow variables

Our DAG relies on variables to pass in values like the GCP Project. We can set these in the Admin UI.

Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you'll set the following six Airflow variables used by the DAG we will deploy.


In [ ]:
## Run this to display which key value pairs to input
import pandas as pd
pd.DataFrame([
  ('gcp_project', PROJECT),
  ('gcp_input_location', PROJECT + '_input'),
  ('gcp_temp_location', PROJECT + '_output/tmp'),
  ('gcp_completion_bucket', PROJECT + '_output'),
  ('input_field_names', 'state,gender,year,name,number,created_date'),
  ('bq_output_table', 'ml_pipeline.ingest_table')
], columns = ['Key', 'Value'])

Option 1: Set the variables using the Airflow webserver UI

  1. In your Airflow environment, select Admin > Variables
  2. Populate each key value in the table with the required variables from the above table

Option 2: Set the variables using the Airflow CLI

The next gcloud composer command runs the Airflow CLI sub-command variables in your environment; the arguments after the -- separator are passed through to the Airflow CLI.

To set the six variables, run the gcloud composer command once for each row of the above table (or use the loop sketched after the example below). Just as an example, to set the variable gcp_project you could do this:


In [ ]:
%%bash
# Replace ENVIRONMENT_NAME with your Composer environment name (e.g. mlcomposer)
gcloud composer environments run ENVIRONMENT_NAME \
 --location ${REGION} variables -- \
 --set gcp_project ${PROJECT}
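
To avoid running the command six times by hand, here is a loop sketch that sets every variable from the table above. It assumes the environment is named mlcomposer, as created in Part One.

In [ ]:
%%bash
# Sketch: set all six Airflow variables in one loop.
# Assumes the Composer environment from Part One is named mlcomposer.
ENVIRONMENT_NAME=mlcomposer

while read -r key value; do
  gcloud composer environments run ${ENVIRONMENT_NAME} \
    --location ${REGION} variables -- \
    --set ${key} ${value}
done <<EOF
gcp_project ${PROJECT}
gcp_input_location ${PROJECT}_input
gcp_temp_location ${PROJECT}_output/tmp
gcp_completion_bucket ${PROJECT}_output
input_field_names state,gender,year,name,number,created_date
bq_output_table ml_pipeline.ingest_table
EOF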

Copy your Airflow bucket name

  1. Navigate to your Cloud Composer instance


  2. Select DAGs Folder


  3. You will be taken to the Google Cloud Storage bucket that Cloud Composer has created automatically for your Airflow instance


  4. Copy the bucket name into the variable below (example: us-central1-composer-08f6edeb-bucket)

In [ ]:
AIRFLOW_BUCKET = 'us-central1-composer-21587538-bucket' # REPLACE WITH AIRFLOW BUCKET NAME
os.environ['AIRFLOW_BUCKET'] = AIRFLOW_BUCKET

Copy your Airflow files to your Airflow bucket


In [ ]:
%%bash
gsutil cp simple_load_dag.py gs://${AIRFLOW_BUCKET}/dags # overwrite DAG file if it exists
gsutil cp -r dataflow/process_delimited.py gs://${AIRFLOW_BUCKET}/dags/dataflow/ # copy the Dataflow job to be run
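
To confirm that the files landed in the DAGs folder and that Airflow picked up the new DAG, you can run the quick check sketched below. It assumes the environment is named mlcomposer and runs Airflow 1.x (list_dags), which matches the contrib imports used in this lab's DAG.

In [ ]:
%%bash
# Sketch: verify the copied files, then list the DAGs Airflow has parsed.
gsutil ls -r gs://${AIRFLOW_BUCKET}/dags/

# Assumes the environment is named mlcomposer and runs Airflow 1.x.
gcloud composer environments run mlcomposer \
  --location ${REGION} list_dags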

To access the Airflow web interface using the GCP Console:

  1. Go back to the Composer Environments page.
  2. In the Airflow webserver column for the environment, click the new window icon.
  3. The Airflow web UI opens in a new browser window.

Trigger DAG run manually

Running your DAG manually confirms that it works correctly before you rely on externally triggered events.

  1. Trigger the DAG manually by clicking the play button under Links, or use the CLI sketch below
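
Alternatively, you can trigger a run from the command line. The sketch below assumes the environment is named mlcomposer, the DAG id from the TODO is GcsToBigQueryTriggered, and the environment runs Airflow 1.x (trigger_dag).

In [ ]:
%%bash
# Sketch: trigger a manual DAG run from the CLI.
# Assumes the environment is named mlcomposer and the DAG id is
# GcsToBigQueryTriggered, matching the TODO in simple_load_dag.py.
gcloud composer environments run mlcomposer \
  --location ${REGION} trigger_dag -- GcsToBigQueryTriggered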

Part Two: Trigger DAG run automatically from a file upload to GCS

Now that your workflow runs successfully when triggered manually, you will trigger it based on an external event.

Create a Cloud Function to trigger your workflow

We will be following the Cloud Composer reference guide for triggering DAGs with Cloud Functions to set up our Cloud Function.

  1. In the code block below, update project_id, location, and composer_environment with your values
  2. Run the below code to get your CLIENT_ID (needed later)

In [ ]:
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

project_id = 'your-project-id' # REPLACE WITH YOUR PROJECT ID
location = 'us-central1' # REPLACE WITH YOUR COMPOSER REGION
composer_environment = 'composer' # REPLACE WITH YOUR COMPOSER ENVIRONMENT NAME e.g. mlcomposer

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()
airflow_uri = environment_data['config']['airflowUri']

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string['client_id'][0])

Create the Cloud Function

  1. Navigate to Compute > Cloud Functions
  2. Select Create function
  3. For name specify 'gcs-dag-trigger-function'
  4. For trigger type select 'Cloud Storage'
  5. For event type select 'Finalize/Create'
  6. For bucket, specify the input bucket you created earlier

Important: be sure to select the input bucket and not the output bucket, to avoid an endless triggering loop.

Populate index.js

Complete the four required constants in the index.js code below and paste it into the Cloud Function editor (the JavaScript will not run in this notebook). The constants are:

  • PROJECT_ID
  • CLIENT_ID (from earlier)
  • WEBSERVER_ID (part of Airflow webserver URL)
  • DAG_NAME (GcsToBigQueryTriggered)

In [ ]:
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'GcsToBigQueryTriggered';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(WEBSERVER_URL, BODY, iap.idToken, USER_AGENT);
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

Populate package.json

Copy and paste the below into package.json


In [ ]:
{
  "name": "nodejs-docs-samples-functions-composer-storage-trigger",
  "version": "0.0.1",
  "dependencies": {
    "form-data": "^2.3.2",
    "node-fetch": "^2.2.0"
  },
  "engines": {
    "node": ">=8.0.0"
  },
  "private": true,
  "license": "Apache-2.0",
  "author": "Google Inc.",
  "repository": {
    "type": "git",
    "url": "https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git"
  },
  "devDependencies": {
    "@google-cloud/nodejs-repo-tools": "^3.3.0",
    "mocha": "^6.0.0",
    "proxyquire": "^2.1.0",
    "sinon": "^7.2.7"
  },
  "scripts": {
    "test": "mocha test/*.test.js --timeout=20000"
  }
}

  1. For Function to execute, specify triggerDag (note: case sensitive)
  2. Select Create
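
As an alternative to the console editor, if you save index.js and package.json into a local directory you can deploy the same function with gcloud. The sketch below is not the official lab flow; the runtime and flag values are assumptions that match this sample, so adjust them to your setup.

In [ ]:
%%bash
# Sketch: deploy the Cloud Function from the CLI instead of the console.
# Run from the directory containing index.js and package.json.
# Assumes the Node 8 runtime (matching "engines" in package.json), the
# input bucket created earlier, and the exported triggerDag entry point.
gcloud functions deploy gcs-dag-trigger-function \
  --runtime nodejs8 \
  --entry-point triggerDag \
  --trigger-resource ${PROJECT}_input \
  --trigger-event google.storage.object.finalize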

Upload CSVs and Monitor

  1. Practice uploading and editing CSVs named usa_names.csv in your input bucket (note: the DAG only ingests files named exactly usa_names.csv; adjust this in the DAG code as needed). A quick upload-and-monitor sketch follows this list.
  2. Troubleshoot Cloud Function call errors by monitoring the logs. In Logging, filter for your most recent Dataflow job and scroll through to ensure the job is processing and outputting records to BigQuery.
  3. Troubleshoot Airflow workflow errors in the Airflow web UI under Browse > DAG Runs.
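
As a minimal end-to-end test, the sketch below re-uploads the sample CSV to the input bucket to fire the trigger and then reads the Cloud Function's logs. It assumes the sample file and function name used earlier in this lab.

In [ ]:
%%bash
# Sketch: upload a CSV to fire the trigger, then inspect the function logs.
gsutil cp resources/usa_names.csv gs://${PROJECT}_input

# Assumes the Cloud Function is named gcs-dag-trigger-function.
gcloud functions logs read gcs-dag-trigger-function --limit 50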

Congratulations!

You’ve completed this advanced lab on triggering a workflow with a Cloud Function.