In [ ]:
#Do all of the imports and setup inline plotting
import time
import numpy as np
from scipy import sparse
from ripser import ripser
import cechmate as cm
from persim import plot_diagrams
from persim import wasserstein, wasserstein_matching
from persim import bottleneck, bottleneck_matching
%matplotlib notebook
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display
import warnings
warnings.filterwarnings('ignore')
Let's first do a simple example of matching the H1 persistence diagram for points sampled from a circle to the H1 diagram from a noisy version of that circle. We'll use the alpha filtration to get our persistence diagrams since we are in such a low dimension.
In [ ]:
# First, sample points from a circle
# First point cloud has no noise
N1 = 300
X1 = np.zeros((N1, 2))
t1 = np.linspace(0, 2*np.pi, N1+1)[0:N1]
X1[:, 0] = np.cos(t1)
X1[:, 1] = np.sin(t1)
# Second point cloud has a lot of noisy points
N2 = 300
t2 = np.linspace(0, 2*np.pi, N2+1)[0:N2]
X2 = np.zeros((N2, 2))
X2[:, 0] = np.cos(t2)
X2[:, 1] = np.sin(t2)
X2 = X2 + 0.25*np.random.randn(N2, 2)
alpha = cm.Alpha()
filtration = alpha.build(X1)
I1 = alpha.diagrams(filtration)[1]
filtration = alpha.build(X2)
I2 = alpha.diagrams(filtration)[1]
# Perform the matchings
tic = time.time()
bdist, (matchidxb, bD) = bottleneck(I1, I2, matching=True)
wdist, (matchidxw, wD) = wasserstein(I1, I2, matching=True)
print("Elapsed Time Matching: %.3g"%(time.time()-tic))
plt.figure(figsize=(12, 4))
plt.subplot(131)
plt.scatter(X2[:, 0], X2[:, 1])
plt.scatter(X1[:, 0], X1[:, 1])
plt.subplot(132)
bottleneck_matching(I1, I2, matchidxb, bD)
plt.title("Bottleneck Dist: %.3g"%bdist)
plt.subplot(133)
wasserstein_matching(I1, I2, matchidxw)
plt.title("Wasserstein Dist: %.3g"%wdist)
plt.show()
We will now explore a different example of a sublevelset filtration or a ''lower star filtration'' of a 1D time series, compared to the lower star filtration of a time series with some noise added.
First, we define a sparse distance matrix which represents the lower star filtration
In [ ]:
def getLowerStarTimeSeriesD(x):
N = x.size
# Add edges between adjacent points in the time series, with the "distance"
# along the edge equal to the max value of the points it connects
I = np.arange(N-1)
J = np.arange(1, N)
V = np.maximum(x[0:-1], x[1::])
# Add vertex birth times along the diagonal of the distance matrix
I = np.concatenate((I, np.arange(N)))
J = np.concatenate((J, np.arange(N)))
V = np.concatenate((V, x))
#Create the sparse distance matrix
D = sparse.coo_matrix((V, (I, J)), shape=(N, N)).tocsr()
return D
Now, we can perfor sublevelset filtrations on a time series by itself and with a small amount of noise added
In [ ]:
NSamples = 2000
t = np.linspace(0, 5, NSamples)
x = np.cos(2*np.pi*t) + t
y = x + 0.2*np.random.randn(NSamples)
Dx = getLowerStarTimeSeriesD(x)
Dy = getLowerStarTimeSeriesD(y)
Ix = ripser(Dx, distance_matrix=True, maxdim=0)['dgms'][0]
Iy = ripser(Dy, distance_matrix=True, maxdim=0)['dgms'][0]
plt.figure(figsize=(8, 4))
plt.subplot(121)
plt.plot(x)
plt.plot(y)
plt.subplot(122)
plot_diagrams([Ix, Iy], labels = ['H0 Original', 'H0 Noisy'])
#Remove point at infinity before bottleneck/wasserstein
Ix = Ix[np.isfinite(Ix[:, 1]), :]
Iy = Iy[np.isfinite(Iy[:, 1]), :]
tic = time.time()
dw = wasserstein(Ix, Iy)
print("Elapsed time Wasserstein: %.3g"%(time.time()-tic))
tic = time.time()
db = bottleneck(Ix, Iy)
print("Elapsed time Bottleneck: %.3g"%(time.time()-tic))
plt.title("Wasserstein = %.3g, Bottleneck=%.3g"%(dw, db))
plt.show()
In [ ]: