Fast minibatch sampling

In this example, we show how to create a fast minibatch generator, such as the ones typically used to feed a training routine in Machine Learning. It is not the intent of SeqTools to supplant specialized libraries such as TensorFlow's data module or torch.utils.data.Dataset, but these might lack simplicity and flexibility for certain use cases. Besides, it is absolutely possible to use SeqTools at an early stage of the pipeline and connect with these modules later on.

Note: As a general guideline, special care should be taken when combining worker-based functions with these libraries. Users are advised to become familiar with the behaviour of Python threads and processes before using them.
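For illustration only, here is a minimal sketch (assuming PyTorch is installed) of how a SeqTools sequence could be handed over to torch.utils.data; the WrappedDataset class below is a hypothetical helper, not part of SeqTools:

import torch.utils.data


class WrappedDataset(torch.utils.data.Dataset):
    """Hypothetical adapter exposing a SeqTools sequence as a PyTorch Dataset."""

    def __init__(self, sequence):
        self.sequence = sequence  # any SeqTools sequence: supports len() and indexing

    def __len__(self):
        return len(self.sequence)

    def __getitem__(self, i):
        return self.sequence[i]


# a regular DataLoader can then take over batching and worker management, e.g.:
# loader = torch.utils.data.DataLoader(WrappedDataset(dataset), batch_size=64, num_workers=2)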

Data samples

For this example we consider a set of (X, y) data samples, each composed of a real-valued vector observation and an integer label. It is common practice to store data samples in large groups across a few binary dump files. The following script generates random samples to simulate our dataset.


In [ ]:
import os
import tempfile
import numpy as np

workdir = tempfile.TemporaryDirectory()
os.chdir(workdir.name)

n_samples = 18000
n_classes = 10
sample_shape = (248,)
chunk_size = 5000

# generate reference class centers
means = np.random.randn(n_classes, *sample_shape) * 3

# generate random class labels
labels = np.random.randint(n_classes, size=n_samples)
np.save('labels.npy', labels)

# generate noisy samples
n_chunks = n_samples // chunk_size + (1 if n_samples % chunk_size > 0 else 0)
for i in range(n_chunks):
    n = min((i + 1) * chunk_size, n_samples) - i * chunk_size
    chunk_file = "data_{:02d}.npy".format(i)
    data = means[labels[i * chunk_size:i * chunk_size + n]] \
        + np.random.randn(n, *sample_shape) * 0.1
    np.save(chunk_file, data)

Data loading

Now begins the actual data loading. Assuming the dataset is too big to fit in memory, the data is read directly from the files via memory mapping instead of being loaded into memory.


In [ ]:
import os
import seqtools

labels = np.load("labels.npy")

data_files = sorted(f for f in os.listdir() if f.startswith('data_'))
data_chunks = [np.load(f, mmap_mode='r') for f in data_files]
data = seqtools.concatenate(data_chunks)

assert len(data) == n_samples

seqtools.concatenate is easy to memorize and does the job, but since all of our data chunks (except for the last one) have the same size, seqtools.unbatch offers an equivalent alternative for this particular case.

Let's now assemble the samples with their labels to facilitate manipulation, and split the dataset into training and testing samples:


In [ ]:
dataset = seqtools.collate([data, labels])
train_dataset = dataset[:-10000]
test_dataset = dataset[-10000:]
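As a quick sanity check (illustrative, not part of the original pipeline), each element of the collated dataset is an (observation, label) pair:

x, y = train_dataset[0]
assert x.shape == sample_shape  # the observation vector
assert 0 <= y < n_classes       # its integer label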

We now write a simple random minibatch sampler and pass it to seqtools.load_buffers to start generating minibatches with multiple background workers:


In [ ]:
def collate(samples):
    """Assembles samples into a minibatch."""
    batch_data = np.stack([sample[0] for sample in samples])
    batch_labels = np.stack([sample[1] for sample in samples])
    return batch_data, batch_labels


class MinibatchSampler:
    def __init__(self, dataset, batch_size):
        self.dataset = dataset
        self.batch_size = batch_size
    
    def __call__(self):
        subset = np.random.choice(len(self.dataset), self.batch_size)
        samples = list(seqtools.gather(self.dataset, subset))
        return collate(samples)


sampler = MinibatchSampler(train_dataset, 64)
minibatch_iter = seqtools.load_buffers(sampler, max_cached=10, nworkers=2)

minibatch_iter yields minibatches indefinitely by repeatedly calling sampler and putting the results into buffers, which are returned at each iteration. Please note that the buffer slots are recycled cyclically, so their content should not be used across iterations.
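As an illustration of that caveat, a minibatch that must remain valid beyond the next iteration should be copied out of its buffer first, for example:

X, y = next(minibatch_iter)
X, y = X.copy(), y.copy()  # detach from the recycled buffer slot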

The sampler class is a bit overengineered here; a simple function would suffice in this case:

def sample_minibatch():
    subset = np.random.choice(len(train_dataset), 64)
    samples = list(seqtools.gather(train_dataset, subset))
    return collate(samples)
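Such a function can be passed to seqtools.load_buffers just like the callable class above:

minibatch_iter = seqtools.load_buffers(sample_minibatch, max_cached=10, nworkers=2)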

Training

With the minibatches ready to be used, we create a Gaussian Naive Bayes model and start training:


In [ ]:
import time
from sklearn.naive_bayes import GaussianNB

model = GaussianNB()
classes = np.arange(n_classes)

t1 = time.time()
for _ in range(4000):
    X, y = next(minibatch_iter)
    model.partial_fit(X, y, classes=classes)
    
t2 = time.time()
print("training took {:.0f}s".format(t2 - t1))

Without multiprocessing to prefetch the minibatches, the training procedure must wait for its input data. In this case, the impact is fairly severe since training itself is fast.


In [ ]:
model = GaussianNB()
classes = np.arange(n_classes)

t1 = time.time()
for _ in range(4000):
    X, y = sampler()
    model.partial_fit(X, y, classes=classes)

t2 = time.time()
print("training took {:.0f}s".format(t2 - t1))

Testing

For completeness, we evaluate the accuracy of the model on the testing data. Assuming the testing dataset is also too big to fit in memory, the evaluation proceeds in small chunks:


In [ ]:
testing_chunks = seqtools.batch(test_dataset, 64, collate_fn=collate)

predictions = []
targets = []

for X, y in testing_chunks:
    predictions.extend(model.predict(X))
    targets.extend(y)

accuracy = np.mean(np.array(predictions) == np.array(targets))
print("Accuracy: {:.0f}%".format(accuracy * 100))