Introduction to MEmPaMaL

Example with Scikit-learn

In this example, we use the classic iris dataset.


In [1]:
from mempamal.datasets import iris
X, y = iris.get_data()

The pipeline contains:

  • scaling of the data: centering, then dividing by the standard deviation
  • logistic regression with default parameters
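
As a minimal sketch of what the scaling step does to a single feature column, the snippet below centers the values and divides by the population standard deviation. The sample values are illustrative and the helper name standardize is hypothetical; this is not scikit-learn's implementation.

```python
from statistics import fmean, pstdev


def standardize(column):
    """Center a column on its mean and divide by its population std."""
    mu = fmean(column)
    sigma = pstdev(column)  # population standard deviation (ddof=0)
    return [(x - mu) / sigma for x in column]


# Illustrative values only, not taken from the dataset loader above.
sepal_lengths = [5.1, 4.9, 6.3, 5.8, 7.1]
scaled = standardize(sepal_lengths)
print(scaled)
```

After this transformation the column has zero mean and unit standard deviation, which puts all features on a comparable scale before the logistic regression.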

The goodness of fit is estimated with:

  • a 5-fold (stratified) cross-validation
  • the score function is an F1 score
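
To make "stratified" concrete, here is a pure-Python sketch that assigns sample indices to folds so that each test fold keeps the class proportions of the full dataset. The function stratified_folds is hypothetical and only illustrates the idea; scikit-learn's StratifiedKFold may assign indices differently. The labels assume 150 samples with 50 per class.

```python
from collections import defaultdict


def stratified_folds(labels, n_folds):
    """Return a list of (train_indices, test_indices) pairs."""
    # Group sample indices by class label.
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)

    # Deal the indices of each class round-robin over the folds.
    test_sets = [[] for _ in range(n_folds)]
    for indices in by_class.values():
        for pos, idx in enumerate(indices):
            test_sets[pos % n_folds].append(idx)

    all_idx = set(range(len(labels)))
    return [(sorted(all_idx - set(test)), sorted(test)) for test in test_sets]


# Assumed label layout: three classes, 50 samples each.
labels = [0] * 50 + [1] * 50 + [2] * 50
folds = stratified_folds(labels, 5)
for train, test in folds:
    print(len(train), len(test))
```

With this layout, every test fold holds 30 samples, 10 from each class.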

In [2]:
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.cross_validation import StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.metrics import f1_score

In [3]:
s1 = StandardScaler(with_mean=True, with_std=True)
s2 = LogisticRegression()
p = [("scaler", s1), ("logit", s2)]
est = Pipeline(p)

Here is an illustration on only one of the folds:


In [4]:
fold_iter = StratifiedKFold(y, n_folds=5)
train, test = next(iter(fold_iter))  # first (train, test) split only
X_train, X_test = X[train], X[test]
y_train, y_test = y[train], y[test]
y_pred = est.fit(X_train, y_train).predict(X_test)
f1_score(y_test, y_pred)


Out[4]:
0.82949701619778338
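
For readers unfamiliar with the F1 score on more than two classes, the following sketch computes a per-class F1 and averages it weighted by class support. That this averaging matches what f1_score did by default for multiclass targets in the scikit-learn release used here is an assumption; the toy labels and the helper names are hypothetical.

```python
from collections import Counter


def f1_per_class(y_true, y_pred, label):
    """F1 score of one class treated as the positive class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == label and p == label)
    fp = sum(1 for t, p in pairs if t != label and p == label)
    fn = sum(1 for t, p in pairs if t == label and p != label)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


def weighted_f1(y_true, y_pred):
    """Average the per-class F1 scores, weighted by class support."""
    support = Counter(y_true)
    total = len(y_true)
    return sum(f1_per_class(y_true, y_pred, lab) * n / total
               for lab, n in support.items())


# Toy labels for illustration, not the iris fold above.
y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 1, 2, 2, 2]
score = weighted_f1(y_true, y_pred)
print(score)
```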

Example with Scikit-learn + MEmPaMaL + Soma-Workflow


In [5]:
from mempamal.configuration import JSONify_estimator, JSONify_cv, build_dataset
from mempamal.workflow import create_wf, save_wf

We reuse the same pipeline, this time with a weaker regularization for the logistic regression (C=1e4):


In [6]:
s1 = StandardScaler(with_mean=True, with_std=True)
s2 = LogisticRegression(C=1e4)
p = [("scaler", s1), ("logit", s2)]
est = Pipeline(p)

We jsonify the estimator and the cross-validation configuration:


In [7]:
method_conf = JSONify_estimator(est, out="./est.json")
cv_conf = JSONify_cv(StratifiedKFold, cv_kwargs={"n_folds": 5},
                     score_func=f1_score, stratified=True,
                     out="./cv.json")

We build the dataset in the current directory, which creates a dataset.joblib file. Then we create the workflow in our internal format (create_wf); with verbose=True, it prints the commands to stdout. Finally, we export the workflow (save_wf) in the soma-workflow format and write it to workflow.json (this step requires soma-workflow).


In [8]:
dataset = build_dataset(X, y, method_conf, cv_conf, ".")
wfi = create_wf(dataset['folds'], cv_conf, method_conf, ".",
               verbose=True)
wf = save_wf(wfi, "./workflow.json", mode="soma-workflow")


python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_0.pkl 0
python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_1.pkl 1
python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_2.pkl 2
python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_3.pkl 3
python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_4.pkl 4
python mempamal/scripts/outer_reducer.py ./final_res.pkl ./red_res_{outer}.pkl

We print all the dependencies to check that the final reduce task depends on every map task.


In [9]:
for dep in wfi[1]: print(dep)


('|--- Map outer=0', '|- Final reduce')
('|--- Map outer=1', '|- Final reduce')
('|--- Map outer=2', '|- Final reduce')
('|--- Map outer=3', '|- Final reduce')
('|--- Map outer=4', '|- Final reduce')
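
The dependency pairs printed above read as (task that runs first, task that waits for it). As a rough sketch of how such a list constrains scheduling, the snippet below rebuilds the same pairs and checks when the reduce task may start. The helper ready_to_run is hypothetical and is not part of MEmPaMaL or soma-workflow.

```python
n_folds = 5
map_tasks = ["|--- Map outer={}".format(i) for i in range(n_folds)]
reduce_task = "|- Final reduce"

# One (before, after) pair per map task, as in the printed output.
dependencies = [(m, reduce_task) for m in map_tasks]


def ready_to_run(task, finished, dependencies):
    """A task may start once every task it depends on has finished."""
    prerequisites = [before for before, after in dependencies if after == task]
    return all(p in finished for p in prerequisites)


# Map tasks have no prerequisites, so they can all run in parallel.
print(all(ready_to_run(m, set(), dependencies) for m in map_tasks))
# The reduce task has to wait until all five map tasks are done.
print(ready_to_run(reduce_task, set(map_tasks[:4]), dependencies))
print(ready_to_run(reduce_task, set(map_tasks), dependencies))
```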

Now we create a WorkflowController and submit the workflow. We wait for the workflow to complete, then read the final results.


In [10]:
from soma_workflow.client import WorkflowController

import time
import json
import sklearn.externals.joblib as joblib

controller = WorkflowController()
wf_id = controller.submit_workflow(workflow=wf, name="first example")

while controller.workflow_status(wf_id) != 'workflow_done':
    time.sleep(2)
print(joblib.load('./final_res.pkl'))


light mode
{'std': 0.025080367485459092, 'raw': array([ 0.93333333,  1.        ,  0.96658312,  0.96658312,  0.93265993]), 'median': 0.96658312447786132, 'mean': 0.9598319029897977}
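
The summary values can be recomputed from the 'raw' per-fold scores as a sanity check. The sketch below copies the five scores as printed (rounded to 8 decimals) and assumes that 'std' is the population standard deviation, which is consistent with the value shown; how the reducer actually computes it is not confirmed here.

```python
from statistics import fmean, median, pstdev

# Per-fold scores copied from the 'raw' entry of the printed result.
raw = [0.93333333, 1.0, 0.96658312, 0.96658312, 0.93265993]

summary = {
    "mean": fmean(raw),
    "median": median(raw),
    "std": pstdev(raw),  # population standard deviation (assumption)
}
print(summary)
```

Up to the rounding of the copied scores, the three values agree with the 'mean', 'median' and 'std' entries loaded from final_res.pkl.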