Input pipeline into Keras

In this notebook, we will look at how to use TensorFlow to read large datasets, i.e. datasets that may not fit into memory. We can use the tf.data API to feed data to Keras models that use a TensorFlow backend.

Learning Objectives

  1. Use tf.data to read CSV files
  2. Load the training data into memory
  3. Prune the data by removing columns
  4. Use tf.data to map features and labels
  5. Adjust the batch size of our dataset
  6. Shuffle and repeat the dataset for training

Each learning objective will correspond to a #TODO in this student lab notebook -- try to complete this notebook first and then review the solution notebook.

Let's start off with the Python imports that we need.


In [ ]:
%%bash
export PROJECT=$(gcloud config list project --format "value(core.project)")
echo "Your current GCP Project Name is: "$PROJECT

In [ ]:
%%bash
pip install tensorflow==2.1.0 --user

Let's make sure we have the necessary version of TensorFlow installed. After running the pip install above, click Restart the kernel in the notebook so that the Python environment picks up the new packages.


In [ ]:
import os, json, math
import numpy as np
import shutil
import logging
# SET TF ERROR LOG VERBOSITY
logging.getLogger("tensorflow").setLevel(logging.ERROR)
import tensorflow as tf

print("TensorFlow version: ",tf.version.VERSION)

PROJECT = "your-gcp-project-here" # REPLACE WITH YOUR PROJECT NAME
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1

# Do not change these
os.environ["PROJECT"] = PROJECT
os.environ["REGION"] = REGION
os.environ["BUCKET"] = PROJECT # DEFAULT BUCKET WILL BE PROJECT ID

if PROJECT == "your-gcp-project-here":
  print("Don't forget to update your PROJECT name! Currently:", PROJECT)

In [ ]:
# If you're not using TF 2.0+, enable v2 behavior and eager execution
if tf.version.VERSION < '2.0':
    print('Enabling v2 behavior and eager execution; if necessary, restart the kernel and rerun the notebook')
    tf.enable_v2_behavior()

Locating the CSV files

We will start with the CSV files that we wrote out in the first notebook of this sequence. Just so you don't have to run that notebook, we saved a copy in ../../data.


In [ ]:
!ls -l ../../data/*.csv

Use tf.data to read the CSV files

See the documentation for make_csv_dataset. If you have TFRecords (which is recommended), use make_batched_features_dataset instead. In the cell below, DEFAULTS supplies a default value for each column in CSV_COLUMNS; the type of each default also tells TensorFlow what dtype to parse that column as.


In [ ]:
CSV_COLUMNS  = ['fare_amount',  'pickup_datetime',
                'pickup_longitude', 'pickup_latitude', 
                'dropoff_longitude', 'dropoff_latitude', 
                'passenger_count', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS     = [[0.0],['na'],[0.0],[0.0],[0.0],[0.0],[0.0],['na']]

In [ ]:
# load the training data
def load_dataset(pattern):
# TODO 1: Use tf.data to read CSV files
# Tip: Refer to: https://www.tensorflow.org/versions/r2.0/api_docs/python/tf/data/experimental/make_csv_dataset
  return tf.data. # complete this line

# TODO 2: Load the training data into memory
tempds = load_dataset('') # find and load the taxi-train* into memory
print(tempds)
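
If you get stuck, here is one possible completion; it is the same make_csv_dataset call that appears, with batching added, later in this notebook:

def load_dataset(pattern):
  # Parse all CSV files matching `pattern`, using CSV_COLUMNS as the column
  # names and DEFAULTS as the per-column default values; batch size 1 for now
  return tf.data.experimental.make_csv_dataset(pattern, 1, CSV_COLUMNS, DEFAULTS)

tempds = load_dataset('../../data/taxi-train*')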

Note that this is a prefetched dataset. If you loop over the dataset, you'll get the rows one-by-one. Let's convert each row into a Python dictionary:


In [ ]:
# print a few of the rows
for n, data in enumerate(tempds):
    row_data = {k: v.numpy() for k,v in data.items()}
    print(n, row_data)
    if n > 2:
        break

What we really need is a dictionary of features plus a label. So, we have to do two things to the above dictionary: (1) remove unwanted columns such as "key" and "pickup_datetime", and (2) separate the label from the features.


In [ ]:
# get features, label
def features_and_labels(row_data):
    # TODO 3: Prune the data by removing column named 'key'
    for unwanted_col in ['pickup_datetime', '']: # specify column to remove 
        row_data.pop(unwanted_col)
    label = row_data.pop(LABEL_COLUMN)
    return row_data, label  # features, label

# print a few rows to make sure it works
for n, data in enumerate(tempds):
    row_data = {k: v.numpy() for k,v in data.items()}
    features, label = features_and_labels(row_data)
    print(n, label, features)
    if n > 2:
        break
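
If you get stuck: the column the TODO asks you to remove is 'key', so the completed loop inside features_and_labels looks like this:

for unwanted_col in ['pickup_datetime', 'key']:  # drop columns that won't be used as features
    row_data.pop(unwanted_col)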

Batching

Let's do both (loading, features_label) in our load_dataset function, and also add batching.


In [ ]:
def load_dataset(pattern, batch_size):
  return (
      
      # TODO 4: Use tf.data to map features and labels
      tf.data.experimental.make_csv_dataset() # complete parameters
             .map() # complete with the function that returns (features, label)
  )

# TODO 5: Experiment by adjusting batch size
# try changing the batch size and watch what happens.
tempds = load_dataset('../../data/taxi-train*', batch_size=2)


print(list(tempds.take(3))) # truncate and print as a list
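
One possible completion, matching the load_dataset used in the Shuffling section below. With batch_size=2, each element of the dataset is now a batch of two rows rather than a single row:

def load_dataset(pattern, batch_size):
  return (
      tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)
             .map(features_and_labels)  # convert each row dict into a (features, label) pair
  )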

Shuffling

When training a deep learning model in batches over multiple workers, it helps to shuffle the data. That way, different workers are working on different parts of the input file at the same time, so the gradients being averaged across workers come from different parts of the dataset. Also, during training we need to read the data indefinitely, so we make the dataset repeat.


In [ ]:
def load_dataset(pattern, batch_size=1, mode=tf.estimator.ModeKeys.EVAL):
  dataset = (tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)
             .map(features_and_labels) # features, label
             .cache())
  if mode == tf.estimator.ModeKeys.TRAIN:
        
        # TODO 6: Add dataset.shuffle 1000 to our dataset and have it repeat
        # Tip: Refer to https://www.tensorflow.org/versions/r2.0/api_docs/python/tf/data/Dataset#shuffle
        dataset = dataset.shuffle(1000).repeat()
  dataset = dataset.prefetch(1) # fetch the next batch in the background while the current one is consumed (or pass tf.data.experimental.AUTOTUNE)
  return dataset

tempds = load_dataset('../../data/taxi-train*', 2, tf.estimator.ModeKeys.TRAIN)
print(list(tempds.take(1)))
tempds = load_dataset('../../data/taxi-valid*', 2, tf.estimator.ModeKeys.EVAL)
print(list(tempds.take(1)))

In the next notebook, we will build the model using this input pipeline.
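
As a quick preview (hypothetical; the model itself is built in the next notebook), a tf.data dataset like this can be passed straight to Keras. Because the training dataset repeats indefinitely, steps_per_epoch must be given to fit:

trainds = load_dataset('../../data/taxi-train*', 32, tf.estimator.ModeKeys.TRAIN)
evalds = load_dataset('../../data/taxi-valid*', 32, tf.estimator.ModeKeys.EVAL)
# `model` would be a compiled Keras model, e.g. built with tf.keras.Sequential:
# model.fit(trainds, validation_data=evalds, steps_per_epoch=100, epochs=5)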

Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.