Feed Data with Queue from HDFS

Populate HDFS with Sample Dataset


In [ ]:
%%bash

# Copy the sample dataset from the local filesystem into the HDFS root.
# -f overwrites any existing /linear directory so this cell is idempotent:
# without it, a second "Restart & Run All" fails with "File exists".
hadoop fs -copyFromLocal -f /root/datasets/linear /

In [ ]:
%%bash

# Sanity check: list the dataset files now present in HDFS under /linear
# (expects training.csv and validation.csv, used by the cells below).
hadoop fs -ls /linear

Create TensorFlow Session


In [ ]:
import tensorflow as tf

# TF 1.x graph-mode API: discard any previously built default graph so that
# re-running this cell (and the cells below) does not accumulate duplicate ops.
tf.reset_default_graph()

# Create a session bound to the fresh default graph; printing it simply
# confirms the Session object was constructed.
sess = tf.Session()
print(sess)

Create Queue and Feed TensorFlow from HDFS

The HDFS Namenode is running locally and listening on port 39000.


In [ ]:
# Build an input queue whose elements are the HDFS file paths below.
# string_input_producer registers a QueueRunner on the graph; the queue is
# only actually populated once tf.train.start_queue_runners is called later.
# NOTE(review): assumes the Namenode is reachable at 127.0.0.1:39000 and that
# this TensorFlow build has HDFS support — confirm in the target environment.
filename_queue = tf.train.string_input_producer([
  "hdfs://127.0.0.1:39000/linear/training.csv",
  "hdfs://127.0.0.1:39000/linear/validation.csv",
])

Parse HDFS File(s)


In [ ]:
# Reader that dequeues filenames from filename_queue and emits one text line
# per read.
reader = tf.TextLineReader()
# read() returns a (key, value) pair: the key identifies the file/line, the
# value is the raw CSV line as a string tensor.
filename, text = reader.read(filename_queue)
# Parse each line into two float columns; [[0.0], [0.0]] supplies both the
# per-column default values and the float32 output dtype.
x_observed, y_observed = tf.decode_csv(text, [[0.0],[0.0]])

In [ ]:
# Start the QueueRunner threads that feed filename_queue, coordinated so they
# can be shut down cleanly once we are done reading.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess,
                                       coord=coord)
n = 20

print('First %s Training Examples...' % n)
print('')

from tabulate import tabulate

examples = []
try:
    # Pull n parsed (x, y) rows out of the input pipeline and collect them
    # for tabular display.
    for _ in range(n):
        x_val, y_val = sess.run([x_observed, y_observed])
        examples.append([x_val, y_val])
    print(tabulate(examples, headers=["x_observed", "y_observed"]))
finally:
    # Always stop and join the runner threads, even if a read failed,
    # so the kernel is not left with orphaned background threads.
    coord.request_stop()
    coord.join(threads)