In this notebook, we illustrate how to use Python to perform parameter sweeps for a taxonomic assigner and integrate the results into the TAX CREdiT framework.
In [1]:
from os.path import join, exists, split, sep, expandvars
from os import makedirs, getpid
from glob import glob
from shutil import rmtree
import csv
import json
import tempfile
from itertools import product
from qiime2.plugins import feature_classifier
from qiime2 import Artifact
from joblib import Parallel, delayed
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from q2_feature_classifier.classifier import spec_from_pipeline
from q2_types.feature_data import DNAIterator
from pandas import DataFrame
from tax_credit.framework_functions import (
gen_param_sweep, generate_per_method_biom_tables, move_results_to_repository)
In [2]:
project_dir = expandvars('$HOME/Desktop/projects/short-read-tax-assignment/')
analysis_name = 'mock-community'
data_dir = join(project_dir, 'data', analysis_name)
reference_database_dir = expandvars("$HOME/Desktop/ref_dbs/")
results_dir = expandvars("$HOME/Desktop/projects/mock-community/")
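If you are adapting this notebook to a different system, it may help to confirm that these input locations exist before launching the sweep. This is an optional check (not part of the original workflow) that only uses the exists import above; results_dir is omitted because it is created as output.
In [ ]:
# Optional sanity check: verify that the expected input locations are present
# (assumes the directory layout defined in the previous cell).
for d in (project_dir, data_dir, reference_database_dir):
    print(d, 'OK' if exists(d) else 'MISSING')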
In [45]:
# *** one glaring flaw here is that generate_pipeline_sweep iterates
# *** through all method_parameters_combinations and reference_dbs
# *** and hence will generate training sets for each combo even if
# *** not all are called by commands in sweep. This is not an issue
# *** if sweep uses all classifiers but is inconvenient if attempting
# *** to test on a subset of sweep. Need to explicitly set all inputs!
def train_and_run_classifier(method_parameters_combinations, reference_dbs,
                             pipelines, sweep, verbose=False, n_jobs=4):
    '''Train and run q2-feature-classifier across a parameter sweep.
    method_parameters_combinations: dict of dicts of lists
        Classifier methods to run and their parameters/values to sweep
        Format: {method_name: {'parameter_name': [parameter_values]}}
    reference_dbs: dict of tuples
        Reference databases to use for classifier training.
        Format: {database_name: (ref_seqs, ref_taxonomy)}
    pipelines: dict
        Classifier pipelines to use for training each method.
        Format: {method_name: sklearn.pipeline.Pipeline}
    sweep: list of tuples
        output of gen_param_sweep(), format:
        (parameter_output_dir, input_dir, reference_seqs, reference_tax, method, params)
    n_jobs: number of jobs to run in parallel.
    '''
    # train classifier once for each pipeline param combo
    for method, db, pipeline_param, subsweep in generate_pipeline_sweep(
            method_parameters_combinations, reference_dbs, sweep):
        ref_reads, ref_taxa = reference_dbs[db]
        # train classifier
        classifier = train_classifier(
            ref_reads, ref_taxa, pipeline_param, pipelines[method], verbose=verbose)
        # run classifier. Only run in parallel once classifier is trained,
        # to minimize memory usage (don't want to train large refs in parallel)
        Parallel(n_jobs=n_jobs)(delayed(run_classifier)(
            classifier, output_dir, input_dir, split_params(params)[0], verbose=verbose)
            for output_dir, input_dir, rs, rt, mt, params in subsweep)


def generate_pipeline_sweep(method_parameters_combinations, reference_dbs, sweep):
    '''Generate pipeline parameters for each classifier training step'''
    # iterate over parameters
    for method, params in method_parameters_combinations.items():
        # split out pipeline parameters
        classifier_params, pipeline_params = split_params(params)
        # iterate over reference dbs
        for db, refs in reference_dbs.items():
            # iterate over all pipeline parameter combinations
            for param_product in product(*[params[id_] for id_ in pipeline_params]):
                # yield parameter combinations to use for each classifier
                pipeline_param = dict(zip(pipeline_params, param_product))
                subsweep = [p for p in sweep if split_params(p[5])[1]
                            == pipeline_param and p[2] == refs[0]]
                yield method, db, pipeline_param, subsweep


def train_classifier(ref_reads, ref_taxa, params, pipeline, verbose=False):
    ref_reads = Artifact.load(ref_reads)
    ref_taxa = Artifact.load(ref_taxa)
    pipeline.set_params(**params)
    spec = json.dumps(spec_from_pipeline(pipeline))
    if verbose:
        print(spec)
    classifier = feature_classifier.methods.fit_classifier(ref_reads, ref_taxa, spec)
    return classifier.classifier


def run_classifier(classifier, output_dir, input_dir, params, verbose=False):
    # Classify the sequences
    rep_seqs = Artifact.load(join(input_dir, 'rep_seqs.qza'))
    if verbose:
        print(output_dir)
    classification = feature_classifier.methods.classify(rep_seqs, classifier, **params)
    # Save the results
    makedirs(output_dir, exist_ok=True)
    output_file = join(output_dir, 'rep_set_tax_assignments.txt')
    dataframe = classification.classification.view(DataFrame)
    dataframe.to_csv(output_file, sep='\t', header=False)


def split_params(params):
    classifier_params = feature_classifier.methods.\
        classify.signature.parameters.keys()
    pipeline_params = {k: v for k, v in params.items()
                       if k not in classifier_params}
    classifier_params = {k: v for k, v in params.items()
                         if k in classifier_params}
    return classifier_params, pipeline_params
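To make the parameter handling above concrete, the sketch below shows how split_params partitions a flat parameter dictionary: keys that match parameters of feature_classifier.methods.classify (such as confidence) are passed to classification, while double-underscore keys (such as classify__alpha or feat_ext__ngram_range) address steps of the sklearn pipeline and are used at training time. The example values are illustrative only and assume the QIIME 2 environment imported above.
In [ ]:
# Illustrative only: split an example parameter dictionary the same way the
# sweep machinery does. 'confidence' is a classify() parameter; the
# double-underscore keys target the 'feat_ext' and 'classify' pipeline steps.
example_params = {'confidence': 0.6,
                  'classify__alpha': 0.01,
                  'feat_ext__ngram_range': [8, 8]}
example_classify, example_pipeline = split_params(example_params)
print('classify params:', example_classify)
print('pipeline params:', example_pipeline)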
First, we define the data sets that we'll sweep over. The following cell does not need to be modified unless you wish to change the datasets or reference databases used in the sweep.
In [33]:
dataset_reference_combinations = [
('mock-1', 'gg_13_8_otus'), # formerly S16S-1
('mock-2', 'gg_13_8_otus'), # formerly S16S-2
('mock-3', 'gg_13_8_otus'), # formerly Broad-1
('mock-4', 'gg_13_8_otus'), # formerly Broad-2
('mock-5', 'gg_13_8_otus'), # formerly Broad-3
# ('mock-6', 'gg_13_8_otus'), # formerly Turnbaugh-1
('mock-7', 'gg_13_8_otus'), # formerly Turnbaugh-2
('mock-8', 'gg_13_8_otus'), # formerly Turnbaugh-3
('mock-9', 'unite_20.11.2016_clean_fullITS'), # formerly ITS1
('mock-10', 'unite_20.11.2016_clean_fullITS'), # formerly ITS2-SAG
('mock-12', 'gg_13_8_otus'), # Extreme
# ('mock-13', 'gg_13_8_otus_full16S'), # kozich-1
# ('mock-14', 'gg_13_8_otus_full16S'), # kozich-2
# ('mock-15', 'gg_13_8_otus_full16S'), # kozich-3
('mock-16', 'gg_13_8_otus'), # schirmer-1
]
reference_dbs = {'gg_13_8_otus' : (join(reference_database_dir, 'gg_13_8_otus/rep_set/99_otus_515f-806r.qza'),
join(reference_database_dir, 'gg_13_8_otus/taxonomy/99_otu_taxonomy.qza')),
# 'gg_13_8_otus_full16S' : (join(reference_database_dir, 'gg_13_8_otus/rep_set/99_otus.qza'),
# join(reference_database_dir, 'gg_13_8_otus/taxonomy/99_otu_taxonomy.qza')),
'unite_20.11.2016_clean_fullITS' : (join(reference_database_dir, 'sh_qiime_release_20.11.2016/developer/sh_refs_qiime_ver7_99_20.11.2016_dev_clean.qza'),
join(reference_database_dir, 'sh_qiime_release_20.11.2016/developer/sh_taxonomy_qiime_ver7_99_20.11.2016_dev_clean.qza')),
# 'unite_20.11.2016' : (join(reference_database_dir, 'sh_qiime_release_20.11.2016/developer/sh_refs_qiime_ver7_99_20.11.2016_dev_BITSf-B58S3r_trim250.qza'),
# join(reference_database_dir, 'sh_qiime_release_20.11.2016/developer/sh_taxonomy_qiime_ver7_99_20.11.2016_dev.qza'))
}
In [34]:
method_parameters_combinations = {
'q2-multinomialNB': {'confidence': [0.0, 0.2, 0.4, 0.6, 0.8],
'classify__alpha': [0.001, 0.01, 0.1],
'feat_ext__ngram_range': [[8,8], [12,12], [20,20]]},
'q2-logisticregression': {'classify__solver': ['newton-cg', 'lbfgs', 'liblinear', 'sag']},
'q2-randomforest': {'classify__max_features': ['sqrt', 'None'],
'classify__n_estimators': [5, 10, 100]}
}
In [35]:
# pipeline params common to all classifiers are set here
hash_params = dict(
analyzer='char_wb', n_features=8192, non_negative=True, ngram_range=[8, 8])
# any params common to all classifiers can be set here
classify_params = dict()
def build_pipeline(classifier, hash_params, classify_params):
    return Pipeline([
        ('feat_ext', HashingVectorizer(**hash_params)),
        ('classify', classifier(**classify_params))])

# Now build the pipelines (they are fit later, during the sweep).
pipelines = {'q2-multinomialNB': build_pipeline(
                 MultinomialNB, hash_params, {'fit_prior': False}),
             'q2-logisticregression': build_pipeline(
                 LogisticRegression, hash_params, classify_params),
             'q2-randomforest': build_pipeline(
                 RandomForestClassifier, hash_params, classify_params)}
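If you want to preview the classifier specification that train_classifier will pass to q2-feature-classifier, you can serialize any of these pipelines directly. This is the same spec_from_pipeline call used above, shown here only as an optional preview.
In [ ]:
# Optional: preview the JSON specification generated for one pipeline.
# This mirrors the json.dumps(spec_from_pipeline(...)) call in train_classifier.
preview_spec = json.dumps(spec_from_pipeline(pipelines['q2-multinomialNB']))
print(preview_spec)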
In [36]:
# For a quick test run, restrict the sweep to a single dataset, method, and reference database
dataset_reference_combinations = [
('mock-3', 'gg_13_8_otus'), # formerly Broad-1
]
method_parameters_combinations = {
'q2-randomforest': {'classify__max_features': ['sqrt'],
'classify__n_estimators': [5]}
}
reference_dbs = {'gg_13_8_otus' : (join(reference_database_dir, 'gg_13_8_otus/rep_set/99_otus_515f-806r.qza'),
join(reference_database_dir, 'gg_13_8_otus/taxonomy/99_otu_taxonomy.qza'))}
In [37]:
sweep = gen_param_sweep(data_dir, results_dir, reference_dbs,
dataset_reference_combinations,
method_parameters_combinations)
sweep = list(sweep)
A quick sanity check never hurt anyone...
In [38]:
print(len(sweep))
sweep[0]
Out[38]:
In [44]:
train_and_run_classifier(method_parameters_combinations, reference_dbs, pipelines, sweep, verbose=True, n_jobs=4)
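Before generating the per-method BIOM tables, it can be helpful to confirm that the sweep wrote a taxonomy assignment file for each parameter combination (the count can be compared loosely against len(sweep), bearing in mind that results_dir may also contain output from earlier runs). This optional check only uses the glob and join imports above.
In [ ]:
# Optional: count the taxonomy assignment files produced by the sweep.
assignment_files = glob(join(results_dir, '*', '*', '*', '*', 'rep_set_tax_assignments.txt'))
print(len(assignment_files), 'assignment files found')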
In [10]:
taxonomy_glob = join(results_dir, '*', '*', '*', '*', 'rep_set_tax_assignments.txt')
generate_per_method_biom_tables(taxonomy_glob, data_dir)
Add results to the short-read-tax-assignment directory (e.g., to push these results to the repository or compare with other precomputed results in downstream analysis steps). The precomputed_results_dir path and method_dirs glob below should not need to be changed unless substantial changes were made to filepaths in the preceding cells.
In [11]:
precomputed_results_dir = join(project_dir, "data", "precomputed-results", analysis_name)
method_dirs = glob(join(results_dir, '*', '*', '*', '*'))
move_results_to_repository(method_dirs, precomputed_results_dir)