In [ ]:
%matplotlib inline
Simplify downloading, storing, iterating over and preprocessing data used to train machine learning models.
We'll go over a quick example to see what Fuel is capable of.
Let's start by creating some random data to act as features and targets. We'll pretend that we have eight 2x2 grayscale images separated into four classes.
In [ ]:
import numpy
seed = 1234
rng = numpy.random.RandomState(seed)
features = rng.randint(256, size=(8, 2, 2))
targets = rng.randint(4, size=(8, 1))
Our goal is to use Fuel to interface with this data, iterate over it in various ways and apply transformations to it on the fly.
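There are four basic tasks that Fuel needs to handle:
- Interface with the data, be it on disk or in memory.
- Decide which data points to visit, and in which order.
- Iterate over the selected data points.
- At each iteration step, apply some transformations to the data points.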
Each of those four tasks is delegated to a particular class of objects, which we'll be introducing in order.
The simplest Dataset subclass is IterableDataset, which interfaces with iterable objects.
It is created by passing a dict mapping source names to their associated data and, optionally, a dict mapping source names to tuples of axis labels.
In [ ]:
from fuel.datasets import IterableDataset
dataset = IterableDataset(
    iterables={'features': features, 'targets': targets},
    axis_labels={'features': ('batch', 'height', 'width'),
                 'targets': ('batch', 'index')})
We can ask the dataset what sources of data it provides by accessing its sources attribute. We can also find out which axes correspond to what by accessing its axis_labels attribute. It also has a num_examples property telling us the number of examples it contains.
In [ ]:
print('Sources are {}.'.format(dataset.sources))
print('Axis labels are {}.'.format(dataset.axis_labels))
print('Dataset contains {} examples.'.format(dataset.num_examples))
Datasets themselves are stateless objects (as opposed to, say, an open file handle, or an iterator object). In order to request data from the dataset, we need to ask it to instantiate some stateful object with which it will interact. This is done through the open method:
In [ ]:
state = dataset.open()
print(state.__class__.__name__)
We see that in IterableDataset's case the state is an iterator object. We can now visit the examples this dataset contains using its get_data method.
In [ ]:
print(dataset.get_data(state=state))
(Note that the return order depends on the order of dataset.sources, which is nondeterministic if you use dict instances. In order to have deterministic behaviour, it is recommended that you use OrderedDict instances instead.)
Eventually, the iterator is depleted and it raises a StopIteration exception. We can iterate over the dataset again by requesting a fresh iterator through the dataset's reset method.
In [ ]:
while True:
    try:
        dataset.get_data(state=state)
    except StopIteration:
        print('Iteration over')
        break
state = dataset.reset(state=state)
print(dataset.get_data(state=state))
dataset.close(state=state)
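To make the open / get_data / reset / close protocol concrete, here is a minimal pure-Python sketch of a dataset that follows it. ToyIterableDataset is a hypothetical class written for illustration only; it is not Fuel's IterableDataset and does not reproduce its implementation. The point it shows is that the dataset object holds no position: all iteration state lives in the iterator returned by open.

```python
from collections import OrderedDict


class ToyIterableDataset(object):
    """Hypothetical stand-in that mimics the protocol, not Fuel's class."""

    def __init__(self, iterables):
        # An OrderedDict keeps `sources` (and thus output order) deterministic.
        self.iterables = OrderedDict(iterables)
        self.sources = tuple(self.iterables)

    def open(self):
        # The dataset stays stateless: the position in the data lives in
        # the iterator handed back to the caller.
        return iter(zip(*self.iterables.values()))

    def get_data(self, state=None, request=None):
        if request is not None:
            raise ValueError('this dataset only supports sequential access')
        return next(state)  # raises StopIteration once depleted

    def reset(self, state):
        # Discard the old state and hand out a fresh one.
        self.close(state)
        return self.open()

    def close(self, state):
        pass  # nothing to release for in-memory data


toy_dataset = ToyIterableDataset([('features', [10, 20, 30]),
                                  ('targets', [0, 1, 0])])
toy_state = toy_dataset.open()
print(toy_dataset.get_data(state=toy_state))  # (10, 0)
```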
The IterableDataset implementation is pretty minimal. For instance, it only lets you iterate sequentially and examplewise over your data.
If your data happens to be indexable (e.g. a list, or a numpy array), then IndexableDataset will let you do much more.
We instantiate IndexableDataset just like IterableDataset.
In [ ]:
from fuel.datasets import IndexableDataset
from collections import OrderedDict
dataset = IndexableDataset(
    indexables=OrderedDict([('features', features), ('targets', targets)]),
    axis_labels={'features': ('batch', 'height', 'width'),
                 'targets': ('batch', 'index')})
The main advantage of IndexableDataset over IterableDataset is that it allows random access to the data it contains. To do so, we need to pass an additional request argument to get_data in the form of a list of indices.
In [ ]:
state = dataset.open()
print('State is {}'.format(state))
print(dataset.get_data(state=state, request=[0, 1]))
dataset.close(state=state)
(See how IndexableDataset returns a None state: this is because there's no actual state to maintain in this case.)
The constructor also accepts a sources argument which restricts the sources that get_data returns:
In [ ]:
restricted_dataset = IndexableDataset(
    indexables=OrderedDict([('features', features), ('targets', targets)]),
    axis_labels={'features': ('batch', 'height', 'width'),
                 'targets': ('batch', 'index')},
    sources=('features',))
state = restricted_dataset.open()
print(restricted_dataset.get_data(state=state, request=[0, 1]))
restricted_dataset.close(state=state)
You can see that in this case only the features are returned by get_data.
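The random-access behaviour and the sources restriction can be pictured with a small pure-Python sketch. ToyIndexableDataset is hypothetical and only mirrors the behaviour described above; it gathers items with a list comprehension so that it also works on plain Python lists, which may differ from how Fuel indexes its data internally.

```python
from collections import OrderedDict


class ToyIndexableDataset(object):
    """Hypothetical stand-in for IndexableDataset, for illustration only."""

    def __init__(self, indexables, sources=None):
        self.indexables = OrderedDict(indexables)
        # Restricting `sources` only filters what get_data returns.
        self.sources = tuple(sources) if sources else tuple(self.indexables)

    @property
    def num_examples(self):
        return len(next(iter(self.indexables.values())))

    def open(self):
        return None  # random access needs no state

    def get_data(self, state=None, request=None):
        if request is None:
            raise ValueError('a list of indices is required')
        # One list per selected source, gathered in request order.
        return tuple([self.indexables[source][i] for i in request]
                     for source in self.sources)

    def close(self, state):
        pass


toy_data = OrderedDict([('features', ['a', 'b', 'c', 'd']),
                        ('targets', [0, 1, 1, 0])])
toy_full = ToyIndexableDataset(toy_data)
toy_restricted = ToyIndexableDataset(toy_data, sources=('features',))
print(toy_full.get_data(request=[0, 2]))        # (['a', 'c'], [0, 1])
print(toy_restricted.get_data(request=[0, 2]))  # (['a', 'c'],)
```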
Encapsulating and accessing our data is good, but if we're to integrate it into a training loop, we need to be able to iterate over the data. For that, we need to decide which indices to request and in which order. This is accomplished via an IterationScheme subclass.
At its most basic level, an iteration scheme is responsible, through its get_request_iterator method, for building an iterator that will return requests. Here are some examples:
In [ ]:
from fuel.schemes import (SequentialScheme, ShuffledScheme,
                          SequentialExampleScheme, ShuffledExampleScheme)
schemes = [SequentialScheme(examples=8, batch_size=4),
           ShuffledScheme(examples=8, batch_size=4),
           SequentialExampleScheme(examples=8),
           ShuffledExampleScheme(examples=8)]
for scheme in schemes:
    print([request for request in scheme.get_request_iterator()])
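What these schemes compute can be approximated with a few plain functions. This is a hypothetical sketch, not Fuel's code: the functions return eager lists for readability, whereas get_request_iterator returns iterators, and the exact shuffling Fuel performs may differ. Batch schemes produce lists of indices, while examplewise schemes produce one bare index per request.

```python
import random


def batches_of(indices, batch_size):
    # Cut a list of indices into consecutive batches (the last may be short).
    return [indices[i:i + batch_size]
            for i in range(0, len(indices), batch_size)]


def sequential_batch_requests(num_examples, batch_size):
    # Comparable to SequentialScheme: batches of consecutive indices.
    return batches_of(list(range(num_examples)), batch_size)


def shuffled_batch_requests(num_examples, batch_size, rng):
    # Comparable to ShuffledScheme: permute all indices, then batch them.
    indices = list(range(num_examples))
    rng.shuffle(indices)
    return batches_of(indices, batch_size)


def sequential_example_requests(num_examples):
    # Comparable to SequentialExampleScheme: one bare index per request.
    return list(range(num_examples))


print(sequential_batch_requests(8, 4))  # [[0, 1, 2, 3], [4, 5, 6, 7]]
print(shuffled_batch_requests(8, 4, random.Random(1234)))
print(sequential_example_requests(4))   # [0, 1, 2, 3]
```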
We can therefore use an iteration scheme to visit a dataset in some order.
In [ ]:
state = dataset.open()
scheme = ShuffledScheme(examples=dataset.num_examples, batch_size=4)
for request in scheme.get_request_iterator():
    data = dataset.get_data(state=state, request=request)
    print(data[0].shape, data[1].shape)
dataset.close(state)
Iteration schemes offer a more convenient way to visit the dataset than accessing the data by hand, but we can do better: the act of getting a fresh state from the dataset, getting a request iterator from the iteration scheme, using both to access the data and closing the state is repetitive. To automate this, we have data streams, which are subclasses of AbstractDataStream.
The most common AbstractDataStream subclass is DataStream. It is instantiated with a dataset and an iteration scheme, and returns an epoch iterator through its get_epoch_iterator method, which iterates over the dataset in the order defined by the iteration scheme.
In [ ]:
from fuel.streams import DataStream
data_stream = DataStream(dataset=dataset, iteration_scheme=scheme)
for data in data_stream.get_epoch_iterator():
    print(data[0].shape, data[1].shape)
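The bookkeeping a data stream hides can be sketched as a generator that opens a state, serves one request at a time and closes the state when the epoch ends. toy_epoch_iterator and ToyListDataset are hypothetical names used only for this illustration; this is not DataStream's actual implementation, just the loop it automates.

```python
class ToyListDataset(object):
    """Hypothetical in-memory dataset that counts its open states."""

    def __init__(self, **data):
        self.data = data
        self.sources = tuple(sorted(data))
        self.open_states = 0

    def open(self):
        self.open_states += 1
        return None

    def get_data(self, state=None, request=None):
        return tuple([self.data[source][i] for i in request]
                     for source in self.sources)

    def close(self, state):
        self.open_states -= 1


def toy_epoch_iterator(dataset, requests):
    """Sketch of the open / request / get_data / close loop a stream hides."""
    state = dataset.open()
    try:
        for request in requests:
            yield dataset.get_data(state=state, request=request)
    finally:
        # Runs when the epoch ends (or when the generator is closed early).
        dataset.close(state)


toy_dataset = ToyListDataset(features=[1, 2, 3, 4], targets=[0, 1, 0, 1])
print(list(toy_epoch_iterator(toy_dataset, [[0, 1], [2, 3]])))
# [([1, 2], [0, 1]), ([3, 4], [0, 1])]
print(toy_dataset.open_states)  # 0
```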
Some data streams take data streams as input. We call them transformers, and they enable us to build complex data preprocessing pipelines.
Transformers are Transformer subclasses. Most of the transformers you'll encounter are located in the fuel.transformers module. Here are some commonly used ones:
- Flatten: flattens the input into a matrix (for batch input) or a vector (for examplewise input).
- ScaleAndShift: scales and shifts the input by scalar quantities.
- Cast: casts the input into some data type.
As an example, let's standardize the images we have by subtracting their mean and dividing by their standard deviation.
In [ ]:
from fuel.transformers import ScaleAndShift
# Note: ScaleAndShift applies (batch * scale) + shift, as
# opposed to (batch + shift) * scale.
scale = 1.0 / features.std()
shift = - scale * features.mean()
standardized_stream = ScaleAndShift(data_stream=data_stream,
                                    scale=scale, shift=shift,
                                    which_sources=('features',))
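Why shift is set to -scale * mean follows from the order ScaleAndShift uses: x * scale + shift = x / std - mean / std = (x - mean) / std. The pure-Python check below verifies this on a handful of made-up pixel values; it uses the statistics module instead of numpy, and pstdev because numpy's std() defaults to the population standard deviation. Variable names are prefixed with toy_ so they don't overwrite the notebook's scale and shift.

```python
import statistics

# Hypothetical pixel values standing in for `features`.
pixels = [12.0, 200.0, 37.0, 255.0, 90.0, 0.0, 145.0, 61.0]

# numpy's std() defaults to the population standard deviation (ddof=0);
# statistics.pstdev computes the same quantity.
toy_mean = statistics.mean(pixels)
toy_std = statistics.pstdev(pixels)
toy_scale = 1.0 / toy_std
toy_shift = -toy_scale * toy_mean

# ScaleAndShift's order: x * scale + shift, which equals (x - mean) / std.
standardized = [x * toy_scale + toy_shift for x in pixels]
print(statistics.mean(standardized), statistics.pstdev(standardized))

# Shifting before scaling gives a different result that is not zero-mean.
wrong_order = [(x + toy_shift) * toy_scale for x in pixels]
print(statistics.mean(wrong_order))
```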
The resulting data stream can be used to iterate over the dataset just like before, but this time features will be standardized on-the-fly.
In [ ]:
for batch in standardized_stream.get_epoch_iterator():
    print(batch)
Now, let's imagine that for some reason (e.g. running Theano code on GPU) we need features to have a data type of float32.
In [ ]:
from fuel.transformers import Cast
cast_standardized_stream = Cast(data_stream=standardized_stream,
                                dtype='float32', which_sources=('features',))
As you can see, Fuel makes it easy to chain transformations to form a preprocessing pipeline. The complete pipeline now looks like this:
In [ ]:
data_stream = Cast(
    ScaleAndShift(
        DataStream(
            dataset=dataset, iteration_scheme=scheme),
        scale=scale, shift=shift, which_sources=('features',)),
    dtype='float32', which_sources=('features',))
for batch in data_stream.get_epoch_iterator():
    print(batch)
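The nesting above works because every transformer wraps the stream it is given. The idea can be sketched with plain generators: each stage pulls a batch from the stage inside it, transforms one source and passes the result on, and nothing runs until the outermost stage is iterated. toy_scale_and_shift and toy_cast are hypothetical functions; Fuel's transformers are classes exposing get_epoch_iterator rather than bare generators.

```python
def toy_scale_and_shift(stream, scale, shift, index):
    # Hypothetical generator mimicking ScaleAndShift on one source.
    for batch in stream:
        batch = list(batch)
        batch[index] = [x * scale + shift for x in batch[index]]
        yield tuple(batch)


def toy_cast(stream, dtype, index):
    # Hypothetical generator mimicking Cast on one source.
    for batch in stream:
        batch = list(batch)
        batch[index] = [dtype(x) for x in batch[index]]
        yield tuple(batch)


# Each stage wraps the previous one; batches flow through every stage
# only when the outermost stage is iterated.
toy_source = iter([([1, 2], [0, 1]), ([3, 4], [1, 0])])
toy_pipeline = toy_cast(toy_scale_and_shift(toy_source, 2, 1, index=0),
                        float, index=0)
print(list(toy_pipeline))  # [([3.0, 5.0], [0, 1]), ([7.0, 9.0], [1, 0])]
```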
You now know enough to find your way around Fuel. Let's cover some more advanced use cases.
Sometimes, the dataset you're working on is too big to fit in memory. In that case, you'll want to use another common Dataset subclass, H5PYDataset.
As the name implies, H5PYDataset is a dataset class that interfaces with HDF5 files using the h5py library.
HDF5 is a wonderful storage format, as it is organizable and self-documenting. This allows us to make a basic set of assumptions about the structure of an HDF5 file which, if met, greatly simplify creating new datasets and interacting with them. We won't go through these assumptions right now, but if you're curious, the online documentation offers an in-depth tutorial on how to create new H5PYDataset-compatible files.
Let's create new random data. This time, we'll pretend that we're given a training set and a test set.
In [ ]:
train_image_features = rng.randint(256, size=(90, 3, 32, 32)).astype('uint8')
train_vector_features = rng.normal(size=(90, 16))
train_targets = rng.randint(10, size=(90, 1)).astype('uint8')
test_image_features = rng.randint(256, size=(10, 3, 32, 32)).astype('uint8')
test_vector_features = rng.normal(size=(10, 16))
test_targets = rng.randint(10, size=(10, 1)).astype('uint8')
We now create an HDF5 file and populate it with our data.
In [ ]:
import h5py
from fuel.converters.base import fill_hdf5_file
f = h5py.File('dataset.hdf5', mode='w')
data = (('train', 'image_features', train_image_features),
        ('train', 'vector_features', train_vector_features),
        ('train', 'targets', train_targets),
        ('test', 'image_features', test_image_features),
        ('test', 'vector_features', test_vector_features),
        ('test', 'targets', test_targets))
fill_hdf5_file(f, data)
The fill_hdf5_file function fills the HDF5 file with our data and sets up metadata so H5PYDataset is able to recover our train and test splits.
Before closing the file, let's also tag axes with their labels. The populated HDF5 file features one dataset per data source (in our case, image_features, vector_features and targets), whose dimensions we can tag with a name. H5PYDataset is able to recover this information and create an axis_labels dict for us.
In [ ]:
for i, label in enumerate(('batch', 'channel', 'height', 'width')):
    f['image_features'].dims[i].label = label
for i, label in enumerate(('batch', 'feature')):
    f['vector_features'].dims[i].label = label
for i, label in enumerate(('batch', 'index')):
    f['targets'].dims[i].label = label
f.flush()
f.close()
We now have everything we need to load this HDF5 file in Fuel.
We'll instantiate H5PYDataset by passing it the path to our HDF5 file as well as a tuple of splits to use. For now, we'll just load the train and test sets separately, but note that it is also possible to concatenate splits this way (e.g. concatenate the training and validation sets).
In [ ]:
from fuel.datasets import H5PYDataset
train_dataset = H5PYDataset('dataset.hdf5', which_sets=('train',))
test_dataset = H5PYDataset('dataset.hdf5', which_sets=('test',))
H5PYDataset instances allow the same level of introspection as IndexableDataset instances.
In [ ]:
print('Sources are {}.'.format(train_dataset.sources))
print('Axis labels are {}.'.format(train_dataset.axis_labels))
print('Training set contains {} examples.'.format(train_dataset.num_examples))
print('Test set contains {} examples.'.format(test_dataset.num_examples))
We can iterate over data the same way as well.
In [ ]:
train_stream = DataStream(
    dataset=train_dataset,
    iteration_scheme=ShuffledScheme(
        examples=train_dataset.num_examples, batch_size=10))
for batch in train_stream.get_epoch_iterator():
    print([source.shape for source in batch])
The H5PYDataset class isn't only suitable for large datasets. In fact, most of Fuel's built-in datasets rely on HDF5 for storage.
At first sight, this might seem inefficient (data in an HDF5 file is read off disk instead of being stored in memory, which is considerably slower), but H5PYDataset features a load_in_memory constructor argument which, when set to True, reads data off disk once and stores it in memory as a numpy array.
Fuel aims to facilitate iterating over and transforming data, which we've covered up to now, but one of its goals is also to make it easy to download, convert and store often-used datasets. This is what will be covered in this section.
Built-in datasets are datasets which can be obtained through Fuel's automated downloading and conversion tools. MNIST, which we use below, is one of them.
Fuel implements specific Dataset subclasses for each of the built-in datasets. They all expect their corresponding data files to be contained inside one of the directories defined in the Fuel data path.
You can define this data path by setting the data_path variable in ~/.fuelrc.
You can override it by setting the FUEL_DATA_PATH environment variable.
In both cases, Fuel expects a sequence of paths separated by an OS-specific delimiter (: for Linux / Mac OS, ; for Windows).
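Python exposes that OS-specific delimiter as os.pathsep, so a multi-directory data path can be built portably. The directory names below are hypothetical placeholders; the resulting string is the kind of value data_path or FUEL_DATA_PATH expects, and you could assign it to os.environ['FUEL_DATA_PATH'] yourself.

```python
import os

# Hypothetical directories; substitute your own.
toy_data_dirs = [os.path.abspath('fuel_data'),
                 os.path.abspath('shared_fuel_data')]

# os.pathsep is ':' on Linux / Mac OS and ';' on Windows, i.e. the
# delimiter Fuel expects between the entries of its data path.
toy_data_path = os.pathsep.join(toy_data_dirs)
print(toy_data_path)
```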
Let's create a directory in which to put our data files and set it as our Fuel data path.
In [ ]:
!mkdir fuel_data
import os
os.environ['FUEL_DATA_PATH'] = os.path.abspath('./fuel_data')
In [ ]:
!fuel-download mnist -d $FUEL_DATA_PATH
In [ ]:
!fuel-convert mnist -d $FUEL_DATA_PATH -o $FUEL_DATA_PATH
In [ ]:
from fuel.datasets import MNIST
from matplotlib import pyplot, cm
dataset = MNIST(('train',), sources=('features',))
state = dataset.open()
image, = dataset.get_data(state=state, request=[1234])
pyplot.imshow(image.reshape((28, 28)), cmap=cm.Greys_r, interpolation='nearest')
pyplot.show()
dataset.close(state)
Datasets can define a convenience transformer pipeline, which can automatically be applied when instantiating a data stream by using the alternative DataStream.default_stream constructor. We call these default transformers. Use cases for default transformers include the following:
- Some image datasets, such as MNIST, store their features as uint8 bytes, which is space-efficient. However, this means that pixel values range from 0 to 255, as opposed to the [0.0, 1.0] range machine learning practitioners are used to. In order to reduce the amount of boilerplate code users have to write to use these datasets, their default transformers divide features by 255 and cast them as floatX.
Default transformers are defined through the default_transformers class attribute. It is expected to be a tuple with one element per transformer in the pipeline. Each element is a tuple with three elements:
- the Transformer subclass to apply,
- a list of arguments to pass to its constructor,
- a dict of keyword arguments to pass to its constructor.
Let's look at what MNIST defines as a default transformer:
In [ ]:
print(MNIST.default_transformers)
As explained above, MNIST defines a two-transformer pipeline as its default transformers. The first transformer scales the features by 1 / 255 so that they range between 0 and 1, and the second transformer casts the features to floatX.
Let's compare the output of a data stream with and without the default transformers applied.
In [ ]:
vanilla_stream = DataStream(
    dataset=dataset,
    iteration_scheme=SequentialExampleScheme(dataset.num_examples))
print(next(vanilla_stream.get_epoch_iterator())[0].max())
default_stream = DataStream.default_stream(
    dataset=dataset,
    iteration_scheme=SequentialExampleScheme(dataset.num_examples))
print(next(default_stream.get_epoch_iterator())[0].max())
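The (class, arguments, keyword arguments) layout suggests how such a pipeline can be assembled: wrap the stream once per entry, in order. The sketch below assumes each entry is instantiated as transformer_class(stream, *args, **kwargs); ToyScaleAndShift, ToyCast and apply_transformers are hypothetical stand-ins operating on bare numbers, so which_sources is accepted but ignored. The exact arguments MNIST uses are whatever the print statement above shows.

```python
class ToyScaleAndShift(object):
    # Hypothetical stand-in for ScaleAndShift; which_sources is accepted
    # but ignored because this toy stream yields bare numbers.
    def __init__(self, data_stream, scale, shift, which_sources=None):
        self.data_stream, self.scale, self.shift = data_stream, scale, shift

    def __iter__(self):
        for value in self.data_stream:
            yield value * self.scale + self.shift


class ToyCast(object):
    # Hypothetical stand-in for Cast; dtype is any callable here.
    def __init__(self, data_stream, dtype, which_sources=None):
        self.data_stream, self.dtype = data_stream, dtype

    def __iter__(self):
        for value in self.data_stream:
            yield self.dtype(value)


# Same layout as default_transformers:
# (Transformer subclass, list of arguments, dict of keyword arguments).
toy_default_transformers = (
    (ToyScaleAndShift, [1 / 255.0, 0], {'which_sources': ('features',)}),
    (ToyCast, [float], {'which_sources': ('features',)}),
)


def apply_transformers(stream, transformers):
    # Wrap the stream once per entry, in the order the entries are listed.
    for transformer_class, args, kwargs in transformers:
        stream = transformer_class(stream, *args, **kwargs)
    return stream


toy_stream = apply_transformers([0, 51, 255], toy_default_transformers)
print(list(toy_stream))
```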
New dataset classes are implemented by subclassing Dataset and implementing a get_data method. If your dataset interacts with stateful objects (e.g. files on disk), then you should also override the open and close methods.
If your data fits in memory, you can save yourself some time by inheriting from IndexableDataset. In that case, all you need to do is load the data as a dict mapping source names to their corresponding data and pass it to the superclass as the indexables argument.
For instance, here's how you would implement a specialized class to interface with .npy files.
In [ ]:
from six import iteritems
class NPYDataset(IndexableDataset):
    """IndexableDataset whose sources are loaded from .npy files."""
    def __init__(self, source_paths, **kwargs):
        # Map each source name to the array stored in its .npy file.
        indexables = dict(
            [(source, numpy.load(path)) for
             source, path in iteritems(source_paths)])
        super(NPYDataset, self).__init__(indexables, **kwargs)
Here's this class in action:
In [ ]:
numpy.save('fuel_data/npy_dataset_features.npy',
           numpy.arange(40).reshape((10, 4)))
numpy.save('fuel_data/npy_dataset_targets.npy',
           numpy.arange(10).reshape((10, 1)))
dataset = NPYDataset({'features': 'fuel_data/npy_dataset_features.npy',
                      'targets': 'fuel_data/npy_dataset_targets.npy'})
state = dataset.open()
print(dataset.get_data(state=state, request=[0, 1, 2, 3]))
dataset.close(state)
An important thing to know about data streams is that they distinguish between two types of outputs: single examples, and batches of examples. Depending on your choice of iteration scheme, a data stream's produces_examples property will either be True (it produces examples) or False (it produces batches).
Transformers are aware of this, and as such implement two distinct methods: transform_example and transform_batch. A new transformer is typically implemented by subclassing Transformer and implementing one or both of these methods.
As an example, here's how you would double the value of the 'features' data source.
In [ ]:
from fuel.transformers import Transformer
class FeaturesDoubler(Transformer):
    def __init__(self, data_stream, **kwargs):
        super(FeaturesDoubler, self).__init__(
            data_stream=data_stream,
            produces_examples=data_stream.produces_examples,
            **kwargs)
    def transform_example(self, example):
        if 'features' in self.sources:
            example = list(example)
            index = self.sources.index('features')
            example[index] *= 2
            example = tuple(example)
        return example
    def transform_batch(self, batch):
        if 'features' in self.sources:
            batch = list(batch)
            index = self.sources.index('features')
            batch[index] *= 2
            batch = tuple(batch)
        return batch
Most transformers you'll implement will call their superclass constructor by passing the data stream and declaring whether they produce examples or batches. Since we wish to support both batches and examples, we'll declare our output type to be the same as our data stream's output type.
If you were to build a transformer that only works on batches, you would pass produces_examples=False and implement only transform_batch. If anyone tried to use your transformer on an example data stream, an error would automatically be raised.
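The example/batch dispatch and the error on mismatched streams can be illustrated with a small hypothetical base class. ToyTransformer is not Fuel's Transformer; it only shows the idea of routing each item to transform_example or transform_batch according to the declared output type, and refusing to run when that type differs from the child stream's.

```python
class ToyTransformer(object):
    # Hypothetical sketch of example/batch dispatch (not Fuel's code).
    def __init__(self, child_items, child_produces_examples, produces_examples):
        self.child_items = child_items
        self.child_produces_examples = child_produces_examples
        self.produces_examples = produces_examples

    def __iter__(self):
        if self.produces_examples != self.child_produces_examples:
            raise NotImplementedError('cannot turn examples into batches '
                                      'or vice versa')
        for item in self.child_items:
            if self.produces_examples:
                yield self.transform_example(item)
            else:
                yield self.transform_batch(item)

    def transform_example(self, example):
        raise NotImplementedError

    def transform_batch(self, batch):
        raise NotImplementedError


class ToyBatchOnlyNegate(ToyTransformer):
    # Declares batch output and implements only transform_batch.
    def __init__(self, child_items, child_produces_examples):
        super(ToyBatchOnlyNegate, self).__init__(
            child_items, child_produces_examples, produces_examples=False)

    def transform_batch(self, batch):
        return [-x for x in batch]


print(list(ToyBatchOnlyNegate([[1, 2], [3]], child_produces_examples=False)))
# [[-1, -2], [-3]]
try:
    list(ToyBatchOnlyNegate([1, 2, 3], child_produces_examples=True))
except NotImplementedError as error:
    print('error:', error)
```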
Let's test our doubler on a dummy dataset. Note that this implementation is brittle and only works on numpy arrays.
In [ ]:
dataset = IndexableDataset(
    indexables={'features': numpy.array([1, 2, 3, 4]),
                'targets': numpy.array([-1, 1, -1, 1])})
example_scheme = SequentialExampleScheme(examples=dataset.num_examples)
example_stream = FeaturesDoubler(
    data_stream=DataStream(
        dataset=dataset, iteration_scheme=example_scheme))
batch_scheme = SequentialScheme(
    examples=dataset.num_examples, batch_size=2)
batch_stream = FeaturesDoubler(
    data_stream=DataStream(
        dataset=dataset, iteration_scheme=batch_scheme))
print([example for example in example_stream.get_epoch_iterator()])
print([batch for batch in batch_stream.get_epoch_iterator()])
If you think the transform_example and transform_batch implementations are repetitive, you're right! In cases where the example and batch implementations of a transformer are the same, you can subclass from AgnosticTransformer instead. It requires that you implement a transform_any method, which will be called by both transform_example and transform_batch.
In [ ]:
from fuel.transformers import AgnosticTransformer
class FeaturesDoubler(AgnosticTransformer):
    def __init__(self, data_stream, **kwargs):
        super(FeaturesDoubler, self).__init__(
            data_stream=data_stream,
            produces_examples=data_stream.produces_examples,
            **kwargs)
    def transform_any(self, data):
        if 'features' in self.sources:
            data = list(data)
            index = self.sources.index('features')
            data[index] *= 2
            data = tuple(data)
        return data
Our transformer could be more general: what if we want to double 'features' and 'targets', or only 'targets'?
Transformers which are applied sourcewise like our doubler should usually subclass from SourcewiseTransformer. Their constructor takes an additional which_sources keyword argument specifying which sources to apply the transformer to. It's expected to be a tuple of source names. If which_sources is None, then the transformer is applied to all sources. Subclasses of SourcewiseTransformer should implement a transform_source_example method and/or a transform_source_batch method, which apply to an individual source.
There also exists an AgnosticSourcewiseTransformer class for cases where the example and batch implementations of a sourcewise transformer are the same. This class requires a transform_any_source method to be implemented.
In [ ]:
from fuel.transformers import AgnosticSourcewiseTransformer
class Doubler(AgnosticSourcewiseTransformer):
    def __init__(self, data_stream, **kwargs):
        super(Doubler, self).__init__(
            data_stream=data_stream,
            produces_examples=data_stream.produces_examples,
            **kwargs)
    def transform_any_source(self, source, _):
        return 2 * source
Let's try this implementation on our dummy dataset.
In [ ]:
target_stream = Doubler(
    data_stream=DataStream(
        dataset=dataset,
        iteration_scheme=batch_scheme),
    which_sources=('targets',))
all_stream = Doubler(
    data_stream=DataStream(
        dataset=dataset,
        iteration_scheme=batch_scheme),
    which_sources=None)
print([batch for batch in target_stream.get_epoch_iterator()])
print([batch for batch in all_stream.get_epoch_iterator()])
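The which_sources logic amounts to applying a per-source function only to the selected positions of each batch. apply_sourcewise and toy_double below are hypothetical helpers sketching that selection on plain lists; they are not part of Fuel. As with transform_any_source, the function receives both the source data and the source name.

```python
def apply_sourcewise(data, sources, function, which_sources=None):
    """Hypothetical helper: apply `function` to the selected sources only.

    which_sources=None means every source, as with SourcewiseTransformer.
    """
    if which_sources is None:
        which_sources = sources
    return tuple(function(values, name) if name in which_sources else values
                 for values, name in zip(data, sources))


def toy_double(values, source_name):
    # Receives the data and the source name, like transform_any_source.
    return [2 * v for v in values]


toy_batch = ([1, 2], [-1, 1])
toy_sources = ('features', 'targets')
print(apply_sourcewise(toy_batch, toy_sources, toy_double,
                       which_sources=('targets',)))
# ([1, 2], [-2, 2])
print(apply_sourcewise(toy_batch, toy_sources, toy_double))
# ([2, 4], [-2, 2])
```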
Finally, there exists a Mapping transformer which acts as a Swiss Army knife transformer. In addition to a data stream, its constructor accepts a function which will be applied to data coming from the stream.
Here's how you would implement the feature doubler using Mapping.
In [ ]:
from fuel.transformers import Mapping
features_index = dataset.sources.index('features')
def double(data):
    data = list(data)
    data[features_index] *= 2
    return tuple(data)
mapping_stream = Mapping(
    data_stream=DataStream(
        dataset=dataset, iteration_scheme=batch_scheme),
    mapping=double)
print([batch for batch in mapping_stream.get_epoch_iterator()])
New iteration schemes are implemented by subclassing IterationScheme and implementing a get_request_iterator method, which should return an iterator over requests: single indices for examplewise schemes, lists of indices for batch schemes.
Two subclasses of IterationScheme typically serve as a basis for other iteration schemes: IndexScheme (for schemes requesting examples) and BatchScheme (for schemes requesting batches). Both subclasses are instantiated by providing a list of indices or a number of examples, and BatchScheme accepts an additional batch_size argument.
Here's how you would implement an iteration scheme that iterates over even examples:
In [ ]:
from fuel.schemes import IndexScheme, BatchScheme
# `iter_` : A picklable version of `iter`
from picklable_itertools import iter_, imap
# Partition all elements of a sequence into tuples of length at most n
from picklable_itertools.extras import partition_all
class ExampleEvenScheme(IndexScheme):
    def get_request_iterator(self):
        indices = list(self.indices)[::2]
        return iter_(indices)
class BatchEvenScheme(BatchScheme):
    def get_request_iterator(self):
        indices = list(self.indices)[::2]
        return imap(list, partition_all(self.batch_size, indices))
Here are the two iteration scheme classes in action:
In [ ]:
print(list(ExampleEvenScheme(10).get_request_iterator()))
print(list(BatchEvenScheme(10, 2).get_request_iterator()))
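For readers unfamiliar with partition_all, here is a stdlib-only sketch of the same batching logic. toy_partition_all and toy_even_batch_requests are hypothetical names; the sketch produces the same requests as BatchEvenScheme(10, 2), but as a plain generator. Plain generators cannot be pickled, which is presumably why the schemes above build their iterators with picklable_itertools (for instance, so a stream can be serialized mid-epoch).

```python
import pickle
from itertools import islice


def toy_partition_all(n, iterable):
    # Stdlib sketch of what partition_all does: tuples of length n,
    # with a possibly shorter final tuple.
    iterator = iter(iterable)
    while True:
        chunk = tuple(islice(iterator, n))
        if not chunk:
            return
        yield chunk


def toy_even_batch_requests(num_examples, batch_size):
    # Same logic as BatchEvenScheme above, written as a plain generator.
    indices = list(range(num_examples))[::2]
    return (list(chunk) for chunk in toy_partition_all(batch_size, indices))


print(list(toy_even_batch_requests(10, 2)))  # [[0, 2], [4, 6], [8]]

# A plain generator cannot be pickled, unlike the iterators built with
# picklable_itertools.
try:
    pickle.dumps(toy_even_batch_requests(10, 2))
except TypeError as error:
    print('not picklable:', error)
```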
Fuel allows you to parallelize data processing by running it in a separate process. This feature is still under development, but it is already quite useful.
Implementing a parallelized preprocessing pipeline is done in two steps. First, write a Python script that sets up the data processing pipeline and spawns a server that listens to requests. See the fuel_server notebook for more details on that.
Once the server is up and running, you'll need to instantiate a ServerDataStream instance, which will connect to the server and make requests.
In [ ]:
import time
from fuel.streams import DataStream, ServerDataStream
from fuel.transformers import Transformer
class Bottleneck(Transformer):
    """Artificially slow transformer: sleeps before returning each batch."""
    def __init__(self, data_stream, **kwargs):
        self.slowdown = kwargs.pop('slowdown', 0)
        super(Bottleneck, self).__init__(
            data_stream, data_stream.produces_examples, **kwargs)
    def get_data(self, request=None):
        if request is not None:
            raise ValueError
        time.sleep(self.slowdown)
        return next(self.child_epoch_iterator)
dataset = IndexableDataset({'features': [[0] * 128] * 1000})
iteration_scheme = ShuffledScheme(examples=1000, batch_size=100)
regular_data_stream = Bottleneck(
    data_stream=DataStream(
        dataset=dataset, iteration_scheme=iteration_scheme),
    slowdown=0.005)
In [ ]:
def time_iteration(parallel):
    if parallel:
        # Connects to a server started separately (see the fuel_server notebook).
        data_stream = ServerDataStream(('features',), produces_examples=False)
    else:
        data_stream = regular_data_stream
    start_time = time.time()
    for i in range(10):
        for data in data_stream.get_epoch_iterator():
            time.sleep(0.01)  # simulate a training step
    stop_time = time.time()
    print('Training took {} seconds'.format(stop_time - start_time))
In [ ]:
time_iteration(False)
In [ ]:
time_iteration(True)
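The speed-up comes from preparing the next batches while the current one is being consumed. That overlap can be sketched in a single process with a background thread and a bounded queue. This is a swapped-in technique for illustration, not how Fuel's server or ServerDataStream work: Fuel runs the pipeline in a separate process, whereas threads only overlap work that releases the GIL (sleeping, I/O, many numpy operations). The names prefetch, consume and toy_slow_batches are hypothetical, and this minimal version does not forward exceptions raised in the producer.

```python
import queue
import threading
import time


def toy_slow_batches(n, delay):
    # Stand-in for an expensive preprocessing pipeline (like Bottleneck).
    for i in range(n):
        time.sleep(delay)
        yield i


def prefetch(iterable, buffer_size=4):
    """Produce items in a background thread while the consumer works."""
    buffer = queue.Queue(maxsize=buffer_size)
    sentinel = object()

    def producer():
        for item in iterable:
            buffer.put(item)
        buffer.put(sentinel)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()
        if item is sentinel:
            return
        yield item


def consume(iterable, work):
    # Simulated training loop: spends `work` seconds on every item.
    start = time.time()
    items = []
    for item in iterable:
        time.sleep(work)
        items.append(item)
    return items, time.time() - start


serial_items, serial_time = consume(toy_slow_batches(20, 0.01), 0.01)
parallel_items, parallel_time = consume(
    prefetch(toy_slow_batches(20, 0.01)), 0.01)
print('serial: {:.2f}s, prefetched: {:.2f}s'.format(serial_time,
                                                    parallel_time))
```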