Disclaimer: The following code demonstrates sample model creation with AI Platform, based on the GA-BQ export dataset. It is meant for inspiration only; we expect analysts/data scientists to identify the right set of features to create retargeted audiences based on their business needs.
Cloud AI Platform model versions need to be compatible across the Python interpreter, scikit-learn version, and AI Platform ML runtime. To maintain consistency, we'll be using Python 3.7, scikit-learn (0.20.4) and ML runtime 1.15.
Since Colab's default interpreter is Python 3.6, we'll connect to a local runtime instead. Open a shell on your system and run the following:
python3.7 -m virtualenv venv && source venv/bin/activate
pip install jupyter_http_over_ws
jupyter serverextension enable --py jupyter_http_over_ws
jupyter notebook --NotebookApp.allow_origin='https://colab.research.google.com' --port=8888 --NotebookApp.port_retries=0
Check that you are using Python 3.7 and update the gcloud SDK (install it first if needed).
In [ ]:
!python --version && gcloud components update
Install the requirements and log in with the right account and project.
In [ ]:
!pip install scikit-learn==0.20.4 google-cloud-bigquery pandas numpy google-api-python-client
In [ ]:
!gcloud init
In [ ]:
GCP_PROJECT_ID = "" #@param {type:"string"}
BQ_DATASET = "" #@param {type:"string"}
REGION = "us-central1" #@param {type:"string"}
In [ ]:
#@title Enter Model Parameters
GCS_MODEL_DIR = "gs://" #@param {type: "string"}
MODEL_NAME = "" #@param {type:"string"}
VERSION_NAME = "" #@param {type: "string"}
FRAMEWORK = "SCIKIT_LEARN" #@param ["SCIKIT_LEARN", "TENSORFLOW", "XGBOOST"]
if GCS_MODEL_DIR[-1] != '/':
    GCS_MODEL_DIR = GCS_MODEL_DIR + '/'
In [ ]:
import math
from google.cloud import bigquery
client = bigquery.Client(project=GCP_PROJECT_ID)
In [ ]:
my_query = """
WITH sample_raw_data AS (
SELECT CAST(CEIL(RAND() * 100) AS INT64) AS clientId, * EXCEPT (clientId) FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801` LIMIT 1000
),
visit_data AS (
SELECT clientId, SUM(totals.visits) AS all_visits, CAST(ROUND(RAND() * 1) AS INT64) AS converted
FROM sample_raw_data
GROUP BY clientId
)
SELECT *
FROM visit_data
"""
df = client.query(my_query).to_dataframe()
In [ ]:
df.head()
In [ ]:
training_data_size = math.ceil(df.shape[0] * 0.7)
training_data = df[:training_data_size]
test_data = df[training_data_size:]
training_data.to_csv('training.csv', index=False)
test_data.to_csv('test.csv', index=False)
In [ ]:
BQ_TABLE_TRAINING = BQ_DATASET+".training_data"
BQ_TABLE_TEST = BQ_DATASET+".test_data"
In [ ]:
!bq load --project_id $GCP_PROJECT_ID --autodetect --source_format='CSV' $BQ_TABLE_TRAINING training.csv
!bq load --project_id $GCP_PROJECT_ID --autodetect --source_format='CSV' $BQ_TABLE_TEST test.csv
In [ ]:
my_query = "SELECT * FROM `{0}.{1}`".format(GCP_PROJECT_ID,BQ_TABLE_TRAINING)
training = client.query(my_query).to_dataframe()
In [ ]:
training.head()
In [ ]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from googleapiclient import discovery
import pandas as pd
import numpy as np
import pickle
In [ ]:
features, labels = training[["all_visits"]], training["converted"]
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size = 0.2, random_state=1)
X_train.shape, X_test.shape
In [ ]:
lr = LogisticRegression(penalty='l2')
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
y_pred[:5]
In [ ]:
accuracy_score(y_test, y_pred)
In [ ]:
confusion_matrix(y_test, y_pred)
In [ ]:
lr.predict_proba(X_test)
In [ ]:
with open('model.pkl', 'wb') as f:
    pickle.dump(lr, f)
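Optionally, before copying the model to Cloud Storage, you can verify that the pickled file loads and reproduces the in-memory predictions. This is a quick sanity check and not part of the original flow; it only uses objects already defined above.
In [ ]:
# Optional sanity check: reload the pickle and confirm it reproduces
# the in-memory model's predictions before uploading to GCS.
with open('model.pkl', 'rb') as f:
    restored_model = pickle.load(f)
assert (restored_model.predict(X_test) == y_pred).all()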
In [ ]:
! gsutil cp model.pkl $GCS_MODEL_DIR
In [ ]:
! gcloud config set project $GCP_PROJECT_ID
! gcloud ai-platform models create $MODEL_NAME --regions $REGION
! gcloud ai-platform versions create $VERSION_NAME --model $MODEL_NAME --origin $GCS_MODEL_DIR --runtime-version=1.15 --framework $FRAMEWORK --python-version=3.7
In [ ]:
%%writefile predictor.py
import os
import pickle
import numpy as np


class MyPredictor(object):
    def __init__(self, model):
        self._model = model

    def predict(self, instances, **kwargs):
        inputs = np.asarray(instances)
        probabilities = self._model.predict_proba(inputs).tolist()
        outputs = [[p.index(max(p)), max(p)] for p in probabilities]  # [label, probability]
        return outputs

    @classmethod
    def from_path(cls, model_dir):
        model_path = os.path.join(model_dir, 'model.pkl')
        with open(model_path, 'rb') as f:
            model = pickle.load(f)
        return cls(model)
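If you want, you can exercise the custom routine locally before packaging it. This is a minimal sketch, assuming predictor.py and model.pkl sit in the current working directory (as written by the cells above); the sample instances are made up.
In [ ]:
# Optional local check of the custom prediction routine before packaging.
# Assumes predictor.py and model.pkl are in the current working directory.
from predictor import MyPredictor
local_predictor = MyPredictor.from_path('.')
local_predictor.predict([[1], [5]])  # each output is [label, probability]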
In [ ]:
%%writefile setup.py
from setuptools import setup

setup(
    name='my_custom_code',
    version='0.1',
    scripts=['predictor.py'])
In [ ]:
GCS_CUSTOM_ROUTINE_PATH = GCS_MODEL_DIR +"my_custom_code-0.1.tar.gz"
GCS_MODEL_PATH = GCS_MODEL_DIR + "model/"
ADVANCED_VERSION_NAME = VERSION_NAME + "_2"
In [ ]:
!python setup.py sdist --formats=gztar
!gsutil cp model.pkl $GCS_MODEL_PATH
!gsutil cp ./dist/my_custom_code-0.1.tar.gz $GCS_CUSTOM_ROUTINE_PATH
If the model has not been created yet, create it by uncommenting the first two lines.
In [ ]:
#!gcloud config set project $GCP_PROJECT_ID
#!gcloud ai-platform models create $MODEL_NAME --regions $REGION
!gcloud beta ai-platform versions create $ADVANCED_VERSION_NAME --model $MODEL_NAME --origin $GCS_MODEL_PATH --runtime-version=1.15 --python-version=3.7 --package-uris $GCS_CUSTOM_ROUTINE_PATH --prediction-class predictor.MyPredictor
In [ ]:
my_query = "SELECT * FROM `{0}.{1}`".format(GCP_PROJECT_ID,BQ_TABLE_TEST)
test = client.query(my_query).to_dataframe()
features_df = test["all_visits"]
features = features_df.values.tolist()
# AI Platform expects a list of instances, each instance being a list of feature values.
features = [[f] for f in features] if len(np.array(features).shape) == 1 else features
features[:5]
In [ ]:
ai_platform = discovery.build("ml", "v1")
name = 'projects/{}/models/{}/versions/{}'.format(GCP_PROJECT_ID, MODEL_NAME, VERSION_NAME)
response = ai_platform.projects().predict(name=name, body={'instances': features}).execute()
if 'error' in response:
raise RuntimeError(response['error'])
else:
predictions = response['predictions']
print(predictions[:5])
In [ ]:
test['predicted'] = predictions
test.head()
In [ ]:
accuracy_score(test['converted'], test['predicted'])
In [ ]:
confusion_matrix(test['converted'], test['predicted'])
In [ ]:
ai_platform = discovery.build('ml', 'v1')
name = 'projects/{}/models/{}/versions/{}'.format(GCP_PROJECT_ID, MODEL_NAME, ADVANCED_VERSION_NAME)
response = ai_platform.projects().predict(name=name, body={'instances': features}).execute()
if 'error' in response:
raise RuntimeError(response['error'])
else:
predictions = response['predictions']
print(predictions[:5])
test['advanced_labels'] = [p[0] for p in predictions]
test['advanced_probs'] = [p[1] for p in predictions]
In [ ]:
test.head()
In [ ]:
def postprocess_output(df):
    df = df[df['advanced_labels'] == 1]  # predicted to convert
    df['decile'] = pd.qcut(df['advanced_probs'], 10, labels=False, duplicates='drop')
    col_mapper = {'decile': 'ga:dimension1',
                  'clientId': 'ga:userId'}
    df_col_names = list(col_mapper.keys())
    export_names = [col_mapper[key] for key in df_col_names]
    df = df[df_col_names]
    df.columns = export_names
    return df

postprocess_output(test)
In [ ]:
MODEL_INPUT_COL_NAMES = ['all_visits']
The idea here is to think about how the outputs should be mapped before testing and automation. This will be used in the automation piece.
There are 3 distinct cases -
In [ ]:
#case 2
CSV_COLUMN_MAP = {'clientId': 'ga:userId',
'predicted': 'ga:dimension1'}
#case 3
CSV_COLUMN_MAP = {'clientId': 'ga:userId',
'decile': 'ga:dimension2'}
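For illustration, this is roughly how such a map is applied before export: select the mapped BigQuery columns, rename them to the GA Data Import headers, and write the CSV. A minimal sketch only; the helper name apply_column_map and the output file name ga_import.csv are placeholders, not part of main.py.
In [ ]:
# Illustrative sketch only: apply a column map to a prediction dataframe.
def apply_column_map(df, column_map):
    bq_cols = list(column_map.keys())
    out = df[bq_cols].copy()
    out.columns = [column_map[c] for c in bq_cols]
    return out

# Example (hypothetical):
# apply_column_map(test, CSV_COLUMN_MAP).to_csv('ga_import.csv', index=False)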
In main.py, you can add additional pre- and post-processing at the start of the preprocess_features and postprocess_output functions
(i.e. between the comments # -------- Additional lines start here -------- and # -------- Additional lines end here --------).
The design principle to keep in mind is to always add columns to the output dataframe, even for intermediate outputs if necessary. See the examples below for inspiration; they are based on the Colab examples above.
In this example, we are standardizing the allVisits column before passing it as a model input.
def preprocess_features(df):
    # TODO(developer): If needed, add preprocessing logic.
    # -------- Additional lines start here --------
    standardize_col = lambda x: (x - np.mean(x)) / np.std(x)
    df['allVisits'] = standardize_col(df['allVisits'])
    # -------- Additional lines end here --------
    selected_df = df[BQ_PREDICTION_FEATURES]
    features = selected_df.values.tolist()
    features = [[f] for f in features] if len(np.array(features).shape) == 1 else features
    return features, df
In this example, we are converting a prediction output from a custom prediction routine to a format for GA import.
Specifically, we use the [label, probability] prediction output to filter users who are likely to convert (label = 1) and use the probabilities to assign deciles.
def postprocess_output(df):
    predictions = df['predicted']
    # TODO(developer): If needed, add postprocessing logic. Mostly necessary if using a custom prediction routine.
    # -------- Additional lines start here --------
    df['advanced_labels'] = [p[0] for p in predictions]
    df['advanced_probs'] = [p[1] for p in predictions]
    df = df[df['advanced_labels'] == 1]  # predicted to convert
    df['decile'] = pd.qcut(df['advanced_probs'], 10, labels=False, duplicates='drop')
    # -------- Additional lines end here --------
    final_cols = list(CSV_COLUMN_MAP.keys())
    df = df[final_cols]
    df.columns = [CSV_COLUMN_MAP[bq_col_header] for bq_col_header in final_cols]
    return df