In [1]:
# Install/upgrade the Apache Beam SDK with its GCP extras (BigQuery + Dataflow).
# NOTE(review): consider pinning a version for reproducibility — confirm which
# Beam release this notebook was validated against.
%pip install --upgrade --quiet 'apache-beam[gcp]'
In [1]:
# CHANGE THIS to try this notebook out
PROJECT='cloud-training-demos'   # GCP project that runs the queries and the Dataflow job
# London Bicycles is in EU, so process it in EU and make sure bucket is in EU
BUCKET='cloud-training-demos-eu' # GCS bucket for staging/temp/output (must be in the EU)
REGION='europe-west1'            # Dataflow region, chosen in the EU to match the data
Let's get the durations of all rides that started at a single station (station 708).
In [2]:
from google.cloud import bigquery

# Pull the duration of every ride that started at station 708 from the
# public London Bicycles dataset into a pandas DataFrame.
bq = bigquery.Client(project=PROJECT)
query = """
SELECT duration
FROM `bigquery-public-data.london_bicycles.cycle_hire`
WHERE start_station_id = 708
"""
# The public dataset lives in the EU multi-region, so the query must run there.
df = bq.query(query, location='EU').to_dataframe()
df.describe()
Let's plot the distribution of these ride durations.
In [3]:
# Histogram of ride durations (presumably seconds — confirm against the dataset
# docs), truncated at 3000 to keep the long tail from dominating the plot.
# The trailing ';' suppresses the matplotlib repr output.
ax = df.plot.hist(range=[0, 3000], bins=20);
Fit a Gamma distribution to the ride durations.
In [4]:
from scipy import stats

# Maximum-likelihood Gamma fit; scipy returns (shape, loc, scale), which this
# notebook calls ag, bg, cg respectively.
gamma_params = stats.gamma.fit(df['duration'])
ag, bg, cg = gamma_params
print(ag, bg, cg)
In [5]:
%%writefile requirements.txt
# Packages the Dataflow workers must pip-install before running compute_fit.
numpy
scipy
In [ ]:
# Create the output dataset ch05eu in the EU (same location as the source data).
!bq --location=EU mk --dataset ch05eu
In [7]:
import apache_beam as beam
import logging
import datetime, os
def compute_fit(row):
    """Fit a Gamma distribution to one station's ride durations.

    Args:
        row: mapping with 'start_station_id' and 'duration_array'
            (a collection of ride durations for that station).

    Yields:
        A dict with the station id (as a string) and the fitted Gamma
        parameters 'ag', 'bg', 'cg' — but only when all three are finite,
        i.e. the fit converged.
    """
    # Imported inside the function so Dataflow workers resolve these
    # from their own environment (see requirements.txt).
    from scipy import stats
    import numpy as np

    ag, bg, cg = stats.gamma.fit(row['duration_array'])
    # Drop stations whose fit produced NaN/inf parameters.
    if all(np.isfinite(p) for p in (ag, bg, cg)):
        yield {
            'station_id': str(row['start_station_id']),
            'ag': ag,
            'bg': bg,
            'cg': cg,
        }
def run_job(in_test_mode=True):
    """Fit a Gamma distribution per station with an Apache Beam pipeline.

    Reads per-station duration arrays from BigQuery, fits a Gamma
    distribution to each (compute_fit), and writes the parameters to
    the BigQuery table ch05eu.station_stats.

    Args:
        in_test_mode: if True, run locally with the DirectRunner on a
            small slice of stations; if False, launch a Dataflow job
            over the full dataset.
    """
    import shutil, os, subprocess

    job_name = 'computestats' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

    if in_test_mode:
        print('Launching local job ... hang on')
        OUTPUT_DIR = './station_stats'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        os.makedirs(OUTPUT_DIR)
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://{0}/station_stats'.format(BUCKET)
        # Best-effort cleanup of a previous run's output. Only swallow the
        # non-zero exit that gsutil returns when the path does not exist;
        # a bare `except:` here would also hide KeyboardInterrupt etc.
        try:
            subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
        except subprocess.CalledProcessError:
            pass

    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'region': REGION,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        # Workers pip-install these packages (numpy, scipy) before running.
        'requirements_file': 'requirements.txt'
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    RUNNER = 'DirectRunner' if in_test_mode else 'DataflowRunner'

    if in_test_mode:
        # Limit to a handful of stations so the local run stays fast.
        query = """
        SELECT start_station_id, ARRAY_AGG(duration) AS duration_array
        FROM `bigquery-public-data.london_bicycles.cycle_hire`
        WHERE start_station_id BETWEEN 700 AND 710
        GROUP BY start_station_id
        """
    else:
        query = """
        SELECT start_station_id, ARRAY_AGG(duration) AS duration_array
        FROM `bigquery-public-data.london_bicycles.cycle_hire`
        GROUP BY start_station_id
        """

    with beam.Pipeline(RUNNER, options=opts) as p:
        (p
         | 'read_bq' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
         | 'compute_fit' >> beam.FlatMap(compute_fit)
         | 'write_bq' >> beam.io.gcp.bigquery.WriteToBigQuery(
             'ch05eu.station_stats',
             schema='station_id:string,ag:FLOAT64,bg:FLOAT64,cg:FLOAT64')
        )

run_job(in_test_mode=False)
print('Done')
In [8]:
%%bigquery stations
SELECT * from ch05eu.station_stats
In [9]:
# Summary statistics of the fitted parameters; the quartiles are used below
# to set plot ranges.
# NOTE(review): this rebinds the name `stats`, shadowing `scipy.stats`
# imported earlier — any later cell needing scipy.stats must re-import it.
stats = stations.describe()
stats
Out[9]:
In [10]:
# Histogram of the fitted 'ag' parameter across stations, limited to its
# interquartile range so outliers don't dominate.
# Fix: DataFrame.plot.hist('ag') passes 'ag' as the `by` (grouping) argument,
# not as the column to plot — select the column explicitly instead.
stations['ag'].plot.hist(range=[stats['ag']['25%'], stats['ag']['75%']]);
In [11]:
# Histogram of the fitted 'bg' parameter across stations, limited to its
# interquartile range so outliers don't dominate.
# Fix: DataFrame.plot.hist('bg') passes 'bg' as the `by` (grouping) argument,
# not as the column to plot — select the column explicitly instead.
stations['bg'].plot.hist(range=[stats['bg']['25%'], stats['bg']['75%']]);
In [12]:
# Histogram of the fitted 'cg' parameter across stations, limited to its
# interquartile range so outliers don't dominate.
# Fix: DataFrame.plot.hist('cg') passes 'cg' as the `by` (grouping) argument,
# not as the column to plot — select the column explicitly instead.
stations['cg'].plot.hist(range=[stats['cg']['25%'], stats['cg']['75%']]);
In [13]:
# Scatter of 'ag' vs 'cg' across stations, zoomed to each parameter's
# interquartile range. (Per scipy.stats.gamma.fit, ag/bg/cg correspond to
# shape/loc/scale — worth confirming in prose near the fit cell.)
ax = stations.plot.scatter('ag', 'cg');
ax.set_xlim(stats['ag']['25%'], stats['ag']['75%'])
ax.set_ylim(stats['cg']['25%'], stats['cg']['75%'])
Out[13]:
Copyright Google Inc. 2019 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.