This notebook is taken from the reproducible-imaging repository.
In [ ]:
import os
from nipype import Workflow, Node, Function
Creating a Workflow with one Node that adds two numbers
In [ ]:
# note: in the notebook namespace this name shadows Python's built-in sum()
def sum(a, b):
    return a + b

wf = Workflow('hello')

adder = Node(Function(input_names=['a', 'b'],
                      output_names=['sum'],
                      function=sum),
             name='a_plus_b')

adder.inputs.a = 1
adder.inputs.b = 3

wf.add_nodes([adder])

# run the workflow in the current working directory
wf.base_dir = os.getcwd()

eg = wf.run()

eg.nodes()[0].result.outputs
Out[ ]:
Creating a second node and connecting it to the hello Workflow
In [ ]:
def concat(a, b):
    return [a, b]

concater = Node(Function(input_names=['a', 'b'],
                         output_names=['some_list'],
                         function=concat),
                name='concat_a_b')

# feed the adder's 'sum' output into the concater's 'a' input
wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3

eg = wf.run()
print(eg.nodes())
We can now check the results of our Workflow; we should see a list:
In [ ]:
eg.nodes()[-1].result.outputs
Out[ ]:
Next, we will try to add an additional Node that adds one:
In [ ]:
def plus_one(a):
    print("PLUS ONE, a = {}".format(a))
    return a + 1

plusone = Node(Function(input_names=['a'],
                        output_names=['out'],
                        function=plus_one),
               name='add_1')

wf.connect(concater, 'some_list', plusone, 'a')

eg = wf.run()
print(eg.nodes())
This time the workflow didn't execute cleanly and we got an error. We can use nipypecli to read the crashfile (note that if you have multiple crashfiles in the directory, you'll have to provide the full name):
In [ ]:
!LC_ALL= nipypecli crash crash*
It clearly shows the problematic Node and its input. We tried to add an integer to a list, and Python does not allow this operation.
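The same failure can be reproduced in plain Python, without nipype. This is only a minimal sketch: the list [4, 3] is what concat_a_b would produce for the inputs used above (1 + 3 and b = 3), and the error_message variable is introduced here for illustration.

```python
def plus_one(a):
    return a + 1

# what concat_a_b passes on for adder inputs a=1, b=3 and concater input b=3
some_list = [4, 3]

try:
    plus_one(some_list)          # list + int is not defined in Python
    error_message = None
except TypeError as err:
    error_message = str(err)

print(error_message)
print(plus_one(4))               # the same function works on a single number
```

The function itself is fine; the problem is that it receives the whole list instead of one element at a time.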
Let's try using MapNode
In [ ]:
from nipype import MapNode

# iterfield tells the MapNode which input to split into separate calls
plusone = MapNode(Function(input_names=['a'],
                           output_names=['out'],
                           function=plus_one),
                  iterfield=['a'],
                  name='add_1')

wf = Workflow('hello_mapnode')

adder = Node(Function(input_names=['a', 'b'],
                      output_names=['sum'],
                      function=sum),
             name='a_plus_b')

adder.inputs.a = 1
adder.inputs.b = 3

# concater is the Node defined in an earlier cell
wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3

wf.connect(concater, 'some_list', plusone, 'a')

wf.base_dir = os.getcwd()

eg = wf.run()
print(eg.nodes())
Now the workflow finished without problems. Let's see the results from hello_mapnode.add_1:
In [ ]:
print(eg.nodes()[2].result.outputs)
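Conceptually, a MapNode with iterfield=['a'] calls the wrapped function once per element of the list and collects the return values. The sketch below mimics that behaviour in plain Python; map_over is a hypothetical helper written for this illustration, not part of nipype.

```python
def plus_one(a):
    return a + 1

def map_over(func, values):
    """Hypothetical stand-in for a MapNode: one call per element, results in a list."""
    return [func(value) for value in values]

# the list produced by concat_a_b for adder inputs a=1, b=3 and concater input b=3
some_list = [4, 3]

out = map_over(plus_one, some_list)
print(out)  # [5, 4]
```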
And now we will run the example with iterables:
In [ ]:
adder.iterables = ('a', [1, 2])
adder.inputs.b = 2
eg = wf.run()
print(eg.nodes())
Now we have 6 nodes; we can check the results for hello_mapnode.add_1.a1:
In [ ]:
eg.nodes()[5].result.outputs
Out[ ]:
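One way to picture iterables is as a loop around the whole downstream part of the graph: every value of a gets its own copy of a_plus_b, concat_a_b and add_1, which is why three nodes become six. The following plain-Python sketch assumes that reading; the function add and the results dictionary are names made up for this illustration.

```python
def add(a, b):
    return a + b

def concat(a, b):
    return [a, b]

def plus_one(a):
    return a + 1

results = {}
for a in [1, 2]:                      # adder.iterables = ('a', [1, 2])
    total = add(a, 2)                 # adder.inputs.b = 2
    some_list = concat(total, 3)      # concater.inputs.b = 3
    # the MapNode applies plus_one to each element of the list
    results[a] = [plus_one(value) for value in some_list]

print(results)  # {1: [4, 4], 2: [5, 4]}
```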
In [ ]:
wf.write_graph(graph2use='exec')
Out[ ]:
In [ ]:
from IPython.display import Image
We can plot a general structure of the workflow:
In [ ]:
Image("hello_mapnode/graph.dot.png")
Out[ ]:
And a more detailed structure with all nodes:
In [ ]:
Image("hello_mapnode/graph_detailed.dot.png")
Out[ ]:
We will introduce another iterable, this time for the concater Node:
In [ ]:
concater.iterables = ('b', [3, 4])
eg = wf.run()
eg.nodes()
Out[ ]:
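With iterables on two nodes, the branches can be thought of as the Cartesian product of the two value lists. The sketch below enumerates those combinations in plain Python; it assumes that every value of a is paired with every value of b, and the names branches and concat_outputs are introduced here only for illustration.

```python
from itertools import product

def add(a, b):
    return a + b

def concat(a, b):
    return [a, b]

a_values = [1, 2]   # adder.iterables = ('a', [1, 2])
b_values = [3, 4]   # concater.iterables = ('b', [3, 4])

# every (a, b) pair gets its own concat_a_b and add_1 node
branches = list(product(a_values, b_values))

# adder.inputs.b is fixed to 2 in the cells above
concat_outputs = {(a, b): concat(add(a, 2), b) for a, b in branches}

for pair, some_list in concat_outputs.items():
    print(pair, some_list)
```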
In [ ]:
wf.write_graph(graph2use='exec')
Out[ ]:
In [ ]:
Image("hello_mapnode/graph_detailed.dot.png")
Out[ ]:
Now we will introduce a JoinNode, which allows us to merge results together:
In [ ]:
def merge_and_scale_data(data2):
    # imports go inside the function because nipype runs it in an isolated namespace
    import numpy as np
    return (np.array(data2) * 1000).tolist()

from nipype import JoinNode

joiner = JoinNode(Function(input_names=['data2'],
                           output_names=['data_scaled'],
                           function=merge_and_scale_data),
                  name='join_scale_data',
                  joinsource=adder,
                  joinfield=['data2'])

wf.connect(plusone, 'out', joiner, 'data2')

eg = wf.run()
eg.nodes()
Out[ ]:
Let's check the output of the join_scale_data node:
In [ ]:
eg.nodes()[0].result.outputs
Out[ ]:
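The idea behind the JoinNode can be sketched without nipype: the outputs of the branches created by the adder's iterable are gathered into one list, and the joining function receives that list as data2. The sketch below ignores the second iterable on concater (it uses b = 3 only) and replaces the numpy call with a nested list comprehension so that it needs only the standard library; branch_outputs is a name invented for this illustration.

```python
def merge_and_scale(data2):
    # pure-Python equivalent of (np.array(data2) * 1000).tolist() for a list of lists
    return [[value * 1000 for value in branch] for branch in data2]

# 'out' of add_1 for each value of the adder's iterable a (with b=2 and concater b=3)
branch_outputs = {1: [4, 4], 2: [5, 4]}

# the join gathers one entry per value of a
data2 = [branch_outputs[a] for a in [1, 2]]

data_scaled = merge_and_scale(data2)
print(data_scaled)  # [[4000, 4000], [5000, 4000]]
```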
In [ ]:
wf.write_graph(graph2use='exec')
Out[ ]:
In [ ]:
Image("hello_mapnode/graph.dot.png")
Out[ ]:
In [ ]:
Image("hello_mapnode/graph_detailed.dot.png")
Out[ ]:
In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
In [ ]:
wf.base_dir = os.path.join(os.getcwd(), 'alt')
In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
In [ ]:
from nipype import Workflow, Node, MapNode, Function
import os

def range_fun(n_min, n_max):
    return list(range(n_min, n_max+1))

def factorial(n):
    # print("FACTORIAL, {}".format(n))
    import math
    return math.factorial(n)

def summing(terms):
    return sum(terms)

wf_ex1 = Workflow('ex1')
wf_ex1.base_dir = os.getcwd()

range_nd = Node(Function(input_names=['n_min', 'n_max'],
                         output_names=['range_list'],
                         function=range_fun),
                name='range_list')

factorial_nd = MapNode(Function(input_names=['n'],
                                output_names=['fact_out'],
                                function=factorial),
                       iterfield=['n'],
                       name='factorial')

summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing),
                  name='summing')

range_nd.inputs.n_min = 0
range_nd.inputs.n_max = 3

wf_ex1.add_nodes([range_nd])
wf_ex1.connect(range_nd, 'range_list', factorial_nd, 'n')
wf_ex1.connect(factorial_nd, 'fact_out', summing_nd, "terms")

eg = wf_ex1.run()
Let's print all nodes:
In [ ]:
eg.nodes()
The final result should be 10 (0! + 1! + 2! + 3! = 1 + 1 + 2 + 6):
In [ ]:
eg.nodes()[2].result.outputs
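The expected value can be cross-checked in plain Python, independently of the workflow. The variable names below (terms, total) are chosen for this sketch only.

```python
import math

n_min, n_max = 0, 3

# same three steps as the workflow: build the range, map factorial, sum the terms
range_list = list(range(n_min, n_max + 1))
terms = [math.factorial(n) for n in range_list]
total = sum(terms)

print(terms)  # [1, 1, 2, 6]
print(total)  # 10
```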
We can also check the results of the two other nodes:
In [ ]:
print(eg.nodes()[0].result.outputs)
print(eg.nodes()[1].result.outputs)
In [ ]:
# write your code here
# 1. write 3 functions: one that returns a list of numbers from a specific range,
#    a second that returns n! (you can use math.factorial), and a third that sums the elements of a list
# 2. create a workflow and define the working directory
# 3. define 3 nodes using Node and MapNode and connect them within the workflow
# 4. run the workflow and check the results
In [ ]:
# we can reuse the functions from the previous exercise, but they need some edits
from nipype import Workflow, Node, MapNode, JoinNode, Function
import os
import math

def range_fun(n_max):
    return list(range(n_max+1))

def term(k, x):
    # math is imported here again because nipype runs the function in an isolated namespace
    import math
    fract = math.factorial(2 * k + 1)
    polyn = x ** (2 * k + 1)
    return (-1)**k * polyn / fract

def summing(terms):
    return sum(terms)

wf_ex2 = Workflow('ex2')
wf_ex2.base_dir = os.getcwd()

range_nd = Node(Function(input_names=['n_max'],
                         output_names=['range_list'],
                         function=range_fun),
                name='range_list')

term_nd = MapNode(Function(input_names=['k', 'x'],
                           output_names=['term_out'],
                           function=term),
                  iterfield=['k'],
                  name='term')

summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing),
                  name='summing')

range_nd.inputs.n_max = 15

x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]

term_nd.iterables = ('x', x_list)

wf_ex2.add_nodes([range_nd])
wf_ex2.connect(range_nd, 'range_list', term_nd, 'k')
wf_ex2.connect(term_nd, 'term_out', summing_nd, "terms")

eg = wf_ex2.run()
Let's check all nodes:
In [ ]:
eg.nodes()
Let's print all results of ex2.summing:
In [ ]:
print(eg.nodes()[2].result.outputs)
print(eg.nodes()[4].result.outputs)
print(eg.nodes()[6].result.outputs)
print(eg.nodes()[8].result.outputs)
print(eg.nodes()[10].result.outputs)
Great, we just implemented a pretty good sine function! These numbers should be approximately 0, 1, 0, -1 and 0. If they are not, try increasing n_max.
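The workflow sums the terms (-1)^k x^(2k+1) / (2k+1)! for k from 0 to n_max, which is the truncated Taylor series of sin(x). The plain-Python sketch below compares that sum with math.sin for the same values of x; sine_series is a helper name introduced for this comparison.

```python
import math

def sine_series(x, n_max):
    """Truncated Taylor series of sin(x): sum of the terms for k = 0 .. n_max."""
    return sum((-1) ** k * x ** (2 * k + 1) / math.factorial(2 * k + 1)
               for k in range(n_max + 1))

x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]

for x in x_list:
    approx = sine_series(x, n_max=15)
    # print the series value next to the library value for comparison
    print("x = {:.4f}  series = {: .6f}  math.sin = {: .6f}".format(x, approx, math.sin(x)))
```

With a small n_max the approximation degrades first for the larger values of x, because the higher-order terms matter more there.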
In [ ]:
# write your solution here
# 1. write 3 functions: one that returns a list of numbers in the range between 0 and some n,
#    a second that returns a term for a specific k, and a third that sums the elements of a list
# 2. create a workflow and define the working directory
# 3. define 3 nodes using Node and MapNode and connect them within the workflow
# 4. use iterables for several values of x (the solution above uses 5)
# 5. run the workflow and check the final results for every value of x
In [ ]:
def merge_results(results, x):
    return dict(zip(x, results))

join_nd = JoinNode(Function(input_names=['results', 'x'],
                            output_names=['results_cont'],
                            function=merge_results),
                   name='merge',
                   joinsource=term_nd,  # this is the node that used iterables for x
                   joinfield=['results'])

# taking the list of arguments from the previous part
join_nd.inputs.x = x_list

# connecting a new node to the summing_nd
wf_ex2.connect(summing_nd, "sum_out", join_nd, "results")

eg = wf_ex2.run()
Let's print all nodes:
In [ ]:
eg.nodes()
And the results from the merge Node:
In [ ]:
eg.nodes()[1].result.outputs
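The merging function pairs each value of x with the corresponding sum by position. The sketch below shows that pairing in plain Python with placeholder labels instead of real results; it assumes that the joined results arrive in the same order as the values in x_list.

```python
def merge_results(results, x):
    # pair the i-th x with the i-th result
    return dict(zip(x, results))

# placeholder inputs for illustration only, not output of the workflow
x_values = [0.0, 0.5, 1.0]
results = ['result_for_0.0', 'result_for_0.5', 'result_for_1.0']

merged = merge_results(results, x_values)
print(merged[0.5])  # result_for_0.5
```

Note that zip stops at the shorter of the two lists, so a mismatch in length would silently drop entries.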
In [ ]:
# write your code here
# 1. create an additional function that takes 2 lists and combines them into one container, e.g. a dictionary
# 2. use JoinNode to define a new node that merges the results from Exercise 2 and connect it to the workflow
# 3. run the workflow and check the results of the merging node