Cognihack Data Science Track
Predicting repeat shopping likelihood with Python, Spark and Watson Machine Learning
This notebook will take you through the process of creating a predictive model in Python using the data manipulation and machine learning libraries distributed with Spark.
Once we've worked through the process of reading, understanding and preparing our data and have built a simple model together, we'll deploy it to the Watson Machine Learning service and make it available as a real-time scoring service.
You should spend the remaining time working as a group to speculate on how you might improve these predictions. The cognihack tutors will endeavour to assist with any experimentation to help you create and evaluate refinements to the baseline model.
The learning goals of this exercise are:
This notebook contains the following parts:
Before we begin working through this notebook, you must perform the following setup tasks:
pyspark.sql will help us load and manipulate our data from Python while it resides in Spark.
In [ ]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
This last statement means some aggregation and mutation code will look a little different from the cheat sheet, as Spark functions will be prefixed with F.
The spark.read function will import data directly into our Spark instance, without touching the environment in which Python is running.
In [ ]:
spark = SparkSession.builder.getOrCreate()
dash = {
'jdbcurl': 'jdbc:db2://dashdb-entry-yp-dal09-10.services.dal.bluemix.net:50000/BLUDB',
'user': 'dash14210',
'password': 'W@sI$Ea8H8fz',
}
offers = spark.read.jdbc(dash['jdbcurl'],
table='DASH14210.OFFERS',
properties={"user" : dash["user"],
"password" : dash["password"]})
trainHistory = spark.read.jdbc(dash['jdbcurl'],
table='DASH14210.HISTORY',
properties={"user" : dash["user"],
"password" : dash["password"]})
transactions = spark.read.jdbc(dash['jdbcurl'],
table='DASH14210.TRANSACTIONS',
properties={"user" : dash["user"],
"password" : dash["password"]})
offers.cache()
trainHistory.cache()
transactions.cache()
When this is done, we are left with a Python object which is a logical pointer to a Spark DataFrame object.
Notice as well that Spark will give us some information on how it is breaking down the heavy lifting into discrete and parallelisable tasks.
Like many data manipulation libraries, Spark makes its own decisions about how to read in data.
To check it has treated our data as we expected, we need to compare the schemata of our Spark DataFrames to those provided with the datasets:
history
id - A unique id representing a customer
chain - An integer representing a store chain
offer - An id representing a certain offer
market - An id representing a geographical region
repeattrips - The number of times the customer made a repeat purchase
repeater - A boolean, equal to repeattrips > 0
offerdate - The date a customer received the offer
transactions
id - see above
chain - see above
dept - An aggregate grouping of the Category (e.g. water)
category - The product category (e.g. sparkling water)
company - An id of the company that sells the item
brand - An id of the brand to which the item belongs
date - The date of purchase
productsize - The amount of the product purchase (e.g. 16 oz of water)
productmeasure - The units of the product purchase (e.g. ounces)
purchasequantity - The number of units purchased
purchaseamount - The dollar amount of the purchase
offers
offer - see above
category - see above
quantity - The number of units one must purchase to get the discount
company - see above
offervalue - The dollar value of the offer
brand - see above
The transactions file can be joined to the history file by (id, chain). The history file can be joined to the offers file by (offer). The transactions file can be joined to the offers file by (category, brand, company). A negative value in purchasequantity and purchaseamount indicates a return.
While we're at it, let's also see how many observations each dataset has, take a peek at the data and look for any missing values.
First up, the count of observations in the dataset.
In [ ]:
offers.count()
A small reference dataset.
Next question: how is the data typed within the Spark DataFrame?
In [ ]:
offers.schema
Spark has interpreted every field as a string. 🙄
Two fields, offervalue and quantity, definitely should not be strings; let's fix them up now.
In [ ]:
offers = offers.withColumn("offervalue", offers["offervalue"].cast("double"))
offers = offers.withColumn("quantity", offers["quantity"].cast("double"))
Now we're ready to take a peek at the data.
In [ ]:
offers.show()
And finally, check for any records with a missing value in critical fields.
In [ ]:
offers.where(offers.offer.isNull() |
offers.category.isNull() |
offers.quantity.isNull() |
offers.company.isNull() |
offers.offervalue.isNull() |
offers.brand.isNull()).count()
Can you repeat the same operations here?
In [ ]:
trainHistory.count()
A larger dataset that relates offers and customers.
In [ ]:
trainHistory.schema
Same deal. Let's do some conversion to numeric and datetime types.
In [ ]:
trainHistory = trainHistory.withColumn("repeattrips", trainHistory["repeattrips"].cast("double"))
trainHistory = trainHistory.withColumn("repeater", trainHistory["repeater"].cast("boolean"))
trainHistory = trainHistory.withColumn("repeater", trainHistory["repeater"].cast("double"))
trainHistory = trainHistory.withColumn("offerdate", trainHistory["offerdate"].cast("date"))
In [ ]:
trainHistory.show()
In [ ]:
trainHistory.where(trainHistory.chain.isNull() |
trainHistory.market.isNull() |
trainHistory.repeattrips.isNull() |
trainHistory.repeater.isNull() |
trainHistory.offerdate.isNull()).count()
In [ ]:
transactions.count()
The largest of the three datasets.
In [ ]:
transactions.schema
In [ ]:
transactions = transactions.withColumn("date", transactions["date"].cast("date"))
transactions = transactions.withColumn("productsize", transactions["productsize"].cast("double"))
transactions = transactions.withColumn("purchasequantity", transactions["purchasequantity"].cast("double"))
transactions = transactions.withColumn("purchaseamount", transactions["purchaseamount"].cast("double"))
In [ ]:
transactions.show()
In [ ]:
transactions.where(transactions.id.isNull() |
transactions.chain.isNull() |
transactions.dept.isNull() |
transactions.category.isNull() |
transactions.company.isNull() |
transactions.brand.isNull() |
transactions.date.isNull() |
transactions.productsize.isNull() |
transactions.productmeasure.isNull() |
transactions.purchasequantity.isNull() |
transactions.purchaseamount.isNull()).count()
In [ ]:
trainHistory.agg(
F.min("offerdate").alias("offerdate_min")
, F.max("offerdate").alias("offerdate_max")).show()
What is the frequency of records across some of the categorical variables?
In [ ]:
trainHistory.groupBy(trainHistory.chain).count().orderBy("count", ascending = False).show(n = 50)
Probably too many of these to do anything useful without grouping.
Try for yourself with the market variable.
In [ ]:
trainHistory.groupBy(trainHistory.market).count().orderBy("count", ascending = False).show(n = 50)
Slightly more usable; perhaps we'll come back to this.
In [ ]:
trainHistory.describe(["repeattrips", "repeater"]).show()
First insight: 27% of customers to whom an offer is made become repeat shoppers. Does this vary across market?
In [ ]:
trainHistory.groupBy(trainHistory.market).agg(
F.count("id").alias("customer_count")
, F.avg("repeater").alias("response_rate")
).orderBy("response_rate", ascending = False).show()
There's a hypothesis emerging here that our larger markets may show the strongest response to offers. Let's plot it to check.
In [ ]:
count_vs_rate = trainHistory.groupBy(trainHistory.market).agg(
F.count("id").alias("customer_count")
, F.avg("repeater").alias("response_rate")
).orderBy("response_rate", ascending = False).toPandas()
In [ ]:
%matplotlib inline
import matplotlib.pyplot as plt
count_vs_rate.plot(kind='scatter', x='customer_count', y='response_rate')
In [ ]:
count_vs_rate[count_vs_rate.customer_count < 40000].plot(kind='scatter', x='customer_count', y='response_rate')
There is a weak relationship there, but we probably won't want to employ something as nuanced as this in our first iteration of analysis. Interesting to know though!
We now understand a little of the offer history data. Next, we need to repeat the process across our other datasets; let's start with the offers.
In [ ]:
offers.describe(["quantity", "offervalue"]).show()
Looks as though a small number of the offers have a different quantity value.
In [ ]:
offers.groupBy("quantity").count().show()
Interesting. Is this still applicable when we join offers to our history dataset?
In [ ]:
offers[offers.quantity == 2].show()
In [ ]:
trainHistory[trainHistory.offer=="1221658"].count()
No. In which case, it's not going to be significant for our analysis and modelling.
Do any of the categorical fields have few enough levels to enter into a simple classification model?
In [ ]:
offers.groupBy("company").count().orderBy("count", ascending = False).show()
In [ ]:
offers.groupBy("brand").count().orderBy("count", ascending = False).show()
In [ ]:
offers.groupBy("category").count().orderBy("count", ascending = False).show()
These might work in a classification tree model which automatically groups, or as a binned aggregate measure of the response rate across each.
Let's move on to the transactions.
What is the range of dates of our transactions?
In [ ]:
transactions.agg(
F.min("date").alias("date_min")
,F.max("date").alias("date_max")).show()
Right, so up to a year before the offers were presented.
What are the ranges of purchasequantity and purchaseamount?
In [ ]:
transactions.describe(["productsize"
,"productmeasure"
,"purchasequantity"
, "purchaseamount"]).show()
OK, we have some returns data in here, too. We may need to account for this in a future iteration of modelling work.
In [ ]:
datatoplot = trainHistory.groupBy(?).?
datatoplot.orderBy(?).show()
In [ ]:
datatoplot.orderBy(?).?.plot(kind=?, x=?, y=?)
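One possible completion, as a sketch (plotting the daily response rate is our choice; other aggregates would work too):
In [ ]:
# group by offer date and compute the daily response rate
datatoplot = trainHistory.groupBy("offerdate").agg(
    F.count("id").alias("customer_count")
    , F.avg("repeater").alias("response_rate"))
datatoplot.orderBy("offerdate").show()
# collect to Pandas and plot as a line chart
datatoplot.orderBy("offerdate").toPandas().plot(kind="line", x="offerdate", y="response_rate")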
The graph above is a little messy and difficult to interpret.
Let's use a very quick and dirty approach to extracting the month, plot that and see if this is any easier to interpret.
In [ ]:
newdata = trainHistory.withColumn("offermonth", trainHistory["offerdate"]??)
datatoplot = newdata.groupBy(?).? #aggregate the data
datatoplot.? # plot the chart
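A possible completion (a sketch; F.month() is one quick-and-dirty way to extract the month number):
In [ ]:
# quick and dirty: month number of the offer date
newdata = trainHistory.withColumn("offermonth", F.month(trainHistory["offerdate"]))
datatoplot = newdata.groupBy("offermonth").agg(F.avg("repeater").alias("response_rate"))
datatoplot.orderBy("offermonth").toPandas().plot(kind="bar", x="offermonth", y="response_rate")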
We are now able to discriminate between the periods in our analysis, but we've lost the interesting pattern we saw before.
In [ ]:
newdata = trainHistory.withColumn("offerweek", ?)
datatoplot = newdata.groupBy(?).?
datatoplot.?.?.plot(?)
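One way to do it (sketch), using F.weekofyear():
In [ ]:
newdata = trainHistory.withColumn("offerweek", F.weekofyear(trainHistory["offerdate"]))
datatoplot = newdata.groupBy("offerweek").agg(F.avg("repeater").alias("response_rate"))
datatoplot.orderBy("offerweek").toPandas().plot(kind="line", x="offerweek", y="response_rate")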
In [ ]:
newdata = trainHistory.withColumn("dayofweek", ?)
# hints: https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.date_format
# a format string of "E" will return a day of the week abbreviation
datatoplot = newdata.groupBy(?).?
datatoplot.?.?.plot(?)
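Following the hint, a sketch using date_format with "E":
In [ ]:
newdata = trainHistory.withColumn("dayofweek", F.date_format(trainHistory["offerdate"], "E"))
datatoplot = newdata.groupBy("dayofweek").agg(F.avg("repeater").alias("response_rate"))
datatoplot.orderBy("dayofweek").toPandas().plot(kind="bar", x="dayofweek", y="response_rate")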
In the data understanding phase we run lots of operations against our data, and we need these to be optimised in order to complete the task in a timely fashion. In our case, the key differentiator is whether the function requires the creation of a Python object (e.g. using Pandas), or whether it can run on a Spark DataFrame. Let's try using Pandas' implementation of describe().
In [ ]:
trainHistory.toPandas().?
In [ ]:
offers.?.?
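If you're unsure, a minimal sketch; note that toPandas() collects the whole DataFrame into driver memory, so this is only sensible for datasets of modest size:
In [ ]:
# Pandas' describe() computes count, mean, std and quartiles in one go
trainHistory.toPandas().describe()
In [ ]:
offers.toPandas().describe()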
pyspark.sql also has an implementation of describe(), which we saw earlier. Note: be careful about the order in which you run code, as you may need to declare which module a function should come from. Let's try using a different set of pyspark functionality to find the ranges of our continuous variables.
In [ ]:
transactions.agg(
F.min("chain").alias("chain_min")
,F.max("chain").alias("chain_max")
,F.min("?").alias("?")
,F.max("?").alias("?")
?
?
?
?
?
?
).show()
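A possible completion (a sketch; choosing productsize, purchasequantity and purchaseamount as the remaining continuous variables is our assumption):
In [ ]:
transactions.agg(
    F.min("chain").alias("chain_min")
    ,F.max("chain").alias("chain_max")
    ,F.min("productsize").alias("productsize_min")
    ,F.max("productsize").alias("productsize_max")
    ,F.min("purchasequantity").alias("purchasequantity_min")
    ,F.max("purchasequantity").alias("purchasequantity_max")
    ,F.min("purchaseamount").alias("purchaseamount_min")
    ,F.max("purchaseamount").alias("purchaseamount_max")
    ).show()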
From the exercise, it looks like there is something strange in one of the columns. Are all of the values positive and within a reasonable range? Let's take a look at some of the negative values.
In [ ]:
transactions[transactions.??].show()
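For example (a sketch; filtering on purchaseamount is one possible choice):
In [ ]:
# returns show up as negative purchase amounts
transactions[transactions.purchaseamount < 0].show()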
Given that the aim of the task is to make customer-offer level predictions about likelihood to become a repeat purchaser, data that we use from the offers or transactions datasets will need to be joined to the history dataset.
We have also observed that the transactions dataset contains a large volume of data: too much to enter into a model without aggregation to the customer-offer level. In aggregating, our goal is to select an approach which generates features that:
a) retain as much information about the behaviour of these customers as possible; and
b) will be usable in our model (some algorithms can only accept numerical inputs, for example).
As a starter set, we will simply measure how much each customer had spent in the 30, 60, 90 and 180 days prior to being made an offer.
To do so, we will first need to join the offer history and transactions tables.
In [ ]:
offertxns = transactions.join(trainHistory.select(["id" , "chain", "offer", "offerdate", "repeater"]), ["id", "chain"], how = "inner")
offertxns.show(n=5)
Calculate "history" interval dates based on offerdate.
In [ ]:
offertxns = offertxns.withColumn("offerdate_30", F.date_sub(offertxns.offerdate, 30))
offertxns = offertxns.withColumn("offerdate_60", F.date_sub(offertxns.offerdate, 60))
offertxns = offertxns.withColumn("offerdate_90", F.date_sub(offertxns.offerdate, 90))
offertxns = offertxns.withColumn("offerdate_180", F.date_sub(offertxns.offerdate, 180))
offertxns.show(n=5)
We can employ a Spark "user defined function" (UDF) to create corresponding aggregation flags identifying whether each transaction falls within one of the history periods.
In [ ]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def inDateRange(date, date_lower, date_upper):
if date >= date_lower and date <= date_upper: return 1
else: return 0
udfInDateRange = udf(inDateRange, IntegerType())
offertxns = offertxns.withColumn("offerdate_30_tf", udfInDateRange(offertxns.date, offertxns.offerdate_30, offertxns.offerdate))
offertxns = offertxns.withColumn("offerdate_60_tf", udfInDateRange(offertxns.date, offertxns.offerdate_60, offertxns.offerdate))
offertxns = offertxns.withColumn("offerdate_90_tf", udfInDateRange(offertxns.date, offertxns.offerdate_90, offertxns.offerdate))
offertxns = offertxns.withColumn("offerdate_180_tf", udfInDateRange(offertxns.date, offertxns.offerdate_180, offertxns.offerdate))
offertxns.show(n=5)
At this point we can calculate the quantity and spend per customer per offer.
As an extension, you could join onto the offers table and create equivalent measures for quantity and spend in the same brand, company and category as the offer presented to the customer.
In [ ]:
offertxns = offertxns.withColumn("offerdate_30_qty", offertxns.purchasequantity * offertxns.offerdate_30_tf)
offertxns = offertxns.withColumn("offerdate_60_qty", offertxns.purchasequantity * offertxns.offerdate_60_tf)
offertxns = offertxns.withColumn("offerdate_90_qty", offertxns.purchasequantity * offertxns.offerdate_90_tf)
offertxns = offertxns.withColumn("offerdate_180_qty", offertxns.purchasequantity * offertxns.offerdate_180_tf)
offertxns = offertxns.withColumn("offerdate_30_amt", offertxns.purchaseamount * offertxns.offerdate_30_tf)
offertxns = offertxns.withColumn("offerdate_60_amt", offertxns.purchaseamount * offertxns.offerdate_60_tf)
offertxns = offertxns.withColumn("offerdate_90_amt", offertxns.purchaseamount * offertxns.offerdate_90_tf)
offertxns = offertxns.withColumn("offerdate_180_amt", offertxns.purchaseamount * offertxns.offerdate_180_tf)
offertxns.show(n=5)
In [ ]:
offertxnsSum = offertxns.groupBy(["id", "chain", "offer", "offerdate", "repeater"]).agg(
F.sum("offerdate_30_qty").alias("qty_30")
, F.sum("offerdate_60_qty").alias("qty_60")
, F.sum("offerdate_90_qty").alias("qty_90")
, F.sum("offerdate_180_qty").alias("qty_180")
, F.sum("offerdate_30_amt").alias("amt_30")
, F.sum("offerdate_60_amt").alias("amt_60")
, F.sum("offerdate_90_amt").alias("amt_90")
, F.sum("offerdate_180_amt").alias("amt_180"))
offertxnsSum.show(n=5)
What is the average spend in these intervals?
Spark will allow us to calculate this quite easily. In order to plot this nicely, we will need help from the Python data wrangling library of choice: Pandas. Luckily Spark also offers an easy way to translate between the two types of object using the .toPandas() function.
In [ ]:
import pandas as pd
average_spend = offertxnsSum.groupBy("repeater").agg(
F.avg("amt_30").alias("30")
, F.avg("amt_60").alias("60")
, F.avg("amt_90").alias("90")
, F.avg("amt_180").alias("180")).toPandas()
average_spend_melt = pd.melt(average_spend, id_vars = "repeater", var_name = "interval_days", value_name = "spend_ave")
average_spend_melt["interval_days"] = pd.to_numeric(average_spend_melt["interval_days"])
average_spend_melt.head()
In [ ]:
average_spend_melt.plot(kind='scatter', x='interval_days', y='spend_ave', c="repeater")
In [ ]:
from bokeh.plotting import figure
from bokeh.io import output_notebook, show
from bokeh.charts import Scatter
from bokeh.palettes import brewer
output_notebook()
In [ ]:
p = Scatter(average_spend_melt,
x="interval_days",
y="spend_ave",
color = "repeater",
title = "Comparison: purchase intervals and average spends",
xlabel = "Purchase analysis interval",
ylabel = "Average spend",
palette = brewer["Dark2"][3])
show(p)
This gives us a very small level of interaction - but much more is possible!
You may have to move outside of the notebook environment to do this though (using output_file, for example).
Bokeh interactivity docs and examples
Let's apply some of what we've looked at together and engineer a new feature that may or may not be predictive and useful, but will at least give us some experience of working with the summarisation capability of pyspark.sql.
What we are looking to do is examine which departments customers have shopped into as a way of classifying their habits.
Let's start by identifying the five most popular departments in our transactions set. We can sample our data to achieve this quickly.
In [ ]:
(transactions
.sample(False, 0.01, 42)
.groupBy(?)
.agg(?.alias("transaction_count"))
.orderBy(?)
.show(?))
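One possible completion (sketch):
In [ ]:
# sample 1% of transactions, count by department, largest first
(transactions
    .sample(False, 0.01, 42)
    .groupBy("dept")
    .agg(F.count("id").alias("transaction_count"))
    .orderBy("transaction_count", ascending = False)
    .show(5))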
We can use these to create a series of flags at the customer level which may be useful in the classification task. The task here is to create a flag for every customer showing whether they have shopped in each of departments 99, 35, 37, 56 and 26 (a possible solution follows the exercise cell below).
In [ ]:
customerDepts = (transactions
.withColumn(?, (?).?) # check out the cheat sheet for help here!
...
.groupBy(?)
.agg(?.alias("dept_99"),
...)) # how would you aggregate this to get a 'per customer' answer?
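A possible solution (a sketch; since dept was read in as a string, we compare against string literals): flag each transaction with a 0/1 indicator per department, then take the maximum per customer.
In [ ]:
customerDepts = (transactions
    .withColumn("dept_99_tf", (transactions.dept == "99").cast("integer"))
    .withColumn("dept_35_tf", (transactions.dept == "35").cast("integer"))
    .withColumn("dept_37_tf", (transactions.dept == "37").cast("integer"))
    .withColumn("dept_56_tf", (transactions.dept == "56").cast("integer"))
    .withColumn("dept_26_tf", (transactions.dept == "26").cast("integer"))
    .groupBy("id")
    .agg(F.max("dept_99_tf").alias("dept_99"),
         F.max("dept_35_tf").alias("dept_35"),
         F.max("dept_37_tf").alias("dept_37"),
         F.max("dept_56_tf").alias("dept_56"),
         F.max("dept_26_tf").alias("dept_26")))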
Inspect and generate some summary statistics for these fields.
In [ ]:
customerDepts.?
In [ ]:
customerDepts.?.?
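For example (sketch):
In [ ]:
customerDepts.show(5)
In [ ]:
customerDepts.describe(["dept_99", "dept_35", "dept_37", "dept_56", "dept_26"]).show()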
Last piece of the puzzle: let's measure the level of correlation between these predictors.
(If everybody buys from the same departments, then these will not be good predictor variables to use in a model.)
Pandas has a very slick way of doing this via the .corr() function, but what do we need to do first to allow us to use our summary data?
In [ ]:
customerDepts.?.corr()
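We first need to bring the summary down into a Pandas DataFrame; a sketch:
In [ ]:
# toPandas() first; Pandas then computes pairwise correlations
# over the numeric flag columns
customerDepts.toPandas().corr()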
Starting with (1) - can you use the agg() function to summarise the data appropriately?
In [ ]:
historyCustCount = (trainHistory
.groupBy(?,?)
.agg(?))
historyCustCount.show()
historyCustCount.count()
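A possible completion (sketch), counting customers per market and chain:
In [ ]:
historyCustCount = (trainHistory
    .groupBy("market", "chain")
    .agg(F.count("id").alias("customer_count")))
historyCustCount.show()
historyCustCount.count()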
Moving on to (2), let's calculate a new column chain_market_rank within historyCustCount showing the ranking of chains by customer count within each market.
Hint: as this is a window function (calculates a quantity within a partition, in this case market), you'll need to specify the window specification using Window which is available in the pyspark.sql.window library.
In [ ]:
from ? import ?
w = ?.partitionBy(?).orderBy(?)
historyCustCount = historyCustCount.withColumn(?, ?.over(w))
historyCustCount.show()
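One way to complete this (sketch), ranking chains by descending customer count within each market:
In [ ]:
from pyspark.sql.window import Window
w = Window.partitionBy("market").orderBy(F.desc("customer_count"))
historyCustCount = historyCustCount.withColumn("chain_market_rank", F.rank().over(w))
historyCustCount.show()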
Is this a good variable to use in our model? Let's plot the distribution of values as a histogram.
Again, Pandas has a very neat function: .hist() that allows us to plot histograms quickly.
In [ ]:
historyCustCount.?.hist(?) # check out the pandas docs for help with the arguments to hist()
# https://pandas.pydata.org/pandas-docs/stable/visualization.html#visualization-hist
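For example (a sketch; the bin count is arbitrary):
In [ ]:
historyCustCount.toPandas().hist(column="chain_market_rank", bins=20)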
Do you think this would be informative?
What other analysis could you do to support your assertion?
For completeness, let's go ahead and join this back to our original trainHistory dataframe.
In [ ]:
trainHistory = ?.join(?) # pyspark cheat sheet will help you here
trainHistory.show()
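A possible completion (sketch), joining on the market and chain keys:
In [ ]:
trainHistory = trainHistory.join(historyCustCount, ["market", "chain"], how="left")
trainHistory.show()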
We don't need customer_count, so let's go ahead and drop it.
In [ ]:
trainHistory = trainHistory.?
trainHistory.show()
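For example (sketch):
In [ ]:
trainHistory = trainHistory.drop("customer_count")
trainHistory.show()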
This is the specification you have been provided:
We are hoping to create a variable that encapsulates how long a customer had been shopping with the company before being offered an incentive.
To do this, you will need to:
a) find the date of each customer's first transaction; and
b) join this back to the offer history and calculate the interval between first purchase and offer date.
In [ ]:
firstCustTrans = (transactions
.groupBy(?)
.agg(?.alias("first_purch_date")))
firstCustTrans.show()
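A possible completion (sketch), taking each customer's earliest transaction date:
In [ ]:
firstCustTrans = (transactions
    .groupBy("id")
    .agg(F.min("date").alias("first_purch_date")))
firstCustTrans.show()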
In [ ]:
firstCustTrans = (trainHistory
.select("id", "offerdate")
.join(firstCustTrans, ?)
.withColumn("shop_history_interval", ?))
firstCustTrans.show()
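One way to complete this (sketch), using F.datediff() to get the interval in days:
In [ ]:
firstCustTrans = (trainHistory
    .select("id", "offerdate")
    .join(firstCustTrans, "id", how = "inner")
    .withColumn("shop_history_interval", F.datediff("offerdate", "first_purch_date")))
firstCustTrans.show()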
In [ ]:
firstCustTrans.sample(?).?.?
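For example (sketch):
In [ ]:
# histogram of the interval on a 1% sample
firstCustTrans.sample(False, 0.01, 42).toPandas().hist(column="shop_history_interval")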
For those who have worked in targeted marketing, this approach will be quite familiar. The premise is to train a model based on one part of your dataset and evaluate its performance using the other.
Spark has a convenience function to do just that: randomSplit(). The argument supplied to this function is an array of weights specifying the proportions into which the dataset should be split.
In [ ]:
offertxnsSum = offertxnsSum.withColumnRenamed("repeater","label")
splits = offertxnsSum.randomSplit([0.7,0.3])
trainSet = splits[0]
testSet = splits[1]
trainSet.show(n=5)
In [ ]:
trainSet.count()
Let's also cache these DataFrames as we did with our original datasets.
In [ ]:
trainSet.cache()
testSet.cache()
Spark has an idiosyncratic way of taking input: we will have to employ the VectorAssembler transformer to bundle up the features to input into the model.
In [ ]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["qty_30", "qty_60", "qty_90", "qty_180",
"amt_30", "amt_60", "amt_90", "amt_180"],
outputCol="features")
trainSetAssembled = assembler.transform(trainSet)
trainSetAssembled.show(n=5, truncate = False)
In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(trainSet)
We now have a fitted model!
What values have been chosen for the parameters?
In [ ]:
# the logistic regression is the second stage (index 1) of the pipeline
#[stage.coefficients for stage in model.stages if hasattr(stage, "coefficients")]
model.stages[1].coefficients
In [ ]:
prediction = model.transform(trainSet)
prediction.select("label","prediction", "probability", "features").show(5)
In [ ]:
from sklearn.metrics import roc_curve
prediction_collect = prediction.select("label", "probability").toPandas()
# iterrows() yields (index, row) tuples, so i[1][0] is the label
# and i[1][1][1] is the predicted probability of class 1
roc_inputs = [(float(i[1][0]), float(i[1][1][1])) for i in prediction_collect.iterrows()]
roc_inputs = pd.DataFrame(roc_inputs, columns = ["label","prob"])
fpr, tpr, _ = roc_curve(y_true = roc_inputs.label,
y_score = roc_inputs.prob)
roc_lr_train = pd.DataFrame(
{"FPR" : fpr
,"TPR" : tpr})
In [ ]:
fig = plt.figure()
ax = plt.axes()
ax.set(title = "Receiver Operating Characteristic",
xlabel = "False Positive Rate",
ylabel = "True Positive Rate")
x = [0,1]
y = [0,1]
ax.plot(x, y)
ax.plot(roc_lr_train.FPR, roc_lr_train.TPR)
plt.show()
In [ ]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(
labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
accuracy = evaluator.evaluate(prediction)
print "Area under the ROC curve = %g " % accuracy
Not bad: the competition benchmark for this dataset is 0.59.
Before we start extending this, we should perform some diagnostics.
First, let's look at where the misclassifications are occurring by creating a contingency table (a.k.a. a confusion matrix).
In [ ]:
prediction_collect = prediction.toPandas()
pd.crosstab(prediction_collect.label, prediction_collect.prediction)
What does this tell us? Is this a bad model?
And of course we should check if our model generalises well by scoring and evaluating the test set.
Go ahead and use the pipeline we've already developed to transform the test set data. When you've done that, plot a ROC curve for the predictions and measure the AUC. Discuss and draw some conclusions about whether the model is generalising well.
In [ ]:
prediction = ?
prediction.select("label","prediction", "probability", "features").show(5)
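If you get stuck, a sketch:
In [ ]:
# score the held-out test set with the fitted pipeline
prediction = model.transform(testSet)
prediction.select("label", "prediction", "probability", "features").show(5)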
Build a confusion matrix to calculate the rate of misclassification.
In [ ]:
prediction_collect = prediction.?
pd.crosstab(?)
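A possible completion (sketch):
In [ ]:
prediction_collect = prediction.toPandas()
pd.crosstab(prediction_collect.label, prediction_collect.prediction)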
In [ ]:
prediction_collect = prediction.? # different to the previous cell! Check out section 4.1 for help...
roc_inputs = ?
roc_lr_test = ?
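One way to do it (sketch), mirroring the ROC preparation from section 4.1:
In [ ]:
prediction_collect = prediction.select("label", "probability").toPandas()
roc_inputs = [(float(i[1][0]), float(i[1][1][1])) for i in prediction_collect.iterrows()]
roc_inputs = pd.DataFrame(roc_inputs, columns = ["label","prob"])
fpr, tpr, _ = roc_curve(y_true = roc_inputs.label,
                        y_score = roc_inputs.prob)
roc_lr_test = pd.DataFrame(
    {"FPR" : fpr
    ,"TPR" : tpr})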
In [ ]:
fig = plt.figure()
ax = plt.axes()
ax.set(title = "Receiver Operating Characteristic",
xlabel = "False Positive Rate",
ylabel = "True Positive Rate")
x = [0,1]
y = [0,1]
ax.plot(x, y)
ax.plot(roc_lr_train.FPR, roc_lr_train.TPR)
ax.plot(?) # this is where you would plot your test set performance
# specify a different line colour or style to differentiate from the training set performance.
plt.show()
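A possible completion of the plotting cell (sketch); a dashed line distinguishes the test set from the training set:
In [ ]:
fig = plt.figure()
ax = plt.axes()
ax.set(title = "Receiver Operating Characteristic",
       xlabel = "False Positive Rate",
       ylabel = "True Positive Rate")
ax.plot([0, 1], [0, 1])
ax.plot(roc_lr_train.FPR, roc_lr_train.TPR)
ax.plot(roc_lr_test.FPR, roc_lr_test.TPR, linestyle = "--")
plt.show()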
In [ ]:
accuracy = ?
print "Area under the ROC curve = %g " % accuracy
We've seen how to create a logistic regression model. However, this is a parametric model and requires making some distributional assumptions about our data. In some cases this is not appropriate and we need to use a non-parametric method. Let's go through the same approach using pyspark.ml.classification, and fit a decision tree.
Slight additional complexity here: regardless of the original data type of the target variable, this algorithm requires you to have processed it with StringIndexer (from pyspark.ml.feature) prior to sending it to the model. Hence we have an additional stage to this pipeline.
In [ ]:
from pyspark.ml.classification import ?
from pyspark.ml.feature import StringIndexer
si = StringIndexer(inputCol="label", outputCol="indexed")
dt = DecisionTreeClassifier(labelCol=?)
pipeline_dt = Pipeline(stages=[?, ?, ?])
model_dt = ?
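A possible completion (sketch); the pipeline now assembles the features, indexes the label and fits the tree:
In [ ]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer
si = StringIndexer(inputCol="label", outputCol="indexed")
dt = DecisionTreeClassifier(labelCol="indexed")
pipeline_dt = Pipeline(stages=[assembler, si, dt])
model_dt = pipeline_dt.fit(trainSet)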
The first thing we did with our logistic regression was to look at the parameter values. There is no equivalent for decision trees. Instead, there is a featureImportances object which will give us similar useful information about the model. Extract it from the pipeline in the same way we did for the coefficients of our logistic regression.
In [ ]:
model_dt.stages[?].?
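For example (a sketch; with the three-stage pipeline above, the tree sits at index 2):
In [ ]:
model_dt.stages[2].featureImportances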
Go ahead and measure the AUC metric as before using the train and test sets.
In [ ]:
prediction_dt = ?
prediction_dt.select("label", "prediction", "probability").show(5)
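A sketch, scoring the test set (the same pattern works for the training set):
In [ ]:
prediction_dt = model_dt.transform(testSet)
prediction_dt.select("label", "prediction", "probability").show(5)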
In [ ]:
evaluator_dt = BinaryClassificationEvaluator(?)
accuracy_dt = ?
print "Area under the ROC curve = %g " % accuracy_dt
In [ ]:
prediction_collect = ?
pd.crosstab(?)
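For example (a sketch; we cross-tabulate the indexed label against the prediction):
In [ ]:
prediction_collect = prediction_dt.toPandas()
pd.crosstab(prediction_collect.indexed, prediction_collect.prediction)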
Now, there is a stochastic element to the building of these models, but in preparation for Cognihack, something felt strange about the AUC we were getting. Have a chat with your teams about why this may be the case, in particular in the context of parallel computing.
In [ ]:
from repository.mlrepositoryclient import MLRepositoryClient
from repository.mlrepositoryartifact import MLRepositoryArtifact
Tip: service_path, user and password can be found on the Service Credentials tab of the service instance created in Bluemix.
For example, the following code:
wml_service_path = "https://ibm-watson-ml.mybluemix.net"
wml_username = "ebecda6c-a18b-4c6f-82e4-4c7fc26361f4"
wml_password = "4705d497-fcc0-4e1c-9f55-934b13b13fb2"
will create the necessary credentials to connect to the Watson ML service. Just substitute your own values in place of these examples.
In [ ]:
# The code was removed by DSX for sharing.
In [ ]:
ml_repository_client = MLRepositoryClient(wml_service_path)
ml_repository_client.authorize(wml_username, wml_password)
Create model artifact (abstraction layer).
In [ ]:
model_artifact = MLRepositoryArtifact(model, training_data=trainSet, name="Repeat Buyer Prediction Model")
Tip: The MLRepositoryArtifact method expects a trained model object, training data, and a model name. (It is this model name that is displayed by the Watson Machine Learning service).
We can now save our model to the repository.
In [ ]:
saved_model = ml_repository_client.models.save(model_artifact)
Get saved model metadata from Watson Machine Learning.
Tip: Use meta.available_props() to get the list of available props.
In [ ]:
saved_model.meta.available_props()
In [ ]:
print "modelType: " + saved_model.meta.prop("modelType")
print "trainingDataSchema: " + str(saved_model.meta.prop("trainingDataSchema"))
print "creationTime: " + str(saved_model.meta.prop("creationTime"))
print "modelVersionHref: " + saved_model.meta.prop("modelVersionHref")
print "label: " + saved_model.meta.prop("label")
Tip: modelVersionHref is our model's unique identifier in the Watson Machine Learning repository.
In this section you will learn how to create online scoring and to score a new data record by using the Watson Machine Learning REST API.
For more information about REST APIs, see the Swagger Documentation.
To work with the Watson Machine Learning REST API you must generate an access token. To do that, you can use the following sample code:
In [ ]:
import urllib3, requests, json
headers = urllib3.util.make_headers(basic_auth='{}:{}'.format(wml_username, wml_password))
url = '{}/v2/identity/token'.format(wml_service_path)
response = requests.get(url, headers=headers)
mltoken = json.loads(response.text).get('token')
print mltoken
You can now create an online scoring endpoint. Execute the following sample code that uses the modelVersionHref value to create the scoring endpoint to the Bluemix repository.
In [ ]:
endpoint_online = wml_service_path + "/v2/online/deployments/"
header_online = {'Content-Type': 'application/json', 'Authorization': mltoken}
payload_online = {"artifactVersionHref": saved_model.meta.prop("modelVersionHref"), "name": "Repeat Shopper Prediction"}
print endpoint_online
print header_online
print payload_online
response_online = requests.post(endpoint_online, json=payload_online, headers=header_online)
print response_online
print response_online.text
scoring_href = json.loads(response_online.text).get('entity').get('scoringHref')
print scoring_href
Let's see what happens when we send a PUT request containing a new scoring record to our new endpoint. The model should return some predictions.
In [ ]:
payload_scoring = {
"record":[
"42", #id
"8620", #chain
"400", #offer
"2017-6-5", #offerdate
5, #qty_30
10, #qty_60
15, #qty_90
20, #qty_180
50, #amt_30
100, #amt_60
150, #amt_90
200, #amt_180
]}
response_scoring = requests.put(scoring_href, json=payload_scoring, headers=header_online)
print response_scoring.text