A JoinNode has the opposite effect of iterables. Where iterables
split up the execution workflow into many different branches, a JoinNode merges them back into one node. For a more detailed explanation, check out JoinNode, synchronize and itersource from the main homepage.
from nipype import Node, JoinNode, Workflow

# Specify fake input node A
a = Node(interface=A(), name="a")

# Iterate over fake node B's input 'in_file'
b = Node(interface=B(), name="b")
b.iterables = ('in_file', [file1, file2])

# Pass results on to fake node C
c = Node(interface=C(), name="c")

# Join forked execution workflow in fake node D
d = JoinNode(interface=D(),
             joinsource="b",
             joinfield="in_files",
             name="d")

# Put everything into a workflow as usual
workflow = Workflow(name="workflow")
workflow.connect([(a, b, [('subject', 'subject')]),
                  (b, c, [('out_file', 'in_file')]),
                  (c, d, [('out_file', 'in_files')])
                  ])
As you can see, setting up a JoinNode is rather simple. The only differences from a normal Node are the two additional parameters joinsource and joinfield. joinsource specifies the node from which the information to join is coming (it accepts either the node's name as a string, as above, or the node object itself), and joinfield specifies the input field of the JoinNode through which the joined information will enter the node.
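To make the join semantics concrete, here is a minimal, runnable sketch (not part of the original example; the node and field names are made up for illustration). It uses the IdentityInterface utility to fork on three numbers and collect them again:

from nipype import Node, JoinNode, Workflow
from nipype.interfaces.utility import IdentityInterface

# Fork on three numbers...
fork = Node(IdentityInterface(fields=['number']), name='fork')
fork.iterables = ('number', [1, 2, 3])

# ...and collect them again; after running, the join node's
# 'numbers' input holds the list [1, 2, 3]
join = JoinNode(IdentityInterface(fields=['numbers']),
                joinsource='fork',
                joinfield=['numbers'],
                name='join')

wf = Workflow(name='join_sketch')
wf.connect(fork, 'number', join, 'numbers')
wf.run()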
Let's consider another example where we have one node that iterates over three different IDs and generates a random number for each. A second node then joins those three numbers (each coming from a separate branch of the workflow) into one list. To make the whole thing a bit more realistic, the second node will use the Function
interface to do something with those numbers, before we spit them out again.
In [ ]:
from nipype import JoinNode, Node, Workflow
from nipype.interfaces.utility import Function, IdentityInterface
In [ ]:
def get_data_from_id(id):
"""Generate a random number based on id"""
import numpy as np
return id + np.random.rand()
def merge_and_scale_data(data2):
"""Scale the input list by 1000"""
import numpy as np
return (np.array(data2) * 1000).tolist()
node1 = Node(Function(input_names=['id'],
output_names=['data1'],
function=get_data_from_id),
name='get_data')
node1.iterables = ('id', [1, 2, 3])
node2 = JoinNode(Function(input_names=['data2'],
output_names=['data_scaled'],
function=merge_and_scale_data),
name='scale_data',
joinsource=node1,
joinfield=['data2'])
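Before wiring these nodes into a workflow, we can sanity-check the merge function outside of Nipype. The input list below is arbitrary, chosen just to show what the function does:
In [ ]:
# Direct call with arbitrary values: each element is scaled by 1000
merge_and_scale_data([1.5, 2.5, 3.5])   # returns [1500.0, 2500.0, 3500.0]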
In [ ]:
wf = Workflow(name='testjoin')
wf.connect(node1, 'data1', node2, 'data2')
eg = wf.run()
In [ ]:
wf.write_graph(graph2use='exec')
from IPython.display import Image
Image(filename='graph_detailed.dot.png')
Out[ ]:
Now, let's look at the input and output of the JoinNode. The joined input data2 should contain the three random numbers collected into a single list, and the output data_scaled the same values scaled by 1000:
In [ ]:
res = [node for node in eg.nodes() if 'scale_data' in node.name][0].result
res.outputs
Out[ ]:
In [ ]:
res.inputs
Out[ ]:
We extend the workflow to three nodes. Note that even in this workflow, the joinsource corresponds to the node containing the iterables and the joinfield corresponds to the input port of the JoinNode that aggregates the iterable branches. As before, the graph below shows how the execution process is set up.
In [ ]:
def get_data_from_id(id):
    """Generate a random number based on id"""
    import numpy as np
    return id + np.random.rand()

def scale_data(data2):
    """Pass the data through unchanged (placeholder for a real scaling step)"""
    return data2

def replicate(data3, nreps=2):
    """Repeat the joined list nreps times"""
    return data3 * nreps

node1 = Node(Function(input_names=['id'],
                      output_names=['data1'],
                      function=get_data_from_id),
             name='get_data')
node1.iterables = ('id', [1, 2, 3])

node2 = Node(Function(input_names=['data2'],
                      output_names=['data_scaled'],
                      function=scale_data),
             name='scale_data')

node3 = JoinNode(Function(input_names=['data3'],
                          output_names=['data_repeated'],
                          function=replicate),
                 name='replicate_data',
                 joinsource=node1,
                 joinfield=['data3'])
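As a quick sanity check (again with arbitrary values), recall that multiplying a Python list by an integer concatenates copies of it, which is exactly what replicate relies on:
In [ ]:
# Arbitrary values; list * int concatenates copies of the list
replicate([10.0, 20.0])   # returns [10.0, 20.0, 10.0, 20.0]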
In [ ]:
wf = Workflow(name='testjoin')
wf.connect(node1, 'data1', node2, 'data2')
wf.connect(node2, 'data_scaled', node3, 'data3')
eg = wf.run()
In [ ]:
wf.write_graph(graph2use='exec')
Image(filename='graph_detailed.dot.png')
Out[ ]:
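As before, we can inspect the result of the JoinNode. The output data_repeated should contain the joined list of the three scaled values, repeated twice (the default nreps=2):
In [ ]:
res = [node for node in eg.nodes() if 'replicate_data' in node.name][0].result
res.outputs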