In [ ]:
# Some Illumina reads from ancient DNA samples were found to contain an inverted repeat
# with a different sequence inbetween
# in other words, the first x bases of a read are the reverse complement of the last x bases
# This script is meant to clip the 3' part of an inverted repeat when present
# A new fastq file is generated mentioning in the sequences ID which sequence was clipped, if any
# Two metrics files on the repeats found (and clipped) are produced as well
# When an entire sequence is its own reverse complement, this does not get clipped
# but a mention is made sequence identifier and
# these are also reported in the metrics file
#
# Written by Lex Nederbragt, with input from Bastiaan Star
# Version 1.0 release candidate 1, May 2013
#
# requires biopython, os and argparse modules
# on the UiO supercomputer "Abel", needs 'module load python2'
#
# run as 'python script_name.py -h' for instructions
In [ ]:
from Bio import Seq, SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.Alphabet import IUPAC
import os
import argparse
In [ ]:
# help text and argument parser
desc = '\n'.join(["Strips off the 3' copy of an terminal inverted repeat, when present.",
"Input: one fastq file.",
"Output:",
"1) a new fastq file with cleaned sequences: 'infile.fastq' gives 'infile.clean.fastq'",
"2) a file called 'infile.inv_reps.txt' with the stripped sequences and their counts",
"3) a file called 'infile.inv_rep_lengths.txt' with the length distribution of the stripped sequences.",
"An optional argument -s/--shortest_length can be used to set the minumum length of repeat to clip (default: 4 bp)"
])
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('-i','--input', help='Input file name',required=True)
parser.add_argument('-s', '--shortest_length', help='Shortest repeat length to clip', type=int, default=4, required = False)
In [ ]:
def get_outfnames(infile):
"""Adds '.clean' to input filename (before the extension).
Example: 'infile.fastq' becomes 'infile.clean.fastq'"""
out_fnames = []
in_fname = os.path.basename(infile)
[in_fname, in_ext] = os.path.splitext(in_fname)
# fastq outfile
out_fnames.append(in_fname + '.clean' + in_ext)
# inv_rep sequences + counts
out_fnames.append(in_fname + '.inv_reps.txt')
# inv_rep lengths + counts
out_fnames.append(in_fname + '.inv_rep_lengths.txt')
return out_fnames
In [ ]:
def find_inv_repeat(seq, seq_rc):
"""Check for inverted repeat:
whether first x bases are the reverse complement of the last x bases
Returns starting position of the inverted repeat (or 0)"""
inv_rep_length = 0
# need to test whether seq is a reverse complement of itself
if seq == seq_rc:
inv_rep_length = len(seq)
else:
# check if first x bases are a reverse complement of the last x bases
for i in range(1, len(seq)):
if seq_rc[-i:] == seq[-i:]:
inv_rep_length = i
else:
break
return inv_rep_length
In [ ]:
def extract_inv_repeat(seq, shortest_length_to_clip):
"""After finding position of inverted repeat - if any -
returns Bio.SeqRecord with the inverted repeated part removed,
and the sequence of the removed part"""
assert shortest_length_to_clip > 0, "Shortest length to remove needs to be larger than zero, not %s" % shortest_length_to_clip
# expects a Bio.SeqRecord.SeqRecord
assert type(seq) == SeqRecord, "Not a sequence record: '%s'" % str(seq)
# get sequence and reverse complement as text format
seq_txt = str(seq.seq)
rc_seq_txt = str(seq.reverse_complement().seq)
# locate the inverted repeat - if any
inv_rep_length = find_inv_repeat(seq_txt, rc_seq_txt)
# process results
if inv_rep_length == len(seq_txt):
# sequence is its own reverse complement
new_seq = seq
inv_rep = seq_txt
new_seq.description += ' self_reverse_complement'
elif inv_rep_length >= shortest_length_to_clip:
# hit
new_seq = seq[:-inv_rep_length]
inv_rep = str(seq[-inv_rep_length:].seq)
new_seq.description += ' cleaned_off_' + inv_rep
else:
# either no hit, or hit shorter than minimum to report
new_seq = seq
inv_rep = ''
return [new_seq, inv_rep, inv_rep_length]
In [ ]:
def test(seq, shortest_length_to_clip):
"""Performs the 'unit tests'"""
result = extract_inv_repeat(SeqRecord(Seq(seq, IUPAC.unambiguous_dna)), shortest_length_to_clip)
return [str(result[0].seq), result[1], result[2]]
In [ ]:
# set of 'unit tests'
# first/last 10 RC of eachother
assert test('AGTCGTAGCTGATGCTTAGGGGCTTACTAGGCTTGAAGCTACGACT', 1) == ['AGTCGTAGCTGATGCTTAGGGGCTTACTAGGCTTGA', 'AGCTACGACT', 10]
# one base
assert test('AGTCGTAGCTGATGCTTAGGGGCTTACTAGGCTTGATGAGGATTAT', 1) == ['AGTCGTAGCTGATGCTTAGGGGCTTACTAGGCTTGATGAGGATTA', 'T', 1]
assert test('AGTCGTAGCTGATGCTTAGGGGCTTACTAGGCTTGATGAGGATTAT', 4) == ['AGTCGTAGCTGATGCTTAGGGGCTTACTAGGCTTGATGAGGATTAT', '', 1]
# no inv_rep
assert test('AGTCGTAGCTGATGCTTAGGGGCTTACTAGGCTTGATGAGGATTAA', 1) == ['AGTCGTAGCTGATGCTTAGGGGCTTACTAGGCTTGATGAGGATTAA', '', 0]
# entire sequence it's own reverse complement
assert test('ACACAGGCCTGTGT', 1) == ['ACACAGGCCTGTGT', 'ACACAGGCCTGTGT', 14]
# empty sequence
assert test('', 4) == ['', '', 0]
In [ ]:
def process(infile, shortest_length_to_clip):
""" Does the actual work:
Goes through the input file and streams the content through the inverted_repeat locator
Collects the new sequences and repeats found and reports them """
# test for existing inut file
assert os.path.exists(infile), "Input file '%s' appears not to exist." %infile
[out_fname, out_rname, out_lname] = get_outfnames(infile)
inv_reps = {}
lengths = {}
total_trimmed = 0
total_skipped = 0
processed = 0
max_rec_to_process = 1e30
print "Processing sequences..."
with open(out_fname, 'w') as out_fh:
for rec in SeqIO.parse(infile, "fastq"):
processed += 1
if len(rec) < 1:
# skip zero-length sequences
total_skipped += 1
continue
new_rec, inv_rep, inv_rep_length = extract_inv_repeat(rec, shortest_length_to_clip)
out_fh.write(new_rec.format("fastq"))
if inv_rep_length >= shortest_length_to_clip:
inv_reps[inv_rep] = inv_reps.get(inv_rep, 0) +1
total_trimmed += 1
lengths[inv_rep_length] = lengths.get(inv_rep_length, 0) +1
if processed == max_rec_to_process:
break
print "Writing summary files..."
with open(out_rname, "w") as p_out_fh:
p_out_fh.write("inverted_repeat\tcount\n")
for p in inv_reps.keys():
p_out_fh.write("%s\t%s\n" %(p, inv_reps[p]))
with open(out_lname, "w") as l_out_fh:
l_out_fh.write("repeat_length\tcount\n")
for l in sorted(lengths.iterkeys()):
l_out_fh.write("%s\t%s\n" %(l, lengths[l]))
print "\nProcessed %i records:\n- skipped %i (because of zero-length)\n- found %i inverted repeat(s)\n" % (processed, total_skipped, total_trimmed)
In [ ]:
if __name__ == "__main__":
args = parser.parse_args()
print ("Input file: %s" % args.input )
print ("Shortest length to clip: %s" % args.shortest_length)
process(args.input, args.shortest_length)