This notebook shows you how to use Kubeflow to build, train, and deploy models on Kubernetes. It walks you through the following steps: training an XGBoost model on synthetic data inside the notebook, using Kubeflow Fairing to package the code into a Docker image and launch a training job on Kubernetes, deploying the trained model as a prediction service, recording runs with the Kubeflow metadata store, and wrapping the training step in a Kubeflow pipeline.
In [1]:
import os
from oauth2client.client import GoogleCredentials
credentials = GoogleCredentials.get_application_default()
In [2]:
import notebook_setup
notebook_setup.notebook_setup()
In [3]:
# fairing:include-cell
import fire
import joblib
import logging
import nbconvert
import os
import pathlib
import sys
from pathlib import Path
import pandas as pd
import pprint
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor
from importlib import reload
from sklearn.datasets import make_regression
from kubeflow.metadata import metadata
from datetime import datetime
import retrying
import urllib3
In [4]:
# Imports not to be included in the built docker image
import util
import kfp
import kfp.components as comp
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler
from kubernetes import client as k8s_client
from kubeflow import fairing
from kubeflow.fairing.builders import append
from kubeflow.fairing.deployers import job
from kubeflow.fairing.preprocessors.converted_notebook import ConvertNotebookPreprocessorWithFire
In [5]:
# fairing:include-cell
def read_synthetic_input(test_size=0.25):
    """Generate synthetic data and split it into train and test sets."""
    # Generate a regression dataset
    X, y = make_regression(n_samples=200, n_features=5, noise=0.1)
    train_X, test_X, train_y, test_y = train_test_split(X,
                                                        y,
                                                        test_size=test_size,
                                                        shuffle=False)
    imputer = SimpleImputer()
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y)
In [6]:
# fairing:include-cell
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Train the model using XGBRegressor."""
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    model.fit(train_X,
              train_y,
              early_stopping_rounds=40,
              eval_set=[(test_X, test_y)])

    logging.info("Best RMSE on eval: %.2f with %d rounds",
                 model.best_score,
                 model.best_iteration + 1)
    return model

def eval_model(model, test_X, test_y):
    """Evaluate the model performance."""
    predictions = model.predict(test_X)
    mae = mean_absolute_error(predictions, test_y)
    logging.info("mean_absolute_error=%.2f", mae)
    return mae

def save_model(model, model_file):
    """Save XGBoost model for serving."""
    joblib.dump(model, model_file)
    logging.info("Model export success: %s", model_file)

def create_workspace():
    METADATA_STORE_HOST = "metadata-grpc-service.kubeflow"  # default DNS of the Kubeflow Metadata gRPC service.
    METADATA_STORE_PORT = 8080
    return metadata.Workspace(
        store=metadata.Store(grpc_host=METADATA_STORE_HOST, grpc_port=METADATA_STORE_PORT),
        name="xgboost-synthetic",
        description="workspace for xgboost-synthetic artifacts and executions")
In [7]:
# fairing:include-cell
class ModelServe(object):
    def __init__(self, model_file=None):
        self.n_estimators = 50
        self.learning_rate = 0.1
        if not model_file:
            if "MODEL_FILE" in os.environ:
                print("model_file not supplied; checking environment variable")
                model_file = os.getenv("MODEL_FILE")
            else:
                print("model_file not supplied; using the default")
                model_file = "mockup-model.dat"

        self.model_file = model_file
        print("model_file={0}".format(self.model_file))

        self.model = None
        self._workspace = None
        self.exec = self.create_execution()

    def train(self):
        (train_X, train_y), (test_X, test_y) = read_synthetic_input()

        # Here we use Kubeflow's metadata library to record information
        # about the training run to Kubeflow's metadata store.
        self.exec.log_input(metadata.DataSet(
            description="xgboost synthetic data",
            name="synthetic-data",
            owner="someone@kubeflow.org",
            uri="file://path/to/dataset",
            version="v1.0.0"))

        model = train_model(train_X,
                            train_y,
                            test_X,
                            test_y,
                            self.n_estimators,
                            self.learning_rate)

        mae = eval_model(model, test_X, test_y)

        # Here we log metrics about the model to Kubeflow's metadata store.
        self.exec.log_output(metadata.Metrics(
            name="xgboost-synthetic-training-eval",
            owner="someone@kubeflow.org",
            description="training evaluation for xgboost synthetic",
            uri="gcs://path/to/metrics",
            metrics_type=metadata.Metrics.VALIDATION,
            values={"mean_absolute_error": mae}))

        save_model(model, self.model_file)
        self.exec.log_output(metadata.Model(
            name="housing-price-model",
            description="housing price prediction model using synthetic data",
            owner="someone@kubeflow.org",
            uri=self.model_file,
            model_type="linear_regression",
            training_framework={
                "name": "xgboost",
                "version": "0.9.0"
            },
            hyperparameters={
                "learning_rate": self.learning_rate,
                "n_estimators": self.n_estimators
            },
            version=datetime.utcnow().isoformat("T")))

    def predict(self, X, feature_names):
        """Predict using the model for a given ndarray.

        The predict signature should match the syntax expected by Seldon Core
        (https://github.com/SeldonIO/seldon-core) so that we can use Seldon
        to wrap the model in a model server and deploy it on Kubernetes.
        """
        if not self.model:
            self.model = joblib.load(self.model_file)
        # Do any preprocessing
        prediction = self.model.predict(data=X)
        # Do any postprocessing
        return [[prediction.item(0), prediction.item(1)]]

    @property
    def workspace(self):
        if not self._workspace:
            self._workspace = create_workspace()
        return self._workspace

    def create_execution(self):
        r = metadata.Run(
            workspace=self.workspace,
            name="xgboost-synthetic-fairing-run" + datetime.utcnow().isoformat("T"),
            description="a notebook run")

        return metadata.Execution(
            name="execution" + datetime.utcnow().isoformat("T"),
            workspace=self.workspace,
            run=r,
            description="execution for training xgboost-synthetic")
In [8]:
model = ModelServe(model_file="mockup-model.dat")
model.train()
In [9]:
(train_X, train_y), (test_X, test_y) = read_synthetic_input()
ModelServe().predict(test_X, None)
Out[9]:
Launching a separate Kubernetes job to train the model has the following advantages: the training job can request its own resources (for example more CPU and memory than your notebook has), and packaging the code into a Docker image makes the training run reproducible and repeatable.
In [10]:
# Set up Google Container Registry (GCR) for storing output containers.
# You can use any Docker container registry instead of GCR.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
Next you use Kubeflow Fairing to build a Docker image that includes your code. The image is built from the Dockerfile and requirements.txt files added to the build context below, and the Dockerfile uses pip to install the dependencies listed in requirements.txt.
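The requirements.txt file ships alongside the notebook and is not reproduced here. As a rough sketch only (an assumption, not the actual file), it lists the libraries the training code imports, along these lines:
# Hypothetical requirements.txt (illustrative; the real file may pin versions)
fire
joblib
pandas
retrying
scikit-learn
xgboost
kubeflow-metadata
seldon-core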
In [11]:
# TODO(https://github.com/kubeflow/fairing/issues/426): We should get rid of this once the default
# Kaniko image is updated to a newer image than 0.7.0.
from kubeflow.fairing import constants
constants.constants.KANIKO_IMAGE = "gcr.io/kaniko-project/executor:v0.14.0"
In [12]:
from kubeflow.fairing.builders import cluster

# output_map is a map of extra files to add to the build context.
# It is a map from source location to the location inside the context.
output_map = {
    "Dockerfile": "Dockerfile",
    "requirements.txt": "requirements.txt",
}

preprocessor = ConvertNotebookPreprocessorWithFire(class_name='ModelServe',
                                                   notebook_file='build-train-deploy.ipynb',
                                                   output_map=output_map)

if not preprocessor.input_files:
    preprocessor.input_files = set()

input_files = ["xgboost_util.py", "mockup-model.dat"]
preprocessor.input_files = set([os.path.normpath(f) for f in input_files])
preprocessor.preprocess()
Out[12]:
In [13]:
# Use a stock jupyter image as our base image
# TODO(jlewi): Should we try to use the downward API to default to the image we are running in?
base_image = "gcr.io/kubeflow-images-public/tensorflow-1.14.0-notebook-cpu:v0.7.0"
# We use a custom Dockerfile
cluster_builder = cluster.cluster.ClusterBuilder(registry=DOCKER_REGISTRY,
                                                 base_image=base_image,
                                                 preprocessor=preprocessor,
                                                 dockerfile_path="Dockerfile",
                                                 pod_spec_mutators=[fairing.cloud.gcp.add_gcp_credentials_if_exists],
                                                 context_source=cluster.gcs_context.GCSContextSource())
cluster_builder.build()
Here you use the append builder to add your code to the base image. Calling preprocessor.preprocess() converts your notebook file to a Python file: the preprocessor turns the .ipynb file into a .py file by keeping only the cells annotated with the comment # fairing:include-cell, and writes the result to build-train-deploy.py. You then use the AppendBuilder to rapidly build a new Docker image by adding the files listed in preprocessor.input_files to /app in the newly created image.
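For intuition, here is a minimal sketch of the kind of entry point the preprocessor appends to build-train-deploy.py (an assumption about the generated output, which is not shown in this notebook). python-fire exposes the class on the command line, so running python build-train-deploy.py train invokes ModelServe().train():
# Hypothetical tail of the generated build-train-deploy.py (sketch only)
if __name__ == "__main__":
    import fire
    import logging
    logging.basicConfig(format='%(message)s')
    logging.getLogger().setLevel(logging.INFO)
    # Expose ModelServe's methods (train, predict, ...) as CLI subcommands
    fire.Fire(ModelServe)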
In [14]:
preprocessor.preprocess()
builder = append.append.AppendBuilder(registry=DOCKER_REGISTRY,
                                      base_image=cluster_builder.image_tag,
                                      preprocessor=preprocessor)
builder.build()
You pass train as a command line argument because you want the job to invoke the train function.

Note: when you invoke train_deployer.deploy, Kubeflow Fairing streams the logs from the Kubernetes job. The job initially shows some connection errors because it tries to connect to the metadata server before it is reachable. You can ignore these errors; the job retries until it is able to connect and then continues.
In [15]:
pod_spec = builder.generate_pod_spec()
train_deployer = job.job.Job(cleanup=False,
                             pod_spec_mutators=[
                                 fairing.cloud.gcp.add_gcp_credentials_if_exists])
# Add command line arguments
pod_spec.containers[0].command.extend(["train"])
result = train_deployer.deploy(pod_spec)
In [16]:
!kubectl get jobs -l fairing-id={train_deployer.job_id} -o yaml
In [17]:
from kubeflow.fairing.deployers import serving
pod_spec = builder.generate_pod_spec()
module_name = os.path.splitext(preprocessor.executable.name)[0]
deployer = serving.serving.Serving(module_name + ".ModelServe",
                                   service_type="ClusterIP",
                                   labels={"app": "mockup"})
url = deployer.deploy(pod_spec)
In [18]:
!kubectl get deploy -o yaml {deployer.deployment.metadata.name}
In [19]:
(train_X, train_y), (test_X, test_y) = read_synthetic_input()
In [20]:
result = util.predict_nparray(url, test_X)
pprint.pprint(result.content)
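Note that util.predict_nparray is a local helper that is not shown in this notebook. A minimal sketch of what such a helper might look like, assuming the deployed service speaks the standard Seldon Core JSON protocol (POST to url/predict with an ndarray payload):
# Hypothetical sketch of a helper like util.predict_nparray (assumption:
# the service implements the Seldon Core JSON protocol at <url>/predict).
import requests

def predict_nparray(url, data):
    # Wrap the ndarray in a Seldon-style payload and POST it.
    payload = {"data": {"ndarray": data.tolist()}}
    return requests.post(url + "/predict", json=payload)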
In [21]:
# !kubectl delete service -l app=mockup
# !kubectl delete deploy -l app=mockup
Call create_workspace to get the workspace for this example, then list the artifacts and executions recorded in it.
In [22]:
ws = create_workspace()
ws.list()
Out[22]:
To create a pipeline, you define a function and decorate it with the @dsl.pipeline decorator. Inside the function, each step is defined by a ContainerOp that specifies a container to invoke. You will use the container image that you built earlier using Kubeflow Fairing.
In [23]:
@dsl.pipeline(
    name='Training pipeline',
    description='A pipeline that trains an xgboost model on synthetic data.'
)
def train_pipeline():
    command = ["python", preprocessor.executable.name, "train"]
    train_op = dsl.ContainerOp(
        name="train",
        image=builder.image_tag,
        command=command,
    ).apply(
        gcp.use_gcp_secret('user-gcp-sa'),
    )
    train_op.container.working_dir = "/app"
In [24]:
pipeline_func = train_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)
In [25]:
EXPERIMENT_NAME = 'MockupModel'
#Specify pipeline argument values
arguments = {}
# Get or create an experiment and submit a pipeline run
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)
#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
#vvvvvvvvv This link leads to the run information page. (Note: There is a bug in JupyterLab that modifies the URL and makes the link stop working)