Cognihack Data Science Track
Predicting repeat shopping likelihood with Python, Spark and Watson Machine Learning

This notebook will take you through the process of creating a predictive model in Python using the data manipulation and machine learning libraries distributed with Spark.

Once we've worked through the process of reading, understanding and preparing our data and have built a simple model together, we'll deploy it to the Watson Machine Learning service and make it available as a real-time scoring service.

You should spend the remaining time working as a group to speculate on how you might improve these predictions. The cognihack tutors will endeavour to assist with any experimentation to help you create and evaluate refinements to the baseline model.

Learning goals

The learning goals of this exercise are:

  • Loading CSV files into an Apache® Spark DataFrame.
  • Exploring the data using the features within:
    a) Spark's data wrangling Python API: pyspark.sql;
    b) the pandas data wrangling library; and
    c) matplotlib for exploratory plots.
  • Engineering some basic predictive features, again using pyspark.sql and Spark user defined functions (UDFs).
  • Preparing the data for training and evaluation.
  • Creating an Apache® Spark machine learning pipeline.
  • Training and evaluating a model.
  • Persisting a pipeline and model in Watson Machine Learning repository.
  • Deploying the model for online scoring using Wastson Machine Learning API.
  • Scoring sample scoring data using the Watson Machine Learning API.

Contents

This notebook contains the following parts:

  1. Setup
  2. Load and understand data
  3. Prepare dataset
  4. Create a basic model
  5. Deploy and score
  6. Taking it further

1. Setup

Before we begin working through this notebook, you must perform the following setup tasks:

  • Sign up for the IBM Data Science Experience (using w3 credentials) and create a new project;
  • Make sure that you are using a Spark 2.0 kernel and Python 2.x; and
  • Create a Watson Machine Learning Service instance (a free plan is offered).

2. Load and explore data

2.1 Load the data

The first step in our analysis process is to bring our data into the Spark environment.
We will do this by reading directly from a Bluemix hosted dashDB instance.

pyspark.sql will help us load and manipulate our data from Python while it resides in Spark.


In [ ]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

This last statement will make some aggregation and mutation statements look a little different to the cheat sheet as spark functions will be prefixed by F.

The spark.read function will import data directly into our spark instance, without touching the environment in which Python is running.


In [ ]:
spark = SparkSession.builder.getOrCreate()

dash = {
    'jdbcurl': 'jdbc:db2://dashdb-entry-yp-dal09-10.services.dal.bluemix.net:50000/BLUDB',
    'user': 'dash14210',
    'password': 'W@sI$Ea8H8fz',
}

offers = spark.read.jdbc(dash['jdbcurl'],
                         table='DASH14210.OFFERS', 
                         properties={"user" : dash["user"], 
                                     "password" : dash["password"]})

trainHistory = spark.read.jdbc(dash['jdbcurl'], 
                               table='DASH14210.HISTORY', 
                               properties={"user" : dash["user"], 
                                           "password" : dash["password"]})

transactions = spark.read.jdbc(dash['jdbcurl'], 
                               table='DASH14210.TRANSACTIONS', 
                               properties={"user" : dash["user"], 
                                           "password" : dash["password"]})

offers.cache()
trainHistory.cache()
transactions.cache()

When this is done, we are left with a Python object which is a logical pointer to a Spark DataFrame object.
Notice as well that Spark will give us some information on how it is breaking down the heavy lifting into discrete and parallelisable tasks.

2.2 Quality and completeness

Before we can get stuck into building predictive models, we need to first understand if any immediate attention is required to issues of data quality and completeness.

Like many data manipulation libraries, Spark makes its own decisions about how to read in data.
To check it has treated our data as we expected, we need to compare the schemata of our Spark DataFrames to that provided with the datasets:

history
id - A unique id representing a customer
chain - An integer representing a store chain
offer - An id representing a certain offer
market - An id representing a geographical region
repeattrips - The number of times the customer made a repeat purchase
repeater - A boolean, equal to repeattrips > 0
offerdate - The date a customer received the offer

transactions
id - see above
chain - see above
dept - An aggregate grouping of the Category (e.g. water)
category - The product category (e.g. sparkling water)
company - An id of the company that sells the item
brand - An id of the brand to which the item belongs
date - The date of purchase
productsize - The amount of the product purchase (e.g. 16 oz of water)
productmeasure - The units of the product purchase (e.g. ounces)
purchasequantity - The number of units purchased
purchaseamount - The dollar amount of the purchase

offers
offer - see above
category - see above
quantity - The number of units one must purchase to get the discount
company - see above
offervalue - The dollar value of the offer
brand - see above

The transactions file can be joined to the history file by (id,chain). The history file can be joined to the offers file by (offer). The transactions file can be joined to the offers file by (category, brand, company). A negative value in productquantity and purchaseamount indicates a return.

While we're at it, let's also see how many observations each dataset has, take a peek at the data and look for any missing values.

2.2.1 Offer details

First up, the count of observations in the dataset.


In [ ]:
offers.count()

A small, reference dataset.

Next question: how is the data typed within the Spark DataFrame?


In [ ]:
offers.schema

Spark has interpreted every field as a string. 🙄

There are two fields: offervalue and quantity that definitely should not be strings - let's fix them up now.


In [ ]:
offers = offers.withColumn("offervalue", offers["offervalue"].cast("double"))
offers = offers.withColumn("quantity", offers["quantity"].cast("double"))

Now we're ready to take a peak at the data.


In [ ]:
offers.show()

And finally, check for any records with a missing value in critical fields.


In [ ]:
offers.where(offers.offer.isNull() |
            offers.category.isNull() |
            offers.quantity.isNull() |
            offers.company.isNull() |
            offers.offervalue.isNull() |
            offers.brand.isNull()).count()

2.2.2 Customer-Offer History

Can you repeat the same operations here?

  1. Count the records;
  2. Inspect the type schema;
  3. Convert miss-typed fields using withColumn() and cast();
  4. Inspect the data; and
  5. Check the dataset for missing values.

In [ ]:
trainHistory.count()

A larger dataset that relates offers and customers.


In [ ]:
trainHistory.schema

Same deal. Let's do some conversion to numeric and datetime types.


In [ ]:
trainHistory = trainHistory.withColumn("repeattrips", trainHistory["repeattrips"].cast("double"))
trainHistory = trainHistory.withColumn("repeater", trainHistory["repeater"].cast("boolean"))
trainHistory = trainHistory.withColumn("repeater", trainHistory["repeater"].cast("double"))
trainHistory = trainHistory.withColumn("offerdate", trainHistory["offerdate"].cast("date"))

In [ ]:
trainHistory.show()

In [ ]:
trainHistory.where(trainHistory.chain.isNull() | 
                  trainHistory.market.isNull() |
                  trainHistory.repeattrips.isNull() |
                  trainHistory.repeater.isNull() |
                  trainHistory.offerdate.isNull()).count()

2.2.3 Transactions


In [ ]:
transactions.count()

The largest of the three datasets.


In [ ]:
transactions.schema

In [ ]:
transactions = transactions.withColumn("date", transactions["date"].cast("date"))
transactions = transactions.withColumn("productsize", transactions["productsize"].cast("double"))
transactions = transactions.withColumn("purchasequantity", transactions["purchasequantity"].cast("double"))
transactions = transactions.withColumn("purchaseamount", transactions["purchaseamount"].cast("double"))

In [ ]:
transactions.show()

In [ ]:
transactions.where(transactions.id.isNull() |
                  transactions.chain.isNull() |
                  transactions.dept.isNull() |
                  transactions.category.isNull() |
                  transactions.company.isNull() |
                  transactions.brand.isNull() |
                  transactions.date.isNull() |
                  transactions.productsize.isNull() |
                  transactions.productmeasure.isNull() |
                  transactions.purchasequantity.isNull() |
                  transactions.purchaseamount.isNull()).count()

2.3 Exploration

Let's begin interpreting the contents of these sets, starting with the range of dates over which the offers were presented to customers.


In [ ]:
trainHistory.agg(
    F.min("offerdate").alias("offerdate_min")
    , F.max("offerdate").alias("offerdate_max")).show()

What is the frequency of records across some of the categorical variables?


In [ ]:
trainHistory.groupBy(trainHistory.chain).count().orderBy("count", ascending = False).show(n = 50)

Probably too many of these to do anything useful without grouping.

Try for yourself with the market variable.


In [ ]:
trainHistory.groupBy(trainHistory.market).count().orderBy("count", ascending = False).show(n = 50)

Slightly more usable, perhaps we'll come back to this.


In [ ]:
trainHistory.describe(["repeattrips", "repeater"]).show()

First insight: 27% of customers to whom an offer is made become repeat shoppers. Does this vary across market?


In [ ]:
trainHistory.groupBy(trainHistory.market).agg(
    F.count("id").alias("customer_count")
    , F.avg("repeater").alias("response_rate")
    ).orderBy("response_rate", ascending = False).show()

There's a hypothesis emerging here that our larger markets may show the strongest response to offers. Let's plot it to check.


In [ ]:
count_vs_rate = trainHistory.groupBy(trainHistory.market).agg(
    F.count("id").alias("customer_count")
    , F.avg("repeater").alias("response_rate")
    ).orderBy("response_rate", ascending = False).toPandas()

In [ ]:
%matplotlib inline
import matplotlib.pyplot as plt
count_vs_rate.plot(kind='scatter', x='customer_count', y='response_rate')

In [ ]:
count_vs_rate[count_vs_rate.customer_count < 40000].plot(kind='scatter', x='customer_count', y='response_rate')

There is a weak relationship there, but we probably won't want to employ something as nuanced as this in our first iteration of analysis. Interesting to know though!

We understand a little bit of the offer history data. Let's just check for missing values.

Now we need to repeat the process across our other datasets. Let's start with the offers.


In [ ]:
offers.describe(["quantity", "offervalue"]).show()

Looks as though a small number of the offers have a different quantity value.


In [ ]:
offers.groupBy("quantity").count().show()

Interesting. Is this still applicable when we join offers to our history dataset?


In [ ]:
offers[offers.quantity == 2].show()

In [ ]:
trainHistory[trainHistory.offer=="1221658"].count()

No. In which case, it's not going to be significant for our analysis and modelling.
Do any of the categorical fields have few enough levels to enter into a simple classification model?


In [ ]:
offers.groupBy("company").count().orderBy("count", ascending = False).show()

In [ ]:
offers.groupBy("brand").count().orderBy("count", ascending = False).show()

In [ ]:
offers.groupBy("category").count().orderBy("count", ascending = False).show()

These might work in a classification tree model which automatically groups, or as a binned aggregate measure of the response rate across each.
Let's move on to the transactions.

What is the range of dates of our transactions?


In [ ]:
transactions.agg(
    F.min("date").alias("date_min")
    ,F.max("date").alias("date_max")).show()

Right, so up to a year before the offers were presented.
What are the ranges of purchasequantity and purchaseamount?


In [ ]:
transactions.describe(["productsize"
                       ,"productmeasure"
                       ,"purchasequantity"
                       , "purchaseamount"]).show()

OK, we have some returns data in here, too. We may need to account for this in a future iteration of modelling work.

2.4 Activity 2: Data understanding learning challenges

2.4.1 Data understanding learning challenge 1

Is there any pattern to the number of records in our history dataset by offerdate? Try plotting a graph to investigate.


In [ ]:
datatoplot = trainHistory.groupBy(?).?
datatoplot.orderBy(?).show()

In [ ]:
datatoplot.orderBy(?).?.plot(kind=?, x=?, y=?)

The graph above is a little messy and difficult to interpret.
Let's use a very quick and dirty approach to extracting the month, plot that and see if this is any easier to interpret.


In [ ]:
newdata = trainHistory.withColumn("offermonth", datatoplot["offerdate"]??)
datatoplot = newdata.groupBy(?).? #aggregate the data
datatoplot.? # plot the chart

We are now able to discriminate between the periods in our analysis, but we've lost the interesting pattern we saw before.

Extension tasks

  1. Try producing a summary of counts by week.
    Hint: take a look at the resample() function within Pandas or the pyspark.sql.functions functions next_day() or weekofyear().
  2. Is there a pattern relating to the day of the week offers are sent out? Is this an important indicator in response likelihood?
    Hint: pyspark.sql.functions.date_format() may be helpful.

In [ ]:
newdata = trainHistory.withColumn("offerweek", ?)
datatoplot = newdata.groupBy(?).?
datatoplot.?.?.plot(?)

In [ ]:
newdata = trainHistory.withColumn("dayofweek", ?) 
# hints: https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.date_format
# a format string of "E" will return a day of the week abbreviation
datatoplot = newdata.groupBy(?).?
datatoplot.?.?.plot(?)

2.4.2 Data understanding learning challenge 2

In the data understanding phase, we run lots of operations against our data, and we need these to be optimised in order to complete the task in a timely fashion. In our case, the key differentiator is whether the function requires creation of a Python object (e.g. using Pandas), or whether it can run on a Spark data frame. Let's try using Pandas' implementation of 'describe'


In [ ]:
trainHistory.toPandas().?

In [ ]:
offers.?.?

Pyspark.sql also has an implementation of describe(), which we saw earlier. Note: be careful which order you run code, as you may need to declare which module to use a function from. Let's try to use a different set of Pyspark functionality to find the range of our continuous variables.


In [ ]:
transactions.agg(
    F.min("chain").alias("chain_min")
    ,F.max("chain").alias("chain_max")
    ,F.min("?").alias("?")
    ,F.max("?").alias("?")
    ?
    ?
    ?
    ?
    ?
    ?
).show()

From the exercise, it looks like there is something strange in one of the columns. Are all of the values positive and with a reasonable range? Let's take a look at some of the negative values


In [ ]:
transactions[transactions.??].show()

3. Data preparation

The output of this phase is a dataset with which we can build and test models.

3.1 Summarising data for use in modelling

Given that the aim of the task is to make customer-offer level predictions about likelihood to become a repeat purchaser, data that we use from the offers or transactions datasets will need to be joined to the history dataset.
We have also observed that the transactions dataset contains a large volume of datat, too much to enter into a model without aggregation to the customer, offer level. In aggregating this, our goal is to select an approach which generates features that:

a) retain as much information about the behaviour of these customers as possible; and
b) will be usable in our model (some algorithms can only accept numerical inputs, for example).

As a starter set, we will simply measure how much each customer had spent in the 30, 60, 90 and 180 days prior to being made an offer.
To do so, we will first need to join the offer history and transactions tables.


In [ ]:
offertxns = transactions.join(trainHistory.select(["id" , "chain", "offer", "offerdate", "repeater"]), ["id", "chain"], how = "inner")
offertxns.show(n=5)

Calculate "history" interval dates based on offerdate.


In [ ]:
offertxns = offertxns.withColumn("offerdate_30", F.date_sub(offertxns.offerdate, 30))
offertxns = offertxns.withColumn("offerdate_60", F.date_sub(offertxns.offerdate, 60))
offertxns = offertxns.withColumn("offerdate_90", F.date_sub(offertxns.offerdate, 90))
offertxns = offertxns.withColumn("offerdate_180", F.date_sub(offertxns.offerdate, 180))
offertxns.show(n=5)

We can employ a Spark "user defined function" to create corresponding aggregation flags to identify whether the transaction in scope of one of the history periods.


In [ ]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def inDateRange(date, date_lower, date_upper):
    if date >= date_lower and date <= date_upper: return 1
    else: return 0

udfInDateRange = udf(inDateRange, IntegerType())
    
offertxns = offertxns.withColumn("offerdate_30_tf", udfInDateRange(offertxns.date, offertxns.offerdate_30, offertxns.offerdate))
offertxns = offertxns.withColumn("offerdate_60_tf", udfInDateRange(offertxns.date, offertxns.offerdate_60, offertxns.offerdate))
offertxns = offertxns.withColumn("offerdate_90_tf", udfInDateRange(offertxns.date, offertxns.offerdate_90, offertxns.offerdate))
offertxns = offertxns.withColumn("offerdate_180_tf", udfInDateRange(offertxns.date, offertxns.offerdate_180, offertxns.offerdate))
offertxns.show(n=5)

At this point we can calculate the quantity and spend per customer per offer.
As an extension, you could join onto the offers table and create equivalent measures for quantity and spend in the same brand, company and category as the offer presented to the customer.


In [ ]:
offertxns = offertxns.withColumn("offerdate_30_qty", offertxns.purchasequantity * offertxns.offerdate_30_tf)
offertxns = offertxns.withColumn("offerdate_60_qty", offertxns.purchasequantity * offertxns.offerdate_60_tf)
offertxns = offertxns.withColumn("offerdate_90_qty", offertxns.purchasequantity * offertxns.offerdate_90_tf)
offertxns = offertxns.withColumn("offerdate_180_qty", offertxns.purchasequantity * offertxns.offerdate_180_tf)

offertxns = offertxns.withColumn("offerdate_30_amt", offertxns.purchaseamount * offertxns.offerdate_30_tf)
offertxns = offertxns.withColumn("offerdate_60_amt", offertxns.purchaseamount * offertxns.offerdate_60_tf)
offertxns = offertxns.withColumn("offerdate_90_amt", offertxns.purchaseamount * offertxns.offerdate_90_tf)
offertxns = offertxns.withColumn("offerdate_180_amt", offertxns.purchaseamount * offertxns.offerdate_180_tf)
offertxns.show(n=5)

In [ ]:
offertxnsSum = offertxns.groupBy(["id", "chain", "offer", "offerdate", "repeater"]).agg(
    F.sum("offerdate_30_qty").alias("qty_30")
    , F.sum("offerdate_60_qty").alias("qty_60")
    , F.sum("offerdate_90_qty").alias("qty_90")
    , F.sum("offerdate_180_qty").alias("qty_180")
    , F.sum("offerdate_30_amt").alias("amt_30")
    , F.sum("offerdate_60_amt").alias("amt_60")
    , F.sum("offerdate_90_amt").alias("amt_90")
    , F.sum("offerdate_180_amt").alias("amt_180"))
offertxnsSum.show(n=5)

3.2 Reshaping data

What is the average spend in these intervals?
Spark will allow us to calculate this quite easily. In order to plot this nicely, we will need help from the Python data wrangling library of choice: Pandas. Luckily Spark also offers an easy way to translate between the two types of object using the .toPandas() function.


In [ ]:
import pandas as pd
average_spend = offertxnsSum.groupBy("repeater").agg(
    F.avg("amt_30").alias("30")
    , F.avg("amt_60").alias("60")
    , F.avg("amt_90").alias("90")
    , F.avg("amt_180").alias("180")).toPandas()

average_spend_melt = pd.melt(average_spend, id_vars = "repeater", var_name = "interval_days", value_name = "spend_ave")
average_spend_melt["interval_days"] = pd.to_numeric(average_spend_melt["interval_days"])

average_spend_melt.head()

In [ ]:
average_spend_melt.plot(kind='scatter', x='interval_days', y='spend_ave', c="repeater")

3.3 Divertissement: interactive charting

(Maybe useful for the hack later...)

If we want to add some interactivity to our charts, one great option is the Bokeh library.


In [ ]:
from bokeh.plotting import figure
from bokeh.io import output_notebook, show
from bokeh.charts import Scatter
from bokeh.palettes import brewer

output_notebook()

In [ ]:
p = Scatter(average_spend_melt,
              x="interval_days",
              y="spend_ave",
            color = "repeater",
              title = "Comparison: purchase intervals and average spends",
             xlabel = "Purchase analysis interval",
             ylabel = "Average spend",
           palette = brewer["Dark2"][3])
show(p)

This gives us a very small level of interaction - but much more is possible!
You may have to move outside of the notebook environment to do this though (using output_file, for example).
Bokeh interactivity docs and examples

3.4 Activity 3: Data prep learning challenges

3.4.1 Data prep learning challenge 1

Let's apply some of what we've looked at together and engineer a new feature that may or may not be predictive and useful, but will at least give us some experience of working with the summarisation capability of pyspark.sql.

What we are looking to do is examine which departments customers have shopped into as a way of classifying their habits.

Let's start by identifying the five most popular departments in our transactions set. We can sample our data to achieve this quickly.


In [ ]:
(transactions
     .sample(False, 0.01, 42)
     .groupBy(?)
     .agg(?.alias("transaction_count"))
     .orderBy(?)
     .show(?))

We can use these to create a series of flags at the customer level which may be useful in the classification task. The task here is to create a flag for every customer, showing whether they have shopped in each of: department 99, department 35, 37, 56 and 26.


In [ ]:
customerDepts = (transactions
                 .withColumn(?, (?).?) # check out the cheat sheet for help here!
                    ...
                 .groupBy(?)
                 .agg(?.alias("dept_99"),
                     ...)) # how would you aggregate this to get a 'per customer' answer?

Inspect and generate some summary statistics for these fields.


In [ ]:
customerDepts.?

In [ ]:
customerDepts.?.?

Last piece of the puzzle: let's measure the level of correlation of these predictors.
(If everybody buys from the same departments then these will not be good predictor variables to use in a model.)

Pandas has a very slick way of doing this via. the .corr() function, but what do we need to do first to allow us to use our summary data?


In [ ]:
customerDepts.?.corr()

Qs.

  1. Are these good variables to add into the model?
  2. How else could we test their suitability?
  3. If we built these flags for a hundred departments: how could we use this information but still only add a small number of additional predictors into our model?

3.4.2 Data prep learning challenge 2

How could we add a new column to our offer history data, expressing the rank of the chain (by number of customers) within the market?

Wrangling pattern:

  1. Summarise customer counts by chain and market;
  2. Apply the rank() function to customer counts by market; then
  3. Join back to original dataframe.

Starting with (1) - can you use the agg() function to summarise the data appropriately?

  • Name your column of counts: customer_count
  • Name your new dataframe: historyCustCounts

In [ ]:
historyCustCount = (trainHistory
                    .groupBy(?,?)
                    .agg(?))
historyCustCount.show()
historyCustCount.count()

Moving on to (2), let's calculate a new column chain_market_rank within historyCustCount showing the ranking of chains by customer count within each market.
Hint: as this is a window function (calculates a quantity within a partition, in this case market), you'll need to specify the window specification using Window which is available in the pyspark.sql.window library.


In [ ]:
from ? import ?
w = ?.partitionBy(?).orderBy(?)
historyCustCount = historyCustCount.withColumn(?, ?.over(w))
historyCustCount.show()

Is this a good variable to use in our model? Let's plot the distribution of values as a histogram.

Again, Pandas has a very neat function: .hist() that allows us to plot histograms quickly.


In [ ]:
historyCustCount.?.hist(?) # check out the pandas docs for help with the arguments to hist()
# https://pandas.pydata.org/pandas-docs/stable/visualization.html#visualization-hist

Do you think this would be informative?
What other analysis could you do to support your assertion?

For completeness, let's go ahead and join this back to our original trainHistory dataframe.


In [ ]:
trainHistory = ?.join(?) # pyspark cheat sheet will help you here
trainHistory.show()

We don't need customer_count, let's go ahead and drop it.


In [ ]:
trainHistory = trainHistory.?
trainHistory.show()

3.4.3 Data prep learning challenge 3

This challenge is more open ended: you will work from a specification to produce a new predictor variable (i.e. without direction from the script).

This is the specification you have been provided:

Calculating "Customer first transaction interval"

We are hoping to create a variable to encapsulate information about how long a customer began shopping with the company prior to being offered an incentive.
To do this, you will need to:

  1. Find the first transaction for each customer in the transactions dataset;
  2. Compare that to the offerdate in the offer history dataset and calculate the number of days between the two (the datediff() function will help); and
  3. Plot (or otherwise analyse) the distribution and make a decision about whether you would include this in a model. You might consider sampling your data before plotting.

In [ ]:
firstCustTrans = (transactions
                  .groupBy(?)
                  .agg(?.alias("first_purch_date")))
firstCustTrans.show()

In [ ]:
firstCustTrans = (trainHistory
                  .select("id", "offerdate")
                  .join(firstCustTrans, ?)
                  .withColumn("shop_history_interval", ?))

firstCustTrans.show()

In [ ]:
firstCustTrans.sample(?).?.?

4. Modelling experiments

We're now ready to have a first pass at building a model.

4.1 Holdout partitioning

For those who have worked in targeted marketing, this approach will be quite familiar. The premise is to train a model based on one part of your dataset and evaluate its performance using the other.
Spark has a convenience function to do just that: randomSplit(). The arguments supplied to this function is an array of weights specifying the proportions by which the dataset should be split.


In [ ]:
offertxnsSum = offertxnsSum.withColumnRenamed("repeater","label")
splits = offertxnsSum.randomSplit([0.7,0.3])
trainSet = splits[0]
testSet = splits[1]

trainSet.show(n=5)

In [ ]:
trainSet.count()

Let's also cache these datafrmaes like we did with our original data sets.


In [ ]:
trainSet.cache()
testSet.cache()

Spark has an idiosyncratic way of taking input and we will have to employ the 'VectorAssembler' function to bundle up the features for us to input into the model.


In [ ]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["qty_30", "qty_60", "qty_90", "qty_180",
              "amt_30", "amt_60", "amt_90", "amt_180"],
    outputCol="features")

trainSetAssembled = assembler.transform(trainSet)
trainSetAssembled.show(n=5, truncate = False)

In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(trainSet)

We now have a fitted model!
What values have been chosen for the parameters?


In [ ]:
#[stage.coefficients for stage in model.stages if hasattr(stage, "coefficients")]
model.stages[1].coefficients

In [ ]:
prediction = model.transform(trainSet)
prediction.select("label","prediction", "probability", "features").show(5)

In [ ]:
from sklearn.metrics import roc_curve

prediction_collect = prediction.select("label", "probability").toPandas()
roc_inputs = [(float(i[1][0]), float(i[1][1][1])) for i in prediction_collect.iterrows()]
roc_inputs = pd.DataFrame(roc_inputs, columns = ["label","prob"])
fpr, tpr, _ = roc_curve(y_true = roc_inputs.label, 
                        y_score = roc_inputs.prob)

roc_lr_train = pd.DataFrame(
   {"FPR" : fpr
    ,"TPR" : tpr})

In [ ]:
fig = plt.figure()
ax = plt.axes()
ax.set(title = "Receiver-Operator Characteristic",
       xlabel = "False Positive Rate",
      ylabel = "True Positive Rate")

x = [0,1]
y = [0,1]
ax.plot(x, y)
ax.plot(roc_lr_train.FPR, roc_lr_train.TPR)
plt.show()

In [ ]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
accuracy = evaluator.evaluate(prediction)
print "Area under the ROC curve = %g " % accuracy

Not bad, the competition benchmark for this dataset is 0.59.
Before we start extending this, we should perform some diagnositcs.

First, let's look at where the misclassifications are occuring by creating a contingency table (a.k.a confusion matrix).


In [ ]:
prediction_collect = prediction.toPandas()
pd.crosstab(prediction_collect.label, prediction_collect.prediction)

What does this tell us? Is this a bad model?

And of course we should check if our model generalises well by scoring and evaluating the test set.

4.2 Modeling Exercises

4.2.1 Exercise 1: Scoring on the test set

Go ahead and use the pipeline we've already developed to transform the test set data. When you've done that, plot a ROC curve for the predictions and measure the AUC. Discuss and draw some conclusions about whether the model is generalising well.


In [ ]:
prediction = ?
prediction.select("label","prediction", "probability", "features").show(5)

Build a confusion matrix to calculate rate of misclassification


In [ ]:
prediction_collect = prediction.?
pd.crosstab(?)

In [ ]:
prediction_collect = prediction.? # different to the previous cell! Check out section 4.1 for help...
roc_inputs = ?
roc_lr_test = ?

In [ ]:
fig = plt.figure()
ax = plt.axes()
ax.set(title = "Receiver-Operator Characteristic",
       xlabel = "False Positive Rate",
      ylabel = "True Positive Rate")

x = [0,1]
y = [0,1]
ax.plot(x, y)
ax.plot(roc_lr_train.FPR, roc_lr_train.TPR)
ax.plot(?) # this is where you would plot your test set performance
# specify a different line colour or style to differentiate from the training set performance.
plt.show()

In [ ]:
accuracy = ?
print "Area under the ROC curve = %g " % accuracy

4.2.2 Exercise 2: Creating a Decision Tree

We've seen how to create a logistic regression model. However, this is a parametric model and requires making some distributional assumptions about our data. In some cases this is not appropriate and we need to use a non-parametric method. Let's go through the same approach using pyspark.ml.classification, and fit a decision tree.
Slight additional complexity here: regardless of the original data type of the target variable, this algorithm requires you to have processed it with StringIndexer (from pyspark.ml.feature) prior to sending it to the model. Hence we have an additional stage to this pipeline.


In [ ]:
from pyspark.ml.classification import ?
from pyspark.ml.feature import StringIndexer
si = StringIndexer(inputCol="label", outputCol="indexed")
dt = DecisionTreeClassifier(labelCol=?)
pipeline_dt = Pipeline(stages=[?, ?, ?])
model_dt = ?

The first thing we did with our logistic regression was to look at the parameter values. There is no equivalent for decision trees. Instead, there is a featureImportances object which will give us similar useful information about the model. Extract it from the pipeline in the same way we did for the coefficients of our logistic regression.


In [ ]:
model_dt.stages[?].?

Go ahead and measure the AUC metric as before using the trains and test sets.


In [ ]:
prediction_dt = ?
prediction_dt.select("label", "prediction", "probability").show(5)

In [ ]:
evaluator_dt = BinaryClassificationEvaluator(?)
accuracy_dt = ?
print "Area under the ROC curve = %g " % accuracy_dt

In [ ]:
prediction_collect = ?
pd.crosstab(?)

Now, there is a stochastic element to the building of these models, but in preparation for Cognihack, something felt strange about the AUC we were getting. Have a chat with your teams about why this may be the case, in particular in the context of parallel computing.

Extension:

  • Try recreating the decision tree model, carefully selecting your features.
  • What about any more data derivations you could include
  • Try selecting the parameters of the decision tree, such as depth and minimum split size
  • Consider other classification algorithms, either from pyspark.ml or elsewhere

5. Deploy and score

With the advent of Watson Machine Learning, we can quite easily deploy our model to a cloud scoring service.

5.1 Persist Spark model within the ML repository

The first step here is to import the relevant client libraries.


In [ ]:
from repository.mlrepositoryclient import MLRepositoryClient
from repository.mlrepositoryartifact import MLRepositoryArtifact

Tip: service_path, user and password can be found on Service Credentials tab of service instance created in Bluemix.

For example, the following code:

wml_service_path = "https://ibm-watson-ml.mybluemix.net"  
wml_username = "ebecda6c-a18b-4c6f-82e4-4c7fc26361f4"  
wml_password = "4705d497-fcc0-4e1c-9f55-934b13b13fb2"

Will create the necessary credentials to connect to the Watson ML service. Just substitute in your own in the place of these example values.


In [ ]:
# The code was removed by DSX for sharing.

In [ ]:
ml_repository_client = MLRepositoryClient(wml_service_path)
ml_repository_client.authorize(wml_username, wml_password)

Create model artifact (abstraction layer).


In [ ]:
model_artifact = MLRepositoryArtifact(model, training_data=trainSet, name="Repeat Buyer Prediction Model")

Tip: The MLRepositoryArtifact method expects a trained model object, training data, and a model name. (It is this model name that is displayed by the Watson Machine Learning service).

We can now save our model to the repository.


In [ ]:
saved_model = ml_repository_client.models.save(model_artifact)

Get saved model metadata from Watson Machine Learning.
Tip: Use meta.available_props() to get the list of available props.


In [ ]:
saved_model.meta.available_props()

In [ ]:
print "modelType: " + saved_model.meta.prop("modelType")
print "trainingDataSchema: " + str(saved_model.meta.prop("trainingDataSchema"))
print "creationTime: " + str(saved_model.meta.prop("creationTime"))
print "modelVersionHref: " + saved_model.meta.prop("modelVersionHref")
print "label: " + saved_model.meta.prop("label")

Tip: modelVersionHref is our model unique indentifier in the Watson Machine Learning repository.

5.2 Create an online scoring endpoint

In this section you will learn how to create online scoring and to score a new data record by using the Watson Machine Learning REST API.
For more information about REST APIs, see the Swagger Documentation.
To work with the Watson Machine Leraning REST API you must generate an access token. To do that you can use the following sample code:


In [ ]:
import urllib3, requests, json

headers = urllib3.util.make_headers(basic_auth='{}:{}'.format(wml_username, wml_password))
url = '{}/v2/identity/token'.format(wml_service_path)
response = requests.get(url, headers=headers)
mltoken = json.loads(response.text).get('token')
print mltoken

You can now create an online scoring endpoint. Execute the following sample code that uses the modelVersionHref value to create the scoring endpoint to the Bluemix repository.


In [ ]:
endpoint_online = wml_service_path + "/v2/online/deployments/"
header_online = {'Content-Type': 'application/json', 'Authorization': mltoken}
payload_online = {"artifactVersionHref": saved_model.meta.prop("modelVersionHref"), "name": "Repeat Shopper Prediction"}

print endpoint_online
print header_online
print payload_online

response_online = requests.post(endpoint_online, json=payload_online, headers=header_online)

print response_online
print response_online.text

scoring_href = json.loads(response_online.text).get('entity').get('scoringHref')
print scoring_href

Let's see what happens when we send a PUT request to our new endpoint containing a new scoring record. The model should hopefully return some predictions.


In [ ]:
payload_scoring = {
    "record":[
        "42", #id                     
        "8620", #chain
        "400", #offer
        "2017-6-5", #offerdate
        5, #qty_30
        10, #qty_60
        15, #qty_90
        20, #qty_180
        50, #amt_30
        100, #amt_60
        150, #amt_90
        200, #amt_180
    ]}

response_scoring = requests.put(scoring_href, json=payload_scoring, headers=header_online)

print response_scoring.text

In [ ]: