In [19]:
import khmer
from khmer.tests import khmer_tst_utils as test_utils
from khmer import reverse_hash
from khmer import reverse_complement
import screed
import random
import itertools
from imp import reload
In [20]:
contig = list(screed.open(test_utils.get_test_data('simple-genome.fa')))[0].sequence
In [21]:
contig
Out[21]:
In [22]:
def mutate_base(base):
if base in 'AT':
return random.choice('GC')
elif base in 'GC':
return random.choice('AT')
else:
assert False, 'bad base'
def mutate_sequence(sequence, N=1):
sequence = list(sequence)
positions = random.sample(range(len(sequence)), N)
for i in positions:
sequence[i] = mutate_base(sequence[i])
return ''.join(sequence)
def mutate_position(sequence, pos):
sequence = list(sequence)
sequence[pos] = mutate_base(sequence[pos])
return ''.join(sequence)
for _ in range(100):
assert 'A' not in mutate_sequence('A' * 10, 10)
assert 'T' not in mutate_sequence('T' * 10, 10)
assert 'C' not in mutate_sequence('C' * 10, 10)
assert 'G' not in mutate_sequence('G' * 10, 10)
assert mutate_position('AAAA', 2) in ['AACA', 'AAGA']
assert mutate_position('TTTT', 2) in ['TTCT', 'TTGT']
assert mutate_position('CCCC', 2) in ['CCAC', 'CCTC']
assert mutate_position('GGGG', 2) in ['GGAG', 'GGTG']
In [23]:
def reads_from_sequence(sequence, L=100, N=100):
positions = list(range(len(sequence) - L))
for i in range(N):
start = random.choice(positions)
yield sequence[start:start+L]
for read in reads_from_sequence(contig):
assert read in contig
for read in reads_from_sequence(contig):
assert mutate_sequence(read) not in contig
In [24]:
def str_tag(tag, K):
return '<Tag {0} {1}>'.format(reverse_hash(tag, K), reverse_complement(reverse_hash(tag, K)))
With the NaiveLabeledAssembler, we expect this configuration to produce only one shortened contig: we get two possible branches with spanning reads, neither of which has coverage of 1 (being a putative error / tip).
In [25]:
K = 21
graph = khmer.Countgraph(K, 1e6, 4)
labeller = khmer._GraphLabels(graph)
graph.consume(contig)
bubble = mutate_position(contig, 100)
reads = list(itertools.chain(reads_from_sequence(contig), reads_from_sequence(bubble)))
random.shuffle(reads)
for n, read in enumerate(reads):
graph.consume(read)
hdns = graph.find_high_degree_nodes(read)
labeller.label_across_high_degree_nodes(read, hdns, n)
In [26]:
paths = labeller.assemble_labeled_path(contig[:K])
print(*[str(len(p)) + ' ' + p for p in paths], sep='\n\n')
Let's try introducing an error in a singe read. This should trip the filter and cause one full length contig to be produced, so long as the coverage at the branch is greater than the arbitrarily selected minimum.
In [27]:
K = 21
graph = khmer.Countgraph(K, 1e6, 4)
labeller = khmer._GraphLabels(graph)
bubble = mutate_position(contig, 100)
reads = itertools.chain(reads_from_sequence(contig, N=50),
[bubble],
reads_from_sequence(contig, N=50))
for n, read in enumerate(reads):
graph.consume(read)
hdns = graph.find_high_degree_nodes(read)
if list(hdns):
print('Read:', n)
print([str_tag(h, K) for h in hdns])
labeller.label_across_high_degree_nodes(read, hdns, n)
In [28]:
paths = labeller.assemble_labeled_path(contig[:K])
print(*[str(len(p)) + ' ' + p for p in paths], sep='\n\n')
In [29]:
nodegraph = khmer.Nodegraph(K, 1e5, 4)
lh = khmer._GraphLabels(nodegraph)
nodegraph.consume(contig)
branch = contig[:120] + 'TGATGGACAG'
nodegraph.consume(branch) # will add a branch
hdn = nodegraph.find_high_degree_nodes(contig)
hdn += nodegraph.find_high_degree_nodes(branch)
print(list(hdn))
lh.label_across_high_degree_nodes(contig, hdn, 1)
lh.label_across_high_degree_nodes(branch, hdn, 2)
print(lh.get_tag_labels(list(hdn)[0]))
paths = lh.assemble_labeled_path(contig[:K])
print([len(x) for x in paths])
len_path = len(paths)
print('len path:', len_path)