Time series prediction using RNNs + Estimators

This notebook illustrates how to:

  1. Create a Recurrent Neural Network in TensorFlow
  2. Create a Custom Estimator in tf.contrib.learn

Dependencies


In [1]:
#!/usr/bin/env python

# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# original code from: https://github.com/GoogleCloudPlatform/training-data-analyst/tree/master/blogs/timeseries
# modified by: Marianne Linhares, monteirom@google.com, May 2017

# tensorflow
import tensorflow as tf
import tensorflow.contrib.learn as tflearn
import tensorflow.contrib.layers as tflayers
from tensorflow.contrib.learn.python.learn import learn_runner
import tensorflow.contrib.metrics as metrics
import tensorflow.contrib.rnn as rnn

# visualization
import seaborn as sns
import matplotlib.pyplot as plt

# helpers
import numpy as np
import csv

# enable tensorflow logs
tf.logging.set_verbosity(tf.logging.INFO)
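
These imports come from tf.contrib, which only exists in TensorFlow 1.x. As a quick sanity check (an illustrative addition, assuming a 1.x install), the running version can be printed:

# assumes TensorFlow 1.x; tf.contrib is not available in TF 2.x
print('TensorFlow version:', tf.__version__)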

Generating time-series data

The data is a set of sinusoids with random amplitudes and frequencies.

Each series consists of SEQ_LEN (10) numbers.


In [6]:
TRAIN = 10000
VALID = 50
TEST = 5

SEQ_LEN = 10

# helpers
def create_time_series():
    freq = (np.random.random()*0.5) + 0.1  # 0.1 to 0.6
    ampl = np.random.random() + 0.5  # 0.5 to 1.5
    x = np.sin(np.arange(0,SEQ_LEN) * freq) * ampl
    return x

def to_csv(filename, N):
    with open(filename, 'w') as ofp:
        for lineno in range(0, N):
            seq = create_time_series()
            line = ",".join(map(str, seq))
            ofp.write(line + '\n')

def read_csv(filename):
    with open(filename, 'rt') as csvfile:
        reader = csv.reader(csvfile)
        data = []
        for row in reader:
            data.append([float(x) for x in row])
        return data
            
# Creating datasets
to_csv('train.csv', TRAIN)
to_csv('valid.csv', VALID)
to_csv('test.csv', TEST)

# Example
for i in range(5): sns.tsplot(create_time_series())
plt.show()
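
As a quick sanity check (an illustrative snippet, not part of the original notebook), each line of train.csv should contain exactly SEQ_LEN comma-separated floats:

# read back the first generated series and confirm its length
first_row = read_csv('train.csv')[0]
assert len(first_row) == SEQ_LEN
print(first_row)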


Read datasets


In [39]:
DEFAULTS = [[0.0] for x in range(0, SEQ_LEN)]
BATCH_SIZE = 20
TIMESERIES_COL = 'rawdata'
N_OUTPUTS = 2  # in each sequence, values 1-8 are the features and values 9-10 are the labels
N_INPUTS = SEQ_LEN - N_OUTPUTS

# -------- read data and convert to needed format -----------
def read_dataset(filename, mode=tf.estimator.ModeKeys.TRAIN):  
    num_epochs = 100 if mode == tf.estimator.ModeKeys.TRAIN else 1

    data = np.asarray(read_csv(filename))

    # split each row: the first N_INPUTS values are the features,
    # the last N_OUTPUTS values are the labels
    x = []
    y = []
    for row in data:
        x.append(np.asarray(row[:len(row)-N_OUTPUTS], dtype=np.float32))
        y.append(np.asarray(row[len(row)-N_OUTPUTS:], dtype=np.float32))

    x = np.asarray(x, dtype=np.float32)
    y = np.asarray(y, dtype=np.float32)
    
    return tf.contrib.learn.io.numpy_input_fn({TIMESERIES_COL: x}, y, num_epochs=num_epochs)

def get_train():
  return read_dataset('train.csv', mode=tf.estimator.ModeKeys.TRAIN)

def get_valid():
  return read_dataset('valid.csv', mode=tf.estimator.ModeKeys.EVAL)

def get_test():
  return read_dataset('test.csv', mode=tf.estimator.ModeKeys.EVAL)
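
To make the feature/label split concrete, here is a small illustrative example (not from the original notebook) of how one row is divided: the first N_INPUTS values become the features and the last N_OUTPUTS values become the labels.

# stand-in row; a real row would come from read_csv
example_row = list(range(SEQ_LEN))
example_x = example_row[:N_INPUTS]   # first 8 values -> features
example_y = example_row[N_INPUTS:]   # last 2 values  -> labels
print(example_x, example_y)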

RNN Model


In [8]:
LSTM_SIZE = 3  # number of hidden units in the LSTM cell

def simple_rnn(features, targets, mode, params):
  # 0. Reformat input shape to become a sequence
  x = tf.split(features[TIMESERIES_COL], N_INPUTS, 1)
    
  # 1. configure the RNN
  lstm_cell = rnn.BasicLSTMCell(LSTM_SIZE, forget_bias=1.0)
  outputs, _ = rnn.static_rnn(lstm_cell, x, dtype=tf.float32)

  # keep only the output from the last time step of the RNN
  outputs = outputs[-1]
  
  # the prediction is a linear transformation of the output at the last time step
  weight = tf.Variable(tf.random_normal([LSTM_SIZE, N_OUTPUTS]))
  bias = tf.Variable(tf.random_normal([N_OUTPUTS]))
  predictions = tf.matmul(outputs, weight) + bias
    
  # 2. Define the loss function for training/evaluation
  loss = tf.losses.mean_squared_error(targets, predictions)
  eval_metric_ops = {
      "rmse": tf.metrics.root_mean_squared_error(targets, predictions)
  }
  
  # 3. Define the training operation/optimizer
  train_op = tf.contrib.layers.optimize_loss(
      loss=loss,
      global_step=tf.contrib.framework.get_global_step(),
      learning_rate=0.01,
      optimizer="SGD")

  # 4. Create predictions
  predictions_dict = {"predicted": predictions}
  
  # 5. return ModelFnOps
  return tflearn.ModelFnOps(
      mode=mode,
      predictions=predictions_dict,
      loss=loss,
      train_op=train_op,
      eval_metric_ops=eval_metric_ops)

def serving_input_fn():
    feature_placeholders = {
        TIMESERIES_COL: tf.placeholder(tf.float32, [None, N_INPUTS])
    }
  
    features = {
      key: tf.expand_dims(tensor, -1)
      for key, tensor in feature_placeholders.items()
    }

    return tflearn.utils.input_fn_utils.InputFnOps(
      features,
      None,
      feature_placeholders
    )
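
Inside simple_rnn, tf.split turns the [batch_size, N_INPUTS] feature matrix into a list of N_INPUTS tensors of shape [batch_size, 1], one per time step, which is the format rnn.static_rnn expects. A shape-only sketch (assuming TensorFlow 1.x):

# illustrative only: shows what tf.split produces before rnn.static_rnn consumes it
example_batch = tf.placeholder(tf.float32, [None, N_INPUTS])
example_steps = tf.split(example_batch, N_INPUTS, 1)
print(len(example_steps))      # N_INPUTS tensors, one per time step
print(example_steps[0].shape)  # each has shape (?, 1)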

Running the model


In [40]:
nn = tf.contrib.learn.Estimator(model_fn=simple_rnn)

# ---------- Training -------------
print('---------- Training ------------')
nn.fit(input_fn=get_train(), steps=10000)

# ---------- Evaluating -------------
print('---------- Evaluating ------------')
ev = nn.evaluate(input_fn=get_valid())
print(ev)

# ---------- Testing ----------------
print('---------- Testing ------------')
predictions = []
for p in nn.predict(input_fn=get_test()):
    print(p)
    predictions.append(p["predicted"])


INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': None, '_keep_checkpoint_max': 5, '_save_checkpoints_secs': 600, '_num_worker_replicas': 0, '_environment': 'local', '_master': '', '_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1
}
, '_task_type': None, '_evaluation_master': '', '_tf_random_seed': None, '_task_id': 0, '_is_chief': True, '_keep_checkpoint_every_n_hours': 10000, '_save_checkpoints_steps': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f83e5cb3080>, '_save_summary_steps': 100, '_num_ps_replicas': 0}
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpujycpqn2
WARNING:tensorflow:Estimator's model_fn (<function simple_rnn at 0x7f83e5baf158>) includes params argument, but params are not passed to Estimator.
---------- Training ------------
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/usr/local/lib/python3.4/dist-packages/tensorflow/python/framework/op_def_library.py in apply_op(self, op_type_name, name, **keywords)
    435                 preferred_dtype=default_dtype,
--> 436                 as_ref=input_arg.is_ref)
    437             if input_arg.number_attr and len(

/usr/local/lib/python3.4/dist-packages/tensorflow/python/framework/ops.py in internal_convert_n_to_tensor(values, dtype, name, as_ref, preferred_dtype)
    764             as_ref=as_ref,
--> 765             preferred_dtype=preferred_dtype))
    766   return ret

/usr/local/lib/python3.4/dist-packages/tensorflow/python/framework/ops.py in internal_convert_to_tensor(value, dtype, name, as_ref, preferred_dtype)
    703         if ret is None:
--> 704           ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
    705 

/usr/local/lib/python3.4/dist-packages/tensorflow/python/framework/ops.py in _TensorTensorConversionFunction(t, dtype, name, as_ref)
    576         "Tensor conversion requested dtype %s for Tensor with dtype %s: %r"
--> 577         % (dtype.name, t.dtype.name, str(t)))
    578   return t

ValueError: Tensor conversion requested dtype float64 for Tensor with dtype float32: 'Tensor("rnn/BasicLSTMCellZeroState/zeros_1:0", shape=(?, 3), dtype=float32)'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-40-a1eb564e7b64> in <module>()
      3 # ---------- Training -------------
      4 print('---------- Training ------------')
----> 5 nn.fit(input_fn=get_train(), steps=10000)
      6 
      7 # ---------- Evaluating -------------

/usr/local/lib/python3.4/dist-packages/tensorflow/python/util/deprecation.py in new_func(*args, **kwargs)
    279             _call_location(), decorator_utils.get_qualified_name(func),
    280             func.__module__, arg_name, date, instructions)
--> 281       return func(*args, **kwargs)
    282     new_func.__doc__ = _add_deprecated_arg_notice_to_docstring(
    283         func.__doc__, date, instructions)

/usr/local/lib/python3.4/dist-packages/tensorflow/contrib/learn/python/learn/estimators/estimator.py in fit(self, x, y, input_fn, steps, batch_size, monitors, max_steps)
    428       hooks.append(basic_session_run_hooks.StopAtStepHook(steps, max_steps))
    429 
--> 430     loss = self._train_model(input_fn=input_fn, hooks=hooks)
    431     logging.info('Loss for final step: %s.', loss)
    432     return self

/usr/local/lib/python3.4/dist-packages/tensorflow/contrib/learn/python/learn/estimators/estimator.py in _train_model(self, input_fn, hooks)
    925       features, labels = input_fn()
    926       self._check_inputs(features, labels)
--> 927       model_fn_ops = self._get_train_ops(features, labels)
    928       ops.add_to_collection(ops.GraphKeys.LOSSES, model_fn_ops.loss)
    929       all_hooks.extend([

/usr/local/lib/python3.4/dist-packages/tensorflow/contrib/learn/python/learn/estimators/estimator.py in _get_train_ops(self, features, labels)
   1130       `ModelFnOps` object.
   1131     """
-> 1132     return self._call_model_fn(features, labels, model_fn_lib.ModeKeys.TRAIN)
   1133 
   1134   def _get_eval_ops(self, features, labels, metrics):

/usr/local/lib/python3.4/dist-packages/tensorflow/contrib/learn/python/learn/estimators/estimator.py in _call_model_fn(self, features, labels, mode)
   1101     if 'model_dir' in model_fn_args:
   1102       kwargs['model_dir'] = self.model_dir
-> 1103     model_fn_results = self._model_fn(features, labels, **kwargs)
   1104 
   1105     if isinstance(model_fn_results, model_fn_lib.ModelFnOps):

<ipython-input-8-d4e0b0fbefdf> in simple_rnn(features, targets, mode, params)
      7   # 1. configure the RNN
      8   lstm_cell = rnn.BasicLSTMCell(LSTM_SIZE, forget_bias=1.0)
----> 9   outputs, _ = rnn.static_rnn(lstm_cell, x, dtype=tf.float32)
     10 
     11   # slice to keep only the last cell of the RNN

/usr/local/lib/python3.4/dist-packages/tensorflow/contrib/rnn/python/ops/core_rnn.py in static_rnn(cell, inputs, initial_state, dtype, sequence_length, scope)
    195             state_size=cell.state_size)
    196       else:
--> 197         (output, state) = call_cell()
    198 
    199       outputs.append(output)

/usr/local/lib/python3.4/dist-packages/tensorflow/contrib/rnn/python/ops/core_rnn.py in <lambda>()
    182       if time > 0: varscope.reuse_variables()
    183       # pylint: disable=cell-var-from-loop
--> 184       call_cell = lambda: cell(input_, state)
    185       # pylint: enable=cell-var-from-loop
    186       if sequence_length is not None:

/usr/local/lib/python3.4/dist-packages/tensorflow/contrib/rnn/python/ops/core_rnn_cell_impl.py in __call__(self, inputs, state, scope)
    239       else:
    240         c, h = array_ops.split(value=state, num_or_size_splits=2, axis=1)
--> 241       concat = _linear([inputs, h], 4 * self._num_units, True)
    242 
    243       # i = input_gate, j = new_input, f = forget_gate, o = output_gate

/usr/local/lib/python3.4/dist-packages/tensorflow/contrib/rnn/python/ops/core_rnn_cell_impl.py in _linear(args, output_size, bias, bias_start)
   1046       res = math_ops.matmul(args[0], weights)
   1047     else:
-> 1048       res = math_ops.matmul(array_ops.concat(args, 1), weights)
   1049     if not bias:
   1050       return res

/usr/local/lib/python3.4/dist-packages/tensorflow/python/ops/array_ops.py in concat(values, axis, name)
   1032   return gen_array_ops._concat_v2(values=values,
   1033                                   axis=axis,
-> 1034                                   name=name)
   1035 
   1036 

/usr/local/lib/python3.4/dist-packages/tensorflow/python/ops/gen_array_ops.py in _concat_v2(values, axis, name)
    517   """
    518   result = _op_def_lib.apply_op("ConcatV2", values=values, axis=axis,
--> 519                                 name=name)
    520   return result
    521 

/usr/local/lib/python3.4/dist-packages/tensorflow/python/framework/op_def_library.py in apply_op(self, op_type_name, name, **keywords)
    462                                 (prefix, dtype.name))
    463               else:
--> 464                 raise TypeError("%s that don't all match." % prefix)
    465             else:
    466               raise TypeError("%s that are invalid." % prefix)

TypeError: Tensors in list passed to 'values' of 'ConcatV2' Op have types [float64, float32] that don't all match.
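
The TypeError above suggests that the input features reach the LSTM cell as float64 while the cell's state is float32. One possible fix (a hedged sketch, not verified against this exact run) is to cast the features to float32 before splitting them in simple_rnn:

# possible fix inside simple_rnn, assuming the mismatch comes from float64 features
x = tf.split(tf.cast(features[TIMESERIES_COL], tf.float32), N_INPUTS, 1)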

Visualizing predictions


In [105]:
test_data = read_csv('test.csv')

# prepend the input features to each prediction
# preds[i] = test_data[i][:N_INPUTS] concatenated with predictions[i]
preds = [] 
for i in range(len(predictions)):
    preds.append(list(test_data[i][:N_INPUTS]) + list(predictions[i]))

# visualizing predictions
for d in test_data: sns.tsplot(d[N_INPUTS:])
for p in preds: sns.tsplot(p[N_INPUTS:], color="red")
plt.show()    
    
# visualizing all the series
for d in test_data: sns.tsplot(d)
for p in preds: sns.tsplot(p, color="red")
plt.show()
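
Note: sns.tsplot has since been deprecated and later removed from seaborn. On newer seaborn/matplotlib versions (an assumption about the installed environment), plain matplotlib calls give the same picture:

# equivalent plots without sns.tsplot
for d in test_data: plt.plot(d)
for p in preds: plt.plot(p, color="red")
plt.show()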