Building and running a preprocessing pipeline

In this example, an image processing pipeline is created and then executed in a manner that maximizes throughput.


In [ ]:
from PIL import Image, ImageOps
import seqtools

In [ ]:
! [[ -f owl.jpg ]] || curl -s "https://cdn.pixabay.com/photo/2017/04/07/01/05/owl-2209827_640.jpg" -o owl.jpg
! [[ -f rooster.jpg ]] || curl -s "https://cdn.pixabay.com/photo/2018/08/26/14/05/hahn-3632299_640.jpg" -o rooster.jpg
! [[ -f duck.jpg ]] || curl -s "https://cdn.pixabay.com/photo/2018/09/02/10/03/violet-duck-3648415_640.jpg" -o duck.jpg
! [[ -f bird.jpg ]] || curl -s "https://cdn.pixabay.com/photo/2018/08/21/05/15/tit-3620632_640.jpg" -o bird.jpg
! [[ -f dog.jpg ]] || curl -s "https://cdn.pixabay.com/photo/2018/09/04/18/07/pug-3654360_640.jpg" -o dog.jpg
! [[ -f hedgehog.jpg ]] || curl -s "https://cdn.pixabay.com/photo/2018/09/04/18/52/hedgehog-3654434_640.jpg" -o hedgehog.jpg

Initial data loading

SeqTools works with list-like indexable objects, so the first step is to create one that maps to our samples. This object will then be passed to functions that apply the desired transformations. In this example, we represent our samples by their file names and store them in a list.


In [ ]:
labels = ['owl', 'rooster', 'duck', 'bird', 'dog', 'hedgehog']
# We artificially increase the size of the dataset for the example
labels = [labels[i % len(labels)] for i in range(200)]

image_files = [l + '.jpg' for l in labels]

Let's load the full-resolution images. The result could not normally fit into memory, but with SeqTools the evaluation is delayed until the images are actually accessed.


In [ ]:
raw_images = seqtools.smap(Image.open, image_files)

We can verify the result for one sample; this will trigger its evaluation and return it:


In [ ]:
raw_images[0]
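To make this on-demand behavior concrete, here is a minimal pure-Python stand-in for smap (a hypothetical `LazyMap` helper, not part of SeqTools) that records how many times the mapped function actually runs:

```python
# A toy stand-in for seqtools.smap: wraps a function and a sequence,
# and only calls the function when an item is indexed.
class LazyMap:
    def __init__(self, fn, seq):
        self.fn = fn
        self.seq = seq

    def __len__(self):
        return len(self.seq)

    def __getitem__(self, i):
        return self.fn(self.seq[i])

calls = []

def expensive_load(name):
    calls.append(name)          # record every actual evaluation
    return name.upper()         # stand-in for Image.open

lazy = LazyMap(expensive_load, ['owl.jpg', 'dog.jpg'])
assert calls == []              # creating the mapping computes nothing
assert lazy[0] == 'OWL.JPG'     # indexing triggers exactly one call
assert calls == ['owl.jpg']
```

Creating the mapped sequence is free; the cost is paid item by item, only on access.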

Mapping transformations

As a first preprocessing stage, we can normalize the size:


In [ ]:
def normalize_size(im):
    w, h = im.size
    left_crop = w // 2 - h // 2
    return im.resize((200, 200), Image.BILINEAR, box=(left_crop, 0, left_crop + h, h))

small_images = seqtools.smap(normalize_size, raw_images)

small_images[1]

then apply some common preprocessing steps:


In [ ]:
contrasted = seqtools.smap(ImageOps.autocontrast, small_images)
equalized = seqtools.smap(ImageOps.equalize, contrasted)
grayscale = seqtools.smap(ImageOps.grayscale, equalized)

grayscale[0]

That preprocessing seems a bit over the top... let's check where it went wrong:


In [ ]:
equalized[0]

In [ ]:
contrasted[0]

For each sample, only the minimal set of computations needed to produce the requested item was run. We find here that equalization is inappropriate and autocontrast is too weak; let's fix this.


In [ ]:
grayscale = seqtools.smap(ImageOps.grayscale, small_images)
contrasted = seqtools.smap(lambda im: ImageOps.autocontrast(im, cutoff=3), grayscale)

contrasted[0]

Combining datasets

Then we augment the dataset by flipping the images:


In [ ]:
# Generate flipped versions of the images
flipped = seqtools.smap(ImageOps.mirror, contrasted)

# Combine with the original dataset
augmented_dataset = seqtools.concatenate([contrasted, flipped])

augmented_dataset[-1]
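Under the hood, concatenating indexable sequences only requires translating a global index into a (sequence, local index) pair. A minimal sketch of that translation (a hypothetical helper, not the actual SeqTools implementation) using cumulative lengths:

```python
import bisect

# Toy concatenation of indexable sequences: translate a global index
# into the right (sequence, local index) pair using cumulative lengths.
class Concat:
    def __init__(self, sequences):
        self.sequences = sequences
        self.offsets = []           # cumulative end offset of each sequence
        total = 0
        for s in sequences:
            total += len(s)
            self.offsets.append(total)

    def __len__(self):
        return self.offsets[-1]

    def __getitem__(self, i):
        if i < 0:                   # support negative indexing like lists
            i += len(self)
        k = bisect.bisect_right(self.offsets, i)   # which sequence holds i?
        start = self.offsets[k - 1] if k > 0 else 0
        return self.sequences[k][i - start]

both = Concat([[1, 2, 3], [10, 20]])
assert len(both) == 5
assert both[3] == 10
assert both[-1] == 20
```

Note that building the offsets requires the length of every input sequence, which is why concatenation needs sized inputs.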

Evaluation

Once satisfied with our preprocessing pipeline, evaluating all values is simply done by iterating over the elements or forcing the conversion to a list:


In [ ]:
%time computed_values = list(augmented_dataset)

The evaluation above is a bit slow, probably due to the IO operations when loading the images from the hard drive. Maybe using multiple threads could help keep the CPU busy?


In [ ]:
fast_dataset = seqtools.prefetch(augmented_dataset, max_buffered=10, nworkers=2)
%time computed_values = list(fast_dataset)

The CPU time is the same because the computations are the same (plus some threading overhead), but the wall time is reduced because image processing continues for some images while others are being loaded.
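This overlap can be sketched with a plain thread pool: while one worker blocks on IO for one item, another worker handles the next. A minimal stand-in (hypothetical, not the SeqTools implementation), simulating disk IO with time.sleep:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def load_and_process(name):
    time.sleep(0.05)            # simulated disk IO; releases the GIL
    return name.upper()         # simulated (cheap) processing

names = ['owl', 'dog', 'duck', 'bird']

t0 = time.perf_counter()
sequential = [load_and_process(n) for n in names]
t_seq = time.perf_counter() - t0

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    prefetched = list(pool.map(load_and_process, names))
t_par = time.perf_counter() - t0

assert prefetched == sequential   # same results, lower wall time
```

With two workers, the simulated IO waits overlap, so the wall time is roughly halved while the total work is unchanged.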

However, we could spare some IO by not reading the same image twice when generating the augmented version, and by the same token save some shared transformations. To avoid having one thread take the regular image and another the flipped one in parallel, which would incur a cache miss for the latter, we simply compute the transformed image and its flipped version in one step:


In [ ]:
regular_and_flipped = seqtools.smap(lambda im: (im, ImageOps.mirror(im)), contrasted)
fast_dataset = seqtools.prefetch(regular_and_flipped, max_buffered=10, nworkers=2)

The output now contains pairs of images. To flatten them into a sequence of images, we can "unbatch" them:


In [ ]:
fast_dataset = seqtools.add_cache(fast_dataset, cache_size=1)
flat_dataset = seqtools.unbatch(fast_dataset, batch_size=2)

The cache is there to avoid recomputing the pair of images when its second element is accessed: SeqTools works in a stateless, on-demand fashion and recomputes everything by default.
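The effect of a size-1 cache can be sketched with a small wrapper (a hypothetical helper, not the actual add_cache implementation) that remembers the last computed item:

```python
# Toy size-1 cache over an indexable sequence: consecutive accesses to
# the same index reuse the stored value instead of recomputing it.
class CachedLast:
    def __init__(self, seq):
        self.seq = seq
        self.last = None            # (index, value) of the last access

    def __getitem__(self, i):
        if self.last is not None and self.last[0] == i:
            return self.last[1]     # cache hit: no recomputation
        value = self.seq[i]
        self.last = (i, value)
        return value

computed = []

class Expensive:
    def __getitem__(self, i):
        computed.append(i)          # count real evaluations
        return i * i

cached = CachedLast(Expensive())
cached[3]; cached[3]                # second access hits the cache
assert computed == [3]
cached[4]
assert computed == [3, 4]
```

In the pipeline above, the two halves of a pair are accessed back to back, so a cache of size 1 is enough to ensure each pair is computed once.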

Please note that concatenation would not be an appropriate replacement for unbatching here. Concatenation checks the length of the sequences to join, which in this situation would require computing each element of fast_dataset beforehand. Besides, unbatching has slightly faster access times because it assumes that all batches have the same size.
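Because all batches are assumed to have the same size, unbatching reduces to pure index arithmetic, with no lengths to evaluate. A minimal sketch (hypothetical helper, not the SeqTools implementation):

```python
# Toy unbatch: item i lives in batch i // batch_size at offset i % batch_size.
class Unbatch:
    def __init__(self, batched, batch_size):
        self.batched = batched
        self.batch_size = batch_size

    def __len__(self):
        return len(self.batched) * self.batch_size

    def __getitem__(self, i):
        return self.batched[i // self.batch_size][i % self.batch_size]

pairs = [('a', 'A'), ('b', 'B'), ('c', 'C')]
flat = Unbatch(pairs, batch_size=2)
assert len(flat) == 6
assert flat[0] == 'a'
assert flat[3] == 'B'
```

Each access touches exactly one batch, which is why combining this with the size-1 cache above keeps every pair computed only once.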


In [ ]:
%time computed_values = list(flat_dataset)

CPU time is now cut by half, leaving more room for other processes.

Notes

SeqTools facilitates the manipulation of sequences and reduces the boilerplate of using multi-threading or multi-processing. However, users must remain conscious of the mechanics under the hood to avoid pitfalls. Here are a few points to keep in mind while programming with this library:

  • Avoid spurious computations when reading the same element multiple times; consider using a cache.
  • Check for thread/process under-utilization; avoid having threads waiting for each other or doing redundant work.
  • Multi-processing has limitations: communication overhead, no shared cache between processes...
  • Threading has limitations: no more than one thread can execute Python code at any given time, so concurrency is mostly limited to system IO operations.