This notebook illustrates some of the basic functionality of pescador: a package that facilitates iterative learning from data streams, implemented as Python generators.
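As a minimal sketch of that pattern (the `noise` generator here is a hypothetical example, not part of pescador or the iris demo below): define a generator function, wrap it in a pescador.Streamer, and draw a bounded number of samples from it.

import numpy as np
import pescador

def noise(scale):
    # Hypothetical example generator: an endless stream of Gaussian noise samples
    while True:
        yield {'X': scale * np.random.randn(1, 2)}

stream = pescador.Streamer(noise, 1.0)
for sample in stream(max_iter=2):
    print(sample['X'].shape)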
In [6]:
import pescador
import numpy as np
np.set_printoptions(precision=4)
import sklearn
import sklearn.datasets
import sklearn.linear_model
import sklearn.metrics
import sklearn.model_selection
In [7]:
def batch_sampler(X, Y, batch_size=20, scale=1e-1):
    '''A Gaussian noise generator for data

    Parameters
    ----------
    X : ndarray
        features, n_samples by dimensions
    Y : ndarray
        labels, n_samples
    batch_size : int
        size of the minibatches to generate
    scale : float > 0
        scale of the noise to add

    Generates
    ---------
    data
        An infinite stream of data dictionaries
        batch = dict(X=X[i], Y=Y[i])
    '''
    X = np.atleast_2d(X)
    Y = np.atleast_1d(Y)
    n, d = X.shape

    while True:
        i = np.random.randint(0, n, size=batch_size)
        noise = scale * np.random.randn(batch_size, d)
        yield {'X': X[i] + noise, 'Y': Y[i]}
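Before wrapping it in a Streamer, the raw generator can also be consumed directly. A quick sanity check on some made-up data (illustrative only; the Streamer wrapper below is the recommended interface):

# Draw a single batch straight from the raw generator to inspect its shapes
gen = batch_sampler(np.random.randn(10, 4), np.arange(10), batch_size=3)
batch = next(gen)
print(batch['X'].shape, batch['Y'].shape)  # (3, 4) (3,)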
In [8]:
# Load up the iris dataset for the demo
data = sklearn.datasets.load_iris()
X, Y = data.data, data.target
classes = np.unique(Y)
In [9]:
# What does the data stream look like?
# First, we'll wrap the generator function in a Streamer object.
# This is necessary for a few reasons, notably so that we can re-instantiate
# the generator multiple times (e.g., once per epoch)
batches = pescador.Streamer(batch_sampler, X, Y)

for q in batches(max_iter=3):
    print(q)
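Because the Streamer re-instantiates the underlying generator on each call, the same object can be iterated repeatedly, e.g. once per training epoch. A short sketch (the epoch loop here is illustrative, not part of the demo below):

# Each call to batches(...) activates a fresh copy of the generator,
# so looping over epochs simply re-invokes the streamer
for epoch in range(2):
    for q in batches(max_iter=2):
        pass  # train on q here
    print('finished epoch', epoch)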
In [10]:
%%time
ss = sklearn.model_selection.ShuffleSplit(n_splits=2, test_size=0.2)

for train, test in ss.split(np.arange(len(X))):
    # Make an SGD learner, nothing fancy here
    classifier = sklearn.linear_model.SGDClassifier(verbose=0,
                                                    loss='log',
                                                    penalty='l1',
                                                    n_iter=1)

    # Again, build a streamer object
    batches = pescador.Streamer(batch_sampler, X[train], Y[train])

    # And train the model on the stream.
    n_steps = 0
    for batch in batches(max_iter=5000):
        classifier.partial_fit(batch['X'], batch['Y'], classes=classes)
        n_steps += 1

    # How's it do on the test set?
    print('Test-set accuracy: {:.3f}'.format(
        sklearn.metrics.accuracy_score(Y[test], classifier.predict(X[test]))))
    print('# Steps: ', n_steps)
The learner and the data generator may run at different speeds. If the data generator has higher latency than the learner (here, SGDClassifier), data generation becomes the bottleneck and slows down training.
Pescador uses zeromq to parallelize data stream generation, effectively decoupling it from the learner.
In [11]:
%%time
ss = sklearn.model_selection.ShuffleSplit(n_splits=2, test_size=0.2)

for train, test in ss.split(np.arange(len(X))):
    # Make an SGD learner, nothing fancy here
    classifier = sklearn.linear_model.SGDClassifier(verbose=0,
                                                    loss='log',
                                                    penalty='l1',
                                                    n_iter=1)

    # First, turn the batch_sampler function into a Streamer object
    batches = pescador.Streamer(batch_sampler, X[train], Y[train])

    # Then, wrap the streamer in a ZMQStreamer, which moves sample
    # generation into a separate process
    zmq_stream = pescador.ZMQStreamer(batches, 5156)

    # And train the model on the stream.
    n_steps = 0
    for batch in zmq_stream(max_iter=5000):
        classifier.partial_fit(batch['X'], batch['Y'], classes=classes)
        n_steps += 1

    # How's it do on the test set?
    print('Test-set accuracy: {:.3f}'.format(
        sklearn.metrics.accuracy_score(Y[test], classifier.predict(X[test]))))
    print('# Steps: ', n_steps)