In [1]:
import matplotlib.pyplot as plt
from IPython.display import Audio, display
from librosa.display import specshow

%matplotlib inline

In [2]:
import numpy as np
import librosa
import os

class AudioLoader(object):
    """
    Loads audio files and converts them to length-matched mel spectrograms.

    The spectrogram shape chosen on a call with ``sgs=True`` is remembered in
    ``self.gram_shape`` so that a later call with ``sgs=False`` (e.g. for a
    noise dataset) can pad/trim its spectrograms to the same time dimension.
    """
    def __init__(self):
        # Reference (freq_bands, time) shape shared across calls; written by
        # create_spectrograms when sgs=True, read when sgs=False.
        self.gram_shape = None
    def get_filelist(self, dirpath, recurse=False):
        # Thin wrapper: discover audio files under dirpath via librosa.
        return(librosa.util.find_files(dirpath, recurse=recurse))
    def create_spectrograms(self, files, ws=1024, hs=1024//2, nb=128,
                            lmatch="pad", norm=False, sgs=True, verbose=False):
        """
        Create mel spectrograms (in dB) for a list of files.

        :param files: list of audio file paths
        :param ws: window size (n_fft) for the short time fourier transform
        :param hs: hop size for the stft
        :param nb: number of mel bands
        :param lmatch: length matching of the spectrograms, "pad" or "trim"
        :param norm: normalize spectrograms # consider -1 to 1 normalization
        :param sgs: if True, compute and save the common shape from this
                    batch; if False, match each spectrogram to the previously
                    saved self.gram_shape (with a random offset)
        :param verbose: testing stuff
        :return: (numpy array of spectrograms in format (N, freq_bands, time),
                  list of per-file original shapes); on an unrecognized
                  lmatch, only the list of unmatched spectrograms is returned
        """
        grams = []
        srs = []
        shapes = []
        for file in files:
            # sr=None keeps the file's native sampling rate.
            sig, sr = librosa.core.load(file, sr=None)
            gram = librosa.feature.melspectrogram(sig, sr=sr, n_fft=ws,
                                                  n_mels=nb, hop_length=hs)
            # Convert power to dB relative to the per-file maximum.
            gram = librosa.power_to_db(gram, ref=np.max)
            if not sgs:
                # Match this gram to the saved reference shape immediately,
                # using a random time offset (rd=True) for trim/pad.
                if gram.shape[1] > self.gram_shape[1]:
                    gram = trim_sample(gram, self.gram_shape, True)
                else:
                    gram = pad_sample(gram, self.gram_shape, True)

            # Shift so the minimum is 0 (dB values are <= 0 after power_to_db).
            gram -= gram.min()
            if norm:
                gram = librosa.util.normalize(gram)
                #gram -= gram.mean()
                #gram /= gram.std()
            grams.append(gram)
            srs.append(sr)
            shapes.append(gram.shape)
        if not sgs:
            # Already matched to self.gram_shape above; stack and return.
            return(np.array(grams), shapes)
        elif lmatch == "pad":
            # NOTE(review): max/min over (freq, time) tuples compares the
            # freq band count first; all grams share n_mels=nb, so this
            # effectively selects by time length — confirm nb never varies.
            ms = max(shapes) if sgs else self.gram_shape
            grams = np.array([pad_sample(g,ms) for g in grams])
        elif lmatch == "trim":
            ms = min(shapes) if sgs else self.gram_shape
            grams = np.array([trim_sample(g,ms) for g in grams])
        else:
            print("Method {} not recognized".format(lmatch))
            return(grams)
        if sgs:
            # Remember the chosen shape for later sgs=False calls.
            self.gram_shape = ms
        if verbose:
            print("shapes:", shapes)
            print("final grams shape:", grams.shape)
        return(grams, shapes)

class VADLabeler(object):
    """
    Basic VAD Labeler based on mean power in middle frequency bands
    """
    def __init__(self, threshold=30):
        # Mean-power threshold (dB-shifted units) above which a frame
        # is labeled as speech by label().
        self.threshold = threshold
        # Longest label sequence seen so far; accumulates across
        # tedlium_label() calls and drives _pad_labels().
        self.label_max_len = -1
    def label(self, grams):
        """
        :param grams: 3d input in the form (num samples, freq bands, time)
        :return: binary 2d numpy array in the form (num samples, time)
        """
        # Frames whose mean power over mel bands 10..99 exceeds the
        # threshold are marked active (1).
        hpids = np.where(grams[:,10:100,:].mean(axis=1) > self.threshold)
        L = np.zeros((grams.shape[0], grams.shape[2]))
        L[hpids] = 1
        return(L)
    def tedlium_label(self, wav_manifest, ddims, csv_manifest=None, use_lml = False):
        """
        Load precomputed per-hop VAD labels for TEDLIUM wav chunks.

        :param wav_manifest: wav paths named "<talk>_<start>_<end>.wav";
                             start is in samples, end may be the string "end"
        :param ddims: per-file time dimension of the matching spectrograms
        :param csv_manifest: optional explicit label csv paths; if None they
                             are derived from each wav path (../csv/<talk>.csv)
        :param use_lml: unused # NOTE(review): dead parameter — confirm
        :return: 2d numpy array (num samples, label_max_len) of int labels
        """
        if csv_manifest is None:
            csv_manifest = []
            for wav_path in wav_manifest:
                # Drop the "_<start>_<end>" suffix to recover the talk name.
                fn, ext = os.path.splitext(os.path.basename(wav_path))
                fn = "_".join(fn.split("_")[:-2]) + ".csv"
                csv_path = os.path.join(os.path.dirname(os.path.dirname(wav_path)),
                                        "csv", fn)
                csv_manifest += [csv_path]
        L = []
        for wav_path, csv_path, dd in zip(wav_manifest, csv_manifest, ddims):
            # CSV layout (single line): name, sr, window size, hop size,
            # then one label per hop.
            with open(csv_path, "r") as f:
                data = f.readline().strip().split(",")
            sr, ws, hs = int(data[1]), int(data[2]), int(data[3])
            labels = data[4:]
            # Recover the chunk's start/end (in samples) from the filename.
            wfp, wext = os.path.splitext(wav_path)
            st, end = wfp.split("_")[-2:]
            st_hop = int(st)//hs
            if end == "end":
                # Chunk runs to the end of the talk.
                end_hop = None
            elif st_hop+dd < len(labels):
                end_hop = st_hop+dd
            else:
                # Requested window overruns the labels: anchor to the end
                # so exactly dd labels are taken.
                end_hop = len(labels)
                st_hop = end_hop-dd
            L += [np.array(labels[st_hop:end_hop]).astype("int")]
            if len(labels[st_hop:end_hop]) > self.label_max_len:
                self.label_max_len = len(labels[st_hop:end_hop])
        L = self._pad_labels(L)
        return(np.array(L))
    def dummy_labels(self, grams):
        # All-zero labels matching (num samples, time) of the given grams.
        return(np.zeros((grams.shape[0], grams.shape[2])))
    def _pad_labels(self, labels):
        # Right-pad each label vector with zeros up to label_max_len.
        return([np.r_[l, np.zeros(self.label_max_len-l.shape[0])] if self.label_max_len>l.shape[0] else l for l in labels])

class NoiseInjector(object):
    """
    Additively mixes noise into audio spectrograms or raw signals.
    """
    def __init__(self, noise_multiplier=1.):
        # Default gain applied to the noise before mixing.
        self.nm = noise_multiplier
    def mix_grams(self, data, noise, rescale=False, nm=None):
        """
        Mix a noise spectrogram into a data spectrogram.

        :param data: 2d array (freq_bands, time)
        :param noise: 2d array; trimmed/padded to data's shape if needed
        :param rescale: rescale the mix so its max matches data's max
        :param nm: optional per-call noise multiplier; falls back to self.nm
        :return: mixed spectrogram with the same shape as data
        """
        if nm is None:
            nm = self.nm
        # pad/trim the noise (random offset) so the shapes line up
        if data.shape != noise.shape:
            f = trim_sample if noise.shape > data.shape else pad_sample
            noise = f(noise, data.shape, True)
        # additive noise and rescaling
        # BUGFIX: use the resolved nm (was self.nm), so the per-call
        # override parameter actually takes effect.
        g = data + (noise * nm)
        if rescale:
            g *= data.max() / g.max()
        return(g)
    def mix_sigs(self, sig, noise):
        """
        Additively mix raw signals using the default multiplier.
        TODO: length matching / rescaling as in mix_grams.
        """
        return(sig+(noise*self.nm))


def trim_sample(g, ms, rd=False):
    """
    Trim a spectrogram along the time axis to the target width ms[1].

    :param g: 2d array in the form (freq_bands, time)
    :param ms: target shape; only ms[1] (time) is used
    :param rd: if True, trim from a random start offset instead of the front
    :return: view of g with at most ms[1] time frames
    """
    # BUGFIX: guard the rd path — np.random.choice(0) raises ValueError
    # when the sample is already exactly the target width (pad_sample has
    # the analogous guard; this one was missing).
    if g.shape[1] <= ms[1]:
        return(g[:, :ms[1]])
    st = np.random.choice(g.shape[1]-ms[1]) if rd else 0
    return(g[:,st:(st+ms[1])])

def pad_sample(g, ms, rd=False):
    """
    Pad a spectrogram along the time axis up to width ms[1], filling with
    the spectrogram's minimum value.

    :param g: 2d array in the form (freq_bands, time)
    :param ms: target shape; only ms[1] (time) is used
    :param rd: if True, place g at a random offset; otherwise left-align
    :return: g unchanged when already wide enough, else a new padded array
    """
    width = g.shape[1]
    target = ms[1]
    if target == width:
        return(g)
    # Left offset: random in [0, target-width) when rd, else 0.
    left = np.random.choice(target - width) if rd else 0
    fill = g.min()
    rows = g.shape[0]
    left_pad = np.full((rows, left), fill, dtype=np.float64)
    right_pad = np.full((rows, target - (left + width)), fill, dtype=np.float64)
    return(np.hstack([left_pad, g, right_pad]))

In [7]:
# Number of training samples to draw for this run.
N = 10
# NOTE(review): hardcoded absolute local path — won't reproduce on other
# machines; consider a configurable DATA_DIR.
trainset_path = "/home/david/Programming/python/snips/data/tedlium_r2_dataset/train"

# data loading
al = AudioLoader()
data_manifest = al.get_filelist(trainset_path, recurse=True)
# NOTE(review): np.random.choice without a seed — sampling is not
# reproducible across runs; confirm whether that is intended.
data_manifest = np.random.choice(data_manifest, N).tolist()
data, data_dims = al.create_spectrograms(data_manifest, lmatch="pad")
noise_manifest = al.get_filelist("/home/david/Programming/python/snips/data/rirs_noises_dataset/pointsource_noises")
noise_manifest = np.random.choice(noise_manifest, len(data_manifest)).tolist()
# sgs=False: match the noise grams to the shape saved by the call above.
noises, noises_dims = al.create_spectrograms(noise_manifest, sgs=False)
# sort audio by length in descending order
data_dims = np.array(data_dims)
# Sort index by the time dimension (column 1), longest first.
idx = np.argsort(data_dims, axis=0)[:,1][::-1]
data = data[idx]
data_dims = data_dims[idx]
data_manifest = [data_manifest[i] for i in idx]

# label data
lb = VADLabeler()
labels = lb.tedlium_label(data_manifest, data_dims[:,1])

In [9]:
hs = 512
sr = 16000
fmax = sr/2
# NOTE(review): hs/sr are re-declared here and assumed to match the values
# used when the spectrograms were computed (hop 512, 16 kHz) — confirm.
# Plot each sample: audio player, mel spectrogram, and its VAD label strip.
for file, gram, vad_labels in zip(data_manifest, data, labels):
    display(Audio(file))
    fig = plt.figure(figsize=(12,4))
    fig.add_subplot(1,2,1)
    specshow(gram, x_axis='time', sr=sr, hop_length=hs, y_axis='mel', fmax=fmax)
    plt.colorbar(format='%+02.0f dB')
    fig.add_subplot(1,2,2)
    # Tile the 1d label vector into a band so it is visible next to the gram.
    vad_labels = np.tile(vad_labels, (gram.shape[0],1))
    plt.imshow(vad_labels)
    plt.tight_layout()
    plt.show()
    # Close each figure after showing it to avoid memory build-up when
    # looping over many samples.
    plt.close(fig)