New experimental MS2LDA workflow

This workflow is built on Luigi, a Python-based pipeline engine. Also see the slides here.

Pros:

  • Modular
  • Neat?
  • Any pipeline we code from scratch would probably end up looking like this eventually, so using Luigi perhaps saves some time.
  • Future-proof: it has a lot of support for big-data infrastructure such as HDFS and MapReduce, although we can ignore that for now.

Cons:

  • Over-engineered?
  • Introduces another dependency (the pipeline engine itself). Unlike Celery, though, it is very easy to install: pip install luigi and that's all.
  • Error messages are not very informative.

In [1]:
import luigi as lg
import json
import pickle

import sys
basedir = '/Users/joewandy/git/lda/code/'
sys.path.append(basedir)

from multifile_feature import SparseFeatureExtractor
from lda import MultiFileVariationalLDA

These are the steps we want in the new pipeline:

  1. Extracting spectra from the mzml, mzxml files
  2. Grouping features and exporting in a format that MS2LDA can deal with
  3. Running MS2LDA
  4. Visualising

with possible variations on steps (1) and (2), and maybe even (3) too.
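Each step maps naturally onto a Luigi Task, which declares its parameters, its work (the run method) and its targets (the output method), with dependencies expressed via a requires method. As a minimal illustrative sketch (not part of the pipeline itself):

class HelloTask(lg.Task):

    name = lg.Parameter(default='world')

    def output(self):
        # the target whose existence tells Luigi that this task is complete
        return lg.LocalTarget('hello_%s.txt' % self.name)

    def run(self):
        with self.output().open('w') as f:
            f.write('hello %s\n' % self.name)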

Example Step 1

Below we define an example class that loads existing CSV files, previously created from an mzxml/mzml (method 3) pair. However, you can imagine providing different implementations here to load mzTab, MSP-style files, and so on.


In [2]:
class ExtractSpectra(lg.Task):

    datadir = lg.Parameter()
    prefix = lg.Parameter()

    def run(self):
        # we could actually extract the spectra from mzxml, mzml files here;
        # for now the CSV outputs are assumed to exist already
        print 'Processing %s and %s' % (self.datadir, self.prefix)
    
    def output(self):        
        out_dict = {
            'ms1': lg.LocalTarget(self.datadir + self.prefix + '_ms1.csv'), 
            'ms2': lg.LocalTarget(self.datadir + self.prefix + '_ms2.csv') 
        }
        return out_dict
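As a hypothetical example of such an alternative (the class name and file handling here are made up for illustration), an mzTab-based extractor would only need to honour the same output contract, i.e. produce the ms1/ms2 CSV pair that downstream tasks expect:

class ExtractSpectraMzTab(lg.Task):

    datadir = lg.Parameter()
    prefix = lg.Parameter()

    def run(self):
        # hypothetical: parse self.datadir + self.prefix + '.mzTab' and
        # write the same two CSVs that ExtractSpectra produces
        for target in self.output().values():
            with target.open('w') as f:
                f.write('mz,rt,intensity\n')  # placeholder only

    def output(self):
        return {
            'ms1': lg.LocalTarget(self.datadir + self.prefix + '_ms1.csv'),
            'ms2': lg.LocalTarget(self.datadir + self.prefix + '_ms2.csv')
        }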

Example Step 2

Similarly, here we define a class that takes the output of the ExtractSpectra class above (the dependency is declared in the requires method below), performs the grouping by detecting gaps along the sorted features (defined in the run method), and writes the result to a pickled file (defined in the output method).

It would be easy to provide different implementations here based on other methods of grouping as well.

The code below was just copied and pasted, so it could probably be shorter.


In [3]:
class GroupFeatures(lg.Task):
    
    scaling_factor = lg.IntParameter(default=1000)
    fragment_grouping_tol = lg.IntParameter(default=7)
    loss_grouping_tol = lg.IntParameter(default=7)
    loss_threshold_min_count = lg.IntParameter(default=5)
    loss_threshold_max_val = lg.IntParameter(default=200)
    loss_threshold_min_val = lg.IntParameter(default=0)

    datadir = lg.Parameter()
    prefixes = lg.ListParameter()
    
    def requires(self):
        return [ExtractSpectra(datadir=self.datadir, prefix=prefix) for prefix in self.prefixes]
    
    def run(self):

        # input_set is a list of tuples of (ms1, ms2)
        input_set = []
        for out_dict in self.input():
            ms1 = out_dict['ms1'].path
            ms2 = out_dict['ms2'].path
            items = (ms1, ms2)
            input_set.append(items)

        # performs the grouping here
        extractor = SparseFeatureExtractor(input_set, self.fragment_grouping_tol, self.loss_grouping_tol, 
                                           self.loss_threshold_min_count, self.loss_threshold_max_val,
                                           self.loss_threshold_min_val,
                                           input_type='filename')

        fragment_q = extractor.make_fragment_queue()
        fragment_groups = extractor.group_features(fragment_q, extractor.fragment_grouping_tol)

        loss_q = extractor.make_loss_queue()
        loss_groups = extractor.group_features(loss_q, extractor.loss_grouping_tol, 
                                               check_threshold=True)

        extractor.create_counts(fragment_groups, loss_groups, self.scaling_factor)
        mat, vocab, ms1, ms2 = extractor.get_entry(0)
            
        global_word_index = {}
        for i,v in enumerate(vocab):
            global_word_index[v] = i
            
        corpus_dictionary = {}    
        for f in range(extractor.F):
            print "Processing file {}".format(f)
            corpus = {}
            mat, vocab, ms1, ms2 = extractor.get_entry(f)
            n_docs,n_words = mat.shape
            print n_docs,n_words
            d_pos = 0
            for d in ms1.iterrows():
                doc_name = "{}_{}".format(d[1]['mz'],d[1]['rt'])
                corpus[doc_name] = {}
                for word_index,count in zip(mat[d_pos,:].rows[0],mat[d_pos,:].data[0]):
                    if count > 0:
                        corpus[doc_name][vocab[word_index]] = count
                d_pos += 1

            # Added by Simon
            name = input_set[f][0].split('/')[-1].split('ms1')[0][:-1]
            corpus_dictionary[name] = corpus
            
        output_dict = {}
        output_dict['global_word_index'] = global_word_index
        output_dict['corpus_dictionary'] = corpus_dictionary
        with self.output().open('w') as f:
            pickle.dump(output_dict, f)            
            
    def output(self):
        return lg.LocalTarget('output_dict.p')
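One caveat with the hard-coded target above: runs with different parameter settings would all write to the same output_dict.p and so overwrite each other. A small refinement (a sketch, not in the original code) is to derive the target name from the task parameters, e.g.:

    def output(self):
        # e.g. output_dict_7_7_1000.p; any scheme that varies with the
        # parameters would avoid the collision
        fname = 'output_dict_%d_%d_%d.p' % (self.fragment_grouping_tol,
                                            self.loss_grouping_tol,
                                            self.scaling_factor)
        return lg.LocalTarget(fname)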

Example Step 3

Finally, here we define a RunLDA task that depends on the output of the GroupFeatures class above.


In [4]:
class RunLDA(lg.Task):

    n_its = lg.IntParameter(default=10)
    K = lg.IntParameter(default=300)
    alpha = lg.FloatParameter(default=1)
    eta = lg.FloatParameter(default=0.1)
    update_alpha = lg.BoolParameter(default=True)
    
    datadir = lg.Parameter()
    prefixes = lg.ListParameter()    

    def requires(self):
        return GroupFeatures(datadir=self.datadir, prefixes=self.prefixes)
    
    def run(self):
        with self.input().open('r') as f:
            output_dict = pickle.load(f)            
        global_word_index = output_dict['global_word_index']
        corpus_dictionary = output_dict['corpus_dictionary']
        mf_lda = MultiFileVariationalLDA(corpus_dictionary, word_index=global_word_index,
                                         K=self.K, alpha=self.alpha, eta=self.eta, 
                                         update_alpha=self.update_alpha)
        mf_lda.run_vb(parallel=False, n_its=self.n_its, initialise=True)
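
Step 4 (visualising) is not implemented yet, but it would follow the same pattern. A hypothetical sketch is below; note that for the dependency check to work, RunLDA would also need to define an output target (as it stands, Luigi warns that it has no outputs, which is visible in the log further down):

class Visualise(lg.Task):

    datadir = lg.Parameter()
    prefixes = lg.ListParameter()

    def requires(self):
        return RunLDA(datadir=self.datadir, prefixes=self.prefixes)

    def run(self):
        # e.g. load the fitted LDA model and generate plots here
        pass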

Run the pipeline

Set up the initial parameters


In [5]:
datadir = '/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/'
prefixes = [
    'Urine_StrokeDrugs_02_T10_POS',
    'Urine_StrokeDrugs_03_T10_POS',
    'Urine_StrokeDrugs_08_T10_POS',
    'Urine_StrokeDrugs_09_T10_POS',
]
prefixes_json = json.dumps(prefixes)  # ListParameter values are passed on the command line as JSON

And run the pipeline


In [6]:
lg.run(['RunLDA', '--workers', '1', '--local-scheduler', '--datadir', datadir, '--prefixes', prefixes_json])


DEBUG: Checking if RunLDA(n_its=10, K=300, alpha=1, eta=0.1, update_alpha=True, datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefixes=["Urine_StrokeDrugs_02_T10_POS", "Urine_StrokeDrugs_03_T10_POS", "Urine_StrokeDrugs_08_T10_POS", "Urine_StrokeDrugs_09_T10_POS"]) is complete
/Users/joewandy/anaconda/lib/python2.7/site-packages/luigi/worker.py:305: UserWarning: Task RunLDA(n_its=10, K=300, alpha=1, eta=0.1, update_alpha=True, datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefixes=["Urine_StrokeDrugs_02_T10_POS", "Urine_StrokeDrugs_03_T10_POS", "Urine_StrokeDrugs_08_T10_POS", "Urine_StrokeDrugs_09_T10_POS"]) without outputs has no custom complete() method
  is_complete = task.complete()
DEBUG: Checking if GroupFeatures(scaling_factor=1000, fragment_grouping_tol=7, loss_grouping_tol=7, loss_threshold_min_count=5, loss_threshold_max_val=200, loss_threshold_min_val=0, datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefixes=["Urine_StrokeDrugs_02_T10_POS", "Urine_StrokeDrugs_03_T10_POS", "Urine_StrokeDrugs_08_T10_POS", "Urine_StrokeDrugs_09_T10_POS"]) is complete
INFO: Informed scheduler that task   RunLDA_300_1__Users_joewandy__2bb33be5da   has status   PENDING
DEBUG: Checking if ExtractSpectra(datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefix=Urine_StrokeDrugs_02_T10_POS) is complete
DEBUG: Checking if ExtractSpectra(datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefix=Urine_StrokeDrugs_03_T10_POS) is complete
DEBUG: Checking if ExtractSpectra(datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefix=Urine_StrokeDrugs_08_T10_POS) is complete
DEBUG: Checking if ExtractSpectra(datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefix=Urine_StrokeDrugs_09_T10_POS) is complete
INFO: Informed scheduler that task   GroupFeatures__Users_joewandy__7_7_0056f025eb   has status   PENDING
INFO: Informed scheduler that task   ExtractSpectra__Users_joewandy__Urine_StrokeDrug_387606b629   has status   DONE
INFO: Informed scheduler that task   ExtractSpectra__Users_joewandy__Urine_StrokeDrug_b91accbee3   has status   DONE
INFO: Informed scheduler that task   ExtractSpectra__Users_joewandy__Urine_StrokeDrug_0c4355dc83   has status   DONE
INFO: Informed scheduler that task   ExtractSpectra__Users_joewandy__Urine_StrokeDrug_cae3ae0711   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10009] Worker Worker(salt=234252635, workers=1, host=C02JF2MEDKQ5.local, username=joewandy, pid=10009) running   GroupFeatures(scaling_factor=1000, fragment_grouping_tol=7, loss_grouping_tol=7, loss_threshold_min_count=5, loss_threshold_max_val=200, loss_threshold_min_val=0, datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefixes=["Urine_StrokeDrugs_02_T10_POS", "Urine_StrokeDrugs_03_T10_POS", "Urine_StrokeDrugs_08_T10_POS", "Urine_StrokeDrugs_09_T10_POS"])
Loading /Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/Urine_StrokeDrugs_02_T10_POS_ms1.csv
Loading /Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/Urine_StrokeDrugs_02_T10_POS_ms2.csv
Loading /Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/Urine_StrokeDrugs_03_T10_POS_ms1.csv
Loading /Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/Urine_StrokeDrugs_03_T10_POS_ms2.csv
Loading /Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/Urine_StrokeDrugs_08_T10_POS_ms1.csv
Loading /Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/Urine_StrokeDrugs_08_T10_POS_ms2.csv
Loading /Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/Urine_StrokeDrugs_09_T10_POS_ms1.csv
Loading /Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/Urine_StrokeDrugs_09_T10_POS_ms2.csv
Processing fragments for file 0
Processing fragments for file 1
Processing fragments for file 2
Processing fragments for file 3
Total groups=2415
Processing losses for file 0
Processing losses for file 1
Processing losses for file 2
Processing losses for file 3
Total groups=780
Initialising sparse matrix 0
Initialising sparse matrix 1
Initialising sparse matrix 2
Initialising sparse matrix 3
Populating the counts
Populating counts for fragment group 0/2415
Populating counts for fragment group 100/2415
Populating counts for fragment group 200/2415
Populating counts for fragment group 300/2415
Populating counts for fragment group 400/2415
Populating counts for fragment group 500/2415
Populating counts for fragment group 600/2415
Populating counts for fragment group 700/2415
Populating counts for fragment group 800/2415
Populating counts for fragment group 900/2415
Populating counts for fragment group 1000/2415
Populating counts for fragment group 1100/2415
Populating counts for fragment group 1200/2415
Populating counts for fragment group 1300/2415
Populating counts for fragment group 1400/2415
Populating counts for fragment group 1500/2415
Populating counts for fragment group 1600/2415
Populating counts for fragment group 1700/2415
Populating counts for fragment group 1800/2415
Populating counts for fragment group 1900/2415
Populating counts for fragment group 2000/2415
Populating counts for fragment group 2100/2415
Populating counts for fragment group 2200/2415
Populating counts for fragment group 2300/2415
Populating counts for fragment group 2400/2415
Populating counts for loss group 0/780
Populating counts for loss group 100/780
Populating counts for loss group 200/780
Populating counts for loss group 300/780
Populating counts for loss group 400/780
Populating counts for loss group 500/780
Populating counts for loss group 600/780
Populating counts for loss group 700/780
Normalising dataframe 0
file 0 normalising
file 0 normalised csc shape (818, 3195)
Normalising dataframe 1
file 1 normalising
file 1 normalised csc shape (685, 3195)
Normalising dataframe 2
file 2 normalising
file 2 normalised csc shape (740, 3195)
Normalising dataframe 3
file 3 normalising
file 3 normalised csc shape (533, 3195)
Processing file 0
818 3195
Processing file 1
685 3195
Processing file 2
740 3195
Processing file 3
533 3195
INFO: [pid 10009] Worker Worker(salt=234252635, workers=1, host=C02JF2MEDKQ5.local, username=joewandy, pid=10009) done      GroupFeatures(scaling_factor=1000, fragment_grouping_tol=7, loss_grouping_tol=7, loss_threshold_min_count=5, loss_threshold_max_val=200, loss_threshold_min_val=0, datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefixes=["Urine_StrokeDrugs_02_T10_POS", "Urine_StrokeDrugs_03_T10_POS", "Urine_StrokeDrugs_08_T10_POS", "Urine_StrokeDrugs_09_T10_POS"])
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   GroupFeatures__Users_joewandy__7_7_0056f025eb   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10009] Worker Worker(salt=234252635, workers=1, host=C02JF2MEDKQ5.local, username=joewandy, pid=10009) running   RunLDA(n_its=10, K=300, alpha=1, eta=0.1, update_alpha=True, datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefixes=["Urine_StrokeDrugs_02_T10_POS", "Urine_StrokeDrugs_03_T10_POS", "Urine_StrokeDrugs_08_T10_POS", "Urine_StrokeDrugs_09_T10_POS"])
Object created with 740 documents
Object created with 818 documents
Object created with 533 documents
Object created with 685 documents
serial processing
Iteration: 0
415.20895074
Iteration: 1
22.3113009506
Iteration: 2
22.4062464857
Iteration: 3
22.2457084829
Iteration: 4
23.6348062139
Iteration: 5
27.7334569719
Iteration: 6
36.9445439187
Iteration: 7
54.9165275244
Iteration: 8
81.7462496313
Iteration: 9
INFO: [pid 10009] Worker Worker(salt=234252635, workers=1, host=C02JF2MEDKQ5.local, username=joewandy, pid=10009) done      RunLDA(n_its=10, K=300, alpha=1, eta=0.1, update_alpha=True, datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefixes=["Urine_StrokeDrugs_02_T10_POS", "Urine_StrokeDrugs_03_T10_POS", "Urine_StrokeDrugs_08_T10_POS", "Urine_StrokeDrugs_09_T10_POS"])
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   RunLDA_300_1__Users_joewandy__2bb33be5da   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=234252635, workers=1, host=C02JF2MEDKQ5.local, username=joewandy, pid=10009) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 6 tasks of which:
* 4 present dependencies were encountered:
    - 4 ExtractSpectra(datadir=/Users/joewandy/Dropbox/Meta_clustering/MS2LDA/large_study/Urine_mzXML_large_study/method_1/POS/, prefix=Urine_StrokeDrugs_02_T10_POS) ...
* 2 ran successfully:
    - 1 GroupFeatures(...)
    - 1 RunLDA(...)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

110.188543666
Out[6]:
True
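
As an aside, the same pipeline can also be launched programmatically instead of via command-line-style arguments, which avoids the JSON round-trip for the list parameter. A minimal sketch, assuming luigi.build behaves as in recent Luigi versions:

lg.build([RunLDA(datadir=datadir, prefixes=prefixes)],
         workers=1, local_scheduler=True)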