Train and deploy on Kubeflow from Notebooks

This notebook introduces you to using Kubeflow Fairing to train and deploy a model to Kubeflow on multiple backends, such as Google Kubernetes Engine (GKE), Google Cloud ML Engine, and Azure Kubernetes Service (AKS). This notebook demonstrates how to:

  • Train an XGBoost model in a local notebook,
  • Use Kubeflow Fairing to train an XGBoost model remotely on Kubeflow using some of the supported backends,
  • Use Kubeflow Fairing to deploy a trained model to Kubeflow, and
  • Call the deployed endpoint for predictions.

To learn more about how to run this notebook locally, see the guide to training and deploying on GCP from a local notebook.

Set up your notebook for training an XGBoost model

Import the libraries required to train this model.


In [ ]:
!pip install --user -r requirements.txt

In [ ]:
import argparse
import logging
import joblib
import sys
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor

In [ ]:
logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.INFO)

Define a function to split the input file into training and testing datasets.


In [ ]:
def read_input(file_name, test_size=0.25):
    """Read input data and split it into train and test."""
    data = pd.read_csv(file_name)
    data.dropna(axis=0, subset=['SalePrice'], inplace=True)

    y = data.SalePrice
    X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])

    train_X, test_X, train_y, test_y = train_test_split(X.values,
                                                         y.values,
                                                         test_size=test_size,
                                                         shuffle=False)

    imputer = SimpleImputer()
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y)
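
If you want to sanity-check the split before moving on, you can call read_input on the Ames dataset (the same file used throughout this notebook) and inspect the resulting shapes. This is an optional check added here for illustration:


In [ ]:
# Optional sanity check: confirm the train/test split shapes.
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
print("train_X:", train_X.shape, "test_X:", test_X.shape)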

Define functions to train, evaluate, and save the trained model.


In [ ]:
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Train the model using XGBRegressor."""
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    model.fit(train_X,
              train_y,
              early_stopping_rounds=40,
              eval_set=[(test_X, test_y)])

    print("Best RMSE on eval: %.2f with %d rounds" %
          (model.best_score,
           model.best_iteration + 1))
    return model

def eval_model(model, test_X, test_y):
    """Evaluate the model performance."""
    predictions = model.predict(test_X)
    logging.info("mean_absolute_error=%.2f", mean_absolute_error(predictions, test_y))

def save_model(model, model_file):
    """Save XGBoost model for serving."""
    joblib.dump(model, model_file)
    logging.info("Model export success: %s", model_file)

Define a class for your model, with methods for training and prediction.


In [ ]:
class HousingServe(object):
    
    def __init__(self):
        self.train_input = "ames_dataset/train.csv"
        self.n_estimators = 50
        self.learning_rate = 0.1
        self.model_file = "trained_ames_model.dat"
        self.model = None

    def train(self):
        (train_X, train_y), (test_X, test_y) = read_input(self.train_input)
        model = train_model(train_X,
                            train_y,
                            test_X,
                            test_y,
                            self.n_estimators,
                            self.learning_rate)

        eval_model(model, test_X, test_y)
        save_model(model, self.model_file)

    def predict(self, X, feature_names=None):
        """Predict using the model for given ndarray."""
        if not self.model:
            self.model = joblib.load(self.model_file)
        # Do any preprocessing
        prediction = self.model.predict(X)
        # Do any postprocessing
        return prediction

Train an XGBoost model in a notebook

Call HousingServe().train() to train your model; this also evaluates the model and saves it to disk.


In [ ]:
HousingServe().train()
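
As an optional sanity check before deploying anything, you can exercise the predict method locally against the model file that train() just saved. This sketch reuses read_input and the first few test rows:


In [ ]:
# Optional: verify local predictions with the freshly saved model file.
(_, _), (test_X, test_y) = read_input("ames_dataset/train.csv")
HousingServe().predict(test_X[:5])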

Set up Kubeflow Fairing for training and predictions

Import the Kubeflow Fairing library and configure the backend environment in which your training or prediction job will run.


In [ ]:
from kubeflow import fairing

# Default to using the GKE backend.
# Change to `KubeflowAzureBackend`, `KubeflowAWSBackend`, etc. if you are on
# another cloud provider, and modify the settings below for the corresponding backend.
FAIRING_BACKEND = 'KubeflowGKEBackend'

# If you are using KubeflowAzureBackend, and unless these values are passed to
# the notebook as parameters, replace the placeholders below with your
# configuration (remove the < and > characters).
# DOCKER_REGISTRY = '<your_registry>.azurecr.io'
# AZURE_REGION = '<your_region>'
# AZURE_RESOURCE_GROUP = '<your_resource_group>'
# AZURE_STORAGE_ACCOUNT = '<your_storage_account>'

In [ ]:
import importlib

# The logic in this cell demonstrates what to change if you want to use a
# different backend such as Cloud ML Engine (GCPManagedBackend),
# AKS (KubeflowAzureBackend), or EKS (KubeflowAWSBackend). The cell is also
# parameterized so that the notebook can be executed programmatically with
# different backends.

if FAIRING_BACKEND in ['KubeflowGKEBackend', 'GCPManagedBackend']:
    # Set up Google Container Registry (GCR) for storing output containers.
    GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
    DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
    BuildContext = None
if FAIRING_BACKEND == 'KubeflowAzureBackend':
    from kubeflow.fairing.builders.cluster.azurestorage_context import StorageContextSource
    BuildContext = StorageContextSource(
        region=AZURE_REGION, resource_group_name=AZURE_RESOURCE_GROUP,
        storage_account_name=AZURE_STORAGE_ACCOUNT
    )
if FAIRING_BACKEND == 'KubeflowAWSBackend':
    from kubeflow.fairing.builders.cluster.s3_context import S3ContextSource
    # Change this configuration to use your own region and bucket.
    AWS_ACCOUNT_ID = fairing.cloud.aws.guess_account_id()
    AWS_REGION = 'us-west-2'
    DOCKER_REGISTRY = '{}.dkr.ecr.{}.amazonaws.com'.format(AWS_ACCOUNT_ID, AWS_REGION)
    S3_BUCKET = 'kubeflow-fairing-data'
    
    BuildContext = S3ContextSource(
        aws_account=AWS_ACCOUNT_ID, region=AWS_REGION,
        bucket_name=S3_BUCKET
    )
    
BackendClass = getattr(importlib.import_module('kubeflow.fairing.backends'), FAIRING_BACKEND)
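
Before submitting any jobs, it can help to echo the resolved configuration. This optional cell assumes DOCKER_REGISTRY was set above for your chosen backend:


In [ ]:
# Optional: confirm which backend, registry, and build context were resolved.
print("Backend:         ", FAIRING_BACKEND)
print("Docker registry: ", DOCKER_REGISTRY)
print("Build context:   ", BuildContext)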

Train an XGBoost model remotely on Kubeflow

Import the TrainJob and use the configured backend class. Kubeflow Fairing packages the HousingServe class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Kubeflow.


In [ ]:
from kubeflow.fairing import TrainJob
train_job = TrainJob(HousingServe, input_files=['ames_dataset/train.csv', 'requirements.txt'],
                     docker_registry=DOCKER_REGISTRY,
                     backend=BackendClass(build_context_source=BuildContext))
train_job.submit()

Deploy the trained model to Kubeflow for predictions

Import the PredictionEndpoint and use the configured backend class. Kubeflow Fairing packages the HousingServe class, the trained model, and the prediction endpoint's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the prediction endpoint on Kubeflow.


In [ ]:
from kubeflow.fairing import PredictionEndpoint
endpoint = PredictionEndpoint(HousingServe, input_files=['trained_ames_model.dat', 'requirements.txt'],
                              service_type='LoadBalancer',
                              docker_registry=DOCKER_REGISTRY,
                              backend=BackendClass(build_context_source=BuildContext))
endpoint.create()

Call the prediction endpoint

Create a test dataset, then call the endpoint on Kubeflow for predictions.


In [ ]:
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
endpoint.predict_nparray(test_X)
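
To convince yourself that the endpoint serves the model you trained, you can compare its output with predictions from the locally saved model file. This optional check assumes trained_ames_model.dat is still present from the training step above:


In [ ]:
# Optional: predictions from the locally saved model, for comparison
# with the endpoint's output on the same rows.
local_model = joblib.load("trained_ames_model.dat")
local_model.predict(test_X[:5])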

Clean up the prediction endpoint

Delete the prediction endpoint created by this notebook.


In [ ]:
endpoint.delete()