Run eQTL Analysis

This notebook coordinates and executes the eQTL analysis. This notebook is specialized for the Frazer lab cluster. Since running the entire analysis is time consuming, I generally run it "by hand," starting jobs for groups of genes at different times. I've included instructions at various points below.


In [1]:
import cPickle
import datetime
import glob
import gzip
import os
import random
import re
import shutil
import subprocess
import time
import uuid

import cdpybio as cpb
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pybedtools as pbt
import scipy.stats as stats
import seaborn as sns
import statsmodels.api as sm
import vcf as pyvcf

import cardipspy as cpy
import ciepy

%matplotlib inline
%load_ext rpy2.ipython

# Name used for this notebook's sandbox/output directories.
dy_name = 'run_eqtl_analysis'

import socket
# On the Frazer lab head nodes, point pybedtools at a sandbox temp
# directory instead of the default temp location.
if socket.gethostname() == 'fl-hn1' or socket.gethostname() == 'fl-hn2':
    dy = os.path.join(ciepy.root, 'sandbox', dy_name)
    cpy.makedir(dy)
    pbt.set_tempdir(dy)
    
# Shareable output directory for this notebook.
outdir = os.path.join(ciepy.root, 'output', dy_name)
cpy.makedir(outdir)

# Output directory under private_output/ (not shared).
private_outdir = os.path.join(ciepy.root, 'private_output', dy_name)
cpy.makedir(private_outdir)

# Seed any random choices below for reproducibility.
random.seed(20150605)

In [2]:
# Gencode gene annotation.
gene_info = pd.read_table(cpy.gencode_gene_info, index_col=0)

# Pickled dict mapping gene ID -> genomic regions to test for that gene.
fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'gene_to_regions.p')
gene_to_regions = cPickle.load(open(fn, 'rb'))

# Expression matrix (per the filename: log TPM, filtered, standard-normal
# transformed PEER residuals).
exp = pd.read_table(os.path.join(ciepy.root, 'output', 'eqtl_input', 
                                 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'), index_col=0)

# RNA-seq sample metadata.
fn = os.path.join(ciepy.root, 'output', 'input_data', 'rnaseq_metadata.tsv')
rna_meta = pd.read_table(fn, index_col=0)

Run Analysis

The run_emmax_sge method will submit a job for a gene. I currently ask for 16Gb of RAM per job and four cores. If you ask for fewer cores, more jobs will run per node, but all of the IO seems to slow the jobs down. Many genes probably need less than 16Gb of RAM but some need more. The mem_needed method was my attempt at estimating how much memory a job would need, but it wasn't working well. I think the memory needed scales with the number of variants (which num_variants can tell you), so I could monitor the amount of memory used versus the number of variants. However, I didn't have a big problem with jobs failing when using 16Gb of memory (it seems like ~430 genes failed and had to be run again).


In [219]:
def num_variants(vcf, gene_id, tempdir, regions, samples, bcftools_path):
    """
    Count the number of variants to test for a gene. This currently 
    doesn't include CNVs but there aren't many of those.

    Parameters
    ----------
    vcf : str
        Path to the (indexed) VCF to count variants from.
    gene_id : str
        Gene identifier. Currently unused; kept for backward compatibility.
    tempdir : str
        Temp directory. Currently unused; kept for backward compatibility.
    regions : str
        Comma-separated region string passed to `bcftools view -r`.
    samples : str
        Path to a file with one sample ID per line.
    bcftools_path : str
        Path to the bcftools executable.

    Returns
    -------
    num : int
        Number of variant lines passing the filters.
    """
    samples = pd.read_table(samples, header=None, squeeze=True)
    # Restrict to biallelic sites (-m2 -M2) with minor allele frequency
    # >= 1% (-q 0.01:minor) in the requested samples/regions, pipe through
    # `bcftools filter -m x`, and count the remaining non-header lines.
    # NOTE: the original also built a temp filename and passed it as an
    # extra .format() argument that no placeholder consumed; that dead
    # code is removed here (the command itself is unchanged).
    c = ('{} view {} -q 0.01:minor -m2 -M2 -r {} -s {} -O u | '
         '{} filter -m x -O v | grep -v \\# | wc -l'.format(
             bcftools_path,
             vcf,
             regions,
             ','.join(samples.values),
             bcftools_path))
    num = int(subprocess.check_output(c, shell=True).strip())
    return num

def make_variant_cov(res_files, out):
    """
    Make a covariate (.cov) file for a gene including a covariate for each lead variant
    in the EMMAX output files res_files.

    Parameters
    ----------
    res_files : list
        Paths to EMMAX result files from earlier rounds. The most significant
        variant in each file is added as a genotype covariate coded
        0/1/2 = hom-ref/het/hom-alt.
    out : str
        Path where the tab-delimited covariate file is written.
    """
    vcf = os.path.join(ciepy.root, 'private_output/eqtl_input/filtered_all/0000.vcf.gz')
    gs_vcf = os.path.join(ciepy.root, 'private_output', 'cnv_processing', 'gs_emmax_sorted.vcf.gz')
    lumpy_vcf = os.path.join(ciepy.root, 'private_output', 'cnv_processing', 'lumpy_emmax_sorted.vcf.gz')
    cov = os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_sex_only.tsv')
    # Start from the sex covariate and append one column per lead variant.
    covariates = pd.read_table(cov, index_col=0, header=None, squeeze=True)
    new_covariates = pd.DataFrame({'sex':covariates})
    # NOTE: the original used `for i,fn in enumerate(res_files)` and then
    # immediately shadowed `i` with the marker index; the enumerate index
    # was never used, so it is dropped here (behavior unchanged).
    for fn in res_files:
        res = ciepy.read_emmax_output(fn)
        # Keep the most significant row(s); use the first as the lead variant.
        res = res[res.PVALUE == res.PVALUE.min()]
        marker = res.index[0]
        # Pick the VCF that contains this marker type: gs (CNV calls),
        # lumpy (DUP/DEL calls), or the SNV/indel VCF.
        if 'CNV' in res.ix[marker, 'MARKER_ID']:
            vcf_reader = pyvcf.Reader(open(gs_vcf))
        elif 'DUP' in res.ix[marker, 'MARKER_ID'] or 'DEL' in res.ix[marker, 'MARKER_ID']:
            vcf_reader = pyvcf.Reader(open(lumpy_vcf))
        else:
            vcf_reader = pyvcf.Reader(open(vcf))
        records = vcf_reader.fetch(res.ix[marker, 'CHROM'], res.ix[marker, 'BEG'], res.ix[marker, 'END'])
        r = records.next()
        # Encode genotypes only for samples present in the covariate table.
        new_covariates[marker] = 0
        hets = set([x.sample for x in r.get_hets()]) & set(new_covariates.index)
        halts = set([x.sample for x in r.get_hom_alts()]) & set(new_covariates.index)
        new_covariates.ix[hets, marker] = 1
        new_covariates.ix[halts, marker] = 2
    new_covariates.to_csv(out, sep='\t', header=None)

def run_emmax_sge(
    gene_id, 
    out_dy,
    ncores=4,
    mem=16, 
    queue=None, 
    res_files=None,
    samples=os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_samples.tsv'),
    cov=os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_sex_only.tsv'),
    exp=os.path.join(ciepy.root, 'output', 'eqtl_input', 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'),
    min_perm=1000,
    max_perm=10000,
):
    """
    Write and qsub an SGE job script that runs EMMAX for a single gene.

    Parameters
    ----------
    gene_id : str
        Gene to test.
    out_dy : str
        Analysis output directory; per-gene results go in
        out_dy/gene_results/<gene_id>.
    ncores : int
        Number of SMP slots to request.
    mem : int
        Total memory in GB to request, split evenly across slots.
    queue : str or None
        SGE resource/queue to request (e.g. 'opt'), or None for the default.
    res_files : list or None
        EMMAX result files from earlier rounds; if given, the lead variant
        from each is added as a covariate for this run.
    samples, cov, exp : str
        Paths to the sample list, covariate file, and expression matrix.
    min_perm, max_perm : int
        Minimum/maximum permutations passed to run_emmax.py (-i / -a).
    """
    vcf = os.path.join(ciepy.root, 'private_output/eqtl_input/filtered_all/0000.vcf.gz')
    gs_vcf = os.path.join(ciepy.root, 'private_output', 'cnv_processing', 'gs_emmax_sorted.vcf.gz')
    lumpy_vcf = os.path.join(ciepy.root, 'private_output', 'cnv_processing', 'lumpy_emmax_sorted.vcf.gz')
    # Regions for this gene with the leading "chr" stripped.
    regions = ','.join([x[3:] for x in gene_to_regions[gene_id]])
    
    kin = os.path.join(ciepy.root, 'output', 'eqtl_input', 'wgs.kin')
    toutdir = os.path.join(out_dy, 'gene_results', gene_id)
    cpy.makedir(toutdir)
    
    # If one or more emmax results files are provided, we'll get the most significant
    # variant from each file and add that as a covariate. We'll write the new covariate
    # file in the gene's output directory.
    # NOTE: the original looped over res_files and called make_variant_cov
    # with the *full* list on every iteration, redoing identical work and
    # rewriting the same file; it also read the base covariate file without
    # using it. One call is sufficient and behaviorally equivalent.
    if res_files:
        cov = os.path.join(toutdir, '{}.cov'.format(gene_id))
        make_variant_cov(res_files, cov)

    # Build a unique job name from gene, submission time, and analysis dir.
    now = datetime.datetime.now()
    date = re.sub(r'\D', '_', str(now))
    name = 'emmax_{}_{}_{}'.format(gene_id, date, os.path.split(out_dy)[1])
    fn = os.path.join(out_dy, 'sge_scripts', '{}.sh'.format(name))
    with open(fn, 'w') as f:
        f.write('#!/bin/bash\n\n')
        f.write('#$ -N {}\n'.format(name))
        if queue:
            f.write('#$ -l {}\n'.format(queue))
        # h_vmem is a per-slot limit, so divide the total request across
        # cores (integer division under Python 2).
        f.write('#$ -l h_vmem={}G\n'.format(mem / ncores))
        f.write('#$ -pe smp {}\n'.format(ncores))
        f.write('#$ -S /bin/bash\n')
        f.write('#$ -o {}/{}.out\n'.format(
                os.path.join(out_dy, 'logs'), name))
        f.write('#$ -e {}/{}.err\n\n'.format(
                os.path.join(out_dy, 'logs'), name))
        f.write('module load cardips/1\n')
        f.write('source activate cie\n\n')
        
        # Assemble the run_emmax.py command, one argument per line.
        c = 'python {} \\\n\t'.format(os.path.join(ciepy.root, 'scripts', 'run_emmax.py'))
        c += ' \\\n\t'.join([
                gene_id,
                '{},{},{}'.format(vcf, gs_vcf, lumpy_vcf),
                regions,
                exp,
                samples,
                kin,
                toutdir,
                '-c {}'.format(cov),
                '-i {}'.format(min_perm),
                '-a {}'.format(max_perm),
                '-m 0.01',
            ])
        f.write(c + '\n\n')
    subprocess.check_call('qsub {}'.format(fn), shell=True)
    
def get_jobs():
    running = !qstat -r
    running = [x for x in running if 'emmax_' in x]
    if len(running) > 0:
        jnames = [x.split()[-1] for x in running if 'jobname' in x]
        status = [x.split()[4] for x in running if 'cdeboever' in x]
        queue = [x.split()[-2].split('@')[0] for x in running if 'cdeboever' in x]
        genes = [x.split('_')[1] for x in jnames]
        out_dys = ['_'.join(x.split('_')[9:]) for x in jnames]
        submit_time = ['_'.join(x.split('_')[2:-1]) for x in jnames]
        gene_out_dy = ['{}_{}'.format(genes[x], out_dys[x]) for x in range(len(genes))]
        jobs = pd.DataFrame({'status':status, 'gene_id':genes, 'out_dy':out_dys, 'out_dy':out_dys,
                             'submit_time':submit_time, 'queue':queue, 'gene_out_dy':gene_out_dy}, 
                            index=jnames)
        jobs.ix[jobs.status != 'r', 'queue'] = np.nan
        jobs.sort_values(by='submit_time', inplace=True)
        return jobs
    else:
        return None

def get_gene_status(jobs, out_dy):
    """
    Summarize per-gene progress for one analysis directory.

    A gene "exists" if its output directory was created and is "finished"
    if its permuted_reml.tsv was written. Returns the jobs DataFrame
    (annotated with gene_finished when not None) and a status DataFrame
    indexed by '<gene>_<analysis dir name>' with columns gene_exist,
    gene_finished, and job_running.
    """
    dy = os.path.split(out_dy)[1]
    # Genes whose output directory exists (i.e. were submitted at some point).
    fns = glob.glob(os.path.join(out_dy, 'gene_results', '*'))
    exist = ['{}_{}'.format(os.path.split(x)[1], dy) for x in fns]
    # Genes that wrote their final output file.
    fns = glob.glob(os.path.join(out_dy, 'gene_results', '*', 'permuted_reml.tsv'))
    finished = ['{}_{}'.format(x.split('/')[-2], dy) for x in fns]
    finished = pd.DataFrame(True, columns=['gene_finished'], index=finished)
    exist = pd.DataFrame(True, columns=['gene_exist'], index=exist)
    if jobs is not None:
        # Annotate each queued/running job with whether its gene finished.
        jobs = jobs.merge(finished, left_on='gene_out_dy', right_index=True, how='left')
        jobs.ix[jobs.gene_finished.isnull(), 'gene_finished'] = False

    # Outer join so genes that exist but never finished are still listed.
    gstatus = exist.join(finished, how='outer')
    gstatus.ix[gstatus.gene_finished.isnull(), 'gene_finished'] = False
    gstatus['job_running'] = False
    if jobs is not None:
        gstatus.ix[set(gstatus.index) & set(jobs.gene_out_dy), 'job_running'] = True
    return jobs, gstatus

def submit_jobs(
    todo, 
    num_to_submit, 
    out_dy, 
    res_dys=None, 
    submit_failed=True, 
    queue=None,
    ncores=4,
    mem=16,
    samples=os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_samples.tsv'),
    cov=os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_sex_only.tsv'),
    exp=os.path.join(ciepy.root, 'output', 'eqtl_input', 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'),
    min_perm=1000,
    max_perm=10000,
):
    """Submit up to num_to_submit EMMAX jobs for genes in todo.

    out_dy is the output directory where the per-gene output directories
    will be stored. If submit_failed == True, re-submit failed jobs (with
    boosted memory; see submit_failed_genes). res_dys, if given, is a list
    of earlier rounds' output directories whose per-gene lead variants are
    used as covariates."""
    # Set up output directories
    cpy.makedir(os.path.join(out_dy, 'gene_results'))
    cpy.makedir(os.path.join(out_dy, 'sge_scripts'))
    cpy.makedir(os.path.join(out_dy, 'logs'))
    
    # Get current jobs and gene statuses.
    jobs = get_jobs()
    jobs, gstatus = get_gene_status(jobs, out_dy)
    exist = glob.glob(os.path.join(out_dy, 'gene_results', '*'))
    # Find failed genes, clean them up, and resubmit if desired.
    # A gene counts as failed if it isn't finished and no job is running;
    # restrict to entries for this analysis directory.
    failed = gstatus[(gstatus.gene_finished == False) & (gstatus.job_running == False)]
    failed = failed.ix[[x for x in failed.index if os.path.split(out_dy)[1] in x]]
    if failed.shape[0] > 0:
        failed_genes = [x.split('_')[0] for x in failed.index]
        process_failed(failed_genes, out_dy)
    if submit_failed:
        submit_failed_genes(out_dy, exist, queue=queue)
    # Don't submit genes whose output directory already exists.
    exist = set([os.path.split(x)[1] for x in exist])
    todo = set(todo) - exist
    # Remove failed genes from list to submit.
    # (submit_failed_genes handles those with extra memory.)
    failed_fn = os.path.join(out_dy, 'failed.tsv')
    if os.path.exists(failed_fn):
        failed = pd.read_table(failed_fn, squeeze=True, 
                               header=None, index_col=0)
        todo = list(set(todo) - set(failed.index))

    # Submit new jobs.
    todo = list(todo)
    ind = 0
    while (ind < num_to_submit) and (ind < len(todo)):
        # Get prior results to use as covariate if needed.
        if res_dys:
            res_files = []
            for dy in res_dys:
                if os.path.exists(os.path.join(dy, 'gene_results', todo[ind])):
                    res_files.append(os.path.join(dy, 'gene_results', todo[ind], todo[ind] + '.tsv'))
        else:
            res_files = None
        run_emmax_sge(todo[ind], out_dy, queue=queue, res_files=res_files, ncores=ncores,
                      mem=mem, samples=samples, cov=cov, exp=exp, min_perm=min_perm, max_perm=max_perm)
        ind += 1

def submit_failed_genes(out_dy, exist, queue=None):
    """Re-submit genes recorded in out_dy/failed.tsv, requesting 16 GB
    times (failure count + 1) of memory; genes whose output directory is
    already in `exist` are skipped."""
    failed_fn = os.path.join(out_dy, 'failed.tsv')
    if not os.path.exists(failed_fn):
        return
    failed = pd.read_table(failed_fn, index_col=0, 
                           header=None, squeeze=True)
    # Skip anything that already has an output directory.
    existing_genes = set(os.path.split(p)[1] for p in exist)
    for gene in set(failed.index) - existing_genes:
        run_emmax_sge(gene, out_dy, mem=(failed[gene] + 1) * 16, queue=queue)

def process_failed(failed_genes, out_dy):
    """
    Record newly failed genes in out_dy/failed.tsv and clean up their
    partial output. The file keeps a per-gene failure count so the memory
    request can be boosted on resubmission (see submit_failed_genes).
    """
    # Compare new failed genes to file (if it exists) and write to file.
    # The file keeps track of how many times a gene has failed so we can boost
    # the memory appropriately.
    failed_fn = os.path.join(out_dy, 'failed.tsv')
    if os.path.exists(failed_fn):
        failed = pd.read_table(failed_fn, squeeze=True, 
                               header=None, index_col=0)
        # Increment counts for genes that failed before; add new ones at 1.
        if len(set(failed.index) & set(failed_genes)) > 0:
            failed[list(set(failed.index) & set(failed_genes))] += 1
        t = pd.Series(1, index=list(set(failed_genes) - set(failed.index)))
        failed = pd.concat([t, failed])
    else:
        failed = pd.Series(1, index=failed_genes)
    failed.to_csv(failed_fn, header=None, sep='\t')

    # Remove output directories.
    dys = [os.path.join(out_dy, 'gene_results', g) for g in failed_genes]
    c = ' ; '.join(['if [ -d "{0}" ]; then rm -r {0} ; fi'.format(dy) for dy in dys])
    subprocess.check_call(c, shell=True)

    # Delete temp directories if they exist.
    # pdsh runs the cleanup across the node group since a failed job may
    # have left data in /dev/shm on whichever node it ran on.
    dys = ['/dev/shm/{}'.format(g) for g in failed_genes]
    s = ' ; '.join(['if [ -d "{0}" ]; then rm -r {0} ; fi'.format(dy) for dy in dys])
    c = 'pdsh -g n "{}"'.format(s)
    subprocess.check_call(c, shell=True)

def status(out_dy):
    """
    Print a crosstab of gene_finished vs. job_running for the analysis
    directory out_dy.
    """
    jobs = get_jobs()
    jobs, gstatus = get_gene_status(jobs, out_dy)
    # get_jobs() returns None when no EMMAX jobs are queued/running; the
    # original crashed here with a TypeError in that case.
    if jobs is not None:
        jobs['job_name'] = jobs.index
        jobs.index = jobs.gene_out_dy
    print(pd.crosstab(gstatus.gene_finished, gstatus.job_running))

First analysis


In [65]:
# Round 1: all eQTL genes, no variant covariates.
out_dy = os.path.join(private_outdir, 'eqtls01')
with open(os.path.join(ciepy.root, 'output', 'eqtl_input', 'eqtl_genes.tsv')) as f:
    todo = [x.strip() for x in f.readlines()]
res_dys = None

# Batches are submitted by hand; uncomment/adjust as needed.
# num_to_submit = 1100
# submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue=None)
num_to_submit = 400
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue='opt', mem=8)

In [10]:
2 +  # intentional syntax error: stops "Run All" here because jobs above are submitted by hand


  File "<ipython-input-10-2b0d43f017d6>", line 1
    2 +
        ^
SyntaxError: invalid syntax

Second analysis

It's a little slower to submit jobs here because I have to parse stuff from the first round.


In [199]:
# Round 2: genes significant in round 1, conditioning on the round-1
# lead variant of each gene.
qvalues = pd.read_table(os.path.join(ciepy.root, 'output', 'eqtl_processing', 'eqtls01',
                                     'qvalues.tsv'), index_col=0)
sig = qvalues[qvalues.perm_sig]
out_dy = os.path.join(private_outdir, 'eqtls02')
res_dys = [os.path.join(private_outdir, 'eqtls01')]
cpy.makedir(out_dy)
todo = list(set(sig.index))

# Batches are submitted by hand; uncomment/adjust as needed.
# num_to_submit = 2200 - 20
# submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue=None)
num_to_submit = 20
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue='opt', mem=8)

In [426]:
2 +  # intentional syntax error: stops "Run All" here because jobs above are submitted by hand


  File "<ipython-input-426-9371e255fe97>", line 1
    2 +
       ^
SyntaxError: invalid syntax

Third analysis


In [222]:
# Round 3: genes significant in round 2, conditioning on lead variants
# from rounds 1 and 2.
qvalues = pd.read_table(os.path.join(ciepy.root, 'output', 'eqtl_processing', 'eqtls02',
                                     'qvalues.tsv'), index_col=0)
sig = qvalues[qvalues.perm_sig]
out_dy = os.path.join(private_outdir, 'eqtls03')
res_dys = [os.path.join(private_outdir, 'eqtls01'), os.path.join(private_outdir, 'eqtls02')]
cpy.makedir(out_dy)
# Skip genes whose output directory already exists.
todo = list(set(sig.index) - 
            set([os.path.split(x)[1] for x in glob.glob(os.path.join(out_dy, '*'))]))

# Batches are submitted by hand; uncomment/adjust as needed.
# num_to_submit = 200
# submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue=None)
num_to_submit = 50
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue='opt', mem=8)

In [422]:
3 +  # intentional syntax error: stops "Run All" here because jobs above are submitted by hand


  File "<ipython-input-422-4715fb1d5390>", line 1
    3 +
        ^
SyntaxError: invalid syntax

Fourth analysis


In [15]:
# Round 4: genes significant in round 3, conditioning on lead variants
# from rounds 1-3.
qvalues = pd.read_table(os.path.join(ciepy.root, 'output', 'eqtl_processing', 'eqtls03',
                                     'qvalues.tsv'), index_col=0)
sig = qvalues[qvalues.perm_sig]
out_dy = os.path.join(private_outdir, 'eqtls04')
# NOTE: the original listed eqtls02 twice here; following the pattern of
# the earlier rounds, the third entry should be eqtls03.
res_dys = [os.path.join(private_outdir, 'eqtls01'), os.path.join(private_outdir, 'eqtls02'),
           os.path.join(private_outdir, 'eqtls03')]
cpy.makedir(out_dy)
# Skip genes whose output directory already exists.
todo = list(set(sig.index) - 
            set([os.path.split(x)[1] for x in glob.glob(os.path.join(out_dy, '*'))]))

num_to_submit = 60
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue=None)
num_to_submit = 51
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue='opt', mem=8)

In [ ]:
2 +  # intentional syntax error: stops "Run All" here because jobs above are submitted by hand

No PEER residuals, just covariates

I'll run this with and without the standard normal transformation.


In [226]:
# Run a few pluripotency-related genes without PEER residuals or the
# standard normal transformation: full covariates + log TPM expression,
# no permutations.
# NOTE: the original had a redundant double assignment (out_dy = out_dy = ...).
out_dy = os.path.join(private_outdir, 'no_peer_no_std_norm01')
cpy.makedir(out_dy)
# with open(os.path.join(ciepy.root, 'output', 'eqtl_input', 'eqtl_genes.tsv')) as f:
#     todo = [x.strip() for x in f.readlines()]

# pgenes = ['IDO1', 'LCK', 'POU5F1', 'CXCL5', 'BCL9', 'FGFR1']
pgenes = ['POU5F1', 'CXCL5', 'BCL9', 'FGFR1', 'IDO1']
# Translate gene names to the gene IDs used in the expression matrix.
todo = []
for g in pgenes:
    i = gene_info[gene_info.gene_name == g].index[0]
    todo.append(i)
res_dys = None

cov_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_full.tsv')
exp_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'tpm_log_filtered_phe.tsv')

num_to_submit = len(todo)
submit_jobs(todo, num_to_submit, out_dy, submit_failed=True, 
            queue='opt', cov=cov_fn, exp=exp_fn, min_perm=0, max_perm=0, ncores=4)
# This cell was collapsed onto a single line (invalid syntax) by the
# notebook export; reconstructed here. Runs all eQTL genes without PEER
# residuals (full covariates, standard-normal-transformed expression),
# no permutations. The redundant double assignment is also removed.
out_dy = os.path.join(private_outdir, 'no_peer01')
cpy.makedir(out_dy)
with open(os.path.join(ciepy.root, 'output', 'eqtl_input', 'eqtl_genes.tsv')) as f:
    todo = [x.strip() for x in f.readlines()]
res_dys = None

cov_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_full.tsv')
exp_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'tpm_log_filtered_phe_std_norm.tsv')

num_to_submit = len(todo)
submit_jobs(todo, num_to_submit, out_dy, submit_failed=True,
            queue='opt', cov=cov_fn, exp=exp_fn, min_perm=0, max_perm=0, ncores=4)

Unrelated individuals

I want to run the analysis using unrelated individuals so I can compare to GTEx more accurately.


In [213]:
# eQTL analysis restricted to unrelated individuals, for a more accurate
# comparison to GTEx. Uses the unrelated sample list and matching
# sex-only covariate file.
out_dy = os.path.join(private_outdir, 'unrelated_eqtls01')
with open(os.path.join(ciepy.root, 'output', 'eqtl_input', 'eqtl_genes.tsv')) as f:
    todo = [x.strip() for x in f.readlines()]
res_dys = None
samples_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_samples_unrelated.tsv')
cov_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_sex_only_unrelated.tsv')

# Batches are submitted by hand; uncomment/adjust as needed.
num_to_submit = 700
submit_jobs(todo, num_to_submit, out_dy, samples=samples_fn, cov=cov_fn,
            res_dys=res_dys, submit_failed=True, queue=None)
# num_to_submit = 500
# submit_jobs(todo, num_to_submit, out_dy, samples=samples_fn, cov=cov_fn,
#             res_dys=res_dys, submit_failed=True, queue='opt', mem=8)

In [ ]:
2 +  # intentional syntax error: stops "Run All" here because jobs above are submitted by hand

In [ ]: