Train and deploy on Kubeflow from Notebooks

This notebook introduces you to using Kubeflow Fairing to train and deploy a model to Kubeflow on Google Kubernetes Engine (GKE) and to Google Cloud ML Engine. This notebook demonstrates how to:

  • Train an XGBoost model in a local notebook,
  • Use Kubeflow Fairing to train an XGBoost model remotely on Kubeflow,
  • Use Kubeflow Fairing to train an XGBoost model remotely on Cloud ML Engine,
  • Use Kubeflow Fairing to deploy a trained model to Kubeflow, and
  • Call the deployed endpoint for predictions.

To learn more about how to run this notebook locally, see the guide to training and deploying on GCP from a local notebook.

Set up your notebook for training an XGBoost model

Install the required packages, then import the libraries needed to train this model.


In [ ]:
!pip install -r requirements.txt

In [ ]:
import argparse
import logging
import joblib
import sys
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor

In [ ]:
logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.INFO)

Define a function to split the input file into training and testing datasets.


In [ ]:
def read_input(file_name, test_size=0.25):
    """Read input data and split it into train and test."""
    data = pd.read_csv(file_name)
    data.dropna(axis=0, subset=['SalePrice'], inplace=True)

    y = data.SalePrice
    X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])

    train_X, test_X, train_y, test_y = train_test_split(X.values,
                                                        y.values,
                                                        test_size=test_size,
                                                        shuffle=False)

    imputer = SimpleImputer()
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y)

Define functions to train, evaluate, and save the trained model.


In [ ]:
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Train the model using XGBRegressor."""
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    model.fit(train_X,
              train_y,
              early_stopping_rounds=40,
              eval_set=[(test_X, test_y)])

    logging.info("Best RMSE on eval: %.2f with %d rounds",
                 model.best_score,
                 model.best_iteration + 1)
    return model

def eval_model(model, test_X, test_y):
    """Evaluate the model performance."""
    predictions = model.predict(test_X)
    logging.info("mean_absolute_error=%.2f", mean_absolute_error(predictions, test_y))

def save_model(model, model_file):
    """Save XGBoost model for serving."""
    joblib.dump(model, model_file)
    logging.info("Model export success: %s", model_file)

Define a class for your model, with methods for training and prediction.


In [ ]:
class HousingServe(object):
    
    def __init__(self):
        self.train_input = "ames_dataset/train.csv"
        self.n_estimators = 50
        self.learning_rate = 0.1
        self.model_file = "trained_ames_model.dat"
        self.model = None

    def train(self):
        (train_X, train_y), (test_X, test_y) = read_input(self.train_input)
        model = train_model(train_X,
                            train_y,
                            test_X,
                            test_y,
                            self.n_estimators,
                            self.learning_rate)

        eval_model(model, test_X, test_y)
        save_model(model, self.model_file)

    def predict(self, X):
        """Predict using the model for given ndarray."""
        if not self.model:
            self.model = joblib.load(self.model_file)
        # Do any preprocessing
        prediction = self.model.predict(X)
        # Do any postprocessing
        return [prediction]

Train an XGBoost model in a notebook

Call HousingServe().train() to train your model; this also evaluates the model and saves the trained model for serving.


In [ ]:
HousingServe().train()
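
As a quick, optional sanity check, you can run a prediction locally against the model you just saved. This is a minimal sketch: it reuses read_input and assumes the trained_ames_model.dat file written by the cell above.


In [ ]:
# Optional local check: predict on one test example using the saved model.
(train_X, train_y), (test_X, test_y) = read_input('ames_dataset/train.csv')
served = HousingServe()
print(served.predict(test_X[:1]))  # prediction for a single row of features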

Set up Kubeflow Fairing for training and predictions on GCP

Import the fairing library and configure the GCP environment that your training or prediction job will run in.


In [ ]:
import os
from kubeflow import fairing

# Set up Google Container Registry (GCR) for storing output containers.
# You can use any Docker container registry instead of GCR.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
PY_VERSION = ".".join([str(x) for x in sys.version_info[0:3]])
BASE_IMAGE = 'python:{}'.format(PY_VERSION)
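
Optionally, print the values that the following cells will use, to confirm that Fairing detected your GCP project correctly.


In [ ]:
# Optional: confirm the project, registry, and base image used below.
print('GCP project:     {}'.format(GCP_PROJECT))
print('Docker registry: {}'.format(DOCKER_REGISTRY))
print('Base image:      {}'.format(BASE_IMAGE))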

Train an XGBoost model remotely on Kubeflow

Kubeflow Fairing packages the HousingServe class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Kubeflow.


In [ ]:
from kubeflow import fairing
from kubeflow.fairing.builders.cluster import gcs_context
from kubeflow.fairing.builders.cluster.cluster import ClusterBuilder
from kubeflow.fairing.deployers.tfjob.tfjob import TfJob
from kubeflow.fairing.preprocessors.function import FunctionPreProcessor

preprocessor = FunctionPreProcessor(function_obj=HousingServe, input_files=['ames_dataset/train.csv', 'requirements.txt'])
builder = ClusterBuilder(registry=DOCKER_REGISTRY, base_image=BASE_IMAGE, preprocessor=preprocessor,
                        pod_spec_mutators=[fairing.cloud.gcp.add_gcp_credentials_if_exists],
                        context_source=gcs_context.GCSContextSource())

builder.build()
pod_spec = builder.generate_pod_spec()
deployer = TfJob(pod_spec_mutators=[fairing.cloud.gcp.add_gcp_credentials_if_exists],
                worker_count=1, chief_count=0)
deployer.deploy(pod_spec)
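
The deploy call submits a TFJob custom resource to the cluster. If kubectl is configured for the cluster that Fairing deployed to, you can watch the job and its pods from the notebook. The commands below are illustrative; you may need to add -n <namespace> for your Kubeflow namespace.


In [ ]:
# Optional: inspect the submitted training job (names are generated by Fairing).
!kubectl get tfjobs
!kubectl get pods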

Deploy the trained model to Kubeflow for predictions

Kubeflow Fairing packages the HousingServe class, the trained model, and the prediction endpoint's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the prediction endpoint on Kubeflow.


In [ ]:
from kubeflow import fairing
from kubeflow.fairing.builders.cluster import gcs_context
from kubeflow.fairing.builders.cluster.cluster import ClusterBuilder
from kubeflow.fairing.deployers.job.job import Job
from kubeflow.fairing.deployers.serving.serving import Serving
from kubeflow.fairing.preprocessors.function import FunctionPreProcessor

preprocessor = FunctionPreProcessor(function_obj=HousingServe, input_files=['trained_ames_model.dat', 'requirements.txt'])
builder = ClusterBuilder(registry=DOCKER_REGISTRY, base_image=BASE_IMAGE, preprocessor=preprocessor,
                        pod_spec_mutators=[fairing.cloud.gcp.add_gcp_credentials_if_exists],
                        context_source=gcs_context.GCSContextSource())

builder.build()
pod_spec = builder.generate_pod_spec()

deployer = Serving(serving_class="HousingServe")
endpoint = deployer.deploy(pod_spec)
print(endpoint)
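
The Serving deployer creates a Kubernetes deployment and service for the prediction endpoint. If kubectl is available, you can confirm that they are running; as above, the resource names are generated by Fairing, and you may need to specify a namespace.


In [ ]:
# Optional: confirm the serving deployment and service are up.
!kubectl get deployments,services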

Call the prediction endpoint

Use the endpoint printed by the previous cell; it should look something like http://xx.xx.xx.xx:5000. Replace REPLACE_WITH_ENDPOINT in the command below with that value.


In [ ]:
!curl REPLACE_WITH_ENDPOINT/predict -H "Content-Type: application/x-www-form-urlencoded" -d 'json={"data":{"tensor":{"shape":[1,37],"values":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37]}}}'
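
If you prefer to call the endpoint from Python, the sketch below sends the same form-encoded JSON payload as the curl command above, using the requests library. The endpoint value is a placeholder; replace it with the endpoint printed earlier.


In [ ]:
import json
import requests

endpoint = 'REPLACE_WITH_ENDPOINT'  # e.g. http://xx.xx.xx.xx:5000

# Same payload as the curl command: a 1x37 tensor of feature values.
payload = {'data': {'tensor': {'shape': [1, 37],
                               'values': list(range(1, 38))}}}
response = requests.post('{}/predict'.format(endpoint),
                         data={'json': json.dumps(payload)})
print(response.text)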