Deep LSTM RNNs for N-D Time Series Analysis with TensorFlow


In [37]:
from __future__ import print_function
import numpy as np
from collections import defaultdict
import pandas as pd
import tensorflow as tf
from tensorflow.contrib.learn import learn_runner
import shutil
from tensorflow import metrics
from tensorflow.contrib.metrics import streaming_root_mean_squared_error
from tensorflow.contrib.learn import Experiment, Estimator, MetricSpec

Data Loading


In [38]:
SEQ_LEN = 10
def create_time_series():
    freq = (np.random.random()*0.5) + 0.1  # 0.1 to 0.6
    ampl = np.random.random() + 0.5  # 0.5 to 1.5
    x = np.sin(np.arange(0,SEQ_LEN) * freq) * ampl
    return x

In [39]:
with open('train.csv', 'w') as train_file:
    for counter in range(0, 100):
        for item in create_time_series():
            train_file.write("%s " % round(item, 3))
        train_file.write("\n")
train_file.close()

with open('test.csv', 'w') as test_file:
    for counter in range(0, 100):
        for item in create_time_series():
            test_file.write("%s " % round(item, 3))
        test_file.write("\n")
test_file.close()

In [40]:
DEFAULTS = [[0.0] for x in xrange(0, SEQ_LEN)]
BATCH_SIZE = 20
TIMESERIES_COL = 'rawdata'
N_OUTPUTS = 2  # in each sequence, 1-8 are features, and 9-10 is label
N_INPUTS = SEQ_LEN - N_OUTPUTS
# read data and convert to needed format

def read_dataset(filename, mode=tf.contrib.learn.ModeKeys.TRAIN):
    def _input_fn():
        num_epochs = 100 if mode == tf.contrib.learn.ModeKeys.TRAIN else 1
        
# could be a path to one file or a file pattern.
filename = "train.csv"
num_epochs = 10
input_file_names = tf.train.match_filenames_once(filename)

filename_queue = tf.train.string_input_producer(input_file_names, num_epochs=num_epochs, shuffle=True)

reader = tf.TextLineReader()
# akey, value = reader.read_up_to(filename_queue, num_records=BATCH_SIZE)
# value_column = tf.expand_dims(value, -1)

In [41]:
## peek through file names that got loaded
# sess = tf.Session()
# init_op = tf.local_variables_initializer()
# sess.run(init_op)
# print(sess.run(input_file_names))

akey, value = reader.read_up_to(filename_queue, num_records=BATCH_SIZE)
value_column = tf.expand_dims(value, -1)
print('value: ', value)

all_data = tf.decode_csv(value_column, record_defaults=DEFAULTS)
print('all_data: ', all_data)
# sess = tf.Session()
# init_op = tf.local_variables_initializer()
# sess.run(all_data)

print(len(all_data))


value:  Tensor("ReaderReadUpToV2_1:1", shape=(?,), dtype=string)
all_data:  [<tf.Tensor 'DecodeCSV_1:0' shape=(?, 1) dtype=float32>, <tf.Tensor 'DecodeCSV_1:1' shape=(?, 1) dtype=float32>, <tf.Tensor 'DecodeCSV_1:2' shape=(?, 1) dtype=float32>, <tf.Tensor 'DecodeCSV_1:3' shape=(?, 1) dtype=float32>, <tf.Tensor 'DecodeCSV_1:4' shape=(?, 1) dtype=float32>, <tf.Tensor 'DecodeCSV_1:5' shape=(?, 1) dtype=float32>, <tf.Tensor 'DecodeCSV_1:6' shape=(?, 1) dtype=float32>, <tf.Tensor 'DecodeCSV_1:7' shape=(?, 1) dtype=float32>, <tf.Tensor 'DecodeCSV_1:8' shape=(?, 1) dtype=float32>, <tf.Tensor 'DecodeCSV_1:9' shape=(?, 1) dtype=float32>]
10

In [42]:
# sess = tf.Session()

In [43]:
# with tf.Session() as sess:
#     init_op = tf.local_variables_initializer()
#     sess.run(init_op)
#     tf.global_variables_initializer().run()
    # adding these 2 lines fixed the hang forever problem
#     coord = tf.train.Coordinator()
#     threads = tf.train.start_queue_runners(sess=sess, coord=coord)

#     sess.run(all_data)

In [44]:
# all_data = tf.decode_csv(value_column, record_defaults=DEFAULTS)
def get_dict_features_and_labels(all_data):
    inputs = all_data[:len(all_data)-N_OUTPUTS]  # first few values
    label = all_data[len(all_data)-N_OUTPUTS : ] # last few values

    # from list of tensors to tensor with one more dimension
    inputs = tf.concat(inputs, axis=1)
    label = tf.concat(label, axis=1)
    print('inputs={}'.format(inputs))

    return {TIMESERIES_COL: inputs}, label   # dict of features, label

d, l = get_dict_features_and_labels(all_data)
print('d: ', d, '\nl:', l)


inputs=Tensor("concat_2:0", shape=(?, 8), dtype=float32)
d:  {'rawdata': <tf.Tensor 'concat_2:0' shape=(?, 8) dtype=float32>} 
l: Tensor("concat_3:0", shape=(?, 2), dtype=float32)

In [45]:
LSTM_SIZE = 3
# create the inference model
def simple_rnn(features, targets, mode):
    # 0. Reformat input shape to become a sequence
    x = tf.split(features[TIMESERIES_COL], N_INPUTS, 1)
    #print 'x={}'.format(x)
    
    # 1. configure the RNN
    lstm_cell = rnn.BasicLSTMCell(LSTM_SIZE, forget_bias=1.0)
    outputs, _ = rnn.static_rnn(lstm_cell, x, dtype=tf.float32)

    # slice to keep only the last cell of the RNN
    outputs = outputs[-1]
    #print 'last outputs={}'.format(outputs)
  
    # output is result of linear activation of last layer of RNN
    weight = tf.Variable(tf.random_normal([LSTM_SIZE, N_OUTPUTS]))
    bias = tf.Variable(tf.random_normal([N_OUTPUTS]))
    predictions = tf.matmul(outputs, weight) + bias
    
    # 2. Define the loss function for training/evaluation
    #print 'targets={}'.format(targets)
    #print 'preds={}'.format(predictions)
    loss = tf.losses.mean_squared_error(targets, predictions)
    eval_metric_ops = {"rmse": tf.metrics.root_mean_squared_error(targets, predictions)}
  
    # 3. Define the training operation/optimizer
    train_op = tf.contrib.layers.optimize_loss(
        loss=loss,
        global_step=tf.contrib.framework.get_global_step(),
        learning_rate=0.01,
        optimizer="SGD")

    # 4. Create predictions
    predictions_dict = {"predicted": predictions}
  
    # 5. return ModelFnOps
    return tflearn.ModelFnOps(
      mode=mode,
      predictions=predictions_dict,
      loss=loss,
      train_op=train_op,
      eval_metric_ops=eval_metric_ops)

In [46]:
def get_train():
    return read_dataset('train.csv', mode=tf.contrib.learn.ModeKeys.TRAIN)

def get_valid():
    return read_dataset('valid.csv', mode=tf.contrib.learn.ModeKeys.EVAL)

def experiment_fn(output_dir):
    # run experiment
    return Experiment(
        Estimator(model_fn=simple_rnn, model_dir=output_dir),
        train_input_fn=get_train(),
        eval_input_fn=get_valid(),
        eval_metrics={
            'rmse': MetricSpec(
                metric_fn=streaming_root_mean_squared_error
            )
        }
    )

In [47]:
shutil.rmtree('outputdir', ignore_errors=True) # start fresh each time
learn_runner.run(experiment_fn, 'outputdir')


INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600, '_num_ps_replicas': 0, '_keep_checkpoint_max': 5, '_task_type': None, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x11ac66d90>, '_model_dir': 'outputdir', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_session_config': None, '_tf_random_seed': None, '_save_summary_steps': 100, '_environment': 'local', '_num_worker_replicas': 0, '_task_id': 0, '_log_step_count_steps': 100, '_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1
}
, '_evaluation_master': '', '_master': ''}
WARNING:tensorflow:From /usr/local/lib/python2.7/site-packages/tensorflow/contrib/learn/python/learn/monitors.py:269: __init__ (from tensorflow.contrib.learn.python.learn.monitors) is deprecated and will be removed after 2016-12-05.
Instructions for updating:
Monitors are deprecated. Please use tf.train.SessionRunHook.
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-47-14588ee2163d> in <module>()
      1 shutil.rmtree('outputdir', ignore_errors=True) # start fresh each time
----> 2 learn_runner.run(experiment_fn, 'outputdir')

/usr/local/lib/python2.7/site-packages/tensorflow/contrib/learn/python/learn/learn_runner.pyc in run(experiment_fn, output_dir, schedule, run_config, hparams)
    207   schedule = schedule or _get_default_schedule(run_config)
    208 
--> 209   return _execute_schedule(experiment, schedule)
    210 
    211 

/usr/local/lib/python2.7/site-packages/tensorflow/contrib/learn/python/learn/learn_runner.pyc in _execute_schedule(experiment, schedule)
     44     logging.error('Allowed values for this experiment are: %s', valid_tasks)
     45     raise TypeError('Schedule references non-callable member %s' % schedule)
---> 46   return task()
     47 
     48 

/usr/local/lib/python2.7/site-packages/tensorflow/contrib/learn/python/learn/experiment.pyc in train_and_evaluate(self)
    498             input_fn=self._eval_input_fn, eval_steps=self._eval_steps,
    499             metrics=self._eval_metrics, every_n_steps=self._min_eval_frequency,
--> 500             name=eval_dir_suffix, hooks=self._eval_hooks
    501         )]
    502       self.train(delay_secs=0)

/usr/local/lib/python2.7/site-packages/tensorflow/contrib/learn/python/learn/monitors.pyc in __init__(self, x, y, input_fn, batch_size, eval_steps, every_n_steps, metrics, hooks, early_stopping_rounds, early_stopping_metric, early_stopping_metric_minimize, name)
    604     # TODO(mdan): Checks like this are already done by evaluate.
    605     if x is None and input_fn is None:
--> 606       raise ValueError("Either x or input_fn should be provided.")
    607     self.x = x
    608     self.y = y

ValueError: Either x or input_fn should be provided.

In [ ]:


In [ ]:


In [ ]: