Feature Engineering

In this notebook, you will learn how to incorporate feature engineering into your pipeline.

  • Working with feature columns
  • Adding feature crosses in TensorFlow
  • Reading data from BigQuery
  • Creating datasets using Dataflow
  • Using a wide-and-deep model

In [ ]:
%%bash
sudo pip install httplib2==0.12.0 apache-beam[gcp]==2.16.0

After the pip install finishes, restart your kernel before proceeding: select Kernel from the menu and click Restart Kernel.


In [ ]:
import tensorflow as tf
import apache_beam as beam
import shutil
print(tf.__version__)

1. Environment variables for project and bucket

  • Your project ID is the *unique* string that identifies your project (not the project name). You can find it on the Home page of the GCP Console dashboard. My dashboard reads: Project ID: cloud-training-demos
  • Cloud training often involves saving and restoring model files, so we should create a single-region bucket. If you don't have a bucket already, I suggest creating one from the GCP Console, because it dynamically checks whether the bucket name you want is available.
  • Change the cell below to reflect your Project ID and bucket name.

    
    
    In [ ]:
    import os
    PROJECT = 'cloud-training-demos'    # CHANGE THIS
    BUCKET = 'cloud-training-demos-ml' # REPLACE WITH YOUR BUCKET NAME. Use a regional bucket in the region you selected.
    REGION = 'us-central1' # Choose an available region for Cloud MLE from https://cloud.google.com/ml-engine/docs/regions.
    
    
    
    In [ ]:
    # for bash
    os.environ['PROJECT'] = PROJECT
    os.environ['BUCKET'] = BUCKET
    os.environ['REGION'] = REGION
    os.environ['TFVERSION'] = '1.8' 
    
    ## ensure we're using python3 env
    os.environ['CLOUDSDK_PYTHON'] = 'python3'
    
    
    
    In [ ]:
    %%bash
    gcloud config set project $PROJECT
    gcloud config set compute/region $REGION
    
    ## ensure we predict locally with our current Python environment
    gcloud config set ml_engine/local_python `which python`
    

    2. Specifying query to pull the data

    Let's pull out a few extra columns from the timestamp.

    
    
    In [ ]:
    def create_query(phase, EVERY_N):
      if EVERY_N is None:
        EVERY_N = 4 #use full dataset
        
      #select and pre-process fields
      base_query = """
    SELECT
      (tolls_amount + fare_amount) AS fare_amount,
      DAYOFWEEK(pickup_datetime) AS dayofweek,
      HOUR(pickup_datetime) AS hourofday,
      pickup_longitude AS pickuplon,
      pickup_latitude AS pickuplat,
      dropoff_longitude AS dropofflon,
      dropoff_latitude AS dropofflat,
      passenger_count*1.0 AS passengers,
      CONCAT(STRING(pickup_datetime), STRING(pickup_longitude), STRING(pickup_latitude), STRING(dropoff_latitude), STRING(dropoff_longitude)) AS key
    FROM
      [nyc-tlc:yellow.trips]
    WHERE
      trip_distance > 0
      AND fare_amount >= 2.5
      AND pickup_longitude > -78
      AND pickup_longitude < -70
      AND dropoff_longitude > -78
      AND dropoff_longitude < -70
      AND pickup_latitude > 37
      AND pickup_latitude < 45
      AND dropoff_latitude > 37
      AND dropoff_latitude < 45
      AND passenger_count > 0
      """
      
      #add subsampling criteria by modding with hashkey
      if phase == 'train': 
        query = "{} AND ABS(HASH(pickup_datetime)) % {} < 2".format(base_query,EVERY_N)
      elif phase == 'valid': 
        query = "{} AND ABS(HASH(pickup_datetime)) % {} == 2".format(base_query,EVERY_N)
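      elif phase == 'test':
        query = "{} AND ABS(HASH(pickup_datetime)) % {} == 3".format(base_query,EVERY_N)
      else:
        #fail early instead of hitting an undefined variable below
        raise ValueError("phase must be 'train', 'valid', or 'test'")
      return query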
        
    print(create_query('valid', 100)) #example query using 1% of data
    

    To see what the query does, try it at https://bigquery.cloud.google.com/table/nyc-tlc:yellow.trips (add LIMIT 10 to the query first!).
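
The subsampling clause can be hard to picture from the SQL alone. The following is a minimal Python sketch of the same idea, not the query itself: each row is bucketed by a hash of its pickup timestamp modulo EVERY_N, with buckets 0 and 1 going to training, 2 to validation, and 3 to test. The helpers `stable_hash` and `assign_phase` and the sample timestamps are hypothetical, and MD5 is only a stand-in for BigQuery's HASH function, so the bucket a given timestamp lands in here will not match BigQuery's.

```python
import hashlib


def stable_hash(value):
    """Hypothetical stand-in for BigQuery's HASH(): a deterministic integer per string."""
    return int(hashlib.md5(value.encode("utf-8")).hexdigest(), 16)


def assign_phase(pickup_datetime, every_n):
    """Mirror the WHERE clauses: buckets 0-1 train, 2 valid, 3 test, anything else dropped."""
    bucket = stable_hash(pickup_datetime) % every_n
    if bucket < 2:
        return "train"
    if bucket == 2:
        return "valid"
    if bucket == 3:
        return "test"
    return None  # row is not sampled at all


# Made-up timestamps, one per minute, purely for illustration.
timestamps = ["2014-05-17 15:{:02d}:00".format(m) for m in range(60)]
phases = [assign_phase(ts, 4) for ts in timestamps]
counts = {name: phases.count(name) for name in ("train", "valid", "test")}
print(counts)
```

Because the bucket depends only on the timestamp, the same row always lands in the same split, and rows that share a pickup time never straddle training and validation.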

    3. Preprocessing Dataflow job from BigQuery

    This code reads from BigQuery and saves the data as-is on Google Cloud Storage. We can do additional preprocessing and cleanup inside Dataflow, but then we'll have to remember to repeat that preprocessing during inference. It is better to use tf.transform, which will do this bookkeeping for you, or to do the preprocessing within your TensorFlow model. We will look at this in future notebooks. For now, we are simply moving data from BigQuery to CSV using Dataflow.

    While we could read from BQ directly from TensorFlow (See: https://www.tensorflow.org/api_docs/python/tf/contrib/cloud/BigQueryReader), it is quite convenient to export to CSV and do the training off CSV. Let's use Dataflow to do this at scale.

    Because we are running this on the Cloud, you should go to the GCP Console (https://console.cloud.google.com/dataflow) to look at the status of the job. It will take several minutes for the preprocessing job to launch.

    
    
    In [ ]:
    %%bash
    if gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_preproc/ > /dev/null 2>&1; then
      gsutil -m rm -rf gs://$BUCKET/taxifare/ch4/taxi_preproc/
    fi
    

    First, let's define a function for preprocessing the data

    
    
    In [ ]:
    import datetime
    
    ####
    # Arguments:
    #   -rowdict: Dictionary. The beam bigquery reader returns a PCollection in
    #     which each row is represented as a python dictionary
    # Returns:
    #   -rowstring: a comma separated string representation of the record with dayofweek
    #     converted from int to string (e.g. 3 --> Tue)
    ####
    def to_csv(rowdict):
      days = ['null', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
      CSV_COLUMNS = 'fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key'.split(',')
      rowdict['dayofweek'] = days[rowdict['dayofweek']]
      rowstring = ','.join([str(rowdict[k]) for k in CSV_COLUMNS])
      return rowstring
    
    
    ####
    # Arguments:
    #   -EVERY_N: Integer. Sample one out of every N rows from the full dataset.
    #     Larger values will yield smaller sample
    #   -RUNNER: 'DirectRunner' or 'DataflowRunner'. Specify to run the pipeline
    #     locally or on Google Cloud respectively. 
    # Side-effects:
    #   -Creates and executes dataflow pipeline. 
    #     See https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline
    ####
    def preprocess(EVERY_N, RUNNER):
      job_name = 'preprocess-taxifeatures' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/taxifare/ch4/taxi_preproc/'.format(BUCKET)
    
      #dictionary of pipeline options
      options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'runner': RUNNER
      }
      #instantiate PipelineOptions object using options dictionary
      opts = beam.pipeline.PipelineOptions(flags=[], **options)
      #instantiate Pipeline object using PipelineOptions
      with beam.Pipeline(options=opts) as p:
          for phase in ['train', 'valid']:
            query = create_query(phase, EVERY_N) 
            outfile = os.path.join(OUTPUT_DIR, '{}.csv'.format(phase))
            (
              p | 'read_{}'.format(phase) >> beam.io.Read(beam.io.BigQuerySource(query=query))
                | 'tocsv_{}'.format(phase) >> beam.Map(to_csv)
                | 'write_{}'.format(phase) >> beam.io.Write(beam.io.WriteToText(outfile))
            )
      print("Done")
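
Before launching a pipeline, it can help to check the row format on a single record. The sketch below restates to_csv so that it runs on its own, with one deliberate difference: it copies the row first so the caller's dictionary is left unchanged. The sample row is hypothetical; its values are made up and only shaped like the query output.

```python
def to_csv(rowdict):
    """Restatement of the notebook's to_csv that works on a copy of the row."""
    days = ['null', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
    csv_columns = ('fare_amount,dayofweek,hourofday,pickuplon,pickuplat,'
                   'dropofflon,dropofflat,passengers,key').split(',')
    row = dict(rowdict)  # copy so the caller's dictionary is not modified
    row['dayofweek'] = days[row['dayofweek']]  # e.g. 3 becomes 'Tue'
    return ','.join(str(row[k]) for k in csv_columns)


# Hypothetical row shaped like the query output; the values are made up.
sample_row = {
    'fare_amount': 12.5,
    'dayofweek': 3,
    'hourofday': 17,
    'pickuplon': -73.98,
    'pickuplat': 40.75,
    'dropofflon': -73.95,
    'dropofflat': 40.78,
    'passengers': 2.0,
    'key': 'example-key',
}
line = to_csv(sample_row)
print(line)
```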
    

    Now, let's run the pipeline locally. This takes up to 5 minutes. You will see the message "Done" when it finishes.

    
    
    In [ ]:
    preprocess(50*10000, 'DirectRunner')
    
    
    
    In [ ]:
    %%bash
    gsutil ls gs://$BUCKET/taxifare/ch4/taxi_preproc/
    

    4. Run Beam pipeline on Cloud Dataflow

    Run the pipeline on the cloud with a larger sample.

    
    
    In [ ]:
    %%bash
    if gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_preproc/ > /dev/null 2>&1; then
      gsutil -m rm -rf gs://$BUCKET/taxifare/ch4/taxi_preproc/
    fi
    

    The following step will take 15-20 minutes. Monitor job progress in the Dataflow section of the Cloud Console.

    
    
    In [ ]:
    preprocess(50*100, 'DataflowRunner') 
    #change first arg to None to preprocess full dataset
    

    Once the job completes, observe the files created in Google Cloud Storage

    
    
    In [ ]:
    %%bash
    gsutil ls -l gs://$BUCKET/taxifare/ch4/taxi_preproc/
    
    
    
    In [ ]:
    %%bash
    #print first 10 lines of first shard of train.csv
    gsutil cat "gs://$BUCKET/taxifare/ch4/taxi_preproc/train.csv-00000-of-*" | head
    

    5. Develop model with new inputs

    Download the first shard of the preprocessed data to enable local development.

    
    
    In [ ]:
    %%bash
    if [ -d sample ]; then
      rm -rf sample
    fi
    mkdir sample
    gsutil cat "gs://$BUCKET/taxifare/ch4/taxi_preproc/train.csv-00000-of-*" > sample/train.csv
    gsutil cat "gs://$BUCKET/taxifare/ch4/taxi_preproc/valid.csv-00000-of-*" > sample/valid.csv
    

    We have two new inputs in the INPUT_COLUMNS, three engineered features, and the estimator involves bucketization and feature crosses.

    
    
    In [ ]:
    %%bash
    grep -A 20 "INPUT_COLUMNS =" taxifare/trainer/model.py
    
    
    
    In [ ]:
    %%bash
    grep -A 50 "build_estimator" taxifare/trainer/model.py
    
    
    
    In [ ]:
    %%bash
    grep -A 15 "add_engineered(" taxifare/trainer/model.py
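
For intuition about what bucketization and a feature cross do, here is a small pure-Python sketch. It is not the code in model.py, which relies on TensorFlow feature columns. The helper names, the choice of 16 buckets, the coordinate ranges (borrowed from the query filters), and the way two bucket indices are combined into one id are all illustrative assumptions.

```python
import bisect


def make_boundaries(low, high, nbuckets):
    """Evenly spaced interior boundaries that split [low, high] into nbuckets bins."""
    step = (high - low) / nbuckets
    return [low + step * i for i in range(1, nbuckets)]


def bucketize(value, boundaries):
    """Return the index of the bin that value falls into (0 .. len(boundaries))."""
    return bisect.bisect_right(boundaries, value)


def cross(lat_bucket, lon_bucket, nbuckets):
    """Combine two bucket indices into a single categorical id for the grid cell."""
    return lat_bucket * nbuckets + lon_bucket


NBUCKETS = 16  # assumption for illustration
lat_bounds = make_boundaries(37.0, 45.0, NBUCKETS)
lon_bounds = make_boundaries(-78.0, -70.0, NBUCKETS)

pickup_lat, pickup_lon = 40.773008, -73.885262  # example pickup point
lat_b = bucketize(pickup_lat, lat_bounds)
lon_b = bucketize(pickup_lon, lon_bounds)
cell_id = cross(lat_b, lon_b, NBUCKETS)
print(lat_b, lon_b, cell_id)
```

The crossed id lets a linear model learn one weight per grid cell instead of separate weights for latitude and longitude. TensorFlow's crossed columns hash the combination into a fixed number of buckets rather than computing an exact index; the sketch uses an exact index only to keep the idea visible.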
    

    Try out the new model on the local sample (this takes 5 minutes) to make sure it works fine.

    
    
    In [ ]:
    %%bash
    rm -rf taxifare.tar.gz taxi_trained
    export PYTHONPATH=${PYTHONPATH}:${PWD}/taxifare
    python -m trainer.task \
      --train_data_paths=${PWD}/sample/train.csv \
      --eval_data_paths=${PWD}/sample/valid.csv  \
      --output_dir=${PWD}/taxi_trained \
      --train_steps=10 \
      --job-dir=/tmp
    
    
    
    In [ ]:
    %%bash
    ls taxi_trained/export/exporter/
    

    You can use saved_model_cli to look at the exported signature. Note that the model doesn't need any of the engineered features as inputs. It computes latdiff, londiff, and euclidean from the provided inputs, thanks to the add_engineered call in the serving_input_fn.

    
    
    In [ ]:
    %%bash
    model_dir=$(ls ${PWD}/taxi_trained/export/exporter | tail -1)
    saved_model_cli show --dir ${PWD}/taxi_trained/export/exporter/${model_dir} --all
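
A rough sketch of the engineered features named above, written in plain Python rather than TensorFlow. The actual definitions live in add_engineered in model.py; the formulas below (coordinate differences and their Euclidean norm) are an assumption inferred from the feature names, and `add_engineered_sketch` is a hypothetical helper, not the function in the trainer package.

```python
import math


def add_engineered_sketch(features):
    """Return a copy of features with latdiff, londiff and euclidean added.

    Assumed definitions, inferred from the feature names only.
    """
    out = dict(features)
    out["latdiff"] = features["pickuplat"] - features["dropofflat"]
    out["londiff"] = features["pickuplon"] - features["dropofflon"]
    out["euclidean"] = math.sqrt(out["latdiff"] ** 2 + out["londiff"] ** 2)
    return out


# Hypothetical trip; the coordinates are illustrative.
instance = {
    "pickuplon": -73.885262,
    "pickuplat": 40.773008,
    "dropofflon": -73.987232,
    "dropofflat": 40.732403,
}
engineered = add_engineered_sketch(instance)
print(engineered["latdiff"], engineered["londiff"], engineered["euclidean"])
```

Applying the same function in both the training input function and the serving input function is what allows callers to send only the raw inputs at prediction time.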
    
    
    
    In [ ]:
    %%writefile /tmp/test.json
    {"dayofweek": "Sun", "hourofday": 17, "pickuplon": -73.885262, "pickuplat": 40.773008, "dropofflon": -73.987232, "dropofflat": 40.732403, "passengers": 2}
    
    
    
    In [ ]:
    %%bash
    model_dir=$(ls ${PWD}/taxi_trained/export/exporter | tail -1)
    gcloud ml-engine local predict \
      --model-dir=${PWD}/taxi_trained/export/exporter/${model_dir} \
      --json-instances=/tmp/test.json
    

    5. Train on cloud

    This will take 10-15 minutes, even though the prompt returns immediately after the job is submitted. Monitor job progress in the AI Platform section of the Cloud Console, and wait for the training job to complete.

    
    
    In [ ]:
    %%bash
    OUTDIR=gs://${BUCKET}/taxifare/ch4/taxi_trained
    JOBNAME=lab4a_$(date -u +%y%m%d_%H%M%S)
    echo $OUTDIR $REGION $JOBNAME
    gsutil -m rm -rf $OUTDIR
    gcloud ai-platform jobs submit training $JOBNAME \
      --region=$REGION \
      --module-name=trainer.task \
      --package-path=${PWD}/taxifare/trainer \
      --job-dir=$OUTDIR \
      --staging-bucket=gs://$BUCKET \
      --scale-tier=BASIC \
      --runtime-version=$TFVERSION \
      -- \
      --train_data_paths="gs://$BUCKET/taxifare/ch4/taxi_preproc/train*" \
      --eval_data_paths="gs://${BUCKET}/taxifare/ch4/taxi_preproc/valid*"  \
      --train_steps=5000 \
      --output_dir=$OUTDIR
    

    The RMSE is now 8.33249, an improvement over the 9.3 that we were getting earlier. Of course, we won't know for sure until we train and validate on a larger dataset, but this is promising. Before we do that, let's do hyper-parameter tuning.

    Use the Cloud Console link to monitor the job and do NOT proceed until the job is done.

    
    
    In [ ]:
    %%bash
    gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1
    
    
    
    In [ ]:
    %%bash
    model_dir=$(gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1)
    saved_model_cli show --dir ${model_dir} --all
    
    
    
    In [ ]:
    %%bash
    model_dir=$(gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1)
    gcloud ml-engine local predict \
      --model-dir=${model_dir} \
      --json-instances=/tmp/test.json
    

    Optional: deploy model to cloud

    
    
    In [ ]:
    %%bash
    MODEL_NAME="feateng"
    MODEL_VERSION="v1"
    MODEL_LOCATION=$(gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1)
    echo "Run these commands one-by-one (the very first time, you'll create a model and then create a version)"
    #gcloud ai-platform versions delete ${MODEL_VERSION} --model ${MODEL_NAME}
    #gcloud ai-platform models delete ${MODEL_NAME}
    gcloud ai-platform models create ${MODEL_NAME} --regions $REGION
    gcloud ai-platform versions create ${MODEL_VERSION} --model ${MODEL_NAME} --origin ${MODEL_LOCATION} --runtime-version $TFVERSION
    
    
    
    In [ ]:
    %%bash
    gcloud ai-platform predict --model=feateng --version=v1 --json-instances=/tmp/test.json
    

    6. Hyper-parameter tune

    Look at the hyper-parameter tuning notebook to decide which parameters to use for the model. Based on that run, I ended up choosing:

    1. train_batch_size: 512
    2. nbuckets: 16
    3. hidden_units: "64 64 64 8"

    This gives an RMSE of 5, a considerable improvement over the 8.3 we were getting earlier. Let's try this on a larger dataset.

    Optional: Run Cloud training on 2 million row dataset

    This run uses 2 million rows as input and takes ~20 minutes with 10 workers (STANDARD_1 pricing tier). The model is exactly the same as above. The only changes are to the input (to use the larger dataset) and to the Cloud MLE tier (to use STANDARD_1 instead of BASIC; STANDARD_1 is approximately 10x more powerful than BASIC). Because the Dataflow preprocessing takes about 15 minutes, we train here using CSV files in a public bucket.

    When doing distributed training, use train_steps instead of num_epochs. The distributed workers don't know how many rows there are, but we can calculate train_steps = num_rows * num_epochs / train_batch_size. In this case, we have 2141023 * 100 / 512 = 418168 train steps.
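
The arithmetic above, as a small sketch. The helper name `compute_train_steps` is hypothetical, and rounding down to a whole step is an assumption chosen because it reproduces the 418168 quoted in the text.

```python
def compute_train_steps(num_rows, num_epochs, train_batch_size):
    """Whole number of batches that fit in num_epochs passes over num_rows rows."""
    return (num_rows * num_epochs) // train_batch_size


train_steps = compute_train_steps(num_rows=2141023, num_epochs=100, train_batch_size=512)
print(train_steps)
```

If the batch size changes during hyper-parameter tuning, recomputing train_steps this way keeps the number of passes over the data constant.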

    
    
    In [ ]:
    %%bash
    
    WARNING -- this uses significant resources and is optional. Remove this line to run the block.
    
    OUTDIR=gs://${BUCKET}/taxifare/feateng2m
    JOBNAME=lab4a_$(date -u +%y%m%d_%H%M%S)
    TIER=STANDARD_1 
    echo $OUTDIR $REGION $JOBNAME
    gsutil -m rm -rf $OUTDIR
    gcloud ai-platform jobs submit training $JOBNAME \
       --region=$REGION \
       --module-name=trainer.task \
       --package-path=${PWD}/taxifare/trainer \
       --job-dir=$OUTDIR \
       --staging-bucket=gs://$BUCKET \
       --scale-tier=$TIER \
       --runtime-version=$TFVERSION \
       -- \
       --train_data_paths="gs://${BUCKET}/taxifare/train*" \
       --eval_data_paths="gs://${BUCKET}/taxifare/valid*"  \
       --output_dir=$OUTDIR \
       --train_steps=418168 \
       --train_batch_size=512 --nbuckets=16 --hidden_units="64 64 64 8"
    

    The RMSE after training on the 2-million-row dataset is $3.03. This graph shows the improvements so far:

    
    
    In [ ]:
    import pandas as pd
    import seaborn as sns
    import numpy as np
    import matplotlib.pyplot as plt
    
    df = pd.DataFrame({'Lab' : pd.Series(['1a', '2-3', '4a', '4b', '4c']),
                  'Method' : pd.Series(['Heuristic Benchmark', 'tf.learn', '+Feature Eng.', '+ Hyperparam', '+ 2m rows']),
                  'RMSE': pd.Series([8.026, 9.4, 8.3, 5.0, 3.03]) })
    
    ax = sns.barplot(data = df, x = 'Method', y = 'RMSE')
    ax.set_ylabel('RMSE (dollars)')
    ax.set_xlabel('Labs/Methods')
    plt.plot(np.linspace(-20, 120, 1000), [5] * 1000, 'b');
    
    
    
    In [ ]:
    %%bash
    gsutil -m mv gs://${BUCKET}/taxifare/ch4/ gs://${BUCKET}/taxifare/ch4_1m/
    

    Copyright 2016 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License