Inverse transform sampling with spline interpolation

But: construire une distribution à partir d'un modèle empirique ("inverse transform sampling").

Implémentation en Python:

Mon implémentation:

  • Ne pas garder tous les points comme ils le font dans astroml (j'ai plusieurs centaines de millions se samples...)
  • A la place, utiliser np.histogram pour faire un CDF avec un nombre de bins choisi
  • Stocker ce CDF dans un fichier
  • Ecrire une classe pour générer des echantillons: charger et interpoller (avec des splines) le CDF dans init
      interpolate.splrep(y, data)
  • Fonction rvs qui génère un echantillon
      interpolate.splev(y, self._tck)

Test


In [ ]:
with open("astri_inaf_cdf.json", "r") as fd:
    cdf = json.load(fd)

In [ ]:
x_list = cdf['cdf_x']
y_list = cdf['cdf_y']

filtered_x_list = []
filtered_y_list = []

# Dirty hack to stightly improve the spline interpolation at the border of the range
filtered_x_list.append(x_list[1] + x_list[1] - x_list[0])
filtered_y_list.append(0.)

# "Clean" data to have an actual inverse CDF (i.e. lets the CDF be *strictly* increasing)
for i, (xip, xi, yip, yi) in enumerate(zip(x_list[0:-1], x_list[1:], y_list[0:-1], y_list[1:])):
    if yi <= yip:
        print("Error at index {}: cdf({}) = cdf({}) = {}. Removing this point.".format(i, xip, xi, yi))
    else:
        filtered_x_list.append(x_list[i])
        filtered_y_list.append(y_list[i])

# Dirty hack to stightly improve the spline interpolation at the border of the range
filtered_x_list.append(x_list[-1] + x_list[-1] - x_list[-2])
filtered_y_list.append(1.)

In [ ]:
x = np.array(filtered_x_list)
y = np.array(filtered_y_list)

In [ ]:
plt.plot(x, y)
plt.title("CDF")

In [ ]:
plt.plot(y, x)
plt.title(r"$CDF^{-1}$")

In [ ]:
spl = scipy.interpolate.splrep(y, x)

In [ ]:
y2 = np.linspace(0., 1., 200)
x2 = scipy.interpolate.splev(y2, spl)

plt.plot(y2, x2, "-r")
plt.plot(y, x, ":b")

In [ ]:
def rvs(size):
    y = np.random.random(size)
    return scipy.interpolate.splev(y, spl)

In [ ]:
samples = rvs(MAX_NUM_SAMPLES)

n, bins, patches = plt.hist(samples, bins=100,)

cdf_x = np.array(cdf['cdf_x'])
cdf_y = np.array(cdf['cdf_y'])

plt.plot(cdf_x[1:], np.diff(cdf_y) / np.diff(cdf_y).max() * n.max());

Implementation


In [ ]:
%matplotlib inline

import matplotlib
matplotlib.rcParams['figure.figsize'] = (12, 12)

In [ ]:
import numpy as np
import scipy.interpolate
import json

INTERPOLATION_METH = 'spline1'

class EmpiricalDistribution:
    def __init__(self, cdf_json_file_path):
        with open(cdf_json_file_path, "r") as fd:
            cdf = json.load(fd)
            
        # Get the CDF

        self.cdf_x = np.array(cdf['cdf_x'])
        self.cdf_y = np.array(cdf['cdf_y'])

        # "Clean" data to have an actual inverse CDF (i.e. lets the CDF be *strictly* increasing)
        
        filtered_x_list = []
        filtered_y_list = []
        for i, (xip, xi, yip, yi) in enumerate(zip(self.cdf_x[0:-1], self.cdf_x[1:], self.cdf_y[0:-1], self.cdf_y[1:])):
            if yi <= yip:
                print("Error at index {}: cdf({}) = cdf({}) = {}. Removing this point.".format(i, xip, xi, yi))
            else:
                filtered_x_list.append(self.cdf_x[i])
                filtered_y_list.append(self.cdf_y[i])

        filtered_x_array = np.array(filtered_x_list)
        filtered_y_array = np.array(filtered_y_list)
        
        # Interpolate CDF^{-1}

        if INTERPOLATION_METH == 'spline1':
            # Spline interpolation
            self.spl = scipy.interpolate.splrep(filtered_y_array, filtered_x_array,
                                                xb=0., xe=1.,   # The interval to fit
                                                #s=0.,          # A smoothing factor
                                                k=1)            # The degree fo the spline fit
        elif INTERPOLATION_METH == 'linear':
            # Linear interpolation with extrapolation
            self.inv_cdf = scipy.interpolate.interp1d(filtered_y_array, filtered_x_array,
                                                      kind='linear',
                                                      fill_value="extrapolate")
        elif INTERPOLATION_METH == 'slinear':
            # Spline linear interpolation with extrapolation (should be the same than spline1...)
            self.inv_cdf = scipy.interpolate.interp1d(filtered_y_array, filtered_x_array,
                                                      kind='slinear',
                                                      fill_value="extrapolate")
        else:
            raise Exception("Unknown interpolation method", INTERPOLATION_METH)

    def rvs(self, size):
        x = np.random.random(size)
        
        if INTERPOLATION_METH == 'spline1':
            return scipy.interpolate.splev(x, self.spl)
        elif INTERPOLATION_METH in ('linear', 'slinear'):
            return self.inv_cdf(x)
        else:
            raise Exception("Unknown interpolation method", INTERPOLATION_METH)

In [ ]:
dist = EmpiricalDistribution("astri_inaf_cdf.json")
samples = dist.rvs(MAX_NUM_SAMPLES)

n, bins, patches = plt.hist(samples, bins=100,)

cdf_x = dist.cdf_x
cdf_y = dist.cdf_y

plt.plot(cdf_x[1:], np.diff(cdf_y) / np.diff(cdf_y).max() * n.max());

In [ ]:
dist = EmpiricalDistribution("astri_konrad_cdf.json")
samples = dist.rvs(MAX_NUM_SAMPLES)

n, bins, patches = plt.hist(samples, bins=100,)

cdf_x = dist.cdf_x
cdf_y = dist.cdf_y

plt.plot(cdf_x[1:], np.diff(cdf_y) / np.diff(cdf_y).max() * n.max());

In [ ]:
dist = EmpiricalDistribution("flashcam_inaf_cdf.json")
samples = dist.rvs(MAX_NUM_SAMPLES)

n, bins, patches = plt.hist(samples, bins=100,)

cdf_x = dist.cdf_x
cdf_y = dist.cdf_y

plt.plot(cdf_x[1:], np.diff(cdf_y) / np.diff(cdf_y).max() * n.max());

In [ ]:
dist = EmpiricalDistribution("flashcam_grid_prod3b_north_cdf.json")
samples = dist.rvs(MAX_NUM_SAMPLES)

n, bins, patches = plt.hist(samples, bins=100,)

cdf_x = dist.cdf_x
cdf_y = dist.cdf_y

plt.plot(cdf_x[1:], np.diff(cdf_y) / np.diff(cdf_y).max() * n.max());

In [ ]:
dist = EmpiricalDistribution("lstcam_grid_prod3b_north_cdf.json")
samples = dist.rvs(MAX_NUM_SAMPLES)

n, bins, patches = plt.hist(samples, bins=100,)

cdf_x = dist.cdf_x
cdf_y = dist.cdf_y

plt.plot(cdf_x[1:], np.diff(cdf_y) / np.diff(cdf_y).max() * n.max());

In [ ]:
dist = EmpiricalDistribution("nectarcam_grid_prod3b_north_cdf.json")
samples = dist.rvs(MAX_NUM_SAMPLES)

n, bins, patches = plt.hist(samples, bins=100,)

cdf_x = dist.cdf_x
cdf_y = dist.cdf_y

plt.plot(cdf_x[1:], np.diff(cdf_y) / np.diff(cdf_y).max() * n.max());