This notebook shows, through a series of examples, how to set up a metabolomics workflow using the Chronos REST API. As a benchmark case we use an R-based pipeline by the Kultima lab. The aim of this pipeline is to preprocess mass spectrometry data: filter out contaminants detected in blank samples, remove batch-specific features, log2-transform the intensities, compute the coefficient of variation (CV) of each feature, and select the relevant features.
The code snippets in this notebook use Python to consume the Chronos REST API, in order to set up a Directed Acyclic Graph (DAG) that defines the workflow. Each node in the DAG represents a microservice that performs a specific task. Once the DAG is properly set up, Chronos figures out the dependencies between the various microservices, runs them in the correct order, and keeps them alive only for the time they are needed. Furthermore, independent microservices are run in parallel.
Chronos REST API calls define the nodes in the DAG. REST calls are performed through HTTP requests to well-defined URLs, with arguments passed in JSON format. The Chronos REST API is documented on this page.
In [ ]:
control = input()  # address of the control node hosting the Chronos API
In [ ]:
import getpass
password = getpass.getpass()  # admin password for the Chronos API (not echoed)
In [ ]:
import urllib.request
urllib.request.urlretrieve(
    "https://raw.githubusercontent.com/phnmnl/workflow-demo/master/data/inputdata_workshop.xls",  # download URL
    "inputdata_workshop.xls"  # local path
)
In [ ]:
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)  # suppress warnings about unverified HTTPS requests
When performing a mass spectrometry (MS) study, plastic and other contaminants often end up in your samples. When you then analyze the samples with MS, these contaminants are recorded together with the metabolites. To be able to detect and filter them out, blank samples (samples containing only DMSO) are usually added in between the normal samples in the run order.
In this step we aim to remove the contaminants detected in the blanks from the rest of our samples. The idea is to remove every feature whose intensity in the blanks exceeds a given fraction of its intensity in the samples: for example, every feature whose intensity in the blanks is 1% or more of its intensity in the other samples.
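Before submitting the actual job, it may help to see the filtering rule in isolation. The following is a toy sketch with made-up intensities; the real filtering is done by the BlankFilter.r script that the microservice below runs.
In [ ]:
# Toy illustration of the blank-filter rule (hypothetical intensities;
# the real logic lives in BlankFilter.r)
blank_intensity = {"feat1": 0.5, "feat2": 120.0}      # mean intensity in the blanks
sample_intensity = {"feat1": 900.0, "feat2": 1000.0}  # mean intensity in the samples
threshold = 0.01  # remove features whose blank intensity is >= 1% of the sample intensity
kept = [f for f in sample_intensity if blank_intensity[f] < threshold * sample_intensity[f]]
print(kept)  # ['feat1'] -- feat2 is flagged as a likely contaminant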
The input data from the prerequisites will be used as input here. Hence, we need to mount the Jupyter working directory (/mnt/container-volumes/jupyter) as a volume in the Docker container. Please go through the following code snippet, and use the Chronos REST API documentation to figure out the meaning of the JSON data that is sent with the HTTP request. Once you are done with that, please run the snippet and check the Chronos interface (which you can access through the MANTL UI).
In [ ]:
url="https://admin:"+password+"@"+control+"/chronos/scheduler/iso8601"
json="""
{
"schedule" : "R0/2030-01-01T12:00:00Z/PT1H",
"cpus": "0.25",
"mem": "128",
"epsilon" : "PT10M",
"name" : "blank-filter",
"container": {
"type": "DOCKER",
"image": "farmbio/blankfilter",
"volumes": [{
"hostPath": "/mnt/container-volumes/jupyter",
"containerPath": "/data",
"mode": "RW"
}]
},
"command" : "Rscript BlankFilter.r /data/inputdata_workshop.xls /data/output_BlankFilter.xls",
"owner" : "user@example.com"
}
"""
response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)
print("HTTP response code: " + str(response.status_code))
N.B. You can ignore any warning about unverified HTTPS requests. Response code 204 means that the REST call succeeded.
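If you want to double-check from the notebook, in addition to using the Chronos UI, you can list the jobs that are currently registered. This is a sketch that assumes the deployment exposes the standard Chronos job-listing endpoint (/scheduler/jobs) under the same /chronos base path used above.
In [ ]:
# Optional check: list the names of the jobs registered in Chronos
# (assumes the standard /scheduler/jobs endpoint is reachable at this path)
jobs = requests.get("https://admin:" + password + "@" + control + "/chronos/scheduler/jobs", verify=False).json()
print([job["name"] for job in jobs])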
In MS studies you may, if you have many samples, prepare the samples in batches. By doing so you introduce the risk of features that are unique to a single batch, which is not desirable.
In this step we remove features that reach a coverage of 80% (i.e., are detected in at least 80% of the samples) within one batch, but in no other batch. In other words, here we remove batch-specific features.
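To make the rule concrete, here is a toy sketch with made-up coverage values; the real logic lives in the BatchfeatureRemoval.r script run by the microservice below.
In [ ]:
# Toy illustration of the batch-coverage rule (hypothetical data):
# a feature is batch-specific if it reaches 80% coverage in exactly one batch
coverage = {
    "featA": {"batch1": 0.90, "batch2": 0.85},  # covered in every batch -> keep
    "featB": {"batch1": 0.95, "batch2": 0.10},  # only covered in batch1 -> remove
}
kept = [f for f, cov in coverage.items() if sum(c >= 0.8 for c in cov.values()) != 1]
print(kept)  # ['featA']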
The input data of this step comes from the blank filter; hence, in the JSON parameters we set the previous step as parent. In this way Chronos makes sure to run the microservices in the correct order. Please go through the following code snippet, run it and check the Chronos interface.
In [ ]:
url="https://admin:"+password+"@"+control+"/chronos/scheduler/dependency"
json="""
{
"parents" : ["blank-filter"],
"cpus": "0.25",
"mem": "128",
"epsilon" : "PT10M",
"name" : "batchfeature-removal",
"container": {
"type": "DOCKER",
"image": "farmbio/batchfeatureremoval",
"volumes": [{
"hostPath": "/mnt/container-volumes/jupyter",
"containerPath": "/data",
"mode": "RW"
}]
},
"command" : "Rscript BatchfeatureRemoval.r /data/output_BlankFilter.xls /data/output_BatchfeatureRemoval.xls",
"owner" : "user@example.com"
}
"""
response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)
print("HTTP response code: " + str(response.status_code))
In [ ]:
url="https://admin:"+password+"@"+control+"/chronos/scheduler/dependency"
json="""
{
"parents" : ["batchfeature-removal"],
"cpus": "0.25",
"mem": "128",
"epsilon" : "PT10M",
"name" : "log2-transformation",
"container": {
"type": "DOCKER",
"image": "farmbio/log2transformation",
"volumes": [{
"hostPath": "/mnt/container-volumes/jupyter",
"containerPath": "/data",
"mode": "RW"
}]
},
"command" : "Rscript log2transformation.r /data/output_BatchfeatureRemoval.xls /data/output_log2transformation.xls",
"owner" : "user@example.com"
}
"""
response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)
print("HTTP response code: " + str(response.status_code))
In order to parallelize microservices, we need to split the data into the subsets that will be processed simultaneously. In this step we divide the data based on the first five letters of the sample names.
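The sketch below illustrates the grouping criterion on a few hypothetical sample names; the actual splitting is performed by the Splitter.r script run by the microservice below.
In [ ]:
# Toy illustration: group sample names by their first five letters
names = ["AAA01_r1", "AAA01_r2", "BBB02_r1"]  # hypothetical sample names
groups = {}
for n in names:
    groups.setdefault(n[:5], []).append(n)
print(groups)  # {'AAA01': ['AAA01_r1', 'AAA01_r2'], 'BBB02': ['BBB02_r1']}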
Please go through the following code snippet, run it and check the Chronos interface.
In [ ]:
url="https://admin:"+password+"@"+control+"/chronos/scheduler/dependency"
json="""
{
"parents" : ["log2-transformation"],
"cpus": "0.25",
"mem": "128",
"epsilon" : "PT10M",
"name" : "splitter",
"container": {
"type": "DOCKER",
"image": "farmbio/splitter",
"volumes": [{
"hostPath": "/mnt/container-volumes/jupyter",
"containerPath": "/data",
"mode": "RW"
}]
},
"command" : "Rscript Splitter.r /data/output_log2transformation.xls /data/output_splitter",
"owner" : "user@example.com"
}
"""
response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)
print("HTTP response code: " + str(response.status_code))
In this step we calculate the coefficient of variation (CV, the standard deviation divided by the mean) for each feature present within the samples.
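As a quick reminder of the quantity being computed, the sketch below calculates the CV of a single hypothetical feature; the pipeline itself computes it in the CV.r script.
In [ ]:
# CV = standard deviation / mean (illustration with made-up intensities)
from statistics import mean, stdev
intensities = [10.0, 12.0, 11.0, 13.0]  # hypothetical intensities of one feature
print(stdev(intensities) / mean(intensities))  # ~0.112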
In the previous step we divided the working set by sample group. It is very convenient now to run multiple instances of the CV microservice in parallel, in order to reduce the total running time. In the code snippet we use the header of the inputdata_workshop.xls file to figure out the file names that will come out of the previous step. Then, for each file, we submit a new job to Chronos.
Please go through the following code snippet, run it and check the Chronos interface.
In [ ]:
import os
os.makedirs("output_cv", exist_ok=True)  # create a folder for the CV output
# Figure out the sample groups from the header of the input file
with open("inputdata_workshop.xls", "r") as f:
    header = f.readline()
# Strip the surrounding quotes from each column name and keep its first five letters
samples = list(set(map(lambda s: s[1:-1][:5], header.split("\t"))))
samples.remove("BLANK")  # the blanks don't need to be processed
# Create a microservice for each sample group
url = "https://admin:" + password + "@" + control + "/chronos/scheduler/dependency"
for s in samples:
    json = """
    {
        "parents": ["splitter"],
        "cpus": "0.25",
        "mem": "128",
        "epsilon": "PT10M",
        "name": "cv-%(jobname)s",
        "container": {
            "type": "DOCKER",
            "image": "farmbio/cv",
            "volumes": [{
                "hostPath": "/mnt/container-volumes/jupyter",
                "containerPath": "/data",
                "mode": "RW"
            }]
        },
        "command": "Rscript CV.r /data/output_splitter/%(sample)s.xls /data/output_cv/%(sample)s_cv.xls",
        "owner": "user@example.com"
    }
    """ % {"sample": s, "jobname": s.replace(".", "_")}
    response = requests.post(url, headers={'content-type': 'application/json'}, data=json, verify=False)
    print(s + " HTTP response code: " + str(response.status_code))
In step 5 we processed many samples in parallel, and before proceeding to the feature selection we need to merge them back into a single file. This step can run only after all of the jobs generated by the previous step have finished. Hence, in the JSON we specify all of the jobs from step 5 in the parents field.
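To make the formatting concrete, the snippet below shows how the parents field is rendered as a JSON array for two hypothetical sample prefixes.
In [ ]:
# Illustration: rendering the parents field as a JSON array (hypothetical prefixes)
example = "[" + ",".join('"cv-' + s + '"' for s in ["AAA01", "BBB02"]) + "]"
print(example)  # ["cv-AAA01","cv-BBB02"]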
Please go through the following code snippet, run it and check the Chronos interface.
In [ ]:
url="https://admin:"+password+"@"+control+"/chronos/scheduler/dependency"
# Format job names from step 5 in JSON array format
jobNames = "[" + ",".join(map(lambda s: '"cv-'+s.replace(".", "_")+'"',samples)) + "]"
json="""
{
"parents" : %(parents)s,
"cpus": "0.25",
"mem": "128",
"epsilon" : "PT10M",
"name" : "merger",
"container": {
"type": "DOCKER",
"image": "farmbio/merger",
"volumes": [{
"hostPath": "/mnt/container-volumes/jupyter",
"containerPath": "/data",
"mode": "RW"
}]
},
"command" : "Rscript Merger.r /data/output_cv /data/output_Merger.xls",
"owner" : "user@example.com"
}
""" % {"parents" : jobNames}
#print("HTTP response code: " + json)
response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)
print("HTTP response code: " + str(response.status_code))
In [ ]:
url="https://admin:"+password+"@"+control+"/chronos/scheduler/dependency"
json="""
{
"parents" : ["merger"],
"cpus": "0.25",
"mem": "128",
"epsilon" : "PT10M",
"name" : "feature-selection",
"container": {
"type": "DOCKER",
"image": "farmbio/featureselection",
"volumes": [{
"hostPath": "/mnt/container-volumes/jupyter",
"containerPath": "/data",
"mode": "RW"
}]
},
"command" : "Rscript FeatureSelection.r /data/output_log2transformation.xls /data/output_Merger.xls /data/output_FeatureSelection.xls",
"owner" : "user@example.com"
}
"""
response=requests.post(url, headers = {'content-type' : 'application/json'}, data=json, verify=False)
print("HTTP response code: " + str(response.status_code))