In [1]:
!date
In [2]:
import torch
import numpy as np
import math
import random
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn import gaussian_process
from sklearn.gaussian_process.kernels import Matern, WhiteKernel, ConstantKernel, RBF
from sklearn.utils import check_random_state
In [3]:
def add_prior_beam_energy(prior):
g = random_gaussian(mu=[30, 60], sigma=1.0)
add_prior(prior, g['mu'], g['sigma'])
def add_prior_fermi_constant(prior):
g = random_gaussian(mu=[0, 2], sigma=1.0)
add_prior(prior, g['mu'], g['sigma'])
def add_prior(prior, mu, sigma):
prior['mu'].append(mu)
prior['sigma'].append(sigma)
def random_gaussian(mu=[-1, 1], sigma=5.0):
return {'mu': np.random.uniform(mu[0], mu[1]),
'sigma': np.random.uniform(0.0, sigma)}
def draw_gaussian(d, num_samples, random_state=None):
num_parameters = len(d['mu'])
thetas = torch.zeros((num_samples, num_parameters))
mu = d['mu']
sigma = d['sigma'].exp()
for i in range(0, num_samples):
gaussian = torch.normal(mu, sigma)
thetas[i, :] = gaussian
return thetas
def real_experiment(theta, n_samples=100000):
return simulator(theta, n_samples)
def simulator(theta, n_samples, random_state=None):
rng = check_random_state(random_state)
samples = simulator_rej_sample_costheta(n_samples, theta, rng)
return torch.from_numpy(samples.reshape(-1, 1)).float()
def simulator_rej_sample_costheta(n_samples, theta, rng):
sqrtshalf = theta[0]
gf = theta[1]
ntrials = 0
samples = []
x = torch.linspace(-1, 1, steps=1000)
maxval = torch.max(simulator_diffxsec(x, sqrtshalf, gf))
while len(samples) < n_samples:
ntrials = ntrials + 1
xprop = rng.uniform(-1, 1)
ycut = rng.rand()
yprop = (simulator_diffxsec(xprop, sqrtshalf, gf) / maxval)[0]
        # Standard rejection sampling: accept xprop with probability diffxsec(xprop) / maxval.
        if yprop < ycut:
continue
samples.append(xprop)
return np.array(samples)
def simulator_diffxsec(costheta, sqrtshalf, gf):
norm = 2. * (1. + 1. / 3.)
return ((1 + costheta ** 2) + simulator_a_fb(sqrtshalf, gf) * costheta) / norm
def simulator_a_fb(sqrtshalf, gf):
mz = 90
gf_nom = 0.9
sqrts = sqrtshalf * 2.
x = torch.FloatTensor([(sqrts - mz) / mz * 10])
a_fb_en = torch.tanh(x)
a_fb_gf = gf / gf_nom
return 2 * a_fb_en * a_fb_gf
def plot_observations(X_observed, theta, normed=True):
plt.grid(True)
plt.hist(X_observed, histtype="bar", range=(-1, 1), bins=100, normed=normed)
plt.xlim([-1, 1])
if normed:
plt.ylim([0, 2])
plt.ylabel("Normalized Number of Events")
else:
plt.ylim([0, 3000])
plt.ylabel("Number of Events")
    plt.title(r"Distribution of $\cos(A)$ in $e^-e^+ \rightarrow \mu^-\mu^+$ events." + "\n"
              r"$E_{beam}$ = " + str(theta[0]) + " GeV - " + r"$G_f$ = " + str(theta[1]))
plt.xlabel(r"$\cos(A)$")
plt.show()
def sample_real_data(p_r, batch_size=256):
samples = torch.zeros((batch_size, 1))
num_samples_p_r = len(p_r)
for index in range(0, batch_size):
random_index = random.randint(0, num_samples_p_r - 1)
samples[index, :] = p_r[random_index]
return torch.autograd.Variable(samples, requires_grad=True)
def sample_generated_data(proposal, batch_size=256):
# Sample `batch_size` thetas according to our proposal distribution.
thetas = draw_gaussian(proposal, batch_size)
# Obtain the individual Gaussians.
theta_beam_energy = thetas[:, 0]
theta_fermi_constant = thetas[:, 1]
    # Run the simulator once for every sampled theta.
samples = torch.zeros((batch_size, 1))
for sample_index, theta in enumerate(thetas):
samples[sample_index, :] = simulator(theta, 1)
return torch.autograd.Variable(samples, requires_grad=True)
def compute_gradient_penalty(critic, real, fake, l=5.0):
# Compute x_hat and its output.
epsilon = torch.rand(real.size())
x_hat = epsilon * real + ((1. - epsilon) * fake)
x_hat = torch.autograd.Variable(x_hat, requires_grad=True)
y_hat = critic(x_hat)
# Compute the associated gradients.
gradients = torch.autograd.grad(outputs=y_hat, inputs=x_hat,
grad_outputs=torch.ones(y_hat.size()),
create_graph=True, retain_graph=True, only_inputs=True)[0]
# Prevent norm 0 causing NaN.
gradients = gradients + 1e-16
# Compute the gradient penalty.
gradient_penalty = l * ((gradients.norm(2, dim=1) - 1.) ** 2)
return gradient_penalty
class Critic(torch.nn.Module):
def __init__(self, num_hidden):
super(Critic, self).__init__()
self.fc_1 = torch.nn.Linear(1, num_hidden)
self.fc_2 = torch.nn.Linear(num_hidden, num_hidden)
self.fc_3 = torch.nn.Linear(num_hidden, 1)
def forward(self, x):
x = F.relu(self.fc_1(x))
x = F.relu(self.fc_2(x))
x = (self.fc_3(x))
return x
class CriticWithSigmoid(torch.nn.Module):
def __init__(self, num_hidden):
super(CriticWithSigmoid, self).__init__()
self.fc_1 = torch.nn.Linear(1, num_hidden)
self.fc_2 = torch.nn.Linear(num_hidden, num_hidden)
self.fc_3 = torch.nn.Linear(num_hidden, 1)
def forward(self, x):
x = F.relu(self.fc_1(x))
x = F.relu(self.fc_2(x))
x = F.sigmoid(self.fc_3(x))
return x
class CriticWithTanh(torch.nn.Module):
def __init__(self, num_hidden):
super(CriticWithTanh, self).__init__()
self.fc_1 = torch.nn.Linear(1, num_hidden)
self.fc_2 = torch.nn.Linear(num_hidden, num_hidden)
self.fc_3 = torch.nn.Linear(num_hidden, 1)
def forward(self, x):
x = F.relu(self.fc_1(x))
x = F.relu(self.fc_2(x))
x = F.tanh(self.fc_3(x))
return x
class CriticWithRelu(torch.nn.Module):
def __init__(self, num_hidden):
super(CriticWithRelu, self).__init__()
self.fc_1 = torch.nn.Linear(1, num_hidden)
self.fc_2 = torch.nn.Linear(num_hidden, num_hidden)
self.fc_3 = torch.nn.Linear(num_hidden, 1)
def forward(self, x):
x = F.relu(self.fc_1(x))
x = F.relu(self.fc_2(x))
        x = F.relu(self.fc_3(x))
return x
In [4]:
# Define the true parameterization of theta.
theta_true = [45., .9]
# This parameterization will produce the following distribution in the real experiment.
real_data = real_experiment(theta_true).view(-1)
plot_observations(real_data.numpy(), theta_true)
# Display the total number of observations available.
num_real_samples = len(real_data)
print("Total number of true experimental observations: " + str(num_real_samples))
The main idea of the method is that the desired parameterizations of the theory are the ones that maximize the critic's loss after the critic has been trained to optimality: when the simulated distribution matches the real one, even an optimally trained critic cannot tell the two apart. To gain some additional intuition, let us assume that the beam energy $E_{beam}$ is the only parameter that can be tuned. What does the error landscape look like when we vary only this parameter?
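One way to write this objective down explicitly (a sketch of the idea above, with $\phi$ denoting the critic's weights and $\mathcal{L}_{\text{critic}}$ the loss used in `fit_critic` below) is

$$\hat{\theta} \;=\; \arg\max_{\theta}\;\min_{\phi}\;\mathcal{L}_{\text{critic}}\big(\phi;\; p_r,\; p(x \mid \theta)\big),$$

where $p_r$ is the real data distribution and $p(x \mid \theta)$ is the distribution produced by the simulator. The inner minimization is what `fit_critic` performs; the outer maximization is what we approximate below by scanning over $E_{beam}$ and fitting a Gaussian Process to the recorded losses.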
Note: High training cost.
In [5]:
# Initialize the proposal distribution.
proposal = {
'mu': torch.FloatTensor([40., .9]), # Mean of the proposal distribution.
'sigma': torch.FloatTensor([.1, .1]) # Uncertainty of the proposal distribution.
}
In [6]:
loss_f = torch.nn.MSELoss(size_average=False)
# TODO BALANCE BOTH = GOAL -> FIX LOSS METHOD
def fit_critic(proposal, p_r, critic, optimizer, num_critic_iterations=100, batch_size=256):
ones = torch.autograd.Variable(torch.ones(batch_size, 1))
zeros = torch.autograd.Variable(torch.zeros(batch_size, 1))
# Fit the critic optimally.
for iteration in range(0, num_critic_iterations):
# Fetch the data batches.
x_r = sample_real_data(p_r, batch_size)
# Fetch the simulated data batch.
x_g = sample_generated_data(proposal, batch_size)
# Reset the gradients.
critic.zero_grad()
# Forward pass with real data.
y_r = critic(x_r)
loss_real = loss_f(y_r, zeros)
# Forward pass with generated data.
y_g = critic(x_g)
loss_generated = loss_f(y_g, ones)
# Compute the loss, and the accompanying gradients.
loss = (1 + (loss_generated + loss_real) ** 2).log()
loss.backward()
optimizer.step()
return loss.mean().data.numpy()[0]
In [7]:
# Initialize the interpolation parameters.
p_start = 40.0
p_end = 50.0
p_interpolation_step = 1
batch_size = 1024
# Initialize the proposal distribution.
proposal['mu'][0] = p_start
# Array to keep track of the losses over different parameterizations.
X = []
Y = []
# Compute the error space.
while proposal['mu'][0] <= p_end:
# Check if we need to skip the current step.
if proposal['mu'][0] == 45.:
proposal['mu'][0] += p_interpolation_step
continue
# Initialize a new critic (don't bias critic under different parameterizations).
critic = Critic(100)
optimizer = torch.optim.Adam(critic.parameters(), lr=0.01)
# Train the critic optimally.
loss = fit_critic(proposal, real_data, critic, optimizer, 100, batch_size)
X.append(proposal['mu'][0])
Y.append(loss)
print("Loss for beam energy " + str(proposal['mu'][0]) + ": " + str(loss))
# Set the new parameterization of the proposal distribution.
proposal['mu'][0] += p_interpolation_step
To find the optimal parameterization of the proposal distribution, we infer the error landscape with a Gaussian Process whose input variables are the parameters of interest.
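Concretely, the kernel used in the next cell is an RBF term plus a white-noise term; the amplitude $\sigma^2$, length scale $\ell$ and noise level $\sigma_n^2$ below are simply the hyperparameters that sklearn optimizes during the fit:

$$k(\theta, \theta') \;=\; \sigma^2 \exp\!\left(-\frac{\lVert \theta - \theta' \rVert^2}{2\ell^2}\right) \;+\; \sigma_n^2\,\delta_{\theta\theta'}.$$

The resulting GP posterior provides both a mean estimate of the loss and a standard deviation, which is used for the shaded uncertainty band in the plots.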
In [8]:
X_ = np.array(X)[:, np.newaxis]
Y_ = np.array(Y)[:, np.newaxis]
In [9]:
#kernel = ConstantKernel() + Matern(length_scale=2, nu=1/2) + WhiteKernel(noise_level=1)
kernel = 1. * RBF(length_scale=.5, length_scale_bounds=(1e-2, 1e3)) + WhiteKernel(noise_level=1e-5, noise_level_bounds=(1e-10, 1e+1))
gp = gaussian_process.GaussianProcessRegressor(kernel=kernel)
gp.fit(X_, Y_)
x = np.linspace(40, 50).reshape(-1,1)
y, s = gp.predict(x, return_std=True)
x = x.flatten()
y = y.flatten()
s = s.flatten()
y_min = y - s
y_max = y + s
plt.title("Estimated Loss over Beam Energy")
plt.plot(x, y)
plt.scatter(X, Y)
plt.fill_between(x, y_min, y_max, color='lightblue', alpha=0.5)
plt.xlim([40, 50])
plt.grid(True)
plt.show()
In [10]:
def find_max(gp, resolution=10000):
    # Return the beam energy at which the GP posterior mean is maximal.
    x = np.linspace(40, 50, resolution).reshape(-1, 1)
    y, s = gp.predict(x, return_std=True)
    return x[np.argmax(y), 0]
print("Best estimated fit for Beam Energy: " + str(find_max(gp)))
Sample a new point and refine the Gaussian Process estimate of the loss landscape.
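The next cell simply evaluates the maximizer found above. Alternatively, the next point could be chosen by an acquisition function that also takes the GP uncertainty into account. A minimal sketch, assuming the fitted `gp` from the previous cells and an upper-confidence-bound rule (the function name `propose_next_point` and the exploration weight `kappa` are illustrative, not part of this notebook):

def propose_next_point(gp, kappa=2.0, resolution=10000):
    # Candidate beam energies over the search range.
    x = np.linspace(40, 50, resolution).reshape(-1, 1)
    mean, std = gp.predict(x, return_std=True)
    # Upper confidence bound: prefer points with a high predicted loss or a large uncertainty.
    ucb = mean.flatten() + kappa * std.flatten()
    return x[np.argmax(ucb), 0]

Its output could then replace the hard-coded beam energy in the next cell.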
In [11]:
critic = Critic(100)
optimizer = torch.optim.Adam(critic.parameters(), lr=0.01)
proposal['mu'][0] = 45.1020408163
loss = fit_critic(proposal, real_data, critic, optimizer, 100, batch_size)
print("Loss at " + str(45.1020408163) + " GeV: " + str(loss))
# Append new data for GP estimation.
X.append(45.1020408163)
Y.append(loss)
In [12]:
# Apply new fitting.
X_ = np.array(X)[:, np.newaxis]
Y_ = np.array(Y)[:, np.newaxis]
gp = gaussian_process.GaussianProcessRegressor(kernel=kernel)
gp.fit(X_, Y_)
x = np.linspace(40, 50).reshape(-1,1)
y, s = gp.predict(x, return_std=True)
x = x.flatten()
y = y.flatten()
s = s.flatten()
y_min = y - s
y_max = y + s
plt.title("Estimated Loss over Beam Energy")
plt.plot(x, y)
plt.scatter(X, Y)
plt.fill_between(x, y_min, y_max, color='lightblue', alpha=0.5)
plt.xlim([40, 50])
plt.grid(True)
plt.show()
print("New maximum at: " + str(find_max(gp)) + " GeV")
In [13]:
def evaluate(theta, batch_size=1024, iterations=100):
# Initialize a proposal distribution.
proposal = {}
proposal['mu'] = torch.FloatTensor(theta)
proposal['sigma'] = torch.FloatTensor([.1, .1])
    # Initialize a new critic so as not to bias the computation.
critic = Critic(100)
optimizer = torch.optim.Adam(critic.parameters(), lr=0.01)
# Train the critic optimally.
loss = fit_critic(proposal, real_data, critic, optimizer, iterations, batch_size)
return theta, loss
First, we need to seed the Gaussian Process with a few evaluations of the parameter space. Initially, this means we evaluate the loss on a coarse grid.
In [76]:
X = []
Y = []
# Set the search space.
min_beam_energy = 40.
max_beam_energy = 50.
min_fermi_constant = .5
max_fermi_constant = 1.5
# Set the number of grid points per parameter.
num_samples_beam_energy = 4
num_samples_fermi_constant = 2
# Display the number of initial seeds.
print("Number of initial seed-points: " + str(num_samples_beam_energy * num_samples_fermi_constant))
# Obtain the individual samples.
samples_beam_energy = np.linspace(min_beam_energy, max_beam_energy, num_samples_beam_energy)
samples_fermi_constant = np.linspace(min_fermi_constant, max_fermi_constant, num_samples_fermi_constant)
# Evaluate the loss on the seed grid.
for beam_energy in samples_beam_energy:
for fermi_constant in samples_fermi_constant:
theta, loss = evaluate([beam_energy, fermi_constant])
print("Loss for " + str(theta) + ": " + str(loss))
X.append(theta)
Y.append(loss)
Fit the multi-dimensional Gaussian Process.
In [77]:
# Apply new fitting.
X_ = np.array(X)
Y_ = np.array(Y)
kernel = 1. * RBF(length_scale=.5, length_scale_bounds=(1e-2, 1e3)) + WhiteKernel(noise_level=1e-5, noise_level_bounds=(1e-10, 1e+1))
gp = gaussian_process.GaussianProcessRegressor(kernel=kernel)
gp.fit(X_, Y_)
Out[77]:
In [86]:
resolution = 100
x = np.linspace(min_beam_energy, max_beam_energy, resolution)
y = np.linspace(min_fermi_constant, max_fermi_constant, resolution)
z = np.zeros((len(x), len(y)))
for i in range(0, len(x)):
for j in range(0, len(y)):
input = np.array([x[i], y[j]]).reshape(1, -1)
z[i][j] = gp.predict(input)
extent = [min_fermi_constant, max_fermi_constant, \
min_beam_energy, max_beam_energy]
plt.clf()
#plt.imshow(z, extent=extent, origin='lower')
plt.imshow(z, origin='lower')
plt.colorbar()
plt.show()
In [82]:
max = float('-inf')
max_theta = None
for i in range(0, len(x)):
for j in range(0, len(y)):
theta = [x[i], y[j]]
if z[i][j] > max:
max_theta = theta
max = z[i][j]
print("Maximized theta: " + str(max_theta))
# Plot the generated distribution.
data = real_experiment(max_theta).view(-1)
plot_observations(data.numpy(), max_theta)
# Plot the real distribution.
plot_observations(real_data.numpy(), theta_true)
In [89]:
theta, loss = evaluate(max_theta)
loss
Out[89]:
In [90]:
X.append(theta)
Y.append(loss)
gp.fit(np.array(X), np.array(Y))
Out[90]:
In [91]:
resolution = 100
x = np.linspace(min_beam_energy, max_beam_energy, resolution)
y = np.linspace(min_fermi_constant, max_fermi_constant, resolution)
z = np.zeros((len(x), len(y)))
for i in range(0, len(x)):
for j in range(0, len(y)):
input = np.array([x[i], y[j]]).reshape(1, -1)
z[i][j] = gp.predict(input)
extent = [min_fermi_constant, max_fermi_constant, \
min_beam_energy, max_beam_energy]
plt.clf()
#plt.imshow(z, extent=extent, origin='lower')
plt.imshow(z, origin='lower')
plt.colorbar()
plt.show()
In [92]:
max = float('-inf')
max_theta = None
for i in range(0, len(x)):
for j in range(0, len(y)):
theta = [x[i], y[j]]
if z[i][j] > max:
max_theta = theta
max = z[i][j]
print("Maximized theta: " + str(max_theta))
# Plot the generated distribution.
data = real_experiment(max_theta).view(-1)
plot_observations(data.numpy(), max_theta)
# Plot the real distribution.
plot_observations(real_data.numpy(), theta_true)