In [1]:
from mempamal.datasets import iris
import numpy as np
# iris dataset as usual but with linear regression (Why not! :p).
X, y = iris.get_data()
# add i.i.d. noise to X -> 4 "real" features + 10,000 noise features
# this provides a sufficient workload to show the interest of nested parallelism
X = np.hstack((X, np.random.randn(X.shape[0], 10000)))
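As a quick sanity check (assuming the standard iris data of 150 samples and 4 features), the augmented design matrix should now have 10,004 columns:
print(X.shape)  # expected: (150, 10004) with the standard iris data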
In [2]:
from sklearn.linear_model import ElasticNetCV
from sklearn.cross_validation import StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score
We use the ElasticNetCV estimator, which provides warm restarts and nested parallelism (via joblib). So we don't need an inner cross-validation loop, since the estimator already provides it. In this example, the grid of parameters contains 500 tuples (10 l1_ratio values × 50 alphas).
In [3]:
l1_ratios = np.linspace(0.1,1,10).tolist()
s = ElasticNetCV(l1_ratio=l1_ratios, n_alphas=50, n_jobs=-1)
est = Pipeline([("enet", s)])
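As a minimal check that the grid really contains 500 tuples, we can multiply the number of l1_ratio values by the number of alphas:
# 10 l1_ratio values x 50 alphas = 500 parameter tuples explored by ElasticNetCV
print(len(l1_ratios) * s.n_alphas)  # 500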
In [4]:
from mempamal.configuration import JSONify_estimator, JSONify_cv, build_dataset
from mempamal.workflow import create_wf, save_wf
We jsonify the estimator and the cross-validation configuration.
We build the dataset in the current directory; this creates a dataset.joblib file.
Then we create the workflow in our internal format (create_wf). With verbose=True, it prints the commands on stdout.
Finally, we output the workflow (save_wf) in the soma-workflow format and write it to workflow.json (requires soma-workflow).
In [5]:
method_conf = JSONify_estimator(est, out="./est.json")
cv_conf = JSONify_cv(StratifiedKFold, cv_kwargs={"n_folds": 5},
                     score_func=r2_score,
                     stratified=True,
                     out="./cv.json")
dataset = build_dataset(X, y, method_conf, cv_conf, outputdir=".")
wfi = create_wf(dataset['folds'], cv_conf, method_conf, ".",
                verbose=True)
wf = save_wf(wfi, "./workflow.json", mode="soma-workflow")
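As a small sanity check (the file names come from the calls above), we can verify that the configuration, dataset and workflow files were written to the current directory:
import os
# files produced by JSONify_estimator, JSONify_cv, build_dataset and save_wf
for path in ["./est.json", "./cv.json", "./dataset.joblib", "./workflow.json"]:
    print(path, os.path.exists(path))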
Now we create a WorkflowController and set the number of processors to 1, in order to let the nested parallelism use the resources.
We submit the workflow, wait for its completion, and then read the final results.
In [6]:
from soma_workflow.client import WorkflowController
import time
import json
import sklearn.externals.joblib as joblib
controller = WorkflowController()
# limit the scheduler to 1 task (for the nested parallelism)
old_nproc = controller.scheduler_config.get_proc_nb()
controller.scheduler_config.set_proc_nb(1)
wf_id = controller.submit_workflow(workflow=wf, name="fourth example")
while controller.workflow_status(wf_id) != 'workflow_done':
    time.sleep(2)
# reset the scheduler policy
controller.scheduler_config.set_proc_nb(old_nproc)
print(joblib.load('./final_res.pkl'))
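A note on the waiting loop: soma-workflow also provides a Helper class with a wait_workflow utility. If it is available in your installation, the polling loop above can be replaced by this sketch:
from soma_workflow.client import Helper
# block until all jobs of the submitted workflow have finished
Helper.wait_workflow(wf_id, controller)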