In [1]:
from mempamal.datasets import iris
X, y = iris.get_data()
The pipeline will contain a standard scaler followed by a logistic regression.
The goodness of fit is estimated with the F1 score.
In [2]:
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.cross_validation import StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.metrics import f1_score
In [3]:
s1 = StandardScaler(with_mean=True, with_std=True)
s2 = LogisticRegression()
p = [("scaler", s1), ("logit", s2)]
est = Pipeline(p)
Here is an illustration using only one of the folds:
In [4]:
# stratified 5-fold iterator; take the first train/test split
fold_iter = StratifiedKFold(y, n_folds=5)
train, test = next(iter(fold_iter))
X_train, X_test = X[train], X[test]
y_train, y_test = y[train], y[test]
# fit on the training fold, predict and score on the test fold
y_pred = est.fit(X_train, y_train).predict(X_test)
f1_score(y_test, y_pred)
Out[4]:
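For comparison, the same cross-validation can be run sequentially over all five folds with a plain loop. This is only an illustrative sketch built from the objects defined above; the mempamal workflow constructed below distributes the per-fold fits as independent map tasks.

# Sequential 5-fold cross-validation (sketch); the workflow built below
# runs the per-fold fit/predict steps as independent map tasks instead.
scores = []
for train, test in StratifiedKFold(y, n_folds=5):
    y_pred = est.fit(X[train], y[train]).predict(X[test])
    scores.append(f1_score(y[test], y_pred))
print(scores)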
In [5]:
from mempamal.configuration import JSONify_estimator, JSONify_cv, build_dataset
from mempamal.workflow import create_wf, save_wf
We take the same estimator as before (here with C=1e4 for the logistic regression):
In [6]:
s1 = StandardScaler(with_mean=True, with_std=True)
s2 = LogisticRegression(C=1e4)
p = [("scaler", s1), ("logit", s2)]
est = Pipeline(p)
We jsonify the estimator and the cross-validation configuration:
In [7]:
method_conf = JSONify_estimator(est, out="./est.json")
cv_conf = JSONify_cv(StratifiedKFold, cv_kwargs={"n_folds": 5},
                     score_func=f1_score, stratified=True,
                     out="./cv.json")
We build the dataset in the current directory; this creates a dataset.joblib file.
Then we create the workflow in our internal format (create_wf).
With verbose=True, it prints the commands on stdout.
Finally, we output the workflow (save_wf) in the soma-workflow format and write it to workflow.json (this requires soma-workflow).
In [8]:
dataset = build_dataset(X, y, method_conf, cv_conf, ".")
wfi = create_wf(dataset['folds'], cv_conf, method_conf, ".",
                verbose=True)
wf = save_wf(wfi, "./workflow.json", mode="soma-workflow")
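At this point, the working directory contains est.json, cv.json, dataset.joblib and workflow.json. As a quick sanity check, the dumped dataset can be reloaded with joblib; this is a sketch which assumes the dumped file mirrors the dictionary returned by build_dataset (with at least a 'folds' entry, as used above):

import sklearn.externals.joblib as joblib

# Reload the dumped dataset and list its keys (sketch; the exact keys
# depend on mempamal's dataset format).
d = joblib.load("./dataset.joblib")
print(sorted(d.keys()))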
We print all the dependencies so we can check that the Final Reduce depends on all the map tasks.
In [9]:
for dep in wfi[1]: print(dep)
Now, we create a WorkflowController and we submit the workflow. We wait for workflow completion, then we read the final results.
In [10]:
from soma_workflow.client import WorkflowController
import time
import json
import sklearn.externals.joblib as joblib
controller = WorkflowController()
wf_id = controller.submit_workflow(workflow=wf, name="first example")
# poll until the workflow has completed
while controller.workflow_status(wf_id) != 'workflow_done':
    time.sleep(2)
print(joblib.load('./final_res.pkl'))
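As an alternative to polling workflow_status in a loop, soma-workflow also provides a Helper class that blocks until the workflow finishes. A minimal sketch, assuming Helper.wait_workflow is available in your soma-workflow version:

from soma_workflow.client import Helper

# Block until every job in the workflow has finished (sketch).
Helper.wait_workflow(wf_id, controller)
print(joblib.load('./final_res.pkl'))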