In [ ]:
# Install the notebook's Python dependencies listed in requirements.txt
# (IPython shell escape, not plain Python).
!pip install -r requirements.txt
In [ ]:
import argparse
import logging
import joblib
import sys
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor
try:
from sklearn.preprocessing import Imputer
except ImportError:
from sklearn.impute import SimpleImputer as Imputer
In [ ]:
# Root logger: print bare messages (no level or logger-name prefix) and let
# records of level INFO and above through.
logging.basicConfig(format='%(message)s')
logging.root.setLevel(logging.INFO)
In [ ]:
def read_input(file_name, test_size=0.25):
    """Load the housing CSV and return imputed, unshuffled train/test splits.

    Rows whose ``SalePrice`` is missing are dropped. ``SalePrice`` becomes the
    target and every non-``object`` column becomes a feature. The split keeps
    the file's row order (``shuffle=False``). The imputer is fitted on the
    training features only and then applied to the test features.

    Args:
        file_name: path of the CSV file to read.
        test_size: forwarded to ``train_test_split`` (size of the test split).

    Returns:
        ``((train_X, train_y), (test_X, test_y))``.
    """
    frame = pd.read_csv(file_name).dropna(axis=0, subset=['SalePrice'])
    target = frame['SalePrice']
    features = frame.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])

    train_X, test_X, train_y, test_y = train_test_split(
        features.values, target.values, test_size=test_size, shuffle=False)

    # Fit on the training split only, so test-set statistics do not leak in.
    imputer = Imputer()
    return ((imputer.fit_transform(train_X), train_y),
            (imputer.transform(test_X), test_y))
In [ ]:
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate,
                early_stopping_rounds=40):
    """Fit an XGBRegressor, early-stopping on the held-out split.

    Args:
        train_X, train_y: training features and targets.
        test_X, test_y: evaluation data passed as ``eval_set``.
        n_estimators: maximum number of boosting rounds.
        learning_rate: boosting learning rate.
        early_stopping_rounds: stop once the eval metric has not improved
            for this many rounds (default 40, the previous hard-coded value).

    Returns:
        The fitted XGBRegressor.
    """
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)
    # NOTE(review): passing early_stopping_rounds to fit() is deprecated since
    # xgboost 1.6 and removed in 2.0 (it moved to the constructor). Confirm
    # against the version pinned in requirements.txt.
    model.fit(train_X,
              train_y,
              early_stopping_rounds=early_stopping_rounds,
              eval_set=[(test_X, test_y)])
    # Use logging so the %-placeholders are actually formatted; print() would
    # emit the raw format string followed by the arguments.
    # best_iteration is zero-based, so +1 gives the number of rounds kept.
    logging.info("Best RMSE on eval: %.2f with %d rounds",
                 model.best_score,
                 model.best_iteration + 1)
    return model
def eval_model(model, test_X, test_y):
    """Log the model's mean absolute error on the held-out split.

    Args:
        model: fitted estimator exposing ``predict``.
        test_X: held-out features.
        test_y: held-out targets.

    Returns:
        The mean absolute error. Callers that ignore the return value are
        unaffected.
    """
    predictions = model.predict(test_X)
    # sklearn's signature is mean_absolute_error(y_true, y_pred). MAE is
    # symmetric, so the value is unchanged, but the documented order avoids
    # confusion if the metric is ever swapped for an asymmetric one.
    mae = mean_absolute_error(test_y, predictions)
    logging.info("mean_absolute_error=%.2f", mae)
    return mae
def save_model(model, model_file):
    """Persist ``model`` to ``model_file`` with joblib, then log the path written."""
    joblib.dump(model, filename=model_file)
    logging.info("Model export success: %s", model_file)
In [ ]:
def gcs_copy(src_path, dst_path):
    """Copy ``src_path`` to ``dst_path`` with ``gsutil cp`` and echo its stdout.

    In this notebook one side is a local file and the other a ``gs://`` URI
    (upload after training, download before predicting).

    Raises:
        subprocess.CalledProcessError: if gsutil exits non-zero. A failed copy
            must not pass silently; otherwise an earlier local model file
            could be loaded instead of the one in GCS.
    """
    import subprocess
    result = subprocess.run(['gsutil', 'cp', src_path, dst_path],
                            stdout=subprocess.PIPE)
    # Echo gsutil's stdout without its trailing newline (stderr is not captured).
    print(result.stdout.decode('utf-8').rstrip('\n'))
    result.check_returncode()
In [ ]:
# Create the GCS bucket that stores the trained model file.
# HousingServe uploads/downloads under "<GCS_BUCKET>/housing_serve/".
# NOTE(review): GCS bucket names are global; `gsutil mb` presumably fails if
# this bucket already exists or belongs to another project. Confirm and
# rename if needed.
GCS_BUCKET = "gs://fairing-demo"
!gsutil mb {GCS_BUCKET}
In [ ]:
class HousingServe(object):
    """Train the Ames housing model and serve predictions from a GCS copy.

    ``train`` fits and evaluates the model, saves it locally and uploads it
    to ``self.gcs_model_file``. ``predict`` lazily downloads and loads that
    file on first use. The class is also handed to fairing, both as a job
    and as the ``serving_class``.
    """

    def __init__(self):
        self.train_input = "ames_dataset/train.csv"
        self.n_estimators = 50
        self.learning_rate = 0.1
        self.model_file = "trained_ames_model.dat"
        # Depends on the module-level GCS_BUCKET constant.
        self.gcs_model_file = "{}/housing_serve/{}".format(GCS_BUCKET, self.model_file)
        # Loaded lazily by predict().
        self.model = None

    def train(self):
        """Train, evaluate, save the model locally and upload it to GCS."""
        (train_X, train_y), (test_X, test_y) = read_input(self.train_input)
        model = train_model(train_X,
                            train_y,
                            test_X,
                            test_y,
                            self.n_estimators,
                            self.learning_rate)
        eval_model(model, test_X, test_y)
        save_model(model, self.model_file)
        gcs_copy(self.model_file, self.gcs_model_file)
        print("Model saved to {}".format(self.gcs_model_file))

    def predict(self, X):
        """Predict for the given ndarray, downloading the model on first call.

        Returns:
            A one-element list wrapping the model's prediction array.
        """
        if self.model is None:
            gcs_copy(self.gcs_model_file, self.model_file)
            # joblib.load unpickles the file, so only load from a trusted bucket.
            self.model = joblib.load(self.model_file)
        # Pass X positionally: the name of XGBRegressor.predict's first
        # parameter differs across xgboost versions (`data` vs `X`).
        prediction = self.model.predict(X)
        return [prediction]
In [ ]:
# Train once in the notebook kernel: fit, evaluate, save the model locally and
# upload it to GCS via HousingServe.train().
HousingServe().train()
In [ ]:
import os
from kubeflow import fairing

# Images built by fairing are pushed to Google Container Registry (GCR) under
# the current GCP project; any other Docker registry can be used instead.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/%s/fairing-job' % (GCP_PROJECT,)
In [ ]:
# Running interpreter version as "major.minor.micro", passed to the image
# build as the PY_VERSION build arg.
py_version = "%d.%d.%d" % tuple(sys.version_info[:3])
# Tag for the custom base image that is built and pushed with docker, and
# then used by fairing's docker builder.
base_image = 'gcr.io/%s/fairing-predict-example:latest' % (GCP_PROJECT,)
In [ ]:
# Build the base image from the build context in the current directory,
# passing the kernel's Python version, then push it so fairing can use it as
# base_image.
# NOTE(review): assumes a Dockerfile next to this notebook that consumes the
# PY_VERSION build arg. Confirm.
!docker build --build-arg PY_VERSION={py_version} . -t {base_image}
!docker push {base_image}
In [ ]:
# Configure the global fairing.config to build with docker (pushing to
# DOCKER_REGISTRY, layered on base_image) and deploy as a job, then launch it.
# The set_* calls must happen before remote_train() runs.
# NOTE(review): presumably the job ends up calling HousingServe().train();
# confirm against fairing's function preprocessor.
fairing.config.set_builder('docker', registry=DOCKER_REGISTRY, base_image=base_image)
fairing.config.set_deployer('job')
remote_train = fairing.config.fn(HousingServe)
remote_train()
In [ ]:
# Switch the fairing deployer to "serving", naming HousingServe as the class
# whose predict() handles requests, and deploy it.
# NOTE(review): presumably the docker builder configured for the training job
# is reused here. Confirm fairing.config keeps it between runs.
# NOTE(review): the return value of create_endpoint() is discarded, yet a later
# cell calls `endpoint.predict_nparray(...)`. Confirm how the endpoint object
# is meant to be obtained.
fairing.config.set_deployer('serving', serving_class="HousingServe")
create_endpoint = fairing.config.fn(HousingServe)
create_endpoint()
In [ ]:
# Rebuild the same held-out split used in training (same file, shuffle=False)
# and send its features to the deployed endpoint.
# NOTE(review): `endpoint` is not defined in any visible cell, so this line
# raises NameError as written. It presumably should be bound to the endpoint
# created by the serving deployment. TODO confirm the fairing API and bind it.
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
endpoint.predict_nparray(test_X)