Example with nested parallelism

This example shows how to benefit from an estimator's built-in parallelism (and optimizations like warm restart).


In [1]:
from mempamal.datasets import iris
import numpy as np
# iris dataset as usual, but with linear regression (why not! :p)
X, y = iris.get_data()
# add i.i.d. noise to X -> 4 "real" features + 10,000 noise features;
# this provides enough workload to show the benefit of nested parallelism
X = np.hstack((X, np.random.randn(X.shape[0], 10000)))
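As a quick sanity check (a sketch assuming the usual iris layout of 150 samples and 4 features, which is not stated explicitly above), the augmented design matrix should have 10,004 columns:

# hypothetical check: 4 original features + 10,000 noise features
print(X.shape)  # expected: (150, 10004)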

In [2]:
from sklearn.linear_model import ElasticNetCV
from sklearn.cross_validation import StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score

We use ElasticNetCV, which provides warm restart and nested parallelism (via joblib). Hence we don't need an inner cross-validation loop, since the estimator already provides one. In this example, the grid of parameters contains 500 tuples.


In [3]:
l1_ratios = np.linspace(0.1, 1, 10).tolist()
s = ElasticNetCV(l1_ratio=l1_ratios, n_alphas=50, n_jobs=-1)
est = Pipeline([("enet", s)])
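As a quick check of the 500-tuple claim above (a sketch, not part of the original workflow), the grid size is simply the product of the two dimensions:

# 10 l1_ratio values x 50 alphas per ratio = 500 (l1_ratio, alpha) tuples
print(len(l1_ratios) * s.n_alphas)  # 500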

In [4]:
from mempamal.configuration import JSONify_estimator, JSONify_cv, build_dataset
from mempamal.workflow import create_wf, save_wf

We JSONify the estimator and the cross-validation configuration, then build the dataset in the current directory (this creates a dataset.joblib file). Next we create the workflow in our internal format (create_wf); with verbose=True, it prints the commands on stdout. Finally, we export the workflow (save_wf) to the soma-workflow format and write it to workflow.json (this requires soma-workflow).


In [5]:
method_conf = JSONify_estimator(est, out="./est.json")
cv_conf = JSONify_cv(StratifiedKFold, cv_kwargs={"n_folds": 5},
                     score_func=r2_score,
                     stratified=True,
                     out="./cv.json")
dataset = build_dataset(X, y, method_conf, cv_conf, outputdir=".")
wfi = create_wf(dataset['folds'], cv_conf, method_conf, ".",
                verbose=True)
wf = save_wf(wfi, "./workflow.json", mode="soma-workflow")


python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_0.pkl 0
python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_1.pkl 1
python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_2.pkl 2
python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_3.pkl 3
python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_4.pkl 4
python mempamal/scripts/outer_reducer.py ./final_res.pkl ./red_res_{outer}.pkl
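These are the commands generated for the five outer folds plus the final reducer. If soma-workflow is not available, they could also be run by hand; a minimal sketch using subprocess (an assumption on my part, not part of the original example), passing the reducer its `{outer}` placeholder verbatim as printed:

import subprocess

# run the five mapper jobs sequentially, then the outer reducer
for fold in range(5):
    subprocess.check_call(
        ["python", "mempamal/scripts/mapper.py",
         "./cv.json", "./est.json", "./dataset.joblib",
         "./red_res_{}.pkl".format(fold), str(fold)])
subprocess.check_call(
    ["python", "mempamal/scripts/outer_reducer.py",
     "./final_res.pkl", "./red_res_{outer}.pkl"])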

Now we create a WorkflowController and set the number of processors to 1, so that the nested parallelism can use the resources. We submit the workflow, wait for its completion, and then read the final results.


In [6]:
from soma_workflow.client import WorkflowController

import time
import json
import sklearn.externals.joblib as joblib

controller = WorkflowController()
# limit the scheduler to 1 concurrent task (leave the cores to the nested parallelism)
old_nproc = controller.scheduler_config.get_proc_nb()
controller.scheduler_config.set_proc_nb(1)
wf_id = controller.submit_workflow(workflow=wf, name="fourth example")

while controller.workflow_status(wf_id) != 'workflow_done':
    time.sleep(2)
# restore the scheduler's previous processor count
controller.scheduler_config.set_proc_nb(old_nproc)

print(joblib.load('./final_res.pkl'))


light mode
{'std': 0.01165793880264987, 'raw': array([ 0.88502578,  0.88896717,  0.89306213,  0.88935599,  0.86066079]), 'median': 0.88896717072541787, 'mean': 0.88341437212120044}
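The reducer reports summary statistics over the five outer-fold r2 scores. As a quick check (assuming the cell above has been run, so joblib and np are in scope), the summary values can be recomputed from the raw array:

res = joblib.load('./final_res.pkl')
raw = res['raw']  # one r2 score per outer fold
print(raw.mean(), np.median(raw), raw.std())
# should match res['mean'], res['median'], and res['std']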