JoinNode

A JoinNode has the opposite effect of iterables. Where iterables split the execution workflow into many different branches, a JoinNode merges them back into one node. For a more detailed explanation, check out JoinNode, synchronize and itersource on the main homepage.

Simple example

Let's consider the very simple example depicted at the top of this page:

from nipype import Node, JoinNode, Workflow

# Specify fake input node A
a = Node(interface=A(), name="a")

# Iterate over fake node B's input 'in_file'
b = Node(interface=B(), name="b")
b.iterables = ('in_file', [file1, file2])

# Pass results on to fake node C
c = Node(interface=C(), name="c")

# Join forked execution workflow in fake node D
d = JoinNode(interface=D(),
             joinsource="b",
             joinfield="in_files",
             name="d")

# Put everything into a workflow as usual
workflow = Workflow(name="workflow")
workflow.connect([(a, b, [('subject', 'subject')]),
                  (b, c, [('out_file', 'in_file')]),
                  (c, d, [('out_file', 'in_files')])
                  ])

As you can see, setting up a JoinNode is rather simple. The only differences from a normal Node are the joinsource and joinfield arguments: joinsource specifies the node from which the information to join comes, and joinfield specifies the input field of the JoinNode through which that information enters the node.
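The fake interfaces A through D above are placeholders and won't actually run. To make the fan-out/fan-in mechanics concrete without nipype, here is a plain-Python sketch (the stage functions are hypothetical stand-ins for B, C and D) that emulates what the engine does: run each branch independently, then hand the join stage the collected list:

```python
# Hypothetical stand-ins for the fake interfaces B, C and D.
def stage_b(in_file):
    """Fake node B: pretend to process one input file."""
    return in_file + '.out'

def stage_c(in_file):
    """Fake node C: pretend to post-process B's result."""
    return in_file + '.post'

def stage_d(in_files):
    """Fake JoinNode D: receives ALL branch results as one list."""
    return sorted(in_files)

# Fan out: one branch per iterable value, just like b.iterables.
branches = [stage_c(stage_b(f)) for f in ['file1', 'file2']]

# Fan in: the JoinNode's joinfield ('in_files') gets the whole list.
result = stage_d(branches)
print(result)  # ['file1.out.post', 'file2.out.post']
```

This is only an emulation of the execution semantics, not how nipype is implemented internally; the real engine builds one copy of the b-to-c branch per iterable value and wires every branch's output into the join input.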

More realistic example

Let's consider another example where we have one node that iterates over 3 different numbers and generates random numbers. Another node joins those three numbers (each coming from a separate branch of the workflow) into one list. To make the whole thing a bit more realistic, the second node will use the Function interface to do something with those numbers before spitting them out again.


In [ ]:
from nipype import JoinNode, Node, Workflow
from nipype.interfaces.utility import Function, IdentityInterface

In [ ]:
def get_data_from_id(id):
    """Generate a random number based on id"""
    import numpy as np
    return id + np.random.rand()

def merge_and_scale_data(data2):
    """Scale the input list by 1000"""
    import numpy as np
    return (np.array(data2) * 1000).tolist()


node1 = Node(Function(input_names=['id'],
                      output_names=['data1'],
                      function=get_data_from_id),
             name='get_data')
node1.iterables = ('id', [1, 2, 3])

node2 = JoinNode(Function(input_names=['data2'],
                          output_names=['data_scaled'],
                          function=merge_and_scale_data),
                 name='scale_data',
                 joinsource=node1,
                 joinfield=['data2'])

In [ ]:
wf = Workflow(name='testjoin')
wf.connect(node1, 'data1', node2, 'data2')
eg = wf.run()


170904-05:48:06,798 workflow INFO:
	 Workflow testjoin settings: ['check', 'execution', 'logging']
170904-05:48:06,805 workflow INFO:
	 Running serially.
170904-05:48:06,806 workflow INFO:
	 Executing node get_data.aI.a2 in dir: /tmp/tmpzn87q73p/testjoin/_id_3/get_data
170904-05:48:06,814 workflow INFO:
	 Executing node get_data.aI.a1 in dir: /tmp/tmp2og1kc6r/testjoin/_id_2/get_data
170904-05:48:06,822 workflow INFO:
	 Executing node get_data.aI.a0 in dir: /tmp/tmpwg9wumyo/testjoin/_id_1/get_data
170904-05:48:06,830 workflow INFO:
	 Executing node scale_data in dir: /tmp/tmpgovda3ra/testjoin/scale_data

In [ ]:
wf.write_graph(graph2use='exec')
from IPython.display import Image
Image(filename='graph_detailed.dot.png')


170904-05:48:06,967 workflow INFO:
	 Generated workflow graph: /repos/nipype_tutorial/notebooks/graph.dot.png (graph2use=exec, simple_form=True).
Out[ ]:

Now, let's look at the input and output of the JoinNode:


In [ ]:
res = [node for node in eg.nodes() if 'scale_data' in node.name][0].result
res.outputs


Out[ ]:
data_scaled = [1661.824237477673, 2337.1158606412346, 3108.1021488108117]

In [ ]:
res.inputs


Out[ ]:
{'data2': [1.661824237477673, 2.3371158606412346, 3.108102148810812],
 'function_str': 'def merge_and_scale_data(data2):\n    """Scale the input list by 1000"""\n    import numpy as np\n    return (np.array(data2) * 1000).tolist()\n',
 'ignore_exception': False}
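As the inputs show, the JoinNode receives the three branch results already collected into one list on data2. We can reproduce the scaling step outside the workflow by calling the same function on a hand-built list (assuming numpy is installed; the input values here are made up for illustration):

```python
import numpy as np

def merge_and_scale_data(data2):
    """Scale the input list by 1000 (same function as above)."""
    return (np.array(data2) * 1000).tolist()

# A hand-built stand-in for the collected branch results.
print(merge_and_scale_data([1.5, 2.25, 3.0]))  # [1500.0, 2250.0, 3000.0]
```

This illustrates the key point of the join: merge_and_scale_data is called once, on the whole list, rather than once per branch.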

Extending to multiple nodes

We extend the workflow by using three nodes. Note that in this workflow, too, the joinsource corresponds to the node containing the iterables and the joinfield corresponds to the input port of the JoinNode that aggregates the iterable branches. As before, the graph below shows how the execution process is set up.


In [ ]:
def get_data_from_id(id):
    import numpy as np
    return id + np.random.rand()

def scale_data(data2):
    import numpy as np
    return data2

def replicate(data3, nreps=2):
    return data3 * nreps

node1 = Node(Function(input_names=['id'],
                      output_names=['data1'],
                      function=get_data_from_id),
             name='get_data')
node1.iterables = ('id', [1, 2, 3])

node2 = Node(Function(input_names=['data2'],
                      output_names=['data_scaled'],
                      function=scale_data),
             name='scale_data')

node3 = JoinNode(Function(input_names=['data3'],
                          output_names=['data_repeated'],
                          function=replicate),
                 name='replicate_data',
                 joinsource=node1,
                 joinfield=['data3'])

In [ ]:
wf = Workflow(name='testjoin')
wf.connect(node1, 'data1', node2, 'data2')
wf.connect(node2, 'data_scaled', node3, 'data3')
eg = wf.run()


170904-05:48:07,38 workflow INFO:
	 Workflow testjoin settings: ['check', 'execution', 'logging']
170904-05:48:07,46 workflow INFO:
	 Running serially.
170904-05:48:07,47 workflow INFO:
	 Executing node get_data.aI.a2 in dir: /tmp/tmpni2ihfre/testjoin/_id_3/get_data
170904-05:48:07,55 workflow INFO:
	 Executing node scale_data.a2 in dir: /tmp/tmpt8hl2xxf/testjoin/_id_3/scale_data
170904-05:48:07,62 workflow INFO:
	 Executing node get_data.aI.a1 in dir: /tmp/tmpcr_5i1dm/testjoin/_id_2/get_data
170904-05:48:07,69 workflow INFO:
	 Executing node scale_data.a1 in dir: /tmp/tmpcypxsp4i/testjoin/_id_2/scale_data
170904-05:48:07,77 workflow INFO:
	 Executing node get_data.aI.a0 in dir: /tmp/tmpyu_5my8q/testjoin/_id_1/get_data
170904-05:48:07,85 workflow INFO:
	 Executing node scale_data.a0 in dir: /tmp/tmp7eer98b7/testjoin/_id_1/scale_data
170904-05:48:07,95 workflow INFO:
	 Executing node replicate_data in dir: /tmp/tmp_1fut4zp/testjoin/replicate_data

In [ ]:
wf.write_graph(graph2use='exec')
Image(filename='graph_detailed.dot.png')


170904-05:48:07,227 workflow INFO:
	 Generated workflow graph: /repos/nipype_tutorial/notebooks/graph.dot.png (graph2use=exec, simple_form=True).
Out[ ]: