This notebook demonstrates the use of spline transformations in CONCISE.
The core building blocks of the spline transformation are B-splines (see "Splines, knots and penalties" by Eilers and Marx). Here is what they look like and how to compute them with concise.
In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from concise.utils.splines import BSpline
from concise.preprocessing import EncodeSplines  # uses BSpline internally
In [2]:
def plot_order(degree, ax):
    x = np.linspace(0, 1, 100).reshape((-1, 1))
    y = EncodeSplines(n_bases=10, degree=degree).fit_transform(x).squeeze()
    # alternatively: y = BSpline(0, 1, 10, degree).predict(x)
    ax.plot(x, y)
    ax.set(title="Degree: {d}, order: {o}".format(d=degree, o=degree+1))
In [3]:
fig, ax = plt.subplots(nrows=4)
fig.tight_layout()
for i in range(ax.shape[0]):
    plot_order(i, ax[i])
We can see that for degree=0, the B-splines simply partition the range into equally sized bins; higher degrees yield smoother, overlapping bases.
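Since B-splines form a partition of unity within their range, the basis values at each point should sum to one; for degree=0 each point then falls into exactly one bin. A quick check (a sketch, assuming EncodeSplines follows the standard B-spline construction):
x = np.linspace(0, 1, 100).reshape((-1, 1))
y0 = EncodeSplines(n_bases=10, degree=0).fit_transform(x).squeeze()  # (100, 10)
print(y0.sum(axis=1)[:5])  # expected: ones (partition of unity)
print(y0[:3])              # degree 0: one-hot bin membership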
Let's see how to fit a simple smooth curve, as is usually done with generalized additive models (using, say, the mgcv package in R).
In [4]:
# some data
x = np.linspace(1, 15)
y_true = np.sin(x)
y = y_true + np.random.normal(scale=0.3, size=len(x))
plt.scatter(x,y);
In [5]:
import concise.layers as cl
import concise.regularizers as cr
import keras.layers as kl
from keras.models import Model, Sequential
from keras.optimizers import Adam
In [6]:
# pre-processor
x_spl = EncodeSplines(n_bases=10).fit_transform(x.reshape((-1,1)))
In [7]:
x_spl.shape
Out[7]:
(50, 1, 10)
In [10]:
# keras model
m = Sequential([
    cl.SplineT(input_shape=x_spl.shape[1:])
])
m.compile(Adam(lr=0.1), "mse")
In [11]:
m.fit(x_spl, y, epochs=10, verbose=0);
In [12]:
plt.scatter(x,y)
plt.plot(x, m.predict(x_spl));
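For intuition: this single SplineT layer is just a linear model on the spline basis, $\hat{y} = \sum_k w_k B_k(x)$. As a sketch (assuming, as in the positional-effect reconstruction further below, that the layer's kernel is its first weight), the fitted curve can be reconstructed by hand:
w = m.layers[0].get_weights()[0]        # spline coefficients; ravel to length n_bases
y_manual = x_spl[:, 0, :] @ np.ravel(w)
# if the layer also has a bias term, it would be get_weights()[1]
plt.scatter(x, y)
plt.plot(x, y_manual);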
The advantage of penalized B-splines (P-splines) is that we can use a large number of spline knots together with a smoothness regularization. In concise, this is achieved by adding a regularizer to the SplineT layer: concise.regularizers.SplineSmoother(diff_order=2, l2_smooth=0.0, l2=0.0).
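Following Eilers and Marx, the penalty is the squared diff_order-th difference of the spline coefficients, scaled by l2_smooth. A minimal numpy sketch of that penalty term (assuming SplineSmoother implements the standard P-spline difference penalty):
# second-order difference penalty on hypothetical spline coefficients w:
# l2_smooth * sum_i (w[i+2] - 2*w[i+1] + w[i])**2
w = np.random.randn(10)
penalty = 10 * np.sum(np.diff(w, n=2) ** 2)  # l2_smooth=10, diff_order=2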
In [13]:
def fit_lines(x, n_bases=10, l2_smooth=0, diff_order=2):
    # fit the pre-processor
    es = EncodeSplines(n_bases=n_bases)
    es.fit(x.reshape((-1, 1)))
    # train the model (note: y is taken from the notebook's global scope)
    x_spl = es.transform(x.reshape((-1, 1)))
    m = Sequential([
        cl.SplineT(kernel_regularizer=cr.SplineSmoother(diff_order=diff_order,  # smoothness regularization
                                                        l2_smooth=l2_smooth),
                   input_shape=x_spl.shape[1:])
    ])
    m.compile(Adam(lr=0.1), "mse")
    m.fit(x_spl, y, epochs=150, verbose=0)
    # predict on a denser grid
    x_pred = np.linspace(x.min(), x.max(), num=100)
    return x_pred, m.predict(es.transform(x_pred.reshape((-1, 1))))
In [14]:
# Generate data
x = np.linspace(1, 15, num=20)
y_true = np.sin(x)
y = y_true + np.random.normal(scale=0.3, size=len(x))
In [15]:
l2s_list = [0, 10]
y_pred_l = [fit_lines(x, n_bases=50, l2_smooth=l2s) for l2s in l2s_list]
In [16]:
plt.scatter(x, y)
for i, (x_pred, y_pred) in enumerate(y_pred_l):
    plt.plot(x_pred, y_pred, label=l2s_list[i])
plt.legend(title="l2_smooth");
Let's show a simple use case of the spline transformation in the context of DNA sequence-based models. The core idea is to provide, in addition to the sequence, a scalar track along the sequence encoding distances to some genomic landmarks. For simplicity, we will here use the position within the sequence itself.
We will use a simple simulated dataset, in which multiple TTAATGA motifs were embedded in the sequences and the response was computed as the sum of position-dependent motif effects:
$$ f_{pos}(x) = 0.3 \, \frac{1}{\sqrt{2 \pi \cdot 0.01}} \exp\left\{- \frac{(x - 0.2)^2}{0.02}\right\} + 0.05 \sin(15 x) + 0.8 \;,$$
where $x \in [0, 1]$ is the position within the sequence.
In [17]:
from scipy.stats import norm
pef = lambda x: 0.3*norm.pdf(x, 0.2, 0.1) + 0.05*np.sin(15*x) + 0.8
In [55]:
positions = np.arange(499)
pos_effect_theoretical = pef(positions/positions.max())
plt.plot(pos_effect_theoretical);
plt.xlabel("Position")
plt.ylabel("Pos. effect");
In [56]:
import pandas as pd
from concise.preprocessing import encodeDNA
from concise.utils.pwm import PWM
import concise.initializers as ci
from keras.callbacks import EarlyStopping
In [57]:
# Load the data
data_dir = "../data/"
dt = pd.read_csv(data_dir + "/01-fixed_seq_len-1.csv")
motifs = ["TTAATGA"]
dt
Out[57]:
In [58]:
# Create arrays for model training
x_seq = encodeDNA(dt["seq"])
y = dt["y"].values  # .as_matrix() is deprecated in newer pandas
seq_length = x_seq.shape[1]
x_seq.shape  # (n_samples, seq_length, 4); the last axis one-hot encodes the 4 DNA bases
Out[58]:
In [59]:
# positional track
x_pos = np.repeat(np.arange(seq_length).reshape((1,-1)), y.shape[0], axis=0)
x_pos[:5]
Out[59]:
In [60]:
# default data-format for SplineT positions is: (N_batch, seq_length, channels)
x_pos = x_pos.reshape((-1, seq_length, 1))
x_pos.shape
Out[60]:
In [61]:
# transform positions with the EncodeSplines pre-processor
es = EncodeSplines(n_bases=10)
x_pos_spl = es.fit_transform(x_pos)
# This adds a new axis n_bases to the end
x_pos_spl.shape
Out[61]:
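As a quick sanity check (added here for illustration), EncodeSplines should have appended an n_bases axis to the positional array:
# (N, seq_length, 1) -> (N, seq_length, 1, n_bases)
assert x_pos_spl.shape == x_pos.shape + (10,)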
In [62]:
# Compile the training set
train = {"seq": x_seq, "pos": x_pos_spl}, y
In [63]:
# To make the task easier, we will initialize the filters
# of the conv layer to the simulated motif
pwm_list = [PWM.from_consensus(motif) for motif in motifs]
motif_width = 7
print(pwm_list)
pwm_list[0].plotPWM(figsize=(3,0.5));
In [69]:
def model_SplineT(n_bases=10):
    """Keras model with two inputs: the DNA sequence and a positional track along it"""
    np.random.seed(42)
    # DNA input
    input_dna = cl.InputDNA(seq_length, name="seq")
    # Positions input
    input_pos = kl.Input((seq_length, 1, n_bases), name="pos")
    # Sequence module
    x_seq = cl.ConvDNA(filters=1,
                       kernel_size=motif_width,
                       activation="relu",
                       # initialize the filter to the simulated motif
                       kernel_initializer=ci.PSSMKernelInitializer(pwm_list),
                       # mean_max_scale of 1 means that only the consensus
                       # sequence gets a score larger than 0
                       bias_initializer=ci.PSSMBiasInitializer(pwm_list,
                                                               kernel_size=motif_width,
                                                               mean_max_scale=0.99),
                       padding="same",
                       name="conv_dna",
                       )(input_dna)
    # Positional module
    x_pos = cl.SplineT(kernel_regularizer=cr.SplineSmoother(l2_smooth=1e-3),
                       kernel_initializer="zeros",
                       name="SplineT")(input_pos)
    # shift by 1 so that the zero-initialized spline weights start
    # at a neutral (multiply-by-one) positional effect
    x_pos = kl.Lambda(lambda x: 1 + x)(x_pos)
    # Merge: scale the motif activations by the positional effect
    x = kl.multiply([x_seq, x_pos])
    x = cl.GlobalSumPooling1D()(x)
    x = kl.Dense(units=1, activation="linear")(x)
    model = Model(inputs=[input_dna, input_pos], outputs=x)
    # compile the model
    model.compile(optimizer=Adam(lr=0.1), loss="mse")
    return model
In [70]:
# Get the compiled model
np.random.seed(42)
m = model_SplineT()
In [71]:
# Train
np.random.seed(42)
m.fit(train[0], train[1],
      epochs=30, verbose=2,
      callbacks=[EarlyStopping(patience=5)],
      validation_split=.2);
In [72]:
## visualize the effect
w_pos = m.get_layer("SplineT").get_weights()[0]
x_pos_single = train[0]["pos"][0,:,0,:]
In [73]:
plt.plot(1 + np.dot(x_pos_single, np.ravel(w_pos)))
plt.plot(pos_effect_theoretical);
plt.ylabel('Positional effect')
plt.xlabel('Position')
plt.title("Positional effect")
plt.legend(labels=["Inferred", "Theoretical"]);
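As an alternative to feeding an explicit positional track, concise also provides the SplineWeight1D layer, which learns a smooth positional weighting directly on top of the convolutional activations, so no separate positional input is needed: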
In [74]:
def model_SplineWeight1D():
    np.random.seed(42)
    model = Sequential([
        cl.ConvDNA(filters=1,
                   kernel_size=motif_width,
                   activation="relu",
                   kernel_initializer=ci.PSSMKernelInitializer(pwm_list),
                   # mean_max_scale of 1 means that only the consensus
                   # sequence gets a score larger than 0
                   bias_initializer=ci.PSSMBiasInitializer(pwm_list,
                                                           kernel_size=motif_width,
                                                           mean_max_scale=0.99),
                   padding="same",
                   name="conv_dna",
                   seq_length=seq_length
                   ),
        cl.SplineWeight1D(n_bases=10, l2_smooth=1e-3, l2=0, name="spline_weight"),
        cl.GlobalSumPooling1D(),
        kl.Dense(units=1, activation="linear")
    ])
    model.compile(optimizer=Adam(lr=0.1), loss="mse")
    return model
In [75]:
# Train
np.random.seed(42)
m = model_SplineWeight1D()
m.fit(train[0]["seq"], train[1],
      epochs=30, verbose=2,
      callbacks=[EarlyStopping(patience=5)],
      validation_split=.2);
In [76]:
# plot
m.get_layer("spline_weight").plot() # layer also has a plotting method
plt.plot(positions, pos_effect_theoretical - 1)  # subtract the baseline of 1 to match the layer's zero-centered weights
plt.ylabel('Positional effect')
plt.xlabel('Position')
plt.title("Positional effect")
plt.legend(labels=["Inferred", "Theoretical"]);