In [1]:
from adaptivemd import Project
Let's open our tutorial project by its name. If you completed the first examples, this should all work out of the box.
In [2]:
project = Project('tutorial')
Open all connections to the MongoDB and Session so we can get started.
An interesting thing to note here is that, since we use a database in the back end, data is synced between notebooks. To see how this works, run some tasks in Tutorial 1 or 2, then come back here and check how the project contents have changed.
Let's see where we are. These numbers will depend on whether you are running this notebook for the first time or continuing from an earlier session. Unless you delete your project, it will accumulate models and files over time, which is our ultimate goal.
In [3]:
print(project.files)
print(project.generators)
print(project.models)
Now restore our old ways to generate tasks by loading the previously used generators.
In [4]:
engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']
You are free to conduct your simulations from a notebook, but normally you will use a script. The main point of adaptivity is to make decisions about tasks along the way.
To make this happen we need Conditions, which are functions that evaluate to True or False. Once a Condition is True, it cannot change back to False, like a one-time on switch.
Conditions are used to describe that an event has happened. We will now deal with some types of events.
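The one-time switch behaviour can be illustrated with a small stand-alone sketch. The `LatchedCondition` class and the `state` dictionary below are hypothetical names made up for this illustration; they are not part of adaptivemd and only mimic the behaviour described above.

```python
class LatchedCondition:
    """Wrap a boolean check so that once it returns True it stays True."""

    def __init__(self, check):
        self._check = check   # zero-argument callable returning a bool
        self._fired = False   # set permanently after the first success

    def __call__(self):
        if not self._fired:
            self._fired = bool(self._check())
        return self._fired


# Hypothetical stand-in for changing project state (e.g. a file count).
state = {"n_files": 0}
enough_files = LatchedCondition(lambda: state["n_files"] >= 2)

first = enough_files()    # count is 0, so the condition has not fired
state["n_files"] = 3
second = enough_files()   # count is 3, so the condition fires
state["n_files"] = 0
third = enough_files()    # count dropped again, but the latch stays on
```

Here `first` is False while `second` and `third` are both True, even though the underlying check would fail again on the third call.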
First we look at a way to run Python code asynchronously in the project. For this, we write a function to be executed. Inside it you create tasks and submit them.
If the function should pause, write yield {condition_to_continue}. This suspends the function until the callable you yielded returns True when called. Here is an example event function, with several alternative (some equivalent) conditions shown in comments:
In [5]:
def strategy(loops=10, trajs_per_loop=4, length=100):
    for loop in range(loops):
        # submit some trajectory tasks
        trajectories = project.new_ml_trajectory(engine, length, trajs_per_loop)
        # build a real list (not a lazy map object) so the tasks can be
        # queued and then checked repeatedly under Python 3
        tasks = list(map(engine.run, trajectories))
        project.queue(tasks)

        # continue if ALL of the tasks are done (can be failed)
        #yield [task.is_done for task in tasks]
        #yield lambda: all([task.is_done() for task in tasks])
        yield lambda: all(map(lambda task: task.is_done(), tasks))

        # how about ANY of the tasks
        # --> some won't be included in model
        #yield lambda: any(map(lambda task: task.is_done(), tasks))

        # LESS smart since tasks might fail, so we'd get the progress
        # with task.is_done but not traj.exists
        #yield lambda: all(map(lambda tj: tj.exists, trajectories))

        # submit an analysis
        task = modeller.execute(list(project.trajectories))
        project.queue(task)

        # when it is done do next loop
        yield task.is_done
We then add the event to the project (events cannot be stored!).
This logical layer is implemented by the class adaptivemd.Event, which is used to run the strategy function. Here we can see why this function is a generator and needs to yield functions that return a Boolean value. The blocks between yield statements generate the workflow as seen above, and the yielded functions are used to inspect the state of the workflow. Roughly, the event logic works like this:
import time

done = False
proceed = True
# create the generator once; calling strategy() again would restart it
generator = strategy()
while not done:
    try:
        if proceed:
            # _next_func is a function reference
            _next_func = next(generator)
            proceed = False
        # val is Boolean, returned by _next_func
        val = _next_func()
        if val is True:
            proceed = True
        time.sleep(5)
    except StopIteration:
        done = True
When the strategy has been exhausted, the workflow is done.
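To make the yield mechanics concrete without a running project, here is a minimal self-contained sketch. Everything in it (`FakeTask`, `as_condition`, `drive`, `toy_strategy`) is a hypothetical stand-in, not adaptivemd code. It assumes that a yielded list of callables means "all of them must be True", as the commented-out alternative in the strategy suggests.

```python
class FakeTask:
    """Toy task that is done after a fixed number of ticks."""

    def __init__(self, ticks_needed):
        self.ticks_left = ticks_needed

    def tick(self):
        self.ticks_left = max(0, self.ticks_left - 1)

    def is_done(self):
        return self.ticks_left == 0


def as_condition(yielded):
    """Turn a yielded value into one zero-argument callable."""
    if callable(yielded):
        return yielded
    checks = list(yielded)  # assumption: a list means "all must be True"
    return lambda: all(check() for check in checks)


log = []
all_tasks = []


def toy_strategy(loops=2):
    for loop in range(loops):
        tasks = [FakeTask(1), FakeTask(3)]
        all_tasks.extend(tasks)
        log.append("loop %d: submitted" % loop)
        yield [task.is_done for task in tasks]  # list form
        analysis = FakeTask(2)
        all_tasks.append(analysis)
        log.append("loop %d: analysis" % loop)
        yield analysis.is_done                  # single callable


def drive(generator, max_polls=100):
    """Resume the generator each time its last yielded condition is True."""
    polls = 0
    try:
        condition = as_condition(next(generator))
        while polls < max_polls:
            for task in all_tasks:  # simulate workers making progress
                task.tick()
            polls += 1
            if condition():
                condition = as_condition(next(generator))
    except StopIteration:
        pass
    return polls


polls = drive(toy_strategy(loops=2))
```

The generator is created once and resumed with `next` only after the previously yielded condition returns True, so each block between two yield statements runs exactly once per loop.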
In [6]:
event = project.add_event(strategy(loops=2))
What is missing now? Adding the event triggered the first part of the code, but rechecking whether we should continue has to be done manually.
That is no problem: we can do it easily and watch what is happening.
Let's see how our project is growing. TODO: Add threading.Timer to auto trigger.
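As a possible starting point for the threading.Timer idea, here is a minimal sketch using only the standard library. The helper `auto_trigger` and its parameters are hypothetical; with a real project one might pass `project.trigger` as `trigger` and a check on the remaining events as `is_finished`, but whether triggering from a background thread is safe with adaptivemd is an untested assumption.

```python
import threading


def auto_trigger(trigger, is_finished, interval=3.0):
    """Call trigger() every `interval` seconds until is_finished() is True.

    Returns a threading.Event that is set once polling has stopped.
    """
    stopped = threading.Event()

    def _schedule():
        timer = threading.Timer(interval, _tick)
        timer.daemon = True  # do not keep the interpreter alive
        timer.start()

    def _tick():
        trigger()
        if is_finished():
            stopped.set()
        else:
            _schedule()      # re-arm the timer for the next check

    _schedule()
    return stopped


# Toy usage: the "trigger" just records that it was called.
calls = []
stopped = auto_trigger(lambda: calls.append(1),
                       lambda: len(calls) >= 3,
                       interval=0.01)
finished_in_time = stopped.wait(timeout=5)
```

Each timer fires once, so the callback re-arms a new timer until the finish check succeeds. Waiting on the returned event lets the caller block, or continue with other work and check back later.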
In [7]:
import sys, time
from IPython.display import clear_output
One way to wait for an event is to keep the reference to it that the project.add_event method returns. An event object acts as a True condition while it is running and as a False condition once it has completed.
In [8]:
try:
    while project._events:
        print('# of files  %8d : %s' % (len(project.trajectories), '#' * len(project.trajectories)))
        print('# of models %8d : %s' % (len(project.models), '#' * len(project.models)))
        sys.stdout.flush()
        project.trigger()
        time.sleep(3)
        clear_output(wait=True)
except KeyboardInterrupt:
    pass
Let's do another round of loops. This time we will wait using the project's events_done condition. In the prior example, the project was triggered manually until the event was complete. With the wait_until method, the project triggers itself.
In [9]:
project.add_event(strategy(loops=2))
project.wait_until(project.events_done)
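Conceptually, a self-triggering wait can be written as a small polling loop. The function below is a hypothetical sketch and not adaptivemd's implementation; the `poll_interval` and `timeout` parameters are assumptions added for illustration.

```python
import time


def wait_until(condition, trigger, poll_interval=1.0, timeout=None):
    """Trigger repeatedly until condition() is True.

    Returns True when the condition was met, False if `timeout`
    seconds passed first.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        trigger()  # let pending events advance
        if condition():
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Toy usage: each trigger finishes one of three pending steps.
pending = [3]


def fake_trigger():
    if pending[0] > 0:
        pending[0] -= 1


ok = wait_until(lambda: pending[0] == 0, fake_trigger, poll_interval=0.01)
timed_out = wait_until(lambda: False, lambda: None,
                       poll_interval=0.01, timeout=0.05)
```

The timeout guards against a condition that never becomes True, for example when all workers have been shut down.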
Now some analysis (there might be better functions for this).
In [10]:
from adaptivemd import File
In [11]:
# find which frames from which trajectories have been chosen
trajs = project.trajectories
ins = {}
for f in trajs:
    # a trajectory starts either from a File or from a frame of another trajectory
    source = f.frame if isinstance(f.frame, File) else f.frame.trajectory
    ind = 0 if isinstance(f.frame, File) else f.frame.index
    ins[source] = ins.get(source, []) + [ind]

for a, b in ins.items():
    print(a.short, ':', b)
We can also run multiple events in parallel.
In [12]:
def strategy2():
    for loop in range(10):
        num = len(project.trajectories)
        task = modeller.execute(list(project.trajectories))
        print(task)
        project.queue(task)
        yield task.is_done
        # continue only when there are at least 2 more trajectories
        print("Requiring %d trajectories for strategy2 to continue" % (num + 2))
        yield project.on_ntraj(num + 2)
In [18]:
project.add_event(strategy(loops=10, trajs_per_loop=2))
project.add_event(strategy2())
Note that we reused our strategy here.
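How two events can interleave is easier to see in a toy model. The classes `ToyEvent` and `ToyProject` below are hypothetical stand-ins written for this illustration and do not reproduce adaptivemd internals. The sketch assumes each event advances by at most one step per trigger.

```python
class ToyEvent:
    """Holds one generator and the condition it last yielded."""

    def __init__(self, generator):
        self._generator = generator
        self._condition = None
        self.finished = False
        self._advance()  # run the first block immediately

    def _advance(self):
        try:
            self._condition = next(self._generator)
        except StopIteration:
            self.finished = True

    def trigger(self):
        # assumption: at most one step per trigger
        if not self.finished and self._condition():
            self._advance()


class ToyProject:
    def __init__(self):
        self.trajectories = []
        self.events = []

    def add_event(self, generator):
        event = ToyEvent(generator)
        self.events.append(event)
        return event

    def on_ntraj(self, n):
        # condition that becomes True once n trajectories exist
        return lambda: len(self.trajectories) >= n

    def trigger(self):
        for event in self.events:
            event.trigger()
        # drop finished events, so an empty list means all are done
        self.events = [e for e in self.events if not e.finished]


toy = ToyProject()
log = []


def producer(loops):
    for _ in range(loops):
        toy.trajectories.append("traj%d" % len(toy.trajectories))
        yield lambda: True  # pretend the task finished right away


def consumer(rounds):
    for _ in range(rounds):
        num = len(toy.trajectories)
        log.append("model from %d trajectories" % num)
        yield toy.on_ntraj(num + 2)


toy.add_event(producer(6))
toy.add_event(consumer(2))
triggers = 0
while toy.events and triggers < 50:
    toy.trigger()
    triggers += 1
```

The consumer only resumes once the producer has added two more trajectories, which mirrors how strategy2 waits on on_ntraj while strategy keeps producing.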
In [19]:
ev = project._events[0]
In [21]:
project._events[0].trigger()
In [22]:
project._events[0]._finish_conditions[0]()
In [23]:
project.wait_until(project.events_done)
# It's hard to catch this because the _events list
# is cleared when an event's finish conditions evaluate
# to True
project._events[0]._finish_conditions[0]()
In [24]:
project.workers.all.execute('shutdown')
project.close()