Predict Store Sales with ML Pipelines

From this web tutorial, originally written in Scala

Scala code on GitHub

The data

The data is store sales from Rossmann; get the dataset from Kaggle

Rossmann operates over 3,000 drug stores in 7 European countries. Currently, Rossmann store managers are tasked with predicting their daily sales for up to six weeks in advance. Store sales are influenced by many factors, including promotions, competition, school and state holidays, seasonality, and locality. With thousands of individual managers predicting sales based on their unique circumstances, the accuracy of results can be quite varied. In their first Kaggle competition, Rossmann is challenging you to predict 6 weeks of daily sales for 1,115 stores located across Germany. Reliable sales forecasts enable store managers to create effective staff schedules that increase productivity and motivation. By helping Rossmann create a robust prediction model, you will help store managers stay focused on what’s most important to them: their customers and their teams!

Review of Linear Regression

The model makes several assumptions, most of which are likely violated by our data:

  • Our input data is drawn from a multivariate normal distribution, i.e., our variables are independent and normally distributed
  • Observations are independent of one another
  • A linear, additive relationship between our input variables and our output variable
  • Homoscedasticity of the error terms, i.e., the errors have constant variance around the regression line
  • Our variables are measured without systematic error (and, like the point above, that the errors come from some normal process rather than a confounding variable)

Let's review those assumptions in this case. We're violating the first one because our SchoolHoliday and StateHoliday variables are likely to be correlated. We're working with time series data, so we're likely violating the second one as well. In general, finding problems that strictly fit a linear relationship is difficult, and this problem is no exception, so we're likely violating the third assumption too. The last two we can hope for, but again, we're likely violating them.

With all that being said, we should still at least experiment with this model because it's so well understood and it makes for a strong baseline predictor for future exploration. This aligns with the concept of structural risk minimization (if you don't understand SRM, don't worry about it).

Now let's define the loss function for linear regression. In linear regression, we hope to minimize the squared error, defined as L(w) = ||Xw - y||^2, where X is our input matrix, w is our weight vector, and y is our output vector. There is a closed-form solution to this problem, found by solving the normal equations (X^T X) w = X^T y, but because of its orientation towards big data, Spark doesn't actually solve the problem this way. Spark leverages gradient descent methods to descend the gradient of the risk function efficiently and arrive at (hopefully) a solution near the global minimum.
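
To make the closed-form solution concrete, here is a minimal NumPy sketch (not part of the original tutorial, and not how Spark does it) that solves the normal equations (X^T X) w = X^T y on a tiny made-up dataset.


In [ ]:
import numpy as np

# Tiny made-up dataset; the first column of X acts as the intercept term
X = np.array([[1.0, 1.0],
              [1.0, 2.0],
              [1.0, 3.0]])
y = np.array([1.0, 2.0, 2.5])

# Solve (X^T X) w = X^T y; np.linalg.solve is preferred to explicitly inverting X^T X
w = np.linalg.solve(X.T.dot(X), X.T.dot(y))
print(w)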

Coding up our pipeline


In [ ]:
import array

from pyspark.sql import HiveContext, DataFrame
from pyspark.ml.feature import VectorAssembler,StringIndexer,OneHotEncoder
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import RegressionMetrics

log4jLogger = sc._jvm.org.apache.log4j  # sc is the SparkContext provided by the PySpark shell or notebook
logger = log4jLogger.LogManager.getLogger(__name__)
logger.info("pyspark script logger initialized")

Preprocessing and preparing pipelines

Sometimes preprocessing happens on an ad hoc basis, but if you want to build something for production, it's worth putting it all into a pipeline. Machine learning pipelines are an excellent way of making your data science repeatable and efficient.

The first thing we need to do is make sure our data is in a format we can feed into our models, which means it needs to be numerical. To do that we're going to use a few handy Spark preprocessing classes, namely StringIndexer, OneHotEncoder, and VectorAssembler. Respectively, these allow us to:

  • Convert string-based categorical features into numerical categorical features
  • Convert numerical categorical features (in one column) into one-hot-encoded sparse vectors (one slot per category)
  • Assemble a single feature vector from a variety of columns and vectors that may have been created previously

Here's how we create the indexers and encoders for our data. You'll notice that we apply these to nearly every column, because almost none of them are continuous.


In [ ]:
stateHolidayIndexer = StringIndexer() \
  .setInputCol("StateHoliday") \
  .setOutputCol("StateHolidayIndex")
schoolHolidayIndexer = StringIndexer() \
  .setInputCol("SchoolHoliday") \
  .setOutputCol("SchoolHolidayIndex")
stateHolidayEncoder = OneHotEncoder() \
  .setInputCol("StateHolidayIndex") \
  .setOutputCol("StateHolidayVec")
schoolHolidayEncoder = OneHotEncoder() \
  .setInputCol("SchoolHolidayIndex") \
  .setOutputCol("SchoolHolidayVec")
dayOfMonthEncoder = OneHotEncoder() \
  .setInputCol("DayOfMonth") \
  .setOutputCol("DayOfMonthVec")
dayOfWeekEncoder = OneHotEncoder() \
  .setInputCol("DayOfWeek") \
  .setOutputCol("DayOfWeekVec")
storeEncoder = OneHotEncoder() \
  .setInputCol("Store") \
  .setOutputCol("StoreVec")

Now we want to assemble all of our vectors and columns into a single feature vector that will serve as the input to our model.


In [ ]:
assembler = VectorAssembler() \
    .setInputCols(["StoreVec", "DayOfWeekVec", "Open",
    "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"]) \
    .setOutputCol("features")

In [ ]:
print(assembler.explainParams())

Creating the pipelines

Pipelines overview

What we've got here with Pipelines is a way of performing a repeated set of steps in sequence, much like the DAG of transformations we saw earlier. The core concepts of a Pipeline are:

  • Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
  • Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

Once you see the code, you'll notice that in our train/validation split we're also setting an Evaluator that judges how well our model is doing and automatically selects the best-performing set of parameters based on that metric. This means that we get to train lots and lots of different models and keep only the best one. Super simple! Let's walk through the creation of the model.
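
Note that RegressionEvaluator defaults to RMSE as its metric. If you'd rather tune against a different metric, you can set it explicitly; a small sketch (not part of the original tutorial):


In [ ]:
# Valid metric names include "rmse", "mse", "r2", and "mae"
evaluator = RegressionEvaluator().setMetricName("rmse")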

Linear regression

The pipeline we've set up is pretty self-explanatory:


In [ ]:
def preppedLRPipeline():
    lr = LinearRegression()

    paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [True, False]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0]) \
    .build()

    pipeline = Pipeline() \
    .setStages([stateHolidayIndexer, schoolHolidayIndexer,
      stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
      dayOfWeekEncoder, dayOfMonthEncoder,
      assembler, lr])

    tvs = TrainValidationSplit() \
    .setEstimator(pipeline) \
    .setEvaluator(RegressionEvaluator()) \
    .setEstimatorParamMaps(paramGrid) \
    .setTrainRatio(0.75)
    return tvs
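
If you'd rather do k-fold cross-validation than a single train/validation split, CrossValidator is a drop-in alternative. The sketch below is not part of the original tutorial: preppedLRPipelineCV is a made-up helper that assumes you pass in the same pipeline and paramGrid built above.


In [ ]:
from pyspark.ml.tuning import CrossValidator

def preppedLRPipelineCV(pipeline, paramGrid, numFolds=3):
    # Same estimator, evaluator, and parameter grid, evaluated over k folds
    return CrossValidator() \
      .setEstimator(pipeline) \
      .setEvaluator(RegressionEvaluator()) \
      .setEstimatorParamMaps(paramGrid) \
      .setNumFolds(numFolds)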

We create our model, apply our indexers, encoders, and assembler, and run our data through the resulting pipeline. By default, LinearRegression uses the label column as its target and the features column as its input features. We can manually set these using the appropriate setter methods on our LinearRegression instance, as in the sketch below.
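
For example, a small sketch (the column names "SalesLabel" and "inputFeatures" are made up for illustration):


In [ ]:
lr = LinearRegression() \
  .setLabelCol("SalesLabel") \
  .setFeaturesCol("inputFeatures") \
  .setPredictionCol("prediction")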

Bringing in our data

We need to do a bit of data scrubbing to ensure we're handling null values properly, since Spark pipelines are touchy about nulls in DataFrames.


In [ ]:
def loadTrainingData(sqlContext):
    trainRaw = sqlContext \
    .read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .load("data/rossman-store-sales/train.csv") \
    .repartition(6)
    trainRaw.registerTempTable("raw_training_data")

    return sqlContext.sql("""SELECT
    double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
    StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
    FROM raw_training_data
  """)


def loadKaggleTestData(sqlContext):
    testRaw = sqlContext \
    .read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .load("data/rossman-store-sales/test.csv") \
    .repartition(6)
  
    testRaw.registerTempTable("raw_test_data")

    testData = sqlContext.sql("""SELECT
    Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
    SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
    FROM raw_test_data
    WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
      OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
  """)
    # note: .na.drop() alone behaves oddly here, so we filter out the null values manually in the WHERE clause above

    return [testRaw, testData] # got to hold onto testRaw so we can make sure
                               # to have all the prediction IDs to submit to kaggle
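
As a quick sanity check of the loaders (a usage sketch, assuming sqlContext exists and the CSV paths above are correct):


In [ ]:
data = loadTrainingData(sqlContext)
data.printSchema()
data.show(5)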

Bringing in the data builds on what we learned about how to use Spark DataFrames.

One thing you'll notice is that we're performing several splits of the data: our training set gets split into training and validation sets automatically by the TrainValidationSplit class, and we've already set aside our own test set to use as an internal check before submitting to Kaggle.

Saving the Predictions

One place where Spark's youth shows is in its handling of null values. We had to do a bit of data juggling earlier to remove them (because the pipeline will fail if they're included). Below is some code that makes it easy to save the predictions and join them back up with the test IDs.


In [ ]:
def savePredictions(predictions, testRaw):
    tdOut = testRaw \
    .select("Id") \
    .distinct() \
    .join(predictions, testRaw["Id"] == predictions["PredId"], "outer") \
    .select("Id", "Sales") \
    .na.fill(0.0) # some of our inputs were null so we have to
                       # fill these with something
    return tdOut \
    .coalesce(1) \
    .write.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .save("linear_regression_predictions.csv")

Fitting, Testing, and Using The Model

Now that we've brought in our data and created our pipeline, we're ready to train our model and see how it performs. This will take some time to run because we're exploring a hyperparameter space. It takes time to try out all the permutations in our parameter grid and fit a model for each one, so be patient!


In [ ]:
def fitModel(tvs, data):
    [training, test] = data.randomSplit([0.8, 0.2], seed = 12345)
    logger.info("Fitting data")
    model = tvs.fit(training)
    logger.info("Now performing test on hold out set")
    holdout = model.transform(test).select("prediction","label")

    # have to do a type conversion for RegressionMetrics
    rm = RegressionMetrics(holdout.rdd.map(lambda x: (float(x[0]), float(x[1]))))

    logger.info("Test Metrics")
    logger.info("Test Explained Variance:")
    logger.info(rm.explainedVariance)
    logger.info("Test R^2 Coef:")
    logger.info(rm.r2)
    logger.info("Test MSE:")
    logger.info(rm.meanSquaredError)
    logger.info("Test RMSE:")
    logger.info(rm.rootMeanSquaredError)

    return model

Here's our linear regression:


In [ ]:
data = loadTrainingData(sqlContext)
[testRaw, testData] = loadKaggleTestData(sqlContext)

# The linear Regression Pipeline
linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
lrOut = lrModel.transform(testData) \
    .withColumnRenamed("prediction","Sales") \
    .withColumnRenamed("Id","PredId") \
    .select("PredId", "Sales")
savePredictions(lrOut, testRaw)
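
If you want to see which hyperparameters won, the fitted TrainValidationSplitModel exposes the validation metrics and the best pipeline; a short sketch (exact attributes can vary by Spark version):


In [ ]:
print(lrModel.validationMetrics)  # one metric (RMSE by default) per ParamMap in the grid
bestPipeline = lrModel.bestModel  # the fitted Pipeline with the winning parameters
print(bestPipeline.stages[-1])    # the underlying LinearRegressionModel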
