This example should guide you through building your own simple generator.
In [ ]:
from adaptivemd import (
Project, Task, File, PythonTask
)
project = Project('tutorial')
engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']
We assume that you have completed at least some of the previous examples and have a general idea of how adaptivemd works. Still, let's recapitulate the typical simulation workflow.
To execute something you need a Task object. Once you have this, you hand it to a Scheduler, which interprets the Task into code that the computer understands. It handles all the little things you expect from a task, like registering generated files, etc. To do so, the Scheduler needs a Resource description, which acts like a config for the scheduler. Once you have a Scheduler (with a Resource) you let it execute Task objects. If you know how to build these you are done. That is all you need.
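In code this cycle is very small. A minimal sketch, assuming an open project and a task built by one of the generators above (task.is_done and task.output are used as later in this example; project.wait_until is assumed to behave as in the earlier examples):
project.queue(task)                  # hand the task over for execution
project.wait_until(task.is_done)     # block until the task has finished
print(task.output)                   # a PythonTask stores the returned result here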
Why do we need Generators?
Building a task can be cumbersome and often repetitive, so a factory for Task objects is extremely useful. These are called Generators (maybe TaskFactory would have been a better name). In your final scheme, where you observe all generated objects and want to build new tasks accordingly, you will (almost) never build a Task yourself. You use a generator.
A typical example is an Engine. It will generate tasks that simulate new trajectories, extend existing ones, etc. Basic stuff. The second big class is Analysis. It will use trajectories to generate models or properties of interest to guide your decisions for new trajectories.
In this example we will build a simple generator for a task that uses the mdtraj package to compute some features and store these in the database and in a file.
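Stripped of adaptivemd, the computation we want to run remotely is only a few lines of mdtraj. A sketch (the file names are placeholders and the concrete feature is decided further below):
import mdtraj as md

# load the trajectory; a topology (PDB) is needed unless the file format carries one
traj = md.load('master.dcd', top='input.pdb')

# any featurization goes here; the raw coordinates are the simplest placeholder
features = traj.xyz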
The MDTrajFeaturizer generator
First, we think about how this featurizer would work if we did not use adaptivemd. The reason is that we basically have two choices for designing a Task (see example 4 about Task objects): a plain Task that runs bash commands, or a PythonTask that calls a Python function for us. Since we want to call mdtraj functions we use the second one and start with a skeleton for this type, which we store under my_generator.py
In [ ]:
%%file my_generator.py
# This is an example for building your own generator
# This file must be added to the project so that it is loaded
# when you import `adaptivemd`. Otherwise your workers don't know
# about the class!

from adaptivemd import Generator, PythonTask


class MDTrajFeaturizer(Generator):
    def __init__(self, {things we always need}):
        super(MDTrajFeaturizer, self).__init__()

        # stage file you want to reuse (optional)
        # self['pdb_file'] = pdb_file
        # stage = pdb_file.transfer('staging:///')
        # self['pdb_file_stage'] = stage.target
        # self.initial_staging.append(stage)

    @staticmethod
    def then_func(project, task, data, inputs):
        # add the output for later reference
        project.data.add(data)

    def execute(self, {options per task}):
        t = PythonTask(self)

        # get your staged files (optional)
        # input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')

        # add the python function call to your script (there can be only one!)
        t.call(
            my_script,
            param1,
            param2,
            ...
        )

        return t

def my_script(param1, param2, ...):
    return {"whatever you want to return"}
mdtraj needs a topology unless it is already present in the trajectory file. Interestingly, our Trajectory objects know about their topology, so we could access it if our function processed a Trajectory; this requires the Trajectory to be the input. If we want to process arbitrary files, we might need to pass a topology explicitly.
Whether the generator should work with a fixed topology is your decision. To show how this would work, we use a fixed topology per generator here, which applies to File objects.
The second question is which feature we want to compute. This is tricky, so we hard-code it for now; you can certainly think of a better way to represent this. For this example we keep it trivial and just return the raw coordinates as a stand-in feature.
In [ ]:
def __init__(self, pdb_file=None):
    super(MDTrajFeaturizer, self).__init__()

    # if we provide a pdb_file it should be used
    if pdb_file is not None:
        # stage file you want to reuse (optional)

        # give the file an internal name
        self['pdb_file'] = pdb_file

        # create the transfer from local to staging:
        stage = pdb_file.transfer('staging:///')

        # give the staged file an internal name
        self['pdb_file_stage'] = stage.target

        # append the transfer action to the initial staging action list
        self.initial_staging.append(stage)
In [ ]:
def execute(self, file_to_analyze):

    assert(isinstance(file_to_analyze, File))

    t = PythonTask(self)

    # get your staged files (optional)
    if self.get('pdb_file_stage'):
        input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')
    else:
        input_pdb = None

    # add the python function call to your script (there can be only one!)
    t.call(
        my_script,
        file_to_analyze,
        input_pdb
    )

    return t
This script is executed on the HPC for you and requires mdtraj to be installed there.
In [ ]:
def my_script(file_to_analyze, input_pdb):
    import mdtraj as md

    traj = md.load(file_to_analyze, top=input_pdb)
    # placeholder featurization: just return the raw coordinates
    features = traj.xyz

    return features
That's it, at least in the simplest form. Now use the generator to create a Task:
In [ ]:
my_generator = MDTrajFeaturizer(pdb_file)
task = my_generator.execute(traj.file('master.dcd'))
project.queue(task)
We wait, and once the task is done its .output property contains the returned result.
This can now be used in your execution plans...
In [ ]:
def strategy():
    # generate some structures...
    # yield wait ...

    # get a traj object
    task = my_generator.execute(traj.outputs('master'))
    # wait until the task is done
    yield task.is_done
    # grab the output
    output = task.output
    # do something with the result, store it in the DB, etc...
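As an illustration of that last step, a small sketch of what "do something with the result" might look like (assuming, as in my_script above, that the features come back as a numpy-compatible array; the file name is arbitrary):
import numpy as np

# e.g. right after `output = task.output` in the strategy:
# keep a local copy of the features in addition to whatever you store in the DB
np.save('features.npy', np.asarray(output))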
Next, we look at some improvements.
Often you want to save the output from your function in the DB in some form or another. Though the output is stored, it is not conveniently accessible unless you know the task that produced it.
For this reason there is a callback function you can set that takes care of custom handling of the output. The function to be called needs to be a method of the generator, and you give the task the name of that method. The name (a str) of the function is set using the then() command, and the default name is then_func.
In [ ]:
def execute(self, ...):
    t = PythonTask(self)
    t.then('handle_my_output')

@staticmethod
def handle_my_output(project, task, data, inputs):
    print('Saving data from task', task, 'into model')
    m = Model(data)
    project.models.add(m)
The function takes exactly 4 parameters:
project: the project in which the task was run; used to access the database, etc.
task: the actual task object that produced the output
data: the output returned by the function
inputs: the input to the python function call (internally), i.e. the data actually transmitted to the worker
Like in the above example you can do whatever you want with your data: store it, alter it, write it to a file, etc. In case you do not want to additionally save the output (data) in the DB as an object, you can tell the task not to by setting
In [ ]:
def execute(self, ...):
    t = PythonTask(self)
    t.then('handle_my_output')
    t.store_output = False  # default is `True`
In that case .output will stay None even after execution.
Note that the code that generates a file and the code that later analyzes or reads it always have to match. We only store some very general properties of objects with them, e.g. a stride for trajectories, so you cannot arbitrarily mix code for these.
Now we want this to also work when passing a Trajectory object directly:
In [ ]:
my_generator.execute(traj)
This is rather simple: all you need to do is extract the actual file from the trajectory object.
In [ ]:
def __init__(self, outtype, pdb_file=None):
    super(MDTrajFeaturizer, self).__init__()

    # we store a str that holds the name of the outtype
    # this must match the definition
    self.outtype = outtype

    # ...

def execute(self, traj, *args, **kwargs):
    t = PythonTask(self)
    # ...
    file_location = traj.outputs(self.outtype)  # get the trajectory file matching outtype
    # use the file_location
    # ...
Important! You have no access to the Trajectory object in your remote function. These objects are converted to a real path relative to the working directory, which makes sure that you will not have to deal with prefixes, etc. This might change in the future, but for now the scripts are considered independent of adaptivemd.
This is not complicated, but you need to briefly learn about the mechanism used to store complex Python objects in the DB. The general way to store an instance of a class requires you to subclass from adaptivemd.mongodb.StorableMixin. This provides the class with a __uuid__ attribute, a unique number for each storable object that is assigned at creation time. (If we just stored objects using pymongo we would get such a number as well, but only later.) Secondly, it adds two functions:
to_dict(): converts the (immutable) state of the object into a dictionary that is simple enough to be stored. Simple enough means it may contain Python primitives, things like numpy arrays, or even other storable objects, but no arbitrary objects such as lambda constructs (these are possible but need special treatment).
from_dict(): the reverse. It takes the dictionary from to_dict() and must return an equivalent object.
So, you can do
clone = obj.__class__.from_dict(obj.to_dict())
and get an equal object in the sense that it has the same attributes; you could also call it a deep copy.
This is not always trivial, so there is a default implementation which makes one additional assumption: every necessary attribute has an __init__ parameter of the same name. So, this class would follow that rule
In [ ]:
class MyStorableObject(StorableMixin):
    def __init__(self, state):
        self.state = state
while this would not work
In [ ]:
class MyStorableObject(StorableMixin):
    def __init__(self, initial_state):
        self.state = initial_state
In the second case you need to overwrite the default functions. All of the following will work:
In [ ]:
# fix `to_dict` to match default `from_dict`
class MyStorableObject(StorableMixin):
    def __init__(self, initial_state):
        self.state = initial_state

    def to_dict(self):
        return {
            'initial_state': self.state
        }
In [ ]:
# fix `from_dict` to match default `to_dict`
class MyStorableObject(StorableMixin):
    def __init__(self, initial_state):
        self.state = initial_state

    @classmethod
    def from_dict(cls, dct):
        return cls(initial_state=dct['state'])
In [ ]:
# fix both `from_dict` and `to_dict`
class MyStorableObject(StorableMixin):
    def __init__(self, initial_state):
        self.state = initial_state

    def to_dict(self):
        return {
            'my_state': self.state
        }

    @classmethod
    def from_dict(cls, dct):
        return cls(initial_state=dct['my_state'])
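A quick round-trip check for the last variant (a usage sketch, not part of the original example):
obj = MyStorableObject(initial_state={'x': 1})
dct = obj.to_dict()                       # {'my_state': {'x': 1}}
clone = MyStorableObject.from_dict(dct)   # reconstruct an equivalent object
assert clone.state == obj.state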
If you do that, make sure that you really capture all variables, especially if you subclass from an existing storable class. You can use super to include the result from the parent class:
In [ ]:
class MyStorableObject(StorableMixin):

    @classmethod
    def from_dict(cls, dct):
        obj = super(MyStorableObject, cls).from_dict(dct)
        obj.missing_attr1 = dct['missing_attr_key1']
        return obj

    def to_dict(self):
        dct = super(MyStorableObject, self).to_dict()
        dct.update({
            'missing_attr_key1': self.missing_attr1
        })
        return dct
This is the recommended way to build your custom functions. For completeness we show here what the base TaskGenerator class does:
In [ ]:
@classmethod
def from_dict(cls, dct):
    obj = cls.__new__(cls)
    StorableMixin.__init__(obj)
    obj._items = dct['_items']
    obj.initial_staging = dct['initial_staging']
    return obj

def to_dict(self):
    return {
        '_items': self._items,
        'initial_staging': self.initial_staging
    }
The only unfamiliar part is the
obj = cls.__new__(cls)
StorableMixin.__init__(obj)
which needs a little explanation.
In most __init__ functions of a TaskGenerator you will construct the initial_staging attribute with some functions. If you reconstructed the generator by calling the constructor with the same parameters again, you would get an equal object as expected, and that would work, but not in all regards: the problem is that any storable objects generated inside __init__ get new UUIDs and are hence considered different from the ones you originally stored. In short, constructing things in __init__ prevents you from getting the real old object back; you always construct something new.
This can be solved by not using __init__ but creating an empty object using __new__ and then restoring all attributes to their original state. This is very similar to __setstate__, which we do not use in general so that __init__ can still be used normally, which makes sense in most cases where no storable objects are generated.
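A generic illustration of the difference (plain Python, not adaptivemd-specific; class and attribute names are made up):
class Rebuilt(object):
    def __init__(self):
        # __init__ creates brand-new member objects (in adaptivemd these
        # would get new UUIDs and no longer match the stored ones)
        self.parts = [object()]

    def to_dict(self):
        return {'parts': self.parts}

    @classmethod
    def from_dict(cls, dct):
        obj = cls.__new__(cls)     # allocate without running __init__
        obj.parts = dct['parts']   # reattach the original objects untouched
        return obj

original = Rebuilt()
clone = Rebuilt.from_dict(original.to_dict())
assert clone.parts[0] is original.parts[0]   # same objects, not new ones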
In the following we discuss an existing generator
A word about this example: while a Task can be created and configured on the fly, a new Generator class in adaptivemd needs to be part of the project so that the workers know about it (see the note at the top of my_generator.py). So we will just discuss the essential parts of the existing code here.
A generator is in essence a factory that creates Task objects with a single command. A generator can be initialized with certain files that the created tasks will always need, like an engine needing a topology for each task, etc. It also (as explained briefly in example 4) knows about certain callback behaviour of its tasks. Last, a generator allows you to assign a worker only to tasks that were created by that generator.
Let's look at the code of the PyEMMAAnalysis:
class PyEMMAAnalysis(Analysis):
    def __init__(self, pdb_file):
        super(PyEMMAAnalysis, self).__init__()

        self['pdb_file'] = pdb_file
        stage = pdb_file.transfer('staging:///')
        self['pdb_file_stage'] = stage.target
        self.initial_staging.append(stage)

    @staticmethod
    def then_func(project, task, model, inputs):
        # add the input arguments for later reference
        model.data['input']['trajectories'] = inputs['files']
        model.data['input']['pdb'] = inputs['topfile']
        project.models.add(model)

    def execute(
            self,
            trajectories,
            tica_lag=2,
            tica_dim=2,
            msm_states=5,
            msm_lag=2,
            stride=1):

        t = PythonTask(self)

        input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')
        t.call(
            remote_analysis,
            trajectories=list(trajectories),
            topfile=input_pdb,
            tica_lag=tica_lag,
            tica_dim=tica_dim,
            msm_states=msm_states,
            msm_lag=msm_lag,
            stride=stride
        )

        return t
The same __init__, now with explanatory comments:
def __init__(self, pdb_file):
    # don't forget to call super
    super(PyEMMAAnalysis, self).__init__()

    # a generator also acts like a dictionary for files
    # this way you can later access certain files you might need

    # save the pdb_file under the same name
    self['pdb_file'] = pdb_file

    # this creates a transfer action like it is used in tasks
    # and moves the passed pdb_file (usually on the local machine)
    # to the staging_area root directory
    stage = pdb_file.transfer('staging:///')

    # and the new target file (which is also like the original)
    # on the staging_area is saved under `pdb_file_stage`
    # so, we can access both files if we wanted to
    # note that the original file most likely is in the DB
    # so we could just skip the stage transfer completely
    self['pdb_file_stage'] = stage.target

    # last we add this transfer to the initial_staging which
    # is done only once per used generator
    self.initial_staging.append(stage)
And the execute method (the task factory), again with comments:
# the kwargs is to keep the example short, you should use explicit
# parameters and add appropriate docs
def execute(self, trajectories, **kwargs):
    # create the task and set the generator to self, our new generator
    t = PythonTask(self)

    # we want to copy the staged file to the worker directory
    # and name it `input.pdb`
    input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')

    # if you chose not to use the staging file and copy it directly you
    # would use in analogy
    # input_pdb = t.link(self['pdb_file'], 'input.pdb')

    # finally we use `.call` and want to call the `remote_analysis` function
    # which we imported earlier from somewhere
    t.call(
        remote_analysis,
        trajectories=list(trajectories),
        **kwargs
    )

    return t
And finally the callback function. The name then_func is the default function name to be called.
# we use a static method, but you can of course write a normal method
# the callbacks take these arguments in this order
# the second parameter is actually a `Model` object in this case
# which has a `.data` attribute
@staticmethod
def then_func(project, task, model, inputs):
    # add the input arguments for later reference to the model
    model.data['input']['trajectories'] = inputs['kwargs']['files']
    model.data['input']['pdb'] = inputs['kwargs']['topfile']
    # and save the model in the project
    project.models.add(model)
A brief summary of the things you need to set to make your generator work:
class MyGenerator(Analysis):
    def __init__(self, {things your generator always needs}):
        super(MyGenerator, self).__init__()

        # add input files to self
        self['file1'] = file1

        # stage all files to the staging area if you want to keep these
        # files on the HPC
        for fn in ['file1', 'file2', ...]:
            stage = self[fn].transfer('staging:///')
            self[fn + '_stage'] = stage.target
            self.initial_staging.append(stage)

    @staticmethod
    def then_func(project, task, outputs, inputs):
        # do something with input and outputs
        # store something in your project
        pass

    def task_using_python_rpc(
            self,
            {arguments}):
        t = PythonTask(self)

        # set any task dependencies if you need
        t.dependencies = []

        input1 = t.link(self['file1'], 'alternative_name1')
        input2 = t.link(self['file2'], 'alternative_name2')
        ...

        # add whatever bash stuff you need BEFORE the function call
        t.append('some bash command')
        ...

        # use input1, etc. in your function call if you like. It will
        # be converted to a regular file location you can use
        t.call(
            {my_remote_python_function},
            files=list(files),
        )

        # add whatever bash stuff you need AFTER the function call
        t.append('some bash command')
        ...

        return t

    def task_using_bash_argument_call(
            self,
            {arguments}):
        t = Task(self)

        # set any task dependencies if you need
        t.dependencies = []

        input1 = t.link(self['file1'], 'alternative_name1')
        input2 = t.link(self['file2'], 'alternative_name2')
        ...

        # add more staging
        t.append({action})
        ...

        # add whatever bash stuff you want to do
        t.append('some bash command')
        ...

        # add whatever staging stuff you need AFTER the function call
        t.append({action})
        ...

        return t
The simplified code for the OpenMMEngine
class OpenMMEngine(Engine):
    trajectory_ext = 'dcd'

    def __init__(self, system_file, integrator_file, pdb_file, args=None):
        super(OpenMMEngine, self).__init__()

        self['pdb_file'] = pdb_file
        self['system_file'] = system_file
        self['integrator_file'] = integrator_file
        # exec_file (the openmmrun.py script) is defined elsewhere in the real module
        self['_executable_file'] = exec_file

        for fn in self.files:
            stage = self[fn].transfer(Location('staging:///'))
            self[fn + '_stage'] = stage.target
            self.initial_staging.append(stage)

        if args is None:
            args = '-p CPU --store-interval 1'

        self.args = args

    # this one only works if you start from a file
    def task_run_trajectory_from_file(self, target):
        # we create a special Task that has some additional functionality
        t = TrajectoryGenerationTask(self, target)

        # link all the files we require
        initial_pdb = t.link(self['pdb_file_stage'], Location('initial.pdb'))
        t.link(self['system_file_stage'])
        t.link(self['integrator_file_stage'])
        t.link(self['_executable_file_stage'])

        # get the frame to start from as coordinates.pdb
        input_pdb = t.get(target.frame, 'coordinates.pdb')

        # this represents our output trajectory
        output = Trajectory('traj/', target.frame, length=target.length, engine=self)

        # create the directory so openmmrun.py can write to it
        t.touch(output)

        # build the actual bash command
        cmd = 'python openmmrun.py {args} -t {pdb} --length {length} {output}'.format(
            pdb=input_pdb,
            length=target.length,
            output=output,
            args=self.args,
        )
        t.append(cmd)

        # copy the resulting trajectory directory back to the staging area
        t.put(output, target)

        return t
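A hedged usage sketch for this engine (project.new_trajectory is assumed to describe the desired output trajectory, as in the earlier examples; pdb_file and engine were loaded at the top of this notebook):
# describe the trajectory we want: start from the initial PDB, 100 frames
target = project.new_trajectory(pdb_file, 100, engine)

# let the engine build the corresponding task and queue it
task = engine.task_run_trajectory_from_file(target)
project.queue(task)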
In [ ]:
project.close()