Disclaimer: The following code demonstrates sample model creation with AI Platform, based on a GA-BQ export dataset. This is meant for inspiration only; we expect analysts/data scientists to identify the right set of features to create retargeted audiences based on their business needs.

Local Setup

Cloud AI Platform model versions need to be compatible across the Python interpreter, the scikit-learn version, and the AI Platform ML runtime. To maintain consistency, we'll be using Python 3.7, scikit-learn 0.20.4, and ML runtime 1.15.

As the default interpreter for Colab is Python 3.6, we'll be using a local runtime. Open a shell on your system and follow these instructions:

  1. Create & activate a virtualenv with Python 3.7, e.g.
    python3.7 -m virtualenv venv && source venv/bin/activate
  2. Type
    pip install jupyter_http_over_ws
  3. Type
    jupyter serverextension enable --py jupyter_http_over_ws
  4. Start local server:
    jupyter notebook --NotebookApp.allow_origin='https://colab.research.google.com' --port=8888 --NotebookApp.port_retries=0
  5. Copy the server URL and paste it into the Backend URL field ('Connect to a local runtime', on the top left).

Check that you are using Python 3.7 and update the gcloud SDK (install it first if needed).


In [ ]:
!python --version && gcloud components update

Install the requirements and log in with the right account and project.


In [ ]:
!pip install scikit-learn==0.20.4 google-cloud-bigquery pandas numpy google-api-python-client

In [ ]:
!gcloud init

Update Params


In [ ]:
GCP_PROJECT_ID = "" #@param {type:"string"}
BQ_DATASET = "" #@param {type:"string"}
REGION = "us-central1" #@param {type:"string"}

In [ ]:
#@title Enter Model Parameters
GCS_MODEL_DIR = "gs://" #@param {type: "string"}
MODEL_NAME = "" #@param {type:"string"}
VERSION_NAME = "" #@param {type: "string"}
FRAMEWORK = "SCIKIT_LEARN" #@param ["SCIKIT_LEARN", "TENSORFLOW", "XGBOOST"]

if GCS_MODEL_DIR[-1] != '/':
  GCS_MODEL_DIR = GCS_MODEL_DIR + '/'

In [ ]:
import math
from google.cloud import bigquery
client = bigquery.Client(project=GCP_PROJECT_ID)

Generate and load sample GA dataset

Based on an anonymized public GA dataset.
We are creating sample training and test datasets to use as input for the propensity model.


In [ ]:
my_query = """
WITH sample_raw_data AS (
  SELECT CAST(CEIL(RAND() * 100) AS INT64) AS clientId, * EXCEPT (clientId) FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801` LIMIT 1000
),
visit_data AS (
  SELECT clientId, SUM(totals.visits) AS all_visits, CAST(ROUND(RAND() * 1) AS INT64) AS converted
  FROM sample_raw_data
  GROUP BY clientId
)
SELECT *
FROM visit_data
"""
df = client.query(my_query).to_dataframe()

In [ ]:
df.head()

In [ ]:
training_data_size = math.ceil(df.shape[0] * 0.7)
training_data = df[:training_data_size]
test_data = df[training_data_size:]
training_data.to_csv('training.csv', index=False)
test_data.to_csv('test.csv', index=False)

In [ ]:
BQ_TABLE_TRAINING = BQ_DATASET+".training_data"
BQ_TABLE_TEST = BQ_DATASET+".test_data"

In [ ]:
!bq load --project_id $GCP_PROJECT_ID --autodetect --source_format='CSV' $BQ_TABLE_TRAINING training.csv
!bq load --project_id $GCP_PROJECT_ID --autodetect --source_format='CSV' $BQ_TABLE_TEST test.csv

Load Training Data from BQ


In [ ]:
my_query = "SELECT * FROM `{0}.{1}`".format(GCP_PROJECT_ID,BQ_TABLE_TRAINING)
training = client.query(my_query).to_dataframe()

In [ ]:
training.head()

Train & Test the model (Simple) - Logistic Regression model

We are using a Logistic Regression model that predicts whether the user will convert.
The model output is 0 (false) or 1 (true) for each user.


In [ ]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from googleapiclient import discovery
import pandas as pd
import numpy as np
import pickle

In [ ]:
features, labels = training[["all_visits"]], training["converted"]
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size = 0.2, random_state=1)
X_train.shape, X_test.shape

In [ ]:
lr = LogisticRegression(penalty='l2')
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
y_pred[:5]

In [ ]:
accuracy_score(y_test, y_pred)

In [ ]:
confusion_matrix(y_test, y_pred)

In [ ]:
lr.predict_proba(X_test)

Package & Upload Model to GCP (Simple)


In [ ]:
with open('model.pkl', 'wb') as f:
  pickle.dump(lr,f)

In [ ]:
! gsutil cp model.pkl $GCS_MODEL_DIR

In [ ]:
! gcloud config set project $GCP_PROJECT_ID
! gcloud ai-platform models create $MODEL_NAME --regions $REGION
! gcloud ai-platform versions create $VERSION_NAME --model $MODEL_NAME --origin $GCS_MODEL_DIR --runtime-version=1.15 --framework $FRAMEWORK --python-version=3.7

Train, Test & Upload the model (Advanced) - Logistic Regression model with probability outputs

We are using a Logistic Regression model to predict whether the user will convert.
The model output is [class_label, probability], e.g. [1, 0.95], meaning there is a 95% chance the user will convert.


In [ ]:
%%writefile predictor.py
import os
import pickle

import numpy as np

class MyPredictor(object):
  def __init__(self, model):
    self._model = model

  def predict(self, instances, **kwargs):
    inputs = np.asarray(instances)
    probabilities = self._model.predict_proba(inputs).tolist()
    outputs = [[p.index(max(p)), max(p)] for p in probabilities] #label, probability
    return outputs

  @classmethod
  def from_path(cls, model_dir):
    model_path = os.path.join(model_dir, 'model.pkl')
    with open(model_path, 'rb') as f:
      model = pickle.load(f)

    return cls(model)
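
(Optional) Before packaging, you can sanity-check the custom prediction routine locally. This is a minimal sketch; it assumes the model.pkl written by the earlier pickle step is still in the current working directory.


In [ ]:
from predictor import MyPredictor

# Load './model.pkl' through the routine and run a quick prediction.
local_predictor = MyPredictor.from_path('.')
local_predictor.predict([[3], [1]])  # expected format: [[label, probability], ...]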

In [ ]:
%%writefile setup.py
from setuptools import setup

setup(
    name='my_custom_code',
    version='0.1',
    scripts=['predictor.py'])

In [ ]:
GCS_CUSTOM_ROUTINE_PATH = GCS_MODEL_DIR +"my_custom_code-0.1.tar.gz"
GCS_MODEL_PATH = GCS_MODEL_DIR + "model/"
ADVANCED_VERSION_NAME = VERSION_NAME + "_2"

In [ ]:
!python setup.py sdist --formats=gztar
!gsutil cp model.pkl $GCS_MODEL_PATH
!gsutil cp ./dist/my_custom_code-0.1.tar.gz $GCS_CUSTOM_ROUTINE_PATH

If the model has not been created yet, create it by uncommenting the first two lines.


In [ ]:
#!gcloud config set project $GCP_PROJECT_ID
#!gcloud ai-platform models create $MODEL_NAME --regions $REGION
!gcloud beta ai-platform versions create $ADVANCED_VERSION_NAME --model $MODEL_NAME --origin $GCS_MODEL_PATH --runtime-version=1.15 --python-version=3.7 --package-uris $GCS_CUSTOM_ROUTINE_PATH --prediction-class predictor.MyPredictor

(OPTIONAL) Testing predictions from the AI Platform


In [ ]:
my_query = "SELECT * FROM `{0}.{1}`".format(GCP_PROJECT_ID,BQ_TABLE_TEST)
test = client.query(my_query).to_dataframe()
features_df = test["all_visits"]
features = features_df.values.tolist()
# Wrap single-feature rows so each instance is a list of feature values.
features = [[f] for f in features] if len(np.array(features).shape) == 1 else features
features[:5]

Logistic Regression Model (Simple)


In [ ]:
ai_platform = discovery.build("ml", "v1")
name = 'projects/{}/models/{}/versions/{}'.format(GCP_PROJECT_ID, MODEL_NAME, VERSION_NAME)
response = ai_platform.projects().predict(name=name, body={'instances': features}).execute()

if 'error' in response:
  raise RuntimeError(response['error'])
else:
  predictions = response['predictions']
  print(predictions[:5])

In [ ]:
test['predicted'] = predictions
test.head()

In [ ]:
accuracy_score(test['converted'], test['predicted'])

In [ ]:
confusion_matrix(test['converted'], test['predicted'])

Logistic Regression Model with Probability Outputs (Advanced)


In [ ]:
ai_platform = discovery.build('ml', 'v1')
name = 'projects/{}/models/{}/versions/{}'.format(GCP_PROJECT_ID, MODEL_NAME, ADVANCED_VERSION_NAME)

response = ai_platform.projects().predict(name=name, body={'instances': features}).execute()

if 'error' in response:
  raise RuntimeError(response['error'])
else:
  predictions = response['predictions']
  print(predictions[:5])
  test['advanced_labels'] = [p[0] for p in predictions]
  test['advanced_probs'] = [p[1] for p in predictions]

In [ ]:
test.head()

In [ ]:
def postprocess_output(df):
  df = df[df['advanced_labels'] == 1] #predicted to convert
  df['decile'] = pd.qcut(df['advanced_probs'], 10, labels=False, duplicates='drop') 
  col_mapper = {'decile': 'ga:dimension1',
                'clientId': 'ga:userId'}
  df_col_names = list(col_mapper.keys())
  export_names = [col_mapper[key] for key in df_col_names]
  df = df[df_col_names]
  df.columns = export_names
  return df

postprocess_output(test)
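
To eyeball what the GA Data Import upload would contain, you could write the post-processed audience to a CSV whose headers match the Data Import schema. This is a minimal, illustrative sketch; the filename is hypothetical and the actual upload is handled by the automation piece.


In [ ]:
# Write the post-processed audience ('ga:userId', 'ga:dimension1' columns) to a
# local CSV for inspection. The filename is illustrative only.
ga_import_df = postprocess_output(test)
ga_import_df.to_csv('ga_import_preview.csv', index=False)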

Automation with Modem - Parameter Specification

1. Select BQ feature rows for model input

Say the training/test dataset has the BQ schema 'id', 'feature1', 'feature2', and the model uses 'feature1' and 'feature2' as inputs; then those two are the column names to list. In this example, only 'all_visits' is used as an input column.


In [ ]:
MODEL_INPUT_COL_NAMES = ['all_visits']

2. Create mapping between BQ schema & Data Import schema

The idea here is to think about how the outputs should be mapped before testing and automation. This will be used in the automation piece.
There are 3 distinct cases -

  1. Data Import schema includes the same column from BigQuery (e.g. clientId).
    'clientId': 'ga:userId'
  2. Data Import schema includes the model output without any post processing (e.g. kMeans cluster number, logistic class number). In this case, always use the predicted key as follows -
    'predicted': 'ga:dimension1'
  3. Data Import schema includes the model output with post processing (e.g. predict_proba output from a logistic regression model) -
    In this case, the key should be the same as the intended post-processed column name (say, decile). Check the example above for more details.
    'decile': 'ga:dimension2'

In [ ]:
# Keep only the mapping that applies to your case; the second assignment
# below would otherwise overwrite the first.
# case 2
CSV_COLUMN_MAP = {'clientId': 'ga:userId',
                  'predicted': 'ga:dimension1'}
# case 3
CSV_COLUMN_MAP = {'clientId': 'ga:userId',
                  'decile': 'ga:dimension2'}

(OPTIONAL) Automation with Modem - Modifying code to add pre and post processing logic

In main.py, you can add additional pre- and post-processing logic at the start of the preprocess_features and postprocess_output functions
(i.e. between the comments # -------- Additional lines start here -------- and # -------- Additional lines end here --------).

The design principle to keep in mind is to always add columns to the output dataframe, even for intermediate outputs if necessary. See the examples below for inspiration - they are based on the Colab examples.

Pre-processing Example

In this example, we are standardizing the allVisits column before passing it as a model input.

def preprocess_features(df):
    # TODO(developer): If needed, add preprocessing logic.
    # -------- Additional lines start here -------- 
    standardize_col = lambda x: (x - np.mean(x))/ np.std(x)
    df['allVisits'] = df['allVisits'].apply(standardize_col)
    # -------- Additional lines end here -------- 
    selected_df = df[BQ_PREDICTION_FEATURES] 
    features = selected_df.values.tolist()
    features = [[f] for f in features] if len(np.array(features).shape) == 1 else features
    return features, df

Post-processing Example

In this example, we are converting the prediction output from a custom prediction routine into a format for GA import.
Specifically, we use the [label, probability] prediction output to filter users who are likely to convert (label = 1) and use the probabilities to assign deciles.

def postprocess_output(df):
    predictions = df['predicted'] 
    # TODO(developer): If needed, add postprocessing logic. Mostly necessary if using custom prediction routine.
    # -------- Additional lines start here -------- 
    df['advanced_labels'] = [p[0] for p in predictions]
    df['advanced_probs'] = [p[1] for p in predictions]
    df = df[df['advanced_labels'] == 1] #predicted to convert
    df['decile'] = pd.qcut(df['advanced_probs'], 10, labels=False, duplicates='drop')
    # -------- Additional lines end here --------  
    final_cols = list(CSV_COLUMN_MAP.keys())
    df = df[final_cols]
    df.columns = [CSV_COLUMN_MAP[bq_col_header] for bq_col_header in final_cols]
    return df
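
For orientation, the two hooks above are wired together by main.py roughly as follows. This is a simplified, hypothetical sketch of the flow (the real orchestration, BigQuery read, and GA upload live in main.py); read_features_from_bq and get_predictions stand in for the actual modem functions, and the output filename is illustrative only.

def run_pipeline():
    # Hypothetical helper names; the real functions live in main.py.
    df = read_features_from_bq()                 # BQ rows -> pandas DataFrame
    features, df = preprocess_features(df)       # model inputs + original frame
    df['predicted'] = get_predictions(features)  # AI Platform online prediction
    output_df = postprocess_output(df)           # columns renamed for GA Data Import
    output_df.to_csv('prediction_output.csv', index=False)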