Churn Predictive Analytics using Amazon SageMaker and Snowflake


Background

The purpose of this lab is to demonstrate the basics of building an advanced analytics solution using Amazon SageMaker on data stored in Snowflake. In this notebook we will create a customer churn analytics solution by training an XGBoost churn model, and batching churn prediction scores into a data warehouse.

This notebook extends one of the example tutorial notebooks: Customer Churn Prediction with XGBoost. The learning objectives for this extended version are listed below.

Learning Objectives

  • Learn how to query ground truth data from our data warehouse into a pandas dataframe for exploration and feature engineering.
  • Train an XGBoost model to perform churn prediction.
  • Learn how to run a Batch Transform job to calculate churn scores in batch.
  • Optimize your model using SageMaker Neo.
  • Upload the Churn Score results back to Snowflake to perform basic analysis.

Prerequisites

In summary:

  • You've built the lab environment using this CloudFormation template. This template installs the Snowflake python connector within your Jupyter instance.
  • You've taken note of the Snowflake credentials in the lab guide.
  • This notebook should be running in your default VPC.
  • Snowflake traffic uses port 443.

Setup

Run the cell below to import Python libraries required by this notebook.

The IAM role ARN is used to give training and hosting access to your data. By default, we'll use the IAM permissions that have been allocated to your notebook instance. The role should have permission to access your S3 bucket, and full execution permissions on Amazon SageMaker. In practice, you could narrow the scope of the required permissions.


In [ ]:
import boto3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io
import os
import sys
import time
import json
from IPython.display import display
from time import strftime, gmtime

import sagemaker
from sagemaker.predictor import csv_serializer
from sagemaker import get_execution_role

sess = sagemaker.Session()
role = get_execution_role()
region = boto3.Session().region_name
print("IAM role ARN: {}".format(role))

Now let's set the S3 bucket and prefix that you want to use for training and model data. This bucket should be created within the same region as the Notebook Instance, training, and hosting.

  • Replace <REPLACE WITH YOUR BUCKET NAME> with the name of your bucket.

In [ ]:
#bucket = 'snowflake-sagemaker-workshop'
bucket = '<REPLACE WITH YOUR BUCKET NAME>'
prefix = 'churn-analytics-lab'

Data

Mobile operators have historical records on which customers ultimately ended up churning and which continued using the service. We can use this historical information to construct an ML model of one mobile operator’s churn using a process called training. After training the model, we can pass the profile information of an arbitrary customer (the same profile information that we used to train the model) to the model, and have the model predict whether this customer is going to churn. Of course, we expect the model to make mistakes; after all, predicting the future is tricky business! We'll also look at how to deal with prediction errors.

The dataset we use is publicly available and was mentioned in the book Discovering Knowledge in Data by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets. In the previous steps, this dataset was loaded into the CUSTOMER_CHURN table in your Snowflake instance.

Provide the connection and credentials required to connect to your Snowflake account. You'll need to modify the cell below with the appropriate ACCOUNT for your Snowflake trial. If you followed the lab guide instructions, the username and password below will work.

NOTE: For Snowflake accounts in regions other than US West, append the region ID to the account name after a period, for example XYZ123456.US-EAST-1.

In practice, security standards might prohibit you from providing credentials in clear text. As a best practice in production, you should utilize a service like AWS Secrets Manager to manage your database credentials.
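
As a hedged illustration of this best practice, the sketch below reads the connection parameters from a JSON secret instead of hard-coding them. The JSON key layout (user, password, account) and the example secret are assumptions made for this illustration. get_secret_value is the boto3 Secrets Manager call, and it only runs if you call fetch_secret_string yourself.

```python
import json

REQUIRED_KEYS = ("user", "password", "account")  # assumed layout of the secret


def fetch_secret_string(secret_id, region_name=None):
    """Return the raw secret string from AWS Secrets Manager."""
    import boto3  # imported lazily so the parsing helper works without AWS access

    client = boto3.client("secretsmanager", region_name=region_name)
    return client.get_secret_value(SecretId=secret_id)["SecretString"]


def parse_snowflake_secret(secret_string):
    """Turn a JSON secret into keyword arguments for snowflake.connector.connect()."""
    secret = json.loads(secret_string)
    missing = [key for key in REQUIRED_KEYS if key not in secret]
    if missing:
        raise KeyError("secret is missing keys: {}".format(", ".join(missing)))
    return {key: secret[key] for key in REQUIRED_KEYS}


# Local demonstration with a made-up secret; no AWS call is made here.
example = '{"user": "sagemaker", "password": "not-a-real-password", "account": "XYZ123456"}'
conn_args = parse_snowflake_secret(example)
print(sorted(conn_args))
```

The resulting dictionary could then be passed to snowflake.connector.connect() as keyword arguments, alongside the warehouse, database, and schema settings.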


In [ ]:
import snowflake.connector
# Connecting to Snowflake using the default authenticator
ctx = snowflake.connector.connect(
  user='sagemaker',
  password='AWSSF123',
  account='<ACCOUNT>',
  warehouse='SAGEMAKER_WH',
  database='ML_WORKSHOP',
  schema='PUBLIC'
)

Explore

Now we can run queries against your database.

In practice, however, the table will often contain more data than is practical to operate on within a notebook instance, or the relevant attributes will be spread across multiple tables. Being able to run SQL queries and load the results into a pandas dataframe is helpful during the initial stages of development. For a fully scalable solution, check out the Spark integration: Snowflake Connector for Spark.
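
When a table is too large to pull with a single fetchall(), one option is to read it in batches through the standard DB-API fetchmany() method, which the Snowflake connector's cursor also provides. The sketch below uses an in-memory sqlite3 database purely as a stand-in so that it runs anywhere. The table contents and the batch size are made up.

```python
import sqlite3


def iter_batches(cursor, batch_size):
    """Yield lists of rows from an executed DB-API cursor, batch_size rows at a time."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


# Stand-in database with 10 synthetic rows (not the real CUSTOMER_CHURN table).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer_churn (cust_id INTEGER, day_mins REAL)")
conn.executemany(
    "INSERT INTO customer_churn VALUES (?, ?)",
    [(i, 100.0 + i) for i in range(10)],
)

cur = conn.cursor()
cur.execute("SELECT cust_id, day_mins FROM customer_churn ORDER BY cust_id")
batch_sizes = [len(batch) for batch in iter_batches(cur, batch_size=4)]
print(batch_sizes)  # [4, 4, 2]
conn.close()
```

Each batch could be aggregated or appended to a file before the next one is fetched, which keeps memory use bounded by the batch size.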


In [ ]:
# Query Snowflake Data
cs=ctx.cursor()
allrows=cs.execute("""select Cust_ID,STATE,ACCOUNT_LENGTH,AREA_CODE,PHONE,INTL_PLAN,VMAIL_PLAN,VMAIL_MESSAGE,
                   DAY_MINS,DAY_CALLS,DAY_CHARGE,EVE_MINS,EVE_CALLS,EVE_CHARGE,NIGHT_MINS,NIGHT_CALLS,
                   NIGHT_CHARGE,INTL_MINS,INTL_CALLS,INTL_CHARGE,CUSTSERV_CALLS,
                   CHURN from CUSTOMER_CHURN """).fetchall()

churn = pd.DataFrame(allrows)
churn.columns=['Cust_id','State','Account Length','Area Code','Phone','Intl Plan', 'VMail Plan', 'VMail Message','Day Mins',
            'Day Calls', 'Day Charge', 'Eve Mins', 'Eve Calls', 'Eve Charge', 'Night Mins', 'Night Calls','Night Charge',
            'Intl Mins','Intl Calls','Intl Charge','CustServ Calls', 'Churn?']

pd.set_option('display.max_columns', 500)     # Make sure we can see all of the columns
pd.set_option('display.max_rows', 10)         # Keep the output on one page
churn

By modern standards, it’s a relatively small dataset, with only 3,333 records, where each record uses 21 attributes to describe the profile of a customer of an unknown US mobile operator. The attributes are:

  • State: the US state in which the customer resides, indicated by a two-letter abbreviation; for example, OH or NJ
  • Account Length: the number of days that this account has been active
  • Area Code: the three-digit area code of the corresponding customer’s phone number
  • Phone: the remaining seven-digit phone number
  • Intl Plan: whether the customer has an international calling plan: yes/no
  • VMail Plan: whether the customer has a voice mail feature: yes/no
  • VMail Message: presumably the average number of voice mail messages per month
  • Day Mins: the total number of calling minutes used during the day
  • Day Calls: the total number of calls placed during the day
  • Day Charge: the billed cost of daytime calls
  • Eve Mins, Eve Calls, Eve Charge: the minutes used, number of calls, and billed cost for calls placed during the evening
  • Night Mins, Night Calls, Night Charge: the minutes used, number of calls, and billed cost for calls placed during nighttime
  • Intl Mins, Intl Calls, Intl Charge: the minutes used, number of calls, and billed cost for international calls
  • CustServ Calls: the number of calls placed to Customer Service
  • Churn?: whether the customer left the service: true/false

The last attribute, Churn?, is known as the target attribute–the attribute that we want the ML model to predict. Because the target attribute is binary, our model will be performing binary prediction, also known as binary classification.

Let's begin exploring the data:


In [ ]:
# Frequency tables for each categorical feature
for column in churn.select_dtypes(include=['object']).columns:
    display(pd.crosstab(index=churn[column], columns='% observations', normalize='columns'))

# Summary statistics and histograms for each numeric feature
display(churn.describe())
%matplotlib inline
hist = churn.hist(bins=30, sharey=True, figsize=(10, 10))

We can see immediately that:

  • State appears to be quite evenly distributed
  • Phone takes on too many unique values to be of any practical use. It's possible parsing out the prefix could have some value, but without more context on how these are allocated, we should avoid using it.
  • Only 14% of customers churned, so there is some class imbalance, but nothing extreme.
  • Most of the numeric features are surprisingly nicely distributed, with many showing a bell-like, Gaussian shape. VMail Message is a notable exception, and Area Code shows up as a feature we should convert to non-numeric.
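
To see why the class imbalance matters when judging a model, it can help to compute the accuracy of a trivial baseline that always predicts "no churn". The sketch below uses synthetic labels with roughly the 14% churn rate noted above. The exact counts are invented for illustration.

```python
from collections import Counter

# Synthetic labels: 14 churners out of 100 customers (illustrative, not the real data).
labels = [True] * 14 + [False] * 86

counts = Counter(labels)
churn_rate = counts[True] / len(labels)

# A model that always predicts the majority class is right whenever the
# customer did not churn, so its accuracy equals the majority-class share.
baseline_accuracy = max(counts.values()) / len(labels)

print("churn rate: {:.2f}".format(churn_rate))
print("majority-class baseline accuracy: {:.2f}".format(baseline_accuracy))
```

With this level of imbalance, a churn model needs to beat roughly 86% accuracy before accuracy alone says anything useful about it.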

In [ ]:
churn = churn.drop('Phone', axis=1)
churn['Area Code'] = churn['Area Code'].astype(object)

Next let's look at the relationship between each of the features and our target variable.


In [ ]:
for column in churn.select_dtypes(include=['object']).columns:
    if column != 'Churn?':
        display(pd.crosstab(index=churn[column], columns=churn['Churn?'], normalize='columns'))

for column in churn.select_dtypes(exclude=['object']).columns:
    print(column)
    hist = churn[[column, 'Churn?']].hist(by='Churn?', bins=30)
    plt.show()

Interestingly we see that churners appear:

  • Fairly evenly distributed geographically
  • More likely to have an international plan
  • Less likely to have a voicemail plan
  • To exhibit some bimodality in daily minutes (either higher or lower than the average for non-churners)
  • To have a larger number of customer service calls (which makes sense as we'd expect customers who experience lots of problems may be more likely to churn)

In addition, we see that churners take on very similar distributions for features like Day Mins and Day Charge. That's not surprising as we'd expect minutes spent talking to correlate with charges. Let's dig deeper into the relationships between our features.


In [ ]:
display(churn.corr())
pd.plotting.scatter_matrix(churn, figsize=(18, 18))
plt.show()

We see several features that essentially have 100% correlation with one another. Including these feature pairs in some machine learning algorithms can create catastrophic problems, while in others it will only introduce minor redundancy and bias. Let's remove one feature from each of the highly correlated pairs: Day Charge from the pair with Day Mins, Eve Charge from the pair with Eve Mins, Night Charge from the pair with Night Mins, and Intl Charge from the pair with Intl Mins:


In [ ]:
churn = churn.drop(['Day Charge', 'Eve Charge', 'Night Charge', 'Intl Charge'], axis=1)
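
Near-perfect correlations like these arise when one column is (almost) a fixed multiple of another. The following sketch, which uses only the standard library and synthetic numbers, computes Pearson correlations and lists the pairs above a threshold. The column values, the per-minute rate of 0.17, and the 0.95 threshold are assumptions chosen for the example.

```python
from itertools import combinations
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


def highly_correlated_pairs(columns, threshold=0.95):
    """Return (name_a, name_b) pairs whose absolute correlation exceeds threshold."""
    return [
        (a, b)
        for a, b in combinations(columns, 2)
        if abs(pearson(columns[a], columns[b])) > threshold
    ]


# Synthetic data: the charge is a fixed (hypothetical) rate times the minutes.
day_mins = [120.5, 200.1, 87.3, 310.0, 154.2, 260.7]
columns = {
    "Day Mins": day_mins,
    "Day Charge": [round(0.17 * m, 2) for m in day_mins],
    "CustServ Calls": [1, 0, 4, 2, 1, 3],
}

pairs = highly_correlated_pairs(columns)
print(pairs)
```

In the notebook itself, the same check could be made by scanning the output of churn.corr() for off-diagonal values close to 1.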

Now that we've cleaned up our dataset, let's determine which algorithm to use. As mentioned above, there appear to be some variables where both high and low (but not intermediate) values are predictive of churn. In order to accommodate this in an algorithm like linear regression, we'd need to generate polynomial (or bucketed) terms. Instead, let's attempt to model this problem using gradient boosted trees. Amazon SageMaker provides an XGBoost container that we can use to train in a managed, distributed setting, and then host as a real-time prediction endpoint. XGBoost uses gradient boosted trees which naturally account for non-linear relationships between features and the target variable, as well as accommodating complex interactions between features.

Amazon SageMaker XGBoost can train on data in either a CSV or LibSVM format. For this example, we'll stick with CSV. It should:

  • Have the target variable in the first column
  • Not have a header row

But first, let's convert our categorical features into numeric features.


In [ ]:
model_data = pd.get_dummies(churn)
model_data = pd.concat([model_data['Churn?_True.'], model_data.drop(['Churn?_False.', 'Churn?_True.'], axis=1)], axis=1)
to_split_data = model_data.drop(['Cust_id'], axis=1)
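
To make the encoding step concrete, here is a minimal standard-library sketch of one-hot encoding, the transformation that pd.get_dummies applies to each categorical column. The helper name one_hot and the sample values are hypothetical. The generated column names (original name, underscore, category value) follow the same pattern that produces the Churn?_True. column used above.

```python
def one_hot(values, column_name):
    """Return {new_column_name: [0/1, ...]} with one indicator column per category."""
    categories = sorted(set(values))
    return {
        "{}_{}".format(column_name, category): [int(v == category) for v in values]
        for category in categories
    }


# Hypothetical sample of the 'Intl Plan' column.
intl_plan = ["no", "yes", "no", "no"]
encoded = one_hot(intl_plan, "Intl Plan")

for name, indicator in encoded.items():
    print(name, indicator)
```

Every categorical column expands into one indicator column per distinct value, which is why a column such as State adds many features to the model data.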

And now let's split the data into training, validation, and test sets. This will help prevent us from overfitting the model, and allow us to test the model's accuracy on data it hasn't already seen.


In [ ]:
train_data, validation_data, test_data = np.split(to_split_data.sample(frac=1, random_state=1729), [int(0.7 * len(to_split_data)), int(0.9 * len(to_split_data))])
train_data.to_csv('train.csv', header=False, index=False)
validation_data.to_csv('validation.csv', header=False, index=False)
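
The np.split call above cuts the shuffled rows at the 70% and 90% marks, which yields a 70/20/10 split. The sketch below reproduces only the index arithmetic with the standard library, so the shuffled order differs from what pandas' sample() produces. The row count of 3,333 comes from the dataset description, and the helper name split_indices is hypothetical.

```python
import random


def split_indices(n_rows, fractions=(0.7, 0.9), seed=1729):
    """Shuffle row indices and cut them at the given cumulative fractions."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    first_cut = int(fractions[0] * n_rows)
    second_cut = int(fractions[1] * n_rows)
    return indices[:first_cut], indices[first_cut:second_cut], indices[second_cut:]


train_idx, validation_idx, test_idx = split_indices(3333)
print(len(train_idx), len(validation_idx), len(test_idx))  # 2333 666 334
```

Fixing the seed keeps the split reproducible between runs, which matters when comparing models trained at different times.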

In [ ]:
pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 1000)
display(train_data)

Now we'll upload these files to S3.


In [ ]:
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('validation.csv')

Train

Moving on to training, first we'll need to specify the location of the XGBoost algorithm container.


In [ ]:
from sagemaker.amazon.amazon_estimator import get_image_uri
xgb_training_container = get_image_uri(boto3.Session().region_name, 'xgboost', '0.90-1')

Then, because we're training with the CSV file format, we'll create s3_inputs that our training function can use as a pointer to the files in S3.


In [ ]:
s3_input_train = sagemaker.s3_input(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')
s3_input_validation = sagemaker.s3_input(s3_data='s3://{}/{}/validation/'.format(bucket, prefix), content_type='csv')

Now, we can specify a few parameters like what type of training instances we'd like to use and how many, as well as our XGBoost hyperparameters. A few key hyperparameters are:

  • max_depth controls how deep each tree within the algorithm can be built. Deeper trees can lead to better fit, but are more computationally expensive and can lead to overfitting. There is typically some trade-off in model performance that needs to be explored between a large number of shallow trees and a smaller number of deeper trees.
  • subsample controls sampling of the training data. This technique can help reduce overfitting, but setting it too low can also starve the model of data.
  • num_round controls the number of boosting rounds. Each round trains an additional model on the residuals of the previous rounds. Again, more rounds should produce a better fit on the training data, but can be computationally expensive or lead to overfitting.
  • eta controls how aggressive each round of boosting is. It is the step size shrinkage applied to each round, so smaller values lead to more conservative boosting.
  • gamma controls how aggressively trees are grown. Larger values lead to more conservative models.

More detail on XGBoost's hyperparameters can be found on their GitHub page.
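
The interaction between eta and num_round can be seen in a deliberately simplified boosting loop. In the sketch below, each round's "tree" is assumed to fit the current residual perfectly, which real trees do not, so this is a toy illustration of shrinkage rather than XGBoost's actual algorithm. The function name remaining_error is hypothetical.

```python
def remaining_error(target, eta, num_round):
    """Absolute error left after num_round boosting rounds with learning rate eta.

    Each round adds eta times the current residual to the prediction, standing in
    for a weak learner that fits the residual exactly.
    """
    prediction = 0.0
    for _ in range(num_round):
        residual = target - prediction
        prediction += eta * residual
    return abs(target - prediction)


for eta in (0.05, 0.2, 0.5):
    error = remaining_error(1.0, eta, 10)
    print("eta={:<4} error after 10 rounds: {:.4f}".format(eta, error))
```

In this toy setting, a smaller eta leaves more error after the same number of rounds, which is why small eta values are usually paired with a larger num_round.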


In [ ]:
xgb = sagemaker.estimator.Estimator(xgb_training_container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m5.xlarge',
                                    output_path='s3://{}/{}/output'.format(bucket, prefix),
                                    sagemaker_session=sess)
xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100)

xgb.fit({'train': s3_input_train, 'validation': s3_input_validation})

Compile

Amazon SageMaker Neo optimizes models to run up to twice as fast, with no loss in accuracy. When calling the compile_model() function, we specify the target instance family (c5) as well as the S3 location in which the compiled model will be stored. In the cell below, the compilation step is commented out, so compiled_model simply refers to the trained estimator.


In [ ]:
compiled_model = xgb
#try:
#    xgb.create_model()._neo_image_account(boto3.Session().region_name)
#except:
#    print('Neo is not currently supported in', boto3.Session().region_name)
#else:
#    output_path = '/'.join(xgb.output_path.split('/')[:-1])
#    compiled_model = xgb.compile_model(target_instance_family='ml_c5', 
#                                   input_shape={'data':[1, 69]},
#                                   role=role,
#                                   framework='xgboost',
#                                   framework_version='0.7',
#                                   output_path=output_path)
#    compiled_model.name = 'deployed-xgboost-customer-churn-c5'
#    compiled_model.image = get_image_uri(sess.boto_region_name, 'xgboost-neo', repo_version='latest')

Batch Inference

Next we're going to evaluate our model by using a Batch Transform to generate churn scores in batch from our model_data.

First, we upload the model data to S3. SageMaker Batch Transform is designed to run asynchronously and ingest input data from S3. This differs from SageMaker's real-time inference endpoints, which receive input data from synchronous HTTP requests.

For large-scale deployments, the dataset would be retrieved from Snowflake using SQL and an external stage to S3.
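
As a rough sketch of that approach, the helper below builds a Snowflake COPY INTO <location> statement that unloads a query result to a stage as uncompressed, header-less CSV. The stage name sagemaker_s3_stage, the path, and the query are hypothetical, and the external stage is assumed to already exist and point at your S3 bucket.

```python
def build_unload_statement(stage, path, query):
    """Build a COPY INTO <location> statement that unloads a query result to a stage."""
    return (
        "COPY INTO @{stage}/{path} "
        "FROM ({query}) "
        "FILE_FORMAT = (TYPE = CSV COMPRESSION = NONE) "
        "HEADER = FALSE OVERWRITE = TRUE"
    ).format(stage=stage, path=path.strip("/") + "/", query=query)


# Hypothetical stage and path; the query reuses columns from the CUSTOMER_CHURN table.
unload_sql = build_unload_statement(
    stage="sagemaker_s3_stage",
    path="churn-analytics-lab/model",
    query="SELECT CUST_ID, ACCOUNT_LENGTH, DAY_MINS FROM CUSTOMER_CHURN",
)
print(unload_sql)
```

The statement would be run with the same cursor used for queries, for example cs.execute(unload_sql), after which Batch Transform could read the files directly from S3.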

Batch Transform is often the ideal option for advanced analytics use cases for several reasons:

  • Batch Transform is better optimized for throughput in comparison with real-time inference endpoints. Thus, Batch Transform is ideal for processing large volumes of data for analytics.
  • Offline asynchronous processing is acceptable for most analytics use cases.
  • Batch Transform is more cost efficient when real-time inference isn't necessary. You only need to pay for resources used during batch processing. There is no need to pay for ongoing resources like a hosted endpoint for real-time inference.

In [ ]:
batch_input = model_data.iloc[:,1:]
batch_input.to_csv('model.csv', header=False, index=False)
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'model/model.csv')).upload_file('model.csv')

s3uri_batch_input ='s3://{}/{}/model'.format(bucket, prefix)
print('Batch Transform input S3 uri: {}'.format(s3uri_batch_input))

s3uri_batch_output= 's3://{}/{}/out'.format(bucket, prefix)
print('Batch Transform output S3 uri: {}'.format(s3uri_batch_output))

In [ ]:
from sagemaker.transformer import Transformer
BATCH_INSTANCE_TYPE = 'ml.c5.xlarge'

transformer = compiled_model.transformer(instance_count=1,
                                         strategy='SingleRecord',
                                         assemble_with='Line',
                                         instance_type= BATCH_INSTANCE_TYPE,
                                         accept = 'text/csv',
                                         output_path=s3uri_batch_output)
    
transformer.transform(s3uri_batch_input,
                      split_type= 'Line',
                      content_type= 'text/csv',   
                      input_filter = "$[1:]",
                      join_source = "Input",
                      output_filter = "$[0,-1,-2]")
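
The three data-processing arguments above can be hard to picture, so the sketch below mimics them on a single CSV row in plain Python. It assumes that the JSONPath filters behave like Python list indexing on each row and that output_filter returns columns in the order listed. The row values and the dummy_predict function are made up and stand in for the real model.

```python
def apply_batch_filters(row, predict):
    """Mimic input_filter='$[1:]', join_source='Input', output_filter='$[0,-1,-2]'."""
    features = row[1:]        # input_filter: drop the ID in column 0
    score = predict(features)  # the model only sees the feature columns
    joined = row + [score]    # join_source='Input': append the score to the input row
    # output_filter: keep the ID, the score, and the last input column
    return [joined[0], joined[-1], joined[-2]]


def dummy_predict(features):
    # Placeholder for the model: a made-up score, not a real prediction.
    return round(sum(features) / (100.0 * len(features)), 4)


row = ["cust-001", 10.0, 20.0, 30.0]  # hypothetical ID followed by three features
print(apply_batch_filters(row, dummy_predict))
```

Under these assumptions, each output line starts with the customer ID followed by the score, which is the layout the results are read back with later in the notebook.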

Batch transform jobs run asynchronously, and are non-blocking by default. Run the command below to block until the batch job completes.


In [ ]:
transformer.wait()

There are many ways to compare the performance of a machine learning model, but let's start by simply comparing actual to predicted values. In this case, we're simply predicting whether the customer churned (1) or not (0), which produces a simple confusion matrix.


In [ ]:
batched_churn_scores = pd.read_csv(s3uri_batch_output+'/model.csv.out', usecols=[0,1], names=['id','scores'])
gt_df = pd.DataFrame(model_data['Churn?_True.']).reset_index(drop=True)
# join_axes was removed in pandas 1.0; align on the ground-truth index with reindex instead
results_df = pd.concat([gt_df, batched_churn_scores], axis=1).reindex(gt_df.index)

pd.crosstab(index=results_df['Churn?_True.'], columns=np.round(results_df['scores']), rownames=['actual'], colnames=['predictions'])
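
The four cells of the confusion matrix can be turned into precision and recall, which are often more informative than accuracy when classes are imbalanced. The following standard-library sketch uses synthetic labels and scores, and a 0.5 cut-off that corresponds to the rounding used above. None of the numbers come from the actual model.

```python
def confusion_counts(actual, scores, threshold=0.5):
    """Count true/false positives and negatives for binary labels and scores."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for label, score in zip(actual, scores):
        predicted = 1 if score > threshold else 0
        if predicted == 1:
            counts["tp" if label == 1 else "fp"] += 1
        else:
            counts["fn" if label == 1 else "tn"] += 1
    return counts


# Synthetic labels and scores, for illustration only.
actual = [1, 0, 0, 1, 0, 1, 0, 0]
scores = [0.91, 0.12, 0.65, 0.40, 0.05, 0.77, 0.30, 0.08]

counts = confusion_counts(actual, scores)
precision = counts["tp"] / (counts["tp"] + counts["fp"])
recall = counts["tp"] / (counts["tp"] + counts["fn"])
print(counts)
print("precision={:.2f} recall={:.2f}".format(precision, recall))
```

Lowering the threshold would typically catch more churners (higher recall) at the cost of more false alarms (lower precision), so the cut-off is a business decision as much as a modeling one.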

Upload Churn Score to Snowflake

To give multiple business users and dashboards simple access to the churn scores, we will upload them to Snowflake using a Snowflake internal stage.


In [ ]:
results_df.to_csv('results.csv', header=False, index=False)
cs.execute("PUT file://results.csv @ml_results")
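
The PUT command only places the file in the stage. To query the scores, the staged file still has to be loaded into a table. The sketch below builds the two statements that could do this. The table name CHURN_SCORES, its column names, and the column types are assumptions. The column order follows results_df (actual label, customer ID, score).

```python
def build_load_statements(table, stage="ml_results", filename="results.csv"):
    """Return SQL statements that create a table and load a staged CSV into it."""
    create_sql = (
        "CREATE TABLE IF NOT EXISTS {table} "
        "(CHURN_ACTUAL BOOLEAN, CUST_ID VARCHAR, CHURN_SCORE FLOAT)"
    ).format(table=table)
    # PUT compresses files with gzip by default, hence the .gz suffix.
    copy_sql = (
        "COPY INTO {table} FROM @{stage}/{filename}.gz "
        "FILE_FORMAT = (TYPE = CSV)"
    ).format(table=table, stage=stage, filename=filename)
    return [create_sql, copy_sql]


statements = build_load_statements("CHURN_SCORES")
for statement in statements:
    print(statement)
```

In the notebook, each statement could be passed to cs.execute() in turn, after which the scores would be available to any user or dashboard with access to the table.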