Dissecting Nipype Workflows

Nipype team | contact: satra@mit.edu | nipy.org/nipype

Contributors

http://nipy.org/nipype/about.html#code-contributors

Funding

  • 1R03EB008673-01 from NIBIB, Satrajit Ghosh, Susan Whitfield-Gabrieli
  • 5R01MH081909-02 from NIMH, Mark D'Esposito
  • INCF

Conflict of interest

Satrajit Ghosh: TankThink Labs, LLC

What is Nipype?


Figure designed and created by: Arno Klein (www.mindboggle.info)

Make life a little easier

Poline et al. (2012)

Solution requirements

Coming at it from a developer's perspective, we needed something that

  • was lightweight
  • was scriptable
  • provided formal, common semantics
  • allowed interactive exploration
  • supported efficient batch processing
  • enabled rapid algorithm prototyping
  • was flexible and adaptive
  • was part of an ecosystem

Python ecosystem

Existing technologies

shell scripting:

Quick to write and powerful, but scalability is application-specific and scripts are not easy to port across different architectures.

make/CMake:

Similar in concept to workflow execution in Nipype, but limited to command-line tools and less flexible when scaling across hardware architectures (although see makeflow).

Octave/MATLAB:

Integration with other tools is ad hoc (i.e., via system calls) and dataflow is managed at a programmatic level. However, see PSOM, which offers a nice alternative to some aspects of Nipype for Octave/MATLAB users.

Graphical options (e.g., LONI Pipeline, VisTrails):

Easy to use, but they reduce flexibility relative to scripting options.

Nipype architecture

  • Interface: Wraps a program or function

  • Node/MapNode: Wraps an Interface for use in a Workflow, adding caching and other goodies (e.g., a pseudo-sandbox)

  • Workflow: A graph or forest of graphs whose nodes are of type Node, MapNode or Workflow and whose edges represent data flow

  • Plugin: A component that describes how a Workflow should be executed
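
A minimal sketch of how these four pieces fit together (assuming FSL is installed; the input filename is hypothetical):

from nipype.interfaces.fsl import BET                  # Interface: wraps the 'bet' program
from nipype.pipeline.engine import Node, Workflow

skullstrip = Node(BET(mask=True), name='skullstrip')   # Node: adds caching around an Interface
skullstrip.inputs.in_file = 'T1.nii.gz'                # hypothetical input file

wf = Workflow(name='architecture_sketch')              # Workflow: a graph whose nodes do the work
wf.add_nodes([skullstrip])
wf.run(plugin='Linear')                                # Plugin: how the graph gets executed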

Software interfaces

Currently supported (as of 5-2-2013); see nipy.org/nipype for the latest list.

Interfaces are added on a most-used/most-contributed basis.

Not all components of these packages are available.

Workflows

  • Properties:

    • processing pipeline is a directed acyclic graph (DAG)
    • nodes are processes
    • edges represent data flow
    • compact representation for any process
    • code and data separation

Execution Plugins

Plugins allow seamless execution across many architectures; a short sketch follows the list below.

  • Local

    • Serial
    • Multicore
  • Clusters

    • HTCondor
    • PBS/Torque/SGE/LSF (native and via IPython)
    • SSH (via IPython)
    • Soma Workflow
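
As a preview of Step 8, the same Workflow object can be handed to any of these backends at run time. A minimal, hedged sketch ('wf' stands for any workflow; the queue name is illustrative):

wf.run(plugin='Linear')                                       # serial, local
wf.run(plugin='MultiProc', plugin_args={'n_procs': 4})        # local multicore
wf.run(plugin='SGE', plugin_args={'qsub_args': '-q long.q'})  # SGE cluster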

Learn Nipype concepts in 10 easy steps

  1. Installing and testing the installation
  2. Working with interfaces
  3. Using Nipype caching
  4. Creating Nodes, MapNodes and Workflows
  5. Getting and saving data
  6. Using Iterables
  7. Function nodes
  8. Distributed computation
  9. Connecting to databases
  10. Execution configuration options

Step 1. Installing Nipype

Scientific Python:

Installing Nipype:

  • Available from @NeuroDebian, @PyPI, and @GitHub

    - pip install nipype
    - easy_install nipype
    - sudo apt-get install python-nipype
  • Dependencies: networkx, nibabel, numpy, scipy, traits

Running Nipype (Quickstart):

  • Ensure underlying tools are installed and accessible
  • Nipype is a wrapper, not a substitute, for AFNI, ANTS, FreeSurfer, FSL, SPM, NiPy, etc.; a quick way to check that these tools are visible is sketched below.
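
A quick, hedged check of tool visibility from Python (shown here for FSL; other interface packages provide similar helpers):

from nipype.interfaces import fsl
print(fsl.Info.version())   # FSL version string, or None if FSL is not on your PATH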

Step 1. Testing nipype

$ ipython notebook

In [ ]:
import nipype

# Comment the following section to increase verbosity of output
nipype.config.set('logging', 'workflow_level', 'CRITICAL')
nipype.config.set('logging', 'interface_level', 'CRITICAL')
nipype.logging.update_logging(nipype.config)

nipype.test(verbose=0) # Increase verbosity parameter for more info

If all goes well you will see an OK:

----------------------------------------------------------------------
Ran 2497 tests in 68.486s

OK (SKIP=13)

The number of tests and time will vary depending on which interfaces you have installed on your system.


In [ ]:
nipype.get_info()

Environment and data setup

Setting up your IPython notebook environment and downloading some data to play with


In [ ]:
%pylab inline

In [ ]:
# Some preliminaries
import os
cwd = os.getcwd()
tutorial_dir = '/software/temp/nipype-tutorial/ohbm/'
if not os.path.exists(tutorial_dir):
    os.makedirs(tutorial_dir)  # create intermediate directories as needed
os.chdir(tutorial_dir)

In [ ]:
import urllib
required_files = ['ds107/sub001/BOLD/task001_run001/bold.nii.gz',
                  'ds107/sub001/BOLD/task001_run002/bold.nii.gz',
                  'ds107/sub001/anatomy/highres001.nii.gz',
                  'ds107/sub044/BOLD/task001_run001/bold.nii.gz',
                  'ds107/sub044/BOLD/task001_run002/bold.nii.gz',
                  'ds107/sub044/anatomy/highres001.nii.gz'
                  ]
base_url = 'http://openfmri.aws.amazon.com.s3.amazonaws.com/'
for filepath in required_files:
    file_location = os.path.join(tutorial_dir, filepath)
    if not os.path.exists(file_location):
        print('Retrieving: ' + file_location)
        if not os.path.exists(os.path.dirname(file_location)):
            os.makedirs(os.path.dirname(file_location))
        urllib.urlretrieve(base_url + filepath, file_location)

Step 2. Working with interfaces


In [ ]:
import nipype.algorithms

In [ ]:
from nipype.interfaces.fsl import DTIFit
from nipype.interfaces.spm import Realign

Finding interface inputs and outputs and examples


In [ ]:
DTIFit.help()

In [ ]:
Realign.help()

Creating a directory for running interfaces


In [ ]:
import os
from shutil import copyfile
library_dir = os.path.join(tutorial_dir, 'as_a_library')
if not os.path.exists(library_dir):
    os.mkdir(library_dir)
os.chdir(library_dir)

Executing interfaces


In [ ]:
from nipype.interfaces.freesurfer import MRIConvert
convert = MRIConvert(in_file='../ds107/sub001/BOLD/task001_run001/bold.nii.gz',
                     out_file='ds107.nii')
print(convert.cmdline)
results = convert.run(terminal_output='none')  # allatonce, stream (default), file

In [ ]:
results.outputs

Other ways


In [ ]:
convert = MRIConvert()
convert.inputs.in_file='../ds107/sub001/BOLD/task001_run001/bold.nii.gz'
convert.inputs.out_file='ds107.nii'
convert.run()

In [ ]:
convert = MRIConvert()
convert.run(in_file='../ds107/sub001/BOLD/task001_run001/bold.nii.gz',
            out_file='ds107.nii')

In [ ]:
convert.inputs

Look at only the defined inputs


In [ ]:
results.inputs

Experiment with other interfaces

For example, run realignment with SPM


In [ ]:
from nipype.interfaces.spm import Realign
results1 = Realign(in_files='ds107.nii',
                   register_to_mean=False).run()

And now use FSL


In [ ]:
from nipype.interfaces.fsl import MCFLIRT
results2 = MCFLIRT(in_file='ds107.nii', ref_vol=0,
                   save_plots=True).run()

Now we can look at some results


In [ ]:
print results1.runtime.duration, results2.runtime.duration
subplot(211);plot(genfromtxt('ds107_mcf.nii.gz.par')[:, 3:]);title('FSL')
subplot(212);plot(genfromtxt('rp_ds107.txt')[:,:3]);title('SPM')

If I execute the MCFLIRT line again, well, it runs again!

Step 3. Nipype caching


In [ ]:
from nipype.caching import Memory
mem = Memory('.')

Create cacheable objects


In [ ]:
spm_realign = mem.cache(Realign)
fsl_realign = mem.cache(MCFLIRT)

Execute interfaces


In [ ]:
spm_results = spm_realign(in_files='ds107.nii', register_to_mean=False)
fsl_results = fsl_realign(in_file='ds107.nii', ref_vol=0, save_plots=True)

In [ ]:
subplot(211);plot(genfromtxt(fsl_results.outputs.par_file)[:, 3:])
subplot(212);plot(genfromtxt(spm_results.outputs.realignment_parameters)[:,:3])

In [ ]:
spm_results = spm_realign(in_files='ds107.nii', register_to_mean=False)
fsl_results = fsl_realign(in_file='ds107.nii', ref_vol=0, save_plots=True)

More caching


In [ ]:
from os.path import abspath as opap
files = [opap('../ds107/sub001/BOLD/task001_run001/bold.nii.gz'),
         opap('../ds107/sub001/BOLD/task001_run002/bold.nii.gz')]
converter = mem.cache(MRIConvert)
newfiles = []
for idx, fname in enumerate(files):
    newfiles.append(converter(in_file=fname,
                              out_type='nii').outputs.out_file)

In [ ]:
os.chdir(tutorial_dir)

Step 4: Nodes, MapNodes and Workflows

Where:


In [ ]:
from nipype.pipeline.engine import Node, MapNode, Workflow

Node:


In [ ]:
realign_spm = Node(Realign(), name='motion_correct')

MapNode:


In [ ]:
convert2nii = MapNode(MRIConvert(out_type='nii'),
                      iterfield=['in_file'],
                      name='convert2nii')

"Hello World" of Nipype workflows

Connect them up:


In [ ]:
realignflow = Workflow(name='realign_with_spm')
realignflow.connect(convert2nii, 'out_file',
                    realign_spm, 'in_files')

In [ ]:
convert2nii.inputs.in_file = files
realign_spm.inputs.register_to_mean = False

realignflow.base_dir = opap('.')
realignflow.run()

Visualize the workflow


In [ ]:
realignflow.write_graph()

In [ ]:
from IPython.core.display import Image
Image('realign_with_spm/graph.dot.png')

In [ ]:
realignflow.write_graph(graph2use='orig')
Image('realign_with_spm/graph_detailed.dot.png')

Step 5. Getting and saving data

Instead of assigning data ourselves, let's glob it


In [ ]:
os.chdir(tutorial_dir)

In [ ]:
from nipype.interfaces.io import DataGrabber, DataFinder
ds = Node(DataGrabber(infields=['subject_id'], outfields=['func']),
          name='datasource')
ds.inputs.base_directory = opap('ds107')
ds.inputs.template = '%s/BOLD/task001*/bold.nii.gz'
ds.inputs.sort_filelist = True

ds.inputs.subject_id = 'sub001'
print ds.run().outputs

In [ ]:
ds.inputs.subject_id = 'sub044'
print ds.run().outputs

Multiple files

A little more practical usage


In [ ]:
ds = Node(DataGrabber(infields=['subject_id', 'task_id'],
                      outfields=['func', 'anat']),
          name='datasource')
ds.inputs.base_directory = opap('ds107')
ds.inputs.template = '*'
ds.inputs.template_args = {'func': [['subject_id', 'task_id']],
                           'anat': [['subject_id']]}
ds.inputs.field_template = {'func': '%s/BOLD/task%03d*/bold.nii.gz',
                            'anat': '%s/anatomy/highres001.nii.gz'}
ds.inputs.sort_filelist = True
ds.inputs.subject_id = 'sub001'
ds.inputs.task_id = 1
print ds.run().outputs

Connecting to computation


In [ ]:
convert2nii = MapNode(MRIConvert(out_type='nii'),
                      iterfield=['in_file'],
                      name='convert2nii')

realign_spm = Node(Realign(), name='motion_correct')
realign_spm.inputs.register_to_mean = False

connectedworkflow = Workflow(name='connectedtogether')
connectedworkflow.base_dir = opap('working_dir')
connectedworkflow.connect(ds, 'func', convert2nii, 'in_file')
connectedworkflow.connect(convert2nii, 'out_file', realign_spm, 'in_files')

Data sinking

Take output computed in a workflow out of it.


In [ ]:
from nipype.interfaces.io import DataSink
sinker = Node(DataSink(), name='sinker')
sinker.inputs.base_directory = opap('output')
connectedworkflow.connect(realign_spm, 'realigned_files',
                          sinker, 'realigned')
connectedworkflow.connect(realign_spm, 'realignment_parameters',
                          sinker, 'realigned.@parameters')
connectedworkflow.run()

How to determine output location

'base_directory/container/parameterization/destloc/filename'

destloc has the form [@]string[[.[@]]string[[.[@]]string]...], where each string becomes a path component and an @-prefixed component is not added to the path:

  • destloc = realigned.@parameters --> 'realigned'
  • destloc = realigned.parameters.@1 --> 'realigned/parameters'
  • destloc = realigned.parameters.@2 --> 'realigned/parameters'

The filename comes from the input to the connect statement.
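
If the parameterization folders (e.g. _subject_id_sub001) are unwanted in the output tree, DataSink can rewrite path components via its substitutions input; a small, hedged example (the replacement pairs are illustrative):

sinker.inputs.substitutions = [('_subject_id_', ''),     # drop the iterable prefix
                               ('_task_id_', 'task')]    # illustrative renaming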

Step 6: iterables - parametric execution

Workflow + iterables: runs the subgraph several times; iterables is set as a node attribute, not passed as an input


In [ ]:
ds.iterables = ('subject_id', ['sub001', 'sub044'])
connectedworkflow.run()
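
Iterables can also be given as a list of (field, values) tuples, in which case the subgraph runs over the cross product of all values. A hedged sketch (the task_id values are illustrative):

ds.iterables = [('subject_id', ['sub001', 'sub044']),
                ('task_id', [1])]
connectedworkflow.run()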

Putting it all together

iterables + MapNode + Node + Workflow + DataGrabber + DataSink


In [ ]:
connectedworkflow.write_graph()
Image('working_dir/connectedtogether/graph.dot.png')

Step 7: The Function interface

The do anything you want card


In [ ]:
from nipype.interfaces.utility import Function

def myfunc(input1, input2):
    """Add and subtract two inputs
    """
    return input1 + input2, input1 - input2

calcfunc = Node(Function(input_names=['input1', 'input2'],
                         output_names = ['sum', 'difference'],
                         function=myfunc),
                name='mycalc')
calcfunc.inputs.input1 = 1
calcfunc.inputs.input2 = 2
res = calcfunc.run()
print res.outputs

Step 8: Distributed computing

Normally calling run executes the workflow in series


In [ ]:
connectedworkflow.run()

but you can scale very easily

For example, to use multiple cores on your local machine


In [ ]:
connectedworkflow.run('MultiProc', plugin_args={'n_procs': 4})

Or to other job managers

  • connectedworkflow.run('PBS', plugin_args={'qsub_args': '-q many'})
  • connectedworkflow.run('SGE', plugin_args={'qsub_args': '-q many'})
  • connectedworkflow.run('LSF', plugin_args={'qsub_args': '-q many'})
  • connectedworkflow.run('Condor')
  • connectedworkflow.run('IPython')

or submit graphs as a whole

  • connectedworkflow.run('PBSGraph', plugin_args={'qsub_args': '-q many'})
  • connectedworkflow.run('SGEGraph', plugin_args={'qsub_args': '-q many'})
  • connectedworkflow.run('CondorDAGMan')

Current Requirement: SHARED FILESYSTEM

You can also set node specific plugin arguments

  • node.plugin_args = {'qsub_args': '-l nodes=1:ppn=3', 'overwrite': True}

Step 9: Connecting to Databases


In [ ]:
from os.path import abspath as opap

from nipype.interfaces.io import XNATSource
from nipype.pipeline.engine import Node, Workflow
from nipype.interfaces.fsl import BET

subject_id = 'xnat_S00001'

dg = Node(XNATSource(infields=['subject_id'],
                     outfields=['struct'],
                     config='/Users/satra/xnat_configs/nitrc_ir_config'),
          name='xnatsource')
dg.inputs.query_template = ('/projects/fcon_1000/subjects/%s/experiments/xnat_E00001'
                            '/scans/%s/resources/NIfTI/files')
dg.inputs.query_template_args['struct'] = [['subject_id', 'anat_mprage_anonymized']]
dg.inputs.subject_id = subject_id

bet = Node(BET(), name='skull_stripper')

wf = Workflow(name='testxnat')
wf.base_dir = opap('xnattest')
wf.connect(dg, 'struct', bet, 'in_file')

In [ ]:
from nipype.interfaces.io import XNATSink

ds = Node(XNATSink(config='/Users/satra/xnat_configs/central_config'),
          name='xnatsink')
ds.inputs.project_id = 'NPTEST'
ds.inputs.subject_id = 'NPTEST_xnat_S00001'
ds.inputs.experiment_id = 'test_xnat'
ds.inputs.reconstruction_id = 'bet'
ds.inputs.share = True
wf.connect(bet, 'out_file', ds, 'brain')

In [ ]:
wf.run()

Step 10: Configuration options

Configuration options control how workflows and nodes are executed

At the global level:


In [ ]:
from nipype import config, logging

config.enable_debug_mode()
logging.update_logging(config)

config.set('execution', 'stop_on_first_crash', 'true')

At the workflow level:


In [ ]:
wf.config['execution']['hash_method'] = 'content'

Configurations can also be set at the node level.


In [ ]:
bet.config = {'execution': {'keep_unnecessary_outputs': 'true'}}

In [ ]:
wf.run()

Reusable workflows


In [ ]:
config.set_default_config()
logging.update_logging(config)

In [ ]:
from nipype.workflows.fmri.fsl.preprocess import create_susan_smooth

smooth = create_susan_smooth()
smooth.inputs.inputnode.in_files = opap('output/realigned/_subject_id_sub044/rbold_out.nii')
smooth.inputs.inputnode.fwhm = 5
smooth.inputs.inputnode.mask_file = 'mask.nii'

smooth.run() # Will error because mask.nii does not exist

In [ ]:
from nipype.interfaces.fsl import BET, MeanImage, ImageMaths
from nipype.pipeline.engine import Node


remove_nan = Node(ImageMaths(op_string='-nan'), name='nanremove')
remove_nan.inputs.in_file = opap('output/realigned/_subject_id_sub044/rbold_out.nii')

mi = Node(MeanImage(), name='mean')

mask = Node(BET(mask=True), name='mask')

wf = Workflow('reuse')
wf.base_dir = opap('.')
wf.connect(remove_nan, 'out_file', mi, 'in_file')
wf.connect(mi, 'out_file', mask, 'in_file')
wf.connect(mask, 'out_file', smooth, 'inputnode.mask_file')
wf.connect(remove_nan, 'out_file', smooth, 'inputnode.in_files')

wf.run()

Setting internal parameters of workflows


In [ ]:
print(smooth.list_node_names())

median = smooth.get_node('median')
median.inputs.op_string = '-k %s -p 60'

In [ ]:
wf.run()

Summary

  • This tutorial covers the concepts of Nipype

    1. Installing and testing the installation
    2. Working with interfaces
    3. Using Nipype caching
    4. Creating Nodes, MapNodes and Workflows
    5. Getting and saving data
    6. Using Iterables
    7. Function nodes
    8. Distributed computation
    9. Connecting to databases
    10. Execution configuration options
  • It will allow you to reuse and debug the various workflows available in Nipype, BIPS and CPAC

  • Please contribute new interfaces and workflows!
