AdaptiveMD

Example 3 - Running an adaptive loop


In [1]:
import sys, os

In [2]:
# stop RP from printing logs until severe
# verbose = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = 'ERROR'

In [3]:
from adaptivemd import (
    Project,
    Event, FunctionalEvent,
    File
)

# We need this to be part of the imports. You can only restore known objects
# Once these are imported you can load these objects.
from adaptivemd.engine.openmm import OpenMMEngine
from adaptivemd.analysis.pyemma import PyEMMAAnalysis


/Users/jan-hendrikprinz/anaconda/lib/python2.7/site-packages/radical/utils/atfork/stdlib_fixer.py:58: UserWarning: logging module already imported before fixup.
  warnings.warn('logging module already imported before fixup.')

Let's open our test project by its name. If you completed the first examples this should all work out of the box.


In [4]:
project = Project('test')

Open all connections to the MongoDB and Session so we can get started.

Note that because a database sits in the back end, data is synced between notebooks. To see how this works, run some tasks in the last example, then come back here and check how the contents of the project have changed.

Let's see where we are. These numbers will depend on whether you run this notebook for the first time or just continue again. Unless you delete your project it will accumulate models and files over time, as is our ultimate goal.


In [5]:
print project.files
print project.generators
print project.models


<StoredBundle with 122 file(s) @ 0x12057c790>
<StoredBundle with 2 file(s) @ 0x12057c750>
<StoredBundle with 10 file(s) @ 0x12057c710>

Now restore our earlier means of generating tasks by loading the previously used generators.


In [6]:
engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']

Run simulations

Now we really start simulations. The general way to do so is to create a simulation task and then submit it to a cluster to be executed. A Task object is a general description of what should be done. It boils down to staging some files to your working directory, executing a bash script, and finally moving files back from your working directory to a shared storage. RP takes care of most of this, and a Task is designed to cover RP's capabilities in a somewhat simpler and more Pythonic way.
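The three phases of a task (stage in, execute, stage out) can be sketched with the standard library alone. This is only an illustration, not how AdaptiveMD or RP implement tasks: run_sketch_task and all of its parameters are hypothetical names, and a Python callable stands in for the bash script so that the sketch runs anywhere.

```python
import os
import shutil
import tempfile


def run_sketch_task(stage_in, work, stage_out, shared_dir):
    """Hypothetical stand-in for a task: stage in, execute, stage out.

    stage_in   -- paths copied into a fresh working directory
    work       -- callable(workdir), stands in for the bash script
    stage_out  -- file names moved out of the working directory afterwards
    shared_dir -- directory that plays the role of the shared storage
    """
    workdir = tempfile.mkdtemp(prefix='sketch_task_')
    try:
        for path in stage_in:
            shutil.copy(path, workdir)           # 1. stage files in
        work(workdir)                            # 2. execute
        results = []
        for name in stage_out:                   # 3. move results back
            target = os.path.join(shared_dir, name)
            shutil.move(os.path.join(workdir, name), target)
            results.append(target)
        return results
    finally:
        shutil.rmtree(workdir, ignore_errors=True)


def count_lines(workdir):
    # Stand-in for a simulation: count the lines of input.txt.
    with open(os.path.join(workdir, 'input.txt')) as src:
        n = len(src.readlines())
    with open(os.path.join(workdir, 'output.txt'), 'w') as dst:
        dst.write(str(n))


shared = tempfile.mkdtemp(prefix='sketch_shared_')
source = os.path.join(shared, 'input.txt')
with open(source, 'w') as f:
    f.write('a\nb\nc\n')

moved = run_sketch_task([source], count_lines, ['output.txt'], shared)
with open(moved[0]) as f:
    result_text = f.read()
print(result_text)
```

The working directory is removed in the finally clause, so only the staged-out files survive, which mirrors the idea that results live in the shared storage.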

For example, there is an RPC Python Call Task that allows you to execute a function remotely and pull back the results.

Functional Events

We first want to look into a way to run Python code asynchronously in the project. For this, write a function that should be executed. Start by opening a scheduler or using an existing one (in the latter case you need to make sure that the scheduler still exists when the function is executed, which can take a while).

If the function should pause, write yield {condition_to_continue}. This suspends your function until the condition you yielded, itself a function, returns True when called.
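The pause-and-resume mechanism can be sketched in plain Python. This is a toy driver, not AdaptiveMD's event loop: drive, at_time and clock are hypothetical names, and a simple counter stands in for the passing of time while tasks run.

```python
clock = {'now': 0}   # hypothetical stand-in for time passing
log = []


def at_time(t):
    # Build a condition: a callable that returns True once the clock reaches t.
    return lambda: clock['now'] >= t


def drive(generator, max_polls=1000):
    """Resume `generator` each time the condition it yielded returns True."""
    try:
        condition = next(generator)          # run up to the first yield
        for _ in range(max_polls):
            clock['now'] += 1                # one polling round passes
            if condition():
                condition = next(generator)  # resume after the yield
    except StopIteration:
        pass                                 # the function ran to completion


def sketch_strategy():
    log.append('step 1 submitted')
    yield at_time(3)                         # pause until the clock reads 3
    log.append('step 2 submitted')
    yield at_time(5)                         # pause until the clock reads 5
    log.append('finished')


drive(sketch_strategy())
print(log)
```

The function body reads like sequential code, while the driver decides when each part actually runs.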


In [7]:
def strategy():
    # create a new scheduler
    with project.get_scheduler(cores=2) as local_scheduler:
        for loop in range(10):
            tasks = local_scheduler(project.new_ml_trajectory(
                length=100, number=10))
            yield tasks.is_done()

            task = local_scheduler(modeller.execute(list(project.trajectories)))
            yield task.is_done

To turn your function into a generator, pass strategy() and not strategy to the FunctionalEvent.
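The difference between a generator function and the generator it returns is plain Python behavior and can be checked directly. The function sketch below is a made-up example.

```python
import inspect
import types


def sketch():
    yield 1
    yield 2


# `sketch` is a generator function; calling it runs none of its body.
is_function = inspect.isgeneratorfunction(sketch)

# `sketch()` is a generator object that can be advanced step by step.
gen = sketch()
is_generator = isinstance(gen, types.GeneratorType)

values = list(gen)
print(is_function, is_generator, values)
```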


In [8]:
ev = FunctionalEvent(strategy())

Then execute the event inside your project.


In [9]:
project.add_event(ev)


Out[9]:
<adaptivemd.event.FunctionalEvent at 0x11ffc1290>

After some time you will have 10 more trajectories. Just like that.

Let's see how our project is growing


In [13]:
import time
from IPython.display import clear_output

In [14]:
try:
    while True:
        clear_output(wait=True)
        print '# of files  %8d : %s' % (len(project.trajectories), '#' * len(project.trajectories))
        print '# of models %8d : %s' % (len(project.models), '#' * len(project.models))
        sys.stdout.flush()
        time.sleep(1)
        
except KeyboardInterrupt:
    pass


# of files       169 : #########################################################################################################################################################################
# of models       20 : ####################

And some analysis


In [33]:
trajs = project.trajectories
q = {}
ins = {}
for f in trajs:
    source = f.frame if isinstance(f.frame, File) else f.frame.trajectory
    ind = 0 if isinstance(f.frame, File) else f.frame.index
    ins[source] = ins.get(source, []) + [ind]

Event


In [7]:
scheduler = project.get_scheduler(cores=2)

In [8]:
def strategy1():
    for loop in range(10):
        tasks = scheduler(project.new_ml_trajectory(
            length=100, number=10))
        yield tasks.is_done()

In [9]:
def strategy2():
    for loop in range(10):
        num = len(project.trajectories)
        task = scheduler(modeller.execute(list(project.trajectories)))
        yield task.is_done
        yield project.on_ntraj(num + 5)

In [16]:
project._events = []

In [10]:
project.add_event(FunctionalEvent(strategy1))
project.add_event(FunctionalEvent(strategy2))


Out[10]:
<adaptivemd.event.FunctionalEvent at 0x12151e190>

In [18]:
project.close()

Tasks

To actually run simulations you need a scheduler (maybe there is a better name?). You use this instance to submit tasks, which are converted to ComputeUnitDescriptions and executed on the previously chosen cluster.


In [32]:
scheduler = project.get_scheduler(cores=2)  # get the default scheduler using 2 cores

Now we are good to go and can run a first simulation

This works by creating a Trajectory object with a filename, a length and an initial frame. The engine then takes this information and creates a real trajectory with exactly this name, this initial frame and the given length.

Since this is such a common task, you can also submit just a Trajectory without the need to convert it to a Task first (which the engine can also do).

Our project can create new names automatically, so we ask for 4 new trajectories of length 100, starting at the existing pdb_file we used to initialize the engine.


In [33]:
trajs = project.new_trajectory(pdb_file, 100, 4)

Let's submit and see


In [34]:
scheduler.submit(trajs)


Out[34]:
[<adaptivemd.task.Task at 0x12184aa10>,
 <adaptivemd.task.Task at 0x1219628d0>,
 <adaptivemd.task.Task at 0x1216e6050>,
 <adaptivemd.task.Task at 0x12151d950>]

Once the trajectories exist, these objects will be saved to the database. It might be a little confusing to have objects before they exist, but this way you can already work with these trajectories, for example by referencing them, even before they exist.

This would allow us to write a function now that triggers when the trajectory comes into existence. But we are not doing this right now.
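The idea of referencing an object before it exists, and reacting once it does, can be sketched as follows. PendingFile, when_created and mark_created are hypothetical names for illustration and are not part of AdaptiveMD; the file name is made up as well.

```python
class PendingFile(object):
    """Hypothetical placeholder for a file that does not exist yet."""

    def __init__(self, name):
        self.name = name
        self.exists = False
        self._callbacks = []

    def when_created(self, callback):
        # Run the callback now if the file exists, otherwise remember it.
        if self.exists:
            callback(self)
        else:
            self._callbacks.append(callback)

    def mark_created(self):
        # Called once the real file has been written.
        self.exists = True
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback(self)


seen = []
traj = PendingFile('sketch.dcd')                  # reference it before it exists
traj.when_created(lambda f: seen.append(f.name))
seen_before = list(seen)                          # nothing has triggered yet

traj.mark_created()                               # the file comes into existence
print(seen_before, seen)
```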

Calling wait is dangerous since it blocks, and you cannot do anything until all tasks are finished. Normally you do not need it, especially in interactive sessions.


In [35]:
scheduler.wait()
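The difference between blocking and polling can be illustrated with Python 3's standard concurrent.futures module. This is a generic sketch and does not use the AdaptiveMD scheduler; slow_square is a made-up stand-in for a task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    time.sleep(0.05)          # stand-in for a long-running task
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(slow_square, n) for n in range(4)]

    # Non-blocking: done() returns immediately, so other work can happen
    # between checks.
    polls = 0
    while not all(f.done() for f in futures):
        polls += 1
        time.sleep(0.01)

    # Blocking: result() waits until the value is available.
    results = [f.result() for f in futures]

print(results)
```

In an interactive session the polling style keeps the prompt usable, whereas a blocking call holds it until everything has finished.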

Look at all the files our project now contains.


In [36]:
print '# of files', len(project.files)


# of files 18

Great! That was easy (I hope you agree).

Next we want to run a simple analysis.


In [37]:
t = modeller.execute(list(project.trajectories))

In [38]:
scheduler(t)


Out[38]:
[<adaptivemd.task.PythonTask at 0x12199a1d0>]

In [39]:
scheduler.wait()

Let's look at the model we generated


In [43]:
print project.models.last.data.keys()


['clustering', 'input', 'msm', 'input_trajectories', 'tica']

And pick some information


In [45]:
print project.models.last.data['msm']['P']


[[ 0.81400438  0.0147784   0.00405625  0.07977394  0.08738703]
 [ 0.0129096   0.97701148  0.          0.00149321  0.00858571]
 [ 0.02517036  0.          0.864       0.01026794  0.1005617 ]
 [ 0.12381115  0.00265298  0.00256813  0.87096774  0.        ]
 [ 0.09210144  0.01035882  0.01707997  0.          0.88045977]]
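As a quick sanity check, assuming P is the row-stochastic transition matrix of the MSM (the printed values are consistent with that), every row should sum to 1. The sketch below copies the printed values and also estimates the stationary distribution by power iteration. It uses plain Python lists rather than any AdaptiveMD or PyEMMA call, and step is a helper name introduced here.

```python
# Values copied from the printed output above.
P = [
    [0.81400438, 0.0147784, 0.00405625, 0.07977394, 0.08738703],
    [0.0129096, 0.97701148, 0.0, 0.00149321, 0.00858571],
    [0.02517036, 0.0, 0.864, 0.01026794, 0.1005617],
    [0.12381115, 0.00265298, 0.00256813, 0.87096774, 0.0],
    [0.09210144, 0.01035882, 0.01707997, 0.0, 0.88045977],
]

# Each row holds the probabilities of leaving one state, so it sums to 1.
row_sums = [sum(row) for row in P]


def step(pi, P):
    # One step of pi -> pi P, renormalized to absorb rounding in P.
    n = len(P)
    new = [sum(pi[i] * P[i][j] for i in range(n)) for j in range(n)]
    total = sum(new)
    return [x / total for x in new]


# Power iteration: start from the uniform distribution and apply P repeatedly.
pi = [1.0 / len(P)] * len(P)
for _ in range(5000):
    pi = step(pi, P)

print(row_sums)
print(pi)
```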

The next example will demonstrate how to write a full adaptive loop.

Events

A new concept. Tasks are great and do work for us, but so far we needed to submit tasks ourselves. In adaptive simulations we want this to happen automagically. Events exist to help with this. They are basically a task_generator coupled with conditions on when to be executed.

Let's write a little task generator (in essence a function that returns tasks)


In [9]:
def task_generator():
    return [
        engine.task_run_trajectory(traj) for traj in
        project.new_ml_trajectory(100, 4)]

In [10]:
task_generator()


Out[10]:
[<adaptivemd.task.Task at 0x121367a50>,
 <adaptivemd.task.Task at 0x121435350>,
 <adaptivemd.task.Task at 0x121435e50>,
 <adaptivemd.task.Task at 0x121460190>]

Now create an event.


In [27]:
ev = Event().on(project.on_ntraj(range(20,22,2))).do(task_generator)

.on specifies when something should be executed, in our case when the project has 20 trajectories. This is not yet the case, so this event will not do anything unless we simulate more trajectories.

.do specifies the function to be called.

The concept is borrowed from event-based programming, as often used in JavaScript.

You can build quite complex execution patterns with this. An event for example also knows when it is finished and this can be used as another trigger.


In [28]:
def hello():
    print 'DONE!!!'
    return []  # todo: allow for None here

finished = Event().on(ev.on_done).do(hello)

In [29]:
scheduler.add_event(ev)
scheduler.add_event(finished)


DONE!!!
Out[29]:
<adaptivemd.event.Event at 0x12156d050>
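The chaining of a trigger, an action and a follow-up event can be sketched in a few lines of plain Python. SketchEvent is a hypothetical toy class, not AdaptiveMD's Event. It only mirrors the .on / .do / on_done pattern described above, and a counter stands in for the number of trajectories in the project.

```python
class SketchEvent(object):
    """Hypothetical mini-event: run an action once a condition holds."""

    def __init__(self):
        self._condition = None
        self._action = None
        self.done = False

    def on(self, condition):
        self._condition = condition
        return self              # returning self allows chaining

    def do(self, action):
        self._action = action
        return self

    def on_done(self):
        # Can itself serve as the condition of another event.
        return self.done

    def poll(self):
        # Check the condition once and fire the action at most once.
        if not self.done and self._condition():
            self._action()
            self.done = True


state = {'ntraj': 18}            # stand-in for the number of trajectories
log = []

first = SketchEvent().on(lambda: state['ntraj'] >= 20).do(
    lambda: log.append('run 4 more'))
second = SketchEvent().on(first.on_done).do(
    lambda: log.append('DONE!!!'))

for _ in range(3):               # three polling rounds
    for event in (first, second):
        event.poll()
    state['ntraj'] += 1          # pretend a trajectory finished

print(log)
```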

All events and tasks run in parallel, or at least are submitted and queued for execution in parallel. RP takes care of the actual execution.


In [34]:
print '# of files', len(project.files)


# of files 34

So for now let's run more trajectories and schedule the computation of models at regular intervals.


In [35]:
ev1 = Event().on(project.on_ntraj(range(30, 70, 4))).do(task_generator)
ev2 = Event().on(project.on_ntraj(38)).do(lambda: modeller.execute(list(project.trajectories))).repeat().until(ev1.on_done)
scheduler.add_event(ev1)
scheduler.add_event(ev2)


Out[35]:
<adaptivemd.event.Event at 0x121528ad0>

In [87]:
len(project.trajectories)


Out[87]:
43

In [94]:
len(project.models)


Out[94]:
0

.repeat means to redo the same task once the previous one has finished (it just appends an infinite list of conditions to keep on running).

.until specifies a termination condition. The event will no longer be executed once this condition is met. This makes the most sense if you use .repeat or if the trigger condition and the stopping condition should be independent. For example, you might say: run 100 times unless you have a good enough model.
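The interplay of repetition and a termination condition can be sketched as a small loop. run_repeating and its parameters are hypothetical names, not the AdaptiveMD API. The example mirrors "run 100 times unless you have a good enough model", with a made-up quality score standing in for the model.

```python
def run_repeating(trigger, action, until, max_rounds=100):
    """Hypothetical sketch: repeat `action` until `until()` returns True."""
    runs = 0
    for _ in range(max_rounds):
        if until():              # the termination condition wins
            break
        if trigger():            # the trigger decides whether to run this round
            action()
            runs += 1
    return runs


state = {'quality': 0.0}         # made-up stand-in for model quality


def improve():
    state['quality'] += 0.25     # each run improves the "model" a little


runs = run_repeating(
    trigger=lambda: True,
    action=improve,
    until=lambda: state['quality'] >= 0.9,
    max_rounds=100)
print(runs, state['quality'])
```

Keeping the trigger and the stopping condition as separate callables is what makes them independent of each other.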


In [92]:
print project.files


<StoredBundle with 70 file(s) @ 0x12056f3d0>

Strategies (aka the brain)

The brain is just a collection of events. This makes it reusable and easy to extend.


In [93]:
project.close()
