If you have a lot of training data, you probably don't want to load it all into memory at once. TensorFlow's QueueRunner solves this by coordinating multiple threads that feed a queue, so training examples can be enqueued and dequeued asynchronously while the rest of the graph runs.
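The underlying pattern is a classic multi-threaded producer/consumer queue. As a rough analogy (using Python's standard library, not TensorFlow's implementation), a producer thread fills a bounded FIFO queue while the main thread consumes from it:

```python
import queue
import threading

# A bounded FIFO queue: producers block when it is full,
# consumers block when it is empty (like a capacity-limited FIFOQueue).
q = queue.Queue(maxsize=1000)

def producer(items):
    # Analogue of an enqueue thread: push training examples into the queue.
    for item in items:
        q.put(item)

data = list(range(10))
t = threading.Thread(target=producer, args=(data,))
t.start()

# Analogue of a dequeue op: pull examples one at a time.
results = [q.get() for _ in range(10)]
t.join()
print(results)
```

With a single producer the FIFO ordering is preserved, so the consumer sees the items in the order they were enqueued.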
In [1]:
import tensorflow as tf
import numpy as np
We will be running multiple threads, so let's figure out the number of CPUs:
In [2]:
import multiprocessing
NUM_THREADS = multiprocessing.cpu_count()
Generate some fake data to work with:
In [3]:
xs = np.random.randn(100, 3).astype(np.float32)          # match the queue's tf.float32
ys = np.random.randint(0, 2, size=100).astype(np.int32)  # match the queue's tf.int32
Here are a couple of concrete examples of our data:
In [4]:
xs_and_ys = zip(xs, ys)
for _ in range(5):
    x, y = next(xs_and_ys)
    print('Input {} ---> Output {}'.format(x, y))
Define a queue:
In [5]:
queue = tf.FIFOQueue(capacity=1000, dtypes=[tf.float32, tf.int32])
Set up the enqueue and dequeue ops:
In [6]:
enqueue_op = queue.enqueue_many([xs, ys])
x_op, y_op = queue.dequeue()
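A note on the semantics: enqueue_many splits its inputs along the first dimension, so the queue ends up holding 100 (x, y) pairs, and each dequeue pops a single pair. A sketch of the same behavior with the standard library:

```python
import queue
import numpy as np

xs = np.random.randn(100, 3).astype(np.float32)
ys = np.random.randint(0, 2, size=100).astype(np.int32)

q = queue.Queue()

# enqueue_many analogue: one queue element per row of xs / entry of ys.
for x, y in zip(xs, ys):
    q.put((x, y))

# dequeue analogue: pop a single (x, y) pair.
x, y = q.get()
print(x.shape, y)  # x is one row of shape (3,), y is a scalar label
```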
Define a QueueRunner:
In [7]:
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
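Passing [enqueue_op] * 4 tells the QueueRunner to run four copies of the same enqueue op, each on its own thread, all feeding one shared queue. A minimal sketch of that arrangement with plain threads:

```python
import queue
import threading

q = queue.Queue()

def enqueue_op():
    # Each thread runs the same "op": push a batch of items.
    for i in range(25):
        q.put(i)

# Four threads all running the same enqueue operation
# against one shared queue, as in [enqueue_op] * 4.
threads = [threading.Thread(target=enqueue_op) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(q.qsize())  # 4 threads x 25 items = 100
```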
Now that all variables and ops have been defined, let's get started with a session:
In [8]:
sess = tf.InteractiveSession()
Create threads for the QueueRunner:
In [9]:
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
Test out dequeueing:
In [10]:
for _ in range(100):
    if coord.should_stop():
        break
    x, y = sess.run([x_op, y_op])
    print(x, y)
coord.request_stop()
coord.join(enqueue_threads)
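The Coordinator's stop protocol can be mimicked with a threading.Event (a sketch of the idea, not TensorFlow's implementation): should_stop polls a shared flag, request_stop sets it, and join waits for the worker threads to exit cleanly.

```python
import threading
import time

stop_event = threading.Event()  # plays the role of the Coordinator's flag
counter = {'n': 0}

def worker():
    # Like an enqueue thread: loop until a stop is requested.
    while not stop_event.is_set():  # coord.should_stop()
        counter['n'] += 1
        time.sleep(0.001)

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()

time.sleep(0.05)
stop_event.set()            # coord.request_stop()
for t in threads:           # coord.join(enqueue_threads)
    t.join()
```

Once join returns, all workers have observed the stop flag and exited, so it is safe to tear down shared state (or, in TensorFlow, close the session).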