Example of using the Google Cloud Client Library for BigQuery

This is the recommended way to access BigQuery programmatically.

The API documentation is here: https://googleapis.github.io/google-cloud-python/latest/bigquery/reference.html. Because we cannot cover the full API in one notebook, we strongly suggest keeping that documentation open in a browser window as you read through this notebook and try things out.

Install library and extensions if needed

On Notebook instances on Google Cloud, the BigQuery client library is already installed.


In [ ]:
# Uncomment if necessary.
# !python -m pip install --upgrade google-cloud-bigquery
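
To confirm which version you have installed, a quick check (the package exposes its version as __version__):


In [ ]:
import google.cloud.bigquery
print(google.cloud.bigquery.__version__)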

Authenticate and create a client


In [11]:
PROJECT='cloud-training-demos'  # CHANGE THIS to your project ID
from google.cloud import bigquery
bq = bigquery.Client(project=PROJECT)

Dataset manipulation

Get info about a dataset


In [12]:
# information about the ch04 dataset in our project
dataset_id = "{}.ch04".format(PROJECT)
dsinfo = bq.get_dataset(dataset_id)
print(dsinfo.dataset_id)
print(dsinfo.created)


ch04
2019-01-26 00:41:01.350000+00:00

If you don't qualify the dataset ID with a project, the project of the Client is used by default


In [13]:
# same ch04 dataset, relying on the client's default project
dsinfo = bq.get_dataset("ch04")
print(dsinfo.dataset_id)
print(dsinfo.created)


ch04
2019-01-26 00:41:01.350000+00:00

Get info about a dataset in some other project


In [14]:
dsinfo = bq.get_dataset('bigquery-public-data.london_bicycles')
print('{} created on {}'.format(dsinfo.dataset_id, dsinfo.created))


london_bicycles created on 2017-05-25 13:26:18.055000+00:00

Another way is to create a dataset reference


In [15]:
from google.cloud.bigquery.dataset import DatasetReference
ref = DatasetReference.from_string('bigquery-public-data.london_bicycles')
dsinfo = bq.get_dataset(ref)
print('{} created on {} in {}'.format(dsinfo.dataset_id, dsinfo.created, dsinfo.location))
for access in dsinfo.access_entries:
    if access.role == 'READER':
        print(access)


london_bicycles created on 2017-05-25 13:26:18.055000+00:00 in EU
<AccessEntry: role=READER, specialGroup=allAuthenticatedUsers>
<AccessEntry: role=READER, domain=google.com>
<AccessEntry: role=READER, specialGroup=projectReaders>
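
Relatedly, you can enumerate all of the datasets in the client's project; a minimal sketch:


In [ ]:
# list every dataset in the client's default project
for ds in bq.list_datasets():
    print(ds.dataset_id)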

Deleting a dataset


In [17]:
bq.delete_dataset('ch05', not_found_ok=True)
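
Note that delete_dataset will fail on a dataset that still contains tables unless you also ask for the contents to be removed; a sketch:


In [ ]:
# delete_contents=True also drops any tables remaining in the dataset
bq.delete_dataset('ch05', delete_contents=True, not_found_ok=True)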

Creating a dataset


In [18]:
dataset_id = "{}.ch05".format(PROJECT)
ds = bq.create_dataset(dataset_id, exists_ok=True)
print('{} created on {} in {}'.format(ds.dataset_id, ds.created, ds.location))


ch05 created on 2019-03-12 16:58:20.438000+00:00 in US

Creating a dataset in the EU


In [21]:
dataset_id = "{}.ch05eu".format(PROJECT)
dsinfo = bigquery.Dataset(dataset_id)
dsinfo.location = 'EU'
ds = bq.create_dataset(dsinfo, exists_ok=True)
print('{} created on {} in {}'.format(ds.dataset_id, ds.created, ds.location))


ch05eu created on 2019-03-12 16:59:49.085000+00:00 in EU

Updating a dataset


In [22]:
dsinfo = bq.get_dataset("ch05")
print(dsinfo.description)
dsinfo.description = "Chapter 5 of BigQuery: The Definitive Guide"
dsinfo = bq.update_dataset(dsinfo, ['description'])
print(dsinfo.description)


None
Chapter 5 of BigQuery: The Definitive Guide
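
Other mutable dataset properties follow the same read-modify-update pattern; for example, labels (a sketch using a made-up label):


In [ ]:
dsinfo = bq.get_dataset("ch05")
dsinfo.labels = {'book': 'bigquery_definitive_guide'}  # hypothetical label
dsinfo = bq.update_dataset(dsinfo, ['labels'])
print(dsinfo.labels)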

Adding access to a dataset programmatically


In [23]:
dsinfo = bq.get_dataset("ch05")
entry = bigquery.AccessEntry(
    role="READER",
    entity_type="userByEmail",
    entity_id="vlakshmanan@google.com",
)
if entry not in dsinfo.access_entries:
  entries = list(dsinfo.access_entries)
  entries.append(entry)
  dsinfo.access_entries = entries
  dsinfo = bq.update_dataset(dsinfo, ["access_entries"])  # API request
else:
  print('{} already has access'.format(entry.entity_id))
print(dsinfo.access_entries)


[<AccessEntry: role=WRITER, specialGroup=projectWriters>, <AccessEntry: role=OWNER, specialGroup=projectOwners>, <AccessEntry: role=OWNER, userByEmail=vlakshmanan@google.com>, <AccessEntry: role=READER, specialGroup=projectReaders>, <AccessEntry: role=READER, userByEmail=vlakshmanan@google.com>]
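
Revoking access is the mirror image: filter the entry out of access_entries and update the dataset (a sketch):


In [ ]:
dsinfo = bq.get_dataset("ch05")
dsinfo.access_entries = [
  a for a in dsinfo.access_entries
  if not (a.role == 'READER' and a.entity_id == 'vlakshmanan@google.com')
]
dsinfo = bq.update_dataset(dsinfo, ["access_entries"])  # API request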

Table manipulation

List tables in a dataset


In [24]:
# list tables in dataset
tables = bq.list_tables("bigquery-public-data.london_bicycles")
for table in tables:
    print(table.table_id)


cycle_hire
cycle_stations

View table properties


In [13]:
table = bq.get_table("bigquery-public-data.london_bicycles.cycle_stations")
print('{} rows in {} (descr: {})'.format(table.num_rows, table.table_id, table.description))
for field in table.schema:
  if 'count' in field.name:
    print(field)


787 rows in cycle_stations (descr: None)
SchemaField('bikes_count', 'INTEGER', 'NULLABLE', '', ())
SchemaField('docks_count', 'INTEGER', 'NULLABLE', '', ())

Deleting a table


In [26]:
bq.delete_table('ch05.temp_table', not_found_ok=True)

Creating a table


In [27]:
table_id = '{}.ch05.temp_table'.format(PROJECT)
table = bq.create_table(table_id, exists_ok=True)
print('{} created on {}'.format(table.table_id, table.created))


temp_table created on 2019-03-12 17:17:24.127000+00:00

Update table schema


In [28]:
schema = [
  bigquery.SchemaField("chapter", "INTEGER", mode="REQUIRED"),
  bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
]
table_id = '{}.ch05.temp_table'.format(PROJECT)
table = bq.get_table(table_id)
print(table.etag)
table.schema = schema
table = bq.update_table(table, ["schema"])
print(table.schema)
print(table.etag)


F77wTFawTyp59e+U3hIMFg==
[SchemaField('chapter', 'INTEGER', 'REQUIRED', None, ()), SchemaField('title', 'STRING', 'REQUIRED', None, ())]
YdsTki4u3FOYeBXiHuHsrg==

Insert rows into a table


In [29]:
rows = [
  (1, u'What is BigQuery?'),
  (2, u'Query essentials'),
]
print(table.table_id, table.num_rows)
errors = bq.insert_rows(table, rows)
print(errors)
table = bq.get_table(table_id)
print(table.table_id, table.num_rows) # not updated immediately, because streaming inserts land in a buffer


temp_table 0
[]
temp_table 0
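
The streamed rows sit in a streaming buffer until BigQuery commits them, which is why num_rows lags. When the API reports buffer statistics, you can inspect them; a sketch:


In [ ]:
table = bq.get_table(table_id)
if table.streaming_buffer is not None:
  print('~{} rows in the streaming buffer'.format(table.streaming_buffer.estimated_rows))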

In [20]:
## This will fail because the value in the 2nd row can't be converted to an INTEGER
rows = [
  ('3', u'Operating on data types'),
  ('wont work', u'This will fail'),
  ('4', u'Loading data into BigQuery'),
]
errors = bq.insert_rows(table, rows)
print(errors)


[{'index': 1, 'errors': [{'reason': 'invalid', 'location': 'chapter', 'debugInfo': '', 'message': 'Cannot convert value to integer (bad value):wont work'}]}, {'index': 0, 'errors': [{'reason': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}, {'index': 2, 'errors': [{'reason': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}]

Creating an empty table with a schema


In [30]:
schema = [
  bigquery.SchemaField("chapter", "INTEGER", mode="REQUIRED"),
  bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
]

table_id = '{}.ch05.temp_table2'.format(PROJECT)
table = bigquery.Table(table_id, schema)
table = bq.create_table(table, exists_ok=True)
print('{} created on {}'.format(table.table_id, table.created))
print(table.schema)


temp_table2 created on 2019-03-12 17:19:25.915000+00:00
[SchemaField('chapter', 'INTEGER', 'REQUIRED', None, ()), SchemaField('title', 'STRING', 'REQUIRED', None, ())]

In [22]:
# remove the two temporary tables
bq.delete_table('ch05.temp_table', not_found_ok=True)
bq.delete_table('ch05.temp_table2', not_found_ok=True)

Loading a pandas DataFrame


In [ ]:
# pyarrow is required by load_table_from_dataframe
!python -m pip install pyarrow

In [35]:
bq.delete_table('ch05.temp_table3', not_found_ok=True)

import pandas as pd
data = [
  (1, u'What is BigQuery?'),
  (2, u'Query essentials'),
]
df = pd.DataFrame(data, columns=['chapter', 'title'])
table_id = '{}.ch05.temp_table3'.format(PROJECT)
job = bq.load_table_from_dataframe(df, table_id)
job.result() # blocks and waits
print("Loaded {} rows into {}".format(job.output_rows, table_id))


Loaded 2 rows into cloud-training-demos.ch05.temp_table3

By default, this appends rows:


In [36]:
print('Num rows = ', bq.get_table(table_id).num_rows)
job = bq.load_table_from_dataframe(df, table_id)
job.result() # blocks and waits
print("Loaded {} rows into {}".format(job.output_rows, table_id))
print('Num rows = ', bq.get_table(table_id).num_rows)


Num rows =  2
Loaded 2 rows into cloud-training-demos.ch05.temp_table3
Num rows =  4

Setting the write disposition to WRITE_TRUNCATE replaces the table contents instead of appending


In [41]:
from google.cloud.bigquery.job import LoadJobConfig, WriteDisposition, CreateDisposition

print('Num rows = ', bq.get_table(table_id).num_rows)
load_config = LoadJobConfig(
  create_disposition=CreateDisposition.CREATE_IF_NEEDED,
  write_disposition=WriteDisposition.WRITE_TRUNCATE)
job = bq.load_table_from_dataframe(df, table_id, job_config=load_config)
job.result() # blocks and waits
print("Loaded {} rows into {}".format(job.output_rows, table_id))
print('Num rows = ', bq.get_table(table_id).num_rows)


Num rows =  6
Loaded 2 rows into cloud-training-demos.ch05.temp_table3
Num rows =  2

Loading from a URI


In [42]:
import time

job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.CSV
job_config.null_marker = 'NULL'

uri = "gs://bigquery-oreilly-book/college_scorecard.csv"
table_id = '{}.ch05.college_scorecard_gcs'.format(PROJECT)
job = bq.load_table_from_uri(uri, table_id, job_config=job_config)
while not job.done():
  print('.', end='', flush=True)
  time.sleep(0.1)
print('Done')
table = bq.get_table(table_id)
print("Loaded {} rows into {}.".format(table.num_rows, table.table_id))


................................................................................................................................Done
Loaded 7175 rows into college_scorecard_gcs.

In [43]:
bq.delete_table('ch05.college_scorecard_gcs', not_found_ok=True)

Loading from a file object


In [29]:
import time
import gzip

job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.CSV
job_config.null_marker = 'NULL'

table_id = '{}.ch05.college_scorecard_local'.format(PROJECT)

with gzip.open('../04_load/college_scorecard.csv.gz') as fp:
  job = bq.load_table_from_file(fp, table_id, job_config=job_config)
while not job.done():
  print('.', end='', flush=True)
  time.sleep(0.1)
print('Done')
table = bq.get_table(table_id)
print("Loaded {} rows into {}.".format(table.num_rows, table.table_id))


...........................................................................................................................................................................................................Done
Loaded 14350 rows into college_scorecard_local.

In [30]:
bq.delete_table('ch05.college_scorecard_local', not_found_ok=True)

Copying a table


In [31]:
# copy london stations table to our dataset
source_tbl = 'bigquery-public-data.london_bicycles.cycle_stations'
dest_tbl = '{}.ch05eu.cycle_stations_copy'.format(PROJECT)
job = bq.copy_table(source_tbl, dest_tbl, location='EU')
job.result() # blocks and waits
dest_table = bq.get_table(dest_tbl)
print(dest_table.num_rows)


787
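
As with the other temporary tables, we can drop the copy once we are done with it:


In [ ]:
bq.delete_table('ch05eu.cycle_stations_copy', not_found_ok=True)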

Exporting a table to Cloud Storage


In [ ]:
BUCKET=PROJECT + '-eu-temp'
!gsutil mb -l EU gs://$BUCKET

In [33]:
source_tbl = 'bigquery-public-data.london_bicycles.cycle_stations'
dest_uri = 'gs://{}/tmp/exported/cycle_stations'.format(BUCKET)
config = bigquery.job.ExtractJobConfig(
  destination_format=bigquery.job.DestinationFormat.NEWLINE_DELIMITED_JSON)
job = bq.extract_table(source_tbl, dest_uri, location='EU', job_config=config)
job.result() # blocks and waits

!gsutil cat $dest_uri | head -5


{"id":"381","install_date":"2011-01-28","installed":true,"latitude":51.51953043,"locked":"false","longitude":-0.13577731,"name":"Charlotte Street, Fitzrovia","bikes_count":"0","docks_count":"14","nbEmptyDocks":"14","temporary":false,"terminal_name":"002669"}
{"id":"82","install_date":"2010-07-13","installed":true,"latitude":51.514274,"locked":"false","longitude":-0.111257,"name":"Chancery Lane, Holborn","bikes_count":"0","docks_count":"15","nbEmptyDocks":"15","temporary":false,"terminal_name":"003453"}
{"id":"23","install_date":"2010-07-06","installed":true,"latitude":51.51943538,"locked":"false","longitude":-0.119123345,"name":"Red Lion Square, Holborn","bikes_count":"0","docks_count":"16","nbEmptyDocks":"16","temporary":false,"terminal_name":"003421"}
{"id":"56","install_date":"2010-07-10","installed":true,"latitude":51.52058381,"locked":"false","longitude":-0.154701411,"name":"Paddington Street, Marylebone","bikes_count":"0","docks_count":"16","nbEmptyDocks":"16","temporary":false,"terminal_name":"001033"}
{"id":"120","install_date":"2010-07-15","installed":true,"latitude":51.51573534,"locked":"false","longitude":-0.093080779,"name":"The Guildhall, Guildhall","bikes_count":"0","docks_count":"17","nbEmptyDocks":"16","temporary":false,"terminal_name":"001044"}

In [ ]:
!gsutil rm -rf gs://$BUCKET
!gsutil rb -f gs://$BUCKET

Browsing a table


In [35]:
table_id = 'bigquery-public-data.london_bicycles.cycle_stations'
table = bq.get_table(table_id)
print("Total number of rows = {}".format(table.num_rows)) # 787
fields = [field for field in table.schema 
                if 'count' in field.name or field.name == 'id']
print("Extracting only {}".format(fields))
rows = bq.list_rows(table, 
                    start_index=300, 
                    max_results=5, 
                    selected_fields=fields)
fmt = '{!s:<10} ' * len(rows.schema)
print(fmt.format(*[field.name for field in rows.schema]))
for row in rows:
  print(fmt.format(*row))


Total number of rows = 787
Extracting only [SchemaField('id', 'INTEGER', 'NULLABLE', '', ()), SchemaField('bikes_count', 'INTEGER', 'NULLABLE', '', ()), SchemaField('docks_count', 'INTEGER', 'NULLABLE', '', ())]
id         bikes_count docks_count 
689        9          30         
345        9          33         
644        9          36         
365        9          47         
404        10         13         
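
list_rows paginates under the hood; to walk a large table page by page, iterate over the pages explicitly (a sketch that stops after the first page):


In [ ]:
rows = bq.list_rows(table, selected_fields=fields, page_size=100)
for page in rows.pages:
  print('Got a page with {} rows'.format(page.num_items))
  break  # first page only, for illustration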

Query and get result


In [36]:
query = """
SELECT 
  start_station_name 
  , AVG(duration) as duration
  , COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire 
GROUP BY start_station_name 
ORDER BY num_trips DESC
LIMIT 10
"""
print(query)


SELECT 
  start_station_name 
  , AVG(duration) as duration
  , COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire 
GROUP BY start_station_name 
ORDER BY num_trips DESC
LIMIT 10

Dry run


In [37]:
config = bigquery.QueryJobConfig()
config.dry_run = True
job = bq.query(query, location='EU', job_config=config)
print("This query will process {} bytes.".format(job.total_bytes_processed))


This query will process 903989528 bytes.
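
Because a dry run reports the bytes that would be scanned, it is an easy way to estimate the on-demand cost before running the query; a rough sketch (assuming the historical rate of $5 per TB scanned; check current pricing for your region):


In [ ]:
tb = job.total_bytes_processed / (1024 ** 4)
print('This query will cost approximately ${:.4f}'.format(5.0 * tb))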

Actual execution


In [38]:
# send query request
job = bq.query(query, location='EU')
fmt = '{!s:<40} {:>10d} {:>10d}'
for row in job:
    fields = (row['start_station_name'], 
              int(0.5 + row['duration']), 
              row['num_trips']) 
    print(fmt.format(*fields))


Belgrove Street , King's Cross                 1011     234458
Hyde Park Corner, Hyde Park                    2783     215629
Waterloo Station 3, Waterloo                    866     201630
Black Lion Gate, Kensington Gardens            3588     161952
Albert Gate, Hyde Park                         2359     155647
Waterloo Station 1, Waterloo                    992     145910
Wormwood Street, Liverpool Street               976     119447
Hop Exchange, The Borough                      1218     115135
Wellington Arch, Hyde Park                     2276     110260
Triangle Car Park, Hyde Park                   2233     108347
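
Once the job has finished, its statistics are available on the job object; for example:


In [ ]:
print('Cache hit: {}'.format(job.cache_hit))
print('Bytes billed: {}'.format(job.total_bytes_billed))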

Query result to a pandas DataFrame


In [39]:
query = """
SELECT 
  start_station_name 
  , AVG(duration) as duration
  , COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire 
GROUP BY start_station_name
"""
df = bq.query(query, location='EU').to_dataframe()
print(df.describe())


          duration      num_trips
count   880.000000     880.000000
mean   1348.351153   27692.273864
std     434.057829   23733.621289
min       0.000000       1.000000
25%    1078.684974   13033.500000
50%    1255.889223   23658.500000
75%    1520.504055   35450.500000
max    4836.380090  234458.000000

Parameterized query to get only trips longer than some duration


In [40]:
query2 = """
SELECT 
  start_station_name 
  , COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire 
WHERE duration >= @min_duration
GROUP BY start_station_name 
ORDER BY num_trips DESC
LIMIT 10
"""
print(query2)


SELECT 
  start_station_name 
  , COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire 
WHERE duration >= @min_duration
GROUP BY start_station_name 
ORDER BY num_trips DESC
LIMIT 10


In [41]:
config = bigquery.QueryJobConfig()
config.query_parameters = [
  bigquery.ScalarQueryParameter('min_duration', "INT64", 600)
]
job = bq.query(query2, location='EU', job_config=config)
fmt = '{!s:<40} {:>10d}'
for row in job:
    fields = (row['start_station_name'], 
              row['num_trips']) 
    print(fmt.format(*fields))


Hyde Park Corner, Hyde Park                  203592
Belgrove Street , King's Cross               168110
Waterloo Station 3, Waterloo                 148809
Albert Gate, Hyde Park                       145794
Black Lion Gate, Kensington Gardens          137930
Waterloo Station 1, Waterloo                 106092
Wellington Arch, Hyde Park                   102770
Triangle Car Park, Hyde Park                  99368
Wormwood Street, Liverpool Street             82483
Palace Gate, Kensington Gardens               80342
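
Scalar parameters are not the only kind; an ArrayQueryParameter supports IN-list style filters (a sketch, assuming a query that filters with WHERE start_station_name IN UNNEST(@stations)):


In [ ]:
config = bigquery.QueryJobConfig()
config.query_parameters = [
  bigquery.ArrayQueryParameter('stations', "STRING", [
    'Hyde Park Corner, Hyde Park',
    'Albert Gate, Hyde Park',
  ])
]
# pass this config to bq.query() along with the UNNEST query above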

Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.