MapNode

If you want to iterate over a list of inputs, but need to feed all iterated outputs afterwards as one input (an array) to the next node, you need to use a MapNode. A MapNode is quite similar to a normal Node, but it can take a list of inputs and operate over each input separately, ultimately returning a list of outputs. (The main homepage has a nice section about MapNode and iterables if you want to learn more).

Let's demonstrate this with a simple function interface:


In [ ]:
from nipype import Function
def square_func(x):
    return x ** 2
square = Function(input_names=["x"],
                  output_names=["f_x"],
                  function=square_func)

We see that this function just takes a numeric input and returns its squared value.


In [ ]:
square.run(x=2).outputs.f_x


Out[ ]:
4

What if we wanted to square a list of numbers? We could set an iterable and just split up the workflow into multiple sub-workflows. But say we were making a simple workflow that squared a list of numbers and then summed them. The sum node would expect a list, but using an iterable would create a separate sum node for each element, and each would receive just one number from the list. The solution here is to use a MapNode; we'll sketch the full square-then-sum workflow after the next example.

The MapNode constructor has a field called iterfield, which tells it which inputs should expect a list.


In [ ]:
from nipype import MapNode
square_node = MapNode(square, name="square", iterfield=["x"])

In [ ]:
square_node.inputs.x = list(range(4))
square_node.run().outputs.f_x


170301-21:57:47,860 workflow INFO:
	 Executing node square in dir: /tmp/tmpZKVt49/square
170301-21:57:47,870 workflow INFO:
	 Executing node _square0 in dir: /tmp/tmpZKVt49/square/mapflow/_square0
170301-21:57:47,882 workflow INFO:
	 Runtime memory and threads stats unavailable
170301-21:57:47,887 workflow INFO:
	 Executing node _square1 in dir: /tmp/tmpZKVt49/square/mapflow/_square1
170301-21:57:47,906 workflow INFO:
	 Runtime memory and threads stats unavailable
170301-21:57:47,911 workflow INFO:
	 Executing node _square2 in dir: /tmp/tmpZKVt49/square/mapflow/_square2
170301-21:57:47,923 workflow INFO:
	 Runtime memory and threads stats unavailable
170301-21:57:47,926 workflow INFO:
	 Executing node _square3 in dir: /tmp/tmpZKVt49/square/mapflow/_square3
170301-21:57:47,936 workflow INFO:
	 Runtime memory and threads stats unavailable
Out[ ]:
[0, 1, 4, 9]
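
To complete the square-then-sum scenario from above: a MapNode's collected outputs can be connected as a single list input to a regular Node. The following is a minimal sketch, not part of the original notebook; the sum_func helper and the node and workflow names are made up for illustration:


In [ ]:
from nipype import Node, Workflow, Function

def sum_func(x_list):
    # Receives the MapNode's outputs as a single list
    return sum(x_list)

sum_node = Node(Function(["x_list"], ["total"], sum_func),
                name="sum")

wf = Workflow(name="square_then_sum", base_dir="/tmp")
# All f_x values from the MapNode above arrive here as one list, [0, 1, 4, 9]
wf.connect(square_node, "f_x", sum_node, "x_list")
wf.run()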

Because iterfield can take a list of names, you can operate over multiple sets of data, as long as they're the same length. The values in each list will be paired; it does not compute a combinatoric product of the lists.


In [ ]:
def power_func(x, y):
    return x ** y

In [ ]:
power = Function(["x", "y"], ["f_xy"], power_func)
power_node = MapNode(power, name="power", iterfield=["x", "y"])
power_node.inputs.x = list(range(4))
power_node.inputs.y = list(range(4))
print(power_node.run().outputs.f_xy)


170301-21:57:47,970 workflow INFO:
	 Executing node power in dir: /tmp/tmpJPLPn8/power
170301-21:57:47,985 workflow INFO:
	 Executing node _power0 in dir: /tmp/tmpJPLPn8/power/mapflow/_power0
170301-21:57:47,999 workflow INFO:
	 Runtime memory and threads stats unavailable
170301-21:57:48,2 workflow INFO:
	 Executing node _power1 in dir: /tmp/tmpJPLPn8/power/mapflow/_power1
170301-21:57:48,15 workflow INFO:
	 Runtime memory and threads stats unavailable
170301-21:57:48,20 workflow INFO:
	 Executing node _power2 in dir: /tmp/tmpJPLPn8/power/mapflow/_power2
170301-21:57:48,34 workflow INFO:
	 Runtime memory and threads stats unavailable
170301-21:57:48,43 workflow INFO:
	 Executing node _power3 in dir: /tmp/tmpJPLPn8/power/mapflow/_power3
170301-21:57:48,59 workflow INFO:
	 Runtime memory and threads stats unavailable
[1, 1, 4, 27]

But not every input needs to be an iterfield.


In [ ]:
power_node = MapNode(power, name="power", iterfield=["x"])
power_node.inputs.x = list(range(4))
power_node.inputs.y = 3
print(power_node.run().outputs.f_xy)


170301-21:57:48,84 workflow INFO:
	 Executing node power in dir: /tmp/tmphJmuk0/power
170301-21:57:48,96 workflow INFO:
	 Executing node _power0 in dir: /tmp/tmphJmuk0/power/mapflow/_power0
170301-21:57:48,112 workflow INFO:
	 Runtime memory and threads stats unavailable
170301-21:57:48,117 workflow INFO:
	 Executing node _power1 in dir: /tmp/tmphJmuk0/power/mapflow/_power1
170301-21:57:48,131 workflow INFO:
	 Runtime memory and threads stats unavailable
170301-21:57:48,135 workflow INFO:
	 Executing node _power2 in dir: /tmp/tmphJmuk0/power/mapflow/_power2
170301-21:57:48,150 workflow INFO:
	 Runtime memory and threads stats unavailable
170301-21:57:48,159 workflow INFO:
	 Executing node _power3 in dir: /tmp/tmphJmuk0/power/mapflow/_power3
170301-21:57:48,176 workflow INFO:
	 Runtime memory and threads stats unavailable
[0, 1, 8, 27]

As in the case of iterables, each underlying MapNode execution can happen in parallel. Hopefully, you see how these tools allow you to write flexible, reusable workflows that will help you process large amounts of data efficiently and reproducibly.
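
For instance, reusing the square_node from above, here is a minimal sketch of parallel execution with the MultiProc plugin (the workflow name and process count are arbitrary choices, not from the original notebook):


In [ ]:
from nipype import Workflow

wf = Workflow(name="parallel_squares", base_dir="/tmp")
wf.add_nodes([square_node])
# The four element-wise executions (_square0 ... _square3) can now be
# distributed over up to 4 processes
wf.run("MultiProc", plugin_args={"n_procs": 4})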

Why is this important?

Let's say we have multiple functional images (A) and each of them should be motion-corrected (B1, B2, B3, ...). But afterwards, we want to put them all together into a GLM, i.e. the input for the GLM should be an array of [B1, B2, B3, ...]. Iterables can't do that; they would split up the pipeline into independent branches. Therefore, we need a MapNode.

Let's look at a simple example where we want to motion-correct two functional images. For this, we need two nodes:

  • Gunzip, to unzip the files (plural)
  • Realign, to do the motion correction

In [ ]:
from nipype.algorithms.misc import Gunzip
from nipype.interfaces.spm import Realign
from nipype.pipeline.engine import Node, MapNode, Workflow

files = ['/data/ds102/sub-01/func/sub-01_task-flanker_run-1_bold.nii.gz',
         '/data/ds102/sub-01/func/sub-01_task-flanker_run-2_bold.nii.gz']

realign = Node(Realign(register_to_mean=True),
               name='motion_correction')

If we try to specify the input for the Gunzip node with a simple Node, we get the following error:


In [ ]:
gunzip = Node(Gunzip(), name='gunzip')
gunzip.inputs.in_file = files


---------------------------------------------------------------------------
TraitError                                Traceback (most recent call last)
<ipython-input-9-4c705ede4d1a> in <module>()
      1 gunzip = Node(Gunzip(), name='gunzip')
----> 2 gunzip.inputs.in_file = files

/opt/conda/envs/python2/lib/python2.7/site-packages/nipype/interfaces/traits_extension.pyc in validate(self, object, name, value)
     72             Note: The 'fast validator' version performs this check in C.
     73         """
---> 74         validated_value = super(BaseFile, self).validate(object, name, value)
     75         if not self.exists:
     76             return validated_value

/opt/conda/envs/python2/lib/python2.7/site-packages/traits/trait_types.pyc in validate(self, object, name, value)
    347             return value
    348 
--> 349         self.error( object, name, value )
    350 
    351     def create_editor ( self ):

/opt/conda/envs/python2/lib/python2.7/site-packages/traits/trait_handlers.pyc in error(self, object, name, value)
    170         """
    171         raise TraitError( object, name, self.full_info( object, name, value ),
--> 172                           value )
    173 
    174     def full_info ( self, object, name, value ):

TraitError: The 'in_file' trait of a GunzipInputSpec instance must be an existing file name, but a value of ['/data/ds102/sub-01/func/sub-01_task-flanker_run-1_bold.nii.gz', '/data/ds102/sub-01/func/sub-01_task-flanker_run-2_bold.nii.gz'] <type 'list'> was specified.

But if we do it with a MapNode, it works:


In [ ]:
gunzip = MapNode(Gunzip(), name='gunzip',
                 iterfield=['in_file'])
gunzip.inputs.in_file = files

Now we just have to create a workflow, connect the nodes, and run it:


In [ ]:
mcflow = Workflow(name='realign_with_spm')
mcflow.connect(gunzip, 'out_file', realign, 'in_files')
mcflow.base_dir = '/data'
mcflow.run('MultiProc', plugin_args={'n_procs': 4})
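
If you want to verify how the nodes were wired together, the workflow's write_graph method can render the graph (this requires Graphviz; the graph2use choice below is just one of several options):


In [ ]:
# Writes a dot file and a rendered PNG of the workflow graph
# into the workflow's base directory
mcflow.write_graph(graph2use='flat', format='png')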