The projection of $x_0$ onto $\lbrace x \mid Ax = b\rbrace$ is given by $$ \mbox{proj}(x_0) = \left(I - A^T (AA^T)^{-1}A\right)x_0 + A^T (AA^T)^{-1}b = \left(I - A^T (AA^T)^{-1}A\right)x_0 + c $$
In [1]:
import numpy as np
from scipy.linalg import cho_factor, cho_solve
%matplotlib inline
import matplotlib.pyplot as plt
def factor(A, b):
    """Precompute the data needed to project points onto {x : Ax = b}.

    Parameters
    ----------
    A : (m, n) ndarray
    b : (m,) ndarray

    Returns
    -------
    dict with keys:
        'A', 'b'  : the inputs,
        'chol'    : Cholesky factorization of A A^T (scipy cho_factor),
        'c'       : A^T (A A^T)^{-1} b, the constant term of the projection.
    """
    gram = A.dot(A.T)
    # gram is a throwaway local, so letting cho_factor overwrite it is safe
    chol = cho_factor(gram, overwrite_a=True)
    w = cho_solve(chol, b, overwrite_b=False)
    c = A.T.dot(w)
    return dict(A=A, b=b, chol=chol, c=c)
def proj(proj_data, x0):
    """Project x0 onto {x : Ax = b} using factorization data from `factor`.

    Computes (I - A^T (A A^T)^{-1} A) x0 + c with the precomputed
    Cholesky factorization of A A^T.
    """
    A = proj_data['A']
    chol = proj_data['chol']
    c = proj_data['c']
    # A.dot(x0) allocates a fresh vector, so overwriting it in-place is safe
    y = cho_solve(chol, A.dot(x0), overwrite_b=True)
    return x0 - A.T.dot(y) + c
def average(*vals):
    """Consensus step: elementwise mean across the given iterates."""
    stacked = np.asarray(vals)
    return stacked.mean(axis=0)
def make_data(k, rows, seed=0):
    """Build a random square linear system split into k row chunks.

    Parameters
    ----------
    k : int       number of chunks
    rows : int    rows per chunk (full system is n = k*rows square)
    seed : int    numpy RNG seed for reproducibility

    Returns
    -------
    As : list of k (rows, n) row-blocks of the full matrix
    bs : list of k (rows,) slices of the right-hand side
    x_true : exact solution of the full system
    x0 : random starting point
    """
    n = rows * k
    np.random.seed(seed)
    # draw order matters for reproducibility: Ahat, bhat, then x0
    Ahat = np.random.randn(n, n)
    bhat = np.random.randn(n)
    x_true = np.linalg.solve(Ahat, bhat)
    x0 = np.random.randn(n)
    As = [Ahat[i * rows:(i + 1) * rows, :] for i in range(k)]
    bs = [bhat[i * rows:(i + 1) * rows] for i in range(k)]
    return As, bs, x_true, x0
In [2]:
# Tiny problem (n = 8): run 1000 projection/consensus iterations and
# plot the error against the true solution on a log scale.
As, bs, x_true, x0 = make_data(4, 2, seed=0)
proj_data = list(map(factor, As, bs))
x = x0
r = []
for i in range(1000):
    # project the current iterate onto each chunk's constraint, then average
    z = (proj(d,x) for d in proj_data)
    x = average(*z)
    r.append(np.linalg.norm(x_true-x))
plt.semilogy(r)
Out[2]:
In [3]:
# Same experiment at n = 400 (4 chunks of 100 rows).
As, bs, x_true, x0 = make_data(4, 100, seed=0)
proj_data = list(map(factor, As, bs))
x = x0
r = []
for i in range(1000):
    z = (proj(d,x) for d in proj_data)
    x = average(*z)
    r.append(np.linalg.norm(x_true-x))
plt.semilogy(r)
Out[3]:
In [4]:
# Larger problem for timing: n = 4000 (4 chunks of 1000 rows).
As, bs, x_true, x0 = make_data(4, 1000, seed=0)
In [5]:
%%time
# Time the factorization plus 10 iterations at n = 4000 (serial baseline).
proj_data = list(map(factor, As, bs))
x = x0
r = []
for i in range(10):
    z = (proj(d,x) for d in proj_data)
    x = average(*z)
    r.append(np.linalg.norm(x_true-x))
In [5]:
# Even larger problem (n = 12000) for the threading micro-benchmarks below.
As, bs, x_true, x0 = make_data(4, 3000, seed=0)
proj_data = list(map(factor, As, bs))
In [4]:
%%timeit -n1 -r50
# Baseline: serial map of the four projections.
a= list(map(lambda d: proj(d, x0), proj_data))
In [4]:
import concurrent.futures
from multiprocessing.pool import ThreadPool
In [23]:
# Two-worker thread pools for comparing the two stdlib threading APIs.
ex = concurrent.futures.ThreadPoolExecutor(2)
pool = ThreadPool(2)
In [33]:
# Time the projections through concurrent.futures.ThreadPoolExecutor.
%timeit -n1 -r50 list(ex.map(lambda d: proj(d, x0), proj_data))
In [34]:
# Time the same work through multiprocessing.pool.ThreadPool for comparison.
%timeit -n1 -r50 list(pool.map(lambda d: proj(d, x0), proj_data))
In [36]:
# Presumably the ratio of the two timings above (threaded vs serial, ms) —
# roughly a 25% speedup; TODO confirm against the actual %timeit output.
242/322.0
Out[36]:
In [6]:
import dask
from dask import do, value, compute, visualize, get
from dask.imperative import Value
from dask.dot import dot_graph
from itertools import repeat
def enum_values(vals, name=None):
    """Wrap each element of `vals` in a dask Value named '<name>_<i>'.

    Raises ValueError when no name is supplied, since the subscripted
    names are the whole point.
    """
    if not name:
        raise ValueError('Need a name.')
    return [value(v, '%s_%d' % (name, i)) for i, v in enumerate(vals)]
def rename(value, name):
    """Return a new Value identical to `value` but keyed by `name`.

    Copies the underlying dask graph, re-keys the node, and wraps it in a
    fresh Value. NOTE: the parameter `value` shadows the imported dask
    `value` function; kept for backward-compatible keyword calls.
    """
    graph = dict(value.dask)
    # assign-then-delete order matters if name happens to equal value.key
    graph[name] = graph[value.key]
    del graph[value.key]
    return Value(name, [graph])
def enum_map(func, *args, name=None):
    """Lazily map `func` over `args`, producing dask Values named '<name>_<i>'.

    Each resulting Value is renamed so graph visualizations show readable,
    subscripted node labels instead of auto-generated keys.
    """
    if not name:
        raise ValueError('Need a name.')
    lazy = (do(func)(*packed) for packed in zip(*args))
    return [rename(v, '%s_%d' % (name, i)) for i, v in enumerate(lazy)]
def step(proj_data, xk, k=None):
    """Build the dask graph for one projection-consensus iteration.

    Projects xk onto every chunk's constraint set (nodes 'z^<suffix>_i')
    and averages the results into a single node 'x^<suffix>'. When k is
    None the generic suffix 'k+1' is used so the graph can be reused.
    """
    sufx = '^k+1' if k is None else '^%d' % k
    z = enum_map(proj, proj_data, repeat(xk), name='z' + sufx)
    return rename(do(average)(*z), 'x' + sufx)
In [7]:
# Wrap the raw chunks in named dask Values and visualize the graph that
# computes the four factorizations.
lAs = enum_values(As, 'A')
lbs = enum_values(bs, 'b')
proj_data = enum_map(factor, lAs, lbs, name='proj_data')
visualize(*proj_data)
Out[7]:
In [8]:
# Realize the factorizations first so a single step's graph doesn't
# include (and recompute) them, then visualize one iteration.
pd_val = [pd.compute() for pd in proj_data]
xk = value(x0,'x^k')
xkk = step(pd_val, xk)
xkk.visualize()
Out[8]:
The setup step along with 3 iterations gives the following dask graph (which I'm showing mostly because it was satisfying to make).
In [9]:
# Chain three iterations (with numbered suffixes) onto the setup graph
# and visualize the whole thing.
x = value(x0,'x^0')
for k in range(3):
    x = step(proj_data, x, k+1)
x.visualize()
Out[9]:
Obviously, it's not efficient to make a huge dask graph, especially if I'll be doing thousands of iterations. I really just want to create the dask graph for computing $x^{k+1}$ from $x^k$ and re-apply it at every iteration. Is it more efficient to create that dask graph once and reuse it? Maybe that's a premature optimization... I'll do it anyway for fun.
In [10]:
# Build a single-step graph for reuse: factor once, realize the results,
# then capture the raw dask dict mapping 'x^k' -> 'x^k+1'.
proj_data = enum_map(factor, As, bs, name='proj_data')
proj_data = compute(*proj_data)
# placeholder value; 'x^k' gets overwritten with real data before each run
x = value(0,'x^k')
x = step(proj_data, x)
dsk_step = x.dask
dot_graph(dsk_step)
Out[10]:
In [11]:
# Select the threaded scheduler globally.
# NOTE(review): unclear whether set_options actually switches the scheduler
# used by Value.compute — see the notes further down.
dask.set_options(get=dask.threaded.get) # multiple threads
#dask.set_options(get=dask.async.get_sync) # single thread
Out[11]:
In [12]:
%%time
# do one-time computation of factorizations
proj_data = enum_map(factor, As, bs, name='proj_data')
# realize the computations, so they aren't recomputed at each iteration
proj_data = compute(*proj_data)
# get dask graph for reuse
x = value(x0,'x^k')
x = step(proj_data, x)
dsk_step = x.dask
K = 100
r = []
for k in range(K):
    # run one step, then feed x^{k+1} back in as the next x^k by
    # mutating the graph dict in place
    dsk_step['x^k'] = get(dsk_step, 'x^k+1')
    r.append(np.linalg.norm(x_true-dsk_step['x^k']))
In [13]:
%%time
# serial execution
# serial execution
# Same 100 iterations without dask, for timing comparison.
proj_data = list(map(factor, As, bs))
x = x0
K = 100
r = []
for i in range(K):
    z = (proj(d,x) for d in proj_data)
    x = average(*z)
    r.append(np.linalg.norm(x_true-x))
A few notes and wishlist items:
- I tried `dask.set_options(get=dask.threaded.get)` and `dask.set_options(get=dask.async.get_sync)`; I'm not sure if it's actually changing the scheduler, but I haven't looked into it closely.
- I'd like an easy way to rename a `Value` and change the data that it points to.
- I wish `dask.visualize` also worked on Dask graph dictionaries, so I don't have to remember `dask.dot.dot_graph`.
In [15]:
%%time
# Same reused-graph experiment, but calling the threaded scheduler
# explicitly with num_workers=2 instead of relying on set_options.
# do one-time computation of factorizations
proj_data = enum_map(factor, As, bs, name='proj_data')
# realize the computations, so they aren't recomputed at each iteration
proj_data = compute(*proj_data, get=dask.threaded.get, num_workers=2)
# get dask graph for reuse
x = value(x0,'x^k')
x = step(proj_data, x)
dsk_step = x.dask
K = 100
r = []
for k in range(K):
    dsk_step['x^k'] = dask.threaded.get(dsk_step, 'x^k+1', num_workers=2)
    r.append(np.linalg.norm(x_true-dsk_step['x^k']))
# restore the global threaded scheduler afterwards
dask.set_options(get=dask.threaded.get)
In [23]:
%%time
# Variant: use the globally-selected `get` but pass num_workers=2.
# do one-time computation of factorizations
proj_data = enum_map(factor, As, bs, name='proj_data')
# realize the computations, so they aren't recomputed at each iteration
proj_data = compute(*proj_data)
# get dask graph for reuse
x = value(x0,'x^k')
x = step(proj_data, x)
dsk_step = x.dask
K = 100
r = []
for k in range(K):
    dsk_step['x^k'] = get(dsk_step, 'x^k+1', num_workers=2)
    r.append(np.linalg.norm(x_true-dsk_step['x^k']))
In [16]:
%%time
# Repeat of the previous cell (presumably a second timing run).
# do one-time computation of factorizations
proj_data = enum_map(factor, As, bs, name='proj_data')
# realize the computations, so they aren't recomputed at each iteration
proj_data = compute(*proj_data)
# get dask graph for reuse
x = value(x0,'x^k')
x = step(proj_data, x)
dsk_step = x.dask
K = 100
r = []
for k in range(K):
    dsk_step['x^k'] = get(dsk_step, 'x^k+1', num_workers=2)
    r.append(np.linalg.norm(x_true-dsk_step['x^k']))
In [ ]:
# Show which BLAS/LAPACK numpy was built against (relevant to whether the
# linear algebra above already uses multiple threads internally).
np.__config__.show()
In [ ]: