Learning Objectives
In this notebook, we will start by refactoring the linear regression we implemented in the previous lab so that it takes its data from a tf.data.Dataset, and we will learn how to implement stochastic gradient descent with it. In this case, the original dataset will be synthetic and read by the tf.data API directly from memory.
In the second part, we will learn how to load a dataset with the tf.data API when the dataset resides on disk.
In [ ]:
!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst
In [ ]:
# Ensure the right version of TensorFlow is installed.
!pip freeze | grep tensorflow==2.1 || pip install tensorflow==2.1
In [ ]:
import json
import math
import os
from pprint import pprint
import numpy as np
import tensorflow as tf
print(tf.version.VERSION)
Let's consider the synthetic dataset of the previous section:
In [ ]:
N_POINTS = 10
X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10
We begin by implementing a function that takes as input our tensors X and Y, the number of passes over the dataset we want to train on (epochs), and the size of the batches (batch_size), and returns a tf.data.Dataset:
Remark: Note that the last batch may not contain the exact number of elements you specified, because the dataset is exhausted.
If you want every batch to contain the exact same number of elements, you have to discard the last incomplete batch by setting:
dataset = dataset.batch(batch_size, drop_remainder=True)
We will do that here.
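To see the effect of drop_remainder concretely, here is a quick illustration on a toy range dataset (a side example, not part of the lab itself):

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)

# Without drop_remainder, the final batch is smaller once the data runs out.
batches = [b.numpy().tolist() for b in ds.batch(3)]
print(batches)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

# With drop_remainder=True, the incomplete final batch is discarded.
full_batches = [b.numpy().tolist() for b in ds.batch(3, drop_remainder=True)]
print(full_batches)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```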
In [ ]:
# TODO 1
def create_dataset(X, Y, epochs, batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    dataset = dataset.repeat(epochs).batch(batch_size, drop_remainder=True)
    return dataset
Let's test our function by iterating twice over our dataset in batches of 3 datapoints:
In [ ]:
BATCH_SIZE = 3
EPOCH = 2

dataset = create_dataset(X, Y, epochs=EPOCH, batch_size=BATCH_SIZE)

for i, (x, y) in enumerate(dataset):
    print("x:", x.numpy(), "y:", y.numpy())
    assert len(x) == BATCH_SIZE
    assert len(y) == BATCH_SIZE

# With drop_remainder=True, the dataset yields EPOCH * N_POINTS // BATCH_SIZE full batches.
assert i + 1 == EPOCH * N_POINTS // BATCH_SIZE
The loss function and the function that computes the gradients are the same as before:
In [ ]:
def loss_mse(X, Y, w0, w1):
    Y_hat = w0 * X + w1
    errors = (Y_hat - Y)**2
    return tf.reduce_mean(errors)


def compute_gradients(X, Y, w0, w1):
    with tf.GradientTape() as tape:
        loss = loss_mse(X, Y, w0, w1)
    return tape.gradient(loss, [w0, w1])
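As a quick sanity check (a side exercise, not part of the lab), we can compare what compute_gradients returns against the closed-form MSE gradients for our synthetic data, with both weights initialized to zero:

```python
import tensorflow as tf

N_POINTS = 10
X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10


def loss_mse(X, Y, w0, w1):
    Y_hat = w0 * X + w1
    return tf.reduce_mean((Y_hat - Y) ** 2)


def compute_gradients(X, Y, w0, w1):
    with tf.GradientTape() as tape:
        loss = loss_mse(X, Y, w0, w1)
    return tape.gradient(loss, [w0, w1])


w0, w1 = tf.Variable(0.0), tf.Variable(0.0)
dw0, dw1 = compute_gradients(X, Y, w0, w1)

# Closed form: dL/dw0 = 2 * mean((Y_hat - Y) * X) and dL/dw1 = 2 * mean(Y_hat - Y).
# At w0 = w1 = 0 these evaluate to -204.0 and -38.0 on this data.
print(dw0.numpy(), dw1.numpy())  # -204.0 -38.0
```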
The main difference is that now, in the training loop, we will iterate directly over the tf.data.Dataset generated by our create_dataset function.
We will configure the dataset so that it iterates 250 times over our synthetic dataset in batches of 2.
In [ ]:
# TODO 2
EPOCHS = 250
BATCH_SIZE = 2
LEARNING_RATE = .02

MSG = "STEP {step} - loss: {loss}, w0: {w0}, w1: {w1}\n"

w0 = tf.Variable(0.0)
w1 = tf.Variable(0.0)

dataset = create_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)

for step, (X_batch, Y_batch) in enumerate(dataset):
    dw0, dw1 = compute_gradients(X_batch, Y_batch, w0, w1)
    w0.assign_sub(dw0 * LEARNING_RATE)
    w1.assign_sub(dw1 * LEARNING_RATE)

    if step % 100 == 0:
        loss = loss_mse(X_batch, Y_batch, w0, w1)
        print(MSG.format(step=step, loss=loss, w0=w0.numpy(), w1=w1.numpy()))

assert loss < 0.0001
assert abs(w0 - 2) < 0.001
assert abs(w1 - 10) < 0.001
In [ ]:
!ls -l ../data/taxi*.csv
The tf.data API can easily read CSV files using the helper function tf.data.experimental.make_csv_dataset.
If you have TFRecords (which is recommended), you may use tf.data.experimental.make_batched_features_dataset instead.
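As a minimal illustration of the TFRecord route (a side sketch, not used in this lab; the file path and feature name here are made up for the example), one could write a couple of toy records to disk and read them back with tf.data.TFRecordDataset:

```python
import tensorflow as tf

# Write two toy records to a TFRecord file.
path = "/tmp/toy.tfrecord"
with tf.io.TFRecordWriter(path) as writer:
    for fare in [5.0, 7.5]:
        example = tf.train.Example(features=tf.train.Features(feature={
            "fare_amount": tf.train.Feature(
                float_list=tf.train.FloatList(value=[fare])),
        }))
        writer.write(example.SerializeToString())

# Read the serialized records back and parse each one into a feature dict.
feature_spec = {"fare_amount": tf.io.FixedLenFeature([], tf.float32)}
ds = tf.data.TFRecordDataset(path).map(
    lambda serialized: tf.io.parse_single_example(serialized, feature_spec))

print([r["fare_amount"].numpy() for r in ds])  # [5.0, 7.5]
```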
The first step is to define the column names (CSV_COLUMNS) and the default values (DEFAULTS) to use for missing fields:
In [ ]:
CSV_COLUMNS = [
'fare_amount',
'pickup_datetime',
'pickup_longitude',
'pickup_latitude',
'dropoff_longitude',
'dropoff_latitude',
'passenger_count',
'key'
]
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]
Let's now wrap the call to make_csv_dataset
into its own function that will take only the file pattern (i.e. glob) where the dataset files are to be located:
In [ ]:
# TODO 3
def create_dataset(pattern):
    # Use a batch size of 1 for now.
    return tf.data.experimental.make_csv_dataset(
        pattern, 1, CSV_COLUMNS, DEFAULTS)


tempds = create_dataset('../data/taxi-train*')
print(tempds)
Note that this is a prefetched dataset, where each element is an OrderedDict whose keys are the feature names and whose values are tensors of shape (1,) (i.e. vectors).
Let's iterate over the first two elements of this dataset using dataset.take(2), and convert them to ordinary Python dictionaries with numpy arrays as values for readability:
In [ ]:
for data in tempds.take(2):
    pprint({k: v.numpy() for k, v in data.items()})
    print("\n")
What we really need is a dictionary of features plus a label. So, we have to do two things to the above dictionary: remove the unwanted columns and separate the label from the features.
Let's first implement a function that takes as input a row (represented as an OrderedDict in our tf.data.Dataset, as above) and returns a tuple with two elements: the OrderedDict of features with the label dropped, and the label itself (fare_amount).
Note that we will also need to remove the key and pickup_datetime columns, which we won't use.
In [ ]:
UNWANTED_COLS = ['pickup_datetime', 'key']
# TODO 4a
def features_and_labels(row_data):
    label = row_data.pop(LABEL_COLUMN)
    features = row_data
    for unwanted_col in UNWANTED_COLS:
        features.pop(unwanted_col)
    return features, label
Let's iterate over two examples from our tempds dataset and apply our features_and_labels function to each of the examples to make sure it's working:
In [ ]:
for row_data in tempds.take(2):
    features, label = features_and_labels(row_data)
    pprint(features)
    print(label, "\n")
    assert UNWANTED_COLS[0] not in features.keys()
    assert UNWANTED_COLS[1] not in features.keys()
    assert label.shape == [1]
Let's now refactor our create_dataset function so that it takes an additional argument batch_size and batches the data accordingly. We will also use the features_and_labels function we implemented so that our dataset produces tuples of features and labels.
In [ ]:
# TODO 4b
def create_dataset(pattern, batch_size):
    dataset = tf.data.experimental.make_csv_dataset(
        pattern, batch_size, CSV_COLUMNS, DEFAULTS)
    return dataset.map(features_and_labels)
Let's test that our batches are of the right size:
In [ ]:
BATCH_SIZE = 2

tempds = create_dataset('../data/taxi-train*', batch_size=BATCH_SIZE)

for X_batch, Y_batch in tempds.take(2):
    pprint({k: v.numpy() for k, v in X_batch.items()})
    print(Y_batch.numpy(), "\n")
    assert len(Y_batch) == BATCH_SIZE
When training a deep learning model in batches over multiple workers, it is helpful if we shuffle the data. That way, different workers will be working on different parts of the input file at the same time, and so averaging gradients across workers will help. Also, during training, we will need to read the data indefinitely.
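As a quick aside on how shuffling works (a toy sketch, not part of the lab pipeline), tf.data.Dataset.shuffle draws samples at random from a buffer of buffer_size elements; a buffer at least as large as the dataset gives a full uniform shuffle:

```python
import tensorflow as tf

# A buffer covering the whole dataset yields a uniformly shuffled order.
ds = tf.data.Dataset.range(10).shuffle(buffer_size=10, seed=42)
order = [int(x) for x in ds]
print(order)  # some permutation of 0..9 (which one depends on the seed)
```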
Let's refactor our create_dataset function so that it shuffles the data when the dataset is used for training.
We will introduce an additional argument mode to our function to allow the function body to distinguish the case when it needs to shuffle the data (mode == "train") from when it shouldn't (mode == "eval").
Also, before returning, we will want to prefetch 1 data point ahead of time (dataset.prefetch(1)) to speed up training:
In [ ]:
# TODO 4c
def create_dataset(pattern, batch_size=1, mode="eval"):
    dataset = tf.data.experimental.make_csv_dataset(
        pattern, batch_size, CSV_COLUMNS, DEFAULTS)
    dataset = dataset.map(features_and_labels).cache()

    if mode == "train":
        dataset = dataset.shuffle(1000).repeat()

    # Prefetching overlaps data preparation with training; you can also pass
    # tf.data.experimental.AUTOTUNE instead of a fixed buffer size of 1.
    dataset = dataset.prefetch(1)
    return dataset
Let's check that our function works well in both modes:
In [ ]:
tempds = create_dataset('../data/taxi-train*', 2, "train")
print(list(tempds.take(1)))
In [ ]:
tempds = create_dataset('../data/taxi-valid*', 2, "eval")
print(list(tempds.take(1)))
In the next notebook, we will build the model using this input pipeline.
Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.