In [1]:
import sys, os
# verbose = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = 'ERROR'
from adaptivemd import (
    AllegroCluster,
    Project,
    File, Directory,
    Event, FunctionalEvent,
    LocalCluster,  # bugfix: comma was missing here, making the import a SyntaxError
    Engine, Analysis, Model)
from adaptivemd.engine.openmm import OpenMMEngine
from adaptivemd.mongodb import UUIDObjectJSON, ObjectJSON, ObjectStore
from adaptivemd.analysis.pyemma import PyEMMAAnalysis
import numpy as np
In [2]:
import adaptivemd as amd
import adaptivemd.engine.openmm as omm
Let's open a project with a UNIQUE name. This will be the name used in the DB so make sure it is new.
In [5]:
project = Project('test')
Set up the project and pick a resource. This should be done only the first time, when the project is created.
In [6]:
print project.files
print project.generators
print project.models
In [7]:
# Recover the stored generators and the setup file from the reopened project.
# .c(...) presumably filters stored objects by class and .f(...) by filename
# pattern; .one picks the single match — TODO confirm against adaptivemd's
# bundle API.
engine = project.generators.c(Engine).one
modeller = project.generators.c(Analysis).one
pdb_file = project.files.f('*.pdb').one
Opening a project will open the DB and create a RP session to be used
In [17]:
print project.trajectories
# for f in project.files:
# print f.url
In [18]:
t = engine.task_import_trajectory_folder('/Users/jan-hendrikprinz/radical.pilot.sandbox/rp.session.Stevie.fritz.box.jan-hendrikprinz.017219.0004-pilot.0000/staging_area/trajs/*.dcd')
In [19]:
scheduler = project.get_scheduler()
In [20]:
scheduler.submit(t)
Out[20]:
In [23]:
print project.files
In [37]:
import os
from adaptivemd.reducer import _replace_prefix
In [41]:
print _replace_prefix(project.files.one.url)
print _replace_prefix(project.files.last.url)
In [47]:
os.path.relpath(
'rp.session.Stevie.fritz.box.jan-hendrikprinz.017219.0004-pilot.0000/staging_area/trajs/00000000.dcd',
'rp.session.Stevie.fritz.box.jan-hendrikprinz.017219.0031-pilot.0000/unit.000000')
Out[47]:
In [43]:
os.getcwd()
Out[43]:
In [9]:
from adaptivemd import Archiver
In [10]:
ar = Archiver(Directory('shared://delete'))
In [11]:
scheduler = project.get_scheduler()
In [12]:
t = ar.task_archive(list(project.trajectories))
In [13]:
scheduler.submit(t)
Out[13]:
In [15]:
project.close()
To actually run simulations you need to have a scheduler (maybe a better name?). This instance can execute tasks — or, more precisely, you can use it to submit tasks which will be converted to ComputeUnitDescriptions and executed on the cluster previously chosen.
In [64]:
scheduler = project.get_scheduler(cores=2)
In [65]:
scheduler.submit(project.new_ml_trajectory(100, 1))
Out[65]:
In [84]:
project.new_ml_trajectory(100, 1)
Out[84]:
In [170]:
print project.files.last
In [175]:
t = modeller.execute(list(project.trajectories))
In [176]:
scheduler.submit(t)
Out[176]:
In [87]:
import numpy as np
In [186]:
print project.models.last.data['msm']['C']
print (lambda x: x / sum(x))(np.sum(project.models.last.data['msm']['C'], axis=1))
In [53]:
project.close()
Now we are good to go and can run a first simulation
This works by creating a Trajectory object with a filename, a length and an initial frame. Then the engine will take this information and create a real trajectory with exactly this name, this initial frame and the given length.
Since this is such a common task you can also submit just a Trajectory
without the need to convert it to a Task
first (which the engine can also do).
Our project can create new names automatically and so we want 4 new trajectories of length 100 and starting at the existing pdb_file we use to initialize the engine.
Great! That was easy (I hope you agree).
A new concept. Tasks are great and do work for us. But so far we needed to submit tasks ourselves. In adaptive simulations we want this to happen automatically. To help with this, events exist. These are basically a task generator coupled with conditions on when to be executed.
Let's write a little task generator (in essence a function that returns tasks)
In [51]:
def task_generator():
    """Build run-tasks for two freshly picked ML trajectories of length 100.

    Uses the module-level ``project`` to propose the trajectories and the
    module-level ``engine`` to wrap each one in a runnable task.
    """
    proposed = project.new_ml_trajectory(100, 2)
    tasks = []
    for trajectory in proposed:
        tasks.append(engine.task_run_trajectory(trajectory))
    return tasks
In [52]:
task_generator()
Out[52]:
Now create an event.
In [53]:
ev = Event().on(project.on_ntraj(range(4, 12, 2))).do(task_generator)
.on
specifies when something should be executed. In our case when the project has a number of trajectories (ntraj) equal to one of range(4, 12, 2)
which is [4, 6, 8, 10]
.do
specifies the function to be called.
The concept is borrowed from event based languages like often used in JavaScript.
You can build quite complex execution patterns with this. An event for example also knows when it is finished and this can be used as another trigger.
In [54]:
def hello():
    # Callback fired when the event `ev` has finished all of its tasks.
    # Events expect a list of follow-up tasks; we have none, so return [].
    print 'DONE!!!'
    return []  # todo: allow for None here

# Trigger `hello` once every task spawned by `ev` has finished.
finished = Event().on(ev.on_tasks_finished()).do(hello)
In [55]:
scheduler.add_event(ev)
scheduler.add_event(finished)
Out[55]:
All events and tasks run parallel or at least get submitted and queue for execution in parallel. RP takes care of the actual execution.
So for now lets run more trajectories and schedule computation of models in regular intervals.
In [74]:
# Keep spawning pairs of new ML trajectories (via task_generator) every time
# the trajectory count hits one of 4, 6, ..., 48.
scheduler.add_event(
    Event().on(project.on_ntraj(range(4, 50, 2))).do(task_generator)
)
# Start MSM estimation once 10 trajectories exist and keep re-running it
# (.repeat) until 20 trajectories have been produced (.until).
scheduler.add_event(
    Event()
    .on(project.on_ntraj(10))
    .do(modeller.task_run_msm)
    .repeat().until(project.on_ntraj(20)))  # todo: change that this will stop when the first event is done
Out[74]:
.repeat
means to redo the same task when the last is finished (it will just append an infinite list of conditions to keep on running).
.until
specifies a termination condition. The event will not be executed once this condition is met. Makes most sense if you use .repeat
or if the trigger condition and stopping should be independent. You might say, run 100 times unless you have a good enough model.
In [61]:
from adaptivemd import FunctionalEvent
In [59]:
def strategy():
    """An adaptive strategy written as a generator for a FunctionalEvent.

    Each ``yield`` hands back a list of conditions; execution resumes once
    every condition in the yielded list is fulfilled.
    """
    # create a new scheduler dedicated to this strategy
    local_scheduler = project.get_scheduler(cores=2)
    # run 10 trajs of length 100 in parallel on the new scheduler
    # (bugfix: the original submitted to the outer `scheduler` and never
    # actually used `local_scheduler` before exiting it)
    tasks = local_scheduler.submit(project.new_ml_trajectory(
        length=100, number=10))
    # wait until this is finished and specify a condition on when to
    # continue (all tasks need to be done)
    yield [t.is_done for t in tasks]
    # close the local scheduler once its job is done
    local_scheduler.exit()
    # yield a condition on when to be done. Nothing since we are
    # done already
    yield []
In [63]:
scheduler.add_event(FunctionalEvent(strategy))
Out[63]:
In [71]:
scheduler.add_event(FunctionalEvent(strategy))
Out[71]:
In [74]:
ev = FunctionalEvent(strategy)
In [76]:
ev._current_when
In [68]:
project.schedulers
Out[68]:
In [40]:
print project.files
print len(project.models)
The brain is just a collection of events. This makes it reusable and easy to extend.
In [13]:
project.close()
In [ ]: