The most current version of this file is at: https://github.com/mdda/cnn-speech-mnist
In [ ]:
In [ ]:
import os
import numpy as np
import matplotlib.pyplot as plt
import scipy.misc # for image resizing
#import scipy.io.wavfile
# pip install soundfile
import soundfile
from IPython.display import Audio as audio_playback_widget
In [ ]:
f = './data/raw-from-phone.wav'
#f = './data/num_phone_en-UK_m_Martin15.wav'
In [ ]:
# Read in the original file
samples, sample_rate = soundfile.read(f)
def show_waveform(sound):
    n_samples = sound.shape[0]
    plt.figure(figsize=(12, 2))
    plt.plot(np.arange(0.0, n_samples)/sample_rate, sound)
    plt.xticks( np.arange(0.0, n_samples/sample_rate, 0.5), rotation=90 )
    plt.grid(True)
    plt.show()
show_waveform(samples)
audio_playback_widget(f)
Now, let's select the region of interest.
In [ ]:
crop = (3.25, 16.25) # in seconds (from waveform graph above)
cropped = samples[ int(crop[0]*sample_rate):int(crop[1]*sample_rate) ]
show_waveform(cropped)
When you are satisfied, write the file to disk, updating the name as appropriate (it is also possible to overwrite the existing file).
Be careful with this step.
In [ ]:
#Only do this (set it to 1) if you want to replace the file with the cropped version...
if 1:
    f = './data/cropped-raw-from-phone.wav'
    soundfile.write(f, cropped, samplerate=sample_rate)
    print("Wrote '%s'" % (f,))
In [ ]:
f = './data/num_phone_en-UK_m_Martin00.wav'
#f = './data/num_Bing_en-UK_f_Susan.wav'
#f = './data/animals_phone_en-UK_m_Martin02.wav'
#f = './data/num_phone_en-UK_m_Martin00.ogg'
#f = './data/num_Bing_en-UK_f_Susan.ogg'
The following defines a function that computes the spectrogram (windowed FFT, etc.), and then a smoothing function that will help us segment the audio into words later.
In [ ]:
def spectrogram(wav_filepath):
    samples, sample_rate = soundfile.read(wav_filepath)

    # Original code from :
    #   https://mail.python.org/pipermail/chicago/2010-December/007314.html

    # Rescale so that max/min are ~ +/- 1 around 0
    data_av = np.mean(samples)
    data_max = np.max(np.absolute(samples-data_av))
    sound_data = (samples - data_av)/data_max

    ## Parameters: 10ms step, 30ms window
    nstep = int(sample_rate * 0.01)
    nwin  = int(sample_rate * 0.03)
    nfft  = 2*int(nwin/2)

    window = np.hamming(nwin)

    # will take windows x[n1:n2]. generate and loop over
    # n2 such that all frames fit within the waveform
    nn = range(nwin, len(sound_data), nstep)

    X = np.zeros( (len(nn), nfft//2) )

    for i, n in enumerate(nn):
        segment = sound_data[ n-nwin:n ]
        z = np.fft.fft(window * segment, nfft)
        X[i,:] = np.log(np.absolute(z[:nfft//2]))

    return X
In [ ]:
# This is a function that smooths a time-series
# which enables us to segment the input into words by looking at the 'energy' profile
def smooth(x, window_len=31):  # , window='hanning'
    # http://scipy-cookbook.readthedocs.io/items/SignalSmooth.html
    #s = np.r_[ x[window_len-1:0:-1], x, x[-1:-window_len:-1]]
    s = np.r_[ np.zeros( ((window_len-1)//2,) ), x, np.zeros( ((window_len-1)//2,) ) ]
    w = np.hamming(window_len)
    return np.convolve(w/w.sum(), s, mode='valid') #[window_len-1 : -(window_len-1) ]
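As a quick sanity check (not part of the original processing chain), here is a toy demonstration of what smooth does to a hard 0/1 signal: the edges get softened and slightly widened, which is exactly what we rely on below when expanding the cropped regions.
In [ ]:
# Toy demonstration of smooth() on a synthetic 0/1 signal
toy = np.zeros(200)
toy[80:120] = 1.0

plt.figure(figsize=(12, 2))
plt.plot(toy, label='original')
plt.plot(smooth(toy), label='smoothed')
plt.legend()
plt.show()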
In [ ]:
X = spectrogram(f)
print("X.shape=", X.shape)
#Y = np.std(X, axis=1)
Y = np.max(X, axis=1)
Y_min = np.min(Y)
Y_range = Y.max()-Y_min
Y = (Y - Y_min)/Y_range
print("Y.shape=", Y.shape)
Y_crop = np.where(Y>0.25, 1.0, 0.0)
# Apply some smoothing
Y_crop = smooth(Y_crop)
Y_crop = np.where(Y_crop>0.01, 1.0, 0.0)
print("Y_crop.shape=", Y_crop.shape)
plt.figure(figsize=(12,3))
plt.imshow(X.T, interpolation='nearest', origin='lower', aspect='auto')
plt.xlim(xmin=0)
plt.ylim(ymin=0)
plt.plot(Y * X.shape[1])
plt.plot(Y_crop * X.shape[1])
plt.show()
#Y.min(), Y.max()
#X[100,:]
print( np.argmin(X)//X.shape[1], np.argmax(X)//X.shape[1] )  # time-step indices of the overall min/max
audio_playback_widget(f)
Work out the contiguous regions of high energy (== sound) so that we can split the file into voiced segments.
In [ ]:
#http://stackoverflow.com/questions/4494404/find-large-number-of-consecutive-values-fulfilling-condition-in-a-numpy-array
def contiguous_regions(condition):
    idx = []
    i = 0
    while i < len(condition):
        x1 = i + condition[i:].argmax()
        try:
            x2 = x1 + condition[x1:].argmin()
        except ValueError:
            x2 = x1 + 1
        if x1 == x2:
            if condition[x1]:
                x2 = len(condition)
            else:
                break
        idx.append( [x1, x2] )
        i = x2
    return idx
contiguous_regions(Y_crop>0.5)
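To make the return value concrete, here is a small illustrative example (the array is made up): each [start, end) pair indexes one run of True values.
In [ ]:
# Toy example : two runs of True values -> [[1, 3], [4, 5]]
contiguous_regions( np.array([False, True, True, False, True]) )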
In [ ]:
import re
remove_punc = re.compile(r'[\,\.\?\!]')
squash_spaces = re.compile(r'\s+')

def words(s):
    s = remove_punc.sub(' ', s)
    s = squash_spaces.sub(' ', s)
    return s.strip().lower()
sentences = dict(
    num=words("zero one two three four five six seven eight nine."),
    animals=words("cat dog fox bird."),
    # https://www.quora.com/Is-there-a-text-that-covers-the-entire-English-phonetic-range/
    qbf=words("That quick beige fox jumped in the air over each thin dog. "+
              "Look out, I shout, for he's foiled you again, creating chaos."),
    shy=words("Are those shy Eurasian footwear, cowboy chaps, "+
              "or jolly earthmoving headgear?"),
    ate=words("The hungry purple dinosaur ate the kind, zingy fox, the jabbering crab, "+
              "and the mad whale and started vending and quacking."),
    suz=words("With tenure, Suzie'd have all the more leisure for yachting, "+
              "but her publications are no good."),
    tbh=words("Shaw, those twelve beige hooks are joined if I patch a young, gooey mouth."),
    # https://en.wikipedia.org/wiki/The_North_Wind_and_the_Sun #594
    # http://videoweb.nie.edu.sg/phonetic/courses/aae103-web/wolf.html #1111
)
sentences['num']
We can also generate voices synthetically - and Bing has a nice interface for that at https://www.microsoft.com/cognitive-services/en-us/speech-api
In [ ]:
def for_msft(prefixes):  # comma-separated
    return ' '.join([sentences[a] for a in prefixes.split(',')]).replace(' ', '\n')
"""
This is the SSML that will be sent to the service:
<speak version="1.0" xmlns="http://www.w3.org/2001/10/synthesis"
xmlns:mstts="http://www.w3.org/2001/mstts" xml:lang="en-GB">
<voice xml:lang="en-GB" name="Microsoft Server Speech Text to Speech Voice (en-GB, Susan, Apollo)">
zero
one
two
three
four
five
six
seven
eight
nine
</voice>
</speak>
"""
# https://www.microsoft.com/cognitive-services/en-us/Speech-api/documentation/API-Reference-REST/BingVoiceOutput
a=for_msft('num') # 49 long...
#a=for_msft('qbf,shy,ate,suz,tbh') # 474 long...
print("length_in_chars=%d\n%s" % (len(a),a,))
If you want to do some manipulation of raw audio on Linux, sox is the perfect tool; the commented-out commands below show typical usage, and a Python sketch follows them.
In [ ]:
# sox_ogg_param='--rate 16000 --channels 1'
# sox_wav_param="${sox_ogg_param} --encoding signed-integer"
# sox english.au ${sox_wav_param} english.wav norm -3
# sox english.au ${sox_ogg_param} english.ogg norm -3
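If you prefer to stay inside the notebook, here is a minimal sketch of driving the same conversions via subprocess; it assumes sox is installed, and english.au is just the illustrative input file from the commands above.
In [ ]:
import subprocess

# Illustrative helper (not part of the original pipeline) : convert an audio file with sox
def convert_with_sox(src, dst, rate=16000):
    cmd = ['sox', src, '--rate', str(rate), '--channels', '1']
    if dst.endswith('.wav'):
        cmd += ['--encoding', 'signed-integer']
    cmd += [dst, 'norm', '-3']
    subprocess.run(cmd, check=True)

#convert_with_sox('english.au', 'english.wav')
#convert_with_sox('english.au', 'english.ogg')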
In [ ]:
# pip install python_speech_features
import python_speech_features
sample_window_step = 0.01 # in seconds (10ms)
def get_sample_features(samples, sample_rate):
    #sample_feat = python_speech_features.mfcc(samples, sample_rate, numcep=13, nfilt=26, appendEnergy=True)
    #sample_feat = python_speech_features.mfcc(samples, sample_rate, numcep=28, nfilt=56, appendEnergy=True)

    #sample_feat, e = python_speech_features.fbank(samples, samplerate=sample_rate,
    #          winlen=0.025, winstep=0.01, nfilt=26, nfft=512,
    #          lowfreq=0, highfreq=None, preemph=0.97, winfunc=lambda x:np.ones((x,)))

    features, energy = python_speech_features.fbank(samples, samplerate=sample_rate,
                          winlen=0.025, winstep=sample_window_step,
                          nfilt=32, nfft=512,
                          lowfreq=0, highfreq=None, preemph=0.25,
                          winfunc=lambda x: np.hamming(x))
    return features, energy
def get_sample_isolated_words(energy, plot=False):
    log_e = np.log(energy)
    if plot: plt.plot(log_e-5)

    #log_e = smooth(log_e)
    #if plot: plt.plot(log_e)

    log_e_hurdle = (log_e.max() - log_e.min())*0.25 + log_e.min()
    log_e_crop = np.where(log_e > log_e_hurdle, 1.0, 0.0)
    if plot: plt.plot(log_e_crop * 25 - 2.5)

    # By smoothing, and applying a very low hurdle, we expand the crop area safely
    log_e_crop_expanded = np.where( smooth(log_e_crop)>0.01, 1.0, 0.0)
    if plot: plt.plot(log_e_crop_expanded * 30 - 5)

    return contiguous_regions(log_e_crop_expanded>0.5)
Redo the calculation above, but using the 'proper' tools. Notice how the scaling, contrast, etc., are better 'looking'. The 'look' is actually something we care about here.
In [ ]:
samples, sample_rate = soundfile.read(f)
sample_feat, energy = get_sample_features(samples, sample_rate)
plt.figure(figsize=(12,3))
plt.imshow(np.log(sample_feat.T), interpolation='nearest', origin='lower', aspect='auto')
plt.xlim(xmin=0)
word_ranges = get_sample_isolated_words(energy, plot=True)
plt.show()
print(sample_feat.shape, energy.shape, energy[10])
audio_playback_widget(f)
In [ ]:
def split_combined_file_into_wavs(f, prefix='num'):
    # f ~ './data/num_Bing_en-UK_f_Susan.wav'
    f_base_orig = os.path.basename( f )
    if not f_base_orig.startswith(prefix+"_"):
        print("Wrong prefix for '%s'" % (f_base_orig,))
        return

    # Here's the new filename (directory to be calculated per-word)
    f_base = os.path.splitext(f_base_orig)[0][len(prefix)+1:] + '.wav'

    samples, sample_rate = soundfile.read(f)
    sample_feat, energy = get_sample_features(samples, sample_rate)
    word_ranges = get_sample_isolated_words(energy, plot=False)
    #print(word_ranges)

    words = sentences[prefix].split(' ')
    if len(word_ranges) != len(words):
        print("Found %d segments, rather than %d, in '%s'" % (len(word_ranges), len(words), f,))
        return

    for i, word in enumerate(words):
        word_path = os.path.join('data', prefix, word)
        os.makedirs(word_path, exist_ok=True)
        wr = word_ranges[i]
        fac = int(sample_window_step*sample_rate)
        soundfile.write(os.path.join(word_path, f_base), samples[ wr[0]*fac:wr[1]*fac ], samplerate=sample_rate)
In [ ]:
split_combined_file_into_wavs('./data/num_Bing_en-UK_f_Susan.wav')
#split_combined_file_into_wavs('./data/num_phone_en-UK_m_Martin00.wav')
In [ ]:
def split_all_combined_files_into_wavs(prefix='num'):
    for audio_file in sorted(os.listdir( './data' )):
        filename_stub, ext = os.path.splitext(audio_file)
        if not (ext=='.wav' or ext=='.ogg'): continue
        if not filename_stub.startswith( prefix+'_'): continue
        print("Splitting %s" % (audio_file,))
        split_combined_file_into_wavs( './data/'+audio_file, prefix=prefix)
In [ ]:
split_all_combined_files_into_wavs(prefix='num')
Now that we have some nice WAV files placed into folders named according to the word inside, let's create a function that preprocesses the audio clips into 'stamp' files: essentially fixed-size spectrograms stored with a uint8 data type, which keeps the numpy arrays small.
In [ ]:
# Convert a given (isolated word) WAV into a 'stamp' - using a helper function
def samples_to_stamp(samples, sample_rate):
    sample_feat, energy = get_sample_features(samples, sample_rate)

    data = np.log(sample_feat)

    # Now normalize each vertical slice so that the minimum energy is ==0
    data_mins = np.min(data, axis=1)
    data_min0 = data - data_mins[:, np.newaxis]

    # Force the data into the 'stamp size' as an image (implicit range normalization occurs)
    stamp = scipy.misc.imresize(data_min0, (64, 32), 'bilinear')

    # https://github.com/scipy/scipy/issues/4458 :: The stamps are stored as uint8...
    return stamp

def wav_to_stamp(prefix, word, wav):
    samples, sample_rate = soundfile.read( os.path.join('data', prefix, word, wav) )
    return samples_to_stamp(samples, sample_rate)
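Note: scipy.misc.imresize has been deprecated and removed in recent SciPy releases. If the call above fails for you, below is a rough drop-in sketch, assuming Pillow is installed, that mimics imresize's scale-to-uint8-then-resize behaviour; it is left commented out since the original code path is used here.
In [ ]:
# Optional fallback, in case scipy.misc.imresize is unavailable in your SciPy version
# (assumes Pillow is installed : pip install pillow)
#from PIL import Image
#def imresize_uint8(data, size=(64, 32)):
#    arr = data - data.min()
#    if arr.max() > 0:
#        arr = arr / arr.max()
#    img = Image.fromarray(np.uint8(arr * 255.0))
#    return np.asarray(img.resize((size[1], size[0]), Image.BILINEAR))  # result shape == size (rows, cols)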
In [ ]:
# Show what the 'visual stamp' for a given word looks like
stamp = wav_to_stamp('num', 'six', 'phone_en-UK_m_Martin00.wav')
plt.imshow(stamp.T, interpolation='nearest', origin='lower', aspect='auto')
plt.show()
print( np.min(stamp), np.max(stamp) )
audio_playback_widget( os.path.join('data', 'num', 'six', 'phone_en-UK_m_Martin00.wav') )
In [ ]:
# combine all words from a given prefix into a dataset of 'stamps'
import pickle
def create_dataset_from_folders(prefix, save_as='.pkl', seed=13):
    words = sentences[prefix].split(' ')
    stamps, labels = [], []

    for label_i, word in enumerate( words ):
        # Find all the files for this word
        for stamp_file in os.listdir( os.path.join('data', prefix, word )):
            if not stamp_file.endswith('.wav'): continue
            #print(stamp_file)
            stamp = wav_to_stamp(prefix, word, stamp_file)

            stamps.append(stamp)
            labels.append(label_i)

    if save_as is None:  # Return the data directly
        return stamps, labels, words

    np.random.seed(seed)
    data_dictionary = dict(
        stamp=stamps, label=labels,
        rand=np.random.rand( len(labels) ),  # This is to enable us to sample the data (based on hurdles)
        words=words,
    )
    ds_file = os.path.join('data', prefix+save_as)
    pickle.dump(data_dictionary, open(ds_file, 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
    print("Created dataset : %s" % (ds_file, ))
In [ ]:
#if not os.path.exists('data/num.pkl'):
if True:
    create_dataset_from_folders('num')
In [ ]:
# Read in the dataset
dataset = pickle.load(open(os.path.join('data', 'num.pkl'), 'rb'))
In [ ]:
# Plot all of a given 'word'
indices = [ i for i, label in enumerate(dataset['label'])
            if dataset['words'][label]=='four' ]

plt.figure(figsize=(12, 2))
for pos, i in enumerate(indices[0:16]):  # at most 16
    plt.subplot(2, 8, pos+1)  # nrows, ncols, subplot#
    plt.imshow(dataset['stamp'][i].T, cmap='gray', origin='lower', interpolation='nearest')
    plt.axis('off')
plt.show()
In [ ]:
# Now do something similar for 'test files' : create a dataset for all the audio files in the given folder
def create_dataset_from_adhoc_wavs(prefix, save_as='.pkl', seed=13):
    stamps, labels, words = [], [], []

    for audio_file in sorted(os.listdir( os.path.join('data', prefix) )):
        filename_stub, ext = os.path.splitext(audio_file)
        if not (ext=='.wav' or ext=='.ogg'): continue

        samples, sample_rate = soundfile.read( os.path.join('data', prefix, audio_file) )
        sample_feat, energy = get_sample_features(samples, sample_rate)
        word_ranges = get_sample_isolated_words(energy, plot=False)

        for i, wr in enumerate(word_ranges):
            fac = int(sample_window_step*sample_rate)
            segment = samples[ wr[0]*fac:wr[1]*fac ]
            stamp = samples_to_stamp(segment, sample_rate)

            print("Adding : %s #%2d : (%d,%d)" % (filename_stub, i, wr[0], wr[1],))
            stamps.append(stamp)
            labels.append(-1)
            words.append("%s_%d" % (filename_stub, i))

    np.random.seed(seed)
    data_dictionary = dict(
        stamp=stamps, label=labels,
        rand=np.random.rand( len(labels) ),
        words=words,
    )
    ds_file = os.path.join('data', prefix+save_as)
    pickle.dump(data_dictionary, open(ds_file, 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
    print("Created dataset : %s" % (ds_file, ))
In [ ]:
test_prefix = 'num' +'-test'
In [ ]:
create_dataset_from_adhoc_wavs(test_prefix)
In [ ]:
# Read in the ad-hoc test dataset
dataset = pickle.load(open(os.path.join('data', 'num-test.pkl'), 'rb'))
In [ ]:
plt.figure(figsize=(12,2))
for pos in range(len(dataset['stamp'][0:16])):  # at most 16
    plt.subplot(2, 8, pos+1)  # nrows, ncols, subplot#
    plt.imshow(dataset['stamp'][pos].T, cmap='gray', origin='lower', interpolation='nearest')
    plt.axis('off')
plt.show()
In [ ]:
# First a training set
split_all_combined_files_into_wavs(prefix='animals')
create_dataset_from_folders('animals')
In [ ]:
# And then some ad-hoc test cases
test_prefix = 'animals' +'-test'
create_dataset_from_adhoc_wavs(test_prefix)
In [ ]:
audio_playback_widget( os.path.join('data', test_prefix, 'cat_dog_fox_bird.wav') )
Now see whether we can learn the 'animal' words using the 'numbers' network...
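Everything in the next cell is only an illustrative sketch: it assumes the 'numbers' network itself is defined and trained elsewhere, and simply shows how the freshly created animals datasets could be loaded and scaled into network-ready arrays.
In [ ]:
# Sketch only : load the 'animals' datasets and prepare the stamps as network inputs
animals_train = pickle.load(open(os.path.join('data', 'animals.pkl'), 'rb'))
animals_test  = pickle.load(open(os.path.join('data', 'animals-test.pkl'), 'rb'))

X_train = np.array(animals_train['stamp'], dtype=np.float32)/255.0  # shape (n, 64, 32), values in [0, 1]
y_train = np.array(animals_train['label'])
X_test  = np.array(animals_test['stamp'],  dtype=np.float32)/255.0

print(X_train.shape, y_train.shape, X_test.shape, animals_train['words'])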
In [ ]:
In [ ]: