In [53]:
import networkx as nx
In [54]:
def kmers(string, kmer_len):
"""
A function that takes in a string, and splits it up into k-mers.
Generator function, therefore lazily evaluated. Keeps memory usage low.
"""
i = 0
while i + kmer_len <= len(string):
yield string[i:i+kmer_len]
i += 1
# dbG = nx.DiGraph()
# for s in SeqIO.parse(filtered, 'fastq'):
# for kmer in kmers(s.seq, k):
# n1 = kmer[0:k-1]
# n2 = kmer[1:k]
# if not dbG.has_node(n1):
# dbG.add_node(n1)
# if not dbG.has_node(n2):
# dbG.add_node(n2)
# if not dbG.has_edge(n1, n2):
# dbG.add_edge(n1, n2, sequence=kmer, count=0)
# if dbG.has_edge(n1, n2):
# dbG.edge[n1][n2]['count'] += 1
In [64]:
original = 'CGTTACCTTGCAGGAATTGAGGCCGTCCGTTAATTTCCCTTGCATACATATTGCGTTTTTTTGTCCTTTTATCCGCTCACTTAGAAAAGAGACAGATAGC'
reads = [original[0:20], original[1:21], original[2:22], original[5:25], original[50:70], original[51:71], original[52:72]]
print(original)
k = 10
def generate_kmer_graph(reads, k):
testG = nx.DiGraph()
for read in reads:
for kmer in kmers(read, k):
n1 = kmer[0:k-1]
n2 = kmer[1:k]
if not testG.has_node(n1):
testG.add_node(n1)
if not testG.has_node(n2):
testG.add_node(n2)
if not testG.has_edge(n1, n2):
testG.add_edge(n1, n2, sequence=kmer, count=0)
if testG.has_edge(n1, n2):
testG.edge[n1][n2]['count'] += 1
return testG
testG = generate_kmer_graph(reads, k)
print(testG.nodes())
In [65]:
len([g for g in nx.connected_component_subgraphs(testG.to_undirected())])
Out[65]:
In [66]:
# This block of code verifies that the graph is Eulerian.
n_odd = 0
for n in testG.nodes():
n_neighbors = len(testG.successors(n)) + len(testG.predecessors(n))
if n_neighbors % 2 != 0:
n_odd += 1
print(testG.predecessors(n), n, testG.successors(n))
assert n_odd % 2 == 0, print(n_odd)
In [67]:
def start_node(G):
"""
This function finds the starting node in an Eulerian-verified de Bruijn graph.
The start node should have no predecessors.
"""
for n in G.nodes_iter():
if len(G.predecessors(n)) == 0:
return n
testG = generate_kmer_graph(reads, k)
n = start_node(testG)
print(n)
In [71]:
# This block of code performs the de novo assembly of sequences, to generate
# the final string, from one connected Eulerian graph.
stack = [n]
travG = testG.copy()
sequence = stack[-1]
from random import choice
while stack:
n = stack[-1]
print(n)
print(travG.successors(n))
if travG.successors(n):
s = testG.successors(n)[-1] # assuming only one successor
sequence += s[-1]
if travG.edge[n][s]['count'] == 0:
pass
if travG.edge[n][s]['count'] > 0:
stack.pop(-1)
stack.append(s)
testG.edge[n][s]['count'] += -1
if not travG.successors(n):
break
print(sequence)
print(original)
assert sequence == original
In [ ]:
In [ ]: