Structured data prediction using Cloud ML Engine

This notebook illustrates:

  1. Creating datasets for Machine Learning using Dataflow
  2. Creating a model using the high-level Estimator API
  3. Training on Cloud ML Engine
  4. Deploying model
  5. Predicting with model

In [1]:
!pip install --user apache-beam[gcp]==2.16.0

Run the command again if you get an oauth2client error. Restart the kernel before proceeding further.


In [2]:
# change these to try this notebook out
BUCKET = 'cloud-training-demos-ml' # Replace with your bucket name
PROJECT = 'cloud-training-demos' # Replace with your project-id
REGION = 'us-central1'

In [3]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [4]:
%%bash
gcloud config set project $PROJECT

In [5]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi
gsutil -m cp -R gs://cloud-training-demos/babyweight gs://${BUCKET}

A variety of factors seem to play a part in a baby's weight. Male babies are heavier on average than female babies. Teenaged and older moms tend to have lower-weight babies. Twins, triplets, etc. are lower weight than single births. Preemies weigh in lower, as do babies born to single moms. In addition, it is important to check whether you have enough data (number of babies) for each input value; otherwise, predictions for input values that don't have enough data may not be reliable.
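
As a quick sanity check on data sufficiency, you can count the number of babies for each input value directly in BigQuery. A minimal sketch (assuming the google-cloud-bigquery client library and pandas are installed, and the notebook is authenticated against your project):

In [ ]:
from google.cloud import bigquery

# Count babies per mother_age; repeat the same check for plurality,
# gestation_weeks, etc. (this exploratory query is not part of the lab).
sql = """
SELECT mother_age, COUNT(1) AS num_babies
FROM publicdata.samples.natality
WHERE year > 2000
GROUP BY mother_age
ORDER BY mother_age
"""
df = bigquery.Client(project=PROJECT).query(sql).to_dataframe()
print(df.head(10))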

In the rest of this notebook, we will use machine learning to combine all of these factors to come up with a prediction of a baby's weight.

Create ML dataset using Dataflow

We can use Cloud Dataflow to read in the BigQuery data and write it out as CSV files.

Instead of using Beam/Dataflow, I had three other options:

  1. Use Cloud Dataprep to visually author a Dataflow pipeline. Cloud Dataprep also allows me to explore the data, so we could have avoided much of the handcoding of Python/Seaborn calls as well!
  2. Read from BigQuery directly using TensorFlow.
  3. Use the BigQuery console (http://bigquery.cloud.google.com) to run a query and save the result as a CSV file. For larger datasets, you may have to select the option to "allow large results" and save the result into a CSV file on Google Cloud Storage.

However, in this case, I want to do some preprocessing, modifying the data so that we can simulate what is known if no ultrasound has been performed. If I didn't need preprocessing, I could have used the web console. Also, I prefer to script it out rather than run queries in the user interface, so I am using Cloud Dataflow for the preprocessing.

    Note that I have set in_test_mode=True in the following code -- this will run the code locally on a small subset of the data. The full results were copied into your bucket using the following command (in the previous cell):

    gsutil -m cp -R gs://cloud-training-demos/babyweight gs://${BUCKET}
    
    If you are running in your own project, change to in_test_mode=False; after you launch this, the notebook will appear to hang. Go to the Dataflow section of the GCP web console and monitor the running job. It took about 30 minutes for me with autoscaling to 15 workers -- Qwiklabs accounts won't be able to scale to that level, so doing it on Qwiklabs would have taken 3-4 hours. Hence, the shortcut.

    
    
    In [6]:
    import apache_beam as beam
    import datetime
    
    def to_csv(rowdict):
        # pull columns from BQ and create a line
        import hashlib
        import copy
        CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')
        
        # create synthetic data where we assume that no ultrasound has been performed
        # and so we don't know the sex of the baby. Let's assume that we can tell the
        # difference between single and multiple births, but that determining the exact
        # number is difficult in the absence of an ultrasound.
        no_ultrasound = copy.deepcopy(rowdict)
        w_ultrasound = copy.deepcopy(rowdict)
    
        no_ultrasound['is_male'] = 'Unknown'
        if rowdict['plurality'] > 1:
          no_ultrasound['plurality'] = 'Multiple(2+)'
        else:
          no_ultrasound['plurality'] = 'Single(1)'
          
        # Change the plurality column to strings
        w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality']-1]
        
        # Write out two rows for each input row, one with ultrasound and one without
        for result in [no_ultrasound, w_ultrasound]:
          data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
          key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
          yield str('{},{}'.format(data, key))
      
    def preprocess(in_test_mode):
        job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
        
        if in_test_mode:
            print ('Launching local job ... hang on')
            OUTPUT_DIR = './preproc'
        else:
            print ('Launching Dataflow job {} ... hang on'.format(job_name))
            OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
        
        options = {
            'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
            'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
            'job_name': job_name,
            'project': PROJECT,
            'teardown_policy': 'TEARDOWN_ALWAYS',
            'no_save_main_session': True
        }
        opts = beam.pipeline.PipelineOptions(flags=[], **options)
        if in_test_mode:
            RUNNER = 'DirectRunner'
        else:
            RUNNER = 'DataflowRunner'
        p = beam.Pipeline(RUNNER, options=opts)
        query = """
    SELECT
      weight_pounds,
      is_male,
      mother_age,
      plurality,
      gestation_weeks,
      FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
    FROM
      publicdata.samples.natality
    WHERE year > 2000
    AND weight_pounds > 0
    AND mother_age > 0
    AND plurality > 0
    AND gestation_weeks > 0
    AND month > 0
        """
      
        if in_test_mode:
            query = query + ' LIMIT 100' 
      
        for step in ['train', 'eval']:
            if step == 'train':
                selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)
            else:
                selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)
    
            (p 
             | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True))
             | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
             | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
            )
     
        job = p.run()
      
    preprocess(in_test_mode=True)
    
    
    
    In [7]:
    !ls preproc
    

    
    
    In [8]:
    %%bash
    gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*
    

    Create TensorFlow model using TensorFlow's Estimator API

    First, write an input_fn to read the data.

    
    
    In [9]:
    import shutil
    import numpy as np
    import tensorflow as tf
    
    
    
    In [10]:
    CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks,key'.split(',')
    LABEL_COLUMN = 'weight_pounds'
    KEY_COLUMN = 'key'
    DEFAULTS = [[0.0], ['null'], [0.0], ['null'], [0.0], ['nokey']]
    TRAIN_STEPS = 1000
    
    def read_dataset(prefix, pattern, batch_size=512):
        # use prefix to create filename
        filename = 'gs://{}/babyweight/preproc/{}*{}*'.format(BUCKET, prefix, pattern)
        if prefix == 'train':
            mode = tf.estimator.ModeKeys.TRAIN
            num_epochs = None # indefinitely
        else:
            mode = tf.estimator.ModeKeys.EVAL
            num_epochs = 1 # end-of-input after this
        
        # the actual input function passed to TensorFlow
        def _input_fn():
            # could be a path to one file or a file pattern.
            input_file_names = tf.train.match_filenames_once(filename)
            filename_queue = tf.train.string_input_producer(
                input_file_names, shuffle=True, num_epochs=num_epochs)
     
            # read CSV
            reader = tf.TextLineReader()
            _, value = reader.read_up_to(filename_queue, num_records=batch_size)
            if mode == tf.estimator.ModeKeys.TRAIN:
              value = tf.train.shuffle_batch([value], batch_size, capacity=10*batch_size, 
                                             min_after_dequeue=batch_size, enqueue_many=True, 
                                             allow_smaller_final_batch=False)
            value_column = tf.expand_dims(value, -1)
            columns = tf.decode_csv(value_column, record_defaults=DEFAULTS)
            features = dict(zip(CSV_COLUMNS, columns))
            features.pop(KEY_COLUMN)
            label = features.pop(LABEL_COLUMN)
            return features, label
      
        return _input_fn
    

    Next, define the feature columns

    
    
    In [11]:
    def get_wide_deep():
        # define column types
        is_male,mother_age,plurality,gestation_weeks = \
            [\
                tf.feature_column.categorical_column_with_vocabulary_list('is_male', 
                            ['True', 'False', 'Unknown']),
                tf.feature_column.numeric_column('mother_age'),
                tf.feature_column.categorical_column_with_vocabulary_list('plurality',
                            ['Single(1)', 'Twins(2)', 'Triplets(3)',
                             'Quadruplets(4)', 'Quintuplets(5)','Multiple(2+)']),
                tf.feature_column.numeric_column('gestation_weeks')
            ]
    
        # discretize
        age_buckets = tf.feature_column.bucketized_column(mother_age, 
                            boundaries=np.arange(15,45,1).tolist())
        gestation_buckets = tf.feature_column.bucketized_column(gestation_weeks, 
                            boundaries=np.arange(17,47,1).tolist())
          
        # sparse columns are wide 
        wide = [is_male,
                plurality,
                age_buckets,
                gestation_buckets]
        
        # feature cross all the wide columns and embed into a lower dimension
        crossed = tf.feature_column.crossed_column(wide, hash_bucket_size=20000)
        embed = tf.feature_column.embedding_column(crossed, 3)
        
        # continuous columns are deep
        deep = [mother_age,
                gestation_weeks,
                embed]
        return wide, deep
    

    To predict with the TensorFlow model, we also need a serving input function. We will want all the inputs from our user.

    
    
    In [12]:
    def serving_input_fn():
        feature_placeholders = {
            'is_male': tf.placeholder(tf.string, [None]),
            'mother_age': tf.placeholder(tf.float32, [None]),
            'plurality': tf.placeholder(tf.string, [None]),
            'gestation_weeks': tf.placeholder(tf.float32, [None])
        }
        features = {
            key: tf.expand_dims(tensor, -1)
            for key, tensor in feature_placeholders.items()
        }
        return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
    

    Finally, train!

    
    
    In [13]:
    from tensorflow.contrib.learn.python.learn.utils import saved_model_export_utils
    from tensorflow.contrib.learn.python.learn import learn_runner
    
    PATTERN = "00001-of-"  # process only one of the shards, for testing purposes
    
    def train_and_evaluate(output_dir):
        wide, deep = get_wide_deep()
        estimator = tf.estimator.DNNLinearCombinedRegressor(
                             model_dir=output_dir,
                             linear_feature_columns=wide,
                             dnn_feature_columns=deep,
                             dnn_hidden_units=[64, 32])
        train_spec=tf.estimator.TrainSpec(
                             input_fn=read_dataset('train', PATTERN),
                             max_steps=TRAIN_STEPS)
        exporter = tf.estimator.FinalExporter('exporter', serving_input_fn)
        eval_spec=tf.estimator.EvalSpec(
                             input_fn=read_dataset('eval', PATTERN),
                             steps=None,
                             exporters=exporter)
        tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
        
    shutil.rmtree('babyweight_trained', ignore_errors=True) # start fresh each time
    train_and_evaluate('babyweight_trained')
    
    
    
    In [14]:
    !ls babyweight_trained
    

    Now that we have the TensorFlow code working on a subset of the data (in the code above, I was reading only the 00001-of-x file), we can package the TensorFlow code up as a Python module and train it on Cloud ML Engine.

    Train on Cloud ML Engine

    Training on Cloud ML Engine requires:

    1. Making the code a Python package
    2. Using gcloud to submit the training code to Cloud ML Engine

    The code in model.py is the same as in the cells above. I just moved it to a file so that I could package it up as a module (a sketch of the directory layout follows below).
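
    The package typically looks something like this (an assumed layout -- model.py and task.py are referenced by the commands below, while setup.py and __init__.py are my assumptions about the usual structure):

      babyweight/
        setup.py              # optional; gcloud can also build the source package for you
        trainer/
          __init__.py
          model.py            # read_dataset, get_wide_deep, serving_input_fn, training loop
          task.py             # parses --bucket, --pattern, --train_steps, ... and calls into model.py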

      
      
      In [15]:
      %%bash
      grep "^def" babyweight/trainer/model.py
      
      
      
      
      def read_dataset(prefix, batch_size=512):
      def get_wide_deep():
      def serving_input_fn():
      def experiment_fn(output_dir):
      

      After moving the code to a package, make sure it works standalone. (Note the --pattern and --train_steps flags, which keep me from trying to boil the ocean on my laptop.) Even then, this takes about a minute, during which you won't see any output ...

      
      
      In [16]:
      %%bash
      echo "bucket=${BUCKET}"
      rm -rf babyweight_trained
      export PYTHONPATH=${PYTHONPATH}:${PWD}/babyweight
      python -m trainer.task \
         --bucket=${BUCKET} \
         --output_dir=babyweight_trained \
         --job-dir=./tmp \
         --pattern="00001-of-" --train_steps=1000
      

      Once the code works in standalone mode, you can run it on Cloud ML Engine. Note that training on the entire dataset will take a while; the training run took about an hour for me on STANDARD_1. So, for the purposes of Qwiklabs, this is done on a partial dataset on a smaller tier (BASIC). Monitor the progress of this job in the Cloud ML Engine section of the GCP Console. Once you see a green check mark, run the next notebook cell.

      
      
      In [17]:
      %%bash
      OUTDIR=gs://${BUCKET}/babyweight/trained_model
      JOBNAME=babyweight_$(date -u +%y%m%d_%H%M%S)
      echo $OUTDIR $REGION $JOBNAME
      gsutil -m rm -rf $OUTDIR
      gcloud ai-platform jobs submit training $JOBNAME \
         --region=$REGION \
         --module-name=trainer.task \
         --package-path=$(pwd)/babyweight/trainer \
         --job-dir=$OUTDIR \
         --staging-bucket=gs://$BUCKET \
         --scale-tier=BASIC \
         --runtime-version 1.14 \
         -- \
         --bucket=${BUCKET} \
         --output_dir=${OUTDIR} \
         --pattern="00001-of-" \
         --train_steps=2000
      

      Training finished with an RMSE of about 1 lb. Obviously, this is our first model. We could probably add in some features, discretize the mother's age, and do some hyperparameter tuning to get to a lower RMSE. I'll leave that to you. If you create a better model, I'd love to hear about it -- please do write a short blog post about what you did, and tweet it at me -- @lak_gcp.
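
      For example, one small extension (a sketch, not part of the lab) is to add a crossed column over the age and gestation buckets to the wide part of the model, so the linear half can learn age-by-gestation interactions directly. The variable names are the ones defined in get_wide_deep() above:

      # Inside get_wide_deep(), after the wide list is built:
      age_x_gestation = tf.feature_column.crossed_column(
          [age_buckets, gestation_buckets], hash_bucket_size=1000)
      wide = wide + [age_x_gestation]  # serve it to the linear (wide) part of the model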

      Monitor training with TensorBoard

      To activate TensorBoard within the JupyterLab UI, navigate to "File" > "New Launcher", then double-click the 'Tensorboard' icon on the bottom row.

      TensorBoard will appear in the new tab. Navigate through the three tabs to see the active TensorBoard. The 'Graphs' and 'Projector' tabs offer very interesting information, including the ability to replay the training runs.

      You may close the TensorBoard tab when you are finished exploring.
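
      Alternatively, if the google.datalab package is available in your environment (an assumption -- it ships with some Google Cloud notebook images), you can start TensorBoard from a cell and point it at the training directory:

      In [ ]:
      # Optional: launch TensorBoard from a notebook cell instead of the launcher.
      # Assumes the google.datalab package is installed in this environment.
      from google.datalab.ml import TensorBoard
      TensorBoard().start('gs://{}/babyweight/trained_model'.format(BUCKET))
      # TensorBoard.list() shows running instances; TensorBoard.stop(<pid>) shuts one down.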

      Make sure that training is complete before you move to the next step. Visit the Cloud ML Engine web console and wait until you see a green check mark before you proceed.

      Deploy trained model

      Deploying the trained model to act as a REST web service is a simple gcloud call.

      
      
      In [18]:
      %%bash
      gsutil ls gs://${BUCKET}/babyweight/trained_model/export/exporter
      

      If you get an error, wait a minute and run the command again.

      
      
      In [19]:
      %%bash
      MODEL_NAME="babyweight"
      MODEL_VERSION="qwiklab"
      MODEL_LOCATION=$(gsutil ls gs://${BUCKET}/babyweight/trained_model/export/exporter/ | tail -1)
      echo "Deleting and deploying $MODEL_NAME $MODEL_VERSION from $MODEL_LOCATION ... this will take a few minutes"
      #gcloud ai-platform versions delete ${MODEL_VERSION} --model ${MODEL_NAME}
      #gcloud ai-platform models delete ${MODEL_NAME}
      gcloud ai-platform models create ${MODEL_NAME} --regions $REGION
      gcloud ai-platform versions create ${MODEL_VERSION} --model ${MODEL_NAME} --origin ${MODEL_LOCATION} --runtime-version 1.14
      

      Use model to predict

      Send a JSON request to the endpoint of the service to make it predict a baby's weight ... I am going to try out how well the model would have predicted the weights of our two kids and a couple of variations while we are at it ...

      
      
      In [20]:
      from googleapiclient import discovery
      from oauth2client.client import GoogleCredentials
      import json
      
      credentials = GoogleCredentials.get_application_default()
      api = discovery.build('ml', 'v1', credentials=credentials)
      
      request_data = {'instances':
        [
          {
            'is_male': 'True',
            'mother_age': 26.0,
            'plurality': 'Single(1)',
            'gestation_weeks': 39
          },
          {
            'is_male': 'False',
            'mother_age': 29.0,
            'plurality': 'Single(1)',
            'gestation_weeks': 38
          },
          {
            'is_male': 'True',
            'mother_age': 26.0,
            'plurality': 'Triplets(3)',
            'gestation_weeks': 39
          },
          {
            'is_male': 'Unknown',
            'mother_age': 29.0,
            'plurality': 'Multiple(2+)',
            'gestation_weeks': 38
          },
        ]
      }
      
      parent = 'projects/%s/models/%s/versions/%s' % (PROJECT, 'babyweight', 'qwiklab')
      response = api.projects().predict(body=request_data, name=parent).execute()
      print ("response={0}".format(response))
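
      The response contains a 'predictions' list with one entry per instance. A minimal sketch for pulling out the predicted weights (the per-entry field name is an assumption based on a typical DNNLinearCombinedRegressor export -- print the raw response first if it differs):

      In [ ]:
      # Pair each input instance with its predicted weight (in pounds).
      # 'predictions' as the per-entry key is an assumption; inspect `response` to confirm.
      for instance, pred in zip(request_data['instances'], response['predictions']):
          print('{} -> {:.2f} lbs'.format(instance, pred['predictions'][0]))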
      

      Copyright 2018 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License