In [ ]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
Spark ML provides a uniform set of high-level APIs that help users create and tune practical machine learning pipelines.
In this notebook, we show how to embed CPLEX as a Spark transformer class.
DOcplex provides transformer classes that take a matrix X of constraints and a vector y of costs, and solve a linear problem using CPLEX.
Transformer classes share a solve(X, Y, **params) method which expects:
- an X matrix containing the constraints of the linear problem,
- a Y vector containing the cost coefficients.
The transformer classes require a Spark DataFrame for the 'X' matrix, and support various formats for the 'Y' vector.
The same formats are also supported to optionally specify upper bounds for decision variables.
There are two DOcplex transformer classes:
$CplexTransformer$ expects to solve a linear problem in the classical form:
$$ \text{minimize}\ C^{t} x \\ \text{s.t.}\ \ Ax \le B $$
Where $A$ is an (M,N) matrix describing the constraints, $B$ is a vector of size M containing the right-hand sides of the constraints, and $C$ is the cost vector of size N. In this case the transformer expects an (M,N+1) matrix, where the last column contains the right-hand sides.
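As a sketch of the (M,N+1) layout, assuming the matrix is given as a plain list of rows (the transformer itself works on a Spark DataFrame), the following splits each row into its N coefficients and its right-hand side, then checks whether a candidate solution x satisfies Ax <= B. The helpers `split_classical` and `is_feasible` are hypothetical, for illustration only.

```python
from typing import List, Sequence, Tuple


def split_classical(rows: Sequence[Sequence[float]]) -> Tuple[List[List[float]], List[float]]:
    """Hypothetical helper: split an (M, N+1) matrix into A (M x N) and the right-hand sides B."""
    A = [list(row[:-1]) for row in rows]  # first N columns: constraint coefficients
    B = [row[-1] for row in rows]         # last column: right-hand side of each constraint
    return A, B


def is_feasible(A, B, x, tol=1e-9):
    """Hypothetical helper: True if every row satisfies sum_j A[i][j] * x[j] <= B[i]."""
    return all(sum(a * xj for a, xj in zip(row, x)) <= b + tol for row, b in zip(A, B))


# Two constraints on two variables: x0 + x1 <= 4 and x0 - x1 <= 1
X = [[1.0, 1.0, 4.0],
     [1.0, -1.0, 1.0]]
A, B = split_classical(X)
print(A, B)                           # [[1.0, 1.0], [1.0, -1.0]] [4.0, 1.0]
print(is_feasible(A, B, [2.0, 2.0]))  # True
print(is_feasible(A, B, [3.0, 1.0]))  # False, since 3 - 1 = 2 > 1
```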
$CplexRangeTransformer$ expects to solve a linear problem as a set of range constraints:
$$ \text{minimize}\ C^{t} x \\ \text{s.t.}\ \ L \le Ax \le U $$
Where $A$ is an (M,N) matrix describing the constraints, $L$ and $U$ are two vectors of size M containing the minimum and maximum values for the row expressions, and $C$ is the cost vector of size N. In this case the transformer expects an (M,N+2) matrix, where the last two columns contain the minimum and maximum values (in this order).
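A range matrix of shape (M,N+2) can be split into the coefficient matrix and the two bound columns, and, as a standard LP identity, each range row lo <= a.x <= hi is equivalent to the pair a.x <= hi and -a.x <= -lo, i.e. a (2M,N+1) matrix in the classical form. The sketch below, using hypothetical helpers on plain Python lists, makes that equivalence concrete; it does not claim that DOcplex performs this conversion internally.

```python
def split_range(rows):
    """Hypothetical helper: split an (M, N+2) matrix into A (M x N), row minimums and row maximums."""
    A = [list(row[:-2]) for row in rows]
    lo = [row[-2] for row in rows]  # second-to-last column: minimum of each row expression
    hi = [row[-1] for row in rows]  # last column: maximum of each row expression
    return A, lo, hi


def range_to_classical(rows):
    """Rewrite lo <= Ax <= hi as Ax <= hi stacked on -Ax <= -lo, giving a (2M, N+1) matrix."""
    A, lo, hi = split_range(rows)
    upper = [row + [h] for row, h in zip(A, hi)]
    lower = [[-a for a in row] + [-l] for row, l in zip(A, lo)]
    return upper + lower


X_range = [[2.0, 1.0, 3.0, 8.0]]    # encodes 3 <= 2*x0 + x1 <= 8
print(range_to_classical(X_range))  # [[2.0, 1.0, 8.0], [-2.0, -1.0, -3.0]]
```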
In [ ]:
try:
    import numpy as np
except ImportError:
    raise RuntimeError('This notebook requires numpy')
In the next section we illustrate the range transformer with the Diet Problem, one of the examples distributed with DOcplex.
Given a breakdown matrix of various foods into elementary nutrients, plus limits on the quantities of foods and nutrients, and food costs, the goal is to find the optimal quantity of each food for a balanced diet.
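In terms of the range form above, the diet problem can be written as follows, where $x_f$ is the quantity of food $f$, $c_f$ its cost, $a_{nf}$ the amount of nutrient $n$ in food $f$, $\text{nmin}_n$ and $\text{nmax}_n$ the nutrient limits, and $\text{fmin}_f$ and $\text{fmax}_f$ the food limits. These symbol names are introduced here for illustration; in the data below, all food minimums are 0.
$$ \text{minimize}\ \sum_{f} c_f x_f \\ \text{s.t.}\ \ \text{nmin}_n \le \sum_{f} a_{nf} x_f \le \text{nmax}_n \quad \text{for each nutrient } n \\ \text{fmin}_f \le x_f \le \text{fmax}_f \quad \text{for each food } f $$
The nutrients play the role of the rows of $A$ and the foods the role of its columns, which is why the matrix built below is transposed so that it has one row per nutrient.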
The FOOD_NUTRIENTS data intentionally contains a missing value ($np.nan$) to illustrate the use of a pipeline involving a data cleansing stage.
In [ ]:
# the baseline diet data as Python lists of tuples.
FOODS = [
("Roasted Chicken", 0.84, 0, 10),
("Spaghetti W/ Sauce", 0.78, 0, 10),
("Tomato,Red,Ripe,Raw", 0.27, 0, 10),
("Apple,Raw,W/Skin", .24, 0, 10),
("Grapes", 0.32, 0, 10),
("Chocolate Chip Cookies", 0.03, 0, 10),
("Lowfat Milk", 0.23, 0, 10),
("Raisin Brn", 0.34, 0, 10),
("Hotdog", 0.31, 0, 10)
]
NUTRIENTS = [
("Calories", 2000, 2500),
("Calcium", 800, 1600),
("Iron", 10, 30),
("Vit_A", 5000, 50000),
("Dietary_Fiber", 25, 100),
("Carbohydrates", 0, 300),
("Protein", 50, 100)
]
FOOD_NUTRIENTS = [
# ("Roasted Chicken", 277.4, 21.9, 1.8, 77.4, 0.0, 0.0, 42.2),
("Roasted Chicken", 277.4, 21.9, 1.8, np.nan, 0.0, 0.0, 42.2), # Set a value as missing (NaN)
("Spaghetti W/ Sauce", 358.2, 80.2, 2.3, 3055.2, 11.6, 58.3, 8.2),
("Tomato,Red,Ripe,Raw", 25.8, 6.2, 0.6, 766.3, 1.4, 5.7, 1.0),
("Apple,Raw,W/Skin", 81.4, 9.7, 0.2, 73.1, 3.7, 21.0, 0.3),
("Grapes", 15.1, 3.4, 0.1, 24.0, 0.2, 4.1, 0.2),
("Chocolate Chip Cookies", 78.1, 6.2, 0.4, 101.8, 0.0, 9.3, 0.9),
("Lowfat Milk", 121.2, 296.7, 0.1, 500.2, 0.0, 11.7, 8.1),
("Raisin Brn", 115.1, 12.9, 16.8, 1250.2, 4.0, 27.9, 4.0),
("Hotdog", 242.1, 23.5, 2.3, 0.0, 0.0, 18.0, 10.4)
]
In [ ]:
nb_foods = len(FOODS)
nb_nutrients = len(NUTRIENTS)
print('#foods={0}'.format(nb_foods))
print('#nutrients={0}'.format(nb_nutrients))
assert nb_foods == len(FOOD_NUTRIENTS)
In [ ]:
try:
    import findspark
    findspark.init()
except ImportError:
    # Ignore exception: the 'findspark' module is only required when executing Spark in a Windows environment
    pass
import pyspark  # Only run after findspark.init() (if running in a Windows environment)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Compare (major, minor) numerically: a plain string comparison would mis-order versions such as '10.0'
if tuple(int(p) for p in spark.version.split('.')[:2]) < (2, 2):
    raise RuntimeError("This notebook requires at least version '2.2' for PySpark")
In [ ]:
mat_fn = np.array([FOOD_NUTRIENTS[f][1:] for f in range(nb_foods)])
print('The food-nutrient matrix has shape: {0}'.format(mat_fn.shape))
Then we extract the two vectors of min/max values for each nutrient; each vector has nb_nutrients elements.
We also break the FOODS collection of tuples into columns.
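The `map(list, zip(*FOODS))` idiom used in the next cell transposes a list of row tuples into one list per field. A small illustration on a two-row excerpt of FOODS:

```python
# A two-row excerpt of FOODS: (name, cost, min quantity, max quantity)
foods = [
    ("Roasted Chicken", 0.84, 0, 10),
    ("Grapes", 0.32, 0, 10),
]
# zip(*rows) yields one tuple per field; map(list, ...) turns each tuple into a list
names, costs, mins, maxs = map(list, zip(*foods))
print(names)  # ['Roasted Chicken', 'Grapes']
print(costs)  # [0.84, 0.32]
```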
In [ ]:
nutrient_mins = [NUTRIENTS[n][1] for n in range(nb_nutrients)]
nutrient_maxs = [NUTRIENTS[n][2] for n in range(nb_nutrients)]
food_names, food_costs, food_mins, food_maxs = map(list, zip(*FOODS))
We are now ready to prepare the transformer matrix. This matrix has shape (7, 11), as we have 7 nutrients and 9 foods, plus the additional min and max columns.
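The layout can be reproduced on a toy example with plain Python lists (the foods and values below are hypothetical): one row per food plus a min row and a max row, transposed so that each row is a nutrient and the columns are the foods followed by min and max. The resulting shape is (nb_nutrients, nb_foods + 2).

```python
# Hypothetical toy data: 2 foods x 3 nutrients, plus per-nutrient min/max
food_nutrients = [
    ("FoodA", 100.0, 5.0, 1.0),   # food name, then one amount per nutrient
    ("FoodB", 50.0, 20.0, 0.5),
]
nutrient_mins = [1000.0, 50.0, 10.0]
nutrient_maxs = [2000.0, 200.0, 30.0]

# One row per food, followed by a "min" row and a "max" row ...
rows_by_food = [list(t[1:]) for t in food_nutrients] + [nutrient_mins, nutrient_maxs]
# ... then transpose: one row per nutrient, columns = foods + min + max
matrix = [list(col) for col in zip(*rows_by_food)]
shape = (len(matrix), len(matrix[0]))
print(shape)      # (3, 4)
print(matrix[0])  # [100.0, 50.0, 1000.0, 2000.0]
```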
In [ ]:
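# Append the nutrient mins and maxs as two extra rows (one value per nutrient),
# then transpose: rows are nutrients, columns are the foods followed by min and max
nf2 = np.append(mat_fn, np.array([nutrient_mins, nutrient_maxs]), axis=0)
mat_nf = nf2.transpose()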
In [ ]:
mat_nf.shape
In [ ]:
# SparkSession.createDataFrame replaces the deprecated SQLContext entry point
columns = food_names + ['min', 'max']
food_nutrients_df = spark.createDataFrame(mat_nf.tolist(), columns)
Let's display the DataFrame schema and content.
In [ ]:
food_nutrients_df.printSchema()
In [ ]:
food_nutrients_df.show()
To use the transformer, create an instance and pass the following parameters to the transform method:
- the X matrix of size (M, N+2), containing coefficients for N column variables plus two additional columns for the range mins and maxs;
- the Y cost vector (using the "y" parameter id);
- whether to solve a minimization (min) or a maximization (max) problem (using the "sense" parameter id).
In addition, some data elements that can't be encoded in the matrix itself should be passed as keyword arguments when creating the transformer:
- ubs denotes the upper bounds for the column variables that are created. The expected size of this scalar vector is N (when the matrix has size (M, N+2));
- minCol and maxCol are the names of the columns corresponding to the constraints' min and max range in the X matrix.
Since the input data contains some missing values, we'll actually define a pipeline that will:
- first, replace the missing values with the mean of their column (data cleansing stage, using Spark's Imputer);
- then, solve the range problem on the cleansed data with the $CplexRangeTransformer$.
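To make the size requirements concrete, the following hypothetical helper (not part of DOcplex) mirrors how the columns of X relate to the keyword arguments, under the assumption stated above: every column other than minCol and maxCol holds the coefficients of one decision variable, so ubs must have exactly one entry per such column.

```python
def check_range_inputs(columns, ubs, min_col="min", max_col="max"):
    """Hypothetical validation of CplexRangeTransformer-style inputs (not a DOcplex API)."""
    if min_col not in columns or max_col not in columns:
        raise ValueError("X must contain the %r and %r columns" % (min_col, max_col))
    # All remaining columns hold constraint coefficients, one per decision variable
    var_cols = [c for c in columns if c not in (min_col, max_col)]
    if len(ubs) != len(var_cols):
        raise ValueError("expected %d upper bounds, got %d" % (len(var_cols), len(ubs)))
    return var_cols


cols = ["Roasted Chicken", "Grapes", "min", "max"]
print(check_range_inputs(cols, ubs=[10, 10]))  # ['Roasted Chicken', 'Grapes']
```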
In [ ]:
from docplex.mp.sparktrans.transformers import CplexRangeTransformer
from pyspark.ml.feature import Imputer
from pyspark.ml import Pipeline
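# Import only what is needed: a wildcard import would shadow Python built-ins such as min, max and sum
from pyspark.sql.functions import desc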
# Create a data cleansing stage to replace missing values with column mean value
data_cleansing = Imputer(inputCols=food_names, outputCols=food_names)
# Create an optimization stage to calculate the optimal quantity for each food for a balanced diet.
cplexSolve = CplexRangeTransformer(minCol='min', maxCol='max', ubs=food_maxs)
# Configure an ML pipeline, which chains these two stages
pipeline = Pipeline(stages=[data_cleansing, cplexSolve])
# Fit the pipeline: during this step, the data cleansing estimator is configured
model = pipeline.fit(food_nutrients_df)
# Apply the fitted pipeline to the input data. Stage-specific parameters can still be passed when invoking 'transform' on the PipelineModel
diet_df = model.transform(food_nutrients_df, params={cplexSolve.y: food_costs, cplexSolve.sense: 'min'})
diet_df.orderBy(desc("value")).show()
As a check, let's look at the Spark DataFrame at the output of the cleansing stage.
This is the DataFrame that is fed to the $CplexRangeTransformer$ in the pipeline.
One can check that the first entry in the fourth row (Roasted Chicken, Vit_A) has been set to the average of the other values in the same column ($57.2167$).
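The expected value can be reproduced by hand. The sketch below applies mean imputation (the Imputer's default "mean" strategy, with NaN treated as missing) to the Roasted Chicken column of the nutrient-by-food matrix; `impute_mean` is a hypothetical helper written for illustration.

```python
import math

# Roasted Chicken column (one value per nutrient), with the missing Vit_A value as NaN
roasted_chicken = [277.4, 21.9, 1.8, float("nan"), 0.0, 0.0, 42.2]


def impute_mean(values):
    """Replace NaN entries with the mean of the non-missing entries of the same column."""
    present = [v for v in values if not math.isnan(v)]
    mean = sum(present) / len(present)
    return [mean if math.isnan(v) else v for v in values], mean


filled, mean = impute_mean(roasted_chicken)
print(round(mean, 4))  # 57.2167, i.e. 343.3 / 6
```

Note that the mean is taken over the other nutrients of the same food, which is only meant to illustrate a cleansing stage, not to produce a nutritionally meaningful value.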
In [ ]:
data_cleansing.fit(food_nutrients_df).transform(food_nutrients_df).show()
To illustrate the usage of the $CplexTransformer$, let's remove the constraint on the minimum amount for nutrients, and reformulate the problem as a cost maximization.
First, let's define a new DataFrame for the constraints matrix by removing the min column from the food_nutrients_df DataFrame, so that it is a well-formed input matrix for the $CplexTransformer$:
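A short argument shows why minimizing the cost is uninteresting once the min column is gone. Write $x_f \ge 0$ for the quantity of food $f$, $c_f \ge 0$ for its cost, $a_{nf} \ge 0$ for the amount of nutrient $n$ in food $f$, and $\text{nmax}_n \ge 0$ for the nutrient maxima (all non-negative in this data, including the imputed value). Then $x = 0$ satisfies every constraint $\sum_f a_{nf} x_f \le \text{nmax}_n$, and for every feasible $x$:
$$ \sum_{f} c_f x_f \ \ge\ 0 = \sum_{f} c_f \cdot 0 $$
so $x = 0$ is an optimal solution of the minimization, which is why the example below maximizes the cost instead.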
In [ ]:
food_nutrients_LP_df = food_nutrients_df.drop('min')
food_nutrients_LP_df.show()
In [ ]:
from docplex.mp.sparktrans.transformers import CplexTransformer
# Create a data cleansing stage to replace missing values with column mean value
data_cleansing = Imputer(inputCols=food_names, outputCols=food_names)
# Create an optimization stage to calculate the quantity of each food.
# Here, let's use the CplexTransformer by specifying only a maximum amount (right-hand side) for each nutrient.
cplexSolve = CplexTransformer(rhsCol='max', ubs=food_maxs)
# Configure an ML pipeline, which chains these two stages
pipeline = Pipeline(stages=[data_cleansing, cplexSolve])
# Fit the pipeline: during this step, the data cleansing estimator is configured
model = pipeline.fit(food_nutrients_LP_df)
# Apply the fitted pipeline to the input data, passing stage-specific parameters to 'transform' on the PipelineModel
# With only upper bounds on the nutrients, minimizing cost would trivially return all 0's, so let's maximize cost instead!
diet_max_cost_df = model.transform(food_nutrients_LP_df, params={cplexSolve.y: food_costs, cplexSolve.sense: 'max'})
diet_max_cost_df.orderBy(desc("value")).show()
In [ ]:
%matplotlib inline
import matplotlib.pyplot as plt
def plot_radar_chart(labels, stats, **kwargs):
    labels = list(labels)
    stats = np.asarray(stats, dtype=float)
    # one axis per label, evenly spaced around the circle
    angles = np.linspace(0, 2*np.pi, len(labels), endpoint=False)
    # close the plot by repeating the first point at the end
    stats = np.concatenate((stats, [stats[0]]))
    angles = np.concatenate((angles, [angles[0]]))
    fig = plt.figure()
    ax = fig.add_subplot(111, polar=True)
    ax.plot(angles, stats, 'o-', linewidth=2, **kwargs)
    ax.fill(angles, stats, alpha=0.30, **kwargs)
    # one grid line per label: skip the duplicated closing angle so counts match
    ax.set_thetagrids(angles[:-1] * 180/np.pi, labels)
    ax.grid(True)
diet = diet_df.toPandas()
plot_radar_chart(labels=diet['name'], stats=diet['value'], color='r')
In [ ]:
diet_max_cost = diet_max_cost_df.toPandas()
plot_radar_chart(labels=diet_max_cost['name'], stats=diet_max_cost['value'], color='r')