This notebook introduces you to using Kubeflow Fairing to train and deploy a model to Kubeflow on multiple backends, such as Google Kubernetes Engine (GKE), Google Cloud ML Engine, and Azure Kubernetes Service (AKS). This notebook demonstrates how to train an XGBoost model locally in the notebook, use Kubeflow Fairing to launch a training job on Kubeflow, deploy the trained model to Kubeflow as a prediction endpoint, and call the endpoint to get predictions.
To learn more about how to run this notebook locally, see the guide to training and deploying on GCP from a local notebook.
In [ ]:
!pip install --user -r requirements.txt
In [ ]:
import argparse
import logging
import joblib
import sys
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor
In [ ]:
logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.INFO)
Define a function to split the input file into training and testing datasets.
In [ ]:
def read_input(file_name, test_size=0.25):
"""Read input data and split it into train and test."""
data = pd.read_csv(file_name)
data.dropna(axis=0, subset=['SalePrice'], inplace=True)
y = data.SalePrice
X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])
train_X, test_X, train_y, test_y = train_test_split(X.values,
y.values,
test_size=test_size,
shuffle=False)
imputer = SimpleImputer()
train_X = imputer.fit_transform(train_X)
test_X = imputer.transform(test_X)
return (train_X, train_y), (test_X, test_y)
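If you want to sanity-check the split before wiring it into training, you can run an optional cell like the one below; it simply calls `read_input` on the same `ames_dataset/train.csv` file used later in this notebook and prints the shapes of the resulting arrays.
In [ ]:
# Optional sanity check: confirm the shapes of the train/test splits.
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
print("train_X:", train_X.shape, "train_y:", train_y.shape)
print("test_X:", test_X.shape, "test_y:", test_y.shape)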
Define functions to train, evaluate, and save the trained model.
In [ ]:
def train_model(train_X,
train_y,
test_X,
test_y,
n_estimators,
learning_rate):
"""Train the model using XGBRegressor."""
model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)
model.fit(train_X,
train_y,
early_stopping_rounds=40,
eval_set=[(test_X, test_y)])
print("Best RMSE on eval: %.2f with %d rounds" %
(model.best_score,
model.best_iteration+1))
return model
def eval_model(model, test_X, test_y):
"""Evaluate the model performance."""
predictions = model.predict(test_X)
logging.info("mean_absolute_error=%.2f", mean_absolute_error(predictions, test_y))
def save_model(model, model_file):
"""Save XGBoost model for serving."""
joblib.dump(model, model_file)
logging.info("Model export success: %s", model_file)
Define a class for your model, with methods for training and prediction.
In [ ]:
class HousingServe(object):
def __init__(self):
self.train_input = "ames_dataset/train.csv"
self.n_estimators = 50
self.learning_rate = 0.1
self.model_file = "trained_ames_model.dat"
self.model = None
def train(self):
(train_X, train_y), (test_X, test_y) = read_input(self.train_input)
model = train_model(train_X,
train_y,
test_X,
test_y,
self.n_estimators,
self.learning_rate)
eval_model(model, test_X, test_y)
save_model(model, self.model_file)
def predict(self, X, feature_names=None):
"""Predict using the model for given ndarray."""
if not self.model:
self.model = joblib.load(self.model_file)
# Do any preprocessing
        prediction = self.model.predict(X)
# Do any postprocessing
return prediction
In [ ]:
# Train the model locally in the notebook.
HousingServe().train()
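Before packaging anything for Kubeflow, you can optionally exercise the `predict` method against the model file that `train()` just saved (`trained_ames_model.dat`). This is only a local smoke test and reuses `read_input` from above.
In [ ]:
# Optional local smoke test: load the saved model and predict on a few test rows.
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
HousingServe().predict(test_X[:5])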
In [ ]:
from kubeflow import fairing
# Default to using the GKE backend.
# Change to `KubeflowAzureBackend`, `KubeflowAWSBackend`, etc., if you are on another cloud provider.
# Appropriately modify settings below for the corresponding backend.
FAIRING_BACKEND = 'KubeflowGKEBackend'
# If you are using KubeflowAzureBackend, replace the values below with your own
# configuration (remove the < and > characters), unless they are passed to the
# notebook as parameters.
# DOCKER_REGISTRY = '<your_registry>.azurecr.io'
# AZURE_REGION = '<your_region>'
# AZURE_RESOURCE_GROUP = '<your_resource_group>'
# AZURE_STORAGE_ACCOUNT = '<your_storage_account>'
In [ ]:
import importlib
# The logic in this cell demonstrates what to change if you want to use a
# different backend, such as Cloud ML Engine (GCPManagedBackend),
# AKS (KubeflowAzureBackend), or EKS (KubeflowAWSBackend). This cell is also
# parameterized so that the notebook can be executed programmatically with
# different backends.
if FAIRING_BACKEND in ['KubeflowGKEBackend', 'GCPManagedBackend']:
    # Set up Google Container Registry (GCR) for storing the output container images.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
BuildContext = None
if FAIRING_BACKEND == 'KubeflowAzureBackend':
from kubeflow.fairing.builders.cluster.azurestorage_context import StorageContextSource
BuildContext = StorageContextSource(
region=AZURE_REGION, resource_group_name=AZURE_RESOURCE_GROUP,
storage_account_name=AZURE_STORAGE_ACCOUNT
)
if FAIRING_BACKEND == 'KubeflowAWSBackend':
from kubeflow.fairing.builders.cluster.s3_context import S3ContextSource
    # Change this configuration to use your own region and bucket.
AWS_ACCOUNT_ID = fairing.cloud.aws.guess_account_id()
AWS_REGION = 'us-west-2'
DOCKER_REGISTRY = '{}.dkr.ecr.{}.amazonaws.com'.format(AWS_ACCOUNT_ID, AWS_REGION)
S3_BUCKET = 'kubeflow-fairing-data'
BuildContext = S3ContextSource(
aws_account=AWS_ACCOUNT_ID, region=AWS_REGION,
bucket_name=S3_BUCKET
)
BackendClass = getattr(importlib.import_module('kubeflow.fairing.backends'), FAIRING_BACKEND)
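The `importlib` lookup above just imports the backend class whose name is stored in `FAIRING_BACKEND`. With the default `'KubeflowGKEBackend'` setting, it is equivalent to writing:
from kubeflow.fairing.backends import KubeflowGKEBackend
BackendClass = KubeflowGKEBackend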
Import the TrainJob
and use the configured backend class. Kubeflow Fairing packages the HousingServe
class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Kubeflow.
In [ ]:
from kubeflow.fairing import TrainJob
train_job = TrainJob(HousingServe, input_files=['ames_dataset/train.csv', "requirements.txt"],
docker_registry=DOCKER_REGISTRY,
backend=BackendClass(build_context_source=BuildContext))
train_job.submit()
Import the PredictionEndpoint
and use the configured backend class. Kubeflow Fairing packages the HousingServe
class, the trained model, and the prediction endpoint's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the prediction endpoint on Kubeflow.
In [ ]:
from kubeflow.fairing import PredictionEndpoint
endpoint = PredictionEndpoint(HousingServe, input_files=['trained_ames_model.dat', "requirements.txt"],
service_type='LoadBalancer',
docker_registry=DOCKER_REGISTRY,
backend=BackendClass(build_context_source=BuildContext))
endpoint.create()
In [ ]:
# Send the test set to the deployed prediction endpoint.
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
endpoint.predict_nparray(test_X)
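As a reference point for the endpoint's output, you can compute the same mean absolute error locally with the model trained earlier. This reuses `eval_model` and the saved `trained_ames_model.dat`, so it does not depend on the format of the endpoint's response.
In [ ]:
# Reference: evaluate the locally trained model on the same test split.
local_model = joblib.load("trained_ames_model.dat")
eval_model(local_model, test_X, test_y)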
In [ ]:
# Clean up: delete the prediction endpoint when you are done.
endpoint.delete()