JoinNode

A JoinNode has the opposite effect of a MapNode or iterables. Where those split the execution workflow into many different branches, a JoinNode merges the branches back into one node. For a more detailed explanation, check out JoinNode, synchronize and itersource on the main homepage.

Simple example

Let's consider the very simple example depicted at the top of this page:


In [ ]:
from nipype import Node, JoinNode, Workflow

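# Note: A, B, C, D and file1/file2 are placeholders, so this cell is
# illustrative and will not run as-is.
# Specify fake input node A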
a = Node(interface=A(), name="a")

# Iterate over fake node B's input 'in_file'
b = Node(interface=B(), name="b")
b.iterables = ('in_file', [file1, file2])

# Pass results on to fake node C
c = Node(interface=C(), name="c")

# Join forked execution workflow in fake node D
d = JoinNode(interface=D(),
             joinsource="b",
             joinfield="in_files",
             name="d")

# Put everything into a workflow as usual
workflow = Workflow(name="workflow")
workflow.connect([(a, b, [('subject', 'subject')]),
                  (b, c, [('out_file', 'in_file')]),
                  (c, d, [('out_file', 'in_files')])
                  ])

As you can see, setting up a JoinNode is rather simple. The only differences from a normal Node are the joinsource and joinfield arguments. joinsource specifies the node from which the information to join comes, and joinfield specifies the input field of the JoinNode that receives the joined information.
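
To make the fork-and-join idea concrete, here is a minimal plain-Python sketch that does not use nipype at all. It only mimics the data flow of the example above: an iterable forks execution into one branch per value, each branch runs independently, and the join step receives all branch results as one list. The function names `run_branch` and `join` and the file names are hypothetical and chosen for illustration only; nipype's actual graph expansion and scheduling are more involved.

```python
# Conceptual sketch only: plain Python, not nipype's implementation.

def run_branch(in_file):
    """Stand-in for fake nodes B and C: process one input in its own branch."""
    return in_file.replace(".txt", "_processed.txt")


def join(in_files):
    """Stand-in for JoinNode D: receives every branch result as one list."""
    return sorted(in_files)


# Rough equivalent of b.iterables = ('in_file', [...]): one branch per value
iterable_values = ["file1.txt", "file2.txt"]
branch_outputs = [run_branch(value) for value in iterable_values]

# Rough equivalent of joinfield="in_files": all branch outputs arrive together
joined = join(branch_outputs)
print(joined)
```

The point of the sketch is that the joining step sees a list with one entry per branch, which is why the join field of a JoinNode is usually a list-valued input.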

More realistic example

Let's consider another example where we have one node that iterates over 3 different numbers and another node that joins those three different numbers (each coming from a separate branch of the workflow) into one list. To make the whole thing a bit more realistic, the second node will use the Function interface to do something with those numbers, before we spit them out again.


In [ ]:
from nipype import JoinNode, Node, Workflow
from nipype.interfaces.utility import Function, IdentityInterface

In [ ]:
# Create iteration node
iternode = Node(IdentityInterface(fields=['number_id']),
                name="iternode")
iternode.iterables = [('number_id', [1, 4, 9])]

In [ ]:
# Create join node - compute square root for each element in the joined list
def compute_sqrt(numbers):
    from math import sqrt
    return [sqrt(e) for e in numbers]

joinnode = JoinNode(Function(input_names=['numbers'],
                             output_names=['sqrts'],
                             function=compute_sqrt),
                    name='joinnode',
                    joinsource='iternode',
                    joinfield=['numbers'])

In [ ]:
# Create the workflow and run it
joinflow = Workflow(name='joinflow')
joinflow.connect(iternode, 'number_id', joinnode, 'numbers')
res = joinflow.run()


170306-22:38:22,861 workflow INFO:
	 Workflow joinflow settings: ['check', 'execution', 'logging']
170306-22:38:22,871 workflow INFO:
	 Running serially.
170306-22:38:22,873 workflow INFO:
	 Executing node joinnode in dir: /tmp/tmpm8NCMb/joinflow/joinnode

Now, let's look at the input and output of the joinnode:


In [ ]:
# res is a NetworkX graph; list() makes the nodes indexable in NetworkX 2+
list(res.nodes())[0].result.outputs


Out[ ]:
sqrts = [1.0, 2.0, 3.0]

In [ ]:
list(res.nodes())[0].inputs


Out[ ]:
function_str = <undefined>
ignore_exception = <undefined>
numbers = <undefined>
numbersJ1 = 1
numbersJ2 = 4
numbersJ3 = 9
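
The listing above shows that the JoinNode keeps one numbered input per branch (`numbersJ1`, `numbersJ2`, `numbersJ3`) next to the `numbers` join field. The following plain-Python sketch illustrates how such per-branch slots could be collated into the single list that `compute_sqrt` receives. It is based only on the field names visible in the output and is not nipype's internal code; the helper `collate_join_field` is hypothetical.

```python
from math import sqrt


def collate_join_field(inputs, field):
    """Collect the values of <field>J1, <field>J2, ... in branch order."""
    values = []
    index = 1
    while f"{field}J{index}" in inputs:
        values.append(inputs[f"{field}J{index}"])
        index += 1
    return values


# Per-branch slots, copied from the JoinNode inputs shown above
inputs = {"numbersJ1": 1, "numbersJ2": 4, "numbersJ3": 9}

# Collate the slots into one list, then apply the same computation
# as compute_sqrt from the workflow
numbers = collate_join_field(inputs, "numbers")
sqrts = [sqrt(e) for e in numbers]
print(numbers, sqrts)
```

Read this way, the three `J` slots and the `sqrts = [1.0, 2.0, 3.0]` output are two views of the same join: one value per branch going in, one list coming out.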