Image Captioning with Attention in Tensorflow 2.0

This notebook adapts the Image Captioning with Attention TensorFlow 2.0 notebook to run as a Kubeflow pipeline. The pipeline trains a model that can caption an image.

Before running this notebook:

Make sure you have completed the setup instructions in the README (including creating the base image).

Install the Kubeflow Pipelines SDK

Install the kfp package if you haven't already.


In [ ]:
!pip3 install kfp --upgrade

Activate service account credentials

This allows you to use gsutil from the notebook to upload the dataset to GCS.


In [ ]:
!gcloud auth activate-service-account --key-file=${GOOGLE_APPLICATION_CREDENTIALS}

Download dataset and upload to GCS

First, download the MS COCO dataset. This sample uses the 2014 train images and the 2014 train/val annotations. The following cells download a small subset (fewer than 1,000 images) of the dataset, plus the annotations, to the GCS location specified below by GCS_DATASET_PATH.


In [ ]:
# GCS location to store the dataset (the bucket should be associated
# with your Kubeflow project)
GCS_BUCKET = 'gs://[YOUR-BUCKET-NAME]'
GCS_DATASET_PATH = GCS_BUCKET + '/ms-coco'

Download images

Downloads images to ${GCS_DATASET_PATH}/train2014/train2014


In [ ]:
# Download images (use -x to ignore ~99% of images)
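# The exclusion regex keeps only files whose names end in "99" (roughly 1 in 100
# images), which produces the <1000-image subset described above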
!gsutil -m rsync -x ".*0\.jpg|.*1\.jpg|.*2\.jpg|.*3\.jpg|.*4\.jpg|.*5\.jpg|.*6\.jpg|.*7\.jpg|.*8\.jpg|.*09\.jpg|.*19\.jpg|.*29\.jpg|.*39\.jpg|.*49\.jpg|.*59\.jpg|.*69\.jpg|.*79\.jpg|.*89\.jpg" gs://images.cocodataset.org/train2014 {GCS_DATASET_PATH}/train2014/train2014

# To download the entire dataset uncomment and use the following command instead
# !gsutil -m rsync gs://images.cocodataset.org/train2014 {GCS_DATASET_PATH}/train2014/train2014

Download annotations

MS COCO blocks downloading the annotations with gsutil (see the related GitHub issue). You can work around this by downloading the archive locally and then uploading it to GCS.


In [ ]:
# Download to local, upload to GCS, then delete local download
!wget http://images.cocodataset.org/annotations/annotations_trainval2014.zip
!unzip annotations_trainval2014.zip -d annotations_trainval2014
!gsutil -m cp -r annotations_trainval2014 {GCS_DATASET_PATH}
!rm -r annotations_trainval2014
!rm annotations_trainval2014.zip

Set up project info and imports


In [ ]:
# Kubeflow project settings
PROJECT_NAME = '[YOUR-PROJECT-NAME]'
PIPELINE_STORAGE_PATH = GCS_BUCKET + '/ms-coco/components' # GCS staging path used when building pipeline components
BASE_IMAGE = 'gcr.io/%s/img-cap:latest' % PROJECT_NAME # image created in the README instructions

# Target images for creating components
PREPROCESS_IMG = 'gcr.io/%s/ms-coco/preprocess:latest' % PROJECT_NAME
TOKENIZE_IMG = 'gcr.io/%s/ms-coco/tokenize:latest' % PROJECT_NAME
TRAIN_IMG = 'gcr.io/%s/ms-coco/train:latest' % PROJECT_NAME
PREDICT_IMG = 'gcr.io/%s/ms-coco/predict:latest' % PROJECT_NAME

In [ ]:
import kfp
import kfp.dsl as dsl
from kfp import compiler

Create pipeline components

Data preprocessing component

This component takes num_examples images from dataset_path and feeds them through the InceptionV3 CNN (without its classification head). For each image the model outputs a tensor of shape (64, 2048): 2048 features for each of the 64 cells of an 8x8 grid over the image. The extracted features are stored as .npy files in OUTPUT_DIR, along with the matching captions and file paths.
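
To make the (64, 2048) shape concrete, here is a minimal standalone sketch (not part of the pipeline) that runs a single dummy 299x299 image through headless InceptionV3 and reshapes the output the same way the component does:


In [ ]:
import tensorflow as tf

# Headless InceptionV3, as used in the component below
image_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet')

# One dummy image tensor of the size the component resizes to: (299, 299, 3)
dummy = tf.zeros((1, 299, 299, 3))
features = image_model(dummy)  # shape: (1, 8, 8, 2048)
features = tf.reshape(features, (features.shape[0], -1, features.shape[3]))
print(features.shape)  # (1, 64, 2048): 64 grid cells, 2048 features each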


In [ ]:
@dsl.python_component(
    name='img_data_preprocessing',
    description='Preprocesses images with InceptionV3',
    base_image=BASE_IMAGE
)
def preprocess(dataset_path: str, num_examples: int, OUTPUT_DIR: str, 
        batch_size: int) -> str:
    import json
    import numpy as np
    import tensorflow as tf
    from tensorflow.python.lib.io import file_io
    from sklearn.utils import shuffle
    
    if OUTPUT_DIR == 'default':
        OUTPUT_DIR = dataset_path + '/preprocess/'
    
    annotation_file = dataset_path + '/annotations_trainval2014/annotations/captions_train2014.json'
    PATH = dataset_path + '/train2014/train2014/'
    files_downloaded = tf.io.gfile.listdir(PATH)
    
    # Read the JSON file (file_io.FileIO is used instead of open() so GCS paths work)
    with file_io.FileIO(annotation_file, 'r') as f:
        annotations = json.load(f)

    # Store captions and image names in vectors
    all_captions = []
    all_img_name_vector = []
    
    print('Determining which images are in storage...')
    for annot in annotations['annotations']:
        caption = '<start> ' + annot['caption'] + ' <end>'
        image_id = annot['image_id']
        img_name = 'COCO_train2014_' + '%012d.jpg' % (image_id)
        full_coco_image_path = PATH + img_name
        
        if img_name in files_downloaded: # Only have subset
            all_img_name_vector.append(full_coco_image_path)
            all_captions.append(caption)

    # Shuffle captions and image_names together
    train_captions, img_name_vector = shuffle(all_captions,
                                              all_img_name_vector,
                                              random_state=1)

    # Select the first num_examples captions/imgs from the shuffled set
    train_captions = train_captions[:num_examples]
    img_name_vector = img_name_vector[:num_examples]

    # Preprocess the images before feeding into InceptionV3
    def load_image(image_path):
        img = tf.io.read_file(image_path)
        img = tf.image.decode_jpeg(img, channels=3)
        img = tf.image.resize(img, (299, 299))
        img = tf.keras.applications.inception_v3.preprocess_input(img)
        return img, image_path
    
    # Create model for processing images 
    image_model = tf.keras.applications.InceptionV3(include_top=False,
                                                weights='imagenet')
    new_input = image_model.input
    hidden_layer = image_model.layers[-1].output
    image_features_extract_model = tf.keras.Model(new_input, hidden_layer)
    
    # Save extracted features in GCS
    print('Extracting features from images...')
    
    # Get unique images
    encode_train = sorted(set(img_name_vector))
    
    image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
    image_dataset = image_dataset.map(
        load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(batch_size)
    
    for img, path in image_dataset:
        batch_features = image_features_extract_model(img)
        batch_features = tf.reshape(batch_features,
                              (batch_features.shape[0], -1, batch_features.shape[3]))

        for bf, p in zip(batch_features, path):
            path_of_feature = p.numpy().decode("utf-8")
            
            # Save to a different location and as numpy array
            path_of_feature = path_of_feature.replace('.jpg', '.npy')
            path_of_feature = path_of_feature.replace(PATH, OUTPUT_DIR)
            np.save(file_io.FileIO(path_of_feature, 'w'), bf.numpy())
    
    # Create array for locations of preprocessed images
    preprocessed_imgs = [img.replace('.jpg', '.npy') for img in img_name_vector]
    preprocessed_imgs = [img.replace(PATH, OUTPUT_DIR) for img in preprocessed_imgs]
    
    # Save train_captions and preprocessed_imgs to file
    train_cap_path = OUTPUT_DIR + 'train_captions.npy' # array of captions
    preprocessed_imgs_path = OUTPUT_DIR + 'preprocessed_imgs.npy' # array of paths to preprocessed images
    
    train_captions = np.array(train_captions)
    np.save(file_io.FileIO(train_cap_path, 'w'), train_captions)
    
    preprocessed_imgs = np.array(preprocessed_imgs)
    np.save(file_io.FileIO(preprocessed_imgs_path, 'w'), preprocessed_imgs)
    
    return (train_cap_path, preprocessed_imgs_path)

In [ ]:
preprocessing_img_op = compiler.build_python_component(
    component_func=preprocess,
    staging_gcs_path=PIPELINE_STORAGE_PATH,
    base_image=BASE_IMAGE,
    dependency=[kfp.compiler.VersionedDependency(name='scikit-learn', version='0.21.2')],
    target_image=PREPROCESS_IMG)

Tokenizing component

This component takes the training captions from the previous step and tokenizes them, converting each caption into a padded sequence of integer word indices that can be fed to the model. The fitted tokenizer and the tokenized captions are saved in OUTPUT_DIR.
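
As a minimal illustration with two made-up captions (not part of the pipeline), tokenizing and padding works as shown below; the component does the same for the real training captions and additionally reserves index 0 for '<pad>'.


In [ ]:
import tensorflow as tf

captions = ['<start> a dog runs <end>',
            '<start> a cat sleeps on a mat <end>']

# Same tokenizer settings as the component: keep only the most frequent words,
# map everything else to '<unk>', and keep the '<'/'>' of the marker tokens
tokenizer = tf.keras.preprocessing.text.Tokenizer(
    num_words=5000, oov_token='<unk>',
    filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(captions)

seqs = tokenizer.texts_to_sequences(captions)  # each caption -> list of integer word indices
padded = tf.keras.preprocessing.sequence.pad_sequences(seqs, padding='post')
print(padded)  # shorter captions are right-padded with 0 up to the longest length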


In [ ]:
@dsl.python_component(
    name='tokenize_captions',
    description='Tokenize captions to create training data',
    base_image=BASE_IMAGE
)
def tokenize_captions(dataset_path: str, preprocess_output: str, OUTPUT_DIR: str,
        top_k: int) -> str:
    import pickle
    import tensorflow as tf
    import numpy as np
    from tensorflow.python.lib.io import file_io
    from io import BytesIO
    from ast import literal_eval as make_tuple
    
    # Convert output from string to tuple and unpack
    preprocess_output = make_tuple(preprocess_output)
    train_caption_path = preprocess_output[0]
    
    if OUTPUT_DIR == 'default':
        OUTPUT_DIR = dataset_path + '/tokenize/'
    
    tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
    f = BytesIO(file_io.read_file_to_string(train_caption_path, 
                                            binary_mode=True))
    train_captions = np.load(f)
    
    # Tokenize captions
    tokenizer.fit_on_texts(train_captions)
    train_seqs = tokenizer.texts_to_sequences(train_captions)
    tokenizer.word_index['<pad>'] = 0
    tokenizer.index_word[0] = '<pad>'
    
    cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')
    
    # Find the maximum length of any caption in our dataset
    def calc_max_length(tensor):
        return max(len(t) for t in tensor)
    
    max_length = calc_max_length(train_seqs)
    
    # Save tokenizer
    tokenizer_file_path = OUTPUT_DIR + 'tokenizer.pickle'
    with file_io.FileIO(tokenizer_file_path, 'wb') as output:
        pickle.dump(tokenizer, output, protocol=pickle.HIGHEST_PROTOCOL)
        
    # Save train_seqs
    cap_vector_file_path = OUTPUT_DIR + 'cap_vector.npy'
    np.save(file_io.FileIO(cap_vector_file_path, 'w'), cap_vector)
    
    return str(max_length), tokenizer_file_path, cap_vector_file_path

In [ ]:
tokenize_captions_op = compiler.build_python_component(
    component_func=tokenize_captions,
    staging_gcs_path=PIPELINE_STORAGE_PATH,
    base_image=BASE_IMAGE,
    target_image=TOKENIZE_IMG)

Component for training model (and saving it)

This component builds a tf.data.Dataset from the tokenized captions and the preprocessed image features and trains the encoder/decoder model. Checkpoints of the trained model are saved in train_output_dir/checkpoints/, and the per-epoch training loss is logged for TensorBoard. Several model hyperparameters can be tuned, but by default the component uses the values from the original notebook.
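
The Kubeflow Pipelines UI picks up the loss plot through the /mlpipeline-ui-metadata.json file written by this component. If you prefer, the same logs can also be inspected with a local TensorBoard after the pipeline has run; a sketch, assuming training_output_dir is left as 'default' and the environment is authenticated to GCS:


In [ ]:
# Point a local TensorBoard at the training logs written by the component
# (the logs live under <train_output_dir>/logs/<timestamp>)
!tensorboard --logdir {GCS_DATASET_PATH}/train/logs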


In [ ]:
@dsl.python_component(
    name='model_training',
    description='Trains image captioning model',
    base_image=BASE_IMAGE
)
def train_model(dataset_path: str, preprocess_output: str, 
        tokenizing_output: str, train_output_dir: str, valid_output_dir: str, 
        batch_size: int, embedding_dim: int, units: int, EPOCHS: int) -> str:
    import json
    import time
    import pickle
    import models
    import numpy as np
    import tensorflow as tf
    from io import BytesIO
    from datetime import datetime
    from sklearn.model_selection import train_test_split
    from tensorflow.python.lib.io import file_io
    from ast import literal_eval as make_tuple
    
    # Convert output from string to tuple and unpack
    preprocess_output = make_tuple(preprocess_output)
    tokenizing_output = make_tuple(tokenizing_output)
    
    # Unpack tuples
    preprocessed_imgs_path = preprocess_output[1]
    tokenizer_path = tokenizing_output[1]
    cap_vector_file_path = tokenizing_output[2]
    
    if valid_output_dir == 'default':
        valid_output_dir = dataset_path + '/valid/'
    
    if train_output_dir == 'default':
        train_output_dir = dataset_path + '/train/'
    
    # load img_name_vector
    f = BytesIO(file_io.read_file_to_string(preprocessed_imgs_path, binary_mode=True))
    img_name_vector = np.load(f)
    
    # Load cap_vector
    f = BytesIO(file_io.read_file_to_string(cap_vector_file_path, binary_mode=True))
    cap_vector = np.load(f)
    
    # Load tokenizer
    with file_io.FileIO(tokenizer_path, 'rb') as src:
        tokenizer = pickle.load(src)
    
    # Split data into training and testing
    img_name_train, img_name_val, cap_train, cap_val = train_test_split(
                                                            img_name_vector,
                                                            cap_vector,
                                                            test_size=0.2,
                                                            random_state=0)
    
    # Create tf.data dataset for training
    BUFFER_SIZE = 1000 # common size used for shuffling dataset
    vocab_size = len(tokenizer.word_index) + 1
    num_steps = len(img_name_train) // batch_size
    
    # Shape of the vector extracted from InceptionV3 is (64, 2048)
    features_shape = 2048
    
    # Load the numpy files
    def map_func(img_name, cap):
        f = BytesIO(file_io.read_file_to_string(img_name.decode('utf-8'), binary_mode=True))
        img_tensor = np.load(f)
        return img_tensor, cap
    
    dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))

    # Use map to load the numpy files in parallel
    dataset = dataset.map(lambda item1, item2: tf.numpy_function(
              map_func, [item1, item2], [tf.float32, tf.int32]),
              num_parallel_calls=tf.data.experimental.AUTOTUNE)

    # Shuffle and batch
    dataset = dataset.shuffle(BUFFER_SIZE).batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    
    # get models from models.py
    encoder = models.CNN_Encoder(embedding_dim)
    decoder = models.RNN_Decoder(embedding_dim, units, vocab_size)
    
    optimizer = tf.keras.optimizers.Adam()
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')
    
    # Create loss function
    def loss_function(real, pred):
        mask = tf.math.logical_not(tf.math.equal(real, 0))
        loss_ = loss_object(real, pred)

        mask = tf.cast(mask, dtype=loss_.dtype)
        loss_ *= mask

        return tf.reduce_mean(loss_)
    
    # Create a checkpoint and checkpoint manager for the training model
    ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer)
    ckpt_manager = tf.train.CheckpointManager(ckpt, train_output_dir + 'checkpoints/', max_to_keep=5)
    start_epoch = 0
    if ckpt_manager.latest_checkpoint:
        start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
            
    # Create training step
    loss_plot = []
    @tf.function
    def train_step(img_tensor, target):
        loss = 0

        # initializing the hidden state for each batch
        # because the captions are not related from image to image
        hidden = decoder.reset_state(batch_size=target.shape[0])

        dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * batch_size, 1)

        with tf.GradientTape() as tape:
            features = encoder(img_tensor)

            for i in range(1, target.shape[1]):
                # passing the features through the decoder
                predictions, hidden, _ = decoder(dec_input, features, hidden)

                loss += loss_function(target[:, i], predictions)

                # using teacher forcing
                dec_input = tf.expand_dims(target[:, i], 1)

        total_loss = (loss / int(target.shape[1]))

        trainable_variables = encoder.trainable_variables + decoder.trainable_variables

        gradients = tape.gradient(loss, trainable_variables)

        optimizer.apply_gradients(zip(gradients, trainable_variables))

        return loss, total_loss
    
    # Create a summary writer and loss metric for plotting the loss in TensorBoard
    tensorboard_dir = train_output_dir + 'logs/' + datetime.now().strftime("%Y%m%d-%H%M%S")
    train_summary_writer = tf.summary.create_file_writer(tensorboard_dir)
    train_loss = tf.keras.metrics.Mean('train_loss', dtype=tf.float32)
    
    # Train model
    path_to_most_recent_ckpt = None
    for epoch in range(start_epoch, EPOCHS):
        start = time.time()
        total_loss = 0

        for (batch, (img_tensor, target)) in enumerate(dataset):
            batch_loss, t_loss = train_step(img_tensor, target)
            total_loss += t_loss
            train_loss(t_loss)
            if batch % 100 == 0:
                print ('Epoch {} Batch {} Loss {:.4f}'.format(
                  epoch + 1, batch, batch_loss.numpy() / int(target.shape[1])))

        # Store the epoch-end loss value to plot in TensorBoard
        with train_summary_writer.as_default():
            tf.summary.scalar('loss per epoch', train_loss.result(), step=epoch)
        
        train_loss.reset_states()
        
        # Save a checkpoint every 5 epochs and after the final epoch so the
        # returned checkpoint path reflects the latest weights
        if epoch % 5 == 0 or epoch == EPOCHS - 1:
            path_to_most_recent_ckpt = ckpt_manager.save()

        print ('Epoch {} Loss {:.6f}'.format(epoch + 1,
                                             total_loss/num_steps))
        print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))
    
    # Write metadata so the Kubeflow Pipelines UI can display the TensorBoard loss plot
    metadata = {
        'outputs': [{
            'type': 'tensorboard',
            'source': tensorboard_dir,
        }]
    }
    with open('/mlpipeline-ui-metadata.json', 'w') as f:
        json.dump(metadata, f)
    
    # Save validation data to use for predictions
    val_cap_path = valid_output_dir + 'captions.npy'
    np.save(file_io.FileIO(val_cap_path, 'w'), cap_val)
    
    val_img_path = valid_output_dir + 'images.npy'
    np.save(file_io.FileIO(val_img_path, 'w'), img_name_val)
    
    return path_to_most_recent_ckpt, val_cap_path, val_img_path

In [ ]:
model_train_op = compiler.build_python_component(
    component_func=train_model,
    staging_gcs_path=PIPELINE_STORAGE_PATH,
    base_image=BASE_IMAGE,
    dependency=[kfp.compiler.VersionedDependency(name='scikit-learn', version='0.21.2')],
    target_image=TRAIN_IMG)

Component for model prediction

This component uses the trained model to caption a randomly selected image from the validation set. It prints the predicted and real captions in the logs and writes the first 10 attention images (titled with the predicted words) to TensorBoard. (Currently Kubeflow only supports displaying up to 10 outputs in TensorBoard.)


In [ ]:
@dsl.python_component(
    name='model_predictions',
    description='Predicts on images in validation set',
    base_image=BASE_IMAGE
)
def predict(dataset_path: str, tokenizing_output: str, 
        model_train_output: str, preprocess_output_dir: str, 
        valid_output_dir: str, embedding_dim: int, units: int):
    import pickle
    import json
    import models
    import matplotlib.pyplot as plt
    import numpy as np
    import tensorflow as tf
    from datetime import datetime
    from io import BytesIO
    from tensorflow.python.lib.io import file_io
    from ast import literal_eval as make_tuple
    
    tokenizing_output = make_tuple(tokenizing_output)
    model_train_output = make_tuple(model_train_output)
    
    # Unpack tuples
    max_length = int(tokenizing_output[0])
    tokenizer_path = tokenizing_output[1]
    model_path = model_train_output[0]
    val_cap_path = model_train_output[1]
    val_img_path = model_train_output[2]
    
    if preprocess_output_dir == 'default':
        preprocess_output_dir = dataset_path + '/preprocess/'
    
    if valid_output_dir == 'default':
        valid_output_dir = dataset_path + '/valid/'
        
    tensorboard_dir = valid_output_dir + 'logs/' + datetime.now().strftime("%Y%m%d-%H%M%S")
    summary_writer = tf.summary.create_file_writer(tensorboard_dir)

    # Load tokenizer, model, test_captions, and test_imgs
    
    # Load tokenizer
    with file_io.FileIO(tokenizer_path, 'rb') as src:
        tokenizer = pickle.load(src)
    
    vocab_size = len(tokenizer.word_index) + 1
    
    # Shape of the vector extracted from InceptionV3 is (64, 2048)
    attention_features_shape = 64
    features_shape = 2048
    
    encoder = models.CNN_Encoder(embedding_dim)
    decoder = models.RNN_Decoder(embedding_dim, units, vocab_size)
    
    # Load model from checkpoint (encoder, decoder)
    optimizer = tf.keras.optimizers.Adam()
    ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder, optimizer=optimizer)
    ckpt.restore(model_path).expect_partial()
    
    # Load test captions
    f = BytesIO(file_io.read_file_to_string(val_cap_path, 
                                            binary_mode=True))
    cap_val = np.load(f)
    
    # load test images
    f = BytesIO(file_io.read_file_to_string(val_img_path, 
                                            binary_mode=True))
    img_name_val = np.load(f)
    
    # To get the original image locations, replace the .npy extension with .jpg
    # and replace the preprocessed path with the path to the original images
    PATH = dataset_path + '/train2014/train2014/'
    img_name_val = [img.replace('.npy', '.jpg') for img in img_name_val]
    img_name_val = [img.replace(preprocess_output_dir, PATH) for img in img_name_val]
    
    image_model = tf.keras.applications.InceptionV3(include_top=False,
                                                weights='imagenet')
    new_input = image_model.input
    hidden_layer = image_model.layers[-1].output

    image_features_extract_model = tf.keras.Model(new_input, hidden_layer)
    
    # Preprocess the images using InceptionV3
    def load_image(image_path):
        img = tf.io.read_file(image_path)
        img = tf.image.decode_jpeg(img, channels=3)
        img = tf.image.resize(img, (299, 299))
        img = tf.keras.applications.inception_v3.preprocess_input(img)
        return img, image_path
    
    # Run predictions
    def evaluate(image):
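        # Greedy decoding: repeatedly feed the previous predicted word back into
        # the decoder until it emits '<end>' or max_length words are produced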
        attention_plot = np.zeros((max_length, attention_features_shape))

        hidden = decoder.reset_state(batch_size=1)

        temp_input = tf.expand_dims(load_image(image)[0], 0)
        img_tensor_val = image_features_extract_model(temp_input)
        img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

        features = encoder(img_tensor_val)

        dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
        result = []

        for i in range(max_length):
            predictions, hidden, attention_weights = decoder(dec_input, features, hidden)

            attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

            predicted_id = tf.argmax(predictions[0]).numpy()
            result.append(tokenizer.index_word[predicted_id])

            if tokenizer.index_word[predicted_id] == '<end>':
                return result, attention_plot

            dec_input = tf.expand_dims([predicted_id], 0)

        attention_plot = attention_plot[:len(result), :]
        return result, attention_plot
    
    # Modified to plot images on tensorboard
    def plot_attention(image, result, attention_plot):
        img = tf.io.read_file(image)
        img = tf.image.decode_jpeg(img, channels=3)
        temp_image = np.array(img.numpy())
        
        len_result = len(result)
        for l in range(min(len_result, 10)): # Tensorboard only supports 10 imgs
            temp_att = np.resize(attention_plot[l], (8, 8))
            plt.title(result[l])
            img = plt.imshow(temp_image)
            plt.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())
            
            # Save plt to image to access in tensorboard
            buf = BytesIO()
            plt.savefig(buf, format='png')
            buf.seek(0)
            
            final_im = tf.image.decode_png(buf.getvalue(), channels=4)
            final_im = tf.expand_dims(final_im, 0)
            with summary_writer.as_default():
                tf.summary.image("attention", final_im, step=l)
    
    # Select a random image to caption from validation set
    rid = np.random.randint(0, len(img_name_val))
    image = img_name_val[rid]
    real_caption = ' '.join([tokenizer.index_word[i] for i in cap_val[rid] if i not in [0]])
    result, attention_plot = evaluate(image)
    print ('Image:', image)
    print ('Real Caption:', real_caption)
    print ('Prediction Caption:', ' '.join(result))
    plot_attention(image, result, attention_plot)
    
    # Plot attention images on tensorboard
    metadata = {
        'outputs': [{
            'type': 'tensorboard',
            'source': tensorboard_dir,
        }]
    }
    with open('/mlpipeline-ui-metadata.json', 'w') as f:
        json.dump(metadata, f)

In [ ]:
predict_op = compiler.build_python_component(
    component_func=predict,
    staging_gcs_path=PIPELINE_STORAGE_PATH,
    base_image=BASE_IMAGE,
    dependency=[kfp.compiler.VersionedDependency(name='matplotlib', version='3.1.0')],
    target_image=PREDICT_IMG)

Create and run pipeline

Create pipeline

The pipeline parameters are specified in the caption_pipeline function signature below. Using the value 'default' for any of the output directories saves that stage's outputs in a subdirectory of GCS_DATASET_PATH.

Requirements

The pipeline must be able to authenticate to GCP; see [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) and the commented example at the end of the pipeline definition below.


In [ ]:
@dsl.pipeline(
    name='Image Captioning Pipeline',
    description='A pipeline that trains a model to caption images'
)
def caption_pipeline(
    dataset_path=GCS_DATASET_PATH,
    num_examples=30000,
    epochs=20,
    training_batch_size=64,
    hidden_state_size=512,
    vocab_size=5000,
    embedding_dim=256,
    preprocessing_batch_size=16,
    preprocessing_output_dir='default',
    tokenizing_output_dir='default',
    training_output_dir='default',
    validation_output_dir='default',
    ): 
    
    preprocessing_img_task = preprocessing_img_op(
        dataset_path, 
        output_dir=preprocessing_output_dir,
        batch_size=preprocessing_batch_size, 
        num_examples=num_examples)
    
    tokenize_captions_task = tokenize_captions_op(
        dataset_path, 
        preprocessing_img_task.output, 
        output_dir=tokenizing_output_dir, 
        top_k=vocab_size)
    
    model_train_task = model_train_op(
        dataset_path, 
        preprocessing_img_task.output,
        tokenize_captions_task.output,
        train_output_dir=training_output_dir, 
        valid_output_dir=validation_output_dir,
        batch_size=training_batch_size, 
        embedding_dim=embedding_dim, 
        units=hidden_state_size, 
        epochs=epochs)
    
    predict_task = predict_op(
        dataset_path,
        tokenize_captions_task.output, 
        model_train_task.output,
        preprocess_output_dir=preprocessing_output_dir,
        valid_output_dir=validation_output_dir,
        embedding_dim=embedding_dim,
        units=hidden_state_size)

    # The pipeline should be able to authenticate to GCP.
    # Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details.
    #
    # For example, you may uncomment the following lines to use GSA keys.
    # from kfp.gcp import use_gcp_secret
    # kfp.dsl.get_pipeline_conf().add_op_transformer(use_gcp_secret('user-gcp-sa'))
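
If you prefer to upload the pipeline through the Kubeflow Pipelines UI instead of running it directly from this notebook, it can optionally be compiled to an archive first (the run cell below does not require this step):


In [ ]:
# Optional: compile the pipeline to a file that can be uploaded in the Pipelines UI
compiler.Compiler().compile(caption_pipeline, 'image_captioning_pipeline.zip')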

Run pipeline


In [ ]:
# Test run to make sure all parts of the pipeline are working properly
arguments = {
    'dataset_path': GCS_DATASET_PATH, 
    'num_examples': 100, # Small test to make sure pipeline functions properly
    'training_batch_size': 16, # has to be smaller since only training on 80/100 examples 
}

kfp.Client().create_run_from_pipeline_func(caption_pipeline, arguments=arguments)

Model checkpoints are saved at training_output_dir, which is [GCS_DATASET_PATH]/train/checkpoints/ by default.
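
To reuse the trained model outside the pipeline, the checkpoint can be restored the same way the prediction component does. A minimal sketch, assuming the default output locations, the default hyperparameters (embedding_dim=256, units=512), and that models.py from the base image is importable in your environment:


In [ ]:
import pickle
import models
import tensorflow as tf
from tensorflow.python.lib.io import file_io

# Default locations used by the pipeline components above
CKPT_DIR = GCS_DATASET_PATH + '/train/checkpoints/'
TOKENIZER_PATH = GCS_DATASET_PATH + '/tokenize/tokenizer.pickle'

# The vocabulary size is derived from the saved tokenizer
with file_io.FileIO(TOKENIZER_PATH, 'rb') as src:
    tokenizer = pickle.load(src)
vocab_size = len(tokenizer.word_index) + 1

# Rebuild the encoder/decoder with the same hyperparameters used for training
encoder = models.CNN_Encoder(256)                   # embedding_dim
decoder = models.RNN_Decoder(256, 512, vocab_size)  # embedding_dim, units

# Restore the most recent checkpoint written by the training component
ckpt = tf.train.Checkpoint(encoder=encoder, decoder=decoder,
                           optimizer=tf.keras.optimizers.Adam())
ckpt.restore(tf.train.latest_checkpoint(CKPT_DIR)).expect_partial()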