In [4]:
from collections import defaultdict
import csv
import re
import math
In [5]:
#Files to annotate
genes_gff = "TcIV-X10825.v2.MAKER.gff"
genes_faa = "TcIV-X10825.v2.all.maker.proteins.fasta"
#Annotation files
interpro_gff3 = "TcIV-X10825.v2.InterProScan.gff3"
blastp_tsv = "TcIV-X10825.v2.all.maker.proteins.blastp"
#Outfiles
annotated_gff = "TcIV-X10825.v2.MAKER_ANNOTATED.gff"
annotated_faa = "TcIV-X10825.v2.all.maker.proteins_ANNOTATED.fasta"
In [6]:
#GFF3 constants
SEQID = 0 #Gene ID
SOURCE = 1 #Annotation tool
TYPE = 2
START = 3
END = 4
SCORE = 5
STRAND = 6
PHASE = 7
ATTRIBUTES = 8 #key value pairs
In [7]:
#Extract key_value pairs from the Attributes column
def extract_gff3_attributes(attribs,tag_line):
values = [""] * len(attribs)
tag_fields = tag_line.split(";")
for field in tag_fields:
for idx,attrib in enumerate(attribs):
if attrib in field:
value_start = field.find("=")
values[idx] = field[value_start+1:]
return values
In [8]:
interpro_annot = {}
attributes_to_extract=["signature_desc","Ontology_term","Dbxref"]
#Extract the signature description from Interpro annotation using
with open(interpro_gff3,"r") as interpro_fh:
for line in interpro_fh:
if line[0] != "#" : #ignore comment lines
line_fields = line.rstrip("\n").split("\t")
#Extract from Pfam lines the
if line_fields[SOURCE].lower() == "pfam":
attribs = extract_gff3_attributes(attributes_to_extract,line_fields[ATTRIBUTES])
#Remove the -mRNA(.*)
suffix_position = line_fields[SEQID].rfind("-mRNA")
if suffix_position != -1:
line_fields[SEQID] = line_fields[SEQID][:suffix_position]
interpro_annot[ line_fields[SEQID] ] = attribs
elif line.rstrip("\n") == "##FASTA":
break
In [9]:
k = interpro_annot.keys()[450]
print k
In [10]:
#BLAST column constants
BLAST_QUERY = 0
BLAST_SUBJ = 1
BLAST_PCT_ID = 2
BLAST_ALI_LEN = 3
BLAST_MISMATCH =4 #Mismatches
BLAST_GAPS =5
BLAST_Q_START =6
BLAST_Q_END = 7
BLAST_S_START =8
BLAST_S_END = 9
BLAST_EVAL = 10
BLAST_BITSCORE = 11
blast_col_types = (str,str,float,float,int,int,int,int,int,int,float,float)
In [11]:
def parse_blast_hit(fields):
#Remove "-RNA1" prefix from Query ID
fields[0] = fields[0].split("-mRNA")[0]
#Convert each column to the adequate typet
parsed_fields = [ blast_col_types[idx](f) for idx,f in enumerate(fields) ]
return parsed_fields
In [12]:
def is_a_better_hit(new_hit, original_hit):
return new_hit[BLAST_EVAL] < original_hit[BLAST_EVAL]
def hit_good_enough(blast_hit):
pct_id = blast_hit[BLAST_PCT_ID]
q_coverage = 100.0 * (blast_hit[BLAST_ALI_LEN] - blast_hit[BLAST_GAPS]) / abs( blast_hit[BLAST_Q_END] - blast_hit[BLAST_Q_START] )
e_value = blast_hit[BLAST_EVAL]
good_enough = pct_id > 85.0 and q_coverage > 80.0 and e_value < 0.00001
return good_enough
In [13]:
blast_annot = {}
with open(blastp_tsv,"r") as blastp_fh:
for blast_fields in csv.reader(blastp_fh,delimiter="\t",quotechar='|'):
parsed_hit = parse_blast_hit(blast_fields)
if hit_good_enough(parsed_hit):
if parsed_hit[0] not in blast_annot or is_a_better_hit(parsed_hit,blast_annot[parsed_hit[0]]):
blast_annot[parsed_hit[0]] = parsed_hit
In [14]:
k = blast_annot.keys()[450]
print k
print blast_annot[k]
In [15]:
put_prot_counter = 0
def assign_gene_name(gene_id, interpro_annot, blast_annot):
global put_prot_counter
go = kegg = ""
gene_name = "putative_protein"+str(put_prot_counter)
if gene_id in interpro_annot:
gene_name,go,kegg = interpro_annot[gene_id]
elif gene_id in blast_annot:
gene_name = blast_annot[gene_id][BLAST_SUBJ]
else:
put_prot_counter += 1
return gene_name, go, kegg
In [16]:
def extractGeneIDfromGFFline(line):
gene_id = False
name_match = re.search( r"Name=(.+?)(-mRNA-[0-9]+?(:(exon|cds))?)?(;|$)", line)
if name_match:
gene_id = name_match.group(1)
return gene_id
#Substitutes the Name tag for Interpro or Blast annotation
#Returns
def processGFFLine(line, interpro_annot, blast_annot):
global put_prot_id
line_to_write = line
gene_id = False
gene_annotation = {}
if "\tgene\t" in line:
fields = line.split("\t")
gene_annotation["contig"] = fields[SEQID]
gene_annotation["position"] = str(fields[START])+"-"+str(fields[END])
gene_id = extractGeneIDfromGFFline(fields[ATTRIBUTES])
assert gene_id
gene_annotation["gene_name"],gene_annotation["go"],gene_annotation["kegg"] = assign_gene_name(gene_id,interpro_annot,blast_annot)
line_to_write = re.sub(r"Name=(.+?)(;|$)", "Name='"+gene_annotation["gene_name"]+r"'\g<2>",line)
return (line_to_write, gene_id, gene_annotation)
Extract Contig and position information of the gene
In [17]:
final_annotation = {}
with open(genes_gff,"r") as genes_gff_fh, open(annotated_gff,"w") as gff_out_fh:
for line in genes_gff_fh:
if line[0] == "#":
gff_out_fh.write(line)
else:
new_gff_line, gene_id, gene_annot = processGFFLine(line, interpro_annot,blast_annot)
gff_out_fh.write(new_gff_line)
if gene_id:
final_annotation[gene_id] = gene_annot
gff_out_fh.close()
In [18]:
def extractGeneIDfromFasta(line):
space_pos = line.find(" ")
if space_pos != -1:
gene_id = line[1:space_pos].split("-mRNA")[0]
else:
gene_id = line[1:].split("-mRNA")[0]
return gene_id
def is_fasta_header(line):
return line.startswith(">")
In [19]:
with open(genes_faa,"r") as genes_faa_fh, open(annotated_faa,"w") as faa_out_fh:
for line in genes_faa_fh:
if is_fasta_header(line):
gene_id = extractGeneIDfromFasta(line)
assert gene_id in final_annotation
gene_annotation = final_annotation[gene_id]
new_header = ">"+gene_annotation["contig"]+":"+gene_annotation["position"]+";"+gene_annotation["gene_name"]+";"+gene_annotation["go"]+";"+gene_annotation["kegg"]+"\n"
faa_out_fh.write( new_header )
else:
faa_out_fh.write(line)
faa_out_fh.close()
In [19]: