Put simply, the kernel method maps data that is linearly inseparable in a low-dimensional space into a higher-dimensional space, in the hope that the mapped data becomes linearly separable there. Take the XOR dataset as an example: in two-dimensional space the XOR dataset is not linearly separable, but by mapping it into three-dimensional space we can very easily make it linearly separable. For instance, define the mapping:
$$ \phi(x,y)=\left\{ \begin{aligned} &(x,y,1),\ \ xy>0 \\ &(x,y,0),\ \ xy\le0 \end{aligned} \right. $$The effect of this mapping is shown in the figure below:
As you can see, although the dataset in the left plot is linearly inseparable, the dataset in the right plot clearly is linearly separable. This is a somewhat informal, yet still reasonable, illustration of how the kernel method works.
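To make this concrete, here is a minimal sketch that applies the mapping $\phi$ above to four XOR-style points and checks that the plane $z=1/2$ separates the two classes in the lifted space:
import numpy as np

def phi(x, y):
    # Lift (x, y) into 3D: the third coordinate is 1 when x * y > 0 and 0 otherwise
    return np.array([x, y, 1. if x * y > 0 else 0.])

points = [(1, 1), (-1, -1), (1, -1), (-1, 1)]  # XOR-style points
labels = [1, 1, -1, -1]                        # x * y > 0 -> +1, x * y <= 0 -> -1
for (x, y), label in zip(points, labels):
    # The plane z = 0.5 (i.e. w = (0, 0, 1), b = -0.5) separates both classes
    assert np.sign(phi(x, y)[2] - 0.5) == label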
The idea behind applying the kernel method is as follows:
The original loss function of the perceptron is
$$ L(D) = \sum_{i=1}^N\left[ -y_i(w\cdot x_i+b)\right]_+ $$(where $[z]_+\equiv\max(z,0)$). To make the loss function depend only on inner products between samples, let
$$ w = \sum_{i=1}^N\alpha_ix_i $$so that
$$ \begin{align} L(D) &= \sum_{i=1}^N\left[ -y_i\left[\left(\sum_{j=1}^N\alpha_jx_j\right)\cdot x_i+b\right]\right]_+ \\ &= \sum_{i=1}^N\left[ -y_i\left(\sum_{j=1}^N\alpha_j(x_i\cdot x_j)+b\right)\right]_+ \end{align} $$Applying the kernel method on top of this is trivial: let the kernel function be $K$ and simply replace every $x_i\cdot x_j$ with $K(x_i,x_j)$:
$$ L(D) = \sum_{i=1}^N\left[ -y_i\left(\sum_{j=1}^N\alpha_jK(x_i, x_j)+b\right)\right]_+ $$The optimization problem then becomes
$$ \min_{\alpha}\sum_{i=1}^N\left[ -y_i\left(\sum_{j=1}^N\alpha_jK(x_i, x_j)+b\right)\right]_+ $$Clearly, this problem can still be solved with gradient descent.
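As a quick sketch of the resulting update rule: for a sample $x_i$ whose term inside $[\cdot]_+$ is non-negative, i.e. a currently misclassified sample, the (sub)gradients of the loss are
$$ \frac{\partial L}{\partial \alpha_j} = -y_iK(x_i,x_j),\qquad \frac{\partial L}{\partial b} = -y_i $$so with learning rate $\eta$ gradient descent updates $\alpha_j \leftarrow \alpha_j + \eta\,y_iK(x_i,x_j)$ and $b \leftarrow b + \eta\,y_i$; the mini-batch implementation below sums these updates over all misclassified samples in a batch.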
In [1]:
import numpy as np


class KP:
    def __init__(self):
        self._x = None
        self._alpha = self._b = self._kernel = None

    # Polynomial kernel
    @staticmethod
    def _poly(x, y, p=4):
        return (x.dot(y.T) + 1) ** p

    # RBF (Gaussian) kernel
    @staticmethod
    def _rbf(x, y, gamma):
        return np.exp(-gamma * np.sum((x[..., None, :] - y) ** 2, axis=2))

    def fit(self, x, y, kernel="poly", p=None, gamma=None, lr=0.001, batch_size=128, epoch=10000):
        x, y = np.asarray(x, np.float32), np.asarray(y, np.float32)
        if kernel == "poly":
            p = 4 if p is None else p
            self._kernel = lambda x_, y_: self._poly(x_, y_, p)
        elif kernel == "rbf":
            gamma = 1 / x.shape[1] if gamma is None else gamma
            self._kernel = lambda x_, y_: self._rbf(x_, y_, gamma)
        else:
            raise NotImplementedError("Kernel '{}' has not been defined".format(kernel))
        self._alpha = np.zeros(len(x))
        self._b = 0.
        self._x = x
        # Pre-compute the Gram matrix K(x_i, x_j) once
        k_mat = self._kernel(x, x)
        for _ in range(epoch):
            # Sample a mini-batch
            indices = np.random.permutation(len(y))[:batch_size]
            k_mat_batch, y_batch = k_mat[indices], y[indices]
            # err_i >= 0 means sample i is misclassified (or lies on the boundary)
            err = -y_batch * (k_mat_batch.dot(self._alpha) + self._b)
            if np.max(err) < 0:
                continue
            mask = err >= 0
            # Gradient descent step on the misclassified samples
            delta = lr * y_batch[mask]
            self._alpha += np.sum(delta[..., None] * k_mat_batch[mask], axis=0)
            self._b += np.sum(delta)

    def predict(self, x, raw=False):
        x = np.atleast_2d(x).astype(np.float32)
        k_mat = self._kernel(self._x, x)
        y_pred = self._alpha.dot(k_mat) + self._b
        if raw:
            return y_pred
        return np.sign(y_pred).astype(np.float32)
In [2]:
from Util import gen_two_clusters, visualize2d
xc, yc = gen_two_clusters()
kp = KP()
kp.fit(xc, yc, p=1)
print("准确率:{:8.6} %".format((kp.predict(xc) == yc).mean() * 100))
visualize2d(kp, xc, yc, True)
In [3]:
from Util import gen_spiral
xs, ys = gen_spiral()
kp = KP()
kp.fit(xs, ys, p=8)
print("准确率:{:8.6} %".format((kp.predict(xs) == ys).mean() * 100))
visualize2d(kp, xs, ys, True)
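The same trick extends from the perceptron to the support vector machine: replace the perceptron criterion with the hinge loss and add a regularization term on $\alpha$. As a sketch of the objective the SVM class below optimizes (with the penalty coefficient $C$ playing the role of the parameter c in the code):
$$ \min_{\alpha,b}\ \frac{1}{2}\sum_{i=1}^N\sum_{j=1}^N\alpha_i\alpha_jK(x_i,x_j) + C\sum_{i=1}^N\left[1 - y_i\left(\sum_{j=1}^N\alpha_jK(x_i,x_j)+b\right)\right]_+ $$which is again solved with mini-batch gradient descent.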
In [4]:
class SVM(KP):
    def fit(self, x, y, kernel="rbf", p=None, gamma=None, c=1, lr=0.0001, batch_size=128, epoch=10000):
        x, y = np.asarray(x, np.float32), np.asarray(y, np.float32)
        if kernel == "poly":
            p = 4 if p is None else p
            self._kernel = lambda x_, y_: self._poly(x_, y_, p)
        elif kernel == "rbf":
            gamma = 1 / x.shape[1] if gamma is None else gamma
            self._kernel = lambda x_, y_: self._rbf(x_, y_, gamma)
        else:
            raise NotImplementedError("Kernel '{}' has not been defined".format(kernel))
        self._alpha = np.zeros(len(x))
        self._b = 0.
        self._x = x
        k_mat = self._kernel(x, x)
        k_mat_diag = np.diag(k_mat)
        for _ in range(epoch):
            # Regularization step: shrink alpha via the Gram matrix
            # (roughly the gradient of 0.5 * alpha^T K alpha)
            self._alpha -= lr * (np.sum(self._alpha * k_mat, axis=1) + self._alpha * k_mat_diag) * 0.5
            # Sample a mini-batch and compute the hinge-loss margins
            indices = np.random.permutation(len(y))[:batch_size]
            k_mat_batch, y_batch = k_mat[indices], y[indices]
            err = 1 - y_batch * (k_mat_batch.dot(self._alpha) + self._b)
            if np.max(err) <= 0:
                continue
            mask = err > 0
            # Gradient descent step on the samples that violate the margin
            delta = c * lr * y_batch[mask]
            self._alpha += np.sum(delta[..., None] * k_mat_batch[mask], axis=0)
            self._b += np.sum(delta)
In [5]:
svm = SVM()
svm.fit(xc, yc)
print("准确率:{:8.6} %".format((svm.predict(xc) == yc).mean() * 100))
visualize2d(svm, xc, yc, True)
In [6]:
svm = SVM()
svm.fit(xs, ys)
print("准确率:{:8.6} %".format((svm.predict(xs) == ys).mean() * 100))
visualize2d(svm, xs, ys, True)
In [7]:
from sklearn.svm import SVC
svm = SVC()
svm.fit(xc, yc)
visualize2d(svm, xc, yc, True)
print("准确率:{:8.6} %".format((svm.predict(xc) == yc).mean() * 100))
svm.fit(xs, ys)
visualize2d(svm, xs, ys, True)
print("准确率:{:8.6} %".format((svm.predict(xs) == ys).mean() * 100))
Which one is better is somewhat a matter of taste. Of course, these are only two very simple datasets; on real-world datasets, sklearn's SVM will do much better than the SVM we implemented here.