Custom Generator objects

This example should guide you to build your own simple generator.


In [ ]:
from adaptivemd import (
    Project, Task, File, PythonTask
)

project = Project('tutorial')

engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']

Basic knowledge

We assume that you have completed at least some of the previous examples and have a general idea of how adaptiveMD works. Still, let's recapitulate what we think is the typical way of a simulation.

How to execute something

To execute something you need

  1. a description of the task to be done. This is the Task object. Once you have this you can,
  2. use it in a Scheduler which will interpret the Task into some code that the computer understands. It handles all the little things you expect from the task, like registering generated file, etc... And to do so, the Scheduler needs
  3. your Resource description which acts like a config for the scheduler

When you have a Scheduler (with Resource) you let it execute Task objects. If you know how to build these you are done. That is all you need.

What are Generators?

Build a task can be cumbersome and often repetative, and a factory for Task objects is extremely useful. These are called Generators (maybe TaskFactory) is a better name?!?

In your final scheme where you observe all generated objects and want to build new tasks accordingly you will (almost) never build a Task yourself. You use a generator.

A typical example is an Engine. It will generate tasks, that simulate new trajectories, extend existing ones, etc... Basic stuff. The second big class is Analysis. It will use trajectories to generate models or properties of interest to guide your decisions for new trajectories.

In this example we will build a simple generator for a task, that uses the mdtraj package to compute some features and store these in the database and in a file.

The MDTrajFeaturizer generator

First, we think about how this featurizer works if we would not use adaptivemd. The reason is, that we have basically two choices for designing a Task (see example 4 about Task objects).

  1. A task that calls bash commands for you
  2. A task that calls a python function for you

Since we want to call mdtraj functions we use the 2nd and start with a skeleton for this type and store it under my_generator.py


In [ ]:
%%file my_generator.py
# This is an example for building your own generator
# This file must be added to the project so that it is loaded
# when you import `adaptivemd`. Otherwise your workers don't know
# about the class!

from adaptivemd import Generator

class MDTrajFeaturizer(Generator):
    def __init__(self, {things we always need}):
        super(PyEMMAAnalysis, self).__init__()

        # stage file you want to reuse (optional)
        # self['pdb_file'] = pdb_file
        # stage = pdb_file.transfer('staging:///')
        # self['pdb_file_stage'] = stage.target
        # self.initial_staging.append(stage)

    @staticmethod
    def then_func(project, task, data, inputs):
        # add the output for later reference
        project.data.add(data)

    def execute(self, {options per task}):

        t = PythonTask(self)

        # get your staged files (optional)
        # input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')
        
        # add the python function call to your script (there can be only one!)
        t.call(
            my_script,
            param1,
            param2,
            ...
        )

        return t
        
def my_script(param1, param2, ...):
    return {"whatever you want to return"}

What input does our generator always need?

Mdtraj needs a topology unless it is already present. Interestingly, our Trajectory objects know about their topology so we could access these, if our function is to process a Trajectory. This requires the Trajectory to be the input. If we want to process any file, then we might need a topology.

The decision if we want the generator to work for a fixed topology is yours. To show how this would work, we do this here. We use a fixed topology per generator that applies to File objects.

Second is the feature we want to compute. This is tricky and so we hard code this now. You can think of a better way to represent this. But let's pick the tertiary stucture prediction


In [ ]:
def __init__(self, pdb_file=None):
    super(PyEMMAAnalysis, self).__init__()

    # if we provide a pdb_file it should be used
    if pdb_file is not None:
        # stage file you want to reuse (optional)

        # give the file an internal name
        self['pdb_file'] = pdb_file
        # create the transfer from local to staging:
        stage = pdb_file.transfer('staging:///')
        # give the staged file an internal name
        self['pdb_file_stage'] = stage.target
        # append the transfer action to the initial staging action list
        self.initial_staging.append(stage)

The task building


In [ ]:
def execute(self, file_to_analyze):
    
    assert(isinstance(file_to_analyze, File))

    t = PythonTask(self)

    # get your staged files (optional)
    if self.get('pdb_file_stage'):
        input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')
    else:
        input_pdb = None

    # add the python function call to your script (there can be only one!)
    t.call(
        my_script,
        file_to_analyze,
        input_pdb
    )

    return t

The actual script

This script is executed on the HPC for you. And requires mdtraj to be installed on it.


In [ ]:
def my_script(file_to_analyze, input_pdb):
    import mdtraj as md
    
    traj = md.load(file_to_analyze, top=input_pdb)
    features = traj.compute_xyz()
    
    return features

That's it. At least in the simplest form. When you use this to create a Task


In [ ]:
my_generator = MDTrajFeaturizer(pdb_file)
task = my_generator.execute(traj.file('master.dcd'))
project.queue(task)

We wait and then the Task object has a .output property which now contains the returned result.

This can now be used in your execution plans...


In [ ]:
def strategy():
    # generate some structures...
    # yield wait ...
    # get a traj object
    task = my_generator.execute(traj.outputs('master'))
    # wait until the task is done
    yield task.is_done
    # print the output
    output = task.output
    # do something with the result, store in the DB, etc...

Next, we look at improvements

Better storing of results

Often you want to save the output from your function in the DB in some form or another. Though the output is stored, it is not conviniently accessed unless you know the task that was used.

For this reason there is a callback function you can set, that can take care of doing a custom handling of the output. The function to be called needs to be a method of the generator and you can give the task the name of the method. The name (str) of the funtion can be set using the then() command. An the default name is then_func.


In [ ]:
def execute(self, ...):
    t = PythonTask(self)
    t.then('handle_my_output')
    
@staticmethod
def handle_my_output(project, task, data, inputs):
    print 'Saving data from task', task, 'into model'
    m = Model(data)
    project.model.add(m)

The function takes exactly 4 parameters

  1. project: the project in which the task was run. Is used to access the database, etc
  2. task: the actual task object that produced the output
  3. data: the output returned by the function
  4. inputs: the input to the python function call (internally). The data actually transmitted to the worker to run

Like in the above example you can do whatever you want with your data, store it, alter it, write it to a file, etc. In case you do not want to additionally save the output (data) in the DB as an object, you can tell the trask not to by setting


In [ ]:
def execute(self, ...):
    t = PythonTask(self)
    t.then('handle_my_output')
    t.store_output = False  # default is `True`

in that case .output will stay None even after execution

Working with Trajectory files and get their properties

Note that you always have to write file generation and file analysis/reading that matches. We only store some very general properties of objects with them, e.g. a stride for trajectories. This means you cannot arbitrarily mix code for these.

Now we want that this works


In [ ]:
my_generator.execute(traj)

This is rather simple: All you need to do is to extract the actual files from the trajectory object.


In [ ]:
def __init__(self, outtype, pdb_file=None):
    super(PyEMMAAnalysis, self).__init__()
    
    # we store a str that holds the name of the outputtype
    # this must match the definition 
    self.outtype = outtype

    # ...

def execute(self, traj, *args, **kwargs):
    t = PythonTask(self)
    # ...
    file_location = traj.outputs(self.outtype)  # get the trajectory file matching outtype
    # use the file_location.
    
    # ...

Import! You have no access to the Trajectory object in our remove function. These will be converted to a real path relative to the working directory. This makes sure that you will not have to deal with prefixes, etc. This might change in the future, but. The scripts are considered independent of adaptivemd!

Problem with saving your generator to the DB

This is not complicated but you need to briefly learn about the mechanism to store complex Python objects in the DB. The general way to Store an instance of a class requires you to subclass from adaptivemd.mongodb.StorableMixin. This provides the class with a __uuid__ attribute that is a unique number for each storable object that is given at creation time. (If we would just store objects using pymongo we would get a number like this, but later). Secondly, it add two functions

  1. to_dict(): this converts the (immutable) state of the object into a dictionary that is simple enough that it can be stored. Simple enought means, that you can have Python primitives, things like numpy arrays or even other storable objects, but not arbitrary objects in it, like lambda constructs (these are possible but need special treatment)
  2. from_dict(): The reverse. It takes the dictionary from to_dict and must return an equivalent object!

So, you can do

clone = obj.__class__.from_dict(obj.to_dict())

and get an equal object in that it has the same attributes. You could also say a deep copy.

This is not always trivial and there exists a default implementation, which will make an additional assumption:

All necessary attributes have the same parameters in __init__. So, this would correspond to this rule


In [ ]:
class MyStorableObject(StorableMixin):
    def __init__(self, state):
        self.state = state

while this would not work


In [ ]:
class MyStorableObject(StorableMixin):
    def __init__(self, initial_state):
        self.state = initial_state

In the second case you need to overwrite the default function. All of these will work


In [ ]:
# fix `to_dict` to match default `from_dict`
class MyStorableObject(StorableMixin):
    def __init__(self, initial_state):
        self.state = initial_state
        
    def to_dict(self):
        return {
            'initial_state': self.state 
        }

In [ ]:
# fix `from_dict` to match default `to_dict`
class MyStorableObject(StorableMixin):
    def __init__(self, initial_state):
        self.state = initial_state
        
    @classmethod
    def from_dict(cls, dct):
        return cls(initial_state=dct['state'])

In [ ]:
# fix both `from_dict` and `to_dict`
class MyStorableObject(StorableMixin):
    def __init__(self, initial_state):
        self.state = initial_state

    def to_dict(self):
        return {
            'my_state': self.state 
        }
        
    @classmethod
    def from_dict(cls, dct):
        return cls(initial_state=dct['my_state'])

If you do that, make sure that you really capture all variables. Especially if you subclass from an existing one. You can use super to access the result from the parent class


In [ ]:
class MyStorableObject(StorableMixin):
    @classmethod
    def from_dict(cls, dct):
        obj = super(MyStorableObject, cls).from_dict(dct)
        obj.missing_attr1 = dct['missing_attr_key1']
        return obj

    def to_dict(self):
        dct = super(MyStorableObject, self).to_dict(self)
        dct.update({
            'missing_attr_key1': self.missing_attr1
        })
        return dct

This is the recommended way to build your custom functions. For completeness we show here what the base TaskGenerator class will do


In [ ]:
@classmethod
def from_dict(cls, dct):
    obj = cls.__new__(cls)
    StorableMixin.__init__(obj)
    obj._items = dct['_items']
    obj.initial_staging = dct['initial_staging']
    return obj

def to_dict(self):
    return {
        '_items': self._items,
        'initial_staging': self.initial_staging
    }

The only unfamiliar part is the

obj = cls.__new__(cls)
StorableMixin.__init__(obj)

which needs a little explanation.

In most __init__ functions for a TaskGenerator you will construct the initial_staging attribute with some functions. If you would reconstruct by just calling the constructor with the same parameters again, this would result in an equal object as expected and that would work, but not in all regards as expected: The problem is that if you generate objects that can be stored, these will get new UUIDs and hence are considered different from the ones that you wanted to store. In short, the construction in the __init__ prevents you from getting the real old object back, you always construct something new.

This can be solved by not using __init__ but creating an empty object using __new__ and then fixing all attributes to the original state. This is very similar to __setstate__ which we do not use in general to still allow using __init__ which makes sense in most cases where not storable objects are generated.

In the following we discuss an existing generator

A simple generator


In [ ]:

A word about this example. While a Task can be created and configured a new class in adaptivemd needs to be part of the project. So we will write discuss the essential parts of the existing code.

A generator is in essence a factory to create Task objects with a single command. A generator can be initialized with certain files that the created tasks will always need, like an engine will need a topology for each task, etc. It also (as explained briefly before in Example 4) knows about certain callback behaviour of their tasks. Last, a generator allows you to assign a worker only to tasks that were created by a generator.

The execution structure

Let's look at the code of the PyEMMAAnalysis

class PyEMMAAnalysis(Analysis):
    def __init__(self, pdb_file):
        super(PyEMMAAnalysis, self).__init__()

        self['pdb_file'] = pdb_file
        stage = pdb_file.transfer('staging:///')

        self['pdb_file_stage'] = stage.target
        self.initial_staging.append(stage)

    @staticmethod
    def then_func(project, task, model, inputs):
        # add the input arguments for later reference
        model.data['input']['trajectories'] = inputs['files']
        model.data['input']['pdb'] = inputs['topfile']
        project.models.add(model)

    def execute(
            self,
            trajectories,
            tica_lag=2,
            tica_dim=2,
            msm_states=5,
            msm_lag=2,
            stride=1):

        t = PythonTask(self)

        input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')
        t.call(
            remote_analysis,
            trajectories=list(trajectories),
            topfile=input_pdb,
            tica_lag=tica_lag,
            tica_dim=tica_dim,
            msm_states=msm_states,
            msm_lag=msm_lag,
            stride=stride
        )

        return t
def __init__(self, pdb_file):
    # don't forget to call super
    super(PyEMMAAnalysis, self).__init__()  

    # a generator also acts like a dictionary for files
    # this way you can later access certain files you might need

    # save the pdb_file under the same name
    self['pdb_file'] = pdb_file  

    # this creates a transfer action like it is used in tasks
    # and moves the passed pdb_file (usually on the local machein)
    # to the staging_area root directory
    stage = pdb_file.transfer('staging:///')

    # and the new target file (which is also like the original) 
    # on the staging_area is saved unter `pdb_file_stage`
    # so, we can access both files if we wanted to
    # note that the original file most likely is in the DB
    # so we could just skip the stage transfer completely
    self['pdb_file_stage'] = stage.target

    # last we add this transfer to the initial_staging which
    # is done only once per used generator
    self.initial_staging.append(stage)
# the kwargs is to keep the exmaple short, you should use explicit
# parameters and add appropriate docs
def execute(self, trajectories, **kwargs):
    # create the task and set the generator to self, our new generator
    t = PythonTask(self)

    # we want to copy the staged file to the worker directory
    # and name it `input.pdb`
    input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')

    # if you chose not to use the staging file and copy it directly you
    # would use in analogy
    # input_pdb = t.link(self['pdb_file'], 'input.pdb')

    # finally we use `.call` and want to call the `remote_analysis` function
    # which we imported earlier from somewhere
    t.call(
        remote_analysis,
        trajectories=list(trajectories),
        **kwargs
    )

    return t

And finally a call_back function. The name then_func is the default function name to be called.

# we use a static method, but you can of course write a normal method
@staticmethod
# the call_backs take these arguments in this order
# the second parameter is actually a `Model` object in this case
# which has a `.data` attribute
def then_func(project, task, model, inputs):
    # add the input arguments for later reference to the model
    model.data['input']['trajectories'] = inputs['kwargs']['files']
    model.data['input']['pdb'] = inputs['kwargs']['topfile']
    # and save the model in the project
    project.models.add(model)

A brief summary and things you need to set to make your generator work

class MyGenerator(Analysis):
    def __init__(self, {things your generator always needs}):
        super(MyGenerator, self).__init__()

        # Add input files to self
        self['file1'] = file1

        # stage all files to the staging area of you want to keep these
        # files on the HPC
        for fn in ['file1', 'file2', ...]:
            stage = self[fn].transfer('staging:///')
            self[fn + '_stage'] = stage.target
            self.initial_staging.append(stage)

    @staticmethod
    def then_func(project, task, outputs, inputs):
        # do something with input and outputs
        # store something in your project

    def task_using_python_rpc(
            self,
            {arguments}):

        t = PythonTask(self)

        # set any task dependencies if you need
        t.dependencies = []

        input1 = t.link(self['file1'], 'alternative_name1')
        input2 = t.link(self['file2'], 'alternative_name2')
        ...

        # add whatever bash stuff you need BEFORE the function call
        t.append('some bash command')
        ...

        # use input1, etc in your function call if you like. It will
        # be converted to a regular file location you can use
        t.call(
            {my_remote_python_function},
            files=list(files),
        )

        # add whatever bash stuff you need AFTER the function call
        t.append('some bash command')
        ...

        return t

    def task_using_bash_argument_call(
            self,
            {arguments}):

        t = Task(self)

        # set any task dependencies if you need
        t.dependencies = []

        input1 = t.link(self['file1'], 'alternative_name1')
        input2 = t.link(self['file2'], 'alternative_name2')
        ...
        # add more staging
        t.append({action})
        ...

        # add whatever bash stuff you want to do
        t.append('some bash command')
        ...

        # add whatever staging stuff you need AFTER the function call
        t.append({action})
        ...

        return t

The simplified code for the OpenMMEngine

class OpenMMEngine(Engine):
    trajectory_ext = 'dcd'

    def __init__(self, system_file, integrator_file, pdb_file, args=None):
        super(OpenMMEngine, self).__init__()

        self['pdb_file'] = pdb_file
        self['system_file'] = system_file
        self['integrator_file'] = integrator_file
        self['_executable_file'] = exec_file

        for fn in self.files:
            stage = self[fn].transfer(Location('staging:///'))
            self[name + '_stage'] = stage.target
            self.initial_staging.append(stage)

        if args is None:
            args = '-p CPU --store-interval 1'

        self.args = args

    # this one only works if you start from a file
    def task_run_trajectory_from_file(self, target):
        # we create a special Task, that has some additional functionality
        t = TrajectoryGenerationTask(self, target)

        # link all the files we require
        initial_pdb = t.link(self['pdb_file_stage'], Location('initial.pdb'))
        t.link(self['system_file_stage'])
        t.link(self['integrator_file_stage'])
        t.link(self['_executable_file_stage'])

        # use the initial PDB to be used
        input_pdb = t.get(target.frame, 'coordinates.pdb')

        # this represents our output trajectory
        output = Trajectory('traj/', target.frame, length=target.length, engine=self)

        # create the directory so openmmrun can write to it
        t.touch(output)

        # build the actual bash command
        cmd = 'python openmmrun.py {args} -t {pdb} --length {length} {output}'.format(
            pdb=input_pdb,
            length=target.length,
            output=output,
            args=self.args,
        )
        t.append(cmd)

        # copy the resulting trajectory directory back to the staging area
        t.put(output, target)

        return t

In [ ]:
project.close()