Gather training data by querying BigQuery
GCP, BigQuery, Kubeflow, Pipeline
A Kubeflow Pipeline component to submit a query to BigQuery and store the result in a Cloud Storage bucket.
Use this Kubeflow component to select training data by submitting a query to BigQuery and write the results to a Cloud Storage bucket as a CSV file.
Argument | Description | Optional | Data type | Accepted values | Default |
---|---|---|---|---|---|
query | The query used by BigQuery to fetch the results. | No | String | | |
project_id | The project ID of the Google Cloud Platform (GCP) project to use to execute the query. | No | GCPProjectID | | |
dataset_id | The ID of the persistent BigQuery dataset to store the results of the query. If the dataset does not exist, the operation will create a new one. | Yes | String | | None |
table_id | The ID of the BigQuery table to store the results of the query. If the table ID is absent, the operation will generate a random ID for the table. | Yes | String | | None |
output_gcs_path | The path to the Cloud Storage bucket to store the query output. | Yes | GCSPath | | None |
dataset_location | The location where the dataset is created. Defaults to US. | Yes | String | | US |
job_config | The full configuration specification for the query job. See QueryJobConfig for details. | Yes | Dict | A JSON object with the same structure as QueryJobConfig | None |
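Most arguments map directly onto the pipeline parameters used later in this document. The least self-explanatory one is job_config. As a rough sketch (not taken from the component's own examples), a configuration mirroring the REST representation of QueryJobConfig could be built like this; the exact field structure is an assumption and should be confirmed against the QueryJobConfig reference.

import json

# Hypothetical job configuration: force standard SQL and cap billed bytes.
# The keys follow the REST JobConfigurationQuery representation; verify the
# exact structure against the QueryJobConfig reference before relying on it.
job_config = json.dumps({
    'query': {
        'useLegacySql': False,
        'maximumBytesBilled': '1000000000'
    }
})
# This string could then be passed as the job_config argument of the pipeline
# defined later in this document (for example via the `arguments` dict used
# when submitting the run).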
The input data is a BigQuery job containing a query that pulls data from various sources.
Name | Description | Type |
---|---|---|
output_gcs_path | The path to the Cloud Storage bucket containing the query output in CSV format. | GCSPath |
To use the component, the following requirements must be met:
- The service account that runs the component is a member of the roles/bigquery.admin role of the project.
- The service account that runs the component is a member of the roles/storage.objectCreator role of the Cloud Storage output bucket (example commands for granting these roles follow below).
This Kubeflow Pipeline component is used to:
- Submit a query to BigQuery.
- Output the query results to a Cloud Storage bucket as a CSV file.
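If the roles listed in the requirements above still need to be granted, commands along the following lines can be run from the notebook. The service account email, project ID, and bucket name are placeholders; which identity your pipeline actually runs as depends on how your cluster is set up.

# Placeholders: substitute the service account, project, and bucket you use.
SA_EMAIL = 'my-kubeflow-sa@my-project.iam.gserviceaccount.com'
!gcloud projects add-iam-policy-binding my-project --member=serviceAccount:$SA_EMAIL --role=roles/bigquery.admin
!gsutil iam ch serviceAccount:$SA_EMAIL:objectCreator gs://my-output-bucket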
Use the code below as an example of how to run your BigQuery job.
Note: The following sample code works in an IPython notebook or directly in Python code.
In [ ]:
%%capture --no-stderr
KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
In [ ]:
import kfp.components as comp
bigquery_query_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/bigquery/query/component.yaml')
help(bigquery_query_op)
In [ ]:
QUERY = 'SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 10'
In [ ]:
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash
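Before building the pipeline, you can optionally sanity-check the query and project directly with the BigQuery client library. This is a side check, not part of the pipeline; it assumes google-cloud-bigquery is installed and application-default credentials are available in the notebook environment.

# Optional: run the query once with the BigQuery client to confirm that
# QUERY and PROJECT_ID are valid before wiring them into the pipeline.
from google.cloud import bigquery

bq_client = bigquery.Client(project=PROJECT_ID)
rows = list(bq_client.query(QUERY).result())
print('Query returned {} rows'.format(len(rows)))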
In [ ]:
# Optional Parameters
EXPERIMENT_NAME = 'Bigquery-Query'
OUTPUT_PATH = '{}/bigquery/query/questions.csv'.format(GCS_WORKING_DIR)
In [ ]:
import kfp.dsl as dsl
import json
@dsl.pipeline(
    name='Bigquery query pipeline',
    description='Bigquery query pipeline'
)
def pipeline(
    query=QUERY,
    project_id=PROJECT_ID,
    dataset_id='',
    table_id='',
    output_gcs_path=OUTPUT_PATH,
    dataset_location='US',
    job_config=''
):
    bigquery_query_op(
        query=query,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=table_id,
        output_gcs_path=output_gcs_path,
        dataset_location=dataset_location,
        job_config=job_config)
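Depending on how your cluster provides GCP credentials, the op may need a secret attached explicitly. As a sketch for older Kubeflow installations that store the user service account in a secret named user-gcp-sa (an assumption about your cluster, not something this document requires), the pipeline could attach it with kfp.gcp.use_gcp_secret:

import kfp.dsl as dsl
import kfp.gcp as gcp

@dsl.pipeline(
    name='Bigquery query pipeline with secret',
    description='Variant that attaches the user-gcp-sa secret to the op'
)
def pipeline_with_secret(
    query=QUERY,
    project_id=PROJECT_ID,
    output_gcs_path=OUTPUT_PATH
):
    # Attach the GCP service account secret so the op can reach BigQuery and
    # Cloud Storage (only needed if credentials are not injected otherwise).
    bigquery_query_op(
        query=query,
        project_id=project_id,
        output_gcs_path=output_gcs_path).apply(
            gcp.use_gcp_secret('user-gcp-sa'))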
In [ ]:
pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
In [ ]:
#Specify pipeline argument values
arguments = {}
#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)
#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
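If you want the notebook to block until the run finishes, the client can poll for completion. This assumes your kfp version exposes wait_for_run_completion and that the object returned by run_pipeline carries the run ID as run_result.id; adjust if your SDK differs.

# Wait up to 30 minutes for the run to finish (assumption: run_result.id
# holds the run ID in this kfp version).
client.wait_for_run_completion(run_result.id, timeout=1800)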
In [ ]:
!gsutil cat $OUTPUT_PATH
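To pull the exported CSV into memory for training, one option is pandas. Reading gs:// paths directly requires the gcsfs package (an extra dependency, not something this component installs); otherwise copy the file locally with gsutil first.

import pandas as pd

# Direct gs:// reads need gcsfs installed; as an alternative:
#   !gsutil cp $OUTPUT_PATH questions.csv
#   df = pd.read_csv('questions.csv')
df = pd.read_csv(OUTPUT_PATH)
df.head()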
By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.