Input pipeline into Keras

In this notebook, we will look at how to use TensorFlow to read large datasets, including datasets that may not fit into memory. We can use the resulting tf.data pipeline to feed data to Keras models that use a TensorFlow backend.

Learning Objectives

  1. Use tf.data to read CSV files
  2. Load the training data into memory
  3. Prune the data by removing columns
  4. Use tf.data to map features and labels
  5. Adjust the batch size of our dataset
  6. Shuffle the dataset to optimize for deep learning

Each learning objective corresponds to a #TODO in this student lab notebook. Try to complete this notebook first, and then review the solution notebook.

Let's start by setting up the environment: check the GCP project, install the TensorFlow version we need, and import the Python libraries.

%%bash
export PROJECT=$(gcloud config list project --format "value(core.project)")
echo "Your current GCP Project Name is: $PROJECT"

!pip install tensorflow==2.1.0 --user

The command above installs the required version of TensorFlow. After the pip install finishes, restart the notebook kernel so that the Python environment picks up the new package.

import os, json, math
import numpy as np
import shutil
import logging
import tensorflow as tf

print("TensorFlow version: ",tf.version.VERSION)

PROJECT = "your-gcp-project-here" # REPLACE WITH YOUR PROJECT NAME
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1

# Do not change these
os.environ["PROJECT"] = PROJECT
os.environ["REGION"] = REGION

if PROJECT == "your-gcp-project-here":
  print("Don't forget to update your PROJECT name! Currently:", PROJECT)

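# If you're not using TF 2.0+, let's enable eager execution
# (compare the major version as a number; a string comparison breaks for versions like '10.0')
if int(tf.version.VERSION.split('.')[0]) < 2:
    print('Enabling v2 behavior and eager execution; if necessary restart kernel, and rerun notebook')
    tf.compat.v1.enable_v2_behavior()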

Locating the CSV files

We will start with the CSV files that we wrote out in the first notebook of this sequence. So that you don't have to run that notebook, we saved a copy in ../../data.

!ls -l ../../data/*.csv

Use tf.data to read the CSV files

See the documentation for tf.data.experimental.make_csv_dataset. If your data is stored as TFRecords (which is recommended), use tf.data.experimental.make_batched_features_dataset instead.
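As a rough illustration of what make_csv_dataset returns, here is a self-contained sketch that writes a tiny, made-up CSV file and reads it back. The file name, columns, and values are hypothetical, not the taxi data. It passes num_epochs=1 and shuffle=False only to make the output finite and repeatable; the notebook's own calls keep the defaults.

```python
import os
import tempfile

import tensorflow as tf

# Hypothetical miniature CSV in the spirit of the taxi files:
# no header row, one record per line. Column names are illustrative only.
MINI_COLUMNS = ['fare_amount', 'passenger_count', 'key']
MINI_DEFAULTS = [[0.0], [0.0], ['na']]
lines = ["12.5,1,a", "7.0,2,b", "30.0,1,c", "5.5,3,d"]

path = os.path.join(tempfile.mkdtemp(), 'mini-train.csv')
with open(path, 'w') as f:
    f.write("\n".join(lines) + "\n")

ds = tf.data.experimental.make_csv_dataset(
    path,
    batch_size=2,
    column_names=MINI_COLUMNS,
    column_defaults=MINI_DEFAULTS,
    header=False,   # this file has no header line to skip
    num_epochs=1,   # the default (None) repeats the data forever
    shuffle=False)  # the default (True) shuffles the rows

batches = list(ds)
for batch in batches:
    # Each element is a dictionary: column name -> tensor of shape (batch_size,)
    print({name: t.numpy() for name, t in batch.items()})
```

Each element is already a batch, so even a batch size of 1 yields tensors with a leading dimension of 1.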

CSV_COLUMNS  = ['fare_amount',  'pickup_datetime',
                'pickup_longitude', 'pickup_latitude', 
                'dropoff_longitude', 'dropoff_latitude', 
                'passenger_count', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS     = [[0.0],['na'],[0.0],[0.0],[0.0],[0.0],[0.0],['na']]
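Each entry in DEFAULTS sets both the type of its column and the value used when a field is empty. The sketch below shows this with tf.io.decode_csv, which takes the same style of record_defaults list. The three-column record is hypothetical, not the taxi schema.

```python
import tensorflow as tf

# Defaults in the same style as DEFAULTS above, for a hypothetical
# three-column record: float, string, float.
record_defaults = [[0.0], ['na'], [0.0]]

# The middle field is empty, so its default ('na') is used; the other
# fields are parsed with the dtype implied by their default value.
fields = tf.io.decode_csv(tf.constant("12.5,,3"), record_defaults=record_defaults)
parsed = [f.numpy() for f in fields]
print(parsed)
```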

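# load the training data
def load_dataset(pattern):
  # TODO 1: Use tf.data to read CSV files
  # Tip: Refer to the tf.data.experimental.make_csv_dataset documentation.
  return tf.data.experimental.make_csv_dataset(pattern, 1, CSV_COLUMNS, DEFAULTS)  # batch size 1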

# TODO 2: Load the training data into memory
tempds = load_dataset('../../data/taxi-train*')  # find the taxi-train* files and read them

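Note that this is a prefetched dataset. Because we used a batch size of 1, looping over it gives you the rows one at a time, and each row is a dictionary that maps column names to tensors. Let's convert the tensors to NumPy values so that each row prints as a plain Python dictionary. make_csv_dataset repeats the data indefinitely by default, so the loop stops after a few rows: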

# print a few of the rows
for n, data in enumerate(tempds):
    row_data = {k: v.numpy() for k,v in data.items()}
    print(n, row_data)
    if n > 2:
        break
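
Because make_csv_dataset repeats the data forever by default, the loop above has to stop explicitly. Another option is Dataset.take(n), which caps the number of elements. A minimal sketch on a small in-memory dataset, used here as a hypothetical stand-in for tempds:

```python
import tensorflow as tf

# Hypothetical in-memory stand-in for tempds: elements are dictionaries of
# tensors, and repeat() makes the dataset endless, like make_csv_dataset's default.
ds = tf.data.Dataset.from_tensor_slices(
    {'fare_amount': [12.5, 7.0, 30.0], 'key': ['a', 'b', 'c']}).repeat()

rows = []
# take(4) yields at most four elements, so the loop ends without a break.
for n, data in enumerate(ds.take(4)):
    row_data = {k: v.numpy() for k, v in data.items()}
    rows.append(row_data)
    print(n, row_data)
```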

What we really need is a dictionary of features plus a label, so we have to do two things to the above dictionary: (1) remove the unwanted columns, such as "key", and (2) keep the label separate from the features.

# get features, label
def features_and_labels(row_data):
    # TODO 3: Prune the data by removing column named 'key'
    for unwanted_col in ['pickup_datetime', 'key']:  # columns to remove
        row_data.pop(unwanted_col)
    label = row_data.pop(LABEL_COLUMN)
    return row_data, label  # features, label

# print a few rows to make sure it works
for n, data in enumerate(tempds):
    row_data = {k: v.numpy() for k,v in data.items()}
    features, label = features_and_labels(row_data)
    print(n, label, features)
    if n > 2:
        break
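
As an alternative to pruning and splitting in Python, make_csv_dataset can do both while parsing. select_columns keeps only the listed columns, and label_name makes the dataset yield (features, label) pairs. This is not the approach this notebook uses; it is a sketch on a hypothetical miniature file. When select_columns and column_defaults are both given, the defaults must correspond to the selected columns only.

```python
import os
import tempfile

import tensorflow as tf

# Hypothetical miniature file with a subset of the taxi columns (no header).
COLUMNS = ['fare_amount', 'pickup_datetime', 'passenger_count', 'key']
lines = ["12.5,2015-01-01 10:00:00 UTC,1,a",
         "7.0,2015-01-02 11:30:00 UTC,2,b"]
path = os.path.join(tempfile.mkdtemp(), 'mini.csv')
with open(path, 'w') as f:
    f.write("\n".join(lines) + "\n")

ds = tf.data.experimental.make_csv_dataset(
    path,
    batch_size=2,
    column_names=COLUMNS,
    select_columns=['fare_amount', 'passenger_count'],  # prune while parsing
    column_defaults=[[0.0], [0.0]],  # one default per *selected* column
    label_name='fare_amount',        # yield (features, label) pairs
    header=False, num_epochs=1, shuffle=False)

mini_features, mini_label = next(iter(ds))
print(sorted(mini_features.keys()), mini_label.numpy())
```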


Let's do both steps (loading the CSV files, and splitting each row into features and a label) in our load_dataset function, and also add batching.

def load_dataset(pattern, batch_size):
  return (
      # TODO 4: Use tf.data to map features and labels
      tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)
             .map(features_and_labels)  # features, label
  )
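
Since load_dataset batches first (inside make_csv_dataset) and then maps, features_and_labels receives a dictionary of batched tensors rather than a single row. The sketch below reproduces that order on a small in-memory dataset. split_features_label is a hypothetical variant of features_and_labels written for this toy data.

```python
import tensorflow as tf

LABEL = 'fare_amount'

def split_features_label(row_data):
    # Hypothetical variant of features_and_labels for this toy data:
    # drop the 'key' column, then pop the label out of the feature dict.
    row_data.pop('key')
    label = row_data.pop(LABEL)
    return row_data, label

ds = (tf.data.Dataset.from_tensor_slices(
          {'fare_amount': [12.5, 7.0, 30.0],
           'passenger_count': [1.0, 2.0, 1.0],
           'key': ['a', 'b', 'c']})
      .batch(2)                    # batch first, as make_csv_dataset does
      .map(split_features_label))  # then split each batch

results = [(sorted(f.keys()), l.numpy().tolist()) for f, l in ds]
for keys, labels in results:
    print(keys, labels)
```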

# TODO 5: Experiment by adjusting batch size
# try changing the batch size and watch what happens.
tempds = load_dataset('../../data/taxi-train*', batch_size=2)

print(list(tempds.take(3))) # truncate and print as a list
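
Batching adds a leading dimension of size batch_size to every tensor. When the input is finite, the last batch can be smaller unless drop_remainder=True is passed; with an endlessly repeating input, every batch is full. A small sketch on Dataset.range:

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)  # ten toy elements: 0..9

# Size of each batch for two different batch sizes.
sizes_by_batch = {
    bs: [int(b.shape[0]) for b in ds.batch(bs)] for bs in (2, 4)
}
# drop_remainder=True discards the final, incomplete batch.
dropped = [int(b.shape[0]) for b in ds.batch(4, drop_remainder=True)]
print(sizes_by_batch, dropped)
```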


When training a deep learning model in batches over multiple workers, it helps to shuffle the data. That way, different workers work on different parts of the input file at the same time, so the gradients averaged across workers reflect a broader sample of the data. Also, during training we need to read the data indefinitely, so the training dataset has to repeat.
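The shuffle(1000) call does not shuffle the whole file at once. Per the tf.data.Dataset.shuffle documentation, it fills a buffer of buffer_size elements, emits a randomly chosen buffered element, and refills that slot with the next input element. In load_dataset the elements being shuffled are whole batches, because batching happens first. The pure-Python sketch below imitates that buffer scheme to show that a small buffer only shuffles locally. buffered_shuffle is a hypothetical helper, not a TensorFlow API.

```python
import random


def buffered_shuffle(iterable, buffer_size, seed=None):
    """Hypothetical helper: yield items in a locally shuffled order.

    Imitates the buffer scheme described for tf.data.Dataset.shuffle:
    fill a buffer of buffer_size items, then repeatedly emit a random
    buffered item and put the next input item in its place.
    """
    rng = random.Random(seed)
    buffer = []
    for item in iterable:
        if len(buffer) < buffer_size:
            buffer.append(item)  # still filling the buffer
            continue
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item  # refill the emptied slot
    # Input exhausted: emit whatever is left in random order.
    rng.shuffle(buffer)
    yield from buffer


out = list(buffered_shuffle(range(20), buffer_size=5, seed=0))
print(out)
```

With buffer_size=5, the first output can only come from the first five inputs. A buffer at least as large as the dataset would be needed for a full shuffle.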

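def load_dataset(pattern, batch_size=1, mode=tf.estimator.ModeKeys.EVAL):
  # note: make_csv_dataset itself also shuffles rows and repeats indefinitely by default
  dataset = (tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)
             .map(features_and_labels))  # features, label
  if mode == tf.estimator.ModeKeys.TRAIN:
    # TODO 6: Shuffle the dataset with a buffer of 1000 elements and have it repeat
    # Tip: Refer to the tf.data.Dataset.shuffle documentation.
    dataset = dataset.shuffle(1000).repeat()
  # prefetch overlaps input preparation with training; here the buffer holds 1 batch
  # (tf.data.experimental.AUTOTUNE would let TensorFlow tune the buffer size instead)
  dataset = dataset.prefetch(1)
  return dataset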

tempds = load_dataset('../../data/taxi-train*', 2, tf.estimator.ModeKeys.TRAIN)
tempds = load_dataset('../../data/taxi-valid*', 2, tf.estimator.ModeKeys.EVAL)
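
The sketch below mirrors load_dataset's mode handling on hypothetical in-memory data. Only the training dataset is shuffled and repeated, and tf.data.experimental.cardinality confirms that it is endless while the evaluation dataset has a fixed number of batches. It uses tf.data.experimental.AUTOTUNE for the prefetch buffer instead of the notebook's fixed value of 1. make_dataset and its boolean training flag are illustrative stand-ins for the ModeKeys argument. Unlike this sketch, make_csv_dataset repeats by default, so the notebook's EVAL dataset is endless too unless num_epochs=1 is passed to it.

```python
import tensorflow as tf

AUTOTUNE = tf.data.experimental.AUTOTUNE

def make_dataset(values, batch_size, training):
    """Hypothetical in-memory analogue of load_dataset's TRAIN/EVAL handling."""
    ds = tf.data.Dataset.from_tensor_slices(values).batch(batch_size)
    if training:
        # As in load_dataset, shuffling happens after batching, so whole
        # batches are shuffled; repeat() then makes the dataset endless.
        ds = ds.shuffle(1000).repeat()
    # Let tf.data pick the prefetch buffer size instead of hard-coding it.
    return ds.prefetch(AUTOTUNE)

values = list(range(10))
train_ds = make_dataset(values, 2, training=True)
eval_ds = make_dataset(values, 2, training=False)

train_card = int(tf.data.experimental.cardinality(train_ds).numpy())
eval_card = int(tf.data.experimental.cardinality(eval_ds).numpy())
eval_values = [int(x) for batch in eval_ds for x in batch.numpy()]
print(train_card, eval_card, eval_values)
```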

In the next notebook, we will build the model using this input pipeline.

