In [ ]:
!pip install -r requirements.txt
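
The exact dependency pins live in requirements.txt; it's worth a quick look before installing:

In [ ]:
# Show the dependencies being installed; this notebook imports pandas,
# scikit-learn, xgboost, joblib, and kubeflow-fairing.
!cat requirements.txt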

In [ ]:
import argparse
import logging
import joblib
import sys
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor
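# Older scikit-learn ships Imputer in sklearn.preprocessing; newer releases
# only provide sklearn.impute.SimpleImputer, so fall back to that.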
try:
    from sklearn.preprocessing import Imputer
except ImportError:
    from sklearn.impute import SimpleImputer as Imputer

In [ ]:
logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.INFO)

In [ ]:
def read_input(file_name, test_size=0.25):
    """Read input data and split it into train and test."""
    data = pd.read_csv(file_name)
    data.dropna(axis=0, subset=['SalePrice'], inplace=True)

    y = data.SalePrice
    X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])

    train_X, test_X, train_y, test_y = train_test_split(
        X.values, y.values, test_size=test_size, shuffle=False)

    imputer = Imputer()
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y)
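
A quick sanity check of the split (assuming the Ames training CSV used throughout this notebook is available at ames_dataset/train.csv):

In [ ]:
# Illustrative only: confirm the train/test split shapes look sane.
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
print("train:", train_X.shape, "test:", test_X.shape)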

In [ ]:
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Train the model using XGBRegressor."""
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    model.fit(train_X,
              train_y,
              early_stopping_rounds=40,
              eval_set=[(test_X, test_y)])

    logging.info("Best RMSE on eval: %.2f with %d rounds",
                 model.best_score,
                 model.best_iteration + 1)
    return model

def eval_model(model, test_X, test_y):
    """Evaluate the model performance."""
    predictions = model.predict(test_X)
    logging.info("mean_absolute_error=%.2f", mean_absolute_error(predictions, test_y))

def save_model(model, model_file):
    """Save XGBoost model for serving."""
    joblib.dump(model, model_file)
    logging.info("Model export success: %s", model_file)

In [ ]:
def gcs_copy(src_path, dst_path):
    """Copy a file to or from GCS by shelling out to gsutil."""
    import subprocess
    result = subprocess.run(['gsutil', 'cp', src_path, dst_path],
                            stdout=subprocess.PIPE)
    print(result.stdout.decode('utf-8').rstrip())

In [ ]:
# Create a GCS bucket for storing the trained model
GCS_BUCKET = "gs://fairing-demo"
!gsutil mb {GCS_BUCKET}
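
Bucket names are global, so mb will fail if gs://fairing-demo is already taken; an optional check that the bucket exists:

In [ ]:
# Optional: verify the bucket was created.
!gsutil ls -b {GCS_BUCKET}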

In [ ]:
class HousingServe(object):
    
    def __init__(self):
        self.train_input = "ames_dataset/train.csv"
        self.n_estimators = 50
        self.learning_rate = 0.1
        self.model_file = "trained_ames_model.dat"
        self.gcs_model_file = "{}/housing_serve/{}".format(GCS_BUCKET, self.model_file)
        self.model = None

    def train(self):
        (train_X, train_y), (test_X, test_y) = read_input(self.train_input)
        model = train_model(train_X,
                            train_y,
                            test_X,
                            test_y,
                            self.n_estimators,
                            self.learning_rate)

        eval_model(model, test_X, test_y)
        save_model(model, self.model_file)
        gcs_copy(self.model_file, self.gcs_model_file)
        print("Model saved to {}".format(self.gcs_model_file))

    def predict(self, X):
        """Predict using the model for the given ndarray."""
        if not self.model:
            # Lazily fetch the trained model from GCS on the first call.
            gcs_copy(self.gcs_model_file, self.model_file)
            self.model = joblib.load(self.model_file)
        prediction = self.model.predict(X)
        return [prediction]

Training Locally


In [ ]:
HousingServe().train()
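
Before moving to Fairing, the predict path can be smoke-tested locally; a fresh instance lazily pulls the model back from GCS (a sanity check, assuming train() above completed):

In [ ]:
# Smoke test: a fresh HousingServe pulls the model from GCS and predicts.
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
print(HousingServe().predict(test_X[:5]))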

Fairing


In [ ]:
import os
from kubeflow import fairing

# Setting up google container repositories (GCR) for storing output containers
# You can use any Docker container registry instead of GCR
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
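
A quick print confirms the project was guessed correctly before any images are built or pushed:

In [ ]:
# Sanity check the resolved project and target registry.
print(GCP_PROJECT, DOCKER_REGISTRY)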

Build base image for remote training and prediction


In [ ]:
py_version = ".".join([str(x) for x in sys.version_info[0:3]])
base_image = 'gcr.io/{}/fairing-predict-example:latest'.format(GCP_PROJECT)

In [ ]:
!docker build --build-arg PY_VERSION={py_version} . -t {base_image}
!docker push {base_image}
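
If gcloud is configured, the pushed tags can be listed to confirm the image landed in GCR (an optional check):

In [ ]:
# Optional: list tags for the pushed base image in GCR.
!gcloud container images list-tags {base_image.split(':')[0]}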

Training remotely on Kubeflow


In [ ]:
fairing.config.set_builder('docker', registry=DOCKER_REGISTRY, base_image=base_image)
fairing.config.set_deployer('job')
remote_train = fairing.config.fn(HousingServe)
remote_train()

Deploying the model and creating an endpoint on Kubeflow


In [ ]:
fairing.config.set_deployer('serving', serving_class="HousingServe")
create_endpoint = fairing.config.fn(HousingServe)
endpoint = create_endpoint()  # keep a handle for the prediction call below

Making prediction calls against the endpoint


In [ ]:
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
endpoint.predict_nparray(test_X)