Quickstart

This is a very quick non-imaging introduction to Nipype workflows. For a more comprehensive introduction, check the next section of the tutorial.

Import a few things from nipype:


In [ ]:
import os
from nipype import Workflow, Node, Function

Creating a Workflow with one Node that adds two numbers


In [ ]:
def sum(a, b):
    # note: this shadows the builtin sum for the rest of the notebook session
    return a + b

wf = Workflow('hello')

adder = Node(Function(input_names=['a', 'b'],
                      output_names=['sum'],
                      function=sum), 
             name='a_plus_b')

adder.inputs.a = 1
adder.inputs.b = 3

wf.add_nodes([adder])

wf.base_dir = os.getcwd()

eg = wf.run()

eg.nodes()[0].result.outputs


170903-22:30:01,744 workflow INFO:
	 Workflow hello settings: ['check', 'execution', 'logging']
170903-22:30:01,746 workflow INFO:
	 Running serially.
170903-22:30:01,747 workflow INFO:
	 Executing node a_plus_b in dir: /repos/nipype_tutorial/notebooks/hello/a_plus_b
Out[ ]:
sum = 4

Creating a second node and connecting it to the hello Workflow


In [ ]:
def concat(a, b):
    return [a, b]


concater = Node(Function(input_names=['a', 'b'],
                         output_names=['some_list'],
                         function=concat), 
                name='concat_a_b')

wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3

eg = wf.run()
print(eg.nodes())


170903-22:30:01,778 workflow INFO:
	 Workflow hello settings: ['check', 'execution', 'logging']
170903-22:30:01,783 workflow INFO:
	 Running serially.
170903-22:30:01,784 workflow INFO:
	 Executing node a_plus_b in dir: /repos/nipype_tutorial/notebooks/hello/a_plus_b
170903-22:30:01,790 workflow INFO:
	 Executing node concat_a_b in dir: /repos/nipype_tutorial/notebooks/hello/concat_a_b
[hello.a_plus_b, hello.concat_a_b]

Now we can check the results of our Workflow; we should see a list:


In [ ]:
eg.nodes()[-1].result.outputs


Out[ ]:
some_list = [4, 3]
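
Connecting adder's sum output to concater's a input is equivalent to the following plain-Python call chain. This is only a sketch of the data flow, not how Nipype actually executes (each node runs in its own working directory); the function is named add here to avoid shadowing the builtin sum:

```python
# Plain-Python equivalent of the two-node workflow above (illustration only).
def add(a, b):
    return a + b

def concat(a, b):
    return [a, b]

some_list = concat(add(1, 3), 3)
print(some_list)  # [4, 3], matching the workflow output
```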

We will try to add an additional Node that adds one:


In [ ]:
def plus_one(a):
    print("PLUS ONE, a = {}".format(a))
    return a + 1

plusone = Node(Function(input_names=['a'],
                        output_names=['out'],
                        function=plus_one), 
               name='add_1')

wf.connect(concater, 'some_list', plusone, 'a')

eg = wf.run()
print(eg.nodes())


170903-22:30:01,819 workflow INFO:
	 Workflow hello settings: ['check', 'execution', 'logging']
170903-22:30:01,822 workflow INFO:
	 Running serially.
170903-22:30:01,824 workflow INFO:
	 Executing node a_plus_b in dir: /repos/nipype_tutorial/notebooks/hello/a_plus_b
170903-22:30:01,825 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:01,827 workflow INFO:
	 Executing node concat_a_b in dir: /repos/nipype_tutorial/notebooks/hello/concat_a_b
170903-22:30:01,835 workflow INFO:
	 Executing node add_1 in dir: /repos/nipype_tutorial/notebooks/hello/add_1
PLUS ONE, a = [4, 3]
170903-22:30:01,840 workflow ERROR:
	 ['Node add_1 failed to run on host c9e1069a3506.']
170903-22:30:01,842 workflow INFO:
	 Saving crash info to /repos/nipype_tutorial/notebooks/crash-20170903-223001-neuro-add_1-5ddf08e7-c93d-4853-9527-619a6a570a39.pklz
170903-22:30:01,843 workflow INFO:
	 Traceback (most recent call last):
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/pipeline/plugins/linear.py", line 43, in run
    node.run(updatehash=updatehash)
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/pipeline/engine/nodes.py", line 372, in run
    self._run_interface()
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/pipeline/engine/nodes.py", line 482, in _run_interface
    self._result = self._run_command(execute)
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/pipeline/engine/nodes.py", line 613, in _run_command
    result = self._interface.run()
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/interfaces/base.py", line 1084, in run
    runtime = self._run_wrapper(runtime)
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/interfaces/base.py", line 1032, in _run_wrapper
    runtime = self._run_interface(runtime)
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/interfaces/utility/wrappers.py", line 192, in _run_interface
    out = function_handle(**args)
  File "<string>", line 3, in plus_one
TypeError: can only concatenate list (not "int") to list
Interface Function failed to run. 

170903-22:30:01,845 workflow INFO:
	 ***********************************
170903-22:30:01,846 workflow ERROR:
	 could not run node: hello.add_1
170903-22:30:01,846 workflow INFO:
	 crashfile: /repos/nipype_tutorial/notebooks/crash-20170903-223001-neuro-add_1-5ddf08e7-c93d-4853-9527-619a6a570a39.pklz
170903-22:30:01,847 workflow INFO:
	 ***********************************
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-5-673369f26ab3> in <module>()
     10 wf.connect(concater, 'some_list', plusone, 'a')
     11 
---> 12 eg = wf.run()
     13 print(eg.nodes())

/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/pipeline/engine/workflows.py in run(self, plugin, plugin_args, updatehash)
    588         if str2bool(self.config['execution']['create_report']):
    589             self._write_report_info(self.base_dir, self.name, execgraph)
--> 590         runner.run(execgraph, updatehash=updatehash, config=self.config)
    591         datestr = datetime.utcnow().strftime('%Y%m%dT%H%M%S')
    592         if str2bool(self.config['execution']['write_provenance']):

/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/pipeline/plugins/linear.py in run(self, graph, config, updatehash)
     59                 if self._status_callback:
     60                     self._status_callback(node, 'exception')
---> 61         report_nodes_not_run(notrun)

/opt/conda/envs/neuro3/lib/python3.6/site-packages/nipype/pipeline/plugins/base.py in report_nodes_not_run(notrun)
     99                 logger.debug(subnode._id)
    100         logger.info("***********************************")
--> 101         raise RuntimeError(('Workflow did not execute cleanly. '
    102                             'Check log for details'))
    103 

RuntimeError: Workflow did not execute cleanly. Check log for details

This time the workflow didn't execute cleanly and we got an error. We can use nipypecli to read the crashfile (note that if you have multiple crashfiles in the directory, you'll have to provide the full name):


In [ ]:
!LC_ALL= nipypecli crash crash*


Traceback (most recent call last):
  File "/opt/conda/envs/neuro3/bin/nipypecli", line 11, in <module>
    load_entry_point('nipype==1.0.0.dev0', 'console_scripts', 'nipypecli')()
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/click/core.py", line 676, in main
    _verify_python3_env()
  File "/opt/conda/envs/neuro3/lib/python3.6/site-packages/click/_unicodefun.py", line 118, in _verify_python3_env
    'for mitigation steps.' + extra)
RuntimeError: Click will abort further execution because Python 3 was configured to use ASCII as encoding for the environment.  Consult http://click.pocoo.org/python3/for mitigation steps.

This system supports the C.UTF-8 locale which is recommended.
You might be able to resolve your issue by exporting the
following environment variables:

    export LC_ALL=C.UTF-8
    export LANG=C.UTF-8

The crash report clearly shows the problematic Node and its input: we tried to add an integer to a list, which is not allowed in Python. (If nipypecli aborts with a locale error, as it did above, export LC_ALL=C.UTF-8 and LANG=C.UTF-8 first, as the message suggests.)
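
The same TypeError can be reproduced in plain Python:

```python
# Reproducing the failure outside Nipype: plus_one effectively tried list + int.
try:
    [4, 3] + 1
    err = None
except TypeError as e:
    err = str(e)
print(err)  # can only concatenate list (not "int") to list
```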

Let's try using a MapNode instead, which applies the function to every element of the input list:


In [ ]:
from nipype import MapNode

plusone = MapNode(Function(input_names=['a'],
                           output_names=['out'],
                           function=plus_one), 
                  iterfield=['a'],
                  name='add_1')

wf = Workflow('hello_mapnode')

adder = Node(Function(input_names=['a', 'b'],
                      output_names=['sum'],
                      function=sum), 
             name='a_plus_b')

adder.inputs.a = 1
adder.inputs.b = 3
wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3

wf.connect(concater, 'some_list', plusone, 'a')

wf.base_dir = os.getcwd()

eg = wf.run()
print(eg.nodes())


170903-22:30:10,606 workflow INFO:
	 Workflow hello_mapnode settings: ['check', 'execution', 'logging']
170903-22:30:10,610 workflow INFO:
	 Running serially.
170903-22:30:10,611 workflow INFO:
	 Executing node a_plus_b in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/a_plus_b
170903-22:30:10,618 workflow INFO:
	 Executing node concat_a_b in dir: /repos/nipype_tutorial/notebooks/hello/concat_a_b
170903-22:30:10,620 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:10,622 workflow INFO:
	 Executing node add_1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/add_1
170903-22:30:10,625 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/add_1/mapflow/_add_10
PLUS ONE, a = 4
170903-22:30:10,632 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/add_1/mapflow/_add_11
PLUS ONE, a = 3
[hello_mapnode.a_plus_b, hello.concat_a_b, hello_mapnode.add_1]

Now the workflow finished without problems; let's see the results from hello_mapnode.add_1:


In [ ]:
print(eg.nodes()[2].result.outputs)


Bunch(out=[5, 4])
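
Conceptually, a MapNode applies its function to every element of the iterfield input, roughly like a list comprehension. This is a sketch of the semantics, not Nipype's actual execution (each element runs as its own sub-node, as the _add_10 and _add_11 directories above show):

```python
def plus_one(a):
    return a + 1

some_list = [4, 3]  # the output of concat_a_b
out = [plus_one(a) for a in some_list]
print(out)  # [5, 4], matching Bunch(out=[5, 4])
```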

And now we will run an example with iterables:


In [ ]:
adder.iterables = ('a', [1, 2])
adder.inputs.b = 2

eg = wf.run()
print(eg.nodes())


170903-22:30:11,71 workflow INFO:
	 Workflow hello_mapnode settings: ['check', 'execution', 'logging']
170903-22:30:11,83 workflow INFO:
	 Running serially.
170903-22:30:11,84 workflow INFO:
	 Executing node a_plus_b.aI.a1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/a_plus_b
170903-22:30:11,91 workflow INFO:
	 Executing node concat_a_b.a1 in dir: /repos/nipype_tutorial/notebooks/hello/_a_2/concat_a_b
170903-22:30:11,97 workflow INFO:
	 Executing node add_1.a1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/add_1
170903-22:30:11,101 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/add_1/mapflow/_add_10
PLUS ONE, a = 4
170903-22:30:11,108 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/add_1/mapflow/_add_11
PLUS ONE, a = 3
170903-22:30:11,116 workflow INFO:
	 Executing node a_plus_b.aI.a0 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/a_plus_b
170903-22:30:11,122 workflow INFO:
	 Executing node concat_a_b.a0 in dir: /repos/nipype_tutorial/notebooks/hello/_a_1/concat_a_b
170903-22:30:11,129 workflow INFO:
	 Executing node add_1.a0 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/add_1
170903-22:30:11,132 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/add_1/mapflow/_add_10
PLUS ONE, a = 3
170903-22:30:11,140 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/add_1/mapflow/_add_11
PLUS ONE, a = 3
[hello_mapnode.a_plus_b.aI.a0, hello.concat_a_b.a0, hello_mapnode.add_1.a0, hello_mapnode.a_plus_b.aI.a1, hello.concat_a_b.a1, hello_mapnode.add_1.a1]

Now we have 6 nodes; we can check the results for hello_mapnode.add_1.a1:


In [ ]:
eg.nodes()[5].result.outputs


Out[ ]:
Bunch(out=[5, 4])
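
Iterables replicate a node and everything downstream once per value. With adder.iterables = ('a', [1, 2]), adder.inputs.b = 2, and concater.inputs.b = 3, the two branches compute the following (a plain-Python sketch; add is the renamed sum function to avoid shadowing the builtin):

```python
def add(a, b):
    return a + b

def concat(a, b):
    return [a, b]

results = {}
for a in [1, 2]:                       # adder.iterables = ('a', [1, 2])
    s = add(a, 2)                      # adder.inputs.b = 2
    lst = concat(s, 3)                 # concater.inputs.b = 3
    results[a] = [x + 1 for x in lst]  # the add_1 MapNode

print(results)  # {1: [4, 4], 2: [5, 4]}
```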

In [ ]:
wf.write_graph(graph2use='exec')


170903-22:30:12,24 workflow INFO:
	 Generated workflow graph: /repos/nipype_tutorial/notebooks/hello_mapnode/graph.dot.png (graph2use=exec, simple_form=True).
Out[ ]:
'/repos/nipype_tutorial/notebooks/hello_mapnode/graph.dot.png'

In [ ]:
from IPython.display import Image

We can plot the general structure of the workflow:


In [ ]:
Image("hello_mapnode/graph.dot.png")


Out[ ]:

And the more detailed structure with all nodes:


In [ ]:
Image("hello_mapnode/graph_detailed.dot.png")


Out[ ]:

We will introduce another set of iterables, this time for the concater Node:


In [ ]:
concater.iterables = ('b', [3, 4])
eg = wf.run()
eg.nodes()


170903-22:30:13,992 workflow INFO:
	 Workflow hello_mapnode settings: ['check', 'execution', 'logging']
170903-22:30:14,4 workflow INFO:
	 Running serially.
170903-22:30:14,5 workflow INFO:
	 Executing node a_plus_b.bI.b1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/a_plus_b
170903-22:30:14,7 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:14,9 workflow INFO:
	 Executing node concat_a_b.aI.a0.b1 in dir: /repos/nipype_tutorial/notebooks/hello/_a_2/_b_3/concat_a_b
170903-22:30:14,16 workflow INFO:
	 Executing node add_1.a0.b1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_3/add_1
170903-22:30:14,22 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_3/add_1/mapflow/_add_10
PLUS ONE, a = 4
170903-22:30:14,31 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_3/add_1/mapflow/_add_11
PLUS ONE, a = 3
170903-22:30:14,41 workflow INFO:
	 Executing node concat_a_b.aI.a1.b1 in dir: /repos/nipype_tutorial/notebooks/hello/_a_2/_b_4/concat_a_b
170903-22:30:14,49 workflow INFO:
	 Executing node add_1.a1.b1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_4/add_1
170903-22:30:14,53 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_4/add_1/mapflow/_add_10
PLUS ONE, a = 4
170903-22:30:14,60 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_4/add_1/mapflow/_add_11
PLUS ONE, a = 4
170903-22:30:14,68 workflow INFO:
	 Executing node a_plus_b.bI.b0 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/a_plus_b
170903-22:30:14,70 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:14,71 workflow INFO:
	 Executing node concat_a_b.aI.a0.b0 in dir: /repos/nipype_tutorial/notebooks/hello/_a_1/_b_3/concat_a_b
170903-22:30:14,78 workflow INFO:
	 Executing node add_1.a0.b0 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_3/add_1
170903-22:30:14,81 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_3/add_1/mapflow/_add_10
PLUS ONE, a = 3
170903-22:30:14,89 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_3/add_1/mapflow/_add_11
PLUS ONE, a = 3
170903-22:30:14,99 workflow INFO:
	 Executing node concat_a_b.aI.a1.b0 in dir: /repos/nipype_tutorial/notebooks/hello/_a_1/_b_4/concat_a_b
170903-22:30:14,106 workflow INFO:
	 Executing node add_1.a1.b0 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_4/add_1
170903-22:30:14,110 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_4/add_1/mapflow/_add_10
PLUS ONE, a = 3
170903-22:30:14,118 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_4/add_1/mapflow/_add_11
PLUS ONE, a = 4
Out[ ]:
[hello_mapnode.a_plus_b.bI.b0,
 hello.concat_a_b.aI.a0.b0,
 hello_mapnode.add_1.a0.b0,
 hello.concat_a_b.aI.a1.b0,
 hello_mapnode.add_1.a1.b0,
 hello_mapnode.a_plus_b.bI.b1,
 hello.concat_a_b.aI.a0.b1,
 hello_mapnode.add_1.a0.b1,
 hello.concat_a_b.aI.a1.b1,
 hello_mapnode.add_1.a1.b1]
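
With iterables on two different nodes, Nipype expands the graph over the Cartesian product of the values, giving the 2 × 2 downstream branches listed above. In plain Python the four branches compute:

```python
def add(a, b):
    return a + b

results = {}
for a in [1, 2]:                 # adder.iterables = ('a', [1, 2])
    s = add(a, 2)                # adder.inputs.b = 2
    for b in [3, 4]:             # concater.iterables = ('b', [3, 4])
        lst = [s, b]
        results[(a, b)] = [x + 1 for x in lst]  # the add_1 MapNode

print(results)
# {(1, 3): [4, 4], (1, 4): [4, 5], (2, 3): [5, 4], (2, 4): [5, 5]}
```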

In [ ]:
wf.write_graph(graph2use='exec')


170903-22:30:14,414 workflow INFO:
	 Generated workflow graph: /repos/nipype_tutorial/notebooks/hello_mapnode/graph.dot.png (graph2use=exec, simple_form=True).
Out[ ]:
'/repos/nipype_tutorial/notebooks/hello_mapnode/graph.dot.png'

In [ ]:
Image("hello_mapnode/graph_detailed.dot.png")


Out[ ]:

Now we will introduce a JoinNode, which allows us to merge results together:


In [ ]:
def merge_and_scale_data(data2):
    import numpy as np
    return (np.array(data2) * 1000).tolist()


from nipype import JoinNode
joiner = JoinNode(Function(input_names=['data2'],
                           output_names=['data_scaled'],
                           function=merge_and_scale_data),
                  name='join_scale_data',
                  joinsource=adder,
                  joinfield=['data2'])

wf.connect(plusone, 'out', joiner, 'data2')

eg = wf.run()
eg.nodes()


170903-22:30:15,790 workflow INFO:
	 Workflow hello_mapnode settings: ['check', 'execution', 'logging']
170903-22:30:15,805 workflow INFO:
	 Running serially.
170903-22:30:15,807 workflow INFO:
	 Executing node a_plus_b.bI.b1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/a_plus_b
170903-22:30:15,808 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:15,810 workflow INFO:
	 Executing node concat_a_b.aI.a0.b1 in dir: /repos/nipype_tutorial/notebooks/hello/_a_2/_b_3/concat_a_b
170903-22:30:15,812 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:15,814 workflow INFO:
	 Executing node add_1.a0.b1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_3/add_1
170903-22:30:15,818 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_3/add_1/mapflow/_add_10
PLUS ONE, a = 4
170903-22:30:15,825 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_3/add_1/mapflow/_add_11
PLUS ONE, a = 3
170903-22:30:15,838 workflow INFO:
	 Executing node concat_a_b.aI.a1.b1 in dir: /repos/nipype_tutorial/notebooks/hello/_a_2/_b_4/concat_a_b
170903-22:30:15,839 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:15,842 workflow INFO:
	 Executing node add_1.a1.b1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_4/add_1
170903-22:30:15,846 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_4/add_1/mapflow/_add_10
PLUS ONE, a = 4
170903-22:30:15,853 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_2/_b_4/add_1/mapflow/_add_11
PLUS ONE, a = 4
170903-22:30:15,861 workflow INFO:
	 Executing node a_plus_b.bI.b0 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/a_plus_b
170903-22:30:15,863 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:15,865 workflow INFO:
	 Executing node concat_a_b.aI.a0.b0 in dir: /repos/nipype_tutorial/notebooks/hello/_a_1/_b_3/concat_a_b
170903-22:30:15,867 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:15,870 workflow INFO:
	 Executing node add_1.a0.b0 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_3/add_1
170903-22:30:15,874 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_3/add_1/mapflow/_add_10
PLUS ONE, a = 3
170903-22:30:15,882 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_3/add_1/mapflow/_add_11
PLUS ONE, a = 3
170903-22:30:15,892 workflow INFO:
	 Executing node concat_a_b.aI.a1.b0 in dir: /repos/nipype_tutorial/notebooks/hello/_a_1/_b_4/concat_a_b
170903-22:30:15,893 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:15,895 workflow INFO:
	 Executing node add_1.a1.b0 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_4/add_1
170903-22:30:15,899 workflow INFO:
	 Executing node _add_10 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_4/add_1/mapflow/_add_10
PLUS ONE, a = 3
170903-22:30:15,907 workflow INFO:
	 Executing node _add_11 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_a_1/_b_4/add_1/mapflow/_add_11
PLUS ONE, a = 4
170903-22:30:15,916 workflow INFO:
	 Executing node join_scale_data.a1 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_b_4/join_scale_data
170903-22:30:15,923 workflow INFO:
	 Executing node join_scale_data.a0 in dir: /repos/nipype_tutorial/notebooks/hello_mapnode/_b_3/join_scale_data
Out[ ]:
[hello_mapnode.join_scale_data.a0,
 hello_mapnode.join_scale_data.a1,
 hello_mapnode.a_plus_b.bI.b0,
 hello.concat_a_b.aI.a0.b0,
 hello_mapnode.add_1.a0.b0,
 hello.concat_a_b.aI.a1.b0,
 hello_mapnode.add_1.a1.b0,
 hello_mapnode.a_plus_b.bI.b1,
 hello.concat_a_b.aI.a0.b1,
 hello_mapnode.add_1.a0.b1,
 hello.concat_a_b.aI.a1.b1,
 hello_mapnode.add_1.a1.b1]

Let's check the output of the hello_mapnode.join_scale_data.a0 node:


In [ ]:
eg.nodes()[0].result.outputs


Out[ ]:
data_scaled = [[4000, 4000], [5000, 4000]]
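
A JoinNode with joinsource=adder collects the results of the a-branches back into one list per remaining branch (here, per value of b). For the b = 3 branch this corresponds to the following sketch; merge_and_scale is a numpy-free stand-in for the tutorial's merge_and_scale_data:

```python
# Numpy-free equivalent of merge_and_scale_data
def merge_and_scale(data2):
    return [[x * 1000 for x in row] for row in data2]

# add_1 outputs collected across the a-branches for b = 3
data2 = [[4, 4], [5, 4]]
print(merge_and_scale(data2))  # [[4000, 4000], [5000, 4000]]
```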

In [ ]:
wf.write_graph(graph2use='exec')


170903-22:30:16,978 workflow INFO:
	 Generated workflow graph: /repos/nipype_tutorial/notebooks/hello_mapnode/graph.dot.png (graph2use=exec, simple_form=True).
Out[ ]:
'/repos/nipype_tutorial/notebooks/hello_mapnode/graph.dot.png'

In [ ]:
Image("hello_mapnode/graph.dot.png")


Out[ ]:

In [ ]:
Image("hello_mapnode/graph_detailed.dot.png")


Out[ ]:

Workflows can also be run in parallel; let's time a run with the MultiProc plugin using two processes:


In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})


170903-22:30:17,298 workflow INFO:
	 Workflow hello_mapnode settings: ['check', 'execution', 'logging']
170903-22:30:17,315 workflow INFO:
	 Running in parallel.
170903-22:30:17,318 workflow INFO:
	 Executing: a_plus_b.bI.b1 ID: 0
170903-22:30:17,322 workflow INFO:
	 [Job finished] jobname: a_plus_b.bI.b1 jobid: 0
170903-22:30:17,324 workflow INFO:
	 Executing: a_plus_b.bI.b0 ID: 5
170903-22:30:17,325 workflow INFO:
	 [Job finished] jobname: a_plus_b.bI.b0 jobid: 5
...
170903-22:30:17,437 workflow INFO:
	 [Job finished] jobname: join_scale_data.a1 jobid: 10
CPU times: user 136 ms, sys: 36 ms, total: 172 ms
Wall time: 161 ms

Changing the base directory invalidates the cached results, so the next run will execute every node again:


In [ ]:
wf.base_dir = os.path.join(os.getcwd(), 'alt')

In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})


170903-22:30:18,129 workflow INFO:
	 Workflow hello_mapnode settings: ['check', 'execution', 'logging']
170903-22:30:18,143 workflow INFO:
	 Running in parallel.
170903-22:30:18,147 workflow INFO:
	 Executing: a_plus_b.bI.b1 ID: 0
170903-22:30:18,149 workflow INFO:
	 Executing: a_plus_b.bI.b0 ID: 5
170903-22:30:18,150 workflow INFO:
	 Executing node a_plus_b.bI.b1 in dir: /repos/nipype_tutorial/notebooks/alt/hello_mapnode/_a_2/a_plus_b
170903-22:30:18,152 workflow INFO:
	 Executing node a_plus_b.bI.b0 in dir: /repos/nipype_tutorial/notebooks/alt/hello_mapnode/_a_1/a_plus_b
170903-22:30:18,161 workflow INFO:
	 [Job finished] jobname: a_plus_b.bI.b1 jobid: 0
...
170903-22:30:18,360 workflow INFO:
	 Collecting precomputed outputs
170903-22:30:18,369 workflow INFO:
	 [Job finished] jobname: add_1.a1.b0 jobid: 9
170903-22:30:18,374 workflow INFO:
	 Executing: join_scale_data.a1 ID: 10
170903-22:30:18,377 workflow INFO:
	 Executing node join_scale_data.a1 in dir: /repos/nipype_tutorial/notebooks/alt/hello_mapnode/_b_4/join_scale_data
170903-22:30:18,384 workflow INFO:
	 [Job finished] jobname: join_scale_data.a1 jobid: 10
CPU times: user 212 ms, sys: 32 ms, total: 244 ms
Wall time: 269 ms

Running the same workflow a third time reuses the cached results and finishes more quickly:


In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})


170903-22:30:19,58 workflow INFO:
	 Workflow hello_mapnode settings: ['check', 'execution', 'logging']
170903-22:30:19,73 workflow INFO:
	 Running in parallel.
170903-22:30:19,77 workflow INFO:
	 Executing: a_plus_b.bI.b1 ID: 0
170903-22:30:19,80 workflow INFO:
	 [Job finished] jobname: a_plus_b.bI.b1 jobid: 0
170903-22:30:19,82 workflow INFO:
	 Executing: a_plus_b.bI.b0 ID: 5
170903-22:30:19,84 workflow INFO:
	 [Job finished] jobname: a_plus_b.bI.b0 jobid: 5
170903-22:30:19,89 workflow INFO:
	 Executing: concat_a_b.aI.a0.b1 ID: 1
170903-22:30:19,92 workflow INFO:
	 [Job finished] jobname: concat_a_b.aI.a0.b1 jobid: 1
170903-22:30:19,94 workflow INFO:
	 Executing: concat_a_b.aI.a1.b1 ID: 3
170903-22:30:19,96 workflow INFO:
	 [Job finished] jobname: concat_a_b.aI.a1.b1 jobid: 3
170903-22:30:19,103 workflow INFO:
	 Executing: add_1.a0.b1 ID: 2
170903-22:30:19,107 workflow INFO:
	 Adding 2 jobs for mapnode add_1.a0.b1
170903-22:30:19,111 workflow INFO:
	 Executing: add_1.a1.b1 ID: 4
...
170903-22:30:19,219 workflow INFO:
	 [Job finished] jobname: join_scale_data.a1 jobid: 10
CPU times: user 144 ms, sys: 36 ms, total: 180 ms
Wall time: 177 ms
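
Nipype caches node results by hashing each node's inputs and reusing the matching working directory; the mechanism is file-based, but conceptually it resembles memoization in plain Python. This analogy is ours, not Nipype's implementation:

```python
from functools import lru_cache

calls = 0

@lru_cache(maxsize=None)
def expensive(a, b):
    # stands in for a slow node; counts how often real work happens
    global calls
    calls += 1
    return a + b

expensive(1, 2)   # computed
expensive(1, 2)   # served from the cache, no recomputation
print(calls)      # 1
```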

Exercise 1

Create a workflow to calculate the sum of factorials of the numbers in a range between $n_{min}$ and $n_{max}$, i.e.:

$$\sum _{k=n_{min}}^{n_{max}} k! = 0! + 1! + 2! + 3! + \cdots$$

If $n_{min}=0$ and $n_{max}=3$: $$\sum _{k=0}^{3} k! = 0! + 1! + 2! + 3! = 1 + 1 + 2 + 6 = 10$$


In [ ]:
from nipype import Workflow, Node, MapNode, Function
import os

def range_fun(n_min, n_max):
    return list(range(n_min, n_max+1))

def factorial(n):
    # print("FACTORIAL, {}".format(n))
    import math
    return math.factorial(n)

def summing(terms):
    return sum(terms)

wf_ex1 = Workflow('ex1')
wf_ex1.base_dir = os.getcwd()

range_nd = Node(Function(input_names=['n_min', 'n_max'],
                         output_names=['range_list'],
                         function=range_fun), 
                name='range_list')

factorial_nd = MapNode(Function(input_names=['n'],
                                output_names=['fact_out'],
                                function=factorial), 
                       iterfield=['n'],
                       name='factorial')

summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing), 
                  name='summing')


range_nd.inputs.n_min = 0
range_nd.inputs.n_max = 3

wf_ex1.add_nodes([range_nd])
wf_ex1.connect(range_nd, 'range_list', factorial_nd, 'n')
wf_ex1.connect(factorial_nd, 'fact_out', summing_nd, "terms")


eg = wf_ex1.run()
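
Without Nipype the same computation is a few lines of plain Python, which we can use to sanity-check the workflow result (an explicit loop is used here because the builtin sum was shadowed earlier in this notebook):

```python
import math

total = 0
for k in range(0, 3 + 1):   # n_min = 0, n_max = 3
    total += math.factorial(k)
print(total)  # 10
```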

Let's print all the nodes:


In [ ]:
eg.nodes()

The final result should be 10:


In [ ]:
eg.nodes()[2].result.outputs

We can also check the results of the two other nodes:


In [ ]:
print(eg.nodes()[0].result.outputs)
print(eg.nodes()[1].result.outputs)

In [ ]:
#write your code here

# 1. write 3 functions: one that returns a list of numbers from a specific range, 
#    a second that returns n! (you can use math.factorial), and a third that sums the elements of a list

# 2. create a workflow and define the working directory

# 3. define 3 nodes using Node and MapNode and connect them within the workflow

# 4. run the workflow and check the results

Exercise 2

Create a workflow to calculate the following sum for chosen $n$ and five different values of $x$: $0$, $\frac{1}{2} \pi$, $\pi$, $\frac{3}{2} \pi$, and $ 2 \pi$.

$$\sum _{k=0}^{n}\frac {(-1)^{k}}{(2k+1)!}x^{2k+1} = x-\frac {x^{3}}{3!}+\frac {x^{5}}{5!}-\cdots$$


In [ ]:
# we can reuse functions from the previous exercise, but they need some edits
from nipype import Workflow, Node, MapNode, JoinNode, Function
import os
import math

def range_fun(n_max):
    return list(range(n_max+1))

def term(k, x):
    import math
    fract = math.factorial(2 * k + 1)
    polyn = x ** (2 * k + 1) 
    return (-1)**k * polyn / fract

def summing(terms):
    return sum(terms)

wf_ex2 = Workflow('ex2')
wf_ex2.base_dir = os.getcwd()

range_nd = Node(Function(input_names=['n_max'],
                         output_names=['range_list'],
                         function=range_fun), 
                name='range_list')

term_nd = MapNode(Function(input_names=['k', 'x'],
                           output_names=['term_out'],
                           function=term), 
                  iterfield=['k'],
                  name='term')

summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing),
                  name='summing')


range_nd.inputs.n_max = 15

x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]

term_nd.iterables = ('x', x_list)

wf_ex2.add_nodes([range_nd])
wf_ex2.connect(range_nd, 'range_list', term_nd, 'k')
wf_ex2.connect(term_nd, 'term_out', summing_nd, "terms")


eg = wf_ex2.run()

Let's check all the nodes:


In [ ]:
eg.nodes()

Let's print all the results of ex2.summing:


In [ ]:
print(eg.nodes()[2].result.outputs)
print(eg.nodes()[4].result.outputs)
print(eg.nodes()[6].result.outputs)
print(eg.nodes()[8].result.outputs)
print(eg.nodes()[10].result.outputs)

Great, we just implemented a pretty good sine function! The numbers should be approximately 0, 1, 0, -1, and 0. If they are not, try increasing $n_{max}$.
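
We can sanity-check the series against math.sin in plain Python:

```python
import math

def sine_series(x, n_max=15):
    # Taylor series of sin(x): sum_{k=0}^{n_max} (-1)^k x^(2k+1) / (2k+1)!
    total = 0.0
    for k in range(n_max + 1):
        total += (-1) ** k * x ** (2 * k + 1) / math.factorial(2 * k + 1)
    return total

for x in [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]:
    assert abs(sine_series(x) - math.sin(x)) < 1e-6
print("series matches math.sin for all five x values")
```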


In [ ]:
# write your solution here

# 1. write 3 functions: one that returns a list of numbers from a range between 0 and some n, 
#    a second that returns a term for a specific k, and a third that sums the elements of a list

# 2. create a workflow and define the working directory

# 3. define 3 nodes using Node and MapNode and connect them within the workflow

# 4. use iterables for 4 values of x

# 5. run the workflow and check the final results for every value of x

Exercise 2a

Use JoinNode to combine the results from Exercise 2 into one container, e.g. a dictionary that takes the value of $x$ as a key and the result from the summing Node as a value.


In [ ]:
def merge_results(results, x):
    return dict(zip(x, results))

join_nd = JoinNode(Function(input_names=['results', 'x'],
                            output_names=['results_cont'],
                            function=merge_results),
                   name='merge',
                   joinsource=term_nd, # this is the node that used iterables for x
                   joinfield=['results'])

# taking the list of arguments from the previous part 
join_nd.inputs.x = x_list

# connecting a new node to the summing_nd
wf_ex2.connect(summing_nd, "sum_out", join_nd, "results")

eg = wf_ex2.run()

Let's print all the nodes:


In [ ]:
eg.nodes()

And the results from the merge Node:


In [ ]:
eg.nodes()[1].result.outputs

In [ ]:
# write your code here

# 1. create an additional function that takes 2 lists and combines them into one container, e.g. a dictionary

# 2. use JoinNode to define a new node that merge results from Exercise 2 and connect it to the workflow

# 3. run the workflow and check the results of the merging node