In [1]:
import tensorflow as tf
import numpy as np
import os
from IPython.display import Audio
import scipy.io.wavfile as wav
from python_speech_features import fbank, mfcc
from keras.layers import LSTM, Dense, Convolution1D
from keras.models import Sequential
from keras.layers.wrappers import TimeDistributed, Bidirectional
In [2]:
from scipy.io.wavfile import read
import os
import numpy as np
from math import *
from scipy.signal import lfilter, hamming
from scipy.fftpack import dct, idct
In [3]:
# Original OCTAVE code: http://www.cs.cmu.edu/~robust/archive/algorithms/PNCC_ICASSP2010/PNCC_ICASSP2010Package.tar.gz
# Obtaining the gammatone coefficients.
# Based on M. Slaney's Auditory Toolbox.
# In the actual C implementation we just use a precomputed table.
aad_H = np.genfromtxt('add_h.csv', delimiter=',')

def PNCC_ICASSP2010(WavFileName16Gz):
    bPreem = 1
    bMedPowerBiasSub = 1
    bPowerLaw = 1
    bDisplay = 0
    iInitBufferSize = 10
    dDelta = 0.01
    iM = 2
    iN = 4
    dPowerCoeff = 1.0 / 15
    dFrameLen = 0.0256       # 25.6 ms analysis window
    dSampRate = 16000
    dFramePeriod = 0.01      # 10 ms frame shift
    iPowerFactor = 1
    iFL = floor(dFrameLen * dSampRate)
    iFP = floor(dFramePeriod * dSampRate)
    iFFTSize = 1024
    iNumFilts = 40
    eps = 2.2204e-16
    d, x = read(WavFileName16Gz)
    ad_x = x[490:]
    iNumFrames = floor((len(ad_x) - iFL) / iFP) + 1
    iSpeechLen = len(ad_x)
    # Pre-emphasis using H(z) = 1 - 0.97 z^-1
    if bPreem == 1:
        ad_x = lfilter(np.array([1, -0.97]), 1, ad_x, axis=0)
    i_FI = 0
    adSumPower = np.zeros(int(iNumFrames))
    aad_P = np.zeros((iNumFilts, int(iNumFrames)))
    # Obtaining the short-time power P(i, j)
    for m in range(0, int(iSpeechLen - iFL), int(iFP)):
        doo = int(m + iFL)
        doo1 = int(m)
        ad_x_st = ad_x[doo1:doo] * hamming(int(iFL))
        adSpec = np.fft.fft(ad_x_st.conj(), iFFTSize).reshape(-1, 1)
        ad_X = np.abs(adSpec[0:iFFTSize // 2])
        for j in range(int(iNumFilts)):
            ###########################################################################
            # Squared integration over the gammatone channels
            aad_P[j, i_FI] = np.sum((ad_X.T * aad_H[:, j]) ** 2)
        adSumPower[i_FI] = np.sum(aad_P[:, i_FI])
        i_FI += 1
    # Peak power normalization using the 95th percentile
    adSorted = np.sort(adSumPower)
    dMaxPower = adSorted[int(round(0.95 * len(adSumPower))) - 1]
    aad_P = aad_P / dMaxPower * 1e15
    # Medium-duration power calculation
    aad_Q = np.zeros((int(iNumFilts), int(iNumFrames)))
    for j in range(int(iNumFrames)):
        for i in range(int(iNumFilts)):
            aad_Q[i, j] = np.mean(aad_P[i, int(max(0, j - iM)):int(min(iNumFrames, j + iM + 1))])
    aad_w = np.zeros(aad_Q.shape)
    aad_w_Smooth = np.zeros(aad_Q.shape)
    aad_tildeQ = np.zeros(aad_Q.shape)
    for i in range(iNumFilts):
        aad_tildeQ[i, :] = PowerBiasSub(aad_Q[i, :], dDelta)
        aad_w[i, :] = np.maximum(aad_tildeQ[i, :], eps) / np.maximum(aad_Q[i, :], eps)
    # Weight smoothing across channels
    for j in range(int(iNumFrames)):
        for i in range(int(iNumFilts)):
            aad_w_Smooth[i, j] = np.mean(aad_w[np.maximum(i - iN, 0):np.minimum(i + iN + 1, iNumFilts), j])
    aad_P = aad_w_Smooth * aad_P
    aa1p = aad_P[:, iM:]
    [iNumFilts, iLen] = aa1p.shape
    aad_P = aa1p[:, :iLen - iM - 1]
    # Apply the power-law nonlinearity
    aadSpec = aad_P ** dPowerCoeff
    aadDCT = dct(aadSpec, norm='ortho', axis=0)
    aadDCT = aadDCT[:13, :]
    # Mean normalization of each cepstral coefficient
    for i in range(13):
        aadDCT[i, :] = aadDCT[i, :] - np.mean(aadDCT[i, :])
    return aadDCT
In [4]:
###################################################################
# Power Bias Subtraction algorithm
# The bias level is obtained by maximizing the AM-GM ratio
def PowerBiasSub(ad_Q, dDelta):
    dNormPower = 1e+15
    # Candidate bias levels: 0, then levels from 70 dB to 10 dB below dNormPower in 1 dB steps
    ad_B = np.hstack((0, dNormPower / (10.0 ** (np.arange(70, 9, -1) / 10.0) + 1)))
    d_tildeGTemp = 0
    ad_tildeQSave = np.copy(ad_Q)
    for d_B in ad_B:
        aiIndex = [i for i, x in enumerate(ad_Q) if x > d_B]
        if len(aiIndex) == 0:
            break
        dPosMean = np.mean(ad_Q[aiIndex] - d_B)
        aiIndex = [i for i, x in enumerate(ad_Q) if x > d_B + dDelta * dPosMean]
        if len(aiIndex) == 0:
            break
        d_cf = np.mean(ad_Q[aiIndex] - d_B) * dDelta
        ad_tildeQ = np.maximum(ad_Q - d_B, d_cf)
        adData = ad_tildeQ[aiIndex]
        # AM-GM ratio of the bias-subtracted power
        d_tildeG = np.log(np.mean(adData)) - np.mean(np.log(adData))
        if d_tildeG > d_tildeGTemp:
            ad_tildeQSave = ad_tildeQ
            d_tildeGTemp = d_tildeG
    return ad_tildeQSave
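The quantity maximized by the bias search is the "AM-GM ratio" log(mean(x)) - mean(log(x)): it is zero for a constant sequence and grows as the values spread apart, so the candidate bias whose subtracted power contour is most spread out wins. A toy illustration, not part of the original package:
In [ ]:
# Toy illustration of the AM-GM ratio used in PowerBiasSub (made-up values)
for v in (np.array([1.0, 1.0, 1.0, 1.0]), np.array([0.1, 0.1, 10.0, 10.0])):
    print(np.log(np.mean(v)) - np.mean(np.log(v)))   # 0.0 for the constant case, ~1.62 for the spread-out one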
In [5]:
vocabulary = { 'а': 1,
'б': 2,
'в': 3,
'г': 4,
'д': 5,
'е': 6,
'ё': 7,
'ж': 8,
'з': 9,
'и': 10,
'й': 11,
'к': 12,
'л': 13,
'м': 14,
'н': 15,
'о': 16,
'п': 17,
'р': 18,
'с': 19,
'т': 20,
'у': 21,
'ф': 22,
'х': 23,
'ц': 24,
'ч': 25,
'ш': 26,
'щ': 27,
'ъ': 28,
'ы': 29,
'ь': 30,
'э': 31,
'ю': 32,
'я': 33}
inv_mapping = dict(zip(vocabulary.values(), vocabulary.keys()))
inv_mapping[34]='<пробел>'
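For training, transcripts have to be turned into the integer label sequences that the CTC loss consumes. The notebook only runs inference, but a minimal encoder matching this vocabulary (with 34 standing for the space, as in inv_mapping) could look like this — a hypothetical helper, not part of the pre-trained pipeline:
In [ ]:
# Hypothetical helper: lowercase Russian transcript -> CTC label indices
def encode_text(text):
    return [34 if ch == ' ' else vocabulary[ch] for ch in text.lower()]

print(encode_text('привет мир'))  # [17, 18, 10, 3, 6, 20, 34, 14, 10, 18]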
In [6]:
def decode_single(session, test_input):
    # Pad the utterance with 30 empty frames; seq_lens still carries the true length
    z = np.zeros((30, 13))
    zz = np.vstack((test_input, z))
    val_feed = {
        input_X: np.asarray([zz]),
        seq_lens: np.asarray([len(test_input)])
    }
    # Greedy CTC decoding
    d = session.run(decoded[0], feed_dict=val_feed)
    dense_decoded = tf.sparse_tensor_to_dense(d, default_value=-1).eval(session=session)
    seq = [s for s in dense_decoded[0] if s != -1]
    ret = decode(d, inv_mapping)
    for i in range(len(ret)):
        print(ret[i])
    print('')
In [7]:
def decode(d, mapping):
    """Convert a sparse decoding result into strings via the index -> character mapping."""
    shape = d.dense_shape
    batch_size = shape[0]
    ans = np.zeros(shape=shape, dtype=int)
    seq_lengths = np.zeros(shape=(batch_size,), dtype=int)
    for ind, val in zip(d.indices, d.values):
        ans[ind[0], ind[1]] = val
        seq_lengths[ind[0]] = max(seq_lengths[ind[0]], ind[1] + 1)
    ret = []
    for i in range(batch_size):
        ret.append("".join(map(lambda s: mapping.get(s, ' '), ans[i, :seq_lengths[i]])))
    return ret
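decode consumes the SparseTensorValue that session.run(decoded[0], ...) returns. A hand-built sparse result makes the mechanics visible (purely illustrative indices and values; label 5 is 'д', 1 is 'а'):
In [ ]:
# Illustrative only: a fake one-utterance decoding result, turned back into text
fake = tf.SparseTensorValue(indices=np.array([[0, 0], [0, 1]]),
                            values=np.array([5, 1]),
                            dense_shape=np.array([1, 2]))
print(decode(fake, inv_mapping))  # ['да']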
In [8]:
graph = tf.Graph()
with graph.as_default():
    input_X = tf.placeholder(tf.float32, shape=[None, None, 13], name="input_X")
    labels = tf.sparse_placeholder(tf.int32)
    seq_lens = tf.placeholder(tf.int32, shape=[None], name="seq_lens")
    # Two stacked bidirectional LSTMs over the 13-dimensional PNCC frames,
    # then a per-frame dense layer producing the CTC class scores
    model = Sequential()
    model.add(Bidirectional(LSTM(128, return_sequences=True, implementation=2), input_shape=(None, 13)))
    model.add(Bidirectional(LSTM(128, return_sequences=True, implementation=2)))
    model.add(TimeDistributed(Dense(len(inv_mapping) + 2)))
    final_seq_lens = seq_lens
    logits = model(input_X)
    # ctc_loss and ctc_greedy_decoder expect time-major logits: [max_time, batch, num_classes]
    logits = tf.transpose(logits, [1, 0, 2])
    ctc_loss = tf.reduce_mean(tf.nn.ctc_loss(labels, logits, final_seq_lens))
    # Greedy CTC decoding (merge_repeated=True by default)
    decoded, log_prob = tf.nn.ctc_greedy_decoder(logits, final_seq_lens)
    ler = tf.reduce_mean(tf.edit_distance(tf.cast(decoded[0], tf.int32), labels))
    train_op = tf.train.AdamOptimizer(learning_rate=1e-3).minimize(ctc_loss)
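The dense layer emits len(inv_mapping) + 2 = 36 scores per frame: indices 1-34 cover the letters and the space, the last index is reserved by tf.nn.ctc_loss as the CTC blank, and index 0 is left unused by this vocabulary. The notebook only restores a pre-trained checkpoint, but a single training step would look roughly like the sketch below (dummy random features and a hand-built label sequence for "да"; illustrative, not the original training code):
In [ ]:
# Hedged sketch: one training step on dummy data (the original training loop is not in this notebook)
dummy_feats = np.random.randn(1, 50, 13).astype(np.float32)   # [batch, max_time, 13]
dummy_labels = tf.SparseTensorValue(indices=np.array([[0, 0], [0, 1]]),
                                    values=np.array([5, 1], dtype=np.int32),  # "да"
                                    dense_shape=np.array([1, 2]))
with tf.Session(graph=graph) as session:
    tf.global_variables_initializer().run()
    _, loss_val = session.run([train_op, ctc_loss],
                              feed_dict={input_X: dummy_feats,
                                         labels: dummy_labels,
                                         seq_lens: np.array([50])})
    print(loss_val)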
Download a test WAV file with a male voice
In [9]:
WAVE_OUTPUT_FILENAME = 'test.wav'
In [10]:
sample_rate, X1= wav.read(WAVE_OUTPUT_FILENAME)
# Transcript: "Через несколько лет путешествие на Марс будет не более сложно, чем перелёт из Москвы в Берлин." ("In a few years a trip to Mars will be no more difficult than a flight from Moscow to Berlin.")
Audio(data=X1, rate=sample_rate)
Out[10]:
Extract PNCC features from the file
In [11]:
features=PNCC_ICASSP2010(WAVE_OUTPUT_FILENAME).T
mean_scale = np.mean(features, axis=0)
std_scale = np.std(features, axis=0)
features = (features - mean_scale[np.newaxis, :]) / std_scale[np.newaxis, :]
seq_len = features.shape[0]
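The 13 coefficients are then normalized per utterance to zero mean and unit variance, which is presumably how the training features were scaled. A quick, purely illustrative check:
In [ ]:
# Illustrative check: per-utterance normalization leaves each coefficient with ~zero mean, unit variance
print(features.shape)
print(features.mean(axis=0).round(3))
print(features.std(axis=0).round(3))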
Recognize the speech with the pre-trained model
In [13]:
with tf.Session(graph=graph) as session:
    saver = tf.train.Saver(tf.global_variables())
    snapshot = "ctc"
    checkpoint = tf.train.latest_checkpoint(checkpoint_dir="chekpoint3")
    last_epoch = 0
    if checkpoint:
        print("[i] LOADING checkpoint " + checkpoint)
        try:
            saver.restore(session, checkpoint)
        except:
            print("[!] incompatible checkpoint, restarting from 0")
    else:
        # Initialize the weights and biases
        tf.global_variables_initializer().run()
    decode_single(session, features)
In [23]:
WAVE_OUTPUT_FILENAME = 'ru_test.wav'
# Transcript: "Покалывало грудь стучала кровь в виски но дышалось легко воздух был тонок и сух" ("My chest tingled, blood pounded in my temples, but breathing came easily; the air was thin and dry")
sample_rate, X1= wav.read(WAVE_OUTPUT_FILENAME)
Audio(data=X1, rate=sample_rate)
Out[23]:
In [26]:
features=PNCC_ICASSP2010(WAVE_OUTPUT_FILENAME).T
mean_scale = np.mean(features, axis=0)
std_scale = np.std(features, axis=0)
features = (features - mean_scale[np.newaxis, :]) / std_scale[np.newaxis, :]
seq_len = features.shape[0]
with tf.Session(graph=graph) as session:
    saver = tf.train.Saver(tf.global_variables())
    snapshot = "ctc"
    checkpoint = tf.train.latest_checkpoint(checkpoint_dir="chekpoint3")
    last_epoch = 0
    if checkpoint:
        print("[i] LOADING checkpoint " + checkpoint)
        try:
            saver.restore(session, checkpoint)
        except:
            print("[!] incompatible checkpoint, restarting from 0")
    else:
        # Initialize the weights and biases
        tf.global_variables_initializer().run()
    decode_single(session, features)
In [27]:
import pyaudio
import wave
# and IPython.display for audio output
import IPython.display
from scipy.io import wavfile
In [28]:
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
RECORD_SECONDS = 5  # recording length in seconds
WAVE_OUTPUT_FILENAME = 'mikr.wav'
Record audio from the microphone into a WAV file
In [32]:
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
print("* ЗАПИСЬ С МИКРОФОНА")
frames = []
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
data = stream.read(CHUNK)
frames.append(data)
print("* КОНЕЦ ЗАПИСИ")
stream.stop_stream()
stream.close()
p.terminate()
wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()
fs, audio = wav.read(WAVE_OUTPUT_FILENAME)
features=PNCC_ICASSP2010(WAVE_OUTPUT_FILENAME).T
mean_scale = np.mean(features, axis=0)
std_scale = np.std(features, axis=0)
features = (features - mean_scale[np.newaxis, :]) / std_scale[np.newaxis, :]
seq_len = features.shape[0]
In [33]:
sample_rate, X1= wavfile.read(WAVE_OUTPUT_FILENAME)
# Play it back!
IPython.display.Audio(data=X1, rate=sample_rate)
Out[33]:
In [35]:
with tf.Session(graph=graph) as session:
    saver = tf.train.Saver(tf.global_variables())
    snapshot = "ctc"
    checkpoint = tf.train.latest_checkpoint(checkpoint_dir="chekpoint3")
    last_epoch = 0
    if checkpoint:
        print("[i] LOADING checkpoint " + checkpoint)
        try:
            saver.restore(session, checkpoint)
        except:
            print("[!] incompatible checkpoint, restarting from 0")
    else:
        # Initialize the weights and biases
        tf.global_variables_initializer().run()
    decode_single(session, features)
In [ ]: