How to create a simple pipeline to process a dataset by batches


In [1]:
import sys
import warnings
warnings.filterwarnings("ignore")

import PIL
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline

# the following line is not required if BatchFlow is installed as a python package.
sys.path.append("../..")
from batchflow import Dataset, DatasetIndex, R, P, V, C
from batchflow.opensets import MNIST

In [2]:
BATCH_SIZE = 10

Create a dataset

MNIST is a dataset of handwritten digits frequently used as a baseline for machine learning tasks.

Let's download the data and create the dataset (it might take a few minutes to complete)


In [3]:
dataset = MNIST()


Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Extracting /tmp/t10k-labels-idx1-ubyte.gz
Extracting /tmp/train-labels-idx1-ubyte.gz
Extracting /tmp/train-images-idx3-ubyte.gz
Extracting /tmp/t10k-images-idx3-ubyte.gz

The MNIST dataset creates instances of ImagesBatch.

You can also use CIFAR:

from batchflow.opensets import CIFAR10

dataset = CIFAR10()

It takes much more time to download, though.

For CIFAR examples see the image augmentation tutorial.

Let's get a batch and look at the dataset content.


In [4]:
def show_images(batch):
    shape = batch.image_shape
    total_width = len(batch) * shape[0]
    # concatenate all batch images horizontally into one strip
    img = PIL.Image.new('L' if shape[-1] == 1 else 'RGB', (total_width, shape[1]))
    for image, offset in zip(batch.images, range(0, total_width, shape[0])):
        img.paste(image, (offset, 0))
    fig, ax = plt.subplots(1, figsize=(10, 4))
    ax.axis('off')
    ax.imshow(img, cmap="gray" if shape[-1] == 1 else None)
    plt.show()

In [5]:
batch = dataset.train.next_batch(BATCH_SIZE)
show_images(batch)


Execute the cell above several times to see different batches.

Define a pipeline

A pipeline represents a sequence of actions applied to a dataset.

These actions might come from the Pipeline API or from the action methods of a batch class (e.g. ImagesBatch).

Just write them one after another.


In [6]:
simple_pipeline = (dataset.train.p
                     .scale(p=.5, factor=1.5, preserve_shape=True)
                     .rotate(p=.5, angle=60)
                     .salt(p=.5, color=255, p_noise=.05)
                     .elastic_transform(p=.5, alpha=20, sigma=1.8)
)

In [7]:
batch = simple_pipeline.next_batch(BATCH_SIZE)
show_images(batch)


Read the documentation for advanced pipeline techniques.

Use pipeline variables


In [8]:
simple_pipeline = (dataset.train.p
                     .init_variable('angle', 60)
                     .init_variable('factor', 1.5)
                     .init_variable('salt_color', 255)
                     .init_variable('proba', .5)
                     .scale(p=V('proba'), factor=V('factor'), preserve_shape=True)
                     .rotate(p=V('proba'), angle=V('angle'))
                     .salt(p=V('proba'), color=V('salt_color'), p_noise=.05)
                     .elastic_transform(p=V('proba'), alpha=20, sigma=1.8)
)

In [9]:
batch = simple_pipeline.next_batch(BATCH_SIZE)
show_images(batch)


Pipeline configuration

The same result can be achieved with a pipeline config.


In [10]:
config = dict(angle=60, factor=1.5, salt_color=255, proba=.5)

simple_pipeline = (dataset.train.pipeline(config)
                     .scale(p=C('proba'), factor=C('factor'), preserve_shape=True)
                     .rotate(p=C('proba'), angle=C('angle'))
                     .salt(p=C('proba'), color=C('salt_color'), p_noise=.05)
                     .elastic_transform(p=C('proba'), alpha=20, sigma=1.8)
)

In [11]:
batch = simple_pipeline.next_batch(BATCH_SIZE)
show_images(batch)


Sampling random values

Sometimes you might want random values instead of hard-coded constants.

R and P named expressions might come in handy here.


In [12]:
config = dict(salt_color=255, proba=.5)

simple_pipeline = (dataset.train.pipeline(config)
                     .scale(p=C('proba'), factor=P(R('normal', 1.5, .2)), preserve_shape=True)
                     .rotate(p=C('proba'), angle=R('uniform', -45, 45))
                     .salt(p=C('proba'), color=C('salt_color'), p_noise=.05)
                     .elastic_transform(p=C('proba'), alpha=20, sigma=1.8)
)

In [13]:
batch = simple_pipeline.next_batch(BATCH_SIZE)
show_images(batch)


The difference between R(...) and P(R(...)) is that the former gives a single random value for all batch items, while the latter gives a random value for each batch item.
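The distinction can be illustrated with plain NumPy, outside of BatchFlow: R corresponds to drawing one value and broadcasting it to the whole batch, while P corresponds to drawing an independent value per item.

```python
import numpy as np

rng = np.random.default_rng(0)
batch_size = 10

# like R('uniform', -45, 45): one angle shared by every item in the batch
shared = np.full(batch_size, rng.uniform(-45, 45))

# like P(R('uniform', -45, 45)): an independent angle for each batch item
per_item = rng.uniform(-45, 45, size=batch_size)

print(np.unique(shared).size)    # 1 distinct value
print(np.unique(per_item).size)  # 10 distinct values
```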

Running pipelines

See the documentation or the batch operations tutorial for a detailed description.

next_batch


In [14]:
for i in range(5):
    batch = simple_pipeline.next_batch(BATCH_SIZE, n_epochs=1)
    show_images(batch)


See the batch operations tutorial for more info about next_batch / gen_batch and their parameters (shuffle, drop_last, n_epochs, etc.)
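To make the effect of drop_last concrete, here is the batch-count arithmetic in plain Python (the item count and batch size below are arbitrary illustration values):

```python
import math

n_items, batch_size, n_epochs = 60_000, 64, 1

# drop_last=False: the final, smaller batch is kept (ceiling division)
batches_kept = math.ceil(n_items / batch_size) * n_epochs

# drop_last=True: an incomplete final batch is discarded (floor division)
batches_dropped = (n_items // batch_size) * n_epochs

print(batches_kept, batches_dropped)  # 938 937
```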

gen_batch

While next_batch is an ordinary method returning processed batches, gen_batch is a generator.


In [15]:
for batch in simple_pipeline.gen_batch(BATCH_SIZE, n_epochs=1):
    # do whatever you want with the batch
    pass

Executing this cell might take a lot of time, depending on your hardware, pipeline content and the dataset size.

If you want to use large batches with heavy actions (or with I/O operations), then consider using target='threads'. It might give a considerable speed-up on multi-CPU platforms.
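The idea behind target='threads' can be sketched with the standard library: per-item work is dispatched to a thread pool instead of a plain sequential loop. This is a simplified illustration of the concept, not BatchFlow's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def heavy_action(item):
    # stand-in for an I/O-bound or GIL-releasing per-item transform
    return item * 2

items = list(range(8))

# sequential execution: one item after another
sequential = [heavy_action(i) for i in items]

# threaded execution: items processed concurrently, order preserved by map
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded = list(pool.map(heavy_action, items))

print(sequential == threaded)  # True
```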

run

It is just a concise form of the for batch in pipeline.gen_batch(...) loop.


In [16]:
BATCH_SIZE = 100

In [17]:
simple_pipeline.run(BATCH_SIZE, n_epochs=1, shuffle=True, drop_last=True, bar=True, prefetch=2)


100%|█████████▉| 599/600 [01:35<00:00,  6.28it/s]
Out[17]:
<batchflow.pipeline.Pipeline at 0x7fa4546e4dd8>

The prefetch option speeds up execution by preparing batches ahead of time, while the bar option displays a progress bar.
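The idea behind prefetch can be sketched with a background thread and a bounded queue: while the main loop consumes the current batch, the next ones are already being prepared. Again, this is only a conceptual sketch, not BatchFlow's implementation:

```python
import queue
import threading

def prefetch(gen, size=2):
    """Yield items from gen while a background thread keeps up to `size` ready."""
    q = queue.Queue(maxsize=size)
    sentinel = object()  # marks the end of the generator

    def producer():
        for item in gen:
            q.put(item)  # blocks when `size` items are already waiting
        q.put(sentinel)

    threading.Thread(target=producer, daemon=True).start()
    while (item := q.get()) is not sentinel:
        yield item

print(list(prefetch(iter(range(5)))))  # [0, 1, 2, 3, 4]
```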

Now you might want to train a model or return to the table of contents.