Nipype Concepts: Iteration

The previous notebooks explained Interfaces and Workflows, which are the main building blocks of any processing stream implemented in Nipype. Here, we'll introduce two simple constructs that allow you to pipe multiple instances of similar data through the same workflow.

In neuroimaging, it is common to collect multiple datasets a given experiment, perhaps one for each subject. It is often also the case that each subject will have multiple runs, or sessions, which should be processed identically. Nipype supports this kind of design with the constructs iterables and MapNode.

There is a good page on the online documentation explaining in detail how these work. This notebook will hopefully make these concepts a bit more concrete.

Smooth and Mask Workflow

Let's start off with our simple brain extraction workflow from before. Now say that instead of processing only one brain, you had a set of subjects you needed to analyze. Of course, you'd probably turn to that old programming standby, the for loop:


In [1]:
from __future__ import print_function

In [2]:
%%bash
for f in data/T1-a data/T1-b
  do
  bet $f ${f}_brain -m
  fslmaths $f -s 2 ${f}_smooth
  fslmaths ${f}_smooth -mas ${f}_brain_mask ${f}_smooth_mask
done

In [3]:
ls data/*_smooth_mask.nii.gz


data/T1-a_smooth_mask.nii.gz  data/T1-b_smooth_mask.nii.gz

How would we translate this to Nipype? First, write the workflow as before.


In [4]:
from nipype.interfaces import fsl
from nipype import Workflow, Node


/Users/mwaskom/anaconda/lib/python2.7/site-packages/nipy/labs/glm/glm.py:7: FutureWarning: Module nipy.labs.utils.routines deprecated, will be removed
  from ..utils import mahalanobis
/Users/mwaskom/anaconda/lib/python2.7/site-packages/nipype/interfaces/nipy/model.py:17: FutureWarning: Module nipy.labs.glm deprecated, will be removed. Please use nipy.modalities.fmri.glm instead.
  import nipy.labs.glm.glm as GLM

In [5]:
skullstrip = Node(fsl.BET(mask=True), name="skullstrip")
smooth = Node(fsl.IsotropicSmooth(fwhm=4), name="smooth")
mask = Node(fsl.ApplyMask(), name="mask")

wf = Workflow(name="smoothflow")
wf.base_dir = "."
wf.connect(skullstrip, "mask_file", mask, "mask_file")
wf.connect(smooth, "out_file", mask, "in_file")

Next, we're going to use a special kind of Interface called an IdentityInterface. Basically, whatever this node receives as an input will get exposed as an output. I like to think of this as using a "variable" in the workflow graph, as it minimizes the extent to which you have to repeat yourself. We'll use this to make a single place where we specify the input file, which goes to both the skullstripping and smoothing nodes of the workflow.


In [6]:
from nipype import IdentityInterface

In [7]:
inputs = Node(IdentityInterface(fields=["mri_file"]), name="inputs")
wf.connect(inputs, "mri_file", skullstrip, "in_file")
wf.connect(inputs, "mri_file", smooth, "in_file")

Let's also set the config so the workflow output isn't so loud in the notebook


In [8]:
from nipype import config, logging
config.set('logging', 'workflow_level', 'CRITICAL')
config.set('logging', 'interface_level', 'CRITICAL')
logging.update_logging(config)

Iterables

Say we had a list of files we wanted to skullstrip, smooth, and mask.


In [9]:
from os.path import abspath
files = ["data/T1-a.nii.gz", "data/T1-b.nii.gz"]
files = map(abspath, files)

You could use a for loop to execute this workflow over each of these files:


In [10]:
for f in files:
    inputs.inputs.mri_file = f
    wf.run()

But a much better way to execute a workflow over a sequence of data is using iterables. You configure iterables by assigning a tuple of (field_name, [val_1, val_2, ...]) to a particular node.


In [11]:
inputs.iterables = ("mri_file", files)

What happens here is that everything in the graph that is dependent on a Node with iterables will be duplicated for each value in the iterables list. Now when we run the workflow, both images will be processed.

Of course, if you run a workflow with a distributed plugin, the mutiple files will be processed in parallel. With a few lines of code, you can set up parallel processing over a group of subjects and ensure that each is processed in exactly the same way.


In [12]:
wf.run(plugin="MultiProc", plugin_args={"n_proc": 2})


Out[12]:
<networkx.classes.digraph.DiGraph at 0x109d029d0>

If we look in the workflow directory, we see that the intermediate processing is separated for each iterable input. So, you could add more data and rerun without clobbering the old working files. You'll also see that the directory names just get set with the iterable value, and are very long when you use a filename. In the next notebook, we'll show you a better way to inject data into a workflow.


In [13]:
ls smoothflow/_mri_file*/smooth/*.nii.gz


smoothflow/_mri_file_..Users..mwaskom..Dropbox..Projects..Nipype_Concepts..data..T1-a.nii.gz/smooth/T1-a_smooth.nii.gz
smoothflow/_mri_file_..Users..mwaskom..Dropbox..Projects..Nipype_Concepts..data..T1-b.nii.gz/smooth/T1-b_smooth.nii.gz

MapNode

There's a second way to iterate over a list of data in the context of a workflow. The Node class has a close cousin, called MapNode. A MapNode is quite similar to a Node, but it can take a list of inputs and operate over each input separately, ultimately returning a list of outputs. Let's demonstrate this with a simple function interface.


In [14]:
from nipype import Function
def square_func(x):
    return x ** 2
square = Function(["x"], ["f_x"], square_func)

We see that this function just takes a numeric input and returns its squared value.


In [15]:
print(square.run(x=2).outputs.f_x)


4

What if we wanted to square a list of numbers? We could set an iterable, as we see above. But say we were making a simple workflow that squared a list of numbers and then summed them. The sum node would expect a list, but using an iterable would make a bunch of sum nodes, and each would get one number from the list. The solution here (and in similar problems) is to use a MapNode.

The MapNode constructor has a field called iterfield, which tells it what inputs should be expecting a list.


In [16]:
from nipype import MapNode
square_node = MapNode(square, name="square", iterfield=["x"])

In [17]:
square_node.inputs.x = range(4)
print(square_node.run().outputs.f_x)


[0, 1, 4, 9]

Because iterfield can take a list of names, you can operate over multiple sets of data, as long as they're the same length. (The values in each list will be paired; it does not compute a combinatoric product of the lists).


In [18]:
def power_func(x, y):
    return x ** y

In [19]:
power = Function(["x", "y"], ["f_xy"], power_func)
power_node = MapNode(power, name="power", iterfield=["x", "y"])
power_node.inputs.x = range(4)
power_node.inputs.y = range(4)
print(power_node.run().outputs.f_xy)


[1, 1, 4, 27]

But not every input needs to be an iterfield.


In [21]:
power_node = MapNode(power, name="power", iterfield=["x"])
power_node.inputs.x = range(4)
power_node.inputs.y = 3
print(power_node.run().outputs.f_xy)


[0, 1, 8, 27]

As in the case of iterables, each underlying MapNode execution can happen in parallel. Hopefully, you see how these tools allow you to write flexible, reusable workflows that will help you processes large amounts of data efficiently and reproducibly.


In [22]:
!make clean

In [ ]: