Build a model to help pronounce English words. We'll convert English words into Arpabet phonemes.
@sunilmallya
In [6]:
# Load data
#cmudict.0.6d
# Each file is read inside a `with` block so its handle is closed immediately
# instead of being left to the garbage collector.
with open('cmudict-0.7b', 'r') as dict_file:
    data = dict_file.readlines()
with open('cmudict-0.7b.phones', 'r') as phones_file:
    phones = phones_file.readlines()
# NOTE: this rebinds `phones`, discarding the .phones contents read just above.
with open('cmudict-0.7b.symbols', 'r') as symbols_file:
    phones = symbols_file.readlines()
In [7]:
# Parallel accumulators: words[i] is pronounced as phones[i].
words, phones = [], []
def f_char(word):
    """Return True if `word` contains any character that should be filtered out.

    The rejected characters are punctuation (parentheses, period, apostrophe,
    hyphen, underscore) plus the two accented characters listed below.
    """
    rejected = ("(", ".", "'", ")", "-", "_", "\xc0", "\xc9")
    return any(ch in word for ch in rejected)
# Parse every dictionary line into (word, pronunciation).
# The word is everything before the first space; the pronunciation is the whole
# remainder of the line, because later cells split it on single spaces to get
# the individual phonemes.
for d in data:
    # ';;;' lines are dictionary comments/headers, not entries.
    if d.startswith(';;;'):
        continue
    word, _, pronunciation = d.strip().partition(' ')
    # The separator may be more than one space; trim what is left of it.
    pronunciation = pronunciation.strip()
    # Skip blank/malformed lines and words containing filtered characters.
    if not pronunciation or f_char(word):
        continue
    words.append(word)
    phones.append(pronunciation)
In [8]:
# Peek at the first five words and their pronunciations.
words[:5], phones[:5]
Out[8]:
In [9]:
# Both lists are appended to together, so the two counts should match.
len(words), len(phones)
Out[9]:
In [10]:
# Collect the joint vocabulary: every character that appears in a word and
# every space-separated token that appears in a pronunciation.
all_chars = set()
for word, phone in zip(words, phones):
    all_chars.update(word)
    all_chars.update(phone.split(" "))
print(all_chars)
In [28]:
# Create a map of symbols to numbers
# A symbol's position in this list is its integer id. Sort the vocabulary so
# the ids do not depend on set iteration order; otherwise a checkpoint saved in
# one session cannot be decoded reliably in another.
symbol_set = sorted(all_chars)
# The padding symbol goes last; print_word/print_phone strip symbol_set[-1].
symbol_set.append("+") # add space for padding
# word to symbol index
def word_to_symbol_index(word):
    """Return the position in symbol_set of each character of `word`, in order.

    A character that is not in symbol_set raises ValueError (from list.index).
    """
    return list(map(symbol_set.index, word))
# list of symbol index to word
def symbol_index_to_word(indices):
    """Return the list of symbols found at the given symbol_set positions."""
    symbols = []
    for position in indices:
        symbols.append(symbol_set[position])
    return symbols
# phone to symbol index
def phone_to_symbol_index(phone):
    """Return the symbol_set position of each space-separated token in `phone`.

    A token that is not in symbol_set raises ValueError (from list.index).
    """
    tokens = phone.split(" ")
    return list(map(symbol_set.index, tokens))
# list of symbol index to phoneme symbols
def psymbol_index_to_word(indices):
    """Return the list of symbols found at the given symbol_set positions.

    Same lookup as symbol_index_to_word; letters and phonemes share symbol_set.
    """
    return list(symbol_set[position] for position in indices)
# Show the symbol table; a symbol's position in this list is its integer id.
print symbol_set
In [12]:
# sample
# Round trip: encode a word to symbol ids, then decode the ids back to symbols.
indices = word_to_symbol_index("ARDBERG")
print indices, symbol_index_to_word(indices)
In [13]:
# Same round trip for a space-separated phoneme string. Decoding reuses
# symbol_index_to_word because letters and phonemes share one symbol table.
indices = phone_to_symbol_index("AA1 B ER0 G")
print indices, symbol_index_to_word(indices)
In [14]:
# Pad input and output data
# Fixed sequence lengths: the longest word in characters, and the longest
# pronunciation in space-separated phoneme tokens.
input_sequence_length = max(map(len, words))
output_sequence_length = max(len(phone.split(' ')) for phone in phones)
input_sequence_length, output_sequence_length
Out[14]:
In [15]:
# input data
# Accumulators for the encoded inputs (trainX) and encoded targets (labels).
trainX, labels = [], []
def pad_string(word, max_len, pad_char = "+"):
    """Left-pad `word` with `pad_char` until it is `max_len` characters long.

    Length is measured in characters. A `word` already at least `max_len`
    long is returned unchanged.
    """
    missing = max_len - len(word)
    # A non-positive repeat count yields an empty prefix.
    return pad_char * missing + word
#for word in words:
# padded_strng = "%*s" % (input_sequence_length, word)
# trainX.append(word_to_symbol_index(padded_strng))
# output data
#for p in phones:
# padded_strng = "%*s" % (output_sequence_length, p)
# print phone_to_symbol_index(padded_strng)
In [16]:
# Demo of pad_string on a phoneme string. Note that pad_string measures length
# in characters of the string, not in phoneme tokens.
pad_string('EY2 EY1', output_sequence_length)
Out[16]:
In [17]:
# Encode every word: left-pad it to input_sequence_length, then map each
# character to its symbol id. Rows are appended to trainX in word order.
trainX.extend(
    word_to_symbol_index(pad_string(word, input_sequence_length))
    for word in words
)
In [18]:
# output labels
# TODO: Fix padding logic
# Each label row is the pronunciation's symbol ids, left-padded with the id of
# '+' up to output_sequence_length.
# The pad id never changes, so look it up once instead of once per pad slot.
pad_index = phone_to_symbol_index('+')[0]
labels = []
for p in phones:
    encoded = phone_to_symbol_index(p)
    # A non-positive repeat count yields no padding.
    padding = [pad_index] * (output_sequence_length - len(encoded))
    labels.append(padding + encoded)
In [15]:
# Number of encoded label rows and encoded input rows.
len(labels), len(trainX)
Out[15]:
In [16]:
# Spot-check the encoding by decoding one input row and its label row back
# to symbols.
trainX[0], labels[0]  # bare expression followed by other statements; prints nothing
print "INP: ", symbol_index_to_word(trainX[2])
print "LBL: ", symbol_index_to_word(labels[2])
In [19]:
import mxnet as mx
import numpy as np
def shuffle_together(a, b):
    """Apply one shared random permutation to `a` and `b`.

    Both arguments must have the same length and support indexing by an
    index array. Returns the pair (a_shuffled, b_shuffled), in which
    corresponding elements stay aligned.
    """
    count = len(a)
    assert count == len(b)
    order = np.random.permutation(count)
    shuffled_a = a[order]
    shuffled_b = b[order]
    return shuffled_a, shuffled_b
batch_size = 128
# Convert the Python lists to arrays once, then shuffle inputs and labels with
# one shared permutation so each row stays paired with its label.
trainX, labels = np.array(trainX), np.array(labels)
trainX, labels = shuffle_together(trainX, labels)
N = int(len(trainX) * 0.9) # 90%
# trainX/labels are already ndarrays, so slice them directly rather than
# copying the whole dataset with np.array() for every split.
dataX, testX = trainX[:N], trainX[N:]
dataY, testY = labels[:N], labels[N:]
print("%s %s" % (dataX.shape, dataY.shape))
print("%s %s" % (testX.shape, testY.shape))
## Lets define the Iterator
# The names "data" and "target" must match the symbol variables and the
# data_names/label_names given to the modules.
train_iter = mx.io.NDArrayIter(data=dataX, label=dataY,
                               data_name="data", label_name="target",
                               batch_size=batch_size,
                               shuffle=True)
test_iter = mx.io.NDArrayIter(data=testX, label=testY,
                              data_name="data", label_name="target",
                              batch_size=batch_size,
                              shuffle=True)
print("%s %s" % (train_iter.provide_data, train_iter.provide_label))
In [29]:
data_dim = len(symbol_set)
# Hidden-state size shared by the encoder and decoder cells. It is named
# explicitly because the reshape before the output layer needs it; that reshape
# previously used batch_size, which only worked because both values were 128.
num_hidden = 128
data = mx.sym.var('data') # Shape: (N, T)
target = mx.sym.var('target') # Shape: (N, T)
# Encoder (lstm1) and decoder (lstm2), one fused RNN cell each
# get_next_state = return the states that can be used as starting states next time
lstm1 = mx.rnn.FusedRNNCell(num_hidden=num_hidden, prefix="lstm1_", get_next_state=True)
lstm2 = mx.rnn.FusedRNNCell(num_hidden=num_hidden, prefix="lstm2_", get_next_state=False)
# In the layout, 'N' represents batch size, 'T' represents sequence length,
# and 'C' represents the number of dimensions in hidden states.
# one hot encode
data_one_hot = mx.sym.one_hot(data, depth=data_dim) # Shape: (N, T, C)
data_one_hot = mx.sym.transpose(data_one_hot, axes=(1, 0, 2)) # Shape: (T, N, C)
# Note that when unrolling, if 'merge_outputs'== True, the 'outputs' is merged into a single symbol
# encoder (with repeat vector)
_, encode_state = lstm1.unroll(length=input_sequence_length, inputs=data_one_hot, layout="TNC")
# Repeat the encoder's final state once per output step; a 0 in `shape` keeps
# that dimension unchanged.
encode_state_h = mx.sym.broadcast_to(encode_state[0], shape=(output_sequence_length, 0, 0)) #Shape: (T,N,C); use output seq shape
# decoder
decode_out, _ = lstm2.unroll(length=output_sequence_length, inputs=encode_state_h, layout="TNC")
# Flatten (T, N, C) to (T*N, C) so one dense layer scores every time step.
decode_out = mx.sym.reshape(decode_out, shape=(-1, num_hidden))
# logits out
logits = mx.sym.FullyConnected(decode_out, num_hidden=data_dim, name="logits")
logits = mx.sym.reshape(logits, shape=(output_sequence_length, -1, data_dim))
logits = mx.sym.transpose(logits, axes=(1, 0, 2))
# Lets define a loss function: mean negative log-likelihood of the target
# symbol at every position (log-softmax over the symbol axis, then pick).
loss = mx.sym.mean(-mx.sym.pick(mx.sym.log_softmax(logits), target, axis=-1))
loss = mx.sym.make_loss(loss)
# visualize
#shape = {"data" : (batch_size, dataX[0].shape[0])}
#mx.viz.plot_network(loss, shape=shape)
In [30]:
# Training module: wraps the `loss` symbol, reading input 'data' and label 'target'.
net = mx.mod.Module(symbol=loss,
data_names=['data'],
label_names=['target'],
context=mx.gpu())
# Bind to the data/label shapes advertised by the training iterator.
net.bind(data_shapes=train_iter.provide_data,
label_shapes=train_iter.provide_label)
# Xavier-initialise the parameters, then attach Adam with a 1e-3 learning rate.
net.init_params(initializer=mx.init.Xavier())
net.init_optimizer(optimizer="adam",
optimizer_params={'learning_rate': 1E-3,
'rescale_grad': 1.0},
kvstore=None)
In [31]:
# lets keep a test network to see how we do
# It outputs `logits` rather than the loss, and takes no labels.
predict_net = mx.mod.Module(symbol=logits,
data_names=['data'],
label_names=None,
context=mx.gpu())
# Reuse the training iterator's data descriptor so the input shape matches.
data_desc = train_iter.provide_data[0]
# shared_module=net: shares the same parameters and memory as the training network
predict_net.bind(data_shapes=[data_desc],
label_shapes=None,
for_training=False,
grad_req='null',
shared_module=net)
def predict(data_iter):
    """Return the exact-match accuracy of predict_net over `data_iter`.

    A sample counts as correct only when every position of the predicted
    sequence equals the label. `data_iter` is reset first; it must yield
    batches with labels and expose `num_data`.
    """
    data_iter.reset()
    corr = 0
    for data_batch in data_iter:
        predict_net.forward(data_batch=data_batch)
        predictions = predict_net.get_outputs()[0].asnumpy()
        indices = np.argmax(predictions, axis=2)
        lbls = data_batch.label[0].asnumpy()
        results = (indices == lbls)
        # The iterator may fill up the final batch with extra rows and report
        # how many in `pad`; those rows are not real samples, so drop them
        # before counting (the denominator below is num_data).
        pad = getattr(data_batch, 'pad', 0) or 0
        valid = results[:results.shape[0] - pad]
        # Exact match: every position in the row must agree.
        corr += int(np.sum(valid.all(axis=1)))
    return float(corr) / data_iter.num_data
In [37]:
epochs = 125
for epoch in range(epochs):
    loss_sum = 0.0
    # Count the batches the iterator actually yields, so the average stays
    # correct when the last batch is partial.
    total_batches = 0
    train_iter.reset()
    for data_batch in train_iter:
        net.forward_backward(data_batch=data_batch)
        # Named batch_loss so the module-level `loss` symbol is not overwritten.
        batch_loss = net.get_outputs()[0].asscalar()
        net.update()
        loss_sum += batch_loss
        total_batches += 1
    avg_loss = loss_sum / max(total_batches, 1)
    # Evaluate on the held-out split after every epoch.
    test_acc = predict(test_iter)
    # One pre-formatted string prints the same under Python 2 and Python 3.
    print('Epoch: %04d cost = %.9f' % (epoch + 1, avg_loss))
    print('Epoch: %04d test acc = %.9f' % (epoch + 1, test_acc))
In [38]:
# Save the model
# The checkpoint is tagged with the total number of epochs trained.
prefix = 'pronounce128'
net.save_checkpoint(prefix, epochs)
#pred_model = mx.mod.Module.load(prefix, num_epoch)
In [51]:
# Test module
# Inference-only module over `logits`; built the same way as predict_net.
test_net = mx.mod.Module(symbol=logits,
data_names=['data'],
label_names=None,
context=mx.gpu())
# Reuse the training iterator's data descriptor so the input shape matches.
data_desc = train_iter.provide_data[0]
# shared_module=net: shares the same parameters and memory as the training network
test_net.bind(data_shapes=[data_desc],
label_shapes=None,
for_training=False,
grad_req='null',
shared_module=net)
def print_word(arr):
    """Decode symbol ids to a word string, with padding symbols removed.

    Despite the name, this returns the string and prints nothing. The padding
    symbol is the last entry of symbol_set.
    """
    pad_symbol = symbol_set[-1]
    letters = [s for s in symbol_index_to_word(arr) if s != pad_symbol]
    return "".join(letters)
def print_phone(arr):
    """Decode symbol ids to a space-separated phoneme string, padding removed.

    Despite the name, this returns the string and prints nothing. The padding
    symbol is the last entry of symbol_set.
    """
    pad_symbol = symbol_set[-1]
    phonemes = [s for s in psymbol_index_to_word(arr) if s != pad_symbol]
    return " ".join(phonemes)
# testY is kept for the commented-out comparison at the bottom of this cell.
testY = labels[0:10]
# Hand-picked words to pronounce. They must use only characters present in
# symbol_set, otherwise word_to_symbol_index raises ValueError.
sample_words = [
    "SUNIL",
    "JOSEPH",
    "RANDALL",
    "SAUSALITO",
    "EMBARCADERO",
    "KAJA",
    "AMULYA",
    "TWITCH",
    "ALUMINUM",
]
# Builtin `int` is what the `np.int` alias stood for; current NumPy releases
# no longer provide `np.int`.
testX = np.array(
    [word_to_symbol_index(pad_string(w, input_sequence_length)) for w in sample_words],
    dtype=int)
# Re-bind for one sample per batch so each word is predicted on its own.
test_net.reshape(data_shapes=[mx.io.DataDesc('data', (1, input_sequence_length))])
predictions = test_net.predict(mx.io.NDArrayIter(testX, batch_size=1)).asnumpy()
print("expression predicted actual")
for encoded_word, prediction in zip(testX, predictions):
    word = print_word(encoded_word)
    # Most likely symbol at each output position.
    index = np.argmax(prediction, axis=1)
    result = print_phone(index)
    print("%10s %s" % (word, result))
    #label = [alphabet[j] for j in testY[i]]
    #print "".join(x_str), "".join(result), " ", "".join(label)
In [ ]:
# TODO: Convert Arpabet to IPA and call the Polly API