In [ ]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Overview

This notebook illustrates the new feature of serving custom model prediction code on AI Platform. It allows us to execute arbitrary Python pre-processing code prior to invoking a model, as well as post-processing on the produced predictions. In addition, you can use a model built by your favourite Python-based ML framework!

This is all done server-side so that the client can pass data directly to AI Platform Serving in the unprocessed state.

We will take advantage of this for text classification, because it involves pre-processing that is not easily accomplished in native TensorFlow. Instead, we will execute the non-TensorFlow pre-processing via Python code on the server side.

We will build a text classification model using PyTorch, while performing text preprocessing using Keras. PyTorch is an open source deep learning platform that provides a seamless path from research prototyping to production deployment.
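
To serve such code, AI Platform expects a predictor class that exposes a predict method and a from_path factory. A minimal sketch of that contract follows (the concrete implementation for this tutorial appears later in the notebook):


In [ ]:
class Predictor(object):
    """Minimal sketch of the AI Platform custom prediction interface."""

    def predict(self, instances, **kwargs):
        # instances: the decoded instances from the prediction request.
        # Must return a list of predictions, one per instance.
        raise NotImplementedError()

    @classmethod
    def from_path(cls, model_dir):
        # model_dir: local copy of the GCS directory passed as --origin.
        # Must return an instance of this predictor class.
        raise NotImplementedError()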

Dataset

Hacker News is one of many public datasets available in BigQuery. This dataset includes titles of articles from several data sources. For this tutorial, we extracted the titles that belong to either GitHub, The New York Times, or TechCrunch, and saved them as tab-separated (TSV) files in a publicly shared Cloud Storage bucket at the following location: gs://cloud-training-demos/blogs/CMLE_custom_prediction

Objective

The goal of this tutorial is to:

  1. Process the data for text classification.
  2. Train a PyTorch Text Classifier (locally).
  3. Deploy the PyTorch Text Classifier, along with the preprocessing artifacts, to AI Platform Serving, using the Custom Online Prediction code.

This tutorial focuses more on using this model with AI Platform Serving than on the design of the text classification model itself. For more details about text classification, please refer to the Google Developers Guide to Text Classification.

Costs

This tutorial uses billable components of Google Cloud Platform (GCP):

  1. AI Platform Serving (Cloud Machine Learning Engine)
  2. Cloud Storage

Learn about AI Platform pricing and Cloud Storage pricing, and use the Pricing Calculator to generate a cost estimate based on your projected usage.

Authentication

If you are using AI Platform Notebooks, your environment is already authenticated and you can skip the following cell. On Colab, the cell below authenticates your user account.


In [ ]:
try:
    from google.colab import auth
    auth.authenticate_user()
except ImportError:
    pass  # Not running on Colab; the environment is assumed to be authenticated.

Setup


In [ ]:
%load_ext autoreload
%autoreload 2

In [ ]:
# Pin torch to the PyTorch version deployed to AI Platform Serving below
!pip install tensorflow==1.15.2 torch==1.3.1 --user

In [ ]:
import tensorflow as tf
import torch
import os

print(tf.__version__) 
print(torch.__version__)

In [ ]:
PROJECT = ''  # TODO (Set to your GCP Project name)
BUCKET = ''  # TODO (Set to your GCS Bucket name)
ROOT = 'torch_text_classification'
MODEL_DIR = os.path.join(ROOT, 'models')
PACKAGES_DIR = os.path.join(ROOT, 'packages')

In [ ]:
# Delete any previous artefacts from Google Cloud Storage
!gsutil rm -r gs://{BUCKET}/{ROOT}

In [ ]:
!gcloud config set project {PROJECT}

Download and Explore Data


In [ ]:
%%bash
gsutil cp gs://cloud-training-demos/blogs/CMLE_custom_prediction/keras_text_pre_processing/train.tsv .
gsutil cp gs://cloud-training-demos/blogs/CMLE_custom_prediction/keras_text_pre_processing/eval.tsv .

In [ ]:
!head eval.tsv
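
To get a feel for the class balance, here is a quick look with pandas (a small added check, not part of the original flow):


In [ ]:
import pandas as pd

# Each row is a tab-separated (label, title) pair.
df_peek = pd.read_csv('eval.tsv', names=('label', 'text'), sep='\t')
print(df_peek['label'].value_counts())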

Preprocessing

Pre-processing class to be used in both training and serving


In [ ]:
%%writefile preprocess.py

from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.preprocessing import text


class TextPreprocessor(object):
    def __init__(self, vocab_size, max_sequence_length):
        self._vocab_size = vocab_size
        self._max_sequence_length = max_sequence_length
        self._tokenizer = None

    def fit(self, text_list):        
        # Create vocabulary from input corpus.
        tokenizer = text.Tokenizer(num_words=self._vocab_size)
        tokenizer.fit_on_texts(text_list)
        self._tokenizer = tokenizer

    def transform(self, text_list):        
        # Transform text to sequence of integers
        text_sequence = self._tokenizer.texts_to_sequences(text_list)
        # Fix sequence length to max value. Sequences shorter than the length are
        # padded in the beginning and sequences longer are truncated
        # at the beginning.
        padded_text_sequence = sequence.pad_sequences(
          text_sequence, maxlen=self._max_sequence_length)
        return padded_text_sequence

Test Preprocessing Locally


In [ ]:
from preprocess import TextPreprocessor

processor = TextPreprocessor(5, 5)
processor.fit(['hello machine learning'])
processor.transform(['hello machine learning'])
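
With this toy corpus, the call should display array([[0, 0, 1, 2, 3]], dtype=int32): each word is mapped to its vocabulary index, and the sequence is pre-padded with zeros up to the maximum length of 5.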

Model Creation

Metadata


In [ ]:
CLASSES = {'github': 0, 'nytimes': 1, 'techcrunch': 2}  # label-to-int mapping
NUM_CLASSES = 3
VOCAB_SIZE = 20000  # Limit on the vocabulary size used for tokenization
MAX_SEQUENCE_LENGTH = 50  # Sentences will be truncated/padded to this length

Prepare data for training and evaluation


In [ ]:
import pandas as pd
import numpy as np
from preprocess import TextPreprocessor

def load_data(train_data_path, eval_data_path):
    # Parse the tab-separated files using pandas
    column_names = ('label', 'text')

    df_train = pd.read_csv(train_data_path, names=column_names, sep='\t')
    df_train = df_train.sample(frac=1)  # Shuffle the training examples
    
    df_eval = pd.read_csv(eval_data_path, names=column_names, sep='\t')

    return ((list(df_train['text']), np.array(df_train['label'].map(CLASSES))),
            (list(df_eval['text']), np.array(df_eval['label'].map(CLASSES))))


((train_texts, train_labels), (eval_texts, eval_labels)) = load_data(
       'train.tsv', 'eval.tsv')

# Create vocabulary from training corpus.
processor = TextPreprocessor(VOCAB_SIZE, MAX_SEQUENCE_LENGTH)
processor.fit(train_texts)

# Preprocess the data
train_texts_vectorized = processor.transform(train_texts)
eval_texts_vectorized = processor.transform(eval_texts)
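
As a quick sanity check (an added step, not in the original tutorial), both matrices should now have MAX_SEQUENCE_LENGTH columns:


In [ ]:
# Each example is now a fixed-length sequence of word indexes.
print(train_texts_vectorized.shape)  # (num_train_examples, MAX_SEQUENCE_LENGTH)
print(eval_texts_vectorized.shape)   # (num_eval_examples, MAX_SEQUENCE_LENGTH)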

Build the model


In [ ]:
%%writefile torch_model.py

import torch
import torch.nn as nn
import torch.nn.functional as F

class TorchTextClassifier(nn.Module):
    
    def __init__(self, vocab_size, embedding_dim, seq_length, num_classes, 
                 num_filters, kernel_size, pool_size, dropout_rate):
        super(TorchTextClassifier, self).__init__()

        self.embeddings = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        
        self.conv1 = nn.Conv1d(seq_length, num_filters, kernel_size)
        self.max_pool1 = nn.MaxPool1d(pool_size)
        self.conv2 = nn.Conv1d(num_filters, num_filters*2, kernel_size)
        
        self.dropout = nn.Dropout(dropout_rate)
        self.dense = nn.Linear(num_filters*2, num_classes)
        

    def forward(self, x):
        # x: (batch, seq_length) tensor of integer word indexes
        x = self.embeddings(x)  # -> (batch, seq_length, embedding_dim)
        x = self.dropout(x)

        # The 1-D convolutions treat the sequence positions as channels
        x = self.conv1(x)
        x = F.relu(x)
        x = self.max_pool1(x)

        x = self.conv2(x)
        x = F.relu(x)
        # Global max pooling over the remaining dimension
        x = F.max_pool1d(x, x.size()[2]).squeeze(2)

        x = self.dropout(x)
        x = self.dense(x)
        # Return raw logits: F.cross_entropy applies log-softmax internally
        # during training, and the argmax post-processing at serving time is
        # unaffected by dropping the final softmax.
        return x
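
A quick smoke test of the architecture (an added check, not part of the original notebook): with the tutorial's hyperparameters, a batch of integer-encoded titles should map to one score per class.


In [ ]:
import torch
from torch_model import TorchTextClassifier

# Hypothetical dummy batch: 4 titles, each encoded to 50 word indexes.
test_model = TorchTextClassifier(vocab_size=20000, embedding_dim=200,
                                 seq_length=50, num_classes=3, num_filters=64,
                                 kernel_size=3, pool_size=3, dropout_rate=0.2)
dummy_batch = torch.randint(0, 20000, (4, 50))
print(test_model(dummy_batch).shape)  # expected: torch.Size([4, 3])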

Train and save the model


In [ ]:
import torch
from torch.autograd import Variable
import torch.nn.functional as F

LEARNING_RATE=.001
FILTERS=64
DROPOUT_RATE=0.2
EMBEDDING_DIM=200
KERNEL_SIZE=3
POOL_SIZE=3

NUM_EPOCH=1
BATCH_SIZE=128

train_size = len(train_texts)
steps_per_epoch = len(train_labels) // BATCH_SIZE

print("Train size: {}".format(train_size))
print("Batch size: {}".format(BATCH_SIZE))
print("Number of epochs: {}".format(NUM_EPOCH))
print("Steps per epoch: {}".format(steps_per_epoch))
print("Vocab Size: {}".format(VOCAB_SIZE))
print("Embed Dimensions: {}".format(EMBEDDING_DIM))
print("Sequence Length: {}".format(MAX_SEQUENCE_LENGTH))
print("")


def get_batch(step):
    start_index = step*BATCH_SIZE
    end_index = start_index + BATCH_SIZE
    x = Variable(torch.Tensor(train_texts_vectorized[start_index:end_index]).long())
    y = Variable(torch.Tensor(train_labels[start_index:end_index]).long())
    return x, y


from torch_model import TorchTextClassifier

model = TorchTextClassifier(VOCAB_SIZE, 
                            EMBEDDING_DIM, 
                            MAX_SEQUENCE_LENGTH, 
                            NUM_CLASSES, 
                            FILTERS, 
                            KERNEL_SIZE, 
                            POOL_SIZE, 
                            DROPOUT_RATE)

model.train()
loss_metric = F.cross_entropy
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

for epoch in range(NUM_EPOCH):
    for step in range(steps_per_epoch):
        x, y = get_batch(step)
        optimizer.zero_grad()
        y_pred = model(x)
        loss = loss_metric(y_pred, y) 
        loss.backward()
        optimizer.step()
        if step % 50 == 0:
            print('Batch [{}/{}] Loss: {}'.format(step+1, steps_per_epoch, round(loss.item(),5)))
    print('Epoch [{}/{}] Loss: {}'.format(epoch+1, NUM_EPOCH, round(loss.item(),5)))
print('Final Loss: {}'.format(round(loss.item(), 5)))

torch.save(model, 'torch_saved_model.pt')
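
After training, a simple held-out check (an added sanity check, not in the original tutorial):


In [ ]:
# Evaluate accuracy on the held-out eval set.
model.eval()
with torch.no_grad():
    eval_inputs = torch.Tensor(eval_texts_vectorized).long()
    eval_preds = model(eval_inputs).argmax(dim=1).numpy()
print('Eval accuracy: {:.3f}'.format((eval_preds == eval_labels).mean()))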

Save pre-processing object

We need to save this object so that the same tokenizer fitted during training can be used to pre-process inputs at serving time.


In [ ]:
import pickle
with open('./processor_state.pkl', 'wb') as f:
    pickle.dump(processor, f)
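
A quick round-trip check (an added step): unpickling the processor requires the preprocess module to be importable, which is why preprocess.py is shipped in the package later on.


In [ ]:
import pickle

with open('./processor_state.pkl', 'rb') as f:
    restored_processor = pickle.load(f)
print(restored_processor.transform(['hello machine learning']))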

Custom Model Prediction Preparation

Copy model and pre-processing object to GCS


In [ ]:
!gsutil cp torch_saved_model.pt gs://{BUCKET}/{MODEL_DIR}/
!gsutil cp processor_state.pkl gs://{BUCKET}/{MODEL_DIR}/

Define Model Class


In [ ]:
%%writefile model_prediction.py

import os
import pickle
import numpy as np
import torch
from torch.autograd import Variable


class CustomModelPrediction(object):
    def __init__(self, model, processor):
        self._model = model
        self._processor = processor

    def _postprocess(self, predictions):
        labels = ['github', 'nytimes', 'techcrunch']
        label_indexes = [np.argmax(prediction) 
                             for prediction in predictions.detach().numpy()]
        return [labels[label_index] for label_index in label_indexes]


    def predict(self, instances, **kwargs):
        # Pre-process the raw text, run the PyTorch model, then map the
        # predicted class indexes back to their string labels.
        preprocessed_data = self._processor.transform(instances)
        predictions = self._model(Variable(torch.Tensor(preprocessed_data).long()))
        labels = self._postprocess(predictions)
        return labels


    @classmethod
    def from_path(cls, model_dir):
        import torch
        # torch_model must be importable here so that unpickling the saved
        # model can resolve the TorchTextClassifier class.
        import torch_model
        model = torch.load(os.path.join(model_dir, 'torch_saved_model.pt'))
        model.eval()
        with open(os.path.join(model_dir, 'processor_state.pkl'), 'rb') as f:
            processor = pickle.load(f)
        return cls(model, processor)

Test Model Class Locally


In [ ]:
# Headlines for Predictions

techcrunch=[
  'Uber shuts down self-driving trucks unit',
  'Grover raises €37M Series A to offer latest tech products as a subscription',
  'Tech companies can now bid on the Pentagon’s $10B cloud contract'
]
nytimes=[
  '‘Lopping,’ ‘Tips’ and the ‘Z-List’: Bias Lawsuit Explores Harvard’s Admissions',
  'A $3B Plan to Turn Hoover Dam into a Giant Battery',
  'A MeToo Reckoning in China’s Workplace Amid Wave of Accusations'
]
github=[
  'Show HN: Moon – 3kb JavaScript UI compiler',
  'Show HN: Hello, a CLI tool for managing social media',
  'Firefox Nightly added support for time-travel debugging'
]
requests = (techcrunch+nytimes+github)

In [ ]:
from model_prediction import CustomModelPrediction

model = CustomModelPrediction.from_path('.')
model.predict(requests)

Package up files and copy to GCS


In [ ]:
%%writefile setup.py

from setuptools import setup

REQUIRED_PACKAGES = ['keras']

setup(
  name="text_classification",
  version="0.1",
  scripts=["preprocess.py", "model_prediction.py", "torch_model.py"],
  include_package_data=True,
  install_requires=REQUIRED_PACKAGES
)

In [ ]:
!python setup.py sdist
!gsutil cp ./dist/text_classification-0.1.tar.gz gs://{BUCKET}/{PACKAGES_DIR}/text_classification-0.1.tar.gz

Model Deployment to CMLE


In [ ]:
MODEL_NAME='torch_text_classification'
VERSION_NAME='v1'
RUNTIME_VERSION='1.15'
REGION='us-central1'

In [ ]:
!gcloud beta ai-platform models create {MODEL_NAME} --regions {REGION} --enable-logging --enable-console-logging

In [ ]:
!gcloud ai-platform versions delete {VERSION_NAME} --model {MODEL_NAME} --quiet # run if version already created

PyTorch-compatible packages

To deploy a PyTorch model, you need to use compiled packages that are compatible with Cloud AI Platform prediction.

The gs://cloud-ai-pytorch bucket contains compiled PyTorch packages that are compatible with Cloud AI Platform prediction. The files are mirrored from the official builds at https://download.pytorch.org/whl/cpu/torch_stable.html

In order to deploy a PyTorch model on Cloud AI Platform Online Predictions, you must add one of these packages to the packageUris field of the version you deploy. Pick the package matching your Python and PyTorch versions. The package names follow this template:

torch-{TORCH_VERSION_NUMBER}-{PYTHON_VERSION}-linux_x86_64.whl

where PYTHON_VERSION is:

  1. cp35-cp35m for Python 3 with runtime versions < 1.15
  2. cp37-cp37m for Python 3 with runtime versions >= 1.15
  3. cp27-cp27mu for Python 2

For example, to deploy a model built with PyTorch 1.1.0 on Python 3 (runtime version < 1.15), the gcloud command would look like:

gcloud beta ai-platform versions create {VERSION_NAME} --model {MODEL_NAME} \
...
--package-uris=gs://{MY_PACKAGE_BUCKET}/my_package-0.1.tar.gz,gs://cloud-ai-pytorch/torch-1.1.0-cp35-cp35m-linux_x86_64.whl

In [ ]:
!gcloud beta ai-platform versions create {VERSION_NAME} --model {MODEL_NAME} \
 --origin=gs://{BUCKET}/{MODEL_DIR}/ \
 --python-version=3.7 \
 --runtime-version={RUNTIME_VERSION} \
 --package-uris=gs://{BUCKET}/{PACKAGES_DIR}/text_classification-0.1.tar.gz,gs://cloud-ai-pytorch/torch-1.3.1+cpu-cp37-cp37m-linux_x86_64.whl \
 --machine-type=mls1-c4-m4 \
 --prediction-class=model_prediction.CustomModelPrediction
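
Before sending traffic, you can confirm the version reached the READY state (an optional check):


In [ ]:
!gcloud ai-platform versions describe {VERSION_NAME} --model {MODEL_NAME}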

Online Predictions from CMLE


In [ ]:
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

# JSON-format the requests
request_data = {'instances': requests}

# Authenticate and call CMLE prediction API 
credentials = GoogleCredentials.get_application_default()
api = discovery.build('ml', 'v1', credentials=credentials,
            discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json')

parent = 'projects/{}/models/{}/versions/{}'.format(PROJECT, MODEL_NAME, VERSION_NAME)
print("Model full name: {}".format(parent))
response = api.projects().predict(body=request_data, name=parent).execute()

print(response['predictions'])
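
Alternatively, the same request can be issued with the gcloud CLI (a sketch, assuming a newline-delimited JSON file of instances written here as instances.json):


In [ ]:
import json

# One JSON-encoded instance per line, as expected by --json-instances.
with open('instances.json', 'w') as f:
    for headline in requests:
        f.write(json.dumps(headline) + '\n')

!gcloud ai-platform predict --model {MODEL_NAME} --version {VERSION_NAME} --json-instances instances.json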

Authors

Khalid Salama & Vijay Reddy

Disclaimer: This is not an official Google product. The sample code is provided for educational purposes only.