In [1]:
import sys, os
In [2]:
from adaptivemd import (
Project, Task, File, PythonTask
)
Let's open our test project by its name. If you completed the first examples this should all work out of the box.
In [3]:
project = Project('tutorial')
Open all connections to the MongoDB and Session so we can get started.
Let's see again where we are. These numbers will depend on whether you run this notebook for the first time or just continue again. Unless you delete your project it will accumulate models and files over time, as is our ultimate goal.
In [4]:
print project.files
print project.generators
print project.models
Now restore our old ways to generate tasks by loading the previously used generators.
In [5]:
engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']
A task is in essence a bash script-like description of what should be executed by the worker. It has details about files to be linked to the working directory, bash commands to be executed and some meta information about what should happen in case we succeed or fail.
Let's first explain briefly how a task is executed and what its components are. This was originally built to be compatible with radical.pilot and still is. So, if you are familiar with RP, all of the following should sound very familiar.
A task is executed from within a unique directory that only exists for this particular task. These are located in adaptivemd/workers/ and look like
worker.0x5dcccd05097611e7829b000000000072L/
The long number is a hex representation of the UUID of the task. If you are curious, type
print hex(my_task.__uuid__)
Then we change into this directory, write a running.sh bash script and execute it. This script is created from the task definition and also depends on your resource settings (which basically only contain the path to the workers directory, etc.).
The script is divided into 1 or 3 parts depending on which Task class you use. The plain Task uses a single list of commands, while PrePostTask has the following structure:
Pre-Exec: Things to happen before the main command (optional)
Main: the main commands are executed
Post-Exec: Things to happen after the main command (optional)
Okay, lots of theory; now some real code for running a task that generates a trajectory.
In [6]:
task = engine.task_run_trajectory(project.new_trajectory(pdb_file, 100))
In [7]:
task.script
Out[7]:
We link a lot of files to the worker directory and change the name of the .pdb in the process. Then we call the actual python script that runs openmm, and finally move the output.dcd and the restart file back to the trajectory folder.
There is a way to list lots of things about tasks and we will use it a lot to see our modifications.
In [8]:
print task.description
As long as a task is not saved and hence placed in the queue, it can be altered in any way. All of the 3 (or 5) phases can be changed separately: you can add things to the staging phases or the bash phases, or change the main command. So, let's do that now.
First, a Task
is very similar to a list of bash commands and you can simply append (or prepend) a command. A text line will be interpreted as a bash command.
In [9]:
task.append('echo "This new line is pointless"')
In [10]:
print task.description
As expected this line was added to the end of the script.
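A command can also be prepended, as mentioned above. As a small sketch, assuming the counterpart method is simply called prepend (the wording above suggests it), the analogous call would be
task.prepend('echo "This new line would run first"')
which places the command at the beginning of the bash part of the script instead of the end.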
Setting up staging is more difficult. The reason is that you normally have no idea where files are located, so writing a copy or move command by hand is impossible. This is why the staging commands are not bash lines but objects that hold information about the actual file transaction to be done. There are some task methods that help you move files, but files themselves can also generate these commands for you.
Let's move one trajectory (directory) around a little more as an example
In [11]:
traj = project.trajectories.one
In [12]:
transaction = traj.copy()
print transaction
This looks like what appears in the script. The default for a copy is to move a file or folder to the worker directory under the same name, but you can give it another name/location if you pass that as an argument. Note that since trajectories are directories you need to give a directory name (which ends in a /).
In [13]:
transaction = traj.copy('new_traj/')
print transaction
If you want to move it somewhere other than the worker directory you have to specify the location, and you can do so with the prefixes (shared://, sandbox://, staging://) as explained in the previous examples.
In [14]:
transaction = traj.copy('staging:///cached_trajs/')
print transaction
Besides .copy you can also .move or .link files.
In [15]:
transaction = pdb_file.copy('staging:///delete.pdb')
print transaction
transaction = pdb_file.move('staging:///delete.pdb')
print transaction
transaction = pdb_file.link('staging:///delete.pdb')
print transaction
Local files deserve a special mention because they require special treatment. We cannot (like RP can) copy files to the HPC directly; we need to store them in the DB first.
In [16]:
new_pdb = File('file://../files/ntl9/ntl9.pdb').load()
Make sure you use the file:// prefix to indicate that you are using a local file. The above example uses a relative path, which will be replaced by an absolute one; otherwise we would run into trouble once we open the project from a different directory.
In [17]:
print new_pdb.location
Note that now there are 3 / in the filename: two from the :// and one from the root directory of your machine.
The load() at the end really loads the file, and when you save this File now it will contain the content of the file. You can access this content as seen in the previous example.
In [18]:
print new_pdb.get_file()[:300]
For local files you normally use .transfer, but copy, move or link work as well. Still, there is no difference since the file only exists in the DB now, and copying from the DB to a place on the HPC results in a simple file creation.
Now, we want to add a command to the staging and see what happens.
In [19]:
transaction = new_pdb.transfer()
print transaction
In [20]:
task.append(transaction)
In [21]:
print task.description
We now have one more transfer command. But something else has changed: there is one more file listed as required. So the task can only run if that file exists; since we loaded it into the DB, it exists (for us). For example, the newly created trajectory 25.dcd does not exist yet; if that were a requirement, the task would fail. But let's check that our file exists.
In [22]:
new_pdb.exists
Out[22]:
Okay, we now have the PDB file staged, so any real bash command can work with the file ntl9.pdb. Alright, let's output its stats.
In [23]:
task.append('stat ntl9.pdb')
Note that usually you place these staging commands at the top of your script.
Now we could run this task, as before, and see if it works. (Make sure you still have a worker running.)
In [24]:
project.queue(task)
And check that the task is running.
In [33]:
task.state
Out[33]:
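If you prefer to block until the task has finished instead of polling its state, you can wait on it; this is the same call that is used again later in this notebook.
project.wait_until(task.is_done)
print task.state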
If we did not screw up the task, it should have succeeded and we can look at the STDOUT.
In [34]:
print task.stdout
Well, great, we have the pointless output and the stats of the newly staged file ntl9.pdb
Just for fun, let's create the same scheduler that the adaptivemdworker uses, but from inside this notebook.
In [35]:
from adaptivemd import WorkerScheduler
In [36]:
sc = WorkerScheduler(project.resource)
If you really wanted to use the worker you would need to initialize it, which creates directories and stages files for the generators, etc. For that you would call sc.enter(project); but since we only want it to parse our tasks, we just set the project without invoking the initialization. You should normally not do that.
In [37]:
sc.project = project
Now we can use a function .task_to_script
that will parse a task into a bash script. So this is really what would be run on your machine now.
In [38]:
print '\n'.join(sc.task_to_script(task))
Now you see that all file paths have been properly interpreted to work. Notice that there is a comment about a temporary file from the DB that is then renamed. This is a little trick to be compatible with RP's way of handling files. (TODO: We might change this to just write to the target file. Need to check if that is still consistent.)
One problem with bash scripts is that when you create the tasks you have no idea where the files are actually located. To get around this, the created bash script is scanned for paths that contain the prefixes we are used to, and these are interpreted in the context of the worker / scheduler. The worker is the only instance that knows everything necessary, so this is the place to fix that problem.
Let's see that in a little example, where we create an empty file in the staging area.
In [39]:
task = Task()
task.append('touch staging:///my_file.txt')
In [40]:
print '\n'.join(sc.task_to_script(task))
And voila, the path has changed to a relative path from the working directory of the worker. Note that you see here the line we added at the very beginning of example 1 to our resource!
If you want to start a new task you can begin with
In [41]:
task = Task()
as we did before.
Just start adding staging and bash commands and you are done. When you create a task you can assign it a generator; then the system will assume that this task was generated by that generator, so don't do this for your custom tasks unless you actually generated them in a generator. Setting this allows you to tell a worker to run only tasks of certain types.
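For instance, a minimal hedged sketch that reuses only pieces shown above (the name marker.txt is purely illustrative):
task.append(new_pdb.transfer())              # stage the pdb from the DB into the worker directory
task.append('stat ntl9.pdb')                 # any bash command, as before
task.append('touch staging:///marker.txt')   # create an illustrative file in the staging area
project.queue(task)                          # hand it to a running worker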
The tasks so far are very powerful, but they lack the possibility to call a python function. Since we are using python here, it would be great to really call a python function directly instead of taking the detour of writing a python executable invoked from bash with arguments, etc. An example of this is the PyEmma generator, which uses this capability.
Let's do an example of this as well. Assume we have a python function in a file (so far you need to have your code in a file so that we can copy the file to the HPC if necessary). Let's create the .py file now.
In [42]:
%%file my_rpc_function.py
def my_func(f):
    import os
    print f
    return os.path.getsize(f)
Now create a PythonTask instead
In [43]:
task = PythonTask()
and the call function has changed. Note that you can still add all the bash and staging commands as before. A PythonTask is also a subclass of PrePostTask, so we have a .pre and a .post phase available.
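As a small sketch of using these phases (assuming .pre and .post are plain, appendable lists of bash commands, mirroring the Pre-Exec / Main / Post-Exec structure described earlier):
task.pre.append('echo "this would run before the main part"')
task.post.append('echo "this would run after the main part"')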
In [44]:
from my_rpc_function import my_func
We call the function my_func with one argument.
In [45]:
task.call(my_func, f=project.trajectories.one)
In [46]:
print task.description
Well, interesting. What this actually does is write the input arguments for the function into a temporary .json file on the worker (in RP on the local machine, which then transfers it to the remote side), rename it to input.json and read it in _run_.py. This is still a little clumsy, but it needs to be this way to be RP compatible, which only works with files! Look at the actual script.
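As before, you can inspect the generated script through the task description:
print task.description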
You see that we really copy the .py file that contains the source code to the worker directory. All of that is done automatically. A little caution on this: you can either write a function in a single file or use any installed package, but in the latter case the same package needs to be installed on the remote machine as well!
Let's run it and see what happens.
In [47]:
project.queue(task)
And wait until the task is done
In [48]:
project.wait_until(task.is_done)
The default settings will automatically save the content from the resulting output.json in the DB, and you can access the data that was returned from the task at .output. In our example the result was just the size of the file in bytes.
In [49]:
task.output
Out[49]:
And you can use this information in an adaptive script to make decisions.
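For example, a purely illustrative decision (the size threshold and the follow-up are made up here; only calls shown earlier in this notebook are used) could look like:
if task.output > 1000000:
    # the file is reasonably large, so queue one more trajectory run
    project.queue(engine.task_run_trajectory(project.new_trajectory(pdb_file, 100)))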
The last thing we did not talk about is the possibility to also call a function with the returned data automatically on successful execution. Since this function is executed on the worker, we (so far) only support function calls with the following restrictions:
the function must be a (static) method of the generator class the task was created from, i.e. the task is created as PythonTask(generator)
the name of the function to be called is stored in task.then_func_name, so you can write a generator class with several possible outcomes and choose the function for each task
the Generator needs to be part of adaptivemd
So in the case of modeller.execute we create a PythonTask that references the following functions
In [50]:
task = modeller.execute(project.trajectories)
In [51]:
task.then_func_name
Out[51]:
So we will call the default then_func of the modeller, or rather of the class the modeller is an instance of.
In [52]:
help(modeller.then_func)
These callbacks are called with the current project, the task, the resulting data (which in the modeller case is a Model object) and the array of initial inputs.
This is the actual code of the callback
@staticmethod
def then_func(project, task, model, inputs):
    # add the input arguments for later reference
    model.data['input']['trajectories'] = inputs['kwargs']['files']
    model.data['input']['pdb'] = inputs['kwargs']['topfile']
    project.models.add(model)
All it does is to add some of the input parameters to the model for later reference and then store the model in the project. You are free to define all sorts of actions here, even queue new tasks.
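As a hedged sketch of what a custom callback could look like (this is not part of adaptivemd; the follow-up action is made up and only uses lookups and calls shown earlier in this notebook):
@staticmethod
def then_func(project, task, model, inputs):
    # keep the model, as the default callback does
    project.models.add(model)
    # hypothetical follow-up: immediately queue one more trajectory run
    engine = project.generators['openmm']
    pdb_file = project.files['initial_pdb']
    project.queue(engine.task_run_trajectory(project.new_trajectory(pdb_file, 100)))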
Next, we will talk about the factories for Task objects, called generators. There we will actually write a new class that does something with the results.
In [53]:
project.close()
In [ ]: