In [1]:
import collections
In [3]:
Foo = collections.namedtuple('Foo', ('bar', 'baz'))
foo = Foo(None, None)
foo
Out[3]:
In [4]:
bool(-1)
Out[4]:
In [6]:
-3 % 3
Out[6]:
In [129]:
from __future__ import division
import numpy as np
import petl.interactive as etl
import petlx.all
import collections
import operator
import pyfasta
from Bio.Seq import Seq
CodingPosition = collections.namedtuple('CodingPosition',
('gene_id', 'gene_start', 'gene_stop',
'transcript_id', 'transcript_start', 'transcript_stop',
'exon_id', 'exon_start', 'exon_stop',
'strand',
'chrom',
'pos',
'ref',
'ref_start',
'ref_stop',
'ref_cds_start',
'ref_cds_stop'))
CodonChange = collections.namedtuple('CodonChange',
CodingPosition._fields + ('alt', 'ref_codon', 'alt_codon', 'codon_change'))
AminoAcidChange = collections.namedtuple('AminoAcidChange',
CodonChange._fields + ('aa_pos', 'ref_aa', 'alt_aa', 'aa_change'))
class EffectPredictor(object):
def __init__(self, fasta_fn, gff3_fn,
gff3_id_attribute='ID',
gff3_parent_attribute='Parent',
gff3_gene_types=['gene', 'pseudogene'],
gff3_transcript_types=['mRNA', 'pseudogenic_transcript'],
gff3_exon_types=['CDS', 'pseudogenic_exon']):
"""Create a variant effect predictor.
Parameters
----------
fasta_fn : string
Path to reference genome FASTA file.
gff3_fn : string
Path to genome annotations GFF3 file.
gff3_id_attribute : string, optional
Name of attribute containing feature identifier.
gff3_parent_attribute : string, optional
Name of attribute containing parent feature identifier.
gff3_gene_types : list of strings, optional
Feature types to consider as genes.
gff3_transcript_types : list of strings, optional
Feature types to consider as transcripts.
gff3_exon_types : list of strings, optional
Feature types to consider as exons.
"""
# store parameters
self._fasta_fn = fasta_fn
self._gff3_fn = gff3_fn
self._gff3_id_attribute = gff3_id_attribute
self._gff3_parent_attribute = gff3_parent_attribute
self._gff3_gene_types = gff3_gene_types
self._gff3_transcript_types = gff3_transcript_types
self._gff3_exon_types = gff3_exon_types
# setup reference sequence
self._fasta = pyfasta.Fasta(fasta_fn)
# setup access to GFF3 as a table
self._tbl_features = (etl
.fromgff3(gff3_fn)
.unpackdict('attributes', [gff3_id_attribute, gff3_parent_attribute])
.rename({gff3_id_attribute: 'feature_id', gff3_parent_attribute: 'parent_id'})
)
# TODO remove hack for testing
self._tbl_features = self._tbl_features.eq('seqid', 'Pf3D7_01_v3')
# index features by ID
self._lkp_feature = self._tbl_features.recordlookupone('feature_id')
# index features by parent ID
self._lkp_children = self._tbl_features.recordlookup('parent_id')
# index features by genomic location
self._interval_trees = self._tbl_features.facetintervalrecordlookup('seqid', 'start', 'end', proximity=1)
def get_feature(self, feature_id):
"""Look up a feature by ID."""
return self._lkp_feature[feature_id]
def get_feature_children(self, feature_id):
"""Look up the children of a feature."""
return self._lkp_children[feature_id]
def find_features(self, chrom, start, stop=None):
"""Look up features overlapping a given genomic location."""
if stop is None:
stop = start
return self._interval_trees[chrom].find(start, stop)
def get_coding_positions(self, chrom, pos, ref):
"""Calculate position of the reference allele relative to the start
of gene coding sequences.
Parameters
----------
chrom : string
Chromosome.
pos : int
Position.
ref : string
Reference allele.
Returns
-------
Generator of `CodingPosition`s.
"""
# ensure types and case
ref = str(ref).upper()
# convenience
fasta = self._fasta
# obtain start and end coordinates of the reference allele
# N.B., use one-based inclusive coordinate system (like GFF3) throughout
ref_start = pos
ref_stop = pos + len(ref) - 1
# check the reference allele matches the reference sequence
ref_seq = fasta.sequence({'chr': chrom,
'start': ref_start,
'stop': ref_stop})
ref_seq = str(ref_seq).lower()
assert ref_seq == ref.lower(), 'reference allele does not match reference sequence, expected %r, found %r' % (ref_seq, ref.lower())
# find overlapping features
overlapping_features = self.find_features(chrom, ref_start, ref_stop)
# filter to find overlapping exons
overlapping_exons = [f for f in overlapping_features
if f.type in self._gff3_exon_types]
for overlapping_exon in overlapping_exons:
strand = overlapping_exon.strand
transcript = self.get_feature(overlapping_exon.parent_id)
gene = self.get_feature(transcript.parent_id)
# find all exons in transcript
exons = self.get_feature_children(transcript.feature_id)
if strand == '+':
# sort exons
exons = sorted(exons, key=operator.itemgetter('start'))
# find index of overlapping exons in all exons
exon_ids = [exon.feature_id for exon in exons]
exon_index = exon_ids.index(overlapping_exon.feature_id)
# find offset
offset = sum([exon.end - exon.start + 1 for exon in exons[:exon_index]])
# find ref cds start position relative to overlapping exon
if ref_start < overlapping_exon.start:
ref_cds_start = -1
else:
ref_cds_start = offset + (ref_start - overlapping_exon.start)
# find ref cds stop position relative to overlapping exon
if ref_stop > overlapping_exon.end:
ref_cds_stop = -1
else:
ref_cds_stop = offset + (ref_stop - overlapping_exon.start)
else:
# sort exons (backwards this time)
exons = sorted(exons, key=operator.itemgetter('end'), reverse=True)
# find index of overlapping exons in all exons
exon_ids = [exon.feature_id for exon in exons]
exon_index = exon_ids.index(overlapping_exon.feature_id)
# find offset
offset = sum([exon.end - exon.start + 1 for exon in exons[:exon_index]])
# find ref cds start position relative to overlapping exon
# N.B., have to think back-to-front
if ref_stop > overlapping_exon.end:
ref_cds_start = -1
else:
ref_cds_start = offset + (overlapping_exon.end - ref_stop)
# find ref cds stop position relative to overlapping exon
# N.B., have to think back-to-front
if ref_start < overlapping_exon.start:
ref_cds_stop = -1
else:
ref_cds_stop = offset + (overlapping_exon.end - ref_start)
yield CodingPosition(gene.feature_id, gene.start, gene.end,
transcript.feature_id, transcript.start, transcript.end,
overlapping_exon.feature_id, overlapping_exon.start, overlapping_exon.end,
strand,
chrom,
pos,
ref,
ref_start,
ref_stop,
ref_cds_start,
ref_cds_stop)
def get_codon_changes(self, chrom, pos, ref, alt):
"""Calculate the codon changes resulting from a given variant.
Parameters
----------
chrom : string
Chromosome.
pos : int
Position.
ref : string
Reference allele.
alt : string
Alternate allele.
Returns
-------
Generator of `CodonChange`s.
"""
# ensure types and case
ref = str(ref).upper()
alt = str(alt).upper()
# convenience
fasta = self._fasta
for coding_position in self.get_coding_positions(chrom, pos, ref):
# convenience
strand = coding_position.strand
ref_start = coding_position.ref_start
ref_stop = coding_position.ref_stop
ref_cds_start = coding_position.ref_cds_start
ref_cds_stop = coding_position.ref_cds_stop
if ref_cds_start >= 0 and ref_cds_stop >= 0:
# lookup exon
exon = self.get_feature(coding_position.exon_id)
if strand == '+':
# calculate position of reference allele start within codon
ref_start_phase = ref_cds_start % 3
# obtain any previous nucleotides to complete the first codon
prefix = fasta.sequence({'chr': chrom,
'start': ref_start - ref_start_phase,
'stop': ref_start - 1})
prefix = str(prefix).lower()
# begin constructing reference and alternate codon sequences
ref_codon = prefix + ref
alt_codon = prefix + alt
# obtain any subsequence nucleotides to complete the last codon
if len(ref_codon) % 3:
ref_stop_phase = len(ref_codon) % 3
suffix = fasta.sequence({'chr': chrom,
'start': ref_stop + 1,
'stop': ref_stop + 3 - ref_stop_phase})
suffix = str(suffix).lower()
ref_codon += suffix
if len(alt_codon) % 3:
alt_stop_phase = len(alt_codon) % 3
suffix = fasta.sequence({'chr': chrom,
'start': ref_stop + 1,
'stop': ref_stop + 3 - alt_stop_phase})
suffix = str(suffix).lower()
alt_codon += suffix
else:
# N.B., we are on the reverse strand, so position reported for
# variant is actually position at the *end* of the reference allele
# which is particularly important for deletions
# we will construct everything for the forward strand (i.e., back-to-front)
# then take reverse complement afterwards at the end of this code block
# calculate position of reference allele start within codon
ref_start_phase = ref_cds_start % 3
# obtain any previous nucleotides to complete the first codon
prefix = fasta.sequence({'chr': chrom,
'start': ref_stop + 1,
'stop': ref_stop + ref_start_phase})
prefix = str(prefix).lower()
# begin constructing reference and alternate codon sequences
ref_codon = ref + prefix
alt_codon = alt + prefix
# obtain any subsequence nucleotides to complete the last codon
if len(ref_codon) % 3:
ref_stop_phase = len(ref_codon) % 3
suffix = fasta.sequence({'chr': chrom,
'start': ref_start - 3 + ref_stop_phase,
'stop': ref_start - 1})
suffix = str(suffix).lower()
ref_codon = suffix + ref_codon
if len(alt_codon) % 3:
alt_stop_phase = len(alt_codon) % 3
suffix = fasta.sequence({'chr': chrom,
'start': ref_start - 3 + alt_stop_phase,
'stop': ref_start - 1})
suffix = str(suffix).lower()
alt_codon = suffix + alt_codon
# take reverse complement
ref_codon = str(Seq(ref_codon).reverse_complement())
alt_codon = str(Seq(alt_codon).reverse_complement())
codon_change = '%s/%s' % (ref_codon, alt_codon)
yield CodonChange(*(coding_position + (alt, ref_codon, alt_codon, codon_change)))
def get_amino_acid_changes(self, chrom, pos, ref, alt):
"""Calculate the amino acid changes resulting from a given variant.
Parameters
----------
chrom : string
Chromosome.
pos : int
Position.
ref : string
Reference allele.
alt : string
Alternate allele.
Returns
-------
Generator of `AminoAcidChange`s.
"""
for codon_change in self.get_codon_changes(chrom, pos, ref, alt):
ref_aa = str(Seq(codon_change.ref_codon).translate())
alt_aa = str(Seq(codon_change.alt_codon).translate())
aa_pos = (codon_change.ref_cds_start // 3) + 1
aa_change = '%s%s%s' % (ref_aa, aa_pos, alt_aa)
yield AminoAcidChange(*(codon_change + (aa_pos, ref_aa, alt_aa, aa_change)))
def get_amino_acid_change(self, chrom, pos, ref, alt, transcript_id):
"""TODO"""
coding_pos = self.get_coding_
# ensure types and case
ref = str(ref).upper()
alt = str(alt).upper()
# convenience
fasta = self._fasta
# look up transcript
transcript = self.get_feature(transcript_id)
# obtain start and end coordinates of the reference allele
# N.B., use one-based inclusive coordinate system (like GFF3) throughout
ref_start = pos
ref_stop = pos + len(ref) - 1
# check the reference allele matches the reference sequence
ref_seq = fasta.sequence({'chr': chrom,
'start': ref_start,
'stop': ref_stop})
ref_seq = str(ref_seq).lower()
assert ref_seq == ref.lower(), 'reference allele does not match reference sequence, expected %r, found %r' % (ref_seq, ref.lower())
if ref_start < transcript.start or ref_stop > transcript.end:
raise Exception('reference allele does not overlap transcript or overlaps transcript boundary')
else:
def get_effects(chrom, pos, ref, alt):
"""TODO"""
# ensure types and case
ref = str(ref).upper()
alt = str(alt).upper()
# convenience
fasta = self._fasta
# obtain start and end coordinates of the reference allele
# N.B., use one-based inclusive coordinate system (like GFF3) throughout
ref_start = pos
ref_stop = pos + len(ref) - 1
# check the reference allele matches the reference sequence
ref_seq = fasta.sequence({'chr': chrom,
'start': ref_start,
'stop': ref_stop})
ref_seq = str(ref_seq).lower()
assert ref_seq == ref.lower(), 'reference allele does not match reference sequence, expected %r, found %r' % (ref_seq, ref.lower())
# find overlapping genome features
overlapping_features = self.find_features(chrom, ref_start, ref_stop)
# filter to find overlapping genes
genes = [f for f in overlapping_features
if f.type in self._gff3_gene_types]
if not genes:
yield EffectIntergenic()
else:
for gene in genes:
transcripts = self.get_children(gene.feature_id)
if not transcripts:
yield EffectIntragenic(gene.feature_id)
else:
for transcript in transcripts:
if ref_start >= transcript.start and ref_stop <= transcript.end:
aa_change = self._get_amino_acid_change(chrom, pos, ref, alt, transcript)
if aa_change.ref_cds_start < 0 and aa_change.ref_cds_stop < 0:
yield EffectIntronic(*aa_change)
elif ref_cds_start < 0 or ref_cds_stop < 0:
yield EffectNotImplemented(*aa_change)
else:
if len(ref) == len(alt) == 1:
# SNPs
if aa_change.ref_aa == aa_change.alt_aa:
yield EffectSynonymousCoding(*aa_change)
elif aa_change.ref_aa == 'M' and ref_cds_start == 0:
yield EffectStartLost(*aa_change)
elif aa_change.ref_aa == '*':
yield EffectStopLost(*aa_change)
elif aa_change.alt_aa == '*':
yield EffectStopGained(*aa_change)
else:
yield EffectNonSynonymousCoding(*aa_change)
else:
# TODO INDELs
else:
yield EffectNotImplemented(gene.feature_id, transcript.feature_id)
In [229]:
eff._tbl_features.counts('type').displayall()
In [130]:
fasta_fn = '../../../data/genome/sanger/version3/September_2012/Pf3D7_v3.fa'
gff3_fn = '../../../data/genome/sanger/version3/September_2012/Pf3D7_v3.gff'
eff = EffectPredictor(fasta_fn, gff3_fn)
In [131]:
import pprint
pp = pprint.PrettyPrinter(indent=1, width=80, depth=3)
def dbg(results):
pp.pprint([x._asdict().items() for x in results])
In [132]:
eff._tbl_features.eq('type', 'gene').rowslice(15, 19)
Out[132]:
In [133]:
eff._fasta['Pf3D7_01_v3'][98815:98824]
Out[133]:
In [134]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 98818, 'TAT'))
In [135]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 98819, 'A'))
In [136]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 98820, 'T'))
In [137]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 98821, 'G'))
In [138]:
eff._fasta['Pf3D7_01_v3'][99010:99016]
Out[138]:
In [139]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 99013, 'T'))
In [140]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 99013, 'TG'))
In [141]:
eff._fasta['Pf3D7_01_v3'][115796:115799]
Out[141]:
In [142]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 115799, 'T'))
In [143]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 115798, 'A'))
In [144]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 115797, 'C'))
In [145]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 115799, 'TA'))
In [146]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 111338, 'T'))
In [147]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 111337, 'C'))
In [148]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 111337, 'CT'))
In [149]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98819, 'A', 'T'))
In [150]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98820, 'T', 'C'))
In [152]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98821, 'G', 'C'))
In [153]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98822, 'A', 'C'))
In [154]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98819, 'A', 'ATTT'))
In [155]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98820, 'T', 'TAAA'))
In [156]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98819, 'ATG', 'A'))
In [158]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98820, 'TGAG', 'T'))
In [159]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98819, 'AT', 'GG'))
In [160]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98820, 'TG', 'CC'))
In [161]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 98821, 'GA', 'CC'))
In [199]:
eff._fasta['Pf3D7_01_v3'][257823:257830]
Out[199]:
In [198]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 257826, 'G', 'GATTATTATTATTATT'))
In [197]:
vcf_fn = '../../../data/public/20141022/3d7_hb3.combined.final.vcf.gz'
tbl = (etl
.fromvcf(vcf_fn, samples=None)
.unpackinfo('EFF')
.addfield('my_eff', lambda row: list(eff.get_amino_acid_changes(row.CHROM, row.POS, row.REF, row.ALT[0])))
.convert(['EFF', 'my_eff'], lambda v: v[0] if v else None)
.addfield('eff_effect', lambda row: row.EFF.split('(')[0] if row.EFF is not None else None)
.addfield('eff_codon_change', lambda row: row.EFF.split('|')[2] if row.EFF is not None else None)
.addfield('my_codon_change', lambda row: row.my_eff.codon_change if row.my_eff is not None else None)
.addfield('eff_a_change', lambda row: row.EFF.split('|')[3] if row.EFF is not None else None)
.addfield('my_aa_change', lambda row: row.my_eff.aa_change if row.my_eff is not None else None)
.cutout('ID', 'my_eff', 'QUAL', 'FILTER', 'EFF')
.selectnotin('eff_effect', ['INTERGENIC', 'INTRON'])
)
tbl.display(200)
In [200]:
eff._tbl_features.eq('type', 'gene').display(20)
In [201]:
Seq(eff._fasta['Pf3D7_01_v3'][115793:115802])
Out[201]:
In [202]:
Seq(eff._fasta['Pf3D7_01_v3'][115793:115802]).reverse_complement()
Out[202]:
In [206]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 115799, 'T'))
In [208]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 115798, 'A'))
In [209]:
dbg(eff.get_coding_positions('Pf3D7_01_v3', 115797, 'C'))
In [212]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115799, 'T', 'C'))
In [213]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115798, 'A', 'X'))
In [214]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115797, 'C', 'X'))
In [216]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115796, 'T', 'C'))
In [218]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115799, 'T', 'TCGG'))
In [219]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115798, 'A', 'AN'))
In [220]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115798, 'A', 'AXX'))
In [221]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115798, 'A', 'AXXX'))
In [222]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115796, 'TCAT', 'T'))
In [223]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115796, 'TCAT', 'TCA'))
In [224]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115796, 'TCAT', 'TC'))
In [225]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115796, 'TCA', 'T'))
In [226]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115796, 'TC', 'T'))
In [227]:
dbg(eff.get_amino_acid_changes('Pf3D7_01_v3', 115795, 'TTCA', 'T'))