This notebook can be used to generate fake structural variant test data for testing the genome finishing module. Given a source fasta, bam, and paired-end fastqs and insertion parameters, it creates a directory with the following files:
ref.fa
reads.1.fq
reads.2.fq
In [1]:
import sys
from django.core.management import setup_environ
import settings
setup_environ(settings)
Out[1]:
In [2]:
import random
import re
import os
from Bio import SeqIO
import pysam
from genome_finish.millstone_de_novo_fns import get_avg_genome_coverage
# def _make_fake_insertion(ref_endpoints, ins_endpoints):
ref_endpoints = (2930000, 2940000)
ins_endpoints = (2932000, 2933000)
desired_coverage = 40
output_no_insertion_ref = True
test_number = 6
template_dir = '/home/wahern/projects/millstone/genome_designer/test_data/genome_finish_test/mg1655_test/templates'
source_fasta = os.path.join(template_dir, 'mg1655.fa')
source_bam = os.path.join(template_dir, 'lib1_rec07.bwa_align.bam')
source_fq1 = os.path.join(template_dir, 'lib1_rec07.1.fq')
source_fq2 = os.path.join(template_dir, 'lib1_rec07.2.fq')
test_dir = '/home/wahern/projects/millstone/genome_designer/test_data/genome_finish_test/mg1655_test'
output_dir = os.path.join(test_dir, str(test_number))
output_fasta = os.path.join(output_dir, 'ref.fa')
output_fq1 = os.path.join(output_dir, 'reads.1.fq')
output_fq2 = os.path.join(output_dir, 'reads.2.fq')
assert not os.path.exists(output_dir)
# Get sample
if desired_coverage:
coverage = get_avg_genome_coverage(source_bam)
if desired_coverage > coverage:
raise Exception('Desired coverage:' + str(desired_coverage) +
' is greater than the genome\'s average coverage of ' +
str(coverage))
read_sampling_fraction = desired_coverage / coverage
linecount = 0
with open(source_fq1) as fh:
for line in fh:
linecount+=1
include_read = [random.random() < read_sampling_fraction for i in xrange(linecount)]
os.mkdir(output_dir)
# Get source seqrecord
with open(source_fasta, 'r') as source_fasta_fh:
source_seqrecord = SeqIO.parse(source_fasta_fh, 'fasta').next()
output_seqrecord = (
source_seqrecord[ref_endpoints[0]:ins_endpoints[0]] +
source_seqrecord[ins_endpoints[1]:ref_endpoints[1]]
)
# Sanity check
assert len(output_seqrecord) == ref_endpoints[1] - ref_endpoints[0] - (
ins_endpoints[1] - ins_endpoints[0])
# Add some metadata to the header.
output_seqrecord.id = source_seqrecord.id
output_seqrecord.description = source_seqrecord.description + (
', FAKE_SHORT: ' +
'{ref_endpoints:' + str(ref_endpoints) + ', ' +
'ins_endpoints:' + str(ins_endpoints) + '}')
# Write output fasta
with open(output_fasta, 'w') as fh:
SeqIO.write([output_seqrecord], fh, 'fasta')
# Get reads in region
qnames_in_region = {}
source_af = pysam.AlignmentFile(source_bam)
for read in source_af.fetch('NC_000913', ref_endpoints[0], ref_endpoints[1]):
if (not read.is_unmapped and
ref_endpoints[0] <= read.reference_start <= ref_endpoints[1] and
ref_endpoints[0] <= read.reference_end <= ref_endpoints[1]):
qnames_in_region[read.qname] = True
source_af.close()
# Go through fastqs and write reads in ROI to file
p1 = re.compile('@(\S+)')
for input_fq_path, output_fq_path in [(source_fq1, output_fq1), (source_fq2, output_fq2)]:
counter = 0
if desired_coverage:
iterator = iter(include_read)
with open(input_fq_path, 'r') as in_fh, open(output_fq_path, 'w') as out_fh:
for line in in_fh:
m1 = p1.match(line)
if m1:
qname = m1.group(1)
if qname in qnames_in_region:
if desired_coverage:
if iterator.next():
out_fh.write(line)
out_fh.write(in_fh.next())
out_fh.write(in_fh.next())
out_fh.write(in_fh.next())
else:
out_fh.write(line)
out_fh.write(in_fh.next())
out_fh.write(in_fh.next())
out_fh.write(in_fh.next())
if output_no_insertion_ref:
# Get source seqrecord
with open(source_fasta, 'r') as source_fasta_fh:
source_seqrecord = SeqIO.parse(source_fasta_fh, 'fasta').next()
output_seqrecord = (
source_seqrecord[ref_endpoints[0]:ref_endpoints[1]]
)
# Add some metadata to the header.
output_seqrecord.id = source_seqrecord.id
output_seqrecord.description = source_seqrecord.description + (
', FAKE_SHORT: ' +
'{ref_endpoints:' + str(ref_endpoints) + ', ' +
'ins_endpoints:None}')
# Write output fasta
no_ins_fasta = os.path.join(output_dir, 'no_ins_ref.fa')
with open(no_ins_fasta, 'w') as fh:
SeqIO.write([output_seqrecord], fh, 'fasta')
In [ ]: