In [ ]:
'''
http://iamtrask.github.io/2016/02/25/deepminds-neural-stack-machine/
http://machinaut.com/a-graphical-explanation-of-deepminds-neural-stacks/
'''

In [1]:
import numpy as np

In [2]:
def s_t(i, t, u, d):
    if (i >= 0 and i < t):
        # when calculate the strength, the newest pushed row is not included
        inner_sum = sum(s[t-1][i+1:t])
        out = max(0, s[t-1][i] - max(0, u[t] - inner_sum))
        # out >= 0
        return out
    elif (i == t):
        return d[t]
    else:
        print("Undefined i -> t relationship: i <= t")

In [20]:
def s_t_error(i, t, u, d, error):
    if (i >= 0 and i < t):     
        partial_sum = sum(s[t-1][i+1:t])
        if abs(s[t-1][i] - max(0, u[t] - partial_sum)) >= 1e-10:
            s_delta[t-1][i] += error
            if abs(u[t] - partial_sum) >= 1e-10:
                s_delta[t-1][i+1:t-1] += error
                u_delta[t] -= error
    elif i == t:
        d_delta[t] += error
    else:
        print("Undefined!")

In [4]:
# when read from the stack, the newest pushed row is included
def r_t(t):
    r_t_out = np.zeros(stack_width)
    for i in range(0, t+1):
        temp = min(s[t][i], max(0, 1 - sum(s[t][i+1:t+1])))
        r_t_out += temp * V[t][i]
    return r_t_out

In [5]:
def r_t_error(t, r_t_error):
    for i in range(0, t+1):
        partial_sum = np.sum(s[t][i+1:t+1])
        temp = min(s[t][i], max(0, 1 - partial_sum))
        
        # get the delta for V
        V_delta[t][i] += temp * r_t_error
        
        # total derivative with respect to s[t][i]
        # r_t_error is d(TotalError_t)/dr_t, r_t_error is a vector of stack_width elements
        # we would like to get d(TotalError_t)/d_s_t_i
        # and s_t[i] impacts every element in V[t][i]
        # This is the total derivative with respect to s[t][i]
        # element-wise multiple
        temp_error = sum(r_t_error * V[t][i])
        if (s[t][i] < max(0, 1 - partial_sum)):
            s_delta[t][i] += temp_error
        else:
            if (max(0, 1 - partial_sum) > 0):
                # here -= temp_error applied to elements from i+1 to t in s[t]
                s_delta[t][i+1:t] -= temp_error

In [6]:
def pushAndPop(v_t, d_t, u_t, t):
    d.append(d_t)
    u.append(u_t)
    
    new_s = np.zeros(t+1)
    for i in range(t+1):
        new_s[i] = s_t(i, t, u, d)
    s.append(new_s)
    
    if (len(V) == 0):
        V_t = np.zeros((1, stack_width))
        V_t += v_t
    else:
        # V[-1] is the memory snapshot in the last timestamp
        depth = len(V[-1])
        # the new memory will add one more row: the depth will increase by 1
        # the difference from the last memory snapshot is adding one more row
        V_t = np.zeros((depth+1, stack_width))
        for i in range(depth):
            V_t[i] += V[-1][i]
        # the newest row is v_t, which is a list of stack_width elements
        V_t[depth] += v_t
    
    V.append(V_t)
    # read from the stack including the newest row
    return r_t(t)

In [7]:
def pushAndPopDeriv(v_t, d_t, u_t, t, deriv=False, error=None):
    if not deriv:
        d.append(d_t)
        d_delta.append(0)
        
        u.append(u_t)
        u_delta.append(0)
        
        new_s = np.zeros(t+1)
        for i in range(t+1):
            new_s[i] = s_t(i, t, u, d)
        s.append(new_s)
        s_delta.append(np.zeros(new_s.shape[0]))
        
        if len(V) == 0:
            V_t = np.zeros((1, stack_width))
            # V_t += v_t
        else:
            depth = len(V[-1])
            V_t = np.zeros((depth + 1, stack_width))
            for i in range(depth):
                V_t[i] += V[-1][i]
            V_t[depth] += v_t
        V.append(V_t)
        V_delta.append(np.zeros(V[-1].shape))
        
        return r_t(t)
    else:
        r_t_error(t, error)
        for i in range(t+1):
            s_t_error((t+1) - i - 1, t, u, d, s_delta[t][i])

In [8]:
stack_width = 3
copy_length = 5

v_0 = np.zeros(stack_width)
v_0[0] = 1
v_1 = np.zeros(stack_width)
v_1[1] = 1
v_2 = np.zeros(stack_width)
v_2[2] = 1

# init
V = list() # stack states
s = list() # stack strengths
d = list() # push strengths
u = list() # pop strengths

print(str(pushAndPop(v_0,0.8,0.0,0)))
print(str(pushAndPop(v_1,0.5,0.1,1)))
print(str(pushAndPop(v_2,0.9,0.9,2)))

# Stack is empty again
V = list() # stack states
s = list() # stack strengths 
d = list() # push strengths
u = list() # pop strengths

assert str(pushAndPop(v_0,0.8,0.0,0)) == str((0.8 * v_0))
assert str(pushAndPop(v_1,0.5,0.1,1)) == str((0.5 * v_0) + (0.5 * v_1))
assert str(pushAndPop(v_2,0.9,0.9,2)) == str((0.9 * v_2) + (0 * v_1) + (0.1 * v_0))

print("\nFinal Value of S:")
for i in range(3):
  print(s_t(2-i,2,u,d))

print("\nPassed All Assertions!!!")


[ 0.8  0.   0. ]
[ 0.5  0.5  0. ]
[ 0.1  0.   0.9]

Final Value of S:
0.9
0
0.3

Passed All Assertions!!!

In [47]:
u_weights = np.array([0.1,0.1,0.1,0.1,0.1,0.1])
# push the 3 sequences onto the stack with strength of 1
# try different d_weights initialize, the result is different. need understand why
d_weights = np.array([0.1,0.1,.1,0.1,0.1,0.1])
stack_width = 2
copy_length = 5

# INIT
V = list() # stack states
s = list() # stack strengths 
d = list() # push strengths
u = list() # pop strengths

V_delta = list() # stack states
s_delta = list() # stack strengths 
d_delta = list() # push strengths
u_delta = list() # pop strengths

for i in range(3001):
    alpha = 5 * ( (1 - (float(i)/500)) ** 2)
    sequence = np.array([[0.1, 0.2, 0.3], [0, 0, 0]]).T
    
    # RE-INITIALIZE WEIGHTS (empty the stack and strengths)
    V = list() # stack states
    s = list() # stack strengths 
    d = list() # push strengths
    u = list() # pop strengths

    V_delta = list() # stack states
    s_delta = list() # stack strengths 
    d_delta = list() # push strengths
    u_delta = list() # pop strengths
    
    # forward propagation
    out_0 = pushAndPopDeriv(sequence[0], d_weights[0], u_weights[0], 0)
    out_1 = pushAndPopDeriv(sequence[1], d_weights[1], u_weights[1], 1)
    out_2 = pushAndPopDeriv(sequence[2], d_weights[2], u_weights[2], 2)
    
    out_3 = pushAndPopDeriv(np.zeros(2), d_weights[3], u_weights[3], t=3)
    out_4 = pushAndPopDeriv(np.zeros(2), d_weights[4], u_weights[4], t=4)
    out_5 = pushAndPopDeriv(np.zeros(2), d_weights[5], u_weights[5], t=5)
    
    # backward propagation
    y = np.array([1, 0])
    
    pushAndPopDeriv(sequence[2], d_weights[2], u_weights[2], 2, deriv=True, error = out_0 - 0)
    pushAndPopDeriv(sequence[1], d_weights[1], u_weights[1], 1, deriv=True, error = out_1 - 0)
    pushAndPopDeriv(sequence[0], d_weights[0], u_weights[0], 0, deriv=True, error = out_2 - 0)
    
    pushAndPopDeriv(np.zeros(2), d_weights[5], u_weights[5], 5, deriv=True, error = out_5 - y*0.1)
    pushAndPopDeriv(np.zeros(2), d_weights[4], u_weights[4], 4, deriv=True, error = out_4 - y*0.2)
    pushAndPopDeriv(np.zeros(2), d_weights[3], u_weights[3], 3, deriv=True, error = out_3 - y*0.3)
    
    alpha = 0.2
    
    u_weights[0] -= alpha * u_delta[0]
    u_weights[1] -= alpha * u_delta[1]
    u_weights[2] -= alpha * u_delta[2]
    
    u_weights[3] -= alpha * u_delta[3]
    u_weights[4] -= alpha * u_delta[4]
    u_weights[5] -= alpha * u_delta[5]
    
    d_weights[2] -= alpha * d_delta[2]
    d_weights[1] -= alpha * d_delta[1]
    d_weights[0] -= alpha * d_delta[0]    
    
    d_weights[3] -= alpha * d_delta[3]
    d_weights[4] -= alpha * d_delta[4]
    d_weights[5] -= alpha * d_delta[5]
    
    for k in range(len(d_weights)):
        if d_weights[k] < 0:
            d_weights[k] = 0
        elif d_weights[k] > 1:
            d_weights[k] = 1
            
        if u_weights[k] < 0:
            u_weights[k] = 0
        elif u_weights[k] > 1:
            u_weights[k] = 1
    
    if i % 500 == 0:
        print("\nIteration:", i)
        # print u_delta
        print(out_3[0], out_4[0], out_5[0])


Iteration: 0
0.0 0.0 0.0

Iteration: 500
0.0 0.0 0.0

Iteration: 1000
0.0 0.0 0.0

Iteration: 1500
0.0 0.0 0.0

Iteration: 2000
0.0 0.0 0.0

Iteration: 2500
0.0 0.0 0.0

Iteration: 3000
0.0 0.0 0.0

In [ ]: