Interface: Wraps a program or function
Node/MapNode: Wraps an Interface for use in a Workflow
Workflow: A graph whose nodes are of type Node, MapNode, or Workflow and whose edge represent data flow
Interface: A component that describes how a Workflow should be executed
In [ ]:
# Define Python Modules to import
from nipype.interfaces import spm
import nipype.interfaces.io as nio # Data i/o
import nipype.interfaces.utility as util # utility
from nipype.pipeline.engine import Node, Workflow
from nipype.interfaces.base import BaseInterface, TraitedSpec, File, traits
import nipype.algorithms.rapidart as ra # artifact detection
from nipype.interfaces.nipy.preprocess import ComputeMask
import nipype.interfaces.matlab as mlab
import os
import nibabel as nib
from IPython.display import Image
import glob
# Specify various inputs files for pipeline
# spm_path = '/projects/ics/software/spm/spm8_r5236/'
spm_path = '/Users/lukechang/Documents/Matlab/spm8/'
canonical_file = spm_path + 'canonical/single_subj_T1.nii'
template_file = spm_path + 'templates/T1.nii'
# Set the way matlab should be called
# mlab.MatlabCommand.set_default_matlab_cmd("matlab -nodesktop -nosplash -nojvm -noFigureWindows")
mlab.MatlabCommand.set_default_matlab_cmd("matlab -nodesktop -nosplash")
mlab.MatlabCommand.set_default_paths(spm_path)
Nodes are processes
They can refer to specific functions of an interface (e.g., coregister)
They can be a custom function
Iterables allow you to iterate over a vector of parameters (e.g., subjects, or smoothing parameters)
In [5]:
#Setup Data Source for Input Data
ds = Node(nio.DataGrabber(infields=['subject_id', 'task_id'], outfields=['func', 'struc']),name='datasource')
ds.inputs.base_directory = os.path.abspath(data_dir + '/' + subject_id)
ds.inputs.template = '*'
ds.inputs.sort_filelist = True
ds.inputs.template_args = {'func': [['task_id']], 'struc':[]}
ds.inputs.field_template = {'func': 'Functional/Raw/%s/func.nii','struc': 'Structural/SPGR/spgr.nii'}
ds.inputs.subject_id = subject_id
ds.inputs.task_id = task_list
ds.iterables = ('task_id',task_list)
#Get Timing Acquisition for slice timing
tr = 2
ta = Node(interface=util.Function(input_names=['tr', 'n_slices'], output_names=['ta'], function = get_ta), name="ta")
ta.inputs.tr=tr
#Slice Timing: sequential ascending
slice_timing = Node(interface=spm.SliceTiming(), name="slice_timing")
slice_timing.inputs.time_repetition = tr
slice_timing.inputs.ref_slice = 1
#Realignment - 6 parameters - realign to first image of very first series.
realign = Node(interface=spm.Realign(), name="realign")
realign.inputs.register_to_mean = True
#Plot Realignment
plot_realign = Node(interface=PlotRealignmentParameters(), name="plot_realign")
#Artifact Detection
art = Node(interface=ra.ArtifactDetect(), name="art")
art.inputs.use_differences = [True,False]
art.inputs.use_norm = True
art.inputs.norm_threshold = 1
art.inputs.zintensity_threshold = 3
art.inputs.mask_type = 'file'
art.inputs.parameter_source = 'SPM'
#Coregister - 12 parameters, cost function = 'nmi', fwhm 7, interpolate, don't mask
#anatomical to functional mean across all available data.
coregister = Node(interface=spm.Coregister(), name="coregister")
coregister.inputs.jobtype = 'estimate'
# Segment structural, gray/white/csf,mni,
segment = Node(interface=spm.Segment(), name="segment")
segment.inputs.save_bias_corrected = True
#Normalize - structural to MNI - then apply this to the coregistered functionals
normalize = Node(interface=spm.Normalize(), name = "normalize")
normalize.inputs.template = os.path.abspath(template_file)
#Plot normalization Check
plot_normalization_check = Node(interface=Plot_Coregistration_Montage(), name="plot_normalization_check")
plot_normalization_check.inputs.canonical_img = canonical_file
#Create Mask
compute_mask = Node(interface=ComputeMask(), name="compute_mask")
#remove lower 5% of histogram of mean image
compute_mask.inputs.m = .05
#Smooth
#implicit masking (.im) = 0, dtype = 0
smooth = Node(interface=spm.Smooth(), name = "smooth")
fwhmlist = [8]
smooth.iterables = ('fwhm',fwhmlist)
#Create Covariate matrix
make_covariates = Node(interface=Create_Covariates(), name="make_covariates")
A workflow is a processing pipeline
It is a directed acyclic graph that represents data flow
Nodes are processes
Edges are direction of data flow
Must define input and and output of processing node
In [ ]:
Preprocessed = Workflow(name="Preprocessed")
Preprocessed.base_dir = os.path.abspath(data_dir + '/' + subject_id + '/Functional')
Preprocessed.connect([
(ds, ta, [(('func', get_n_slices), "n_slices")]),
(ta, slice_timing, [("ta", "time_acquisition")]),
(ds, slice_timing, [('func', 'in_files'),
(('func', get_n_slices), "num_slices"),
(('func', get_slice_order), "slice_order"),
]),
(slice_timing, realign, [('timecorrected_files', 'in_files')]),
(realign, compute_mask, [('mean_image','mean_volume')]),
(realign,coregister, [('mean_image', 'target')]),
(ds,coregister, [('struc', 'source')]),
(coregister,segment, [('coregistered_source', 'data')]),
(segment, normalize, [('transformation_mat','parameter_file'),
('bias_corrected_image', 'source'),]),
(realign,normalize, [('realigned_files', 'apply_to_files'),
(('realigned_files', get_vox_dims), 'write_voxel_sizes')]),
(normalize, smooth, [('normalized_files', 'in_files')]),
(compute_mask,art,[('brain_mask','mask_file')]),
(realign,art,[('realignment_parameters','realignment_parameters')]),
(realign,art,[('realigned_files','realigned_files')]),
(realign,plot_realign, [('realignment_parameters', 'realignment_parameters')]),
(normalize, plot_normalization_check, [('normalized_files', 'wra_img')]),
(realign, make_covariates, [('realignment_parameters', 'realignment_parameters')]),
(art, make_covariates, [('outlier_files', 'spike_id')]),
])
You can easily iterate over a vector of parameters using iterables (e.g., different smoothing parameters)
Here is a directed acyclic graph of the preprocessing pipeline.
In [6]:
data_dir = '/Users/lukechang/Dropbox/PTSD/Data/Imaging/'
sub = 'subj46153C'
Preprocessed = create_preproc_func_pipeline(data_dir = data_dir, subject_id=sub)
Preprocessed.write_graph(data_dir + sub + "/Preprocessed_Workflow.dot")
Image(filename=data_dir + sub + '/Preprocessed_Workflow.dot.png')
Allows seamless execution across many architectures
In [ ]:
# Create Pipeline for subject
data_dir='/Users/lukechang/Dropbox/PTSD/Data/Imaging'
# subject_id = 'subj46153C'
task_list=['s1_r1Cond','s1_r1Ext']
Preprocessed = create_preproc_func_pipeline(data_dir=data_dir, subject_id = subject_id, task_list=task_list)
# Get List of subjects
# sublist = sorted([x.split('/')[-1] for x in glob.glob(data_dir + '/subj*')])
# Loop over subjects
for sub in reversed(sublist):
#Glob Subject runs as they vary
runlist = [x.split('/')[-1] for x in glob.glob(data_dir + '/' + sub + '/Functional/Raw/*')]
Preprocessed = create_preproc_func_pipeline(data_dir=data_dir, subject_id = sub, task_list=runlist)
print data_dir + '/' + sub
# Write out pipeline as a DAG
Preprocessed.write_graph(dotfilename=data_dir + '/' + sub + "/Functional/Preprocessed_Workflow.dot.svg",format='svg')
Preprocessed.run(plugin='MultiProc', plugin_args={'n_procs' : 4})
You can easily create your own Nodes running custom functions
Use object oriented python coding
You need to define Input and Output Specs
Can use python, shell scripts, matlab, etc.
In [ ]:
# Example to create a custom coregistration plot using nilearn plotting tools
class Plot_Coregistration_Montage_InputSpec(TraitedSpec):
wra_img = File(exists=True, mandatory=True)
canonical_img = File(exists=True, mandatory=True)
title = traits.Str("Normalized Functional Check", usedefault=True)
class Plot_Coregistration_Montage_OutputSpec(TraitedSpec):
plot = File(exists=True)
class Plot_Coregistration_Montage(BaseInterface):
# This function creates a plot of an axial montage of the average normalized functional data
# with the spm MNI space single subject T1 overlaid. Useful for checking normalization
input_spec = Plot_Coregistration_Montage_InputSpec
output_spec = Plot_Coregistration_Montage_OutputSpec
def _run_interface(self, runtime):
import nibabel as nib
from nilearn import plotting, datasets, image
from nipype.interfaces.base import isdefined
import numpy as np
import pylab as plt
import os
wra_img = nib.load(self.inputs.wra_img)
canonical_img = nib.load(self.inputs.canonical_img)
title = self.inputs.title
mean_wraimg = image.mean_img(wra_img)
if title != "":
filename = title.replace(" ", "_")+".pdf"
else:
filename = "plot.pdf"
fig = plotting.plot_anat(mean_wraimg, title="wrafunc & canonical single subject", cut_coords=range(-40, 40, 10), display_mode='z')
fig.add_edges(canonical_img)
fig.savefig(filename)
fig.close()
self._plot = filename
runtime.returncode=0
return runtime
def _list_outputs(self):
outputs = self._outputs().get()
outputs["plot"] = os.path.abspath(self._plot)
return outputs