Nipype Concepts: Workflows

The goal of this notebook is to introduce and illustrate the basic concepts underlying Workflows. Although it would be possible to write analysis scripts using just Nipype Interfaces, and this may provide some advantages over directly making command-line calls, the main benefits of Nipype come from creating workflows.

First we'll use our helper function from before.


In [1]:
%matplotlib inline
from __future__ import print_function
import numpy as np
import matplotlib.pyplot as plt
import nibabel as nib

In [2]:
def plot_slice(fname, z_idx=5):

    # Load the image and collect the data
    # and orientation information
    img = nib.load(fname)
    data = img.get_data()
    aff = img.get_affine()

    # Find the center of the brain matrix
    ctr = np.dot(np.linalg.inv(aff), [0, 0, 0, 1])[:3]

    # Plot the data
    vmin, vmax = (0, 1) if data.dtype == np.int16 else (30, 150)
    plt.imshow(np.rot90(data[:, :, int(ctr[2]) + z_idx]),
               cmap="gray", vmin=vmin, vmax=vmax)
    plt.gca().set_axis_off()

To motivate the need for and principles behind workflows, let's start with a simple bit of preprocessing. We'd like to perform the following processing steps:

  • Skullstrip an image to obtain a mask
  • Smooth the original image
  • Mask the smoothed image

Let's first see what this would look like with a shell script, using the Bash cell magic function. This is how FSL scripts are usually written.


In [3]:
%%bash
bet data/T1 data/T1_brain -m
fslmaths data/T1 -s 2 data/T1_smooth
fslmaths data/T1_smooth -mas data/T1_brain_mask data/T1_smooth_mask

This is simple and straightforward. We can see that this does exactly what we wanted by plotting the three steps of processing.


In [4]:
f = plt.figure(figsize=(12, 4))
for i, img in enumerate(["T1", "T1_smooth", "T1_smooth_mask"]):
    f.add_subplot(1, 3, i + 1)
    plot_slice("data/%s.nii.gz" % img)
    plt.title(img)


The resulting images are exactly what we would expect given the script. Before moving on, let's clean up these intermediate files.


In [5]:
!make clean

Now let's see what this would look like if we used Nipype, but only the Interfaces functionality. It's simple enough to write a basic procedural script, this time in Python, to do basically the same thing.


In [6]:
from nipype.interfaces import fsl

skullstrip = fsl.BET(in_file="data/T1.nii.gz", mask=True,
                     out_file="data/T1_brain.nii.gz")
res = skullstrip.run()

smooth = fsl.IsotropicSmooth(in_file="data/T1.nii.gz", fwhm=4,
                             out_file="data/T1_smooth.nii.gz")
smooth.run()

mask = fsl.ApplyMask(in_file="data/T1_smooth.nii.gz",
                     mask_file="data/T1_brain_mask.nii.gz",
                     out_file="data/T1_smooth_mask.nii.gz")
mask.run()

f = plt.figure(figsize=(12, 4))
for i, img in enumerate(["T1", "T1_smooth", "T1_smooth_mask"]):
    f.add_subplot(1, 3, i + 1)
    plot_slice("data/%s.nii.gz" % img)
    plt.title(img)


/Users/mwaskom/anaconda/lib/python2.7/site-packages/nipy/labs/glm/glm.py:7: FutureWarning: Module nipy.labs.utils.routines deprecated, will be removed
  from ..utils import mahalanobis
/Users/mwaskom/anaconda/lib/python2.7/site-packages/nipype/interfaces/nipy/model.py:17: FutureWarning: Module nipy.labs.glm deprecated, will be removed. Please use nipy.modalities.fmri.glm instead.
  import nipy.labs.glm.glm as GLM

This is more verbose, although it does have its advantages. There's the automated input validation we saw previously, some of the options are named more meaningfully, and you don't need to remember, for example, that fslmaths' smoothing kernel is set in sigma instead of FWHM -- Nipype does that conversion behind the scenes.
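
For reference, the conversion uses the standard Gaussian relationship FWHM = 2 * sqrt(2 * ln 2) * sigma, i.e. sigma ≈ FWHM / 2.3548. A quick sanity check of the value Nipype will hand to fslmaths:


In [ ]:
fwhm = 4
sigma = fwhm / (2 * np.sqrt(2 * np.log(2)))
print(sigma)  # ~1.69864 -- the -s value that shows up in the execution logs further below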

In this little script, we want the output of the BET command to be an input to the ApplyMask command. We accomplished this by manually ensuring that the two parameters are provided with the same filename. Let's use a little bit more of the Nipype Interfaces machinery to automate that process somewhat.


In [7]:
!make clean

In [8]:
skullstrip = fsl.BET(in_file="data/T1.nii.gz", mask=True)
bet_result = skullstrip.run()

smooth = fsl.IsotropicSmooth(in_file="data/T1.nii.gz", fwhm=4)
smooth_result = smooth.run()

mask = fsl.ApplyMask(in_file=smooth_result.outputs.out_file,
                     mask_file=bet_result.outputs.mask_file)
mask.run()

f = plt.figure(figsize=(12, 4))
for i, img in enumerate(["data/T1", "T1_smooth", "T1_smooth_masked"]):
    f.add_subplot(1, 3, i + 1)
    plot_slice("%s.nii.gz" % img)
    plt.title(img)


Here we didn't need to name the intermediate files; Nipype did that behind the scenes, and then we passed the result object (which knows those names) on to the next step in the processing stream. This is somewhat more concise than the first Nipype version, but it's still a procedural script, and the dependency relationships between the stages of processing are not particularly obvious. To address these issues, and to provide solutions to problems we might not know we have yet, Nipype offers Workflows.


In [9]:
from nipype import Node, Workflow

What we've implicitly done above is to encode our processing stream as a directed acyclic graph: each stage of processing is a node in this graph, and some nodes are unidirectionally dependent on others. In this case there is one input file and several output files, but there are no cycles -- there's a clear line of directionality to the processing. What the Node and Workflow classes do is make these relationships more explicit.

The basic architecture is that the Node provides a light wrapper around an Interface. It exposes the inputs and outputs of the Interface as its own, but it adds some additional functionality that allows you to connect Nodes into a Workflow. Let's rewrite the above script with these tools.


In [10]:
# For reasons that will later become clear, it's important to
# pass filenames to Nodes as absolute paths
from os.path import abspath
in_file = abspath("data/T1.nii.gz")

skullstrip = Node(fsl.BET(in_file=in_file, mask=True), name="skullstrip")

smooth = Node(fsl.IsotropicSmooth(in_file=in_file, fwhm=4), name="smooth")

mask = Node(fsl.ApplyMask(), name="mask")

The first argument to Node is the interface object that it will wrap, and then each Node needs a unique name. This doesn't need to bear any particular relationship to the variable name you assign the Node object to, but it's probably good form to keep them the same.

This looks mostly similar to what we did above, but we've left out the two crucial inputs to the ApplyMask step. We'll set those up by defining a Workflow object and then making connections among the Nodes.


In [11]:
wf = Workflow(name="smoothflow")  # Workflows need names too

The Workflow object has a method called connect that is going to do most of the work here. Let's look at its docstring.


In [12]:
print(wf.connect.__doc__)


Connect nodes in the pipeline.

        This routine also checks if inputs and outputs are actually provided by
        the nodes that are being connected.

        Creates edges in the directed graph using the nodes and edges specified
        in the `connection_list`.  Uses the NetworkX method
        DiGraph.add_edges_from.

        Parameters
        ----------

        args : list or a set of four positional arguments

            Four positional arguments of the form::

              connect(source, sourceoutput, dest, destinput)

            source : nodewrapper node
            sourceoutput : string (must be in source.outputs)
            dest : nodewrapper node
            destinput : string (must be in dest.inputs)

            A list of 3-tuples of the following form::

             [(source, target,
                 [('sourceoutput/attribute', 'targetinput'),
                 ...]),
             ...]

            Or::

             [(source, target, [(('sourceoutput1', func, arg2, ...),
                                         'targetinput'), ...]),
             ...]
             sourceoutput1 will always be the first argument to func
             and func will be evaluated and the results sent ot targetinput

             currently func needs to define all its needed imports within the
             function as we use the inspect module to get at the source code
             and execute it remotely
        

We see that there are two ways to call connect. You can either make one connection at a time, or you can pass a list of several connections. In either case, you're providing it with four pieces of information to define the connection:

  • The source node object
  • The name of the output field from the source node
  • The destination node object
  • The name of the input field from the destination node

We'll illustrate each method in the following cell:


In [13]:
# First the "simple", but more restricted method
wf.connect(skullstrip, "mask_file", mask, "mask_file")

# Now the more flexible method. Note that this way you can define several connections at once,
# and you can even define several connections between two nodes in a single call
wf.connect([(smooth, mask, [("out_file", "in_file")])])

Now the workflow is complete. Above, we mentioned that the workflow can be thought of as a directed acyclic graph. In fact, that's literally how it's represented behind the scenes, and we can use that to explore the workflow visually:


In [14]:
wf.write_graph("workflow_graph.dot")
from IPython.display import Image
Image(filename="workflow_graph.dot.png")


INFO:workflow:Converting dotfile: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/workflow_graph.dot to png format
Out[14]:

This representation makes the dependency structure of the workflow obvious. (By the way, the names of the nodes in this graph are the names we gave our Node objects above, so pick something meaningful for those!)

At this point, all we've done is define the workflow. We haven't executed any code yet. Much like Interface objects, the Workflow object has a run method that we can call so that it executes. Let's do that and then examine the results.


In [15]:
!make clean

In [16]:
wf.run()


INFO:workflow:['check', 'execution', 'logging']
INFO:workflow:Running serially.
INFO:workflow:Executing node smooth in dir: /var/folders/rr/svlz5yhs0tsdqnsp8g4qg65m0000gn/T/tmp4aVNLa/smoothflow/smooth
INFO:workflow:Running: fslmaths /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/data/T1.nii.gz -s 1.69864 /private/var/folders/rr/svlz5yhs0tsdqnsp8g4qg65m0000gn/T/tmp4aVNLa/smoothflow/smooth/T1_smooth.nii.gz
INFO:workflow:Executing node skullstrip in dir: /var/folders/rr/svlz5yhs0tsdqnsp8g4qg65m0000gn/T/tmpkx0sCV/smoothflow/skullstrip
INFO:workflow:Running: bet /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/data/T1.nii.gz /private/var/folders/rr/svlz5yhs0tsdqnsp8g4qg65m0000gn/T/tmpkx0sCV/smoothflow/skullstrip/T1_brain.nii.gz -m
INFO:workflow:Executing node mask in dir: /var/folders/rr/svlz5yhs0tsdqnsp8g4qg65m0000gn/T/tmp9lSuyp/smoothflow/mask
INFO:workflow:Running: fslmaths /private/var/folders/rr/svlz5yhs0tsdqnsp8g4qg65m0000gn/T/tmp4aVNLa/smoothflow/smooth/T1_smooth.nii.gz -mas /private/var/folders/rr/svlz5yhs0tsdqnsp8g4qg65m0000gn/T/tmpkx0sCV/smoothflow/skullstrip/T1_brain_mask.nii.gz /private/var/folders/rr/svlz5yhs0tsdqnsp8g4qg65m0000gn/T/tmp9lSuyp/smoothflow/mask/T1_smooth_masked.nii.gz
Out[16]:
<networkx.classes.digraph.DiGraph at 0x10c658fd0>

We see that executing the workflow prints some verbose logging information to the screen. If we look closely, we see that things are actually being executed over in /tmp. This is fundamental to the concept of Nodes/Workflows (and is why we needed to use absolute paths above). Unlike Interfaces, which by default write their results to the local directory, the Workflow engine executes things off in its own directory hierarchy. By default this is a temporary directory, but you can set it manually.


In [17]:
wf.base_dir = "working_dir"
wf.run()


INFO:workflow:['check', 'execution', 'logging']
INFO:workflow:Running serially.
INFO:workflow:Executing node smooth in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/smooth
INFO:workflow:Running: fslmaths /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/data/T1.nii.gz -s 1.69864 /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/smooth/T1_smooth.nii.gz
INFO:workflow:Executing node skullstrip in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/skullstrip
INFO:workflow:Running: bet /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/data/T1.nii.gz /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/skullstrip/T1_brain.nii.gz -m
INFO:workflow:Executing node mask in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/mask
INFO:workflow:Running: fslmaths /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/smooth/T1_smooth.nii.gz -mas /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/skullstrip/T1_brain_mask.nii.gz /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/mask/T1_smooth_masked.nii.gz
Out[17]:
<networkx.classes.digraph.DiGraph at 0x10c6630d0>

Let's take a look at the resulting images to convince ourselves we've done the same thing as before.


In [18]:
f = plt.figure(figsize=(12, 4))
for i, img in enumerate(["data/T1.nii.gz",
                         "working_dir/smoothflow/smooth/T1_smooth.nii.gz",
                         "working_dir/smoothflow/mask/T1_smooth_masked.nii.gz"]):
    f.add_subplot(1, 3, i + 1)
    plot_slice(img)


When you start writing full-fledged analysis workflows, things can get quite complicated. Some aspects of neuroimaging analysis can be thought of as a coherent step at a level more abstract than the execution of a single command-line binary. For instance, in the standard FEAT script in FSL, several calls are made in the process of using susan to perform nonlinear smoothing on an image. In Nipype, you can write nested workflows, where a sub-workflow can take the place of a Node in a given script. That way, you can write smaller, more self-contained workflows that can be shared and reused across larger analyses.

Let's use the prepackaged susan workflow that ships with Nipype to replace our Gaussian smoothing node and demonstrate how this works.


In [19]:
from nipype.workflows.fmri.fsl import create_susan_smooth

Calling this function will return a pre-written Workflow object:


In [20]:
susan = create_susan_smooth(separate_masks=False)

Let's display the graph to see what happens here.


In [21]:
susan.write_graph("susan_workflow.dot")
Image(filename="susan_workflow.dot.png")


INFO:workflow:Converting dotfile: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/susan_workflow.dot to png format
Out[21]:

We see that the workflow has an inputnode and an outputnode. While not strictly necessary, this is standard practice for workflows (especially those that are intended to be used as nested workflows in the context of a longer analysis graph), and it makes it clearer how to connect inputs and outputs to this workflow. Let's take a look at what those inputs and outputs are. Like Nodes, Workflows have inputs and outputs attributes that take a second subattribute corresponding to the specific node we want to make connections to.


In [22]:
print("Inputs:")
print(susan.inputs.inputnode)
print("Outputs:")
print(susan.outputs.outputnode)


Inputs:

fwhm = <undefined>
in_files = <undefined>
mask_file = <undefined>

Outputs:

smoothed_files = None

Note that inputnode and outputnode are just conventions, and the Workflow object exposes connections to all of its component nodes:


In [23]:
print(susan.inputs.smooth)


args = <undefined>
dimension = 3
environ = {'FSLOUTPUTTYPE': 'NIFTI_GZ'}
ignore_exception = False
out_file = <undefined>
output_type = NIFTI_GZ
terminal_output = stream
use_median = 1
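
If you want a direct handle on one of these component nodes (to inspect it or tweak its inputs programmatically), the Workflow's get_node method fetches it by name. A minimal sketch:


In [ ]:
smooth_node = susan.get_node("smooth")
print(smooth_node.inputs)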

Let's see how we would write a new workflow that uses this nested smoothing step. The susan workflow actually expects to receive and output a list of files (it's intended to be executed on each of several runs of fMRI data). We'll cover exactly how that works in later tutorials; for the moment, we just need to add an extra Function node to deal with the fact that susan outputs a list. A simple lambda function will do.


In [24]:
from nipype import Function
extract_func = lambda l: l[0]
list_extract = Node(Function(input_names=["l"], output_names=["out"],
                             function=extract_func), name="list_extract")

In [25]:
wf2 = Workflow(name="susanflow", base_dir="working_dir")
skullstrip2 = Node(fsl.BET(in_file=in_file, mask=True), name="skullstrip")
mask2 = Node(fsl.ApplyMask(), name="mask")


wf2.connect([
    (skullstrip2, mask2, [("mask_file", "mask_file")]),
    (skullstrip2, susan, [("mask_file", "inputnode.mask_file")]),
    (susan, list_extract, [("outputnode.smoothed_files", "l")]),
    (list_extract, mask2, [("out", "in_file")])
    ])
susan.inputs.inputnode.in_files = abspath("data/T1.nii.gz")
susan.inputs.inputnode.fwhm = 5

First, let's see what this new processing graph looks like.


In [26]:
wf2.write_graph("./full_susan_flow.dot")
Image(filename="full_susan_flow.dot.png")


INFO:workflow:Converting dotfile: ./full_susan_flow.dot to png format
Out[26]:

We can see how there is a nested smoothing workflow in the place of our previous smooth node. This provides a very detailed view, but what if you just wanted a higher-level summary of the processing steps? After all, that is the purpose of encapsulating smaller stereotyped streams in a nested workflow. That, fortunately, is an option when writing out the graph:


In [27]:
wf2.write_graph("./full_susan_workflow_toplevel.dot", "orig")
Image(filename="full_susan_workflow_toplevel.dot.png")


INFO:workflow:Creating detailed dot file: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/full_susan_workflow_toplevel_detailed.dot
INFO:workflow:Creating dot file: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/full_susan_workflow_toplevel.dot
Out[27]:

That's much more manageable. Now let's execute the workflow and examine the input and output:


In [28]:
wf2.run()


INFO:workflow:['check', 'execution', 'logging']
INFO:workflow:Running serially.
INFO:workflow:Executing node skullstrip in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/skullstrip
INFO:workflow:Running: bet /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/data/T1.nii.gz /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/skullstrip/T1_brain.nii.gz -m
INFO:workflow:Executing node mask in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/mask
INFO:workflow:Executing node _mask0 in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/mask/mapflow/_mask0
INFO:workflow:Running: fslmaths /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/data/T1.nii.gz -mas /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/skullstrip/T1_brain_mask.nii.gz /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/mask/mapflow/_mask0/T1_mask.nii.gz
INFO:workflow:Executing node median in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/median
INFO:workflow:Executing node _median0 in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/median/mapflow/_median0
INFO:workflow:Running: fslstats /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/data/T1.nii.gz -k /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/skullstrip/T1_brain_mask.nii.gz -p 50 
INFO:interface:stdout 2015-01-11T13:51:27.103853:78.463234 
INFO:workflow:Executing node meanfunc2 in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/meanfunc2
INFO:workflow:Executing node _meanfunc20 in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/meanfunc2/mapflow/_meanfunc20
INFO:workflow:Running: fslmaths /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/mask/mapflow/_mask0/T1_mask.nii.gz -Tmean /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/meanfunc2/mapflow/_meanfunc20/T1_mask_mean.nii.gz
INFO:workflow:Executing node merge in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/merge
INFO:workflow:Executing node smooth in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/smooth
INFO:workflow:Executing node _smooth0 in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/smooth/mapflow/_smooth0
INFO:workflow:Running: susan /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/data/T1.nii.gz 58.8474255000 2.1233045007 3 1 1 /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/meanfunc2/mapflow/_meanfunc20/T1_mask_mean.nii.gz 58.8474255000 /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/smooth/mapflow/_smooth0/T1_smooth.nii.gz
INFO:workflow:Executing node list_extract in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/list_extract
INFO:workflow:Executing node mask in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/mask
INFO:workflow:Running: fslmaths /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/susan_smooth/smooth/mapflow/_smooth0/T1_smooth.nii.gz -mas /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/skullstrip/T1_brain_mask.nii.gz /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/susanflow/mask/T1_smooth_masked.nii.gz
Out[28]:
<networkx.classes.digraph.DiGraph at 0x10723f3d0>

In [29]:
f = plt.figure(figsize=(12, 4))
for i, img in enumerate(["data/T1.nii.gz",
                         "working_dir/susanflow/mask/T1_smooth_masked.nii.gz"]):
    f.add_subplot(1, 2, i + 1)
    plot_slice(img)


So far, we've seen that you can build up rather complex analysis workflows. But at the moment, it's not clear why this is worth the extra trouble over writing a simple procedural script. To demonstrate the first added benefit of Nipype, let's return to our original workflow and run it again.


In [30]:
wf.run()


INFO:workflow:['check', 'execution', 'logging']
INFO:workflow:Running serially.
INFO:workflow:Executing node smooth in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/smooth
INFO:workflow:Collecting precomputed outputs
INFO:workflow:Executing node skullstrip in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/skullstrip
INFO:workflow:Collecting precomputed outputs
INFO:workflow:Executing node mask in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/mask
INFO:workflow:Collecting precomputed outputs
Out[30]:
<networkx.classes.digraph.DiGraph at 0x106f71490>

That happened quickly! If you look at the log messages, you'll see 'Collecting precomputed outputs.' Workflows (actually this is handled by the Node code) are smart, and know if their inputs have changed since the last time they were run. If they have not, they don't recompute; they just turn around and pass out the resulting files from the previous run. This is done on a node-by-node basis, too. Here's what happens if we tweak just one thing:


In [31]:
wf.inputs.smooth.fwhm = 1
wf.run()


INFO:workflow:['check', 'execution', 'logging']
INFO:workflow:Running serially.
INFO:workflow:Executing node skullstrip in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/skullstrip
INFO:workflow:Collecting precomputed outputs
INFO:workflow:Executing node smooth in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/smooth
INFO:workflow:Running: fslmaths /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/data/T1.nii.gz -s 0.42466 /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/smooth/T1_smooth.nii.gz
INFO:workflow:Executing node mask in dir: /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/mask
INFO:workflow:Running: fslmaths /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/smooth/T1_smooth.nii.gz -mas /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/skullstrip/T1_brain_mask.nii.gz /Users/mwaskom/Dropbox/Projects/Nipype_Concepts/working_dir/smoothflow/mask/T1_smooth_masked.nii.gz
Out[31]:
<networkx.classes.digraph.DiGraph at 0x106d27910>

We see that the smoothing stage was re-executed, since its inputs had changed. This triggered a cascade: any node depending on the smooth node (in this case, the mask node) was also recomputed. However, the skullstrip node hadn't changed since the first time it ran, so it just coughed up its original files.

That's the first main benefit of using Workflows: efficient recomputing.
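
The cache lives in the node directories under the working directory, one per node (e.g. working_dir/smoothflow/smooth, as seen in the logs above). If you ever want to force a node to recompute, a blunt but effective approach is to delete its directory. A sketch, assuming the default directory layout shown above:


In [ ]:
import shutil
# Removing a node's directory discards its cached results, so it will
# re-execute on the next run (and, typically, so will its downstream nodes)
shutil.rmtree("working_dir/smoothflow/smooth", ignore_errors=True)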

So far, when we've called wf.run(), each step in the workflow has executed serially using a single compute core. Perhaps the principal advantage of Nipype is how easily you can scale a workflow up to a massively parallel cluster computing environment. The run method takes a plugin name (or Plugin class) as its argument; see the Nipype plugin documentation for the details. Basically, it works like this:


In [32]:
wf.run("MultiProc")


INFO:workflow:['check', 'execution', 'logging']
INFO:workflow:Running in parallel.
INFO:workflow:Submitting 2 jobs
INFO:workflow:Executing: skullstrip ID: 0
INFO:workflow:[Job finished] jobname: skullstrip jobid: 0
INFO:workflow:Executing: smooth ID: 1
INFO:workflow:[Job finished] jobname: smooth jobid: 1
INFO:workflow:Submitting 1 jobs
INFO:workflow:Executing: mask ID: 2
INFO:workflow:[Job finished] jobname: mask jobid: 2
Out[32]:
<networkx.classes.digraph.DiGraph at 0x10bd8cdd0>

You can also pass a dictionary of arguments that are relevant to the plugin; in the case of MultiProc, you can specify how large the worker pool should be:


In [33]:
wf.run("MultiProc", dict(n_procs=2))


INFO:workflow:['check', 'execution', 'logging']
INFO:workflow:Running in parallel.
INFO:workflow:Submitting 2 jobs
INFO:workflow:Executing: smooth ID: 0
INFO:workflow:[Job finished] jobname: smooth jobid: 0
INFO:workflow:Executing: skullstrip ID: 1
INFO:workflow:[Job finished] jobname: skullstrip jobid: 1
INFO:workflow:Submitting 1 jobs
INFO:workflow:Executing: mask ID: 2
INFO:workflow:[Job finished] jobname: mask jobid: 2
Out[33]:
<networkx.classes.digraph.DiGraph at 0x10bda5910>

The MultiProc plugin uses the multiprocessing package in the standard library, and it is the only parallel plugin that is guaranteed to work right out of the box. However, Nipype also ships with plugins for IPython's parallel architecture, SGE, PBS, and several other distributed engines. Because the workflow is represented as a directed graph, all Nodes with no parents, or whose parents have completed, can be executed simultaneously.

In this case, that just means that the skullstrip and smooth Nodes execute together, but when you scale up to Workflows with many subjects and many runs per subject (we will discuss how to accomplish this in following tutorials), each can run together, such that (in the case of unlimited computing resources) you could process 50 subjects with 10 runs of functional data in essentially the time it would take to process a single run.

To emphasize the contribution of Nipype here: you can write and test your workflow on one subject, computing on your local CPU, where it is easier to debug. Then, with the change of a single function parameter, you can scale your processing up to a 1000+ node SGE cluster.
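
For instance, the same workflow could be handed to a grid engine just by swapping the plugin name. A sketch, assuming an SGE installation is available (the qsub_args value here is hypothetical and site-specific):


In [ ]:
# Requires a working SGE installation -- don't run this locally
wf.run("SGE", dict(qsub_args="-q long.q"))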


In [34]:
!make clean
