This notebook coordinates and executes the eQTL analysis. This notebook is specialized for the Frazer lab cluster. Since running the entire analysis is time consuming, I generally run it "by hand," starting jobs for groups of genes at different times. I've included instructions at various points below.
In [1]:
import cPickle
import datetime
import glob
import gzip
import os
import random
import re
import shutil
import subprocess
import time
import uuid
import cdpybio as cpb
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pybedtools as pbt
import scipy.stats as stats
import seaborn as sns
import statsmodels.api as sm
import vcf as pyvcf
import cardipspy as cpy
import ciepy
%matplotlib inline
%load_ext rpy2.ipython
# Name of this analysis; used to build the sandbox/output directory paths below.
dy_name = 'run_eqtl_analysis'

import socket
# On the Frazer lab cluster head nodes, point pybedtools at a per-analysis
# sandbox temp directory instead of the default temp location.
if socket.gethostname() == 'fl-hn1' or socket.gethostname() == 'fl-hn2':
    dy = os.path.join(ciepy.root, 'sandbox', dy_name)
    cpy.makedir(dy)
    pbt.set_tempdir(dy)

# Public output directory for shareable results.
outdir = os.path.join(ciepy.root, 'output', dy_name)
cpy.makedir(outdir)

# Private output directory for per-gene EMMAX results (contains genotype data).
private_outdir = os.path.join(ciepy.root, 'private_output', dy_name)
cpy.makedir(private_outdir)

# Seed for reproducibility of any random choices made in this notebook.
random.seed(20150605)
In [2]:
# Gencode gene annotation table, indexed by gene ID.
gene_info = pd.read_table(cpy.gencode_gene_info, index_col=0)
# Mapping from gene ID to the genomic regions whose variants are tested
# (pickled dict; used by run_emmax_sge below).
fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'gene_to_regions.p')
gene_to_regions = cPickle.load(open(fn, 'rb'))
# Expression matrix — per the filename, log TPM that was filtered,
# standard-normalized, and PEER-residualized.
exp = pd.read_table(os.path.join(ciepy.root, 'output', 'eqtl_input',
                                 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'), index_col=0)
# RNA-seq sample metadata.
fn = os.path.join(ciepy.root, 'output', 'input_data', 'rnaseq_metadata.tsv')
rna_meta = pd.read_table(fn, index_col=0)
The run_emmax_sge method will submit a job for a gene. I currently ask for
16Gb of RAM per job and four cores. If you ask for fewer cores, more jobs will run
per node, but all of the IO seems to slow the jobs down. Many genes probably need
less than 16Gb of RAM, but some need more. The mem_needed method was my attempt
at estimating how much memory a job would need, but it wasn't working well.
I think the memory needed scales with the number of variants (which num_variants
can tell you), so I could go and monitor the amount of memory used versus
the number of variants. However, I didn't have a big problem with jobs failing when
using 16Gb of memory (it seems like ~430 genes failed and had to be run again).
In [219]:
def num_variants(vcf, gene_id, tempdir, regions, samples, bcftools_path):
    """
    Count the number of variants to test for a gene. This currently
    doesn't include CNVs but there aren't many of those.

    Parameters
    ----------
    vcf : str
        Path to bgzipped, indexed VCF with the variants to count.
    gene_id : str
        Gene identifier (currently unused; kept for interface compatibility).
    tempdir : str
        Temporary directory (currently unused; kept for interface
        compatibility).
    regions : str
        Comma-separated region string passed to ``bcftools -r``.
    samples : str
        Path to a file with one sample ID per line.
    bcftools_path : str
        Path to the bcftools executable.

    Returns
    -------
    int
        Number of variants passing the filters.
    """
    samples = pd.read_table(samples, header=None, squeeze=True)
    # Keep biallelic variants with minor allele frequency >= 1% among the
    # requested samples/regions, drop sites where all genotypes are missing
    # (bcftools filter -m x), then count the non-header VCF lines.
    # (The original passed a sixth, unused argument to .format and built an
    # unused temp-file path; both removed.)
    c = ('{} view {} -q 0.01:minor -m2 -M2 -r {} -s {} -O u | '
         '{} filter -m x -O v | grep -v \\# | wc -l'.format(
            bcftools_path,
            vcf,
            regions,
            ','.join(samples.values),
            bcftools_path))
    num = int(subprocess.check_output(c, shell=True).strip())
    return num
def make_variant_cov(res_files, out):
    """
    Make a covariate (.cov) file for a gene including a covariate for each lead variant
    in the EMMAX output files res_files.

    Parameters
    ----------
    res_files : list
        EMMAX result files. The most significant variant from each file is
        genotyped (0/1/2 alt-allele count) and added as a covariate column.
    out : str
        Path where the tab-separated covariate file (no header) is written.
    """
    vcf = os.path.join(ciepy.root, 'private_output/eqtl_input/filtered_all/0000.vcf.gz')
    gs_vcf = os.path.join(ciepy.root, 'private_output', 'cnv_processing', 'gs_emmax_sorted.vcf.gz')
    lumpy_vcf = os.path.join(ciepy.root, 'private_output', 'cnv_processing', 'lumpy_emmax_sorted.vcf.gz')
    # Start from the sex-only covariates; lead-variant genotype columns are
    # appended to this frame.
    cov = os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_sex_only.tsv')
    covariates = pd.read_table(cov, index_col=0, header=None, squeeze=True)
    new_covariates = pd.DataFrame({'sex':covariates})
    for i,fn in enumerate(res_files):
        res = ciepy.read_emmax_output(fn)
        # Keep only the most significant association in this file.
        res = res[res.PVALUE == res.PVALUE.min()]
        # NOTE(review): the enumerate counter `i` is immediately rebound to
        # the index label of the lead variant and the counter itself is never
        # used; the label then serves as the new covariate column name.
        i = res.index[0]
        # Pick the VCF that contains this variant type based on MARKER_ID:
        # GenomeSTRiP CNVs, LUMPY DUP/DELs, or (default) SNVs/indels.
        if 'CNV' in res.ix[i, 'MARKER_ID']:
            vcf_reader = pyvcf.Reader(open(gs_vcf))
        elif 'DUP' in res.ix[i, 'MARKER_ID'] or 'DEL' in res.ix[i, 'MARKER_ID']:
            vcf_reader = pyvcf.Reader(open(lumpy_vcf))
        else:
            vcf_reader = pyvcf.Reader(open(vcf))
        res = vcf_reader.fetch(res.ix[i, 'CHROM'], res.ix[i, 'BEG'], res.ix[i,'END'])
        # Python 2 iterator protocol; takes the first record in the region.
        r = res.next()
        # Encode genotype as alt-allele count (0 default, 1 het, 2 hom alt)
        # for samples present in the covariate table.
        new_covariates[i] = 0
        hets = set([x.sample for x in r.get_hets()]) & set(new_covariates.index)
        halts = set([x.sample for x in r.get_hom_alts()]) & set(new_covariates.index)
        new_covariates.ix[hets, i] = 1
        new_covariates.ix[halts, i] = 2
    new_covariates.to_csv(out, sep='\t', header=None)
def run_emmax_sge(
    gene_id,
    out_dy,
    ncores=4,
    mem=16,
    queue=None,
    res_files=None,
    samples=os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_samples.tsv'),
    cov=os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_sex_only.tsv'),
    exp=os.path.join(ciepy.root, 'output', 'eqtl_input', 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'),
    min_perm=1000,
    max_perm=10000,
):
    """
    Write an SGE submission script that runs EMMAX for a given gene and
    submit it with qsub.

    Parameters
    ----------
    gene_id : str
        Gene to test. Must be a key of the module-level gene_to_regions dict.
    out_dy : str
        Analysis output directory; this gene's results go in
        out_dy/gene_results/{gene_id}.
    ncores : int
        Number of SGE slots (smp) to request.
    mem : int
        Total memory in Gb for the job; divided across slots for h_vmem.
    queue : str or None
        Optional SGE resource string added as "#$ -l {queue}".
    res_files : list or None
        Previous EMMAX result files whose lead variants should be added as
        covariates (see make_variant_cov).
    samples, cov, exp : str
        Paths to the samples, covariates, and expression files.
    min_perm, max_perm : int
        Minimum/maximum permutations passed to run_emmax.py (-i/-a).
    """
    vcf = os.path.join(ciepy.root, 'private_output/eqtl_input/filtered_all/0000.vcf.gz')
    gs_vcf = os.path.join(ciepy.root, 'private_output', 'cnv_processing', 'gs_emmax_sorted.vcf.gz')
    lumpy_vcf = os.path.join(ciepy.root, 'private_output', 'cnv_processing', 'lumpy_emmax_sorted.vcf.gz')
    # Regions for this gene without the "chr" prefix.
    regions = ','.join([x[3:] for x in gene_to_regions[gene_id]])
    kin = os.path.join(ciepy.root, 'output', 'eqtl_input', 'wgs.kin')
    toutdir = os.path.join(out_dy, 'gene_results', gene_id)
    cpy.makedir(toutdir)
    # If one or more emmax results files are provided, get the most significant
    # variant from each file and add it as a covariate, writing the new
    # covariate file in the gene's output directory. make_variant_cov handles
    # all res_files in one call, so call it exactly once (the original looped
    # over res_files calling it repeatedly with identical arguments, and read
    # a covariates table it never used).
    if res_files:
        cov = os.path.join(toutdir, '{}.cov'.format(gene_id))
        make_variant_cov(res_files, cov)
    # Encode the submission time into the job name; get_jobs parses it back
    # out to sort jobs by submission order.
    now = datetime.datetime.now()
    date = re.sub(r'\D', '_', str(now))
    name = 'emmax_{}_{}_{}'.format(gene_id, date, os.path.split(out_dy)[1])
    fn = os.path.join(out_dy, 'sge_scripts', '{}.sh'.format(name))
    with open(fn, 'w') as f:
        f.write('#!/bin/bash\n\n')
        f.write('#$ -N {}\n'.format(name))
        if queue:
            f.write('#$ -l {}\n'.format(queue))
        # h_vmem is per-slot, so split the total memory across the slots
        # (integer division under Python 2).
        f.write('#$ -l h_vmem={}G\n'.format(mem / ncores))
        f.write('#$ -pe smp {}\n'.format(ncores))
        f.write('#$ -S /bin/bash\n')
        f.write('#$ -o {}/{}.out\n'.format(
            os.path.join(out_dy, 'logs'), name))
        f.write('#$ -e {}/{}.err\n\n'.format(
            os.path.join(out_dy, 'logs'), name))
        f.write('module load cardips/1\n')
        f.write('source activate cie\n\n')
        c = 'python {} \\\n\t'.format(os.path.join(ciepy.root, 'scripts', 'run_emmax.py'))
        c += ' \\\n\t'.join([
            gene_id,
            '{},{},{}'.format(vcf, gs_vcf, lumpy_vcf),
            regions,
            exp,
            samples,
            kin,
            toutdir,
            '-c {}'.format(cov),
            '-i {}'.format(min_perm),
            '-a {}'.format(max_perm),
            '-m 0.01',
        ])
        f.write(c + '\n\n')
    # Submit after the with-block so the script file is flushed and closed
    # before qsub reads it.
    subprocess.check_call('qsub {}'.format(fn), shell=True)
    #print(fn)
def get_jobs():
running = !qstat -r
running = [x for x in running if 'emmax_' in x]
if len(running) > 0:
jnames = [x.split()[-1] for x in running if 'jobname' in x]
status = [x.split()[4] for x in running if 'cdeboever' in x]
queue = [x.split()[-2].split('@')[0] for x in running if 'cdeboever' in x]
genes = [x.split('_')[1] for x in jnames]
out_dys = ['_'.join(x.split('_')[9:]) for x in jnames]
submit_time = ['_'.join(x.split('_')[2:-1]) for x in jnames]
gene_out_dy = ['{}_{}'.format(genes[x], out_dys[x]) for x in range(len(genes))]
jobs = pd.DataFrame({'status':status, 'gene_id':genes, 'out_dy':out_dys, 'out_dy':out_dys,
'submit_time':submit_time, 'queue':queue, 'gene_out_dy':gene_out_dy},
index=jnames)
jobs.ix[jobs.status != 'r', 'queue'] = np.nan
jobs.sort_values(by='submit_time', inplace=True)
return jobs
else:
return None
def get_gene_status(jobs, out_dy):
    """
    Determine per-gene progress for one analysis directory.

    Parameters
    ----------
    jobs : pandas.DataFrame or None
        Output of get_jobs() (None when no EMMAX jobs are queued/running).
    out_dy : str
        Analysis directory containing a gene_results/ subdirectory.

    Returns
    -------
    jobs : pandas.DataFrame or None
        The input jobs annotated with a boolean gene_finished column.
    gstatus : pandas.DataFrame
        Indexed by "{gene_id}_{analysis_dir}" with boolean columns gene_exist
        (output directory exists), gene_finished (permuted_reml.tsv present),
        and job_running.
    """
    dy = os.path.split(out_dy)[1]
    # A gene's output directory existing means a job was started at some point.
    fns = glob.glob(os.path.join(out_dy, 'gene_results', '*'))
    exist = ['{}_{}'.format(os.path.split(x)[1], dy) for x in fns]
    # The presence of permuted_reml.tsv is used as the marker that a gene's
    # job ran to completion.
    fns = glob.glob(os.path.join(out_dy, 'gene_results', '*', 'permuted_reml.tsv'))
    finished = ['{}_{}'.format(x.split('/')[-2], dy) for x in fns]
    finished = pd.DataFrame(True, columns=['gene_finished'], index=finished)
    exist = pd.DataFrame(True, columns=['gene_exist'], index=exist)
    if jobs is not None:
        # Genes not present in `finished` get NaN from the left merge; fill
        # those with False.
        jobs = jobs.merge(finished, left_on='gene_out_dy', right_index=True, how='left')
        jobs.ix[jobs.gene_finished.isnull(), 'gene_finished'] = False
    # Outer join so genes that exist but aren't finished (and vice versa)
    # are all represented.
    gstatus = exist.join(finished, how='outer')
    gstatus.ix[gstatus.gene_finished.isnull(), 'gene_finished'] = False
    gstatus['job_running'] = False
    if jobs is not None:
        gstatus.ix[set(gstatus.index) & set(jobs.gene_out_dy), 'job_running'] = True
    return jobs, gstatus
def submit_jobs(
    todo,
    num_to_submit,
    out_dy,
    res_dys=None,
    submit_failed=True,
    queue=None,
    ncores=4,
    mem=16,
    samples=os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_samples.tsv'),
    cov=os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_sex_only.tsv'),
    exp=os.path.join(ciepy.root, 'output', 'eqtl_input', 'tpm_log_filtered_phe_std_norm_peer_resid.tsv'),
    min_perm=1000,
    max_perm=10000,
):
    """Submit EMMAX jobs for up to num_to_submit genes from todo.

    out_dy is the output directory where the per-gene output directories
    will be stored. If submit_failed == True, re-submit failed jobs (with
    more memory, via submit_failed_genes). If res_dys is given, each gene's
    lead variants from those previous analysis directories are added as
    covariates. Genes whose output directory already exists, and genes listed
    in out_dy/failed.tsv, are skipped.
    """
    # Set up output directories
    cpy.makedir(os.path.join(out_dy, 'gene_results'))
    cpy.makedir(os.path.join(out_dy, 'sge_scripts'))
    cpy.makedir(os.path.join(out_dy, 'logs'))
    # Get current jobs and gene statuses.
    jobs = get_jobs()
    jobs, gstatus = get_gene_status(jobs, out_dy)
    exist = glob.glob(os.path.join(out_dy, 'gene_results', '*'))
    # Find failed genes (started but not finished and no job running), clean
    # them up, and resubmit if desired.
    failed = gstatus[(gstatus.gene_finished == False) & (gstatus.job_running == False)]
    # Restrict to entries belonging to this analysis directory.
    failed = failed.ix[[x for x in failed.index if os.path.split(out_dy)[1] in x]]
    if failed.shape[0] > 0:
        failed_genes = [x.split('_')[0] for x in failed.index]
        process_failed(failed_genes, out_dy)
        if submit_failed:
            submit_failed_genes(out_dy, exist, queue=queue)
    # Don't submit genes whose output directory already exists.
    exist = set([os.path.split(x)[1] for x in exist])
    todo = set(todo) - exist
    # Remove failed genes from list to submit (they are handled by
    # submit_failed_genes with boosted memory instead).
    failed_fn = os.path.join(out_dy, 'failed.tsv')
    if os.path.exists(failed_fn):
        failed = pd.read_table(failed_fn, squeeze=True,
                               header=None, index_col=0)
        todo = list(set(todo) - set(failed.index))
    # Submit new jobs.
    todo = list(todo)
    ind = 0
    while (ind < num_to_submit) and (ind < len(todo)):
        # Get prior results to use as covariate if needed; only directories
        # that actually contain results for this gene contribute a file.
        if res_dys:
            res_files = []
            for dy in res_dys:
                if os.path.exists(os.path.join(dy, 'gene_results', todo[ind])):
                    res_files.append(os.path.join(dy, 'gene_results', todo[ind], todo[ind] + '.tsv'))
        else:
            res_files = None
        run_emmax_sge(todo[ind], out_dy, queue=queue, res_files=res_files, ncores=ncores,
                      mem=mem, samples=samples, cov=cov, exp=exp, min_perm=min_perm, max_perm=max_perm)
        ind += 1
def submit_failed_genes(out_dy, exist, queue=None):
    """
    Resubmit previously failed genes with a boosted memory request.

    Reads out_dy/failed.tsv (gene ID -> number of recorded failures) and
    resubmits each failed gene whose output directory is not already present
    in `exist`, requesting (failures + 1) * 16 Gb of memory.
    """
    failed_fn = os.path.join(out_dy, 'failed.tsv')
    if not os.path.exists(failed_fn):
        return
    failed = pd.read_table(failed_fn, index_col=0, header=None, squeeze=True)
    existing_genes = set(os.path.split(p)[1] for p in exist)
    for gene in list(set(failed.index) - existing_genes):
        # One prior failure -> 32 Gb, two -> 48 Gb, etc.
        run_emmax_sge(gene, out_dy, mem=(failed[gene] + 1) * 16, queue=queue)
def process_failed(failed_genes, out_dy):
    """
    Record newly failed genes and clean up their partial output.

    Updates out_dy/failed.tsv, which tracks how many times each gene has
    failed so that memory can be boosted appropriately on resubmission, then
    removes the genes' output directories and any /dev/shm temp directories
    on the compute nodes.
    """
    failed_fn = os.path.join(out_dy, 'failed.tsv')
    if os.path.exists(failed_fn):
        failed = pd.read_table(failed_fn, squeeze=True,
                               header=None, index_col=0)
        # Bump the count for genes that have failed before.
        repeat_offenders = set(failed.index) & set(failed_genes)
        if len(repeat_offenders) > 0:
            failed[list(repeat_offenders)] += 1
        # First-time failures start at a count of one.
        first_time = pd.Series(1, index=list(set(failed_genes) - set(failed.index)))
        failed = pd.concat([first_time, failed])
    else:
        failed = pd.Series(1, index=failed_genes)
    failed.to_csv(failed_fn, header=None, sep='\t')
    # Remove output directories.
    gene_dys = [os.path.join(out_dy, 'gene_results', g) for g in failed_genes]
    cleanup = ' ; '.join(['if [ -d "{0}" ]; then rm -r {0} ; fi'.format(d) for d in gene_dys])
    subprocess.check_call(cleanup, shell=True)
    # Delete temp directories on the compute nodes if they exist (pdsh runs
    # the command on every node in group "n").
    shm_dys = ['/dev/shm/{}'.format(g) for g in failed_genes]
    node_cleanup = ' ; '.join(['if [ -d "{0}" ]; then rm -r {0} ; fi'.format(d) for d in shm_dys])
    subprocess.check_call('pdsh -g n "{}"'.format(node_cleanup), shell=True)
def status(out_dy):
    """
    Print a crosstab of gene completion (gene_finished) versus whether a job
    is currently running (job_running) for the analysis directory out_dy.
    """
    jobs = get_jobs()
    jobs, gstatus = get_gene_status(jobs, out_dy)
    # get_jobs returns None when no EMMAX jobs are queued/running; the
    # original crashed here with a TypeError in that case. The job-name
    # bookkeeping is only needed when jobs exist.
    if jobs is not None:
        jobs['job_name'] = jobs.index
        jobs.index = jobs.gene_out_dy
    print(pd.crosstab(gstatus.gene_finished, gstatus.job_running))
In [65]:
# Round 1 (eqtls01): submit jobs for every gene in the eQTL gene list, with
# no prior lead variants as covariates.
out_dy = os.path.join(private_outdir, 'eqtls01')

with open(os.path.join(ciepy.root, 'output', 'eqtl_input', 'eqtl_genes.tsv')) as f:
    todo = [x.strip() for x in f.readlines()]
res_dys = None

# Jobs were submitted in batches "by hand"; earlier batch kept for reference.
# num_to_submit = 1100
# submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue=None)

num_to_submit = 400
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue='opt', mem=8)
In [10]:
# Intentional syntax error: halts "Run All" here so the jobs submitted above
# can finish before the next round is launched.
2 +
In [199]:
# Round 2 (eqtls02): rerun the genes that were significant in round 1,
# conditioning on each gene's round-1 lead variant.
qvalues = pd.read_table(os.path.join(ciepy.root, 'output', 'eqtl_processing', 'eqtls01',
                                     'qvalues.tsv'), index_col=0)
sig = qvalues[qvalues.perm_sig]

out_dy = os.path.join(private_outdir, 'eqtls02')
res_dys = [os.path.join(private_outdir, 'eqtls01')]
cpy.makedir(out_dy)
todo = list(set(sig.index))

# Earlier batch kept for reference.
# num_to_submit = 2200 - 20
# submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue=None)

num_to_submit = 20
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue='opt', mem=8)
In [426]:
# Intentional syntax error: halts "Run All" here so the jobs submitted above
# can finish before the next round is launched.
2 +
In [222]:
# Round 3 (eqtls03): rerun the genes significant in round 2, conditioning on
# the lead variants from rounds 1 and 2.
qvalues = pd.read_table(os.path.join(ciepy.root, 'output', 'eqtl_processing', 'eqtls02',
                                     'qvalues.tsv'), index_col=0)
sig = qvalues[qvalues.perm_sig]

out_dy = os.path.join(private_outdir, 'eqtls03')
res_dys = [os.path.join(private_outdir, 'eqtls01'), os.path.join(private_outdir, 'eqtls02')]
cpy.makedir(out_dy)
# Skip genes that already have an output directory in this round.
todo = list(set(sig.index) -
            set([os.path.split(x)[1] for x in glob.glob(os.path.join(out_dy, '*'))]))

# Earlier batch kept for reference.
# num_to_submit = 200
# submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue=None)

num_to_submit = 50
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue='opt', mem=8)
In [422]:
# Intentional syntax error: halts "Run All" here so the jobs submitted above
# can finish before the next round is launched.
3 +
In [15]:
# Round 4 (eqtls04): rerun the genes significant in round 3, conditioning on
# the lead variants from all three previous rounds.
qvalues = pd.read_table(os.path.join(ciepy.root, 'output', 'eqtl_processing', 'eqtls03',
                                     'qvalues.tsv'), index_col=0)
sig = qvalues[qvalues.perm_sig]

out_dy = os.path.join(private_outdir, 'eqtls04')
# The original listed eqtls02 twice and omitted eqtls03, which looks like a
# copy-paste error; round 4 should condition on rounds 1-3.
res_dys = [os.path.join(private_outdir, 'eqtls01'), os.path.join(private_outdir, 'eqtls02'),
           os.path.join(private_outdir, 'eqtls03')]
cpy.makedir(out_dy)
# Skip genes that already have an output directory in this round.
todo = list(set(sig.index) -
            set([os.path.split(x)[1] for x in glob.glob(os.path.join(out_dy, '*'))]))

num_to_submit = 60
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue=None)

num_to_submit = 51
submit_jobs(todo, num_to_submit, out_dy, res_dys=res_dys, submit_failed=True, queue='opt', mem=8)
In [ ]:
# Intentional syntax error: halts "Run All" here so the jobs submitted above
# can finish before the next cell is run.
2 +
In [226]:
# Run a handful of pluripotency-related genes using the un-normalized
# expression values (no PEER residuals, no standard normalization), the full
# covariate set, and no permutations.
# (The original had a duplicated assignment: out_dy = out_dy = ...)
out_dy = os.path.join(private_outdir, 'no_peer_no_std_norm01')
cpy.makedir(out_dy)

# Earlier gene lists kept for reference.
# with open(os.path.join(ciepy.root, 'output', 'eqtl_input', 'eqtl_genes.tsv')) as f:
#     todo = [x.strip() for x in f.readlines()]
# pgenes = ['IDO1', 'LCK', 'POU5F1', 'CXCL5', 'BCL9', 'FGFR1']
pgenes = ['POU5F1', 'CXCL5', 'BCL9', 'FGFR1', 'IDO1']

# Map gene names to gene IDs via the Gencode annotation (first match wins).
todo = []
for g in pgenes:
    i = gene_info[gene_info.gene_name == g].index[0]
    todo.append(i)
res_dys = None

cov_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_full.tsv')
exp_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'tpm_log_filtered_phe.tsv')
num_to_submit = len(todo)
submit_jobs(todo, num_to_submit, out_dy, submit_failed=True,
            queue='opt', cov=cov_fn, exp=exp_fn, min_perm=0, max_perm=0, ncores=4)
In [213]:
# Sensitivity analysis: run the full gene list using only unrelated samples
# (and the matching sex-only covariate file).
out_dy = os.path.join(private_outdir, 'unrelated_eqtls01')

with open(os.path.join(ciepy.root, 'output', 'eqtl_input', 'eqtl_genes.tsv')) as f:
    todo = [x.strip() for x in f.readlines()]
res_dys = None

samples_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_samples_unrelated.tsv')
cov_fn = os.path.join(ciepy.root, 'output', 'eqtl_input', 'emmax_sex_only_unrelated.tsv')

num_to_submit = 700
submit_jobs(todo, num_to_submit, out_dy, samples=samples_fn, cov=cov_fn,
            res_dys=res_dys, submit_failed=True, queue=None)

# Alternate batch kept for reference.
# num_to_submit = 500
# submit_jobs(todo, num_to_submit, out_dy, samples=samples_fn, cov=cov_fn,
#             res_dys=res_dys, submit_failed=True, queue='opt', mem=8)
In [ ]:
2 +
In [ ]: