This notebook illustrates:
- creating a Machine Learning dataset from BigQuery using Cloud Dataflow
- training a wide-and-deep model with the tf.estimator API
- training at scale and deploying the model on Cloud ML Engine
- sending prediction requests to the deployed model
In [1]:
!pip install --user apache-beam[gcp]==2.16.0
If you get an oauth2client error, run the command again. Restart the kernel before proceeding further.
In [2]:
# change these to try this notebook out
BUCKET = 'cloud-training-demos-ml' # Replace with your bucket name
PROJECT = 'cloud-training-demos' # Replace with your project ID
REGION = 'us-central1'
In [3]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION
In [4]:
%%bash
gcloud config set project $PROJECT
In [5]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi
gsutil -m cp -R gs://cloud-training-demos/babyweight gs://${BUCKET}
A variety of factors seem to play a part in the baby's weight. Male babies are heavier on average than female babies. Teenaged and older moms tend to have lower-weight babies. Twins, triplets, etc. are lower weight than single births, and preemies weigh in lower, as do babies born to single moms. In addition, it is important to check whether you have enough data (number of babies) for each input value; otherwise, model predictions for input values that don't have enough data may not be reliable.
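If you want to check this yourself before training, a quick count per input value does the job. Below is a minimal sketch (not part of the original lab) that assumes the google-cloud-bigquery client library is available in this environment:

from google.cloud import bigquery

# Hedged sketch: count the number of babies for each plurality value.
bq = bigquery.Client(project=PROJECT)
sql = '''
SELECT plurality, COUNT(1) AS num_babies
FROM publicdata.samples.natality
WHERE year > 2000
GROUP BY plurality
ORDER BY plurality
'''
for row in bq.query(sql).result():
    print(row.plurality, row.num_babies)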
In the rest of this notebook, we will use machine learning to combine all of these factors to come up with a prediction of a baby's weight.
We can use Cloud Dataflow to read in the BigQuery data and write it out as CSV files.
Instead of using Beam/Dataflow, I had three other options:
- use Cloud Dataprep to visually design the pipeline
- read from BigQuery directly in TensorFlow
- use the BigQuery web console to run the query and save the results as CSV files
However, in this case, I want to do some preprocessing: modifying the data so that we can simulate what is known if no ultrasound has been performed. If I didn't need preprocessing, I could have used the web console. Also, I prefer to script it out rather than run queries in the user interface, so I am using Cloud Dataflow for the preprocessing.
Note that I have set in_test_mode=True in the following code; this will run the code locally on a small subset of the data. The full results were copied into your bucket by the gsutil command in the previous cell:

gsutil -m cp -R gs://cloud-training-demos/babyweight gs://${BUCKET}

If you are running in your own project, change to in_test_mode=False. After you launch the job, the notebook will appear to be hung; go to the Dataflow section of the GCP web console and monitor the running job. It took about 30 minutes for me with autoscaling to 15 workers. Qwiklabs accounts won't be able to scale to that level, so doing it on Qwiklabs would have taken 3-4 hours; hence the shortcut.
In [6]:
import apache_beam as beam
import datetime

def to_csv(rowdict):
    # pull columns from BQ and create a line
    import hashlib
    import copy
    CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')

    # Create synthetic data where we assume that no ultrasound has been performed,
    # and so we don't know the sex of the baby. Let's assume that we can tell the
    # difference between single and multiple births, but that determining the exact
    # number is difficult in the absence of an ultrasound.
    no_ultrasound = copy.deepcopy(rowdict)
    w_ultrasound = copy.deepcopy(rowdict)

    no_ultrasound['is_male'] = 'Unknown'
    if rowdict['plurality'] > 1:
        no_ultrasound['plurality'] = 'Multiple(2+)'
    else:
        no_ultrasound['plurality'] = 'Single(1)'

    # Change the plurality column to strings
    w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)',
                                 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]

    # Write out two rows for each input row, one with ultrasound and one without
    for result in [no_ultrasound, w_ultrasound]:
        data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
        key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
        yield str('{},{}'.format(data, key))
def preprocess(in_test_mode):
    job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

    if in_test_mode:
        print('Launching local job ... hang on')
        OUTPUT_DIR = './preproc'
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)

    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    if in_test_mode:
        RUNNER = 'DirectRunner'
    else:
        RUNNER = 'DataflowRunner'
    p = beam.Pipeline(RUNNER, options=opts)

    query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(year AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
AND month > 0
"""
    if in_test_mode:
        query = query + ' LIMIT 100'

    for step in ['train', 'eval']:
        if step == 'train':
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)
        else:
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)
        (p
         | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True))
         | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
         | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
        )

    job = p.run()

preprocess(in_test_mode=True)
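Before launching a bigger run, you can sanity-check the transform by calling to_csv directly on a dictionary shaped like a BigQuery row. A quick sketch, not part of the original lab:

# Hedged sanity check: to_csv yields two CSV lines per input row, one
# simulating "no ultrasound" (is_male='Unknown') and one with ultrasound.
sample_row = {'weight_pounds': 7.5, 'is_male': 'True', 'mother_age': 26,
              'plurality': 1, 'gestation_weeks': 39}
for line in to_csv(sample_row):
    print(line)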
In [7]:
!ls preproc
In [8]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*
First, write an input_fn to read the data.
In [9]:
import shutil
import numpy as np
import tensorflow as tf
In [10]:
CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks,key'.split(',')
LABEL_COLUMN = 'weight_pounds'
KEY_COLUMN = 'key'
DEFAULTS = [[0.0], ['null'], [0.0], ['null'], [0.0], ['nokey']]
TRAIN_STEPS = 1000
def read_dataset(prefix, pattern, batch_size=512):
    # use prefix to create filename
    filename = 'gs://{}/babyweight/preproc/{}*{}*'.format(BUCKET, prefix, pattern)
    if prefix == 'train':
        mode = tf.estimator.ModeKeys.TRAIN
        num_epochs = None  # indefinitely
    else:
        mode = tf.estimator.ModeKeys.EVAL
        num_epochs = 1  # end-of-input after this

    # the actual input function passed to TensorFlow
    def _input_fn():
        # could be a path to one file or a file pattern
        input_file_names = tf.train.match_filenames_once(filename)
        filename_queue = tf.train.string_input_producer(
            input_file_names, shuffle=True, num_epochs=num_epochs)

        # read CSV
        reader = tf.TextLineReader()
        _, value = reader.read_up_to(filename_queue, num_records=batch_size)
        if mode == tf.estimator.ModeKeys.TRAIN:
            value = tf.train.shuffle_batch([value], batch_size, capacity=10 * batch_size,
                                           min_after_dequeue=batch_size, enqueue_many=True,
                                           allow_smaller_final_batch=False)
        value_column = tf.expand_dims(value, -1)
        columns = tf.decode_csv(value_column, record_defaults=DEFAULTS)
        features = dict(zip(CSV_COLUMNS, columns))
        features.pop(KEY_COLUMN)
        label = features.pop(LABEL_COLUMN)
        return features, label

    return _input_fn
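The queue-based readers above (string_input_producer, TextLineReader, shuffle_batch) are the TF 1.x pattern this lab was written against. On the same runtime, an equivalent input_fn can be expressed with the tf.data API; the following is a hedged sketch of that alternative (the function name is mine), not the lab's code:

def make_dataset_input_fn(prefix, pattern, batch_size=512):
    # Hypothetical tf.data equivalent of read_dataset above.
    filename = 'gs://{}/babyweight/preproc/{}*{}*'.format(BUCKET, prefix, pattern)
    is_train = (prefix == 'train')

    def _input_fn():
        def decode_csv(line):
            columns = tf.decode_csv(line, record_defaults=DEFAULTS)
            features = dict(zip(CSV_COLUMNS, columns))
            features.pop(KEY_COLUMN)
            label = features.pop(LABEL_COLUMN)
            return features, label

        dataset = (tf.data.Dataset.list_files(filename)
                   .flat_map(tf.data.TextLineDataset)
                   .map(decode_csv))
        if is_train:
            dataset = dataset.shuffle(10 * batch_size).repeat()  # loop indefinitely
        return dataset.batch(batch_size)

    return _input_fn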
Next, define the feature columns.
In [11]:
def get_wide_deep():
    # define column types
    is_male, mother_age, plurality, gestation_weeks = \
        [
            tf.feature_column.categorical_column_with_vocabulary_list('is_male',
                        ['True', 'False', 'Unknown']),
            tf.feature_column.numeric_column('mother_age'),
            tf.feature_column.categorical_column_with_vocabulary_list('plurality',
                        ['Single(1)', 'Twins(2)', 'Triplets(3)',
                         'Quadruplets(4)', 'Quintuplets(5)', 'Multiple(2+)']),
            tf.feature_column.numeric_column('gestation_weeks')
        ]

    # discretize
    age_buckets = tf.feature_column.bucketized_column(mother_age,
                        boundaries=np.arange(15, 45, 1).tolist())
    gestation_buckets = tf.feature_column.bucketized_column(gestation_weeks,
                        boundaries=np.arange(17, 47, 1).tolist())

    # sparse columns are wide
    wide = [is_male,
            plurality,
            age_buckets,
            gestation_buckets]

    # feature cross all the wide columns and embed into a lower dimension
    crossed = tf.feature_column.crossed_column(wide, hash_bucket_size=20000)
    embed = tf.feature_column.embedding_column(crossed, 3)

    # continuous columns are deep
    deep = [mother_age,
            gestation_weeks,
            embed]
    return wide, deep
To predict with the TensorFlow model, we also need a serving input function. We will want all the inputs from our user.
In [12]:
def serving_input_fn():
    feature_placeholders = {
        'is_male': tf.placeholder(tf.string, [None]),
        'mother_age': tf.placeholder(tf.float32, [None]),
        'plurality': tf.placeholder(tf.string, [None]),
        'gestation_weeks': tf.placeholder(tf.float32, [None])
    }
    features = {
        key: tf.expand_dims(tensor, -1)
        for key, tensor in feature_placeholders.items()
    }
    return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
Finally, train!
In [13]:
from tensorflow.contrib.learn.python.learn.utils import saved_model_export_utils
from tensorflow.contrib.learn.python.learn import learn_runner

PATTERN = "00001-of-"  # process only one of the shards, for testing purposes

def train_and_evaluate(output_dir):
    wide, deep = get_wide_deep()
    estimator = tf.estimator.DNNLinearCombinedRegressor(
        model_dir=output_dir,
        linear_feature_columns=wide,
        dnn_feature_columns=deep,
        dnn_hidden_units=[64, 32])
    train_spec = tf.estimator.TrainSpec(
        input_fn=read_dataset('train', PATTERN),
        max_steps=TRAIN_STEPS)
    exporter = tf.estimator.FinalExporter('exporter', serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(
        input_fn=read_dataset('eval', PATTERN),
        steps=None,
        exporters=exporter)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

shutil.rmtree('babyweight_trained', ignore_errors=True)  # start fresh each time
train_and_evaluate('babyweight_trained')
In [14]:
!ls babyweight_trained
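Before packaging the code up, you can optionally smoke-test the exported model in this session. This is a hedged sketch using tf.contrib.predictor (TF 1.x), not part of the original lab:

# Hedged local smoke test: load the latest exported SavedModel and predict.
import glob
from tensorflow.contrib import predictor

export_dir = sorted(glob.glob('babyweight_trained/export/exporter/*'))[-1]
predict_fn = predictor.from_saved_model(export_dir)
print(predict_fn({'is_male': ['True'], 'mother_age': [26.0],
                  'plurality': ['Single(1)'], 'gestation_weeks': [39.0]}))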
Now that we have the TensorFlow code working on a subset of the data (in the code above, I was reading only the 00001-of-x file), we can package the TensorFlow code up as a Python module and train it on Cloud ML Engine.
Training on Cloud ML Engine requires:
- making the code a Python package
- using gcloud to submit the training code to Cloud ML Engine

The code in model.py is the same as in the cells above; I just moved it to a file so that I could package it up as a module. (Explore the directory structure.)
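The package follows the standard layout: a babyweight/ folder containing a trainer/ package with __init__.py, model.py (the code from the cells above), and task.py (command-line flag parsing that hands off to model.py). As a rough illustration of that pattern, task.py looks something like this hedged sketch; the actual file in the lab repo may differ in flag names and details:

# trainer/task.py; hedged sketch of the pattern, not the lab's actual file.
import argparse
from trainer import model  # assumes trainer/model.py holds the code from the cells above

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket', required=True)
    parser.add_argument('--output_dir', required=True)
    parser.add_argument('--pattern', default='of')        # which CSV shards to read
    parser.add_argument('--train_steps', type=int, default=1000)
    parser.add_argument('--job-dir', default='./tmp')     # supplied by Cloud ML Engine
    args = parser.parse_args()

    # Hand the flags to the model module, then train.
    model.BUCKET = args.bucket
    model.PATTERN = args.pattern
    model.TRAIN_STEPS = args.train_steps
    model.train_and_evaluate(args.output_dir)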
In [15]:
%%bash
grep "^def" babyweight/trainer/model.py
After moving the code to a package, make sure it works standalone. (Note the --pattern and --train_steps flags, so that I am not trying to boil the ocean on my laptop.) Even then, this takes about a minute, during which you won't see any output ...
In [16]:
%%bash
echo "bucket=${BUCKET}"
rm -rf babyweight_trained
export PYTHONPATH=${PYTHONPATH}:${PWD}/babyweight
python -m trainer.task \
--bucket=${BUCKET} \
--output_dir=babyweight_trained \
--job-dir=./tmp \
--pattern="00001-of-" --train_steps=1000
Once the code works in standalone mode, you can run it on Cloud ML Engine. Note that doing it on the entire dataset will take a while; the training run took about an hour for me on STANDARD_1. So, for the purposes of Qwiklabs, this is done on a partial dataset on a smaller tier (BASIC). Monitor the progress of this job in the Cloud ML Engine section of the GCP Console. Once you see a green check mark, run the next Datalab cell.
In [17]:
%%bash
OUTDIR=gs://${BUCKET}/babyweight/trained_model
JOBNAME=babyweight_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
--region=$REGION \
--module-name=trainer.task \
--package-path=$(pwd)/babyweight/trainer \
--job-dir=$OUTDIR \
--staging-bucket=gs://$BUCKET \
--scale-tier=BASIC \
--runtime-version 1.14 \
-- \
--bucket=${BUCKET} \
--output_dir=${OUTDIR} \
--pattern="00001-of-" \
--train_steps=2000
Training finished with an RMSE of 1 lb. Obviously, this is just our first model. We could probably add some features, discretize the mother's age, and do some hyper-parameter tuning to get to a lower RMSE. I'll leave that to you. If you create a better model, I'd love to hear about it; please do write a short blog post about what you did, and tweet it at me -- @lak_gcp.
To activate TensorBoard within the JupyterLab UI, navigate to "File" - "New Launcher". Then double-click the 'Tensorboard' icon on the bottom row.
TensorBoard 1 will appear in the new tab. Navigate through the three tabs to see the active TensorBoard. The 'Graphs' and 'Projector' tabs offer very interesting information, including the ability to replay the tests.
You may close the TensorBoard tab when you are finished exploring.
Make sure that training is complete before you move to the next step. Visit the Cloud ML Engine web console and wait until you see a green check mark before you proceed.
Deploying the trained model to act as a REST web service is a simple gcloud call.
In [18]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/trained_model/export/exporter
If you get an error, wait a minute and then run the command again.
In [19]:
%%bash
MODEL_NAME="babyweight"
MODEL_VERSION="qwiklab"
MODEL_LOCATION=$(gsutil ls gs://${BUCKET}/babyweight/trained_model/export/exporter/ | tail -1)
echo "Deleting and deploying $MODEL_NAME $MODEL_VERSION from $MODEL_LOCATION ... this will take a few minutes"
#gcloud ai-platform versions delete ${MODEL_VERSION} --model ${MODEL_NAME}
#gcloud ai-platform models delete ${MODEL_NAME}
gcloud ai-platform models create ${MODEL_NAME} --regions $REGION
gcloud ai-platform versions create ${MODEL_VERSION} --model ${MODEL_NAME} --origin ${MODEL_LOCATION} --runtime-version 1.14
Send a JSON request to the endpoint of the service to make it predict a baby's weight ... I am going to try out how well the model would have predicted the weights of our two kids and a couple of variations while we are at it ...
In [20]:
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import json

credentials = GoogleCredentials.get_application_default()
api = discovery.build('ml', 'v1', credentials=credentials)

request_data = {'instances':
    [
        {
            'is_male': 'True',
            'mother_age': 26.0,
            'plurality': 'Single(1)',
            'gestation_weeks': 39
        },
        {
            'is_male': 'False',
            'mother_age': 29.0,
            'plurality': 'Single(1)',
            'gestation_weeks': 38
        },
        {
            'is_male': 'True',
            'mother_age': 26.0,
            'plurality': 'Triplets(3)',
            'gestation_weeks': 39
        },
        {
            'is_male': 'Unknown',
            'mother_age': 29.0,
            'plurality': 'Multiple(2+)',
            'gestation_weeks': 38
        },
    ]
}

parent = 'projects/%s/models/%s/versions/%s' % (PROJECT, 'babyweight', 'qwiklab')
response = api.projects().predict(body=request_data, name=parent).execute()
print("response={0}".format(response))
Copyright 2018 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.