Creating flexible neuroimaging pipelines with Nipype

Luke Chang

University of Colorado Boulder

Overview

  1. Why Python?
  2. The beauty of IPython Notebooks
  3. What is Nipype?
  4. Example preprocessing pipeline

Python's Cannibalization of Scientific Computing

IPython Notebooks

  • Interactive, browser-based Python engine
  • Great for lab notebooks
  • Great for creating tutorials
  • Can create slideshows (like this one)
  • Can be easily shared (e.g., a Dropbox/GitHub link rendered in nbviewer)
  • Can run on Blanca!

How to Set up IPython Notebook on Blanca

  1. Log on to Blanca and make sure you are using Luke's Anaconda Python distribution

    ```/projects/luch0518/software/anaconda```

  2. Start an IPython notebook server on Blanca. Disable the browser and specify a port

    ```ipython notebook --no-browser --port=8889```

  3. On your local computer, log in to Blanca using port forwarding

    ```ssh -N -f -L localhost:8889:localhost:8889 luch0518@blogin01.rc.colorado.edu```

  4. Open your browser to localhost:8889/tree
  5. The IPython notebook server should shut down when you close the SSH connection, but just in case, you can also kill the process. Here is the command to find the process ID:

    ```ps aux | grep localhost:8889```

  6. Nipype can submit jobs to Blanca automatically. You can check your queue or delete any job.

    Check queue

    ```qstat -u $USER```

    Delete queued jobs

    ```qselect -u $USER | xargs qdel```

Problems With Neuroimaging Analyses

  • Lots of software packages, but all have different assumptions, philosophies, interfaces, and file formats. What if we wanted to mix and match?
  • How do we train people?
  • How do we process big data? (It is not feasible to use a GUI for 1000 subjects.)
  • There are many pipeline packages, but have you ever tried using them? (e.g., BIRN Tools, HCP, LONI, MRN auto-analysis pipeline)
  • How do we facilitate reproducible research?
  • How do we create different lab pipelines?
  • How can we have a pipeline that runs on different platforms and computing architectures?

What is Nipype?

Nipype allows you to:

  • easily interact with tools from different software packages
  • combine processing steps from different software packages
  • develop new workflows faster by reusing common steps from old ones
  • process data faster by running it in parallel on many cores/machines
  • make your research easily reproducible
  • share your processing workflows with the community
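
For example, here is a minimal sketch of the mix-and-match idea, feeding FSL's brain extraction into SPM's smoothing. The input file is hypothetical, and both packages must be installed:

```
# A sketch of mixing packages: FSL's BET feeding SPM's Smooth.
# 'func.nii' is a hypothetical input file.
from nipype.interfaces import fsl, spm
from nipype.pipeline.engine import Node, Workflow

bet = Node(fsl.BET(in_file='func.nii'), name='bet')        # FSL brain extraction
smooth = Node(spm.Smooth(fwhm=[8, 8, 8]), name='smooth')   # SPM smoothing

wf = Workflow(name='mix_and_match')
wf.connect(bet, 'out_file', smooth, 'in_files')  # wire FSL output to SPM input
# wf.run()  # requires FSL and SPM/MATLAB on your path
```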

Nipype Architecture

  • Interface
  • Engine
  • Executable Plugins

Nipype: Interface

  • Interface: Wraps a program or function

Nipype: Engine

  • Node/MapNode: Wraps an Interface for use in a Workflow

  • Workflow: A graph whose nodes are of type Node, MapNode, or Workflow and whose edges represent data flow

<img src="Images/Nipype_Workflow.png" height="600" width="600">

Nipype: Plugin

  • Plugin: A component that describes how a Workflow should be executed
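
Interfaces can also be run directly, outside of any Node or Workflow; a minimal sketch using FSL's BET with a hypothetical input file:

```
# Running an Interface directly, without a Node or Workflow.
# 'struct.nii' is a hypothetical input file.
from nipype.interfaces import fsl

bet = fsl.BET(in_file='struct.nii', frac=0.5)  # fractional intensity threshold
result = bet.run()
print(result.outputs.out_file)                 # path to the skull-stripped image
```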

Installing Environment

  • Scientific Python

    • Debian/Ubuntu/Scientific Fedora
    • Enthought Python Distribution
    • Anaconda
    • There are Python modules on Blanca, though they carry old versions of Python.
    • We have our own Anaconda build installed on Blanca.
  • Installing Nipype

    • PyPI
    • GitHub
    • NeuroDebian
  • Running Nipype

    • Ensure the underlying tools are installed and accessible on your path
    • Remember, Nipype is simply a wrapper around other software packages (e.g., SPM, FSL, AFNI)
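
One way to check that Nipype can see the underlying tools is to query their versions; a sketch (the SPM check requires a working MATLAB on your path):

```
# Check that Nipype and the wrapped packages are visible.
import nipype
print(nipype.__version__)

from nipype.interfaces import fsl, spm
print(fsl.Info.version())        # None if FSL is not on your path
print(spm.SPMCommand().version)  # requires MATLAB; None if SPM is not found
```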

Nipype Preprocessing Pipeline Example

Preprocessing Steps

  1. Slice timing
  2. Realignment to mean image
  3. Coregister anatomical to functional
  4. Segment anatomical with bias correction
  5. Normalize anatomical to MNI space
  6. Apply normalization to functionals
  7. Smooth - 8mm
  8. Artifact detection - 3 SD of global mean (3.5 SD of root mean squared successive differences; Mahalanobis distance across slices? Ask Tor for more details, but ignore for now)
  9. Create nuisance covariate matrix for GLM (note the file has headers and NaNs for missing data in the derivative columns)

Figures Generated

  • Realignment parameters (plot_realign/Realignment_parameters.pdf)
  • MNI space single subject overlaid on mean warped functional (plot_normalization_check/Normalized_Functional_Check.pdf)
  • Spikes identified using global signal (art/plot.rafunc.png)

Setup

  • Import modules and set up paths and defaults


In [ ]:
# Define Python modules to import
from nipype.interfaces import spm
import nipype.interfaces.io as nio           # Data i/o
import nipype.interfaces.utility as util     # utility
from nipype.pipeline.engine import Node, Workflow
from nipype.interfaces.base import BaseInterface, TraitedSpec, File, traits
import nipype.algorithms.rapidart as ra      # artifact detection
from nipype.interfaces.nipy.preprocess import ComputeMask
import nipype.interfaces.matlab as mlab
import os
import nibabel as nib
from IPython.display import Image
import glob

# Specify various input files for the pipeline
# spm_path = '/projects/ics/software/spm/spm8_r5236/'
spm_path = '/Users/lukechang/Documents/Matlab/spm8/'
canonical_file = spm_path + 'canonical/single_subj_T1.nii'
template_file = spm_path + 'templates/T1.nii'

# Set the way MATLAB should be called
# mlab.MatlabCommand.set_default_matlab_cmd("matlab -nodesktop -nosplash -nojvm -noFigureWindows")
mlab.MatlabCommand.set_default_matlab_cmd("matlab -nodesktop -nosplash")
mlab.MatlabCommand.set_default_paths(spm_path)
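
The node-definition cell below also references `data_dir`, `subject_id`, and `task_list` before the notebook defines them; the values below match those used later in the notebook:

```
# Subject and data locations, matching the values used later in the notebook
data_dir = '/Users/lukechang/Dropbox/PTSD/Data/Imaging'
subject_id = 'subj46153C'
task_list = ['s1_r1Cond', 's1_r1Ext']
```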
          

Define Nodes

  • Nodes are processes

  • They can refer to specific functions of an interface (e.g., coregister)

  • They can be a custom function

  • Iterables allow you to iterate over a vector of parameters (e.g., subjects, or smoothing parameters)
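
The node definitions below reference four helper functions (get_ta, get_n_slices, get_slice_order, get_vox_dims) that are never defined in this notebook, as well as the custom interfaces PlotRealignmentParameters and Create_Covariates, which are built the same way as the Plot_Coregistration_Montage example at the end of this notebook. Here is a minimal sketch of what the helpers might look like, assuming a sequential ascending acquisition and the nibabel API of this era:

```
def get_ta(tr, n_slices):
    # TA = TR - TR/(number of slices), the standard SPM convention
    return tr - tr / float(n_slices)

def get_n_slices(volume):
    # Number of slices in the z dimension; imports live inside the function
    # because Nipype runs connection functions in a separate namespace
    import nibabel as nib
    if isinstance(volume, list):
        volume = volume[0]
    return nib.load(volume).shape[2]

def get_slice_order(volume):
    # Sequential ascending acquisition: 1, 2, ..., n
    import nibabel as nib
    if isinstance(volume, list):
        volume = volume[0]
    return range(1, nib.load(volume).shape[2] + 1)

def get_vox_dims(volume):
    # Voxel dimensions in mm, used to set normalize's write_voxel_sizes
    import nibabel as nib
    if isinstance(volume, list):
        volume = volume[0]
    voxdims = nib.load(volume).get_header().get_zooms()
    return [float(voxdims[0]), float(voxdims[1]), float(voxdims[2])]
```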

          
          
In [5]:
# Set up data source for input data
ds = Node(nio.DataGrabber(infields=['subject_id', 'task_id'], outfields=['func', 'struc']), name='datasource')
ds.inputs.base_directory = os.path.abspath(data_dir + '/' + subject_id)
ds.inputs.template = '*'
ds.inputs.sort_filelist = True
ds.inputs.template_args = {'func': [['task_id']], 'struc': []}
ds.inputs.field_template = {'func': 'Functional/Raw/%s/func.nii', 'struc': 'Structural/SPGR/spgr.nii'}
ds.inputs.subject_id = subject_id
ds.inputs.task_id = task_list
ds.iterables = ('task_id', task_list)

# Get acquisition time (TA) for slice timing
tr = 2
ta = Node(interface=util.Function(input_names=['tr', 'n_slices'], output_names=['ta'], function=get_ta), name="ta")
ta.inputs.tr = tr

# Slice timing: sequential ascending
slice_timing = Node(interface=spm.SliceTiming(), name="slice_timing")
slice_timing.inputs.time_repetition = tr
slice_timing.inputs.ref_slice = 1

# Realignment - 6 parameters - register to the mean image
realign = Node(interface=spm.Realign(), name="realign")
realign.inputs.register_to_mean = True

# Plot realignment parameters
plot_realign = Node(interface=PlotRealignmentParameters(), name="plot_realign")

# Artifact detection
art = Node(interface=ra.ArtifactDetect(), name="art")
art.inputs.use_differences      = [True, False]
art.inputs.use_norm             = True
art.inputs.norm_threshold       = 1
art.inputs.zintensity_threshold = 3
art.inputs.mask_type            = 'file'
art.inputs.parameter_source     = 'SPM'

# Coregister - 12 parameters, cost function = 'nmi', fwhm 7, interpolate, don't mask
# anatomical to functional mean across all available data.
coregister = Node(interface=spm.Coregister(), name="coregister")
coregister.inputs.jobtype = 'estimate'

# Segment structural: gray/white/CSF, MNI
segment = Node(interface=spm.Segment(), name="segment")
segment.inputs.save_bias_corrected = True

# Normalize - structural to MNI - then apply this to the coregistered functionals
normalize = Node(interface=spm.Normalize(), name="normalize")
normalize.inputs.template = os.path.abspath(template_file)

# Plot normalization check
plot_normalization_check = Node(interface=Plot_Coregistration_Montage(), name="plot_normalization_check")
plot_normalization_check.inputs.canonical_img = canonical_file

# Create mask
compute_mask = Node(interface=ComputeMask(), name="compute_mask")
# remove lower 5% of histogram of mean image
compute_mask.inputs.m = .05

# Smooth
# implicit masking (.im) = 0, dtype = 0
smooth = Node(interface=spm.Smooth(), name="smooth")
fwhmlist = [8]
smooth.iterables = ('fwhm', fwhmlist)

# Create covariate matrix
make_covariates = Node(interface=Create_Covariates(), name="make_covariates")
          
          
          
          

Create a Workflow

  • A workflow is a processing pipeline

  • It is a directed acyclic graph that represents data flow

  • Nodes are processes

  • Edges represent the direction of data flow

  • You must define the input and output of each processing node

          
          
In [ ]:
Preprocessed = Workflow(name="Preprocessed")
Preprocessed.base_dir = os.path.abspath(data_dir + '/' + subject_id + '/Functional')

Preprocessed.connect([
                    (ds, ta, [(('func', get_n_slices), "n_slices")]),
                    (ta, slice_timing, [("ta", "time_acquisition")]),
                    (ds, slice_timing, [('func', 'in_files'),
                                        (('func', get_n_slices), "num_slices"),
                                        (('func', get_slice_order), "slice_order"),
                                        ]),
                    (slice_timing, realign, [('timecorrected_files', 'in_files')]),
                    (realign, compute_mask, [('mean_image', 'mean_volume')]),
                    (realign, coregister, [('mean_image', 'target')]),
                    (ds, coregister, [('struc', 'source')]),
                    (coregister, segment, [('coregistered_source', 'data')]),
                    (segment, normalize, [('transformation_mat', 'parameter_file'),
                                          ('bias_corrected_image', 'source')]),
                    (realign, normalize, [('realigned_files', 'apply_to_files'),
                                          (('realigned_files', get_vox_dims), 'write_voxel_sizes')]),
                    (normalize, smooth, [('normalized_files', 'in_files')]),
                    (compute_mask, art, [('brain_mask', 'mask_file')]),
                    (realign, art, [('realignment_parameters', 'realignment_parameters')]),
                    (realign, art, [('realigned_files', 'realigned_files')]),
                    (realign, plot_realign, [('realignment_parameters', 'realignment_parameters')]),
                    (normalize, plot_normalization_check, [('normalized_files', 'wra_img')]),
                    (realign, make_covariates, [('realignment_parameters', 'realignment_parameters')]),
                    (art, make_covariates, [('outlier_files', 'spike_id')]),
                    ])
          

Visualize the graphical pipeline

  • Each processing step in the workflow is a node in the graph
  • Because it is a DAG, you can easily run different pipelines on the same data without interfering with other pipelines.
  • All of the files you will need for subsequent analyses will be in each of these folders.
  • If you make a change to a node, Nipype will only rerun that node and all dependent nodes.
  • You can easily iterate over a vector of parameters using iterables (e.g., different smoothing parameters)

Here is a directed acyclic graph of the preprocessing pipeline.


In [6]:
# create_preproc_func_pipeline() (defined elsewhere) wraps the node and
# workflow definitions above into a reusable function
data_dir = '/Users/lukechang/Dropbox/PTSD/Data/Imaging/'
sub = 'subj46153C'
Preprocessed = create_preproc_func_pipeline(data_dir=data_dir, subject_id=sub)
Preprocessed.write_graph(data_dir + sub + "/Preprocessed_Workflow.dot")
Image(filename=data_dir + sub + '/Preprocessed_Workflow.dot.png')
          
          
          
          

Execution Plugins

Plugins allow seamless execution across many architectures (see the sketch after this list):

  • Local

    • serially
    • multicore
  • Clusters

    • PBS/Torque
    • SGE
    • SLURM
    • SSH (via IPython)
  • Caveats

    • Submitting jobs via PBS can be very slow, because Nipype keeps waiting on jobs that are already in the "C" (completed) state.
    • Haven't tested it on SLURM yet.
    • Looping over subjects on an interactive node works.
    • You should be able to submit each subject as a separate job and run multiprocessing within each job.
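
For example, the same workflow can be run locally or submitted to a cluster by changing only the plugin argument; a sketch (the qsub arguments are hypothetical and depend on your queue configuration):

```
# Run locally across 4 cores
Preprocessed.run(plugin='MultiProc', plugin_args={'n_procs': 4})

# Or submit each node as a PBS/Torque job (qsub arguments are hypothetical)
Preprocessed.run(plugin='PBS', plugin_args={'qsub_args': '-q blanca -l walltime=04:00:00'})
```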
          
          
In [ ]:
# Create pipeline for each subject
data_dir = '/Users/lukechang/Dropbox/PTSD/Data/Imaging'

# Example: build a pipeline for a single subject
# subject_id = 'subj46153C'
# task_list = ['s1_r1Cond', 's1_r1Ext']
# Preprocessed = create_preproc_func_pipeline(data_dir=data_dir, subject_id=subject_id, task_list=task_list)

# Get list of subjects
sublist = sorted([x.split('/')[-1] for x in glob.glob(data_dir + '/subj*')])

# Loop over subjects
for sub in reversed(sublist):
    # Glob subject runs, as they vary across subjects
    runlist = [x.split('/')[-1] for x in glob.glob(data_dir + '/' + sub + '/Functional/Raw/*')]
    Preprocessed = create_preproc_func_pipeline(data_dir=data_dir, subject_id=sub, task_list=runlist)
    print data_dir + '/' + sub

    # Write out pipeline as a DAG, then run with 4 parallel processes
    Preprocessed.write_graph(dotfilename=data_dir + '/' + sub + "/Functional/Preprocessed_Workflow.dot.svg", format='svg')
    Preprocessed.run(plugin='MultiProc', plugin_args={'n_procs': 4})
          

Create your own custom functions

  • You can easily create your own nodes that run custom functions

  • Use object-oriented Python

  • You need to define input and output specs

  • You can use Python, shell scripts, MATLAB, etc.

          
          
In [ ]:
# Example: create a custom coregistration plot using nilearn plotting tools

class Plot_Coregistration_Montage_InputSpec(TraitedSpec):
    wra_img = File(exists=True, mandatory=True)
    canonical_img = File(exists=True, mandatory=True)
    title = traits.Str("Normalized Functional Check", usedefault=True)

class Plot_Coregistration_Montage_OutputSpec(TraitedSpec):
    plot = File(exists=True)

class Plot_Coregistration_Montage(BaseInterface):
    # This interface plots an axial montage of the mean normalized functional data
    # with the SPM MNI-space single-subject T1 edges overlaid.
    # Useful for checking normalization.
    input_spec = Plot_Coregistration_Montage_InputSpec
    output_spec = Plot_Coregistration_Montage_OutputSpec

    def _run_interface(self, runtime):
        import os
        import nibabel as nib
        from nilearn import plotting, image

        wra_img = nib.load(self.inputs.wra_img)
        canonical_img = nib.load(self.inputs.canonical_img)
        title = self.inputs.title
        mean_wraimg = image.mean_img(wra_img)

        # Derive the output filename from the title
        if title != "":
            filename = title.replace(" ", "_") + ".pdf"
        else:
            filename = "plot.pdf"

        fig = plotting.plot_anat(mean_wraimg, title="wrafunc & canonical single subject",
                                 cut_coords=range(-40, 40, 10), display_mode='z')
        fig.add_edges(canonical_img)
        fig.savefig(filename)
        fig.close()

        self._plot = filename

        runtime.returncode = 0
        return runtime

    def _list_outputs(self):
        outputs = self._outputs().get()
        outputs["plot"] = os.path.abspath(self._plot)
        return outputs
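
A custom interface like this can be tested standalone before wiring it into a workflow; a minimal sketch, assuming a hypothetical input file 'wrafunc.nii':

```
# Run the custom interface standalone ('wrafunc.nii' is hypothetical)
plotter = Plot_Coregistration_Montage(wra_img='wrafunc.nii',
                                      canonical_img=canonical_file)
result = plotter.run()
print(result.outputs.plot)  # absolute path to the saved PDF
```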
          

Coregistration Check