This notebook is taken from reproducible-imaging repository
In [ ]:
import os
from nipype import Workflow, Node, Function
Creating Workflow with one Node that adds two numbers
In [ ]:
def sum(a, b):
return a + b
wf = Workflow('hello')
adder = Node(Function(input_names=['a', 'b'],
output_names=['sum'],
function=sum),
name='a_plus_b')
adder.inputs.a = 1
adder.inputs.b = 3
wf.add_nodes([adder])
wf.base_dir = os.getcwd()
eg = wf.run()
list(eg.nodes())[0].result.outputs
Creating a second node and connecting to the hello
Workflow
In [ ]:
def concat(a, b):
return [a, b]
concater = Node(Function(input_names=['a', 'b'],
output_names=['some_list'],
function=concat),
name='concat_a_b')
wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3
eg = wf.run()
print(eg.nodes())
And we can check results of our Workflow, we should see a list:
In [ ]:
list(eg.nodes())[-1].result.outputs
We will try to add additional Node that adds one:
In [ ]:
def plus_one(a):
return a + 1
plusone = Node(Function(input_names=['a'],
output_names=['out'],
function=plus_one),
name='add_1')
wf.connect(concater, 'some_list', plusone, 'a')
try:
eg = wf.run()
except(RuntimeError) as err:
print("RuntimeError:", err)
else:
raise
This time the workflow didn't execute cleanly and we got an error. We can use nipypecli
to read the crashfile (note, that if you have multiple crashfiles in the directory you'll have to provide a full name):
In [ ]:
!nipypecli crash crash*
It clearly shows the problematic Node and its input. We tried to add an integer to a list, this operation is not allowed in Python.
Let's try using MapNode
In [ ]:
from nipype import MapNode
plusone = MapNode(Function(input_names=['a'],
output_names=['out'],
function=plus_one),
iterfield=['a'],
name='add_1')
wf = Workflow('hello_mapnode')
adder = Node(Function(input_names=['a', 'b'],
output_names=['sum'],
function=sum),
name='a_plus_b')
adder.inputs.a = 1
adder.inputs.b = 3
wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3
wf.connect(concater, 'some_list', plusone, 'a')
wf.base_dir = os.getcwd()
eg = wf.run()
print(eg.nodes())
Now the workflow finished without problems, let's see the results from hello.add_1
:
In [ ]:
print(list(eg.nodes())[2].result.outputs)
And now we will run the example with iterables
:
In [ ]:
adder.iterables = ('a', [1, 2])
adder.inputs.b = 2
eg = wf.run()
print(eg.nodes())
Now we have 6 nodes, we can check results for hello.add_1.a1
In [ ]:
list(eg.nodes())[5].result.outputs
In [ ]:
wf.write_graph(graph2use='exec')
In [ ]:
from IPython.display import Image
We can plot a general structure of the workflow:
In [ ]:
Image("hello_mapnode/graph.png")
And more detailed structure with all nodes:
In [ ]:
Image("hello_mapnode/graph_detailed.png")
We will introduce another iterables, for the concater Node:
In [ ]:
concater.iterables = ('b', [3, 4])
eg = wf.run()
eg.nodes();
In [ ]:
wf.write_graph(graph2use='exec')
In [ ]:
Image("hello_mapnode/graph_detailed.png")
Now we will introduce JoinNode that allows us to merge results together:
In [ ]:
def merge_and_scale_data(data2):
import numpy as np
return (np.array(data2) * 1000).tolist()
from nipype import JoinNode
joiner = JoinNode(Function(input_names=['data2'],
output_names=['data_scaled'],
function=merge_and_scale_data),
name='join_scale_data',
joinsource=adder,
joinfield=['data2'])
wf.connect(plusone, 'out', joiner, 'data2')
eg = wf.run()
eg.nodes()
Let's check the output of hello.join_scale_data.a0
node:
In [ ]:
list(eg.nodes())[0].result.outputs
In [ ]:
wf.write_graph(graph2use='exec')
In [ ]:
Image("hello_mapnode/graph.png")
In [ ]:
Image("hello_mapnode/graph_detailed.png")
In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
In [ ]:
wf.base_dir = os.path.join(os.getcwd(), 'alt')
In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
In [ ]:
#write your code here
# 1. write 3 functions: one that returns a list of number from a specific range,
# second that returns n! (you can use math.factorial) and third, that sums the elements from a list
# 2. create a workflow and define the working directory
# 3. define 3 nodes using Node and MapNode and connect them within the workflow
# 4. run the workflow and check the results
In [ ]:
from nipype import Workflow, Node, MapNode, Function
import os
def range_fun(n_min, n_max):
return list(range(n_min, n_max+1))
def factorial(n):
# print("FACTORIAL, {}".format(n))
import math
return math.factorial(n)
def summing(terms):
return sum(terms)
wf_ex1 = Workflow('ex1')
wf_ex1.base_dir = os.getcwd()
range_nd = Node(Function(input_names=['n_min', 'n_max'],
output_names=['range_list'],
function=range_fun),
name='range_list')
factorial_nd = MapNode(Function(input_names=['n'],
output_names=['fact_out'],
function=factorial),
iterfield=['n'],
name='factorial')
summing_nd = Node(Function(input_names=['terms'],
output_names=['sum_out'],
function=summing),
name='summing')
range_nd.inputs.n_min = 0
range_nd.inputs.n_max = 3
wf_ex1.add_nodes([range_nd])
wf_ex1.connect(range_nd, 'range_list', factorial_nd, 'n')
wf_ex1.connect(factorial_nd, 'fact_out', summing_nd, "terms")
eg = wf_ex1.run()
let's print all nodes:
In [ ]:
eg.nodes()
the final result should be 10:
In [ ]:
list(eg.nodes())[2].result.outputs
we can also check the results of two other nodes:
In [ ]:
print(list(eg.nodes())[0].result.outputs)
print(list(eg.nodes())[1].result.outputs)
In [ ]:
# write your solution here
# 1. write 3 functions: one that returns a list of number from a range between 0 and some n,
# second that returns a term for a specific k, and third, that sums the elements from a list
# 2. create a workflow and define the working directory
# 3. define 3 nodes using Node and MapNode and connect them within the workflow
# 4. use iterables for 4 values of x
# 5. run the workflow and check the final results for every value of x
In [ ]:
# we can reuse function from previous exercise, but they need some edits
from nipype import Workflow, Node, MapNode, JoinNode, Function
import os
import math
def range_fun(n_max):
return list(range(n_max+1))
def term(k, x):
import math
fract = math.factorial(2 * k + 1)
polyn = x ** (2 * k + 1)
return (-1)**k * polyn / fract
def summing(terms):
return sum(terms)
wf_ex2 = Workflow('ex2')
wf_ex2.base_dir = os.getcwd()
range_nd = Node(Function(input_names=['n_max'],
output_names=['range_list'],
function=range_fun),
name='range_list')
term_nd = MapNode(Function(input_names=['k', 'x'],
output_names=['term_out'],
function=term),
iterfield=['k'],
name='term')
summing_nd = Node(Function(input_names=['terms'],
output_names=['sum_out'],
function=summing),
name='summing')
range_nd.inputs.n_max = 15
x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]
term_nd.iterables = ('x', x_list)
wf_ex2.add_nodes([range_nd])
wf_ex2.connect(range_nd, 'range_list', term_nd, 'k')
wf_ex2.connect(term_nd, 'term_out', summing_nd, "terms")
eg = wf_ex2.run()
let's check all nodes
In [ ]:
eg.nodes()
let's print all results of ex2.summing
In [ ]:
print(list(eg.nodes())[2].result.outputs)
print(list(eg.nodes())[4].result.outputs)
print(list(eg.nodes())[6].result.outputs)
print(list(eg.nodes())[8].result.outputs)
print(list(eg.nodes())[10].result.outputs)
Great, we just implemented pretty good Sine function! Those number should be approximately 0, 1, 0, -1 and 0. If they are not, try to increase $n_max$.
In [ ]:
# write your code here
# 1. create an additional function that takes 2 lists and combines them into one container, e.g. dictionary
# 2. use JoinNode to define a new node that merges results from Exercise 2 and connect it to the workflow
# 3. run the workflow and check the results of the merging node
In [ ]:
def merge_results(results, x):
return dict(zip(x, results))
join_nd = JoinNode(Function(input_names=['results', 'x'],
output_names=['results_cont'],
function=merge_results),
name='merge',
joinsource=term_nd, # this is the node that used iterables for x
joinfield=['results'])
# taking the list of arguments from the previous part
join_nd.inputs.x = x_list
# connecting a new node to the summing_nd
wf_ex2.connect(summing_nd, "sum_out", join_nd, "results")
eg = wf_ex2.run()
let's print all nodes
In [ ]:
eg.nodes()
and results from merge
Node:
In [ ]:
list(eg.nodes())[1].result.outputs