In [ ]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
This notebook introduces you the usage of Kubeflow Fairing to train and deploy a model to Kubeflow on Google Kubernetes Engine (GKE), and Google Cloud AI Platform training. This notebook demonstrate how to:
You need Python 3.6 to use Kubeflow Fairing.
Pre-conditions
Create service account
export SA_NAME = [service account name]
gcloud iam service-accounts create ${SA_NAME}
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member serviceAccount:${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
--role 'roles/editor'
gcloud iam service-accounts keys create ~/key.json \
--iam-account ${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com
Authorize for Source Repository
gcloud auth configure-docker
Update local kubeconfig (for submiting job to kubeflow cluster)
export CLUSTER_NAME=${DEPLOYMENT_NAME} # this is the deployment name or the kubenete cluster name
export ZONE=us-central1-c
gcloud container clusters get-credentials ${CLUSTER_NAME} --region ${ZONE}
Set the environmental variable: GOOGLE_APPLICATION_CREDENTIALS
export GOOGLE_APPLICATION_CREDENTIALS = ....
os.environ['GOOGLE_APPLICATION_CREDENTIALS']=...
Install the lastest version of fairing
pip install git+https://github.com/kubeflow/fairing@master
Upload training file
# upload the train.csv to GCS bucket that can be accessed from both CMLE and Kubeflow cluster
gsutil cp ./train.csv ${GCP_Bucket}/train.csv
Please not that the above configuration is required for notebook service running outside Kubeflow environment. And the examples demonstrated in the notebook is fully tested on notebook service outside Kubeflow cluster also.
The environemt variables, e.g. service account, projects and etc, should have been pre-configured while setting up the cluster.
In [26]:
import argparse
import logging
import joblib
import sys
import pandas as pd
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBClassifier
In [2]:
logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.INFO)
In [3]:
import os
import fairing
# Setting up google container repositories (GCR) for storing output containers
# You can use any docker container registry istead of GCR
# For local notebook, GCP_PROJECT should be set explicitly
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
GCP_Bucket = os.environ['GCP_BUCKET'] # e.g., 'gs://kubeflow-demo-g/'
# This is for local notebook instead of that in kubeflow cluster
# os.environ['GOOGLE_APPLICATION_CREDENTIALS']=
In [4]:
def gcs_copy(src_path, dst_path):
import subprocess
print(subprocess.run(['gsutil', 'cp', src_path, dst_path], stdout=subprocess.PIPE).stdout[:-1].decode('utf-8'))
def gcs_download(src_path, file_name):
import subprocess
print(subprocess.run(['gsutil', 'cp', src_path, file_name], stdout=subprocess.PIPE).stdout[:-1].decode('utf-8'))
In [37]:
def read_input(source_path, test_size=0.25):
"""Read input data and split it into train and test."""
file_name = source_path.split('/')[-1]
gcs_download(source_path, file_name)
data = pd.read_csv(file_name)
data.dropna(axis=0, inplace=True)
y = data.Class
X = data.drop(['Class', 'Amount', 'Time'], axis=1).select_dtypes(exclude=['object'])
train_X, test_X, train_y, test_y = train_test_split(X.values,
y.values,
test_size=test_size,
shuffle=True)
imputer = SimpleImputer()
train_X = imputer.fit_transform(train_X)
test_X = imputer.transform(test_X)
return (train_X, train_y), (test_X, test_y)
In [38]:
def train_model(train_X,
train_y,
test_X,
test_y,
n_estimators,
learning_rate):
"""Train the model using XGBRegressor."""
model = XGBClassifier(n_estimators=n_estimators, learning_rate=learning_rate)
model.fit(train_X,
train_y,
early_stopping_rounds=40,
eval_set=[(test_X, test_y)])
print("Best loss on eval: %.2f with %d rounds",
model.best_score,
model.best_iteration+1)
return model
def eval_model(model, test_X, test_y):
"""Evaluate the model performance."""
predictions = model.predict_proba(test_X)
logging.info("auc=%.2f", roc_auc_score(test_y, predictions[:,1]))
def save_model(model, model_file):
"""Save XGBoost model for serving."""
joblib.dump(model, model_file)
gcs_copy(model_file, GCP_Bucket + model_file)
logging.info("Model export success: %s", model_file)
In [39]:
class FraudServe(object):
def __init__(self):
self.train_input = GCP_Bucket + "train_fraud.csv"
self.n_estimators = 50
self.learning_rate = 0.1
self.model_file = "trained_fraud_model.joblib"
self.model = None
def train(self):
(train_X, train_y), (test_X, test_y) = read_input(self.train_input)
model = train_model(train_X,
train_y,
test_X,
test_y,
self.n_estimators,
self.learning_rate)
eval_model(model, test_X, test_y)
save_model(model, self.model_file)
def predict(self, X, feature_names):
"""Predict using the model for given ndarray."""
if not self.model:
self.model = joblib.load(self.model_file)
# Do any preprocessing
prediction = self.model.predict(data=X)
# Do any postprocessing
return [[prediction.item(0), prediction.item(0)]]
In [40]:
FraudServe().train()
In [ ]:
# In this demo, I use gsutil, therefore i compile a special image to install GoogleCloudSDK as based image
base_image = 'gcr.io/{}/fairing-predict-example:latest'.format(GCP_PROJECT)
!docker build --build-arg PY_VERSION=3.6.4 . -t {base_image}
!docker push {base_image}
In [11]:
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job-xgboost'.format(GCP_PROJECT)
BASE_IMAGE = base_image
In [12]:
from fairing import TrainJob
from fairing.backends import GKEBackend
train_job = TrainJob(FraudServe, BASE_IMAGE, input_files=["requirements.txt"],
docker_registry=DOCKER_REGISTRY, backend=GKEBackend())
train_job.submit()
Import the TrainJob
and GCPManagedBackend
classes. Kubeflow Fairing packages the FraudServe
class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Cloud ML Engine.
In [13]:
from fairing import TrainJob
from fairing.backends import GCPManagedBackend
train_job = TrainJob(FraudServe, BASE_IMAGE, input_files=["requirements.txt"],
docker_registry=DOCKER_REGISTRY, backend=GCPManagedBackend())
train_job.submit()
Import the PredictionEndpoint
and KubeflowGKEBackend
classes. Kubeflow Fairing packages the FraudServe
class, the trained model, and the prediction endpoint's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the prediction endpoint on Kubeflow.
This part only works for fairing version >=0.5.2
In [14]:
from fairing import PredictionEndpoint
from fairing.backends import KubeflowGKEBackend
# The trained_ames_model.joblib is exported during the above local training
endpoint = PredictionEndpoint(FraudServe, BASE_IMAGE, input_files=['trained_fraud_model.joblib', "requirements.txt"],
docker_registry=DOCKER_REGISTRY, backend=KubeflowGKEBackend())
endpoint.create()
In [ ]:
# Deploy model to gcp
# from fairing.deployers.gcp.gcpserving import GCPServingDeployer
# deployer = GCPServingDeployer()
# deployer.deploy(VERSION_DIR, MODEL_NAME, VERSION_NAME)
In [15]:
(train_X, train_y), (test_X, test_y) = read_input(GCP_Bucket + "train_fraud.csv")
endpoint.predict_nparray(test_X)
In [16]:
endpoint.delete()
In [ ]: