In [0]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

End to End Workflow with ML Pipeline Generator


Overview

ML Pipeline Generator simplifies model building, training and deployment by generating the required training and deployment modules for your model. Using this tool, users with locally running scripts and notebooks can get started with AI Platform and Kubeflow Pipelines in a few steps, and will have the boilerplate code needed to customize their deployments and pipelines further.


This demo shows you how to train and deploy Machine Learning models on a sample dataset. The demo is divided into two parts:

  • Preparing an SVM classifier for training on Cloud AI platform
  • Orchestrating the training of a TensorFlow model on Kubeflow Pipelines

Dataset

This tutorial uses the United States Census Income Dataset provided by the UC Irvine Machine Learning Repository, containing information about people from a 1994 Census database, including age, education, marital status, occupation, and whether they make more than $50,000 a year. The dataset consists of over 30k rows, where each row corresponds to a different person. For a given row, there are 14 features that the model conditions on to predict the person's income bracket. A few of the features are named above; the exhaustive list can be found at the dataset link above.
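
To get a quick feel for the raw data, you can optionally preview a few rows with pandas. This is only a sketch; the URL and column names below match the ones used by the preprocessing code later in this notebook.


In [0]:
# Optional: peek at the first few raw rows of the training CSV.
import pandas as pd

CSV_COLUMNS = [
    "age", "workclass", "fnlwgt", "education", "education_num",
    "marital_status", "occupation", "relationship", "race", "gender",
    "capital_gain", "capital_loss", "hours_per_week", "native_country",
    "income_bracket",
]
sample = pd.read_csv(
    "https://storage.googleapis.com/cloud-samples-data/ai-platform"
    "/census/data/adult.data.csv",
    names=CSV_COLUMNS, nrows=5)
print(sample)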

Set up your local development environment

If you are using Colab or AI Platform Notebooks, your environment already meets all the requirements to run this notebook, and you can skip this step. If you are using AI Platform Notebooks, make sure the machine configuration type is 1 vCPU, 3.75 GB RAM or above.

Otherwise, make sure your environment meets this notebook's requirements. You need the following:

  • The Google Cloud SDK
  • Git
  • Python 3
  • virtualenv
  • Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to Setting up a Python development environment and the Jupyter installation guide provide detailed instructions for meeting these requirements. The following steps provide a condensed set of instructions:

  1. Install and initialize the Cloud SDK.

  2. Install Python 3.

  3. Install virtualenv and create a virtual environment that uses Python 3.

  4. Activate that environment and run pip install jupyter in a shell to install Jupyter.

  5. Run jupyter notebook in a shell to launch Jupyter.

  6. Open this notebook in the Jupyter Notebook Dashboard.

Set up your GCP project

If you do not have a GCP project then the following steps are required, regardless of your notebook environment.

  1. Select or create a GCP project. When you first create an account, you get a $300 free credit towards your compute/storage costs.

  2. Make sure that billing is enabled for your project.

  3. Create a Cloud Storage bucket for storing files.

PIP install packages and dependencies

Install additional dependencies not installed in the notebook environment.

Note: Jupyter runs lines prefixed with ! as shell commands, and it interpolates Python variables prefixed with $ into these commands.
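
For example (purely illustrative; EXAMPLE_BUCKET below is a placeholder, not a resource used later in this notebook):


In [0]:
# Illustration of shell commands (!) and variable interpolation ($) in Jupyter.
EXAMPLE_BUCKET = "my-example-bucket"  # placeholder value
! echo gs://$EXAMPLE_BUCKET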


In [0]:
# Use the latest major GA version of the framework.
! pip install --upgrade ml-pipeline-gen PyYAML

Note: If the above command throws any permission errors, try installing with sudo.

Restart the kernel so that the newly installed package can be imported in Jupyter notebooks.
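
If you prefer to restart the kernel programmatically, a common pattern is the optional cell below; it immediately shuts down and restarts the running kernel.


In [0]:
# Optional: restart the kernel from code so newly installed packages can be
# imported. Skip this cell if you restart from the Jupyter or Colab menu.
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)  # True requests a restart after shutdown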

Authenticate your GCP account

If you are using AI Platform Notebooks, your environment is already authenticated. Skip this step.

If you are using a local Jupyter notebook or a Colab environment, follow these steps:

  1. Create a New Service Account.

  2. Add the following roles: Compute Engine > Compute Admin, ML Engine > ML Engine Admin and Storage > Storage Object Admin.

  3. Download the JSON file that contains your key; it will be saved to your local environment.


In [0]:
# If you are on Colab, run this cell and upload your service account's
# json key.
import os
import sys

if 'google.colab' in sys.modules:    
  from google.colab import files
  keyfile_upload = files.upload()
  keyfile = list(keyfile_upload.keys())[0]
  keyfile_path = os.path.abspath(keyfile)
  %env GOOGLE_APPLICATION_CREDENTIALS $keyfile_path
  ! gcloud auth activate-service-account --key-file $keyfile_path

In [0]:
# If you are running this notebook locally, replace the string below 
# with the path to your service account key and run this cell 
# to authenticate your GCP account.

%env GOOGLE_APPLICATION_CREDENTIALS /path/to/service/account
! gcloud auth activate-service-account --key-file '/path/to/service/account'

Before You Begin

The tool requires the following Google Cloud APIs to be enabled: ml.googleapis.com, compute.googleapis.com, and storage-component.googleapis.com.

Add your project ID below. You can change the region if you would like, but it is not a requirement.


In [0]:
PROJECT_ID = "[PROJECT-ID]" #@param {type:"string"}
COMPUTE_REGION = "us-central1" # Currently the only supported region.

Also add your bucket name:


In [0]:
BUCKET_NAME = "[BUCKET-ID]" #@param {type:"string"}

In [0]:
!gcloud config set project {PROJECT_ID}

Enable the required Google Cloud APIs:


In [0]:
!gcloud services enable ml.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com

Create a model locally

In this section we create a model locally, as many users already do. This illustrates the on-prem way of building models; in the next section we show how to train the same model on GCP so that you can leverage the benefits of the cloud, such as easy distributed training, parallel hyperparameter tuning, and fast, up-to-date accelerators.

The next block of code shows how we preprocess the census data. A deep dive into how the code works is out of scope for this Colab. All that matters is that the function load_data returns four values, in this order: the training features, the training labels, the evaluation features, and the evaluation labels (the function can also upload the data to GCS). Run the hidden cell below.


In [0]:
#@title
# python3
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Preprocessing utilities for the Census Income dataset.

This example comes from the cloudml-samples keras demo.
github.com/GoogleCloudPlatform/cloudml-samples/blob/master/census/tf-keras
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
from six.moves import urllib
import tempfile

import numpy as np
import pandas as pd
import tensorflow.compat.v1 as tf


DATA_DIR = os.path.join(tempfile.gettempdir(), "census_data")
DATA_URL = ("https://storage.googleapis.com/cloud-samples-data/ai-platform"
            + "/census/data/")
TRAINING_FILE = "adult.data.csv"
EVAL_FILE = "adult.test.csv"
TRAINING_URL = os.path.join(DATA_URL, TRAINING_FILE)
EVAL_URL = os.path.join(DATA_URL, EVAL_FILE)

_CSV_COLUMNS = [
    "age", "workclass", "fnlwgt", "education", "education_num",
    "marital_status", "occupation", "relationship", "race", "gender",
    "capital_gain", "capital_loss", "hours_per_week", "native_country",
    "income_bracket",
]
_LABEL_COLUMN = "income_bracket"
UNUSED_COLUMNS = ["fnlwgt", "education", "gender"]

_CATEGORICAL_TYPES = {
    "workclass": pd.api.types.CategoricalDtype(categories=[
        "Federal-gov", "Local-gov", "Never-worked", "Private", "Self-emp-inc",
        "Self-emp-not-inc", "State-gov", "Without-pay"
    ]),
    "marital_status": pd.api.types.CategoricalDtype(categories=[
        "Divorced", "Married-AF-spouse", "Married-civ-spouse",
        "Married-spouse-absent", "Never-married", "Separated", "Widowed"
    ]),
    "occupation": pd.api.types.CategoricalDtype([
        "Adm-clerical", "Armed-Forces", "Craft-repair", "Exec-managerial",
        "Farming-fishing", "Handlers-cleaners", "Machine-op-inspct",
        "Other-service", "Priv-house-serv", "Prof-specialty", "Protective-serv",
        "Sales", "Tech-support", "Transport-moving"
    ]),
    "relationship": pd.api.types.CategoricalDtype(categories=[
        "Husband", "Not-in-family", "Other-relative", "Own-child", "Unmarried",
        "Wife"
    ]),
    "race": pd.api.types.CategoricalDtype(categories=[
        "Amer-Indian-Eskimo", "Asian-Pac-Islander", "Black", "Other", "White"
    ]),
    "native_country": pd.api.types.CategoricalDtype(categories=[
        "Cambodia", "Canada", "China", "Columbia", "Cuba", "Dominican-Republic",
        "Ecuador", "El-Salvador", "England", "France", "Germany", "Greece",
        "Guatemala", "Haiti", "Holand-Netherlands", "Honduras", "Hong",
        "Hungary", "India", "Iran", "Ireland", "Italy", "Jamaica", "Japan",
        "Laos", "Mexico", "Nicaragua", "Outlying-US(Guam-USVI-etc)", "Peru",
        "Philippines", "Poland", "Portugal", "Puerto-Rico", "Scotland", "South",
        "Taiwan", "Thailand", "Trinadad&Tobago", "United-States", "Vietnam",
        "Yugoslavia"
    ]),
    "income_bracket": pd.api.types.CategoricalDtype(categories=[
        "<=50K", ">50K"
    ])
}


def _download_and_clean_file(filename, url):
    """Downloads data from url, and makes changes to match the CSV format.

    The CSVs may use spaces after the comma delimters (non-standard) or include
    rows which do not represent well-formed examples. This function strips out
    some of these problems.

    Args:
      filename: filename to save url to
      url: URL of resource to download
    """
    temp_file, _ = urllib.request.urlretrieve(url)
    with tf.io.gfile.GFile(temp_file, "r") as temp_file_object:
        with tf.io.gfile.GFile(filename, "w") as file_object:
            for line in temp_file_object:
                line = line.strip()
                line = line.replace(", ", ",")
                if not line or "," not in line:
                    continue
                if line[-1] == ".":
                    line = line[:-1]
                line += "\n"
                file_object.write(line)
    tf.io.gfile.remove(temp_file)


def download(data_dir):
    """Downloads census data if it is not already present.

    Args:
      data_dir: directory where we will access/save the census data

    Returns:
      A tuple (training_file_path, eval_file_path) with the local paths to the
      training and eval data files.
    """
    tf.io.gfile.makedirs(data_dir)

    training_file_path = os.path.join(data_dir, TRAINING_FILE)
    if not tf.io.gfile.exists(training_file_path):
        _download_and_clean_file(training_file_path, TRAINING_URL)

    eval_file_path = os.path.join(data_dir, EVAL_FILE)
    if not tf.io.gfile.exists(eval_file_path):
        _download_and_clean_file(eval_file_path, EVAL_URL)

    return training_file_path, eval_file_path


def upload(train_df, eval_df, train_path, eval_path):
    """Writes the dataframes as CSVs into the given paths' directories."""
    train_df.to_csv(os.path.join(os.path.dirname(train_path), TRAINING_FILE),
                    index=False, header=False)
    eval_df.to_csv(os.path.join(os.path.dirname(eval_path), EVAL_FILE),
                   index=False, header=False)


def preprocess(dataframe):
    """Converts categorical features to numeric. Removes unused columns.

    Args:
      dataframe: Pandas dataframe with raw data

    Returns:
      Dataframe with preprocessed data
    """
    dataframe = dataframe.drop(columns=UNUSED_COLUMNS)

    # Convert integer valued (numeric) columns to floating point
    numeric_columns = dataframe.select_dtypes(["int64"]).columns
    dataframe[numeric_columns] = dataframe[numeric_columns].astype("float32")

    # Convert categorical columns to numeric
    cat_columns = dataframe.select_dtypes(["object"]).columns
    dataframe[cat_columns] = dataframe[cat_columns].apply(
        lambda x: x.astype(_CATEGORICAL_TYPES[x.name]))
    dataframe[cat_columns] = dataframe[cat_columns].apply(
        lambda x: x.cat.codes)
    return dataframe


def standardize(dataframe):
    """Scales numerical columns using their means and standard deviation.

    Args:
      dataframe: Pandas dataframe

    Returns:
      Input dataframe with the numerical columns scaled to z-scores
    """
    dtypes = list(zip(dataframe.dtypes.index, map(str, dataframe.dtypes)))
    for column, dtype in dtypes:
        if dtype == "float32":
            dataframe[column] -= dataframe[column].mean()
            dataframe[column] /= dataframe[column].std()
    return dataframe


def load_data(train_path="", eval_path=""):
    """Loads data into preprocessed (train_x, train_y, eval_x, eval_y) dataframes.

    Args:
      train_path: Local or GCS path to upload the training data to.
      eval_path: Local or GCS path to upload the eval data to.

    Returns:
      A tuple (train_x, train_y, eval_x, eval_y), where train_x and eval_x are
      Pandas dataframes with features for training and train_y and eval_y are
      numpy arrays with the corresponding labels.
    """
    # Download Census dataset: Training and eval csv files.
    training_file_path, eval_file_path = download(DATA_DIR)

    train_df = pd.read_csv(
        training_file_path, names=_CSV_COLUMNS, na_values="?")
    eval_df = pd.read_csv(eval_file_path, names=_CSV_COLUMNS, na_values="?")

    train_df = preprocess(train_df)
    eval_df = preprocess(eval_df)

    # Split train and eval data with labels. The pop method copies and removes
    # the label column from the dataframe.
    train_x, train_y = train_df, train_df.pop(_LABEL_COLUMN)
    eval_x, eval_y = eval_df, eval_df.pop(_LABEL_COLUMN)

    # Join train_x and eval_x to normalize on overall means and standard
    # deviations. Then separate them again.
    all_x = pd.concat([train_x, eval_x], keys=["train", "eval"])
    all_x = standardize(all_x)
    train_x, eval_x = all_x.xs("train"), all_x.xs("eval")

    # Rejoin features and labels and upload to GCS.
    if train_path and eval_path:
        train_df = train_x.copy()
        train_df[_LABEL_COLUMN] = train_y
        eval_df = eval_x.copy()
        eval_df[_LABEL_COLUMN] = eval_y
        upload(train_df, eval_df, train_path, eval_path)

    # Reshape label columns for use with tf.data.Dataset
    train_y = np.asarray(train_y).astype("float32").reshape((-1, 1))
    eval_y = np.asarray(eval_y).astype("float32").reshape((-1, 1))

    return train_x, train_y, eval_x, eval_y

Now we train a scikit-learn SVM model on this data.


In [0]:
from sklearn import svm

train_x, train_y, eval_x, eval_y = load_data()
train_y, eval_y = [np.ravel(x) for x in [train_y, eval_y]]
classifier = svm.SVC(C=1)
classifier.fit(train_x, train_y)
score = classifier.score(eval_x, eval_y)
print('Accuracy is {}'.format(score))

In practice, pipelines involve more complexity, such as hyperparameter tuning. At the end, however, we have a single best model that we want to serve in production.
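
For instance, a local workflow might grid-search the regularization parameter C before settling on a final model. The cell below is a minimal sketch using scikit-learn's GridSearchCV and the train_x and train_y arrays loaded above; the parameter grid is illustrative only.


In [0]:
# Minimal sketch of local hyperparameter tuning over C (illustrative values).
from sklearn.model_selection import GridSearchCV

param_grid = {"C": [0.1, 1.0, 10.0]}
search = GridSearchCV(svm.SVC(), param_grid=param_grid, cv=3)
search.fit(train_x, train_y)
print("Best C: {}".format(search.best_params_["C"]))
print("Best cross-validation score: {}".format(search.best_score_))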

Preparing an SVM classifier for training on Cloud AI platform

We now have a model we think is good, and we want to move it onto GCP while adding capabilities such as cloud training and prediction so that future runs are simple.

We can leverage the examples included with ML Pipeline Generator, as they provide good templates to follow. First we clone the GitHub repo.


In [0]:
!git clone https://github.com/GoogleCloudPlatform/ml-pipeline-generator-python.git

Then we copy the scikit-learn example into the current directory and change into that folder.


In [0]:
!cp -r ml-pipeline-generator-python/examples/sklearn sklearn-demo

In [0]:
%cd sklearn-demo

We now modify the config.yaml.example file with our project ID, bucket ID, and model name. Note that the training and evaluation data files should already be stored in your bucket, unless you handle that upload in your preprocessing function (as in this lab).


In [0]:
%%writefile config.yaml
# Copyright 2020 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Config file for ML Pipeline Generator.

project_id: [PROJECT ID]
bucket_id: [BUCKET ID]
region: "us-central1"
scale_tier: "STANDARD_1"
runtime_version: "1.15"
python_version: "3.7"
package_name: "ml_pipeline_gen"
machine_type_pred: "mls1-c4-m2"

data:
    schema:
        - "age"
        - "workclass"
        - "education_num"
        - "marital_status"
        - "occupation"
        - "relationship"
        - "race"
        - "capital_gain"
        - "capital_loss"
        - "hours_per_week"
        - "native_country"
        - "income_bracket"
    train: "gs://[BUCKET ID]/[MODEL NAME]/data/adult.data.csv"
    evaluation: "gs://[BUCKET ID]/[MODEL NAME]/data/adult.test.csv"
    prediction:
        input_data_paths:
            - "gs://[BUCKET ID]/[MODEL NAME]/inputs/*"
        input_format: "JSON"
        output_format: "JSON"

model:
    # Name must start with a letter and only contain letters, numbers, and
    # underscores.
    name: [MODEL NAME]
    path: "model.sklearn_model"
    target: "income_bracket"

model_params:
    input_args:
        C:
            type: "float"
            help: "Regularization parameter, must be positive."
            default: 1.0
    # Relative path.
    hyperparam_config: "hptuning_config.yaml"

We now copy our previous preprocessing code into the file census_preprocess.py. Run the hidden cell below.


In [0]:
#@title
%%writefile model/census_preprocess.py
# python3
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Preprocessing utilities for the Census Income dataset.

This example comes from the cloudml-samples keras demo.
github.com/GoogleCloudPlatform/cloudml-samples/blob/master/census/tf-keras
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
from six.moves import urllib
import tempfile

import numpy as np
import pandas as pd
import tensorflow.compat.v1 as tf


DATA_DIR = os.path.join(tempfile.gettempdir(), "census_data")
DATA_URL = ("https://storage.googleapis.com/cloud-samples-data/ai-platform"
            + "/census/data/")
TRAINING_FILE = "adult.data.csv"
EVAL_FILE = "adult.test.csv"
TRAINING_URL = os.path.join(DATA_URL, TRAINING_FILE)
EVAL_URL = os.path.join(DATA_URL, EVAL_FILE)

_CSV_COLUMNS = [
    "age", "workclass", "fnlwgt", "education", "education_num",
    "marital_status", "occupation", "relationship", "race", "gender",
    "capital_gain", "capital_loss", "hours_per_week", "native_country",
    "income_bracket",
]
_LABEL_COLUMN = "income_bracket"
UNUSED_COLUMNS = ["fnlwgt", "education", "gender"]

_CATEGORICAL_TYPES = {
    "workclass": pd.api.types.CategoricalDtype(categories=[
        "Federal-gov", "Local-gov", "Never-worked", "Private", "Self-emp-inc",
        "Self-emp-not-inc", "State-gov", "Without-pay"
    ]),
    "marital_status": pd.api.types.CategoricalDtype(categories=[
        "Divorced", "Married-AF-spouse", "Married-civ-spouse",
        "Married-spouse-absent", "Never-married", "Separated", "Widowed"
    ]),
    "occupation": pd.api.types.CategoricalDtype([
        "Adm-clerical", "Armed-Forces", "Craft-repair", "Exec-managerial",
        "Farming-fishing", "Handlers-cleaners", "Machine-op-inspct",
        "Other-service", "Priv-house-serv", "Prof-specialty", "Protective-serv",
        "Sales", "Tech-support", "Transport-moving"
    ]),
    "relationship": pd.api.types.CategoricalDtype(categories=[
        "Husband", "Not-in-family", "Other-relative", "Own-child", "Unmarried",
        "Wife"
    ]),
    "race": pd.api.types.CategoricalDtype(categories=[
        "Amer-Indian-Eskimo", "Asian-Pac-Islander", "Black", "Other", "White"
    ]),
    "native_country": pd.api.types.CategoricalDtype(categories=[
        "Cambodia", "Canada", "China", "Columbia", "Cuba", "Dominican-Republic",
        "Ecuador", "El-Salvador", "England", "France", "Germany", "Greece",
        "Guatemala", "Haiti", "Holand-Netherlands", "Honduras", "Hong",
        "Hungary", "India", "Iran", "Ireland", "Italy", "Jamaica", "Japan",
        "Laos", "Mexico", "Nicaragua", "Outlying-US(Guam-USVI-etc)", "Peru",
        "Philippines", "Poland", "Portugal", "Puerto-Rico", "Scotland", "South",
        "Taiwan", "Thailand", "Trinadad&Tobago", "United-States", "Vietnam",
        "Yugoslavia"
    ]),
    "income_bracket": pd.api.types.CategoricalDtype(categories=[
        "<=50K", ">50K"
    ])
}


def _download_and_clean_file(filename, url):
    """Downloads data from url, and makes changes to match the CSV format.

    The CSVs may use spaces after the comma delimters (non-standard) or include
    rows which do not represent well-formed examples. This function strips out
    some of these problems.

    Args:
      filename: filename to save url to
      url: URL of resource to download
    """
    temp_file, _ = urllib.request.urlretrieve(url)
    with tf.io.gfile.GFile(temp_file, "r") as temp_file_object:
        with tf.io.gfile.GFile(filename, "w") as file_object:
            for line in temp_file_object:
                line = line.strip()
                line = line.replace(", ", ",")
                if not line or "," not in line:
                    continue
                if line[-1] == ".":
                    line = line[:-1]
                line += "\n"
                file_object.write(line)
    tf.io.gfile.remove(temp_file)


def download(data_dir):
    """Downloads census data if it is not already present.

    Args:
      data_dir: directory where we will access/save the census data

    Returns:
      A tuple (training_file_path, eval_file_path) with the local paths to the
      training and eval data files.
    """
    tf.io.gfile.makedirs(data_dir)

    training_file_path = os.path.join(data_dir, TRAINING_FILE)
    if not tf.io.gfile.exists(training_file_path):
        _download_and_clean_file(training_file_path, TRAINING_URL)

    eval_file_path = os.path.join(data_dir, EVAL_FILE)
    if not tf.io.gfile.exists(eval_file_path):
        _download_and_clean_file(eval_file_path, EVAL_URL)

    return training_file_path, eval_file_path


def upload(train_df, eval_df, train_path, eval_path):
    """Writes the dataframes as CSVs into the given paths' directories."""
    train_df.to_csv(os.path.join(os.path.dirname(train_path), TRAINING_FILE),
                    index=False, header=False)
    eval_df.to_csv(os.path.join(os.path.dirname(eval_path), EVAL_FILE),
                   index=False, header=False)


def preprocess(dataframe):
    """Converts categorical features to numeric. Removes unused columns.

    Args:
      dataframe: Pandas dataframe with raw data

    Returns:
      Dataframe with preprocessed data
    """
    dataframe = dataframe.drop(columns=UNUSED_COLUMNS)

    # Convert integer valued (numeric) columns to floating point
    numeric_columns = dataframe.select_dtypes(["int64"]).columns
    dataframe[numeric_columns] = dataframe[numeric_columns].astype("float32")

    # Convert categorical columns to numeric
    cat_columns = dataframe.select_dtypes(["object"]).columns
    dataframe[cat_columns] = dataframe[cat_columns].apply(
        lambda x: x.astype(_CATEGORICAL_TYPES[x.name]))
    dataframe[cat_columns] = dataframe[cat_columns].apply(
        lambda x: x.cat.codes)
    return dataframe


def standardize(dataframe):
    """Scales numerical columns using their means and standard deviation.

    Args:
      dataframe: Pandas dataframe

    Returns:
      Input dataframe with the numerical columns scaled to z-scores
    """
    dtypes = list(zip(dataframe.dtypes.index, map(str, dataframe.dtypes)))
    for column, dtype in dtypes:
        if dtype == "float32":
            dataframe[column] -= dataframe[column].mean()
            dataframe[column] /= dataframe[column].std()
    return dataframe


def load_data(train_path="", eval_path=""):
    """Loads data into preprocessed (train_x, train_y, eval_x, eval_y) dataframes.

    Args:
      train_path: Local or GCS path to upload the training data to.
      eval_path: Local or GCS path to upload the eval data to.

    Returns:
      A tuple (train_x, train_y, eval_x, eval_y), where train_x and eval_x are
      Pandas dataframes with features for training and train_y and eval_y are
      numpy arrays with the corresponding labels.
    """
    # Download Census dataset: Training and eval csv files.
    training_file_path, eval_file_path = download(DATA_DIR)

    train_df = pd.read_csv(
        training_file_path, names=_CSV_COLUMNS, na_values="?")
    eval_df = pd.read_csv(eval_file_path, names=_CSV_COLUMNS, na_values="?")

    train_df = preprocess(train_df)
    eval_df = preprocess(eval_df)

    # Split train and eval data with labels. The pop method copies and removes
    # the label column from the dataframe.
    train_x, train_y = train_df, train_df.pop(_LABEL_COLUMN)
    eval_x, eval_y = eval_df, eval_df.pop(_LABEL_COLUMN)

    # Join train_x and eval_x to normalize on overall means and standard
    # deviations. Then separate them again.
    all_x = pd.concat([train_x, eval_x], keys=["train", "eval"])
    all_x = standardize(all_x)
    train_x, eval_x = all_x.xs("train"), all_x.xs("eval")

    # Rejoin features and labels and upload to GCS.
    if train_path and eval_path:
        train_df = train_x.copy()
        train_df[_LABEL_COLUMN] = train_y
        eval_df = eval_x.copy()
        eval_df[_LABEL_COLUMN] = eval_y
        upload(train_df, eval_df, train_path, eval_path)

    # Reshape label columns for use with tf.data.Dataset
    train_y = np.asarray(train_y).astype("float32").reshape((-1, 1))
    eval_y = np.asarray(eval_y).astype("float32").reshape((-1, 1))

    return train_x, train_y, eval_x, eval_y

We perform a similar copy and paste into the sklearn_model.py file, with the addition of a parameter C that we will use for hyperparameter tuning. You can add as many hyperparameters as you require to tune; a sketch of adding a second one follows the cell below.


In [0]:
%%writefile model/sklearn_model.py
# python3
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Train a simple SVM classifier."""

import argparse
import numpy as np
from sklearn import svm

from model.census_preprocess import load_data


def get_model(params):
    """Trains a classifier."""
    classifier = svm.SVC(C=params.C)
    return classifier

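For example, to also tune gamma (a hypothetical addition, not part of the generated example), get_model would read it from params, and a matching "gamma" entry of type "float" would be added under model_params.input_args in config.yaml:


In [0]:
# Hypothetical sketch: a get_model variant with a second tunable
# hyperparameter. It assumes a matching "gamma" entry (type: "float") has
# been added under model_params.input_args in config.yaml.
def get_model(params):
    """Returns an SVM classifier with two tunable hyperparameters."""
    classifier = svm.SVC(C=params.C, gamma=params.gamma)
    return classifier
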
We now specify the hyperparameters for our training runs using the Cloud AI Platform hyperparameter tuning YAML format.


In [0]:
%%writefile hptuning_config.yaml
# Copyright 2020 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
trainingInput:
  scaleTier: STANDARD_1
  hyperparameters:
    goal: MAXIMIZE
    maxTrials: 2
    maxParallelTrials: 2
    hyperparameterMetricTag: score
    enableTrialEarlyStopping: TRUE
    params:
    - parameterName: C
      type: DOUBLE
      minValue: .001
      maxValue: 10
      scaleType: UNIT_LOG_SCALE

Run the Sklearn Model on CAIP

We only modified two YAML files to specify training, hyperparameter tuning, and model prediction, and we copied and pasted our existing code for preprocessing and building the model. We did not have to write any GCP-specific code yet; that is all handled by this solution. Now we can submit our jobs to the cloud with a few commands.


In [0]:
from ml_pipeline_gen.models import SklearnModel
from model.census_preprocess import load_data

Specify the path to your config.yaml file:


In [0]:
config = "config.yaml"

Now we can create our model, generate all the necessary Cloud AI Platform files for training, upload the data files, and train the model in four simple commands. Note that our load_data function uploads the data files automatically; you can also upload the files manually to the bucket you specified in the config.yaml file.


In [0]:
model = SklearnModel(config)
model.generate_files()

# load_data comes from our preprocessing file and
# automatically uploads our data to GCS.
load_data(model.data["train"], model.data["evaluation"])

job_id = model.train(tune=True)

After training, we would like to test the model's predictions. First, deploy the model (model.deploy automatically returns a generated version). Then request online predictions. A sketch for mapping the numeric predictions back to income brackets follows the cell below.


In [0]:
pred_input = [
    [0.02599666, 6, 1.1365801, 4, 0, 1, 4, 0.14693314, -0.21713187,
      -0.034039237, 38],
]
version = model.deploy(job_id=job_id)
preds = model.online_predict(pred_input, version=version)

print("Features: {}".format(pred_input))
print("Predictions: {}".format(preds))
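
Since income_bracket was encoded with cat.codes, class 0 corresponds to "<=50K" and class 1 to ">50K". The optional sketch below maps the predictions back to those labels; it assumes online_predict returns a flat list of numeric class codes.


In [0]:
# Optional: map numeric class codes back to income brackets. This assumes
# `preds` is a flat list of 0/1 values, matching the category order of the
# income_bracket CategoricalDtype defined in the preprocessing code.
income_labels = ["<=50K", ">50K"]
print([income_labels[int(round(float(p)))] for p in preds])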