Ali Taylan Cemgil, 2018
Bogazici University
This notebook contains a self-contained Python implementation of the junction tree algorithm for exact inference in Bayesian networks with discrete probability tables. The aim is pedagogical: to provide a compact implementation so that the algorithms can be run and inspected in a notebook.
A Bayesian Network is a probability model that has the form $ \newcommand{\pa}[1]{ {\mathop{pa}({#1})}} $ $$ p(x_{1:N}) = \prod_{n=1}^N p(x_n | x_\pa{n}) $$ The distribution is said to respect a directed acyclic graph (DAG) $\mathcal{G} = (V_\mathcal{G}, E_\mathcal{G})$ with $N$ nodes, where each random variable $x_n$ is associated with the node $n$ of $\mathcal{G}$.
For each node $n$, the (possibly empty) set of parent nodes is denoted by $\pa{n}$.
We assume that for each node $j$ in $\pa{n}$, we have a directed edge $j\rightarrow n$, denoted as the pair $(j,n)$. The edge set of $\mathcal{G}$ is the union of all such edges $$ E_\mathcal{G} = \bigcup_{n \in [N]} \bigcup_{j \in \pa{n}} \{(j,n)\} $$
To be associated with a proper probability model, the graph $\mathcal{G}$ must be acyclic, that is, it must be free of any directed cycles.
In BJN, the elements of a Bayesian Network model are:
alphabet:
The alphabet is a set of strings, one for each random variable $x_n$ for $n \in [N]$. Each random variable $x_n$ included in the model (observed or hidden) has a corresponding unique identifier in the alphabet. To enforce a user-specified ordering, the alphabet is represented as a list index_names where index_names[n] is the string identifier of $x_n$.
parents:
parents is a catalog that defines the edges of the directed acyclic graph $\mathcal{G}$. For each random variable $x_n$, we specify the set of parents $\pa{n}$, the variables that directly affect $x_n$. Here, each element of the alphabet is a key, and parents[index_names[n]] = $\pa{n}$ is the corresponding parent list.
states: states is a catalog where each key is a name from the alphabet and the value is a list of human-readable strings, one for each state of the random variable.
In a discrete model each random variable $x_n$ has $I_n$ possible states. We derive a list cardinalities from states, ordered according to index_names, where cardinalities[n] = $I_n$. The list cardinalities and the dictionary states must be consistent: cardinalities[n] = len(states[index_names[n]])
theta: A catalog or list of conditional probability tables, indexed by $n = 0 \dots N-1$. Each table theta[n] is an array with $N$ dimensions. Each dimension $m \in \{n\} \cup \pa{n}$ has cardinality $I_m$, given by cardinalities[m]; the remaining dimensions have cardinality one.
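As a minimal sketch of this storage convention (the two-variable model and the numbers here are illustrative, not part of BJN): every table carries all $N$ dimensions, with singleton axes for non-member variables, so products of tables reduce to plain NumPy broadcasting.

```python
import numpy as np

# Hypothetical 2-variable model: x_0 has 2 states and no parents,
# x_1 has 3 states and parent x_0. Every table has N = 2 dimensions;
# dimensions for non-member variables have size one.
theta = [
    np.array([0.5, 0.5]).reshape(2, 1),         # p(x_0): shape (I_0, 1)
    np.array([[0.3, 0.5, 0.2],
              [0.6, 0.2, 0.2]]).reshape(2, 3),  # p(x_1 | x_0): shape (I_0, I_1)
]
# Because all tables share the same N dimensions, the joint distribution
# is just an elementwise product via broadcasting:
joint = theta[0] * theta[1]                     # shape (2, 3)
print(joint.sum())                              # prints 1.0
```

This broadcasting trick is what allows the implementation below to multiply arbitrary subsets of tables with a single einsum call.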
In [1]:
import numpy as np
import scipy as sc
from scipy.special import gammaln
from scipy.special import digamma
%matplotlib inline
from itertools import combinations
import pygraphviz as pgv
from IPython.display import Image
from IPython.display import display
def normalize(A, axis=None):
"""Normalize a probability table along a specified axis"""
Z = np.sum(A, axis=axis,keepdims=True)
idx = np.where(Z == 0)
Z[idx] = 1
return A/Z
def find(cond):
"""
finds indices where the given condition is satisfied.
"""
return list(np.where(cond)[0])
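The zero-guard in normalize matters when an entire slice of a table sums to zero, which happens routinely after clamping potentials to evidence. A standalone check (repeating the definition above so the snippet runs on its own):

```python
import numpy as np

def normalize(A, axis=None):
    """Same definition as above: all-zero slices are left as zeros
    instead of producing NaNs from a 0/0 division."""
    Z = np.sum(A, axis=axis, keepdims=True)
    Z[np.where(Z == 0)] = 1
    return A / Z

A = np.array([[2.0, 0.0],
              [6.0, 0.0]])
B = normalize(A, axis=0)
print(B)   # second column stays [0, 0] rather than becoming [nan, nan]
```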
In [2]:
def random_alphabet(N=20, first_letter='A'):
"""Generates unique strings to be used as index_names"""
if N<27:
alphabet = [chr(i+ord(first_letter)) for i in range(N)]
else:
alphabet = ['X'+str(i) for i in range(N)]
return alphabet
def random_parents(alphabet, max_indeg=3):
"""Random DAG generation"""
N = len(alphabet)
print(alphabet)
indeg = lambda: np.random.choice(range(1,max_indeg+1))
parents = {a:[b for b in np.random.choice(alphabet[0:(1 if i==0 else i)], replace=False, size=min(indeg(),i))] for i,a in enumerate(alphabet)}
return parents
def random_cardinalities(alphabet, cardinality_choices=[2,3,4,5]):
"""Random cardinalities"""
return [np.random.choice(cardinality_choices) for a in alphabet]
def states_from_cardinalities(alphabet, cardinalities):
"""Generate generic labels for each state"""
return {a:[a+"_state_"+str(u) for u in range(cardinalities[i])] for i,a in enumerate(alphabet)}
def cardinalities_from_states(alphabet, states):
"""Count each cardinality according to the order implied by the alphabet list"""
return [len(states[a]) for a in alphabet]
def random_observations(cardinalities, visibles):
"""
Samples a tensor of the shape of visibles. This function does not sample
from the joint distribution implied by the graph and the probability tables
"""
return np.random.choice(range(10), size=clique_shape(cardinalities, visibles))
def random_dirichlet_cp_table(gamma, cardinalities, n, pa_n):
'''
gamma : Dirichlet shape parameter
cardinalities : List of number of states of each variable
n, pa_n : Output a table of form p(n | pa_n ), n is an index, pa_n is the list of parents of n
'''
N = len(cardinalities)
cl_shape = clique_shape(cardinalities, [n]+pa_n)
U = clique_prior_marginal(cardinalities, cl_shape)
return normalize(np.random.gamma(shape=gamma*U, size=cl_shape), axis=n)
def random_cp_tables(index_names, cardinalities, parents, gamma):
"""
Samples a set of conditional probability tables consistent with the factorization
implied by the graph.
"""
N = len(index_names)
theta = [[]]*N
for n,a in enumerate(index_names):
theta[n] = random_dirichlet_cp_table(gamma, cardinalities, n, index_names_to_num(index_names, parents[a]))
#print(a, parents[a])
#print(theta[n].shape)
#print('--')
return theta
def random_model(N=10, max_indeg=4):
"""
Generates a random Bayesian Network
"""
index_names = random_alphabet(N)
parents = random_parents(index_names)
cardinalities = random_cardinalities(index_names)
states = states_from_cardinalities(index_names, cardinalities)
return index_names, parents, cardinalities, states
In [3]:
def clique_shape(cardinalities, family):
N = len(cardinalities)
size = [1]*N
for i in family:
size[i] = cardinalities[i]
return size
def clique_prior_marginal(cardinalities, shape):
U = 1
for a1,a2 in zip(shape, cardinalities):
U = U*a2/a1
return U
def index_names_to_num(index_names, names):
name2idx = {name: i for i,name in enumerate(index_names)}
return [name2idx[nm] for nm in names]
def show_dag_image(index_names, parents, imstr='_BJN_tempfile.png'):
name2idx = {name: i for i,name in enumerate(index_names)}
A = pgv.AGraph(directed=True)
for i_n in index_names:
A.add_node(name2idx[i_n], label=i_n)
for j_n in parents[i_n]:
A.add_edge(name2idx[j_n], name2idx[i_n])
A.layout(prog='dot')
A.draw(imstr)
display(Image(imstr))
return
def show_ug_image(UG, imstr='_BJN_tempfile.png'):
A = pgv.AGraph(directed=False)
for i_n in range(UG.shape[0]):
A.add_node(i_n, label=i_n)
for j_n in find(UG[i_n,:]):
if j_n>i_n:
A.add_edge(j_n, i_n)
A.layout(prog='dot')
A.draw(imstr)
display(Image(imstr))
return
def make_cp_tables(index_names, cardinalities, cp_tables):
N = len(index_names)
theta = [[]]*N
for c in cp_tables:
if not isinstance(c, tuple):
nums = index_names_to_num(index_names, (c,))
else:
nums = index_names_to_num(index_names, c)
#print(nums)
n = nums[0]
idx = list(reversed(nums))
theta[n] = np.einsum(np.array(cp_tables[c]), idx, sorted(idx)).reshape(clique_shape(cardinalities,idx))
return theta
def make_adjacency_matrix(index_names, parents):
nVertex = len(index_names)
name2idx = {name: i for i,name in enumerate(index_names)}
## Build Graph data structures
# Adjacency matrix
adj = np.zeros((nVertex, nVertex), dtype=int)
for i_name in parents.keys():
i = name2idx[i_name]
for m_name in parents[i_name]:
j = name2idx[m_name]
adj[i, j] = 1
return adj
def make_families(index_names, parents):
nVertex = len(index_names)
adj = make_adjacency_matrix(index_names, parents)
# Possibly check topological ordering
# toposort(adj)
# Family, Parents and Children
fa = [[]]*nVertex
#pa = [[]]*nVertex
#ch = [[]]*nVertex
for n in range(nVertex):
p = find(adj[n,:])
#pa[n] = p
fa[n] = [n]+p
#c = find(adj[:,n])
#ch[n] = c
return fa
def permute_table(index_names, cardinalities, visible_names, X):
'''
    Given a network with index_names and cardinalities, reshape a table X whose
    dimensions are ordered as in visible_names so that it fits the storage convention of BJN.
'''
nums = index_names_to_num(index_names, visible_names)
osize = [cardinalities[n] for n in nums]
idx = list(nums)
shape = clique_shape(cardinalities,idx)
return np.einsum(X, idx, sorted(idx)).reshape(shape)
def make_cliques(families, cardinalities, visibles=None, show_graph=False):
'''
Builds the set of cliques of a triangulated graph.
'''
N = len(families)
if visibles:
C = families+[visibles]
else:
C = families
# Moral Graph
MG = np.zeros((N, N))
for F in C:
for edge in combinations(F,2):
MG[edge[0], edge[1]] = 1
MG[edge[1], edge[0]] = 1
# if show_graph:
# show_ug_image(MG,imstr='MG.png')
elim = []
Clique = []
visited = [False]*N
# Find an elimination sequence
# Based on greedy search
# Criteria, select the minimum induced clique size
for j in range(N):
min_clique_size = np.inf
min_idx = -1
for i in range(N):
if not visited[i]:
neigh = find(MG[i,:])
nm = np.prod(clique_shape(cardinalities, neigh+[i]))
if min_clique_size > nm:
min_idx = i
min_clique_size = nm
neigh = find(MG[min_idx,:])
temp = set(neigh+[min_idx])
is_subset = False
for CC in Clique:
if temp.issubset(CC):
is_subset=True
if not is_subset:
Clique.append(temp)
        # Connect the neighbors (fill-in edges), then remove the node from the moral graph
for edge in combinations(neigh,2):
MG[edge[0], edge[1]] = 1
MG[edge[1], edge[0]] = 1
MG[min_idx,:] = 0
MG[:, min_idx] = 0
elim.append(min_idx)
visited[min_idx] = True
# if show_graph:
# show_ug_image(MG,imstr='MG'+str(j)+'.png')
return Clique, elim
def topological_order(index_names, parents):
"""
returns a topological ordering of the graph
"""
adj = make_adjacency_matrix(index_names, parents)
nVertex = len(index_names)
indeg = np.sum(adj, axis = 1)
zero_in = find(indeg==0)
topo_order = []
while zero_in:
n = zero_in.pop(0)
topo_order.append(n)
for j in find(adj[:,n]):
indeg[j] -= 1
if indeg[j] == 0:
zero_in.append(j)
if len(topo_order)<nVertex:
return []
else:
return topo_order
In [4]:
def mst(E, N):
"""
Generate a Spanning Tree of a graph with N nodes by Kruskal's algorithm,
given preordered edge set E with each edge as (weight, v1, v2)
For a minimum spanning tree, use
E.sort()
mst(E, N)
For a maximum spanning tree, use
E.sort(reverse=True)
mst(E, N)
"""
parent = list(range(N))
spanning_tree = {i:[] for i in range(N)}
def find_v(vertex):
v = vertex
while parent[v] != v:
v = parent[v]
return v
def union(v1, v2):
root1 = find_v(v1)
root2 = find_v(v2)
if root1 != root2:
parent[root2] = root1
for edge in E:
weight, v1, v2 = edge
p1, p2 = find_v(v1), find_v(v2)
if p1 != p2:
union(p1, p2)
spanning_tree[v1].append(v2)
spanning_tree[v2].append(v1)
return spanning_tree
def bfs(adj_list, root):
"""
Breadth-first search starting from the root
adj_list : A list of lists where adj_list[n] denotes the set of nodes that can be reached from node n
Returns a BFS order, and a BFS tree as an array parent[i]
The root node has parent[rootnode] = -1
"""
N = len(adj_list)
visited = [False]*N
parent = [-1]*N
queue = [root]
order = []
while queue:
v = queue.pop(0)
if not visited[v]:
visited[v] = True
for w in adj_list[v]:
if not visited[w]:
parent[w] = v
queue.append(w)
order.append(v)
return order, parent
In [5]:
def is_leaf(i, parent):
return not (i in parent)
def is_root(i, parent):
return parent[i] == -1
def make_list_receive_from(parent):
lst = [[] for i in range(len(parent)) ]
for i,p in enumerate(parent):
if p!= -1:
lst[p].append(i)
return lst
In [6]:
def sample_indices(index_names, parents, cardinalities, theta, num_of_samples=1):
'''
Sample directly the indices given a Bayesian network
'''
N = len(index_names)
order = topological_order(index_names, parents)
X = []
for count in range(num_of_samples):
x = [[]]*N
for n in order:
varname = index_names[n]
idx = index_names_to_num(index_names,parents[varname])
j = [0]*N
for i in idx:
j[i] = x[i]
I_n = cardinalities[n]
j[n] = tuple(range(I_n))
#print(j)
#print(theta[n][j])
x[n] = np.random.choice(I_n, p=theta[n][j].flatten())
#print(x)
X.append(x)
return X
def sample_states(var_names, states, index_names, parents, theta, num_of_samples=1):
"""
Returns a dict with keys as state_name tuples and values as counts.
This function generates each sample separately, so if
num_of_samples is large, consider using sample_counts
"""
N = len(index_names)
order = topological_order(index_names, parents)
X = dict()
nums = index_names_to_num(index_names,var_names)
cardinalities = cardinalities_from_states(index_names, states)
shape = clique_shape(cardinalities, nums)
for count in range(num_of_samples):
x = [[]]*N
for n in order:
varname = index_names[n]
idx = index_names_to_num(index_names,parents[varname])
j = [0]*N
for i in idx:
j[i] = x[i]
I_n = cardinalities[n]
j[n] = tuple(range(I_n))
#print(j)
#print(theta[n][j])
x[n] = np.random.choice(I_n, p=theta[n][j].flatten())
#print(x)
key = tuple((states[index_names[n]][x[n]] for n in nums))
X[key] = X.get(key, 0) + 1
return X
def counts_to_table(var_names, ev_counts, index_names, states):
"""
Given observed variables names as var_names and
observations as key-value pairs {state_configuration: count}
create a table of counts.
A state configuration is a tuple (state_name_0, ..., state_name_{K-1})
    where K is the length of var_names, and state_name_k is a state
from states[var_names[k]]
"""
var_nums = list(index_names_to_num(index_names, var_names))
cardinalities = cardinalities_from_states(index_names, states)
shape = clique_shape(cardinalities, var_nums)
C = np.zeros(shape=shape)
N = len(index_names)
for rec in ev_counts.keys():
conf = [0]*N
for key, val in zip(var_names, rec):
s = states[key].index(val)
n = index_names_to_num(index_names, [key])[0]
conf[n] = s
#print(conf)
# Final value is the count that the pair is observed
C[tuple(conf)] += ev_counts[rec]
return C
def table_to_counts(T, var_names, index_names, states, clamped=[], threshold = 0):
"""
Convert a table on index_names clamped on setdiff(index_names, var_names)
to a dict of counts. Keys are state configurations.
"""
var_nums = list(index_names_to_num(index_names, var_names))
M = len(index_names)
ev_count = {}
for u in zip(*(T>threshold).nonzero()):
if not clamped:
key = tuple((states[v][u[var_nums[i]]] for i,v in enumerate(var_names)))
else:
key = tuple((states[v][u[var_nums[i]]] if clamped[var_nums[i]] is None else states[v][clamped[var_nums[i]]] for i,v in enumerate(var_names)))
ev_count[key] = T[u]
return ev_count
In [7]:
def clamped_pot(X, ev_states):
"""
Returns a subslice of a table. Used for clamping conditional probability tables
to a given set of evidence.
X: table
ev_states: list of clamped states ev_states[i]==e (use None if not clamped)
"""
# var is clamped, var not clamped
# ev_states[i]==e ev_states[i]==None
# var is member idx[i] = e idx[i] = slice(0, X.shape[i])
# var not member idx[i] = None idx[i] = slice(0, X.shape[i])
card = list(X.shape)
N = len(card)
idx = [[]]*N
for i,e in enumerate(ev_states):
if e is None and X.shape[i]>1: # the variable is unclamped or it is not a member of the potential
idx[i] = slice(0, X.shape[i])
else:
if X.shape[i]==1:
idx[i] = 0
else:
idx[i] = e
card[i] = 1
return X[tuple(idx)].reshape(card)
In [8]:
sz = (2,4,3,1,5)
T = np.random.choice([1,2,3,4], size=sz)
print(T.shape)
#print(T[1,:,:,0,0].shape)
print('*')
print(clamped_pot(T, [1, None, None, 7, 0]).shape)
In [9]:
def multiply(theta, idx):
"""Multiply a subset of a given list of potentials"""
par = [f(n) for n in idx for f in (lambda n: theta[n], lambda n: range(len(theta)))]+[range(len(theta))]
return np.einsum(*par)
def condition_and_multiply(theta, idx, ev_states):
"""Multiply a subset of a given list of potentials"""
par = [f(n) for n in idx for f in (lambda n: clamped_pot(theta[n], ev_states), lambda n: range(len(theta)))]+[range(len(theta))]
return np.einsum(*par)
def marginalize(Cp, idx, cardinalities):
return np.einsum(Cp, range(len(cardinalities)), [int(s) for s in sorted(idx)]).reshape(clique_shape(cardinalities,idx))
class Engine():
def __init__(self, index_names, parents, states, theta, visible_names=[]):
self.states = states
cardinalities = [len(states[a]) for a in index_names]
families = make_families(index_names, parents)
self.cardinalities = cardinalities
self.index_names = index_names
visibles = index_names_to_num(index_names, visible_names)
self.Clique, self.elim = make_cliques(families, cardinalities, visibles)
# Assign each conditional Probability table to one of the Clique potentials
# Clique2Pot is the assignments
self.Pot = families
self.Clique2Pot = np.zeros((len(self.Clique), len(self.Pot)))
selected = [False]*len(self.Pot)
for i,c in enumerate(self.Clique):
for j,p in enumerate(self.Pot):
if not selected[j]:
self.Clique2Pot[i,j] = set(p).issubset(c)
if self.Clique2Pot[i,j]:
selected[j] = True
# Find the root clique
# In our case it will be the one where all the visibles are a subset of
self.RootClique = -1
for i,c in enumerate(self.Clique):
if set(visibles).issubset(c):
self.RootClique = i
break
# Build the junction graph and compute a spanning tree
junction_graph_edges = []
for i,p in enumerate(self.Clique):
for j,q in enumerate(self.Clique):
ln = len(p.intersection(q))
if i<j and ln>0:
junction_graph_edges.append((ln,i,j))
junction_graph_edges.sort(reverse=True)
self.mst = mst(junction_graph_edges, len(self.Clique))
self.order, self.parent = bfs(self.mst, self.RootClique)
self.receive_from = make_list_receive_from(self.parent)
self.visibles = visibles
# Setup the data structures for the Junction tree algorithm
self.SeparatorPot = dict()
self.CliquePot = dict()
self.theta = theta
self.cardinalities_clamped = []
def propagate_observation(self, observed_configuration={}):
ev_names = list(observed_configuration.keys())
observed_states = [self.states[nm].index(observed_configuration[nm]) for nm in ev_names]
nums = index_names_to_num(self.index_names, ev_names)
#cardinalities_clamped = self.cardinalities.copy()
cardinalities_clamped = [1 if i in nums else c for i,c in enumerate(self.cardinalities)]
ev_states = [None]*len(self.cardinalities)
for i,e in zip(nums, observed_states):
ev_states[i] = e
# Collect stage
for c in reversed(self.order):
self.CliquePot[c] = np.ones(clique_shape(cardinalities_clamped, self.Clique[c]))
for p in self.receive_from[c]:
self.CliquePot[c] *= self.SeparatorPot[(p,c)]
# Prepare Clique Potentials
# Find probability tables that need to be multiplied into
# the Clique potential
idx = find(self.Clique2Pot[c, :])
if idx:
#print(idx)
#print(ev_states)
self.CliquePot[c] *= condition_and_multiply(self.theta, idx, ev_states)
# Set the separator potential
if not is_root(c, self.parent):
idx = self.Clique[self.parent[c]].intersection(self.Clique[c])
self.SeparatorPot[(c,self.parent[c])] = marginalize(self.CliquePot[c], idx, cardinalities_clamped)
# Distribution Stage
for c in self.order[1:]:
idx = self.Clique[self.parent[c]].intersection(self.Clique[c])
self.CliquePot[c] *= marginalize(self.CliquePot[self.parent[c]], idx, cardinalities_clamped)/self.SeparatorPot[(c,self.parent[c])]
self.cardinalities_clamped = cardinalities_clamped
self.values_clamped = ev_states
def propagate_table(self, X=None):
# Reset
self.values_clamped = [None]*len(self.cardinalities)
# Collect stage
for c in reversed(self.order):
self.CliquePot[c] = np.ones(clique_shape(self.cardinalities, self.Clique[c]))
for p in self.receive_from[c]:
self.CliquePot[c] *= self.SeparatorPot[(p,c)]
# Prepare Clique Potentials
# Find probability tables that need to be multiplied into
# the Clique potential
idx = find(self.Clique2Pot[c, :])
if idx:
self.CliquePot[c] *= multiply(self.theta, idx)
# Set the separator potential
if not is_root(c, self.parent):
idx = self.Clique[self.parent[c]].intersection(self.Clique[c])
self.SeparatorPot[(c,self.parent[c])] = marginalize(self.CliquePot[c], idx, self.cardinalities)
if X is not None:
SepX = marginalize(self.CliquePot[self.RootClique], self.visibles, self.cardinalities)
# Note: Take care of zero divide
self.CliquePot[self.RootClique] *= X/SepX
# Distribution Stage
for c in self.order[1:]:
idx = self.Clique[self.parent[c]].intersection(self.Clique[c])
self.CliquePot[c] *= marginalize(self.CliquePot[self.parent[c]], idx, self.cardinalities)/self.SeparatorPot[(c,self.parent[c])]
# def propagate(self, ev_names=[],ev_counts=None):
#
# if ev_names:
# X = evidence_to_table(ev_names, ev_counts, self.index_names, self.cardinalities, self.states)
# else:
# X = None
def compute_ESS(self, X=[]):
"""Compute Expected Sufficient Statistics for each probability table"""
E_S = dict()
self.propagate_table(X)
for c in self.order:
for n in find(self.Clique2Pot[c, :]):
E_S[n] = marginalize(self.CliquePot[c], self.Pot[n], self.cardinalities)
return E_S
def compute_marginal(self, var_names, normalization=False):
"""
Compute a marginal table on variables in var_names
if the variables are the subset of a clique, otherwise returns None.
var_names can be forced to be a subset of a clique by specifying
Engine(..., visible_names=var_names)
"""
var_indices = index_names_to_num(self.index_names, var_names)
idx = set(var_indices)
j = None
for c in self.order:
if idx.issubset(self.Clique[c]):
j = c
break
if j is not None:
if self.cardinalities_clamped:
if normalization:
return normalize(marginalize(self.CliquePot[j], var_indices, self.cardinalities_clamped))
else:
return marginalize(self.CliquePot[j], var_indices, self.cardinalities_clamped)
else:
if normalization:
return normalize(marginalize(self.CliquePot[j], var_indices, self.cardinalities))
else:
return marginalize(self.CliquePot[j], var_indices, self.cardinalities)
else:
print('Desired marginal is not a subset of any clique')
return None
def singleton_marginals(self, var_names, normalization=False):
""" For each variable in var_names compute its marginal """
L = {}
var_indices = index_names_to_num(self.index_names, var_names)
for j, v in enumerate(var_names):
marg = self.compute_marginal([v])
if normalization:
marg = normalize(marg)
if self.values_clamped[var_indices[j]] is None:
L[v] = {self.states[v][i]: p for i,p in enumerate(marg.flatten())}
else:
L[v] = {self.states[v][self.values_clamped[var_indices[j]]]: p for i,p in enumerate(marg.flatten())}
return L
def sample_table(self, var_names, num_of_samples=1):
#self.propagate_observation({})
P = self.compute_marginal(var_names)
if P is not None:
return np.random.multinomial(num_of_samples, P.flatten()).reshape(P.shape)
else:
return None
def marginal_table(self, marg_names, normalization=False):
clamped = self.values_clamped
return table_to_counts(self.compute_marginal(marg_names, normalization), marg_names, self.index_names, self.states, clamped)
Alice has two boxes, Box1 and Box2. In Box1, there are $10$ oranges, $4$ apples and $1$ banana, in Box2 there are $2$ oranges, $6$ apples and $2$ bananas. Alice chooses one of the boxes randomly, with equal probability and selects one of the fruits randomly, with equal probability.
In [10]:
## Define the model
index_names = ['Box', 'Fruit']
parents = {'Box': [], 'Fruit': ['Box']}
show_dag_image(index_names, parents)
In [25]:
states = {'Box': ['Box1', 'Box2'],
'Fruit': ['Apple', 'Orange', 'Banana']}
cardinalities = cardinalities_from_states(index_names, states)
# Conditional Probability Tables
cp_tables = {('Box',): [0.5, 0.5],
             ('Fruit', 'Box'): [[4./15, 10./15, 1./15], [6./10, 2./10, 2./10]]
}
# Initialize the correct index order for strided access by computing the necessary permutations
theta = make_cp_tables(index_names, cardinalities, cp_tables)
In [13]:
eng = Engine(index_names, parents, states, theta)
In [14]:
# Using the BJN
eng.propagate_observation()
#eng.propagate_table()
print(eng.compute_marginal(['Fruit']))
# Independent verification
print(marginalize(multiply(theta, [0,1]), [1], cardinalities))
Example Query: Given a banana is chosen, what is the probability that Alice took it from Box1?
We need to compute the posterior $$p(\text{Box}| \text{Fruit} = \text{`banana`}) = \frac{ p(\text{Fruit} = \text{`banana`}|\text{Box}) p(\text{Box})}{p(\text{Fruit} = \text{`banana`})}$$
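Before running the junction tree machinery, this posterior can be checked by hand with a few lines of NumPy, using the fruit counts from the story above (10 oranges, 4 apples, 1 banana in Box1; 2 oranges, 6 apples, 2 bananas in Box2):

```python
import numpy as np

# Bayes' rule for p(Box | Fruit = 'Banana'), computed directly from the story.
prior = np.array([0.5, 0.5])          # p(Box1), p(Box2)
lik_banana = np.array([1/15, 2/10])   # p(Banana | Box1), p(Banana | Box2)
posterior = prior * lik_banana
posterior /= posterior.sum()
print(posterior)                      # prints [0.25 0.75]
```

The junction tree answer below should agree with this direct computation.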
BJN implements two methods for calculating the answer for this query: propagate_observation and propagate_table.
In the first method, propagate_observation, a single observation is represented by a catalog with key-value pairs where keys are observed variable names and values are observed states. This is the standard method for inference in Bayesian networks.
In [15]:
#obs = {'Fruit': 'Orange', 'Box': 'Box1'}
#obs = {'Fruit': 'Orange'}
obs = {'Fruit':'Banana'}
eng = Engine(index_names, parents, states, theta)
eng.propagate_observation(obs)
marg_names = ['Box']
print(normalize(eng.compute_marginal(marg_names)))
# Independent verification
#print(normalize(multiply(theta, [0,1])[:,1]))
print(normalize(multiply(theta, [0,1])[:,2]))
In the second method, propagate_table, we assume that we have obtained a collection
of observations and we are given the counts of each possible configuration.
In [52]:
ev_names = ['Fruit']
ev_counts = {('Banana',): 1}
ev_table = counts_to_table(ev_names, ev_counts, index_names, states)
eng = Engine(index_names, parents, states, theta, visible_names=ev_names)
eng.propagate_table(ev_table)
marg_names = ['Box']
#idx = index_names_to_num(index_names, ['Box'])
print(normalize(eng.compute_marginal(marg_names)))
# Independent verification
print(normalize(multiply(theta, [0,1])[:,2]))
The advantage of using propagate_table is that if several observations are given on
the same variables, as is often the case in applications, the expected sufficient statistics can be computed in one collect-distribute iteration instead of a separate one for each observation.
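A standalone sketch of this claim for the Box/Fruit model (using the fruit counts from the story, not the Engine class): propagating a count table is equivalent to summing the per-observation posteriors, so one collect-distribute pass over the weighted table yields the same expected counts as processing each observation separately.

```python
import numpy as np

prior = np.array([0.5, 0.5])              # p(Box)
lik = np.array([[4/15, 10/15, 1/15],      # p(Fruit | Box1), order Apple/Orange/Banana
                [6/10, 2/10, 2/10]])      # p(Fruit | Box2)
counts = np.array([0.0, 3.0, 1.0])        # observed Apple/Orange/Banana counts

# Per-fruit posterior p(Box | Fruit): normalize each column.
post = prior[:, None] * lik
post /= post.sum(axis=0, keepdims=True)

one_pass = post @ counts                  # count-weighted, one computation
per_obs = sum(counts[f] * post[:, f] for f in range(3))
print(one_pass, per_obs)                  # identical expected Box counts
```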
In [28]:
ev_names = ['Fruit']
ev_counts = {('Orange',):3, ('Banana',):1}
#ev_counts = {('Orange',):2, ('Banana',):5, ('Apple',):12}
#ev_counts = {('Orange',):1}
C = counts_to_table(ev_names, ev_counts, index_names, states)
eng.propagate_table(C)
marg_names = ['Box']
#idx = index_names_to_num(index_names, ['Box'])
print(normalize(eng.compute_marginal(marg_names)))
In [17]:
eng.marginal_table(marg_names)
Out[17]:
In [29]:
ev_names = ['Fruit','Box']
ev_counts = {('Apple','Box1'):12, ('Orange','Box2'):2, ('Banana','Box2'):5}
eng = Engine(index_names, parents, states, theta, visible_names=ev_names)
C = counts_to_table(ev_names, ev_counts, index_names, states)
eng.propagate_table(C)
marg_names = ['Fruit', 'Box']
#idx = index_names_to_num(index_names, ['Box'])
print(normalize(eng.compute_marginal(marg_names)))
In [30]:
var_names = ['Fruit', 'Box']
ev_counts = sample_states(var_names, states, index_names, parents, theta, num_of_samples=1000)
C = counts_to_table(var_names, ev_counts, index_names, states)
#eng = Engine(index_names, visible_names, parents, cardinalities, theta)
#C = evidence_to_counts(X, index_names)
print(ev_counts)
print(C)
In [31]:
from itertools import product
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
## Define the model
index_names = ['Die1', 'Die2', 'Sum']
parents = {'Die1': [], 'Die2': [], 'Sum': ['Die1', 'Die2']}
states = {'Die1': list(range(1,7)), 'Die2': list(range(1,7)), 'Sum': list(range(2,13))}
cardinalities = cardinalities_from_states(index_names, states)
show_dag_image(index_names, parents)
S = np.zeros(cardinalities)
for i,j in product(range(1,7),repeat=2):
S[i-1,j-1,i+j-2] = 1
cp_tables = {('Die1',): [1/6, 1/6,1/6,1/6,1/6,1/6],
('Die2',): [1/6, 1/6,1/6,1/6,1/6,1/6],
('Sum','Die1','Die2'): S
}
theta = make_cp_tables(index_names, cardinalities, cp_tables )
eng = Engine(index_names, parents, states, theta)
In [32]:
eng.propagate_observation({'Sum':8})
marg = eng.compute_marginal(['Die1', 'Die2'], normalization=True)
marg
Out[32]:
In [33]:
vname = 'Die2'
eng.propagate_observation({'Sum':9})
marg = eng.singleton_marginals([vname])
a = [x for x in marg[vname].keys()]
b = [marg[vname][x] for x in marg[vname].keys()]
plt.title(vname)
plt.bar(a, normalize(b))
plt.ylim([0, 1])
plt.show()
In [34]:
ev_names = ['Sum']
ev_counts = {(2,):1, (3,):1, (4,):1}
C = counts_to_table(ev_names, ev_counts, index_names, states)
eng.propagate_table(C)
marg_names = ['Die1']
#idx = index_names_to_num(index_names, ['Box'])
print(normalize(eng.compute_marginal(marg_names)))
In [35]:
#marg_names = ['Sum','Die2', 'Die1']
marg_names = ['Die2', 'Die1']
eng.propagate_observation({'Sum':9})
clamped = eng.values_clamped
table_to_counts(eng.compute_marginal(marg_names), marg_names, index_names, states, clamped)
Out[35]:
In [139]:
eng.compute_marginal(marg_names)
Out[139]:
In [74]:
marg_names = ['Sum','Die2', 'Die1']
eng.propagate_observation({'Sum': 11})
eng.marginal_table(marg_names, normalization=True)
Out[74]:
In [29]:
marg_names = ['Sum','Die2', 'Die1']
T = eng.singleton_marginals(marg_names, normalization=True)
T
Out[29]:
In [32]:
## Define the model
index_names = ['i', 'j', 'k']
cardinalities = [10, 20, 3]
parents = {'i': [], 'j': ['k'], 'k': ['i']}
show_dag_image(index_names, parents)
#parents = {'k': [], 'i': ['k'], 'j': ['k']}
visible_names = ['i','j']
visibles = index_names_to_num(index_names, visible_names)
The chest clinic model (also known as Asia) is a famous toy medical expert-system example from the classic 1988 paper of Lauritzen and Spiegelhalter.
https://www.jstor.org/stable/pdf/2345762.pdf
Shortness-of-breath (dyspnoea) may be due to tuberculosis, lung cancer or bronchitis, or none of them, or more than one of them. A recent visit to Asia increases the chances of tuberculosis, while smoking is known to be a risk factor for both lung cancer and bronchitis. The results of a single chest X-ray do not discriminate between lung cancer and tuberculosis, as neither does the presence or absence of dyspnoea.
Also see: http://www.bnlearn.com/documentation/man/asia.html
In this toy domain, a patient can have three diseases:
- tuberculosis (T)
- lung cancer (L)
- bronchitis (B)

We also have the following possible a-priori effects on contracting a disease:
- a recent visit to Asia (A)
- smoking (S)

Logical variables may be present, for example:
- E, true if either tuberculosis or lung cancer is present

Symptoms or medical test results:
- the result of a chest X-ray (X)
- presence of dyspnoea, i.e. shortness-of-breath (D)
In [11]:
# [A][S][T|A][L|S][B|S][E|T:L][X|E][D|B:E]
index_names = ['A', 'S', 'T', 'L', 'B', 'E', 'X', 'D']
parents = {'A':[], 'S':[], 'T':['A'], 'L':['S'], 'B':['S'], 'E':['T','L'], 'X':['E'], 'D':['B','E']}
## A Method for systematically entering the conditional probability tables
# P(A = Yes) = 0.01
# P(S = Yes) = 0.5
# P(T=Positive | A=Yes) = 0.05
# P(T=Positive | A=No) = 0.01
# P(L=Positive | S=Yes) = 0.1
# P(L=Positive | S=No) = 0.01
# P(B=Positive | S=Yes) = 0.6
# P(B=Positive | S=No) = 0.3
# P(E=True | L=Positive, T=Positive) = 1
# P(E=True | L=Positive, T=Negative) = 1
# P(E=True | L=Negative, T=Positive) = 1
# P(E=True | L=Negative, T=Negative) = 0
# P(X=Positive | E=True) = 0.98
# P(X=Positive | E=False) = 0.05
# P(D=Positive | E=True, B=Positive) = 0.9
# P(D=Positive | E=True, B=Negative) = 0.7
# P(D=Positive | E=False, B=Positive) = 0.8
# P(D=Positive | E=False, B=Negative) = 0.1
states = {'A':['No', 'Yes'],
'S':['No', 'Yes'],
'T':['Negative','Positive'],
'L':['Negative','Positive'],
'B':['Negative','Positive'],
'E':['False', 'True'],
'X':['Negative','Positive'],
'D':['Negative','Positive']}
cardinalities = cardinalities_from_states(index_names, states)
# Conditional Probability Tables
cp_tables = {('A',): [0.99, 0.01],
('S',): [0.5, 0.5],
('T','A'): [[0.99, 0.01],[0.95,0.05]],
('L','S'): [[0.99, 0.01],[0.9,0.1]],
('B','S'): [[0.7,0.3],[0.4, 0.6]],
('E','T','L'): [[[1.,0.],[0.,1.]] , [[0.,1.],[0.,1.]]],
('X','E'): [[0.95, 0.05], [0.02, 0.98]],
('D','B','E'):[[[0.9,0.1],[0.2,0.8]],[[0.3,0.7],[0.1,0.9]]]
}
# Todo: write a converter from a standard bn format
theta = make_cp_tables(index_names, cardinalities, cp_tables)
show_dag_image(index_names, parents)
eng = Engine(index_names, parents, states, theta)
In [12]:
eng.propagate_observation({'X': 'Positive', 'D':'Positive', 'S':'Yes'})
#normalize(eng.compute_marginal(['T']))
eng.marginal_table(['T'])
Out[12]:
In [13]:
eng.marginal_table(['T'])
Out[13]:
In [39]:
vis_names = ['A','B']
eng = Engine(index_names, parents, states, theta, visible_names=vis_names)
eng.propagate_observation({'X': 'Positive', 'D':'Positive', 'S':'Yes'})
eng.marginal_table(vis_names)
Out[39]:
In [40]:
marg = eng.compute_marginal(['A'])
marg
Out[40]:
In [41]:
eng.marginal_table(['A'])
Out[41]:
In [27]:
#eng.propagate_observation({'X': 'Positive', 'D':'Positive', 'S':'Yes'})
#eng.propagate_observation({'A':'Yes','S':'Yes','X':'Positive','D':'Positive'})
#eng.propagate_observation({'A':'Yes','S':'Yes'})
eng.propagate_observation({'X': 'Positive', 'A':'Yes', 'D':'Positive','S':'Yes'})
#eng.propagate_observation({'S':'Yes'})
#eng.propagate_observation({})
eng.singleton_marginals(['A','T','L','B'], normalization=True)
Out[27]:
In [114]:
normalize(eng.compute_marginal(['T','L']))
Out[114]:
In [43]:
eng.propagate_observation({'X':'Positive'})
var_names = ['T','L']
X = eng.sample_table(var_names, num_of_samples=10000)
table_to_counts(X, index_names, index_names, states, clamped=eng.values_clamped)
visibles = index_names_to_num(index_names, var_names)
visibles
Out[43]:
In [44]:
X.shape
Out[44]:
In [45]:
P = multiply(eng.theta, range(len(cardinalities)))
P /= marginalize(P, visibles, cardinalities)
E_S = X*P
In [46]:
marginalize(E_S, [1], cardinalities)
Out[46]:
In [47]:
#index_names = [a for a in set(['A', 'S', 'T', 'L', 'B', 'E', 'X', 'D'])]
index_names = ['A', 'S', 'T', 'L', 'B', 'E', 'X', 'D']
parents = random_parents(index_names)
show_dag_image(index_names, parents)
visible_names = ['X','A','B']
visibles = index_names_to_num(index_names, visible_names)
families = make_families(index_names, parents)
cardinalities = random_cardinalities(index_names)
states = states_from_cardinalities(index_names, cardinalities)
gamma = 0.01
theta = random_cp_tables(index_names, cardinalities, parents, gamma)
X = random_observations(cardinalities, visibles)
print(parents)
print(cardinalities)
print(states)
theta[0].shape
Out[47]:
In [152]:
index_names, parents, cardinalities, states = random_model(10, max_indeg=4)
show_dag_image(index_names, parents)
#theta = make_random_cp_tables(index_names, cardinalities, parents, gamma=0.01)
visible_names = index_names[-4:]
families = make_families(index_names, parents)
visibles = index_names_to_num(index_names, visible_names)
Clique, elim_seq = make_cliques(families, cardinalities, visibles, show_graph=False)
print(elim_seq)
print(Clique)
In [43]:
index_names = ['X1', 'X2', 'X3', 'X4', 'Y1', 'Y2', 'Y3', 'Y4']
parents = {'X1':[], 'Y1':['X1'], 'X2':['X1'], 'Y2':['X2'], 'X3':['X2'], 'Y3':['X3'], 'X4':['X3'], 'Y4':['X4'] }
cardinalities = [2]*8
show_dag_image(index_names, parents)
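For this chain-structured model the moral graph is itself a tree (each child has a single parent, so no nodes need to be married), and elimination never creates a clique larger than two. A greedy minimum-degree elimination sketch illustrates this; `greedy_elimination` is a hypothetical helper, and the notebook's `make_cliques` may use a different heuristic:

```python
# Greedy min-degree elimination on an undirected (moral) graph.
# Eliminating a node connects its remaining neighbours (fill-in edges)
# and records the clique {node} | neighbours.
def greedy_elimination(neighbors):
    nb = {v: set(ns) for v, ns in neighbors.items()}
    order, cliques = [], []
    while nb:
        v = min(nb, key=lambda u: len(nb[u]))  # minimum-degree heuristic
        cliques.append({v} | nb[v])            # clique created by eliminating v
        for u in nb[v]:                        # add fill-in edges, drop v
            nb[u] |= nb[v] - {u}
            nb[u].discard(v)
        del nb[v]
        order.append(v)
    return order, cliques

# Moral graph of the chain X1->Y1, X1->X2, X2->Y2, ...
moral = {'X1': {'Y1', 'X2'}, 'Y1': {'X1'},
         'X2': {'X1', 'Y2', 'X3'}, 'Y2': {'X2'},
         'X3': {'X2', 'Y3', 'X4'}, 'Y3': {'X3'},
         'X4': {'X3', 'Y4'}, 'Y4': {'X4'}}
order, cliques = greedy_elimination(moral)
print(order)
print(cliques)   # every clique has at most two nodes
```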
In [ ]:
#index_names = [a for a in set(['A', 'S', 'T', 'L', 'B', 'E', 'X', 'D'])]
index_names = ['A', 'S', 'T', 'L', 'B', 'E', 'X', 'D']
visible_names = ['X', 'A', 'D']
parents = random_parents(index_names)
cardinalities = random_cardinalities(index_names)
states = random_states(index_names, cardinalities)
show_dag_image(index_names, parents)
families = make_families(index_names, parents)
visibles = make_visibles(index_names, visible_names)
Clique, elim_seq = make_cliques(families, cardinalities, visibles=None)
gamma = 0.01
theta = make_random_cp_tables(index_names, cardinalities, parents, gamma)
In [146]:
index_names = ['A', 'S', 'T', 'L', 'B', 'E', 'X', 'D']
parents = {'A':[], 'S':[], 'T':['A'], 'L':['S'], 'B':['S'], 'E':['T','L'], 'X':['E'], 'D':['B','E']}
show_dag_image(index_names, parents)
cardinalities = [2,2,2,2,2,2,2,2]
states = states_from_cardinalities(index_names, cardinalities)
visible_names = ['A', 'X', 'D']
visibles = index_names_to_num(index_names, visible_names)
## Generate random potentials
gamma = 0.01
theta = random_cp_tables(index_names, cardinalities, parents, gamma)
#X = random_observations(cardinalities, visibles)
eng = Engine(index_names, parents, states, theta, visible_names)
eng.propagate_observation({})
eng.sample_table(visible_names, num_of_samples=1000 )
Out[146]:
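sample_table above draws joint samples from the model; the underlying idea is ancestral (forward) sampling: visit the variables in topological order and draw each from its conditional table given the already-sampled parents. A self-contained sketch (`ancestral_sample` and the tiny `cpd` tables are hypothetical illustrations, not the notebook's API; it assumes the variable list is topologically ordered, as index_names is here):

```python
import random

def ancestral_sample(index_names, parents, cpd, rng=random.Random(0)):
    sample = {}
    for name in index_names:                   # topological order assumed
        pa_vals = tuple(sample[p] for p in parents[name])
        dist = cpd[name][pa_vals]              # P(x_name | parent values)
        u, acc = rng.random(), 0.0
        for state, p in enumerate(dist):       # inverse-CDF draw
            acc += p
            if u < acc:
                sample[name] = state
                break
        else:
            sample[name] = len(dist) - 1       # guard against rounding
    return sample

# Tiny two-node example: P(A) and P(B | A)  (hypothetical tables)
cpd = {'A': {(): [0.3, 0.7]},
       'B': {(0,): [0.9, 0.1], (1,): [0.2, 0.8]}}
print(ancestral_sample(['A', 'B'], {'A': [], 'B': ['A']}, cpd))
```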
In [147]:
eng.propagate_observation({})
eng.marginal_table(visible_names)
Out[147]:
In [148]:
X = eng.sample_table(visible_names, num_of_samples=1000)
In [149]:
X
Out[149]:
In [150]:
E_S_new = eng.compute_ESS()
In [323]:
P = multiply(eng.theta, range(len(cardinalities)))
E_S = P
In [105]:
E_S_new = eng.propagate(X)
In [324]:
P = multiply(eng.theta, range(len(cardinalities)))
Px = marginalize(P, visibles, cardinalities)
P /= Px
E_S = X*P
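The two cells above build the full joint by multiplying every conditional table and then divide by the marginal over the visible variables. On a toy two-variable model the same multiply-then-marginalize pattern looks like this in plain numpy (a sketch standing in for the notebook's multiply/marginalize helpers; the tables are made up):

```python
import numpy as np

pA = np.array([0.3, 0.7])                  # P(A)
pB_A = np.array([[0.9, 0.1], [0.2, 0.8]])  # P(B | A), rows indexed by A

joint = pA[:, None] * pB_A                 # P(A, B) via broadcasting, shape (2, 2)
pB = joint.sum(axis=0)                     # marginalize A out
print(pB)  # [0.9*0.3 + 0.2*0.7, 0.1*0.3 + 0.8*0.7] = [0.41, 0.59]
```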
In [49]:
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
from ipywidgets import interact, interactive, fixed
import ipywidgets as widgets
from IPython.display import clear_output, display, HTML
from matplotlib import rc
mpl.rc('font',**{'family':'sans-serif','sans-serif':['Helvetica']})
mpl.rc('text', usetex=True)
fig = plt.figure(figsize=(5,5))
ax = plt.gca()
ln = plt.Line2D([0],[0])
ax.add_line(ln)
ax.set_xlim([-1,1])
ax.set_ylim([-1,1])
ax.set_axis_off()
plt.close(fig)
def set_line(th):
    ln.set_xdata([np.cos(th), -np.cos(th)])
    ln.set_ydata([np.sin(th), -np.sin(th)])
    display(fig)
interact(set_line, th=(0.0, 2*np.pi,0.01))
Out[49]:
In [76]:
widgets.IntSlider(
    value=7,
    min=0,
    max=10,
    step=1,
    description='Test:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
In [93]:
widgets.FloatSlider(
    value=7.5,
    min=0,
    max=10.0,
    step=0.1,
    description='Test:',
    disabled=False,
    continuous_update=False,
    orientation='vertical',
    readout=True,
    readout_format='.2f',
)
In [56]:
w = widgets.IntRangeSlider(
    value=[5, 7],
    min=0,
    max=10,
    step=1,
    description='Test:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d',
)
display(w)
In [60]:
w.get_interact_value()
Out[60]:
In [61]:
w = widgets.BoundedIntText(
    value=7,
    min=0,
    max=10,
    step=1,
    description='Text:',
    disabled=False
)
In [62]:
display(w)
In [65]:
w.value
Out[65]:
In [52]:
accordion = widgets.Accordion(children=[widgets.IntSlider(), widgets.Text()])
accordion.set_title(0, 'Slider')
accordion.set_title(1, 'Text')
accordion
In [85]:
tab_nest = widgets.Tab()
tab_nest.children = [accordion, accordion]
tab_nest.set_title(0, 'An accordion')
tab_nest.set_title(1, 'Copy of the accordion')
tab_nest
In [53]:
items = [widgets.Label(str(i)) for i in range(4)]
w2 = widgets.HBox(items)
display(w2)
In [54]:
items = [widgets.Label(str(i)) for i in range(4)]
left_box = widgets.VBox([items[0], items[1]])
right_box = widgets.VBox([items[2], items[3]])
widgets.HBox([left_box, right_box])
In [94]:
w = widgets.Dropdown(
    options=[('One', 1), ('Two', 2), ('Three', 3)],
    value=2,
    description='Number:',
)
display(w)
In [122]:
w = widgets.ToggleButtons(
    options=['Slow', 'Regular', 'Fast', 'Ultra'],
    description='Speed:',
    disabled=False,
    button_style='warning',  # 'success', 'info', 'warning', 'danger' or ''
    tooltips=['Description of slow', 'Description of regular',
              'Description of fast', 'Description of ultra'],
    #icons=['check'] * 4
)
display(w)
In [120]:
w = widgets.Select(
    options=['Linux', 'Windows', 'OSX', '?'],
    value='OSX',
    rows=4,
    description='OS:',
    disabled=False
)
w2 = widgets.Select(
    options=['A', 'B', 'C', 'D', '?'],
    value='?',
    rows=5,
    description='Class:',
    disabled=True
)
display(w, w2)
In [121]:
w2.disabled = False
In [110]:
caption = widgets.Label(value='The caption tracks the sign of the slider value')
slider = widgets.IntSlider(min=-5, max=5, value=1, description='Slider')
def handle_slider_change(change):
    caption.value = 'The slider value is ' + (
        'negative' if change.new < 0 else 'nonnegative'
    )
slider.observe(handle_slider_change, names='value')
display(caption, slider)
In [112]:
from IPython.display import display
button = widgets.Button(description="Click Me!")
output = widgets.Output()
display(button, output)
def on_button_clicked(b):
    with output:
        print("Button clicked.")
        print(b)
button.on_click(on_button_clicked)
In [114]:
int_range = widgets.IntSlider()
output2 = widgets.Output()
display(int_range, output2)
def on_value_change(change):
    with output2:
        print(change['new'])
int_range.observe(on_value_change, names='value')