Learning Math with LSTMs and Keras

This is the notebook that goes with the accompanying blog post and repository.

It generates its own data, so there's no need to connect to Drive or anything 🙂

Enjoy!

Generating Math Formulas as Strings


In [0]:
import itertools
import random

def generate_equations(shuffle=True, max_count=None):
    """
    Generates all possible math equations given the global configuration.
    If max_count is given, returns that many at most. If shuffle is True,
    the equations are generated in random order.
    """
    # Generate all possible ordered pairs of distinct numbers
    number_permutations = itertools.permutations(
        range(MIN_NUMBER, MAX_NUMBER + 1), 2
    )

    # Shuffle if required. The downside is we need to convert to list first
    if shuffle:
        number_permutations = list(number_permutations)
        random.shuffle(number_permutations)

    # If a max_count is given, use itertools to only look at that many items
    if max_count is not None:
        number_permutations = itertools.islice(number_permutations, max_count)

    # Build an equation string for each and yield to caller
    for x, y in number_permutations:
        yield '{} + {}'.format(x, y)
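
To get a feel for the output, here is a quick peek at a few generated equations. Note that this (like the other cells) relies on the Config cell further down, which defines MIN_NUMBER and MAX_NUMBER, so run that first.

In [0]:
# Prints a few random equations, e.g. '421 + 87', '3 + 960', '712 + 5'
for equation in generate_equations(max_count=3):
    print(equation)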

Encoding Strings for Neural Networks


In [0]:
import numpy as np

CHARS = [str(n) for n in range(10)] + ['+', ' ', '\0']
CHAR_TO_INDEX = {c: i for i, c in enumerate(CHARS)}
INDEX_TO_CHAR = {i: c for i, c in enumerate(CHARS)}

def one_hot_to_index(vector):
    if not np.any(vector):
        return -1

    return np.argmax(vector)

def one_hot_to_char(vector):
    index = one_hot_to_index(vector)
    if index == -1:
        return ''

    return INDEX_TO_CHAR[index]

def one_hot_to_string(matrix):
    return ''.join(one_hot_to_char(vector) for vector in matrix)
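
A minimal round trip through these helpers, with a one-hot vector built by hand (full strings are encoded inline in equations_to_x_y below):

In [0]:
# Encode '+' as a one-hot vector and decode it again
vector = np.zeros(len(CHARS), dtype=bool)
vector[CHAR_TO_INDEX['+']] = True
print(one_hot_to_char(vector))  # +

# An all-zero vector decodes to the empty string (used for padding)
print(repr(one_hot_to_char(np.zeros(len(CHARS), dtype=bool))))  # ''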

In [0]:
def equations_to_x_y(equations, n):
    """
    Given an iterable of equations, converts them to one-hot vectors and builds
    two data matrices x and y.
    """
    x = np.zeros(
        (n, MAX_EQUATION_LENGTH, N_FEATURES), dtype=bool
    )
    y = np.zeros(
        (n, MAX_RESULT_LENGTH, N_FEATURES), dtype=bool
    )

    # Take the first n equations from the iterable and convert them to vectors
    for i, equation in enumerate(itertools.islice(equations, n)):
        result = str(eval(equation))

        # Left-pad the result with spaces so it is right-aligned
        result = ' ' * (MAX_RESULT_LENGTH - 1 - len(result)) + result

        # We end each sequence with a sequence-end character:
        equation += '\0'
        result += '\0'

        for t, char in enumerate(equation):
            x[i, t, CHAR_TO_INDEX[char]] = 1

        for t, char in enumerate(result):
            y[i, t, CHAR_TO_INDEX[char]] = 1

    return x, y


def build_dataset():
    """
    Generates equations based on global config, splits them into train and test
    sets, and returns (x_test, y_test, x_train, y_train).
    """
    generator = generate_equations(max_count=N_EXAMPLES)

    # Split into training and test set based on SPLIT:
    n_test = round(SPLIT * N_EXAMPLES)
    n_train = N_EXAMPLES - n_test

    x_test, y_test = equations_to_x_y(generator, n_test)
    x_train, y_train = equations_to_x_y(generator, n_train)

    return x_test, y_test, x_train, y_train
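
A minimal round trip through equations_to_x_y (again assuming the Config cell further down has been run):

In [0]:
x_demo, y_demo = equations_to_x_y(['12 + 34'], 1)
print(x_demo.shape)                  # (1, 10, 13): examples x time steps x characters
print(one_hot_to_string(x_demo[0]))  # 12 + 34   (plus the invisible terminator)
print(one_hot_to_string(y_demo[0]))  #   46      (left-padded, plus the terminator)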

In [0]:
def print_example_predictions(count, model, x_test, y_test):
    """
    Print some example predictions along with their target from the test set.
    """
    print('Examples:')

    # Pick some random indices from the test set
    prediction_indices = np.random.choice(
        x_test.shape[0], size=count, replace=False
    )
    # Get a prediction of each
    predictions = model.predict(x_test[prediction_indices, :])

    for i in range(count):
        print('{} = {}   (expected: {})'.format(
            one_hot_to_string(x_test[prediction_indices[i]]),
            one_hot_to_string(predictions[i]),
            one_hot_to_string(y_test[prediction_indices[i]]),
        ))

Building the Model in Keras


In [0]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, RepeatVector, Dense, Activation
from tensorflow.keras.layers import TimeDistributed, Bidirectional
from tensorflow.keras.optimizers import Adam

def build_model():
    """
    Builds and returns the model based on the global config.
    """
    input_shape = (MAX_EQUATION_LENGTH, N_FEATURES)

    model = Sequential()

    # Encoder:
    model.add(Bidirectional(LSTM(20), input_shape=input_shape))

    # The RepeatVector layer repeats the encoder output once per output time step
    model.add(RepeatVector(MAX_RESULT_LENGTH))

    # Decoder:
    model.add(Bidirectional(LSTM(20, return_sequences=True)))

    model.add(TimeDistributed(Dense(N_FEATURES)))
    model.add(Activation('softmax'))

    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(learning_rate=0.01),
        metrics=['accuracy'],
    )

    return model
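
As a rough sanity check on the model size: one LSTM direction with 20 units over 13 input features has 4 * 20 * (13 + 20 + 1) = 2,720 parameters (four gates, each with input weights, recurrent weights, and a bias), so the bidirectional encoder has 5,440, which matches the summary printed during training below.

In [0]:
# Parameter count of one LSTM direction: 4 gates, each with input weights,
# recurrent weights and a bias vector
units = 20
per_direction = 4 * units * (N_FEATURES + units + 1)  # 2720
print(2 * per_direction)  # 5440, as in the encoder row of the model summary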


Config


In [0]:
MIN_NUMBER = 0
MAX_NUMBER = 999

MAX_N_EXAMPLES = (MAX_NUMBER - MIN_NUMBER + 1) * (MAX_NUMBER - MIN_NUMBER)  # ordered pairs of distinct numbers
N_EXAMPLES = 30000
N_FEATURES = len(CHARS)
MAX_NUMBER_LENGTH_LEFT_SIDE = len(str(MAX_NUMBER))
MAX_NUMBER_LENGTH_RIGHT_SIDE = MAX_NUMBER_LENGTH_LEFT_SIDE + 1
MAX_EQUATION_LENGTH = (MAX_NUMBER_LENGTH_LEFT_SIDE * 2) + 4
MAX_RESULT_LENGTH = MAX_NUMBER_LENGTH_RIGHT_SIDE + 1

SPLIT = .1
EPOCHS = 200
BATCH_SIZE = 256

RANDOM_SEED = 1
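
As a quick check of the derived lengths: the longest possible equation is '999 + 999' (9 characters) and the longest possible result is '1998' (4 characters); each gains one character for the sequence-end '\0':

In [0]:
assert MAX_EQUATION_LENGTH == len('999 + 999') + 1   # 10
assert MAX_RESULT_LENGTH == len(str(999 + 999)) + 1  # 5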

Setting up TensorBoard


In [0]:
%load_ext tensorboard

In [0]:
%tensorboard --logdir logs


Training


In [0]:
# Fix the random seed to get a consistent dataset
random.seed(RANDOM_SEED)

x_test, y_test, x_train, y_train = build_dataset()

In [16]:
import os
import datetime
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard

model = build_model()

model.summary()
print()

# Let's print some predictions now to get a feeling for the equations
print()
print_example_predictions(5, model, x_test, y_test)
print()

logdir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

try:
    model.fit(
        x_train, y_train,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        validation_data=(x_test, y_test),
        verbose=2,
        callbacks=[
            ModelCheckpoint(
                'model.h5',
                save_best_only=True,
            ),
            TensorBoard(logdir, histogram_freq=1)
        ]
    )
except KeyboardInterrupt:
    print('\nCaught SIGINT\n')

# Load weights achieving best val_loss from training:
model.load_weights('model.h5')

print_example_predictions(20, model, x_test, y_test)


_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
bidirectional_9 (Bidirection (None, 40)                5440      
_________________________________________________________________
repeat_vector_5 (RepeatVecto (None, 5, 40)             0         
_________________________________________________________________
bidirectional_10 (Bidirectio (None, 5, 40)             9760      
_________________________________________________________________
time_distributed_5 (TimeDist (None, 5, 13)             533       
_________________________________________________________________
activation_5 (Activation)    (None, 5, 13)             0         
=================================================================
Total params: 15,733
Trainable params: 15,733
Non-trainable params: 0
_________________________________________________________________


Examples:
212 + 282 = 33333   (expected:  494)
114 + 934 = 00000   (expected: 1048)
914 + 865 = 00000   (expected: 1779)
355 + 799 = 00   (expected: 1154)
18 + 516 = 0000   (expected:  534)

Train on 27000 samples, validate on 3000 samples
Epoch 1/200

Caught SIGINT

Examples:
666 + 200 =  866   (expected:  866)
683 + 872 = 1555   (expected: 1555)
183 + 651 =  834   (expected:  834)
587 + 510 = 1097   (expected: 1097)
925 + 88 = 1013   (expected: 1013)
230 + 211 =  441   (expected:  441)
809 + 881 = 1690   (expected: 1690)
908 + 275 = 1183   (expected: 1183)
642 + 735 = 1377   (expected: 1377)
331 + 740 = 1071   (expected: 1071)
56 + 60 =  116   (expected:  116)
647 + 16 =  663   (expected:  663)
582 + 776 = 1358   (expected: 1358)
641 + 270 =  911   (expected:  911)
926 + 558 = 1484   (expected: 1484)
32 + 985 = 1017   (expected: 1017)
473 + 222 =  695   (expected:  695)
270 + 498 =  768   (expected:  768)
482 + 298 =  780   (expected:  780)
628 + 751 = 1379   (expected: 1379)

Try out the model


In [0]:
def predict(model, equation):
    """
    Given a model and an equation string, returns the predicted result.
    """
    x = np.zeros((1, MAX_EQUATION_LENGTH, N_FEATURES), dtype=bool)
    equation += '\0'

    for t, char in enumerate(equation):
        x[0, t, CHAR_TO_INDEX[char]] = 1

    predictions = model.predict(x)
    # Decode the prediction and strip the trailing sequence-end character
    return one_hot_to_string(predictions[0])[:-1]

In [0]:
predict(model, '111 + 222')


Out[0]:
' 333'