In [ ]:
%matplotlib inline
import numpy as np
from matplotlib import pyplot as plt
import gzip
You are studying a protein that binds to DNA. You want to know "rules" that the protein uses to recognize DNA it binds to. You create a library of DNA sequences that looks like the following:
TCGATAACCxxxxxxCCAGACCTCAC
where x is any one of the bases A, C, T, or G. You take a sample of this library and then use high-throughput sequencing to characterize it. These results are in naive.fastq.gz. You then treat the library with your protein, pulling out sequences that bind to your protein. You sequence the sequences that were pulled out. These results are in experimental.fastq.gz.
Write code to read in these libraries. It should:
Q score for any base < 15.TCGATAACCxxxxxxCCAGACCTCAC. (Some sequences might have extra bases inserted or deleted).xxxxxx sequence within the library.Calculate the enrichment of each base at each position in the xxxxxx region. The enrichment should be a 6x4 matrix, where each row is a position (1-6) and each column is a a base (ACTG). For each position, enrichment is defined as the change in the relative frequency of a base at that position. For example, the enrichment for base A at site 1 is:
where $f_{A,1} = \frac{C_{A,1}}{C_{all,1}}$. $C_{A,1}$ is the number of times $A$ is seen at site one; $C_{all,1}$ is the number of times any base is seen at site 1.
According to your analysis, what are the rules for recognition at sites 1-6?
In [ ]:
In [ ]:
# Dictionary mapping ascii value to Q-score
Q_dict = {}
for i in range(33,76):
Q_dict[chr(i)] = i-33
def q_score_check(qual_string,reject_cutoff=15):
"""
Check quality scocre
"""
for s in qual_string:
if Q_dict[s] < reject_cutoff:
return False
return True
def qual_check(sequence):
"""
Make sure sequence has the correct form.
"""
if sequence[0:9] == "TCGATAACC" and sequence[15:] == "CCAGACCTCAC":
return True
return False
def read_fastq_gz(file_name):
"""
Read and process fastq_gz file.
"""
# Read sequences and quality scores from the file
seqs = []
quality = []
line_counter = 0
with gzip.open(file_name) as f:
for l in f:
l_ascii = l.decode("ascii").strip()
if l_ascii[0] == "@":
line_counter = 1
continue
if line_counter == 1:
seqs.append(l_ascii)
line_counter += 1
elif line_counter == 2:
line_counter += 1
elif line_counter == 3:
quality.append(l_ascii)
line_counter = 0
# Count how often each high-quality sequence is seen
total_counts = 0
final = {}
for i in range(len(seqs)):
if q_score_check(quality[i]) and qual_check(seqs[i]):
s = seqs[i][9:15]
try:
final[s] += 1
except KeyError:
final[s] = 1
total_counts += 1
# Convert to counts of each base at each site
count_dict = {"A":0,"C":1,"G":2,"T":3}
counts = np.zeros((6,4),dtype=np.float)
for k in final.keys():
for i, letter in enumerate(k):
counts[i,count_dict[letter]] += final[k]/total_counts
return counts
# Load and caculate enrichment
exp_freqs = read_fastq_gz("experimental.fastq.gz")
naive_freqs = read_fastq_gz("naive.fastq.gz")
E = exp_freqs/naive_freqs
print(E)
In [ ]:
class LibGen:
def __init__(self,template="TCGATAACC{}CCAGACCTCAC"):
"""
Generate a fastq file that looks like a sequencing run.
"""
self._template = template[:]
self._create_model()
def _create_model(self):
"""
The array self._model should be an N x 4 array, where N
is the number of sites to generate. Each row should add
to 1.0. The row encodes the relative probabilities of
A,C,G, and T at that site.
"""
model = np.ones((1,4),dtype=np.float)
self._model = (model.T/np.sum(model,axis=1)).T
def create_sample(self,
q_slope_param=(0,0.2),
q_intercept_param=(25,1),
q_noise_param=(0,0.5),
indel_rate=0.001):
"""
Generate a random sample. The quality score decays linearly
with sequence length. Slope and intercepts are drawn from
distributions encoded by q_slope_param and q_intercept_param.
These are tuples of mean and standard deivation. q_norm_param
encodes random noise to be added to each quality score. The
indel_rate is the rate at which random indels will be
incorporated.
"""
# Generate a sequence according to self._model
tmp = []
for i in range(self._model.shape[0]):
tmp.append(np.random.choice(["A","C","G","T"],p=self._model[i,:]))
sequence = list(self._template.format("".join(tmp)))
# Add random indels
if np.random.random() < indel_rate:
index = np.random.choice(range(len(sequence)))
if np.random.random() > 0.5:
sequence.insert(index,np.random.choice(["A","T","G","C"]))
sequence = sequence[:-1]
else:
sequence.pop(index)
sequence.append(np.random.choice(["A","T","G","C"]))
# Generate Q scores
q_slope = -np.abs(np.random.normal(q_slope_param[0],q_slope_param[1]))
q_intercept = np.random.normal(q_intercept_param[0],q_intercept_param[1])
q_noise = np.random.normal(q_noise_param[0],q_noise_param[1],len(sequence))
Q = np.round(np.arange(len(sequence))*q_slope + q_intercept + q_noise,0)
# Now add noise accoring to Q-scores
p = 10**(-Q/10.)
for i in range(len(sequence)):
if np.random.random() < p[i]:
possibilities = ["A","C","G","T"]
possibilities.remove(sequence[i])
sequence[i] = np.random.choice(possibilities)
seq_string = "".join(sequence)
Q_string = "".join([chr(int(q)+33) for q in Q])
# Hack to make sure quality score does not start with "@" to avoid breaking
# student parsers...
if Q_string[0] == "@":
out = self.create_sample(q_slope_param,
q_intercept_param,
q_noise_param,
indel_rate)
else:
out = "@{}\n{}\n+\n{}\n".format(int(np.random.random()*20000),seq_string,Q_string)
return out
class NaiveLib(LibGen):
"""
Naive library with a hot "C" in position 3, the rest even.
"""
def _create_model(self):
# ACGT
model = np.ones((6,4),dtype=np.float)
# Make third site "C" hot, but others allowed
model[2,1] = 10000.0
self._model = (model.T/np.sum(model,axis=1)).T
class ExperimentalLib(LibGen):
"""
Experimental library with all kinds of interesting single-site rules.
"""
def _create_model(self):
# ACGT
model = np.ones((6,4),dtype=np.float)
# First site is even probs
# Second site can *only* by T
model[1,0] = 0.0
model[1,1] = 0.0
model[1,2] = 0.0
model[1,3] = 100.0
# Make third site "C" hot, but others allowed
model[2,1] = 10000.0
# Make fourth site "A" non-existance
model[3,0] = 0.0
# Make fourth site neither A nor G
model[4,0] = 0.0
model[4,2] = 0.0
# Make sixth site prefer C > T > A > G
model[5,0] = 2.0
model[5,1] = 10.0
model[5,2] = 1.0
model[5,3] = 5000.0
self._model = (model.T/np.sum(model,axis=1)).T
In [ ]:
f = gzip.open("experimental.fastq.gz","wb")
a = ExperimentalLib()
for i in range(1000380):
f.write(str.encode(a.create_sample()))
f.close()
In [ ]:
f = gzip.open("naive.fastq.gz","wb")
a = NaiveLib()
for i in range(2321080):
f.write(str.encode(a.create_sample()))
f.close()