Data Generators

author: Jacob Schreiber
contact: jmschreiber91@gmail.com

The most common way to pass data to machine learning models, like those defined using scikit-learn or Keras, is as a numpy array. However, there are times when this is either not possible, such as when the data set is too big to fit in memory, or simply inconvenient, such as when the data exists in some other format. Further, once the numpy array is passed in, control over the training process is limited to whatever built-in options the package happens to support.

The concept of a data generator is a solution to both of these problems. Essentially, a data generator wraps a data set and yields batches of data. The pomegranate models then simply request batches from the generators, rather than handling the work of partitioning the data set themselves. Thus, generators become a flexible solution to a variety of problems because the user can define how exactly these batches are generated, including how data is read in, what pre-processing steps are performed on it, and when the epoch ends.
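
To make the idea concrete, here is a minimal, library-agnostic sketch of what "yielding batches" means: a plain Python generator that slices an array into chunks.

import numpy

def batches(X, batch_size):
    # Yield contiguous chunks of X until the entire array has been seen.
    for start in range(0, len(X), batch_size):
        yield X[start:start+batch_size]

X = numpy.random.randn(1000, 5)
for X_batch in batches(X, batch_size=100):
    print(X_batch.shape)   # (100, 5), printed ten times

pomegranate's generators follow the same pattern, except that each batch also carries the example weights (and, for supervised settings, the labels) alongside the data.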

The internal fitting loop of most pomegranate models looks something like this:

with Parallel(n_jobs=n_jobs, backend='threading') as parallel:
    f = delayed(self.summarize, check_pickle=False)

    while improvement > stop_threshold and iteration < max_iterations + 1:
        # Update parameters from the stored sufficient statistics
        self.from_summaries(step_size, pseudocount)

        # Calculate new sufficient statistics from the data
        log_probability_sum = sum(parallel(f(*batch) for batch in 
            data_generator.batches()))

The code is fairly simple. First, the model creates a thread pool (using joblib) for parallel processing, falling back to sequential execution when n_jobs=1. Then, a loop runs until either convergence or the maximum number of iterations is reached. Each iteration consists of a parameter update step followed by a batch summarization step. This may seem out of order, but it is written this way for efficiency reasons we won't get into here. The important aspect is that the data generator has a method, batches, that generates the batches for each epoch. These batches are then processed, either sequentially or in parallel, by the summarize method. Importantly, it is the data generator that produces these batches rather than some hard-coded process.
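
The summarize and from_summaries methods used in that loop are part of pomegranate's public API, so the same pattern can be sketched by hand on a single distribution, independent of any generator:

import numpy
from pomegranate import NormalDistribution

X = numpy.random.randn(10000)
d = NormalDistribution(0, 1)

# Accumulate sufficient statistics one batch at a time.
for start in range(0, len(X), 1000):
    d.summarize(X[start:start+1000])

# Update the parameters once from the accumulated sufficient statistics.
d.from_summaries()

This is exactly what the training loop above does, except that the batches come from the data generator and the summarize calls may be distributed across threads.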


In [1]:
%matplotlib inline
import numpy
import pandas
import matplotlib.pyplot as plt
import seaborn; seaborn.set_style('whitegrid')

from pomegranate import *

numpy.random.seed(0)
numpy.set_printoptions(suppress=True)

%load_ext watermark
%watermark -m -n -p numpy,scipy,pomegranate


Sun Dec 01 2019 

numpy 1.17.2
scipy 1.3.1
pomegranate not installed

compiler   : GCC 7.3.0
system     : Linux
release    : 4.15.0-66-generic
machine    : x86_64
processor  : x86_64
CPU cores  : 8
interpreter: 64bit

Using a pre-defined generator

Currently, there are three generators built into pomegranate: DataGenerator, SequenceGenerator, and DataFrameGenerator. DataGenerator is a somewhat bland generator that takes in a data set and yields sequential chunks of data until the entire data set has been seen. SequenceGenerator takes a list of sequences and yields them one at a time, mostly for use with HiddenMarkovModel objects. Finally, DataFrameGenerator takes a pandas DataFrame and yields batches from it.
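
All three can also be constructed explicitly and passed anywhere a data set is accepted. The sketch below assumes that they all live in pomegranate.io alongside BaseGenerator and that they accept a batch_size keyword; treat those details as assumptions rather than guaranteed signatures.

import numpy
import pandas
from pomegranate.io import DataGenerator, SequenceGenerator, DataFrameGenerator

X = numpy.random.randn(1000, 5)
data_generator = DataGenerator(X, batch_size=100)            # batch_size assumed

df = pandas.DataFrame(X, columns=['a', 'b', 'c', 'd', 'e'])
df_generator = DataFrameGenerator(df, batch_size=100)        # batch_size assumed

sequences = [numpy.random.randn(20, 5), numpy.random.randn(35, 5)]
sequence_generator = SequenceGenerator(sequences)            # one sequence per batch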

Let's take a look at learning a Bayesian network using a pandas DataFrame.


In [2]:
numpy.random.seed(0)

X = numpy.array([
    numpy.random.choice([True, False], size=10),
    numpy.random.choice(['A', 'B'], size=10),
    numpy.random.choice(2, size=10),
    numpy.random.choice(4, size=10),
    numpy.random.choice(['orange', 'blue', 'red', 'green'], size=10),
    numpy.random.choice(2, size=10),
    numpy.random.choice(2, size=10),
    numpy.random.choice(3, size=10)
], dtype=object).T.copy()

X_df = pandas.DataFrame(X, columns=['bool', 'str', 'int1', 'int2', 'color', 'int3', 'int4', 'int5'])
X_df.head()


Out[2]:
bool str int1 int2 color int3 int4 int5
0 True B 0 1 red 0 1 1
1 False A 1 0 green 1 0 1
2 False A 1 3 orange 1 1 2
3 True B 0 0 blue 1 0 0
4 False A 0 3 green 1 1 0

In [3]:
model1 = BayesianNetwork.from_samples(X_df, state_names=X_df.columns)
model1.structure


Out[3]:
((), (0,), (1,), (5,), (), (), (), (5,))

In [4]:
model2 = BayesianNetwork.from_samples(X, state_names=X_df.columns)
model2.structure


Out[4]:
((), (0,), (1,), (5,), (), (), (), (5,))

We can see that we're getting the same structure regardless of whether we use a numpy array or a DataFrame. This is because each object that is not already a data generator gets wrapped by DataGenerator before being used internally.
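
To make that wrapping explicit, we could construct the DataGenerator ourselves and pass it in directly. This is a sketch rather than a verified equivalence, but under the behavior described above the learned structure should match the two models trained earlier.

from pomegranate.io import DataGenerator

model3 = BayesianNetwork.from_samples(DataGenerator(X), state_names=X_df.columns)
model3.structure   # expected to match model1.structure and model2.structure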


In [5]:
model1.predict_proba({'color': 'blue', 'str': 'B'})


Out[5]:
array([{
    "class" :"Distribution",
    "dtype" :"bool",
    "name" :"DiscreteDistribution",
    "parameters" :[
        {
            "True" :0.6666666666666666,
            "False" :0.3333333333333333
        }
    ],
    "frozen" :false
},
       'B',
       {
    "class" :"Distribution",
    "dtype" :"int",
    "name" :"DiscreteDistribution",
    "parameters" :[
        {
            "0" :1.0,
            "1" :0.0
        }
    ],
    "frozen" :false
},
       {
    "class" :"Distribution",
    "dtype" :"int",
    "name" :"DiscreteDistribution",
    "parameters" :[
        {
            "0" :0.29999999999999993,
            "1" :0.2000000000000001,
            "2" :0.1000000000000001,
            "3" :0.3999999999999999
        }
    ],
    "frozen" :false
},
       'blue',
       {
    "class" :"Distribution",
    "dtype" :"int",
    "name" :"DiscreteDistribution",
    "parameters" :[
        {
            "0" :0.4000000000000002,
            "1" :0.5999999999999999
        }
    ],
    "frozen" :false
},
       {
    "class" :"Distribution",
    "dtype" :"int",
    "name" :"DiscreteDistribution",
    "parameters" :[
        {
            "1" :0.30000000000000004,
            "0" :0.6999999999999998
        }
    ],
    "frozen" :false
},
       {
    "class" :"Distribution",
    "dtype" :"int",
    "name" :"DiscreteDistribution",
    "parameters" :[
        {
            "0" :0.29999999999999993,
            "1" :0.4,
            "2" :0.3
        }
    ],
    "frozen" :false
}], dtype=object)

It is important to note that Bayesian network structure learning does not operate in a batch-wise fashion. Rather, the entire data set is materialized by exhausting the generator's batches method. In this case, the generator serves primarily as a way to convert a pandas DataFrame to a numpy array for use by the model.
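
In other words, "exhausting the generator" simply means concatenating every yielded batch back into one array. A sketch of that idea, assuming the data is the first element of each yielded batch and that DataGenerator accepts a batch_size keyword:

from pomegranate.io import DataGenerator

generator = DataGenerator(numpy.random.randn(500, 3), batch_size=100)
X_full = numpy.concatenate([batch[0] for batch in generator.batches()])
X_full.shape   # (500, 3)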

How do you define a data generator?

The BaseGenerator looks basically like the following:

class BaseGenerator(object):
    def __init__(self):
        pass

    def __len__(self):
        raise NotImplementedError

    @property
    def shape(self):
        raise NotImplementedError

    @property
    def ndim(self):
        raise NotImplementedError

    def batches(self):
        raise NotImplementedError

    def unlabeled_batches(self):
        raise NotImplementedError

    def labeled_batches(self):
        raise NotImplementedError

Your custom data generator only needs to implement the subset of methods that are relevant for your task.

The first few methods are related to managing the generator. The __init__ method should take in the parameters involved with loading the data set and controlling how batches are generated, e.g., batch size, shuffling, etc. The __len__ method should return the total number of examples in the data set, even when they aren't all read into memory. The shape and ndim properties are the counterparts to the numpy attributes. While shape and __len__ are somewhat redundant with each other, both are necessary because some operations within pomegranate need to work regardless of whether a list, a numpy array, or a data generator has been passed in.

The remaining methods involve generating batches of data to operate on. The batches method should be a generator that yields batches until the epoch is over. This can be as simple as yielding contiguous chunks of data until the entire data set has been seen (as in DataGenerator) or as complicated as randomly sampling examples until some pre-specified number of examples has been seen. Because each iteration of the training loop involves another call to batches, the generator is reset each epoch. The unlabeled_batches and labeled_batches methods are used for semi-supervised learning and yield, respectively, only the unlabeled examples from the data set and only the labeled ones. These do not have to be implemented if the use case will never rely on semi-supervised learning.
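
As an illustration, here is a sketch (not pomegranate's actual implementation) of what these two methods might look like for an in-memory data set where unlabeled examples are marked with a label of -1. The exact tuples that pomegranate expects each method to yield are an assumption here.

import numpy
from pomegranate.io import BaseGenerator

class SemiSupervisedArrayGenerator(BaseGenerator):
    # __len__, shape, ndim, and batches are omitted for brevity; they would
    # look much like the out-of-core example in the next section.
    def __init__(self, X, y, batch_size):
        self.X = X
        self.y = numpy.asarray(y)
        self.weights = numpy.ones(len(X))
        self.batch_size = batch_size

    def labeled_batches(self):
        # Yield only the examples that have a label (assumed tuple layout).
        idx = numpy.where(self.y != -1)[0]
        for start in range(0, len(idx), self.batch_size):
            i = idx[start:start + self.batch_size]
            yield self.X[i], self.weights[i], self.y[i]

    def unlabeled_batches(self):
        # Yield only the examples without a label.
        idx = numpy.where(self.y == -1)[0]
        for start in range(0, len(idx), self.batch_size):
            i = idx[start:start + self.batch_size]
            yield self.X[i], self.weights[i]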

Building a custom out-of-core data generator

As an example, let's build an out-of-core data generator that wraps a numpy array that lives on disk. The purpose of this generator is to generate batches from a data set without ever reading the entire thing into memory. Because we are writing a data generator to do this, we don't have to rely on pomegranate being able to handle out-of-core operations internally; all we have to do is define how to generate batches from the data.


In [6]:
from pomegranate.io import BaseGenerator

class MemoryMapGenerator(BaseGenerator):
    def __init__(self, filename, weights=None, batch_size=None):
        # mmap_mode='r' memory-maps the file, so slices are read from disk
        # on demand rather than loading the whole array into memory.
        self.X = numpy.load(filename, mmap_mode='r')
        
        if weights is None:
            self.weights = numpy.ones(self.X.shape[0])
        else:
            self.weights = numpy.load(weights, mmap_mode='r')
        
        if batch_size is None:
            self.batch_size = self.X.shape[0]
        else:
            self.batch_size = batch_size
            
    def __len__(self):
        return len(self.X)

    @property
    def shape(self):
        return self.X.shape

    @property
    def ndim(self):
        return self.X.ndim

    def batches(self):
        # Yield contiguous (data, weights) slices until the data set is exhausted.
        start, end = 0, self.batch_size

        while start < len(self):
            yield self.X[start:end], self.weights[start:end]

            start += self.batch_size
            end += self.batch_size

The implementation here was fairly straightforward, mostly because numpy is already super convenient when it comes to handling out-of-core data sets that are stored as numpy-formatted arrays. All the generator does is load slices of data (and the corresponding slices of weights) and yield them until all examples have been seen.


In [7]:
X = numpy.random.randn(10000, 70)
numpy.save("X_test.npy", X)

X_generator = MemoryMapGenerator("X_test.npy", batch_size=1000)

Let's train one model using the original in-memory data set and another using the generator defined on the copy that now lives on disk.


In [8]:
from pomegranate import GeneralMixtureModel, MultivariateGaussianDistribution

d = MultivariateGaussianDistribution
model1 = GeneralMixtureModel.from_samples(d, 2, X, verbose=True, batch_size=1000, max_iterations=10, init='first-k')


[1] Improvement: 22728.663993260125	Time (s): 0.01303
[2] Improvement: 351.98156800703146	Time (s): 0.01358
[3] Improvement: 196.71634020493366	Time (s): 0.01322
[4] Improvement: 136.9181309074629	Time (s): 0.0132
[5] Improvement: 98.06255334301386	Time (s): 0.01312
[6] Improvement: 73.51235836837441	Time (s): 0.01331
[7] Improvement: 57.17729074554518	Time (s): 0.01358
[8] Improvement: 46.950419555883855	Time (s): 0.01334
[9] Improvement: 40.84007772023324	Time (s): 0.01323
[10] Improvement: 36.80416104476899	Time (s): 0.01291
Total Improvement: 23767.626893157372
Total Time (s): 0.1473

In [9]:
model2 = GeneralMixtureModel.from_samples(d, 2, X_generator, verbose=True, max_iterations=10, init='first-k')


[1] Improvement: 22728.663993260125	Time (s): 0.01284
[2] Improvement: 351.98156800703146	Time (s): 0.01327
[3] Improvement: 196.71634020493366	Time (s): 0.013
[4] Improvement: 136.9181309074629	Time (s): 0.01306
[5] Improvement: 98.06255334301386	Time (s): 0.01332
[6] Improvement: 73.51235836837441	Time (s): 0.01341
[7] Improvement: 57.17729074554518	Time (s): 0.01342
[8] Improvement: 46.950419555883855	Time (s): 0.01333
[9] Improvement: 40.84007772023324	Time (s): 0.01305
[10] Improvement: 36.80416104476899	Time (s): 0.01355
Total Improvement: 23767.626893157372
Total Time (s): 0.1456

It looks like we're getting the same improvements and that it's not actually significantly slower to load up batches in an out-of-core manner than to generate them from a data set that lives in memory.