In [2]:
from collections import OrderedDict
class BAMHeader (object):
def __init__ (self):
# Init the header dict with a minimal header line
self.header = {'HD':{'VN': '1.5', 'SO':"unknown"}, 'SQ':[]}
# Init enpty dict fo conversion between rname and refid
self.name_to_id = OrderedDict()
self.id_to_name = OrderedDict()
self.current_id = 0
def add_header_line (self, line):
# Try except bloc to skip to the next line in case a non standard line is found
try:
split_line = line.strip().split("\t")
tag = split_line[0].lstrip("@")
# In case a HD tag is found will replace the minimal header
if tag == "HD":
for field in split_line[1:]:
key, value = field.split(":")
self.header["HD"][key] = value
# In case a SQ tag is found = build a dict to have correspondance between id and refname
elif tag == "SQ":
d={}
for field in split_line[1:]:
key, value = field.split(":")
d[key] = int(value) if value.isdigit() else value
# append to the list of SQ lines
self.header[tag].append(d)
# append to the dict of correspondance
self.name_to_id [d['SN']] = self.current_id
self.id_to_name [self.current_id] = d['SN']
self.current_id += 1
# For all the other tags, create an entry with a list of values found
else:
d ={}
for field in split_line[1:]:
key, value = field.split(":")
d[key] = int(value) if value.isdigit() else value
if tag not in self.header:
self.header[tag]=[d]
else:
self.header[tag].append(d)
except Exception as E:
print E
print "Invalid sequence line in bam: {}".format(line)
def rname_to_refid (self, name):
"""return the id corresponding to the reference sequence name (pysam compatible)"""
if name == "*":
return -1
else:
return self.name_to_id[name]
def refid_to_refname (self, id):
"""return the name corresponding to the reference sequence id (pysam compatible)"""
if id == -1:
return "*"
else:
return self.id_to_name[id]
In [13]:
import pysam
from collections import OrderedDict
from re import compile as recompile
class BAMSequenceParser (object):
# Init method
def __init__ (self, header, skip_secondary=True):
# Store defined variables
self.header = header
self.skip_secondary = skip_secondary
# counters
self.count = {"all":0, "invalid":0, "primary":0, "secondary":0}
# For cigar to tuple code conversion
self.cigar_to_code = {'M':0, 'I':1, 'D':2, 'N':3, 'S':4, 'H':5, 'P':6, '=':7, 'X':8}
self.cigar_regex = recompile("(\d+)([MIDNSHP=X])")
def __str__ (self):
return "".join(["{}\t{}\n".format(i,j) for i, j in self.count.items()])
def parse_line (self, line):
self.count["all"] += 1
# Try bloc to skip to the next line in case a non standard line is found
try:
# Split the line and verify the number of fields
split_line = line.strip().split("\t")
assert len (split_line) >= 11, "Invalid number of field in bam aligned sequence"
# Init an AlignedSegment
read = Read(
qname = split_line[0],
flag = int(split_line[1]),
rname = self.header.rname_to_refid(split_line[2]),
pos = int(split_line[3])-1,
mapq = int(split_line[4]),
cigar = self.cigarstring_to_tuple(split_line[5]),
rnext = self.header.rname_to_refid(split_line[6]),
pnext = int(split_line[7])-1,
seq = split_line[9],
qual = pysam.fromQualityString(split_line[10]),
tags = self.parse_tags(split_line [11:]) if len (split_line) >= 12 else tuple())
# skip the read if secondary and required
if read.is_secondary:
self.count["secondary"] += 1
if self.skip_secondary:
return None
# finally return the read
self.count["primary"] += 1
return read
except Exception as E:
print E
print "Invalid sequence line in bam: {}".format(line)
self.count["invalid"] += 1
return None
def cigarstring_to_tuple (self, cigarstring):
"""Transform the cigar string in a pysam compatible tuple"""
if cigarstring == "*":
return None
else:
return tuple([(self.cigar_to_code[y], int(x)) for x, y in self.cigar_regex.findall(cigarstring)])
def parse_tags (self, tags_split):
"""Parse the optional tag fields"""
tags = []
for tag in tags_split:
tag_split = tag.split(":")
key = tag_split[0]
value = int(tag_split[2]) if tag_split[1] == 'i' else tag_split[2]
tags.append((key, value,))
return tuple(tags)
class Read (object):
"""Wrapping class over a pysam AlignedSegment to transform its initialisation"""
def __init__ (self, qname, flag, rname, pos, mapq, cigar, rnext, pnext, seq, qual, tags):
"""Create a pysam.AlignedSegment object and define fields"""
self.read = pysam.AlignedSegment()
self.read.query_name = qname
self.read.flag = flag
self.read.reference_id = rname
self.read.reference_start = pos
self.read.mapping_quality = mapq
self.read.cigar = cigar
self.read.next_reference_id = rnext
self.read.next_reference_start = pnext
self.read.template_length = self.read.infer_query_length() if self.read.cigar else 0
self.read.query_sequence = seq
self.read.query_qualities = qual
self.read.tags = tags
self.is_secondary = self.read.is_secondary
def is_properly_mapped(self, min_mapq, min_match_size):
"""Is properly mapped if mapped on e sequence with a mapq score higher the min_mapq
and a template lenght higher than min_match_size"""
return (self.read.tid != -1 and self.read.mapq >= min_mapq and self.read.template_length >= min_match_size)
def to_bam (self):
return self.read
def to_fastq (self):
return "{}\n{}\n+\n{}\n".format(self.read.qname, self.read.seq, self.read.qual)
In [16]:
import pysam
from gzip import open as gopen
min_mapq = 10
min_match_size = 25
with open ("./test/test.sam", "r") as sam:
# Parse bam header
h = BAMHeader ()
for line in sam:
# Add to BAMheader object if the line is a header line
if line.startswith("@"):
h.add_header_line(line)
# Exit when at end of header
else:
break
# Create an output bam file for mapped reads and an ouput fastq file for unmapped reads
with \
pysam.AlignmentFile("out.sam", "wh", header=h.header) as bam,\
open ("out.fastq", "w") as fastq:
# Initialize a bam sequence parser
bam_parser = BAMSequenceParser (header = h, skip_secondary = True)
# Process the first sequence found when parsing header
read = bam_parser.parse_line(line)
if read:
if read.is_properly_mapped(min_mapq, min_match_size):
bam.write(read.to_bam())
else:
fastq.write(read.to_fastq())
# Process the remaining sequences
for line in sam:
read = bam_parser.parse_line(line)
if read:
if read.is_properly_mapped(min_mapq, min_match_size):
bam.write(read.to_bam())
else:
fastq.write(read.to_fastq())
print (bam_parser)