Data generation: using Python to sweep over methods and parameters

In this notebook, we illustrate how to use Python to perform parameter sweeps for a taxonomic assigner and integrate the results into the TAX CREdiT framework.

Environment preparation


In [1]:
from os.path import join, exists, split, sep, expandvars 
from os import makedirs, getpid
from glob import glob
from shutil import rmtree
import csv
import json
import tempfile
from itertools import product

from qiime2.plugins import feature_classifier
from qiime2 import Artifact
from joblib import Parallel, delayed
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import HashingVectorizer

from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

from q2_feature_classifier.classifier import spec_from_pipeline
from q2_types.feature_data import DNAIterator
from pandas import DataFrame

from tax_credit.framework_functions import (
    gen_param_sweep, generate_per_method_biom_tables, move_results_to_repository)

In [2]:
project_dir = expandvars('$HOME/Desktop/projects/short-read-tax-assignment/')
analysis_name = 'mock-community'
data_dir = join(project_dir, 'data', analysis_name)

reference_database_dir = expandvars("$HOME/Desktop/ref_dbs/")
results_dir = expandvars("$HOME/Desktop/projects/mock-community/")

Utility Methods

The below methods are used to load the data, prepare the data, parse the classifier and classification parameters, and fit and run the classifier. They should probably be moved to tax_credit.framework_functions.


In [45]:
# *** one glaring flaw here is that generate_pipeline_sweep iterates
# *** through all method_parameters_combinations and reference_dbs 
# *** and hence will generate training sets for each combo even if
# *** not all are called by commands in sweep. This is not an issue
# *** if sweep uses all classifiers but is inconvenient if attempting
# *** to test on a subset of sweep. Need to explicitly set all inputs!

def train_and_run_classifier(method_parameters_combinations, reference_dbs,
                             pipelines, sweep, verbose=False, n_jobs=4):
    '''Train and run q2-feature-classifier across a parameter sweep.
    method_parameters_combinations: dict of dicts of lists
        Classifier methods to run and their parameters/values to sweep
        Format: {method_name: {'parameter_name': [parameter_values]}}
    reference_dbs: dict of tuples
        Reference databases to use for classifier training.
        Format: {database_name: (ref_seqs, ref_taxonomy)}
    pipelines: dict
        Classifier pipelines to use for training each method.
        Format: {method_name: sklearn.pipeline.Pipeline}
    sweep: list of tuples
        output of gen_param_sweep(), format:
        (parameter_output_dir, input_dir, reference_seqs, reference_tax, method, params)
    n_jobs: number of jobs to run in parallel.
    '''
    # train classifier once for each pipeline param combo
    for method, db, pipeline_param, subsweep in generate_pipeline_sweep(
            method_parameters_combinations, reference_dbs, sweep):
        ref_reads, ref_taxa = reference_dbs[db]
        # train classifier
        classifier = train_classifier(
            ref_reads, ref_taxa, pipeline_param, pipelines[method], verbose=verbose)
        # run classifier. Only run in parallel once classifier is trained,
        # to minimize memory usage (don't want to train large refs in parallel)
        Parallel(n_jobs=n_jobs)(delayed(run_classifier)(
            classifier, output_dir, input_dir, split_params(params)[0], verbose=verbose)
            for output_dir, input_dir, rs, rt, mt, params in subsweep)

            
def generate_pipeline_sweep(method_parameters_combinations, reference_dbs, sweep):
    '''Generate pipeline parameters for each classifier training step'''
    # iterate over parameters
    for method, params in method_parameters_combinations.items():
        # split out pipeline parameters
        classifier_params, pipeline_params = split_params(params)
        # iterate over reference dbs
        for db, refs in reference_dbs.items():
            # iterate over all pipeline parameter combinations
            for param_product in product(*[params[id_] for id_ in pipeline_params]):
                # yield parameter combinations to use for a each classifier
                pipeline_param = dict(zip(pipeline_params, param_product))
                subsweep = [p for p in sweep if split_params(p[5])[1] 
                            == pipeline_param and p[2] == refs[0]]
                yield method, db, pipeline_param, subsweep


def train_classifier(ref_reads, ref_taxa, params, pipeline, verbose=False):
    ref_reads = Artifact.load(ref_reads)
    ref_taxa = Artifact.load(ref_taxa)
    pipeline.set_params(**params)
    spec = json.dumps(spec_from_pipeline(pipeline))
    if verbose:
        print(spec)
    classifier = feature_classifier.methods.fit_classifier(ref_reads, ref_taxa, spec)
    #return classifier.classifier


def run_classifier(classifier, output_dir, input_dir, params, verbose=False):    
    # Classify the sequences
    rep_seqs = Artifact.load(join(input_dir, 'rep_seqs.qza'))
    if verbose:
        print(output_dir)
    classification = feature_classifier.methods.classify(rep_seqs, classifier, **params)
    
    # Save the results
    makedirs(output_dir, exist_ok=True)
    output_file = join(output_dir, 'rep_set_tax_assignments.txt')
    dataframe = classification.classification.view(DataFrame)
    dataframe.to_csv(output_file, sep='\t', header=False)

    
def split_params(params):
    classifier_params = feature_classifier.methods.\
                        classify.signature.parameters.keys()
    pipeline_params = {k:v for k, v in params.items()
                        if k not in classifier_params}
    classifier_params = {k:v for k, v in params.items() 
                         if k in classifier_params}
    return classifier_params, pipeline_params

Preparing data set sweep

First, we're going to define the data sets that we'll sweep over. The following cell does not need to be modified unless if you wish to change the datasets or reference databases used in the sweep.


In [33]:
dataset_reference_combinations = [
 ('mock-1', 'gg_13_8_otus'), # formerly S16S-1
 ('mock-2', 'gg_13_8_otus'), # formerly S16S-2
 ('mock-3', 'gg_13_8_otus'), # formerly Broad-1
 ('mock-4', 'gg_13_8_otus'), # formerly Broad-2
 ('mock-5', 'gg_13_8_otus'), # formerly Broad-3
 # ('mock-6', 'gg_13_8_otus'), # formerly Turnbaugh-1
 ('mock-7', 'gg_13_8_otus'), # formerly Turnbaugh-2
 ('mock-8', 'gg_13_8_otus'), # formerly Turnbaugh-3
 ('mock-9', 'unite_20.11.2016_clean_fullITS'), # formerly ITS1
 ('mock-10', 'unite_20.11.2016_clean_fullITS'), # formerly ITS2-SAG
 ('mock-12', 'gg_13_8_otus'), # Extreme
 # ('mock-13', 'gg_13_8_otus_full16S'), # kozich-1
 # ('mock-14', 'gg_13_8_otus_full16S'), # kozich-2
 # ('mock-15', 'gg_13_8_otus_full16S'), # kozich-3
 ('mock-16', 'gg_13_8_otus'), # schirmer-1
]

reference_dbs = {'gg_13_8_otus' : (join(reference_database_dir, 'gg_13_8_otus/rep_set/99_otus_515f-806r.qza'), 
                                   join(reference_database_dir, 'gg_13_8_otus/taxonomy/99_otu_taxonomy.qza')),
                 # 'gg_13_8_otus_full16S' : (join(reference_database_dir, 'gg_13_8_otus/rep_set/99_otus.qza'), 
                 #                  join(reference_database_dir, 'gg_13_8_otus/taxonomy/99_otu_taxonomy.qza')),
                 'unite_20.11.2016_clean_fullITS' : (join(reference_database_dir, 'sh_qiime_release_20.11.2016/developer/sh_refs_qiime_ver7_99_20.11.2016_dev_clean.qza'), 
                                   join(reference_database_dir, 'sh_qiime_release_20.11.2016/developer/sh_taxonomy_qiime_ver7_99_20.11.2016_dev_clean.qza')),
                 # 'unite_20.11.2016' : (join(reference_database_dir, 'sh_qiime_release_20.11.2016/developer/sh_refs_qiime_ver7_99_20.11.2016_dev_BITSf-B58S3r_trim250.qza'), 
                 #                      join(reference_database_dir, 'sh_qiime_release_20.11.2016/developer/sh_taxonomy_qiime_ver7_99_20.11.2016_dev.qza'))
                }

Preparing the method/parameter combinations and generating commands

Now we set the methods and method-specific parameters that we want to sweep. Modify to sweep other methods.


In [34]:
method_parameters_combinations = {
    'q2-multinomialNB': {'confidence': [0.0, 0.2, 0.4, 0.6, 0.8],
                         'classify__alpha': [0.001, 0.01, 0.1],
                         'feat_ext__ngram_range': [[8,8], [12,12], [20,20]]},
    'q2-logisticregression': {'classify__solver': ['newton-cg', 'lbfgs', 'liblinear', 'sag']},
    'q2-randomforest': {'classify__max_features': ['sqrt', 'None'],
                        'classify__n_estimators': [5, 10, 100]}
    }

Preparing the pipelines

The below pipelines are used to specify the scikit-learn classifiers that are used for assignment. At the moment we only include Naïve Bayes but the collection will expand.


In [35]:
# pipeline params common to all classifiers are set here
hash_params = dict(
    analyzer='char_wb', n_features=8192, non_negative=True, ngram_range=[8, 8])

# any params common to all classifiers can be set here
classify_params = dict()

def build_pipeline(classifier, hash_params, classify_params):
    return Pipeline([
        ('feat_ext', HashingVectorizer(**hash_params)),
        ('classify', classifier(**classify_params))])
    
# Now fit the pipelines.
pipelines = {'q2-multinomialNB': build_pipeline(
                 MultinomialNB, hash_params, {'fit_prior': False}),
             'q2-logisticregression': build_pipeline(
                 LogisticRegression, hash_params, classify_params),
             'q2-randomforest': build_pipeline(
                 RandomForestClassifier, hash_params, classify_params)}

Test


In [36]:
dataset_reference_combinations = [
 ('mock-3', 'gg_13_8_otus'), # formerly Broad-1
]

method_parameters_combinations = {
    'q2-randomforest': {'classify__max_features': ['sqrt'],
                        'classify__n_estimators': [5]}
    }

reference_dbs = {'gg_13_8_otus' : (join(reference_database_dir, 'gg_13_8_otus/rep_set/99_otus_515f-806r.qza'), 
                                   join(reference_database_dir, 'gg_13_8_otus/taxonomy/99_otu_taxonomy.qza'))}

Do the Sweep


In [37]:
sweep = gen_param_sweep(data_dir, results_dir, reference_dbs,
                        dataset_reference_combinations,
                        method_parameters_combinations)
sweep = list(sweep)

A quick sanity check never hurt anyone...


In [38]:
print(len(sweep))
sweep[0]


1
Out[38]:
('/Users/nbokulich/Desktop/projects/mock-community/mock-3/gg_13_8_otus/q2-randomforest/sqrt:5',
 '/Users/nbokulich/Desktop/projects/short-read-tax-assignment/data/mock-community/mock-3',
 '/Users/nbokulich/Desktop/ref_dbs/gg_13_8_otus/rep_set/99_otus_515f-806r.qza',
 '/Users/nbokulich/Desktop/ref_dbs/gg_13_8_otus/taxonomy/99_otu_taxonomy.qza',
 'q2-randomforest',
 {'classify__max_features': 'sqrt', 'classify__n_estimators': 5})

In [44]:
train_and_run_classifier(method_parameters_combinations, reference_dbs, pipelines, sweep, verbose=True, n_jobs=4)


<artifact: FeatureData[Sequence] uuid: 57ec23ea-7618-4de0-abeb-cc5850264040> <artifact: FeatureData[Taxonomy] uuid: 8e1cb0b7-b1ff-45df-a41f-b7726a4a21a2> [["feat_ext", {"strip_accents": null, "__type__": "feature_extraction.text.HashingVectorizer", "tokenizer": null, "decode_error": "strict", "encoding": "utf-8", "norm": "l2", "non_negative": true, "token_pattern": "(?u)\\b\\w\\w+\\b", "stop_words": null, "binary": false, "analyzer": "char_wb", "preprocessor": null, "input": "content", "ngram_range": [8, 8], "lowercase": true, "n_features": 8192}], ["classify", {"__type__": "ensemble.forest.RandomForestClassifier", "n_jobs": 1, "class_weight": null, "bootstrap": true, "min_samples_split": 2, "oob_score": false, "min_impurity_split": 1e-07, "random_state": null, "max_leaf_nodes": null, "min_samples_leaf": 1, "min_weight_fraction_leaf": 0.0, "max_features": "sqrt", "verbose": 0, "max_depth": null, "warm_start": false, "criterion": "gini", "n_estimators": 5}]]
<artifact: FeatureData[Sequence] uuid: cded1920-4dae-496f-8e4d-2e74006831c9> None {} /Users/nbokulich/Desktop/projects/mock-community/mock-3/gg_13_8_otus/q2-randomforest/sqrt:5
<artifact: FeatureData[Sequence] uuid: 9c7c70b2-66d0-4fce-971d-b3db1f40873c> <artifact: FeatureData[Taxonomy] uuid: dea9dfe1-2d69-4240-a2c9-73a35e969ee6> [["feat_ext", {"strip_accents": null, "__type__": "feature_extraction.text.HashingVectorizer", "tokenizer": null, "decode_error": "strict", "encoding": "utf-8", "norm": "l2", "non_negative": true, "token_pattern": "(?u)\\b\\w\\w+\\b", "stop_words": null, "binary": false, "analyzer": "char_wb", "preprocessor": null, "input": "content", "ngram_range": [8, 8], "lowercase": true, "n_features": 8192}], ["classify", {"__type__": "ensemble.forest.RandomForestClassifier", "n_jobs": 1, "class_weight": null, "bootstrap": true, "min_samples_split": 2, "oob_score": false, "min_impurity_split": 1e-07, "random_state": null, "max_leaf_nodes": null, "min_samples_leaf": 1, "min_weight_fraction_leaf": 0.0, "max_features": "sqrt", "verbose": 0, "max_depth": null, "warm_start": false, "criterion": "gini", "n_estimators": 5}]]

Generate per-method biom tables

Modify the taxonomy_glob below to point to the taxonomy assignments that were generated above. This may be necessary if filepaths were altered in the preceding cells.


In [10]:
taxonomy_glob = join(results_dir, '*', '*', '*', '*', 'rep_set_tax_assignments.txt')
generate_per_method_biom_tables(taxonomy_glob, data_dir)

Move result files to repository

Add results to the short-read-taxa-assignment directory (e.g., to push these results to the repository or compare with other precomputed results in downstream analysis steps). The precomputed_results_dir path and methods_dirs glob below should not need to be changed unless if substantial changes were made to filepaths in the preceding cells.


In [11]:
precomputed_results_dir = join(project_dir, "data", "precomputed-results", analysis_name)
method_dirs = glob(join(results_dir, '*', '*', '*', '*'))
move_results_to_repository(method_dirs, precomputed_results_dir)