TensorFlow queues and input pipelines

Initialization


In [24]:
import numpy as np
import tensorflow as tf
import threading
import time

import os
from cStringIO import StringIO
import numpy as np
from functools import partial

from IPython.display import clear_output, Image, display, HTML

import tensorflow as tf

def strip_consts(graph_def, max_const_size=32):
    """Strip large constant values from graph_def."""
    strip_def = tf.GraphDef()
    for n0 in graph_def.node:
        n = strip_def.node.add() 
        n.MergeFrom(n0)
        if n.op == 'Const':
            tensor = n.attr['value'].tensor
            size = len(tensor.tensor_content)
            if size > max_const_size:
                tensor.tensor_content = "<stripped %d bytes>"%size
    return strip_def
  
def rename_nodes(graph_def, rename_func):
    res_def = tf.GraphDef()
    for n0 in graph_def.node:
        n = res_def.node.add() 
        n.MergeFrom(n0)
        n.name = rename_func(n.name)
        for i, s in enumerate(n.input):
            n.input[i] = rename_func(s) if s[0]!='^' else '^'+rename_func(s[1:])
    return res_def
def show_graph(graph_def, max_const_size=32):
    """Visualize TensorFlow graph."""
    if hasattr(graph_def, 'as_graph_def'):
        graph_def = graph_def.as_graph_def()
    strip_def = strip_consts(graph_def, max_const_size=max_const_size)
    code = """
        <script>
          function load() {{
            document.getElementById("{id}").pbtxt = {data};
          }}
        </script>
        <link rel="import" href="https://tensorboard.appspot.com/tf-graph-basic.build.html" onload=load()>
        <div style="height:600px">
          <tf-graph-basic id="{id}"></tf-graph-basic>
        </div>
    """.format(data=repr(str(strip_def)), id='graph'+str(np.random.rand()))
  
    iframe = """
        <iframe seamless style="width:1200px;height:620px;border:0" srcdoc="{}"></iframe>
    """.format(code.replace('"', '&quot;'))
    display(HTML(iframe))
    
def create_session():
    sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=3000))
    return sess

In [25]:
tf.reset_default_graph()
a = tf.constant(1)
b = tf.constant(2)
c = a+b
d = c*a
e = d-c

In [26]:
show_graph(tf.get_default_graph().as_graph_def())


Simple queue


In [27]:
tf.reset_default_graph()
q = tf.FIFOQueue(capacity=20, dtypes=[tf.int32])
enqueue_placeholder = tf.placeholder(tf.int32)
enqueue_op = q.enqueue(enqueue_placeholder)
sess = create_session()
for i in range(10):
    sess.run(enqueue_op, feed_dict={enqueue_placeholder:i})
    print "Queue size is now: "+str(sess.run(q.size()))
sess.run(q.close())


Queue size is now: 1
Queue size is now: 2
Queue size is now: 3
Queue size is now: 4
Queue size is now: 5
Queue size is now: 6
Queue size is now: 7
Queue size is now: 8
Queue size is now: 9
Queue size is now: 10

In [5]:
try:
    for i in range(20):
        print sess.run(q.dequeue())
except tf.errors.OutOfRangeError:
    print "Done"


0
1
2
3
4
5
6
7
8
9
Done

Simple queue with multiple enqueues in parallel


In [6]:
tf.reset_default_graph()
random_number = tf.random_uniform(shape=())
q = tf.FIFOQueue(capacity=20, dtypes=[tf.float32])
enqueue_op = q.enqueue(random_number)

sess = create_session()
print sess.run(q.size())
def run():
  for i in range(5):
    sess.run(enqueue_op)

threads = [threading.Thread(target=run, args=()) for i in range(2)]
[t.start() for t in threads]
print sess.run(q.size())
time.sleep(0.5)
print sess.run(q.size())


0
0
Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x7fadf8bb7a10>> ignored
10

Setting up range_input_producer


In [7]:
inn = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)

In [8]:
show_graph(tf.get_default_graph().as_graph_def())


Lookup name of size operation in GraphDef

If you don't want to create new Node with "op.size", you can find name of exiting node by looking for with op="QueueSize" in the graph definition, and fetching it directly. In this case, proper run call will look like this -- sess.run("input_producer/fraction_of_32_full/fraction_of_32_full_Size:0")


In [9]:
tf.get_default_graph().as_graph_def()


Out[9]:
node {
  name: "random_uniform/shape"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_INT32
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_INT32
        tensor_shape {
          dim {
          }
        }
      }
    }
  }
}
node {
  name: "random_uniform/min"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_FLOAT
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_FLOAT
        tensor_shape {
        }
        float_val: 0.0
      }
    }
  }
}
node {
  name: "random_uniform/max"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_FLOAT
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_FLOAT
        tensor_shape {
        }
        float_val: 1.0
      }
    }
  }
}
node {
  name: "random_uniform/RandomUniform"
  op: "RandomUniform"
  input: "random_uniform/shape"
  attr {
    key: "T"
    value {
      type: DT_INT32
    }
  }
  attr {
    key: "dtype"
    value {
      type: DT_FLOAT
    }
  }
  attr {
    key: "seed"
    value {
      i: 0
    }
  }
  attr {
    key: "seed2"
    value {
      i: 0
    }
  }
}
node {
  name: "random_uniform/sub"
  op: "Sub"
  input: "random_uniform/max"
  input: "random_uniform/min"
  attr {
    key: "T"
    value {
      type: DT_FLOAT
    }
  }
}
node {
  name: "random_uniform/mul"
  op: "Mul"
  input: "random_uniform/RandomUniform"
  input: "random_uniform/sub"
  attr {
    key: "T"
    value {
      type: DT_FLOAT
    }
  }
}
node {
  name: "random_uniform"
  op: "Add"
  input: "random_uniform/mul"
  input: "random_uniform/min"
  attr {
    key: "T"
    value {
      type: DT_FLOAT
    }
  }
}
node {
  name: "fifo_queue"
  op: "FIFOQueue"
  attr {
    key: "capacity"
    value {
      i: 20
    }
  }
  attr {
    key: "component_types"
    value {
      list {
        type: DT_FLOAT
      }
    }
  }
  attr {
    key: "container"
    value {
      s: ""
    }
  }
  attr {
    key: "shapes"
    value {
      list {
      }
    }
  }
  attr {
    key: "shared_name"
    value {
      s: ""
    }
  }
}
node {
  name: "fifo_queue_enqueue"
  op: "QueueEnqueue"
  input: "fifo_queue"
  input: "random_uniform"
  attr {
    key: "Tcomponents"
    value {
      list {
        type: DT_FLOAT
      }
    }
  }
  attr {
    key: "timeout_ms"
    value {
      i: -1
    }
  }
}
node {
  name: "fifo_queue_Size"
  op: "QueueSize"
  input: "fifo_queue"
}
node {
  name: "fifo_queue_Size_1"
  op: "QueueSize"
  input: "fifo_queue"
}
node {
  name: "fifo_queue_Size_2"
  op: "QueueSize"
  input: "fifo_queue"
}
node {
  name: "input_producer/range/start"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_INT32
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_INT32
        tensor_shape {
        }
        int_val: 0
      }
    }
  }
}
node {
  name: "input_producer/range/limit"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_INT32
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_INT32
        tensor_shape {
        }
        int_val: 10
      }
    }
  }
}
node {
  name: "input_producer/range/delta"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_INT32
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_INT32
        tensor_shape {
        }
        int_val: 1
      }
    }
  }
}
node {
  name: "input_producer/range"
  op: "Range"
  input: "input_producer/range/start"
  input: "input_producer/range/limit"
  input: "input_producer/range/delta"
}
node {
  name: "input_producer/fraction_of_32_full/limit_epochs/Const"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_INT64
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_INT64
        tensor_shape {
        }
        int64_val: 0
      }
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/limit_epochs/epochs"
  op: "Variable"
  attr {
    key: "container"
    value {
      s: ""
    }
  }
  attr {
    key: "dtype"
    value {
      type: DT_INT64
    }
  }
  attr {
    key: "shape"
    value {
      shape {
      }
    }
  }
  attr {
    key: "shared_name"
    value {
      s: ""
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/limit_epochs/epochs/Assign"
  op: "Assign"
  input: "input_producer/fraction_of_32_full/limit_epochs/epochs"
  input: "input_producer/fraction_of_32_full/limit_epochs/Const"
  attr {
    key: "T"
    value {
      type: DT_INT64
    }
  }
  attr {
    key: "_class"
    value {
      list {
        s: "loc:@input_producer/fraction_of_32_full/limit_epochs/epochs"
      }
    }
  }
  attr {
    key: "use_locking"
    value {
      b: true
    }
  }
  attr {
    key: "validate_shape"
    value {
      b: true
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/limit_epochs/epochs/read"
  op: "Identity"
  input: "input_producer/fraction_of_32_full/limit_epochs/epochs"
  attr {
    key: "T"
    value {
      type: DT_INT64
    }
  }
  attr {
    key: "_class"
    value {
      list {
        s: "loc:@input_producer/fraction_of_32_full/limit_epochs/epochs"
      }
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/limit_epochs/CountUpTo"
  op: "CountUpTo"
  input: "input_producer/fraction_of_32_full/limit_epochs/epochs"
  attr {
    key: "T"
    value {
      type: DT_INT64
    }
  }
  attr {
    key: "limit"
    value {
      i: 1
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/limit_epochs"
  op: "Identity"
  input: "input_producer/range"
  input: "^input_producer/fraction_of_32_full/limit_epochs/CountUpTo"
  attr {
    key: "T"
    value {
      type: DT_INT32
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/fraction_of_32_full"
  op: "FIFOQueue"
  attr {
    key: "capacity"
    value {
      i: 32
    }
  }
  attr {
    key: "component_types"
    value {
      list {
        type: DT_INT32
      }
    }
  }
  attr {
    key: "container"
    value {
      s: ""
    }
  }
  attr {
    key: "shapes"
    value {
      list {
        shape {
        }
      }
    }
  }
  attr {
    key: "shared_name"
    value {
      s: ""
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/fraction_of_32_full_EnqueueMany"
  op: "QueueEnqueueMany"
  input: "input_producer/fraction_of_32_full/fraction_of_32_full"
  input: "input_producer/fraction_of_32_full/limit_epochs"
  attr {
    key: "Tcomponents"
    value {
      list {
        type: DT_INT32
      }
    }
  }
  attr {
    key: "timeout_ms"
    value {
      i: -1
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/fraction_of_32_full_Close"
  op: "QueueClose"
  input: "input_producer/fraction_of_32_full/fraction_of_32_full"
  attr {
    key: "cancel_pending_enqueues"
    value {
      b: false
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/fraction_of_32_full_Close_1"
  op: "QueueClose"
  input: "input_producer/fraction_of_32_full/fraction_of_32_full"
  attr {
    key: "cancel_pending_enqueues"
    value {
      b: true
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/fraction_of_32_full_Size"
  op: "QueueSize"
  input: "input_producer/fraction_of_32_full/fraction_of_32_full"
}
node {
  name: "input_producer/fraction_of_32_full/Cast"
  op: "Cast"
  input: "input_producer/fraction_of_32_full/fraction_of_32_full_Size"
  attr {
    key: "DstT"
    value {
      type: DT_FLOAT
    }
  }
  attr {
    key: "SrcT"
    value {
      type: DT_INT32
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/mul/y"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_FLOAT
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_FLOAT
        tensor_shape {
        }
        float_val: 0.03125
      }
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/mul"
  op: "Mul"
  input: "input_producer/fraction_of_32_full/Cast"
  input: "input_producer/fraction_of_32_full/mul/y"
  attr {
    key: "T"
    value {
      type: DT_FLOAT
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/ScalarSummary/tags"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_STRING
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_STRING
        tensor_shape {
        }
        string_val: "queue/input_producer/fraction_of_32_full/fraction_of_32_full/input_producer/"
      }
    }
  }
}
node {
  name: "input_producer/fraction_of_32_full/ScalarSummary"
  op: "ScalarSummary"
  input: "input_producer/fraction_of_32_full/ScalarSummary/tags"
  input: "input_producer/fraction_of_32_full/mul"
  attr {
    key: "T"
    value {
      type: DT_FLOAT
    }
  }
}
versions {
  producer: 9
}

In [10]:
tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)


Out[10]:
[<tensorflow.python.training.queue_runner.QueueRunner at 0x7fadf8b8c090>]

In [11]:
threads = tf.train.start_queue_runners()

In [12]:
sess.graph.version


Out[12]:
32

In [13]:
sess.run(inn.size())


ERROR:tensorflow:Exception in QueueRunner: Attempting to use uninitialized value input_producer/fraction_of_32_full/limit_epochs/epochs
	 [[Node: input_producer/fraction_of_32_full/limit_epochs/CountUpTo = CountUpTo[T=DT_INT64, limit=1, _device="/job:localhost/replica:0/task:0/cpu:0"](input_producer/fraction_of_32_full/limit_epochs/epochs)]]
Caused by op u'input_producer/fraction_of_32_full/limit_epochs/CountUpTo', defined at:
  File "/usr/lib/python2.7/runpy.py", line 162, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/__main__.py", line 3, in <module>
    app.launch_new_instance()
  File "/usr/local/lib/python2.7/dist-packages/traitlets/config/application.py", line 596, in launch_instance
    app.start()
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/kernelapp.py", line 442, in start
    ioloop.IOLoop.instance().start()
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 162, in start
    super(ZMQIOLoop, self).start()
  File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 883, in start
    handler_func(fd_obj, events)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 440, in _handle_events
    self._handle_recv()
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 472, in _handle_recv
    self._run_callback(callback, msg)
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 414, in _run_callback
    callback(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/kernelbase.py", line 276, in dispatcher
    return self.dispatch_shell(stream, msg)
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/kernelbase.py", line 228, in dispatch_shell
    handler(stream, idents, msg)
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/kernelbase.py", line 391, in execute_request
    user_expressions, allow_stdin)
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/ipkernel.py", line 199, in do_execute
    shell.run_cell(code, store_history=store_history, silent=silent)
  File "/usr/local/lib/python2.7/dist-packages/IPython/core/interactiveshell.py", line 2723, in run_cell
    interactivity=interactivity, compiler=compiler, result=result)
  File "/usr/local/lib/python2.7/dist-packages/IPython/core/interactiveshell.py", line 2825, in run_ast_nodes
    if self.run_code(code, result):
  File "/usr/local/lib/python2.7/dist-packages/IPython/core/interactiveshell.py", line 2885, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-7-80576f99bb36>", line 1, in <module>
    inn = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/input.py", line 221, in range_input_producer
    shared_name, name, "fraction_of_%d_full" % capacity)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/input.py", line 131, in input_producer
    input_tensor = limit_epochs(input_tensor, num_epochs)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/input.py", line 82, in limit_epochs
    counter = epochs.count_up_to(num_epochs)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variables.py", line 572, in count_up_to
    return state_ops.count_up_to(self._variable, limit=limit)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/gen_state_ops.py", line 110, in count_up_to
    return _op_def_lib.apply_op("CountUpTo", ref=ref, limit=limit, name=name)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/op_def_library.py", line 655, in apply_op
    op_def=op_def)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 2154, in create_op
    original_op=self._default_original_op, op_def=op_def)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 1154, in __init__
    self._traceback = _extract_stack()

Out[13]:
0

In [14]:
sess.graph.version


Out[14]:
33
Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/queue_runner.py", line 185, in _run
    sess.run(enqueue_op)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 340, in run
    run_metadata_ptr)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 564, in _run
    feed_dict_string, options, run_metadata)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 637, in _do_run
    target_list, options, run_metadata)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 659, in _do_call
    e.code)
FailedPreconditionError: Attempting to use uninitialized value input_producer/fraction_of_32_full/limit_epochs/epochs
	 [[Node: input_producer/fraction_of_32_full/limit_epochs/CountUpTo = CountUpTo[T=DT_INT64, limit=1, _device="/job:localhost/replica:0/task:0/cpu:0"](input_producer/fraction_of_32_full/limit_epochs/epochs)]]
Caused by op u'input_producer/fraction_of_32_full/limit_epochs/CountUpTo', defined at:
  File "/usr/lib/python2.7/runpy.py", line 162, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/__main__.py", line 3, in <module>
    app.launch_new_instance()
  File "/usr/local/lib/python2.7/dist-packages/traitlets/config/application.py", line 596, in launch_instance
    app.start()
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/kernelapp.py", line 442, in start
    ioloop.IOLoop.instance().start()
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 162, in start
    super(ZMQIOLoop, self).start()
  File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 883, in start
    handler_func(fd_obj, events)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 440, in _handle_events
    self._handle_recv()
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 472, in _handle_recv
    self._run_callback(callback, msg)
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/zmqstream.py", line 414, in _run_callback
    callback(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/kernelbase.py", line 276, in dispatcher
    return self.dispatch_shell(stream, msg)
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/kernelbase.py", line 228, in dispatch_shell
    handler(stream, idents, msg)
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/kernelbase.py", line 391, in execute_request
    user_expressions, allow_stdin)
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/ipkernel.py", line 199, in do_execute
    shell.run_cell(code, store_history=store_history, silent=silent)
  File "/usr/local/lib/python2.7/dist-packages/IPython/core/interactiveshell.py", line 2723, in run_cell
    interactivity=interactivity, compiler=compiler, result=result)
  File "/usr/local/lib/python2.7/dist-packages/IPython/core/interactiveshell.py", line 2825, in run_ast_nodes
    if self.run_code(code, result):
  File "/usr/local/lib/python2.7/dist-packages/IPython/core/interactiveshell.py", line 2885, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-7-80576f99bb36>", line 1, in <module>
    inn = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/input.py", line 221, in range_input_producer
    shared_name, name, "fraction_of_%d_full" % capacity)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/input.py", line 131, in input_producer
    input_tensor = limit_epochs(input_tensor, num_epochs)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/training/input.py", line 82, in limit_epochs
    counter = epochs.count_up_to(num_epochs)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/variables.py", line 572, in count_up_to
    return state_ops.count_up_to(self._variable, limit=limit)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/gen_state_ops.py", line 110, in count_up_to
    return _op_def_lib.apply_op("CountUpTo", ref=ref, limit=limit, name=name)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/op_def_library.py", line 655, in apply_op
    op_def=op_def)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 2154, in create_op
    original_op=self._default_original_op, op_def=op_def)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 1154, in __init__
    self._traceback = _extract_stack()



In [15]:
"input_producer/fraction_of_32_full/fraction_of_32_full_Size:0"


Out[15]:
'input_producer/fraction_of_32_full/fraction_of_32_full_Size:0'

Alternatives to "wait forever"

Set session timeout


In [16]:
tf.reset_default_graph()
queue = tf.FIFOQueue(capacity=5, dtypes=[tf.int32])
config = tf.ConfigProto()
config.operation_timeout_in_ms=2000
sess = tf.InteractiveSession("", config=config)
try:
    sess.run(queue.dequeue())
except tf.errors.DeadlineExceededError:
    print "DeadlineExceededError"


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x7fadf8b95390>> ignored
DeadlineExceededError

Close the queue before reading


In [17]:
tf.reset_default_graph()
queue = tf.FIFOQueue(capacity=5, dtypes=[tf.int32])
config = tf.ConfigProto()
config.operation_timeout_in_ms=2000
sess = tf.InteractiveSession("", config=config)
sess.run(queue.close())
try: 
    sess.run(queue.dequeue())
except tf.errors.OutOfRangeError as e:
    print "OutOfRangeError"


OutOfRangeError
Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x7fadf4259b90>> ignored

Close the queue before writing


In [18]:
tf.reset_default_graph()
queue = tf.FIFOQueue(capacity=5, dtypes=[tf.int32])
config = tf.ConfigProto()
config.operation_timeout_in_ms=2000
sess = tf.InteractiveSession("", config=config)
sess.run(queue.close())
try:
    sess.run(queue.enqueue(5))
except tf.errors.AbortedError:
    print "AbortedError"


AbortedError
Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x7fadf4259ad0>> ignored

Simple batching example


In [19]:
tf.reset_default_graph()
ranges = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
number = ranges.dequeue()
numeric_batch = tf.train.batch_join([[number]]*3, batch_size=2)
batch_number = numeric_batch

sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=6000))
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x7fadf4259c50>> ignored
Out[19]:
[<Thread(Thread-8, started daemon 140384554444544)>,
 <Thread(Thread-9, started daemon 140384546051840)>,
 <Thread(Thread-10, started daemon 140384537659136)>,
 <Thread(Thread-11, started daemon 140384529266432)>]

In [20]:
tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)


Out[20]:
[<tensorflow.python.training.queue_runner.QueueRunner at 0x7fadf426eed0>,
 <tensorflow.python.training.queue_runner.QueueRunner at 0x7fadf4259e10>]

Python/TensorFlow gotcha


In [21]:
tf.reset_default_graph()
ranges = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
number = ranges.dequeue()
sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=6000))
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()

print sess.run([number]*3)


[0, 0, 0]
Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x7fadf4259ad0>> ignored

Solution 1


In [22]:
tf.reset_default_graph()
ranges = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
number = ranges.dequeue()
sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=6000))
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()

print sess.run([ranges.dequeue()]*3)


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x7fadf42529d0>> ignored
[0, 0, 0]

Solution 2


In [23]:
tf.reset_default_graph()
ranges = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
number = ranges.dequeue()
sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=6000))
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()

print sess.run([ranges.dequeue(), ranges.dequeue(), ranges.dequeue()])


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x7fadf4205790>> ignored

NotFoundErrorTraceback (most recent call last)
<ipython-input-23-61885746734a> in <module>()
      6 tf.train.start_queue_runners()
      7 
----> 8 print sess.run([ranges.dequeue(), ranges.dequeue(), ranges.dequeue()])

/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in run(self, fetches, feed_dict, options, run_metadata)
    338     try:
    339       result = self._run(None, fetches, feed_dict, options_ptr,
--> 340                          run_metadata_ptr)
    341       if run_metadata:
    342         proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in _run(self, handle, fetches, feed_dict, options, run_metadata)
    562     try:
    563       results = self._do_run(handle, target_list, unique_fetches,
--> 564                              feed_dict_string, options, run_metadata)
    565     finally:
    566       # The movers are no longer used. Delete them.

/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata)
    635     if handle is None:
    636       return self._do_call(_run_fn, self._session, feed_dict, fetch_list,
--> 637                            target_list, options, run_metadata)
    638     else:
    639       return self._do_call(_prun_fn, self._session, handle, feed_dict,

/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in _do_call(self, fn, *args)
    657       # pylint: disable=protected-access
    658       raise errors._make_specific_exception(node_def, op, error_message,
--> 659                                             e.code)
    660       # pylint: enable=protected-access
    661 

NotFoundError: FetchOutputs node fraction_of_32_full_Dequeue_3:0: not found