The Python client library, google-cloud-bigquery, is the recommended way to programmatically access BigQuery.
The API documentation is at https://googleapis.github.io/google-cloud-python/latest/bigquery/reference.html. Because it is impossible to cover the full API here, we strongly suggest that you keep a browser window open to the documentation as you read through this notebook and try things out.
In [ ]:
# Uncomment if necessary.
# !python -m pip install --upgrade google-cloud-bigquery
In [11]:
PROJECT='cloud-training-demos' # CHANGE THIS
from google.cloud import bigquery
bq = bigquery.Client(project=PROJECT)
In [12]:
# information about the ch04 dataset in our project
dataset_id = "{}.ch04".format(PROJECT)
dsinfo = bq.get_dataset(dataset_id)
print(dsinfo.dataset_id)
print(dsinfo.created)
By default, the project specified when creating the Client is used:
In [13]:
# information about the ch04 dataset in our project
dsinfo = bq.get_dataset("ch04")
print(dsinfo.dataset_id)
print(dsinfo.created)
Get info about a dataset in some other project
In [14]:
dsinfo = bq.get_dataset('bigquery-public-data.london_bicycles')
print('{} created on {}'.format(dsinfo.dataset_id, dsinfo.created))
Another way is to create a dataset reference
In [15]:
from google.cloud.bigquery.dataset import DatasetReference
dsref = DatasetReference.from_string('bigquery-public-data.london_bicycles')
dsinfo = bq.get_dataset(dsref)
print('{} created on {} in {}'.format(dsinfo.dataset_id, dsinfo.created, dsinfo.location))
for access in dsinfo.access_entries:
    if access.role == 'READER':
        print(access)
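A dataset reference can also be constructed from its parts, and its table() method yields a TableReference within that dataset; a minimal sketch using the same public dataset:
In [ ]:
# Sketch: build the reference from project and dataset IDs; .table() gives a TableReference.
dsref = DatasetReference('bigquery-public-data', 'london_bicycles')
print(bq.get_dataset(dsref).full_dataset_id)
print(dsref.table('cycle_stations'))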
Deleting a dataset
In [17]:
bq.delete_dataset('ch05', not_found_ok=True)
Creating a dataset
In [18]:
dataset_id = "{}.ch05".format(PROJECT)
ds = bq.create_dataset(dataset_id, exists_ok=True)
print('{} created on {} in {}'.format(ds.dataset_id, ds.created, ds.location))
Creating a dataset in the EU
In [21]:
dataset_id = "{}.ch05eu".format(PROJECT)
dsinfo = bigquery.Dataset(dataset_id)
dsinfo.location = 'EU'
ds = bq.create_dataset(dsinfo, exists_ok=True)
print('{} created on {} in {}'.format(ds.dataset_id, ds.created, ds.location))
Updating a dataset
In [22]:
dsinfo = bq.get_dataset("ch05")
print(dsinfo.description)
dsinfo.description = "Chapter 5 of BigQuery: The Definitive Guide"
dsinfo = bq.update_dataset(dsinfo, ['description'])
print(dsinfo.description)
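Other dataset properties can be updated the same way; for example, a sketch that sets a default expiration for new tables in ch05 (the one-hour value is arbitrary):
In [ ]:
# Sketch: set a default expiration (in milliseconds) for new tables in ch05.
# The one-hour value is arbitrary.
dsinfo = bq.get_dataset("ch05")
dsinfo.default_table_expiration_ms = 3600 * 1000
dsinfo = bq.update_dataset(dsinfo, ['default_table_expiration_ms'])
print(dsinfo.default_table_expiration_ms)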
Adding access to a dataset programmatically
In [23]:
dsinfo = bq.get_dataset("ch05")
entry = bigquery.AccessEntry(
    role="READER",
    entity_type="userByEmail",
    entity_id="vlakshmanan@google.com",
)
if entry not in dsinfo.access_entries:
    entries = list(dsinfo.access_entries)
    entries.append(entry)
    dsinfo.access_entries = entries
    dsinfo = bq.update_dataset(dsinfo, ["access_entries"])  # API request
else:
    print('{} already has access'.format(entry.entity_id))
print(dsinfo.access_entries)
List tables in dataset
In [24]:
# list tables in dataset
tables = bq.list_tables("bigquery-public-data.london_bicycles")
for table in tables:
    print(table.table_id)
View table properties
In [13]:
table = bq.get_table("bigquery-public-data.london_bicycles.cycle_stations")
print('{} rows in {} (descr: {})'.format(table.num_rows, table.table_id, table.description))
for field in table.schema:
    if 'count' in field.name:
        print(field)
Deleting a table
In [26]:
bq.delete_table('ch05.temp_table', not_found_ok=True)
Creating a table
In [27]:
table_id = '{}.ch05.temp_table'.format(PROJECT)
table = bq.create_table(table_id, exists_ok=True)
print('{} created on {}'.format(table.table_id, table.created))
Update table schema
In [28]:
schema = [
    bigquery.SchemaField("chapter", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
]
table_id = '{}.ch05.temp_table'.format(PROJECT)
table = bq.get_table(table_id)
print(table.etag)
table.schema = schema
table = bq.update_table(table, ["schema"])
print(table.schema)
print(table.etag)
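The same pattern can also widen an existing schema, for example by appending a NULLABLE column; a sketch (the release_year column is made up, and because it is NULLABLE the inserts below still work):
In [ ]:
# Sketch: add a NULLABLE column to the existing schema.
# The column name release_year is hypothetical; NULLABLE additions are allowed.
table = bq.get_table(table_id)
new_schema = list(table.schema)
new_schema.append(bigquery.SchemaField("release_year", "INTEGER", mode="NULLABLE"))
table.schema = new_schema
table = bq.update_table(table, ["schema"])
print(table.schema)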
Insert rows into table
In [29]:
rows = [
    (1, u'What is BigQuery?'),
    (2, u'Query essentials'),
]
print(table.table_id, table.num_rows)
errors = bq.insert_rows(table, rows)
print(errors)
table = bq.get_table(table_id)
print(table.table_id, table.num_rows) # not updated immediately because the rows are still in the streaming buffer
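Even though num_rows lags, the streamed rows are queryable within a few seconds; a quick check (a sketch):
In [ ]:
# Sketch: streamed rows are visible to queries even though num_rows lags.
for row in bq.query('SELECT COUNT(*) AS num_rows FROM `{}`'.format(table_id)):
    print(row['num_rows'])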
In [20]:
## This will fail because the data type of the 2nd row is wrong
rows = [
    ('3', u'Operating on data types'),
    ('wont work', u'This will fail'),
    ('4', u'Loading data into BigQuery'),
]
errors = bq.insert_rows(table, rows)
print(errors)
Creating an empty table with schema
In [30]:
schema = [
    bigquery.SchemaField("chapter", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
]
table_id = '{}.ch05.temp_table2'.format(PROJECT)
table = bigquery.Table(table_id, schema)
table = bq.create_table(table, exists_ok=True)
print('{} created on {}'.format(table.table_id, table.created))
print(table.schema)
In [22]:
# remove the two temporary tables
bq.delete_table('ch05.temp_table', not_found_ok=True)
bq.delete_table('ch05.temp_table2', not_found_ok=True)
Loading a Pandas data frame
In [ ]:
!python -m pip install pyarrow
In [35]:
bq.delete_table('ch05.temp_table3', not_found_ok=True)
import pandas as pd
data = [
    (1, u'What is BigQuery?'),
    (2, u'Query essentials'),
]
df = pd.DataFrame(data, columns=['chapter', 'title'])
table_id = '{}.ch05.temp_table3'.format(PROJECT)
job = bq.load_table_from_dataframe(df, table_id)
job.result() # blocks and waits
print("Loaded {} rows into {}".format(job.output_rows, table_id))
By default, this appends rows:
In [36]:
print('Num rows = ', bq.get_table(table_id).num_rows)
job = bq.load_table_from_dataframe(df, table_id)
job.result() # blocks and waits
print("Loaded {} rows into {}".format(job.output_rows, table_id))
print('Num rows = ', bq.get_table(table_id).num_rows)
Setting the write disposition to WRITE_TRUNCATE replaces the contents of the table instead of appending:
In [41]:
from google.cloud.bigquery.job import LoadJobConfig, WriteDisposition, CreateDisposition
print('Num rows = ', bq.get_table(table_id).num_rows)
load_config = LoadJobConfig(
    create_disposition=CreateDisposition.CREATE_IF_NEEDED,
    write_disposition=WriteDisposition.WRITE_TRUNCATE)
job = bq.load_table_from_dataframe(df, table_id, job_config=load_config)
job.result() # blocks and waits
print("Loaded {} rows into {}".format(job.output_rows, table_id))
print('Num rows = ', bq.get_table(table_id).num_rows)
Loading from a URI
In [42]:
import time
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.CSV
job_config.null_marker = 'NULL'
uri = "gs://bigquery-oreilly-book/college_scorecard.csv"
table_id = '{}.ch05.college_scorecard_gcs'.format(PROJECT)
job = bq.load_table_from_uri(uri, table_id, job_config=job_config)
while not job.done():
    print('.', end='', flush=True)
    time.sleep(0.1)
print('Done')
table = bq.get_table(table_id)
print("Loaded {} rows into {}.".format(table.num_rows, table.table_id))
In [43]:
bq.delete_table('ch05.college_scorecard_gcs', not_found_ok=True)
Loading from a file object
In [29]:
import time
import gzip
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.CSV
job_config.null_marker = 'NULL'
table_id = '{}.ch05.college_scorecard_local'.format(PROJECT)
with gzip.open('../04_load/college_scorecard.csv.gz') as fp:
    job = bq.load_table_from_file(fp, table_id, job_config=job_config)
while not job.done():
    print('.', end='', flush=True)
    time.sleep(0.1)
print('Done')
table = bq.get_table(table_id)
print("Loaded {} rows into {}.".format(table.num_rows, table.table_id))
In [30]:
bq.delete_table('ch05.college_scorecard_local', not_found_ok=True)
Copying a table
In [31]:
# copy london stations table to our dataset
source_tbl = 'bigquery-public-data.london_bicycles.cycle_stations'
dest_tbl = '{}.ch05eu.cycle_stations_copy'.format(PROJECT)
job = bq.copy_table(source_tbl, dest_tbl, location='EU')
job.result() # blocks and waits
dest_table = bq.get_table(dest_tbl)
print(dest_table.num_rows)
Exporting from a table to Cloud Storage
In [ ]:
BUCKET=PROJECT + '-eu-temp'
!gsutil mb -l EU gs://$BUCKET
In [33]:
source_tbl = 'bigquery-public-data.london_bicycles.cycle_stations'
dest_uri = 'gs://{}/tmp/exported/cycle_stations'.format(BUCKET)
config = bigquery.job.ExtractJobConfig(
    destination_format=bigquery.job.DestinationFormat.NEWLINE_DELIMITED_JSON)
job = bq.extract_table(source_tbl, dest_uri, location='EU', job_config=config)
job.result() # blocks and waits
!gsutil cat $dest_uri | head -5
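The extract job configuration also supports compressed CSV; a sketch that writes gzipped shards instead (the destination path is illustrative):
In [ ]:
# Sketch: export as gzip-compressed CSV shards; the destination path is illustrative.
csv_uri = 'gs://{}/tmp/exported/cycle_stations-*.csv.gz'.format(BUCKET)
config = bigquery.job.ExtractJobConfig(
    destination_format=bigquery.job.DestinationFormat.CSV,
    compression=bigquery.job.Compression.GZIP)
job = bq.extract_table(source_tbl, csv_uri, location='EU', job_config=config)
job.result()  # blocks and waits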
In [ ]:
!gsutil rm -rf gs://$BUCKET
!gsutil rb -f gs://$BUCKET
Browsing a table
In [35]:
table_id = 'bigquery-public-data.london_bicycles.cycle_stations'
table = bq.get_table(table_id)
print("Total number of rows = {}".format(table.num_rows)) # 787
fields = [field for field in table.schema
          if 'count' in field.name or field.name == 'id']
print("Extracting only {}".format(fields))
rows = bq.list_rows(table,
                    start_index=300,
                    max_results=5,
                    selected_fields=fields)
fmt = '{!s:<10} ' * len(rows.schema)
print(fmt.format(*[field.name for field in rows.schema]))
for row in rows:
    print(fmt.format(*row))
In [36]:
query = """
SELECT
start_station_name
, AVG(duration) as duration
, COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY start_station_name
ORDER BY num_trips DESC
LIMIT 10
"""
print(query)
Dry run
In [37]:
config = bigquery.QueryJobConfig()
config.dry_run = True
job = bq.query(query, location='EU', job_config=config)
print("This query will process {} bytes.".format(job.total_bytes_processed))
Actual execution
In [38]:
# send query request
job = bq.query(query, location='EU')
fmt = '{!s:<40} {:>10d} {:>10d}'
for row in job:
    fields = (row['start_station_name'],
              int(0.5 + row['duration']),
              row['num_trips'])
    print(fmt.format(*fields))
Query result to Pandas dataframe
In [39]:
query = """
SELECT
start_station_name
, AVG(duration) as duration
, COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY start_station_name
"""
df = bq.query(query, location='EU').to_dataframe()
print(df.describe())
Parameterized query to get only trips longer than some duration
In [40]:
query2 = """
SELECT
start_station_name
, COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
WHERE duration >= @min_duration
GROUP BY start_station_name
ORDER BY num_trips DESC
LIMIT 10
"""
print(query2)
In [41]:
config = bigquery.QueryJobConfig()
config.query_parameters = [
    bigquery.ScalarQueryParameter('min_duration', "INT64", 600)
]
job = bq.query(query2, location='EU', job_config=config)
fmt = '{!s:<40} {:>10d}'
for row in job:
    fields = (row['start_station_name'],
              row['num_trips'])
    print(fmt.format(*fields))
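Array parameters work the same way; a sketch that restricts the query to a list of stations using an ArrayQueryParameter and IN UNNEST (the station names are illustrative):
In [ ]:
# Sketch: pass a list of values with ArrayQueryParameter; the station names are illustrative.
query3 = """
SELECT
  start_station_name
  , COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
WHERE start_station_name IN UNNEST(@stations)
GROUP BY start_station_name
"""
config = bigquery.QueryJobConfig()
config.query_parameters = [
    bigquery.ArrayQueryParameter('stations', 'STRING',
                                 ['Hyde Park Corner, Hyde Park',
                                  'Waterloo Station 3, Waterloo'])
]
for row in bq.query(query3, location='EU', job_config=config):
    print(row['start_station_name'], row['num_trips'])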
Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.