From this web tutorial - originally in Scala.
The data is store sales from Rossmann - get the dataset from Kaggle.
Rossmann operates over 3,000 drug stores in 7 European countries. Currently, Rossmann store managers are tasked with predicting their daily sales for up to six weeks in advance. Store sales are influenced by many factors, including promotions, competition, school and state holidays, seasonality, and locality. With thousands of individual managers predicting sales based on their unique circumstances, the accuracy of results can be quite varied. In their first Kaggle competition, Rossmann is challenging you to predict 6 weeks of daily sales for 1,115 stores located across Germany. Reliable sales forecasts enable store managers to create effective staff schedules that increase productivity and motivation. By helping Rossmann create a robust prediction model, you will help store managers stay focused on what’s most important to them: their customers and their teams!
There are several assumptions (likely violated by our data) that the linear regression model makes.
Let's review those assumptions in this case. We're violating the first one because our SchoolHoliday and StateHoliday variables are likely to be correlated, so our features aren't independent. We're working with time series data, so we're likely violating the second one (independent observations) as well. In general, finding problems that strictly fit a linear relationship is difficult, and this problem is no exception, so we're likely violating the third assumption too. The last two we can hope for, but again, we're probably violating them. With all that said, we should still at least experiment with this model because it's so well understood and it makes for a strong baseline predictor for future exploration. This aligns with the concepts of Structural Risk Minimization (if you don't understand SRM, don't worry about it).

Now let's define the loss function for linear regression. In linear regression, we hope to minimize the squared error, defined as

L = ||Xw - y||^2

where X is our input matrix, w is our weight vector, and y is our output vector. There is a closed-form solution to this problem that can be found by inverting the inner product X^T X, but Spark, because of its orientation towards big data, doesn't actually solve the problem this way. Spark leverages gradient descent methods to efficiently descend the gradient of the risk function and arrive at (hopefully) a solution near the global minimum.
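To make the loss and the two solution strategies concrete, here's a small, self-contained sketch in plain NumPy (toy data, not the Rossmann set and not part of the Spark pipeline): it computes the closed-form normal-equations solution and then approaches the same answer with gradient descent.
In [ ]:
import numpy as np

# Toy data: y is a noisy linear function of X.
rng = np.random.default_rng(0)
X = rng.normal(size=(100, 3))
true_w = np.array([1.5, -2.0, 0.5])
y = X @ true_w + rng.normal(scale=0.1, size=100)

# Closed-form solution via the normal equations: (X^T X) w = X^T y
w_closed = np.linalg.solve(X.T @ X, X.T @ y)

# Gradient descent on the same squared-error loss -- the style of solver
# Spark favors, since it avoids materializing and inverting X^T X.
w = np.zeros(3)
step = 0.1
for _ in range(1000):
    grad = 2.0 * X.T @ (X @ w - y) / len(y)  # gradient of the mean squared error
    w -= step * grad

print(w_closed)  # both should land near true_w
print(w)
With that intuition in hand, on to the Spark code.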
In [ ]:
import array
from pyspark.sql import HiveContext, DataFrame
from pyspark.ml.feature import VectorAssembler,StringIndexer,OneHotEncoder
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import RegressionMetrics
log4jLogger = sc._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
logger.info("pyspark script logger initialized")
Now sometimes preprocessing might occur on an ad hoc basis, but if you want to build something for production, it's worth putting it all into a pipeline. Machine learning pipelines are excellent ways of making your data science repeatable and efficient. The first thing that we're going to need to do is make sure that our data is in a format in which we can feed it into our models. That means it needs to be numerical. To do that we're going to use a couple of nifty Spark pre-processing functions, namely the StringIndexer, OneHotEncoder, and VectorAssembler. Respectively these allow us to:

- index categorical string columns into numerical category indices,
- convert those category indices into sparse one-hot (dummy) vectors, and
- assemble a set of columns into a single feature vector for the model.
Here's how we create the Indexers and Encoders for our data. You'll notice that we have to apply these to nearly all of our columns because almost none of them are continuous.
In [ ]:
stateHolidayIndexer = StringIndexer() \
.setInputCol("StateHoliday") \
.setOutputCol("StateHolidayIndex")
schoolHolidayIndexer = StringIndexer() \
.setInputCol("SchoolHoliday") \
.setOutputCol("SchoolHolidayIndex")
stateHolidayEncoder = OneHotEncoder() \
.setInputCol("StateHolidayIndex") \
.setOutputCol("StateHolidayVec")
schoolHolidayEncoder = OneHotEncoder() \
.setInputCol("SchoolHolidayIndex") \
.setOutputCol("SchoolHolidayVec")
dayOfMonthEncoder = OneHotEncoder() \
.setInputCol("DayOfMonth") \
.setOutputCol("DayOfMonthVec")
dayOfWeekEncoder = OneHotEncoder() \
.setInputCol("DayOfWeek") \
.setOutputCol("DayOfWeekVec")
storeEncoder = OneHotEncoder() \
.setInputCol("Store") \
.setOutputCol("StoreVec")
Now we want to assemble all of our vectors into one vector to use as input to our model.
In [ ]:
assembler = VectorAssembler() \
.setInputCols(["StoreVec", "DayOfWeekVec", "Open",
"DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"]) \
.setOutputCol("features")
In [ ]:
assembler.explainParams()
What we've got here with Pipelines is a way of performing a repeated set of steps in sequence, much like the DAG we saw when covering the core concepts behind the Pipeline.

Once you see the code, you'll notice that in our train/validation split we're also setting an Evaluator that will judge how well our model is doing and automatically select the best parameter map for us based on that metric. This means that we get to train lots and lots of different models to see which one is best. Super simple! Let's walk through the creation of each model.

The pipeline we've set up is pretty self-explanatory.
In [ ]:
def preppedLRPipeline():
    lr = LinearRegression()

    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.1, 0.01]) \
        .addGrid(lr.fitIntercept, [True, False]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0]) \
        .build()

    pipeline = Pipeline() \
        .setStages([stateHolidayIndexer, schoolHolidayIndexer,
                    stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
                    dayOfWeekEncoder, dayOfMonthEncoder,
                    assembler, lr])

    tvs = TrainValidationSplit() \
        .setEstimator(pipeline) \
        .setEvaluator(RegressionEvaluator()) \
        .setEstimatorParamMaps(paramGrid) \
        .setTrainRatio(0.75)
    return tvs
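One detail worth calling out: we pass a bare RegressionEvaluator(), which scores candidate models by RMSE by default. If you'd rather select on a different metric, you can set it explicitly; a small sketch:
In [ ]:
# RegressionEvaluator defaults to "rmse"; "mse", "r2", and "mae" are also supported.
evaluator = RegressionEvaluator().setMetricName("mae")
# Inside preppedLRPipeline() this would be plugged in via .setEvaluator(evaluator).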
We create our model, use the indexers and assembler, and run our data through it. By default the pipeline uses the label and features columns as the target and the input features for the regression. We can choose to set these manually using the appropriate setter methods on our LinearRegression instance.
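For instance, the same defaults spelled out explicitly (purely optional; shown as a sketch):
In [ ]:
# These column names are already the defaults, so this is only for clarity.
lr = LinearRegression() \
    .setLabelCol("label") \
    .setFeaturesCol("features") \
    .setPredictionCol("prediction")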
In [ ]:
def loadTrainingData(sqlContext):
    trainRaw = sqlContext \
        .read.format("com.databricks.spark.csv") \
        .option("header", "true") \
        .load("data/rossman-store-sales/train.csv") \
        .repartition(6)
    trainRaw.registerTempTable("raw_training_data")
    return sqlContext.sql("""SELECT
        double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
        StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
      FROM raw_training_data
    """)
    # .na.drop()

def loadKaggleTestData(sqlContext):
    testRaw = sqlContext \
        .read.format("com.databricks.spark.csv") \
        .option("header", "true") \
        .load("data/rossman-store-sales/test.csv") \
        .repartition(6)
    testRaw.registerTempTable("raw_test_data")
    testData = sqlContext.sql("""SELECT
        Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
        SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
      FROM raw_test_data
      WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
        OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
    """)
    # .na.drop() # weird things happen if you don't filter out the null values manually
    return [testRaw, testData]  # got to hold onto testRaw so we can make sure
                                # to have all the prediction IDs to submit to Kaggle
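As the commented-out .na.drop() calls hint, an alternative to the SQL WHERE clause above is to drop rows with nulls on the DataFrame side. A minimal sketch (same columns as the WHERE clause; an alternative, not what we actually run here):
In [ ]:
# DataFrame-level equivalent of the null filtering done in the SQL above.
testData = testData.na.drop(subset=["Id", "Store", "Open", "DayOfWeek",
                                    "StateHoliday", "SchoolHoliday"])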
Bringing in the data builds on what we learned about how to use Spark DataFrames.
One thing you'll notice is that we're performing several splits of the data. Our training set gets split into training and validation sets automatically by our TrainValidationSplit class, and we've already set aside our own test set that we'll use as an internal check before submitting to Kaggle.
One place where Spark's youth shows is its handling of null values. We had to do a bit of data juggling above to remove the null values (because the pipeline will fail if they're included). Below we've got some code that makes it easy to save the predictions and join them with the true values.
In [ ]:
def savePredictions(predictions, testRaw):
    tdOut = testRaw \
        .select("Id") \
        .distinct() \
        .join(predictions, testRaw["Id"] == predictions["PredId"], "outer") \
        .select("Id", "Sales") \
        .na.fill(0.0)  # some of our inputs were null so we have to
                       # fill these with something
    return tdOut \
        .coalesce(1) \
        .write.format("com.databricks.spark.csv") \
        .option("header", "true") \
        .save("linear_regression_predictions.csv")
Now that we've brought in our data and created our pipeline, we're ready to train our models and see how they perform. This will take some time to run because we are exploring a hyperparameter space. It takes time to try out all the permutations in our parameter grid, so be patient!
In [ ]:
def fitModel(tvs, data):
    [training, test] = data.randomSplit([0.8, 0.2], seed=12345)
    logger.info("Fitting data")
    model = tvs.fit(training)
    logger.info("Now performing test on hold out set")
    holdout = model.transform(test).select("prediction", "label")
    # have to do a type conversion for RegressionMetrics
    rm = RegressionMetrics(holdout.rdd.map(lambda x: (float(x[0]), float(x[1]))))
    logger.info("Test Metrics")
    logger.info("Test Explained Variance:")
    logger.info(rm.explainedVariance)
    logger.info("Test R^2 Coef:")
    logger.info(rm.r2)
    logger.info("Test MSE:")
    logger.info(rm.meanSquaredError)
    logger.info("Test RMSE:")
    logger.info(rm.rootMeanSquaredError)
    return model
Here's our linear regression:
In [ ]:
data = loadTrainingData(sqlContext)
[testRaw, testData] = loadKaggleTestData(sqlContext)
# The linear Regression Pipeline
linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
lrOut = lrModel.transform(testData) \
.withColumnRenamed("prediction","Sales") \
.withColumnRenamed("Id","PredId") \
.select("PredId", "Sales")
savePredictions(lrOut, testRaw)
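Once the fit finishes, it can be useful to peek at what the tuning step actually selected. A minimal sketch (attribute names assume a Spark 2.x-style API):
In [ ]:
# TrainValidationSplit keeps the pipeline refit with the best parameter map.
bestPipeline = lrModel.bestModel      # a PipelineModel
bestLR = bestPipeline.stages[-1]      # the LinearRegressionModel stage
print(bestLR.intercept)
print(bestLR.coefficients)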