In this example, we show how to create a fast minibatch generator of the kind typically used to feed a training routine in machine learning. SeqTools is not meant to supplant specialized libraries such as TensorFlow's tf.data module or torch.utils.data.Dataset, but these can lack simplicity and flexibility for certain usages. Besides, it is absolutely possible to use SeqTools at an early stage of the pipeline and connect with these modules afterwards.
Note: As a general guideline, special care should be taken when combining worker-based functions with these libraries. Users are advised to become familiar with the behaviour of Python threads and processes before using them.
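For instance, a SeqTools sequence such as the dataset built below can be exposed to PyTorch through a thin wrapper around torch.utils.data.Dataset. This is only a minimal sketch; the SeqToolsDataset name is ours and not part of either library:
import torch.utils.data

class SeqToolsDataset(torch.utils.data.Dataset):
    """Hypothetical adapter exposing a SeqTools sequence to PyTorch."""
    def __init__(self, sequence):
        self.sequence = sequence

    def __len__(self):
        return len(self.sequence)

    def __getitem__(self, item):
        # SeqTools sequences support integer indexing, which is all
        # a map-style PyTorch dataset requires.
        return self.sequence[item]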
For this example, we consider a set of (X, y) data samples, each composed of a real-valued observation vector and an integer label. It is common practice to store such samples in large chunks spread across a few binary dump files. The following script generates some random samples to simulate our dataset.
In [ ]:
import os
import tempfile
import numpy as np
workdir = tempfile.TemporaryDirectory()
os.chdir(workdir.name)
n_samples = 18000
n_classes = 10
sample_shape = (248,)
chunk_size = 5000
# generate reference class centers
means = np.random.randn(n_classes, *sample_shape) * 3
# generate random class labels
labels = np.random.randint(n_classes, size=n_samples)
np.save('labels.npy', labels)
# generate noisy samples
n_chunks = n_samples // chunk_size + (1 if n_samples % chunk_size > 0 else 0)
for i in range(n_chunks):
    n = min((i + 1) * chunk_size, n_samples) - i * chunk_size
    chunk_file = "data_{:02d}.npy".format(i)
    data = means[labels[i * chunk_size:i * chunk_size + n]] \
        + np.random.randn(n, *sample_shape) * 0.1
    np.save(chunk_file, data)
In [ ]:
import os
import seqtools
labels = np.load("labels.npy")
data_files = sorted(f for f in os.listdir() if f.startswith('data_'))
data_chunks = [np.load(f, mmap_mode='r') for f in data_files]
data = seqtools.concatenate(data_chunks)
assert len(data) == n_samples
seqtools.concatenate is easy to remember and does the job, but in this particular case we could also use seqtools.unbatch, since all of our data chunks (except possibly the last one) have the same size.
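As a quick illustration of what the concatenated view does (this check is ours, not part of the original script), indexing past the first chunk transparently reads from the second memory-mapped file:
# element chunk_size of the view is the first element of the second chunk
assert np.array_equal(data[chunk_size], data_chunks[1][0])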
Let's now assemble the samples with their labels to facilitate manipulation, and split the dataset into training and testing samples.
In [ ]:
dataset = seqtools.collate([data, labels])
train_dataset = dataset[:-10000]
test_dataset = dataset[-10000:]
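For illustration (this small check is ours), each element of the collated dataset is simply an (observation, label) pair:
x0, y0 = train_dataset[0]
assert x0.shape == sample_shape and 0 <= y0 < n_classes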
We now write a simple random minibatch sampler and pass it to seqtools.load_buffers
to start generating samples with multiple background workers:
In [ ]:
def collate(samples):
    """Assembles samples into a minibatch."""
    batch_data = np.stack([sample[0] for sample in samples])
    batch_labels = np.stack([sample[1] for sample in samples])
    return batch_data, batch_labels


class MinibatchSampler:
    def __init__(self, dataset, batch_size):
        self.dataset = dataset
        self.batch_size = batch_size

    def __call__(self):
        subset = np.random.choice(len(self.dataset), self.batch_size)
        samples = list(seqtools.gather(self.dataset, subset))
        return collate(samples)

sampler = MinibatchSampler(train_dataset, 64)
minibatch_iter = seqtools.load_buffers(sampler, max_cached=10, nworkers=2)
minibatch_iter simply yields minibatches indefinitely: it repeatedly calls sampler in background workers and puts the results into buffers, which are returned at each iteration. Please note that the buffer slots are recycled cyclically, so their content should not be used across iterations.
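If a minibatch needs to outlive the iteration that produced it, copy it out of the buffers first; a minimal sketch:
X, y = next(minibatch_iter)
X_kept, y_kept = X.copy(), y.copy()  # copies remain valid after the next iteration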
The sampler class is a bit overkill here; a simple function would suffice in this case:
def sample_minibatch():
    subset = np.random.choice(len(train_dataset), 64)
    samples = list(seqtools.gather(train_dataset, subset))
    return collate(samples)
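Such a function can be passed to seqtools.load_buffers in exactly the same way, for example:
minibatch_iter = seqtools.load_buffers(sample_minibatch, max_cached=10, nworkers=2)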
With the minibatches ready to be used, we create a Gaussian Naive Bayes model and start training:
In [ ]:
import time
from sklearn.naive_bayes import GaussianNB
model = GaussianNB()
classes = np.arange(n_classes)
t1 = time.time()
for _ in range(4000):
    X, y = next(minibatch_iter)
    model.partial_fit(X, y, classes=classes)
t2 = time.time()
print("training took {:.0f}s".format(t2 - t1))
Without background workers to prefetch the minibatches, the training procedure must wait for its input data. In this case the impact is fairly severe, since the training step itself is fast.
In [ ]:
model = GaussianNB()
classes = np.arange(n_classes)
t1 = time.time()
for _ in range(4000):
    X, y = sampler()
    model.partial_fit(X, y, classes=classes)
t2 = time.time()
print("training took {:.0f}s".format(t2 - t1))
Finally, let's evaluate the model on the testing samples, iterating over them by minibatches with seqtools.batch:
In [ ]:
testing_chunks = seqtools.batch(test_dataset, 64, collate_fn=collate)
predictions = []
targets = []
t1 = time.time()
for X, y in testing_chunks:
    predictions.extend(model.predict(X))
    targets.extend(y)
t2 = time.time()
accuracy = np.mean(np.array(predictions) == np.array(targets))
print("testing took {:.0f}s".format(t2 - t1))
print("Accuracy: {:.0f}%".format(accuracy * 100))