In [ ]:
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Overview

Time-series forecasting problems are ubiquitous throughout the business world and can be posed as supervised machine learning problems.

A common approach to creating features and labels is to use a sliding window, where the features are historical entries and the label(s) represent entries in the future. As any data scientist who works with time series knows, this sliding-window approach can be tricky to get right.
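
As a toy illustration of this setup (using pandas directly; the helper functions used later in this notebook do the heavy lifting for you), a window of three past values and a one-step-ahead label can be built like this:

import pandas as pd

# Toy series of six observations.
s = pd.Series([10, 11, 12, 13, 14, 15])

# Each row's features are the three previous values; the label is the current value.
window = 3
features = pd.concat([s.shift(i) for i in range(window, 0, -1)], axis=1)
features.columns = ['t-3', 't-2', 't-1']
examples = pd.concat([features, s.rename('label')], axis=1).dropna()
print(examples)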

In this notebook we share a workflow to tackle time-series problems.

Dataset

For this demo we will be using New York City real estate data obtained from nyc.gov. The data starts in 2003. The data can be loaded into BigQuery with the following code:

# Read data. Data was collected from nyc open data repository.
import pandas as pd
dfr = pd.read_csv('https://storage.googleapis.com/asl-testing/data/nyc_open_data_real_estate.csv')

# Upload to BigQuery.
PROJECT = 'YOUR-PROJECT-HERE'
DATASET = 'nyc_real_estate'
TABLE = 'residential_sales'
dfr.to_gbq('{}.{}'.format(DATASET, TABLE), PROJECT)

Objective

The goal of this notebook is to show how to forecast using Pandas and BigQuery. The steps achieved in this notebook are the following:

1 Building a machine learning (ML) forecasting model locally

  • Create features and labels on a subsample of data
  • Train a model using sklearn

2 Building and scaling out an ML model using Google BigQuery

  • Create features and labels on the full dataset using BigQuery.
  • Train the model on the entire dataset using BigQuery ML

3 Building an advanced forecasting model using a recurrent neural network (RNN)

  • Create features and labels on the full dataset using BigQuery.
  • Train a model using TensorFlow

Costs

This tutorial uses billable components of Google Cloud Platform (GCP):

  • BigQuery
  • Cloud Storage
  • AI Platform

The BigQuery and Cloud Storage costs are < \$0.05 and the AI Platform training job uses approximately 0.68 ML units or ~\$0.33.

Pandas: Rolling window for time-series forecasting

We have created a Pandas solution, the create_rolling_features_label function, which automatically creates the features/label setup. It is suitable for smaller datasets and for local testing before training on the Cloud. We have also created a BigQuery script that creates these rolling windows and is suitable for large datasets.

Data Exploration

This notebook is self-contained, so let's clone the training-data-analyst repo to get access to the feature and label creation functions in time_series.py and scalable_time_series.py. We'll be using the pandas_gbq package, so make sure that it is installed.


In [ ]:
!pip3 install pandas-gbq

In [13]:
%%bash 
git clone https://github.com/GoogleCloudPlatform/training-data-analyst.git \
 --depth 1

cd training-data-analyst/blogs/gcp_forecasting


Cloning into 'training-data-analyst'...
Checking out files: 100% (3693/3693), done.

After cloning the above repo we can import pandas and our custom module time_series.py.


In [ ]:
%matplotlib inline
import pandas as pd
import pandas_gbq as gbq

from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge

import time_series

In [ ]:
# Allows you to easily use Python variables in SQL queries.
from IPython.core.magic import register_cell_magic
from IPython import get_ipython


@register_cell_magic('with_globals')
def with_globals(line, cell):
    contents = cell.format(**globals())
    if 'print' in line:
        print(contents)
    get_ipython().run_cell(contents)

For this demo we will be using New York City real estate data obtained from nyc.gov. This public dataset starts in 2003. The data can be loaded into BigQuery with the following code:


In [ ]:
dfr = pd.read_csv('https://storage.googleapis.com/asl-testing/data/nyc_open_data_real_estate.csv')
 
# Upload to BigQuery.
PROJECT = "[your-project-id]"
DATASET = 'nyc_real_estate'
TABLE = 'residential_sales'
BUCKET = "[your-bucket]"  # Used later.
gbq.to_gbq(dfr, '{}.{}'.format(DATASET, TABLE), PROJECT, if_exists='replace')

Since we are only doing local modeling, let's use a subsample of the data. Later we will train on all of the data in the cloud.


In [ ]:
SOURCE_TABLE = TABLE
FILTER = '''residential_units = 1 AND sale_price > 10000 
AND sale_date > TIMESTAMP('2010-12-31 00:00:00')'''

In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
SELECT 
  borough,
  neighborhood,
  building_class_category,
  tax_class_at_present,
  block,
  lot,
  ease_ment,
  building_class_at_present,
  address,
  apartment_number,
  zip_code,
  residential_units,
  commercial_units,
  total_units,
  land_square_feet,
  gross_square_feet,
  year_built,
  tax_class_at_time_of_sale,
  building_class_at_time_of_sale,
  sale_price,
  sale_date,
  price_per_sq_ft
FROM
  {SOURCE_TABLE}
WHERE
  {FILTER}
ORDER BY
  sale_date
LIMIT
  100

In [ ]:
df.head()

In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
SELECT
  neighborhood,
  COUNT(*) AS cnt
FROM
  {SOURCE_TABLE}
WHERE
  {FILTER}
GROUP BY
  neighborhood
ORDER BY
  cnt

Most sales come from the Upper West Side, Midtown West, and the Upper East Side.


In [ ]:
ax = df.set_index('neighborhood').cnt\
    .tail(10)\
    .plot(kind='barh');
ax.set_xlabel('total sales');

SoHo and Civic Center are the most expensive neighborhoods.


In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
SELECT
  neighborhood,
  APPROX_QUANTILES(sale_price, 100)[
OFFSET
  (50)] AS median_price
FROM
  {SOURCE_TABLE}
WHERE
  {FILTER}
GROUP BY
  neighborhood
ORDER BY
  median_price

In [ ]:
ax = df.set_index('neighborhood').median_price\
    .tail(10)\
    .plot(kind='barh');
ax.set_xlabel('median price')

Build features

Let's create features for building a machine learning model:

  1. Aggregate median sales for each week. Prices are noisy, and grouping by week smooths out irregularities (a pandas sketch of this step follows the list).
  2. Create a rolling window to split the single long time series into smaller windows. One feature vector will contain a single window and the label will be a single observation (or a window, for multiple predictions) occurring after the window.
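
For intuition, the weekly aggregation in step 1 looks roughly like the following in pandas (a sketch; df_sales below is a hypothetical toy table, whereas the notebook performs this step in BigQuery):

import pandas as pd

# Hypothetical sales table with one row per transaction.
df_sales = pd.DataFrame({
    'sale_date': pd.to_datetime(['2019-01-01', '2019-01-03', '2019-01-10']),
    'sale_price': [500000, 650000, 700000],
})

# Weekly median price: group noisy individual sales into one value per week.
weekly_median = (df_sales.set_index('sale_date')['sale_price']
                 .resample('W')
                 .median())
print(weekly_median)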

In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
SELECT
  sale_week,
  APPROX_QUANTILES(sale_price, 100)[
OFFSET
  (50)] AS median_price
FROM (
  SELECT
    TIMESTAMP_TRUNC(sale_date, week) AS sale_week,
    sale_price
  FROM
    {SOURCE_TABLE}
  WHERE
    {FILTER})
GROUP BY
  sale_week
ORDER BY
  sale_week

In [ ]:
sales = pd.Series(df.median_price)
sales.index = pd.DatetimeIndex(df.sale_week.dt.date)
sales.head()

In [ ]:
ax = sales.plot(figsize=(8,4), label='median_price')
ax = sales.rolling(10).mean().plot(ax=ax, label='10 week rolling average')
ax.legend()

Sliding window

Let's create our features. We will use the create_rolling_features_label function that automatically creates the features/label setup.

Create the features and labels.


In [ ]:
WINDOW_SIZE = 52 * 1  # One year of weekly observations per feature window.
HORIZON = 4 * 6       # Predict roughly 24 weeks (about six months) ahead.
MONTHS = 0
WEEKS = 1
LABELS_SIZE = 1

In [ ]:
df = time_series.create_rolling_features_label(sales, window_size=WINDOW_SIZE, pred_offset=HORIZON)
df = time_series.add_date_features(df, df.index, months=MONTHS, weeks=WEEKS)
df.head()

Let's train our model using all weekly median prices through the end of 2015. Then we will test our model's performance on prices from 2016 -- 2018.


In [ ]:
# Features, label.
X = df.drop('label', axis=1)
y = df['label']

# Train/test split. Splitting on time.
train_ix = time_series.is_between_dates(y.index,
                                  end='2015-12-30')
test_ix = time_series.is_between_dates(y.index,
                                 start='2015-12-30',
                                 end='2018-12-30 08:00:00')
X_train, y_train = X.iloc[train_ix], y.iloc[train_ix]
X_test, y_test = X.iloc[test_ix], y.iloc[test_ix]

print(X_train.shape, X_test.shape)

Z-score normalize the features for training.


In [ ]:
mean = X_train.mean()
std = X_train.std()

def zscore(X):
    return (X-mean)/std

X_train = zscore(X_train)
X_test = zscore(X_test)

Initial model

Baseline

Build a naive baseline model that just predicts the mean of the training set.


In [ ]:
df_baseline = y_test.to_frame(name='label')
df_baseline['pred'] = y_train.mean()

In [ ]:
# Join mean predictions with test labels.
baseline_global_metrics = time_series.Metrics(df_baseline.pred,
                                              df_baseline.label)
baseline_global_metrics.report("Global Baseline Model")

In [ ]:
# Train a model. Several candidates are listed below; the last assignment
# (GradientBoostingRegressor) is the one that is actually fit.
cl = RandomForestRegressor(n_estimators=500, max_features='sqrt',
                           random_state=10, criterion='mse')
cl = Ridge(100)
cl = GradientBoostingRegressor()

cl.fit(X_train, y_train)
pred = cl.predict(X_test)
model_metrics = time_series.Metrics(y_test, pred)
model_metrics.report("Gradient Boosting Model")

The regression model performs 35% better than the baseline model.

Observations:

  • Linear regression does okay for this dataset (regularization helps the model generalize)
  • Random Forest doesn't require a lot of tuning and performs a bit better than regression
  • Gradient Boosting also does better than regression (see the comparison sketch below)
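
To compare the candidates side by side, you can loop over them and reuse the Metrics helper. This is a sketch that assumes X_train, X_test, y_train, and y_test from the cells above:

from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge

# Fit each candidate on the same time-based split and report its metrics.
candidates = {
    'Ridge': Ridge(100),
    'Random Forest': RandomForestRegressor(n_estimators=500, random_state=10),
    'Gradient Boosting': GradientBoostingRegressor(),
}
for name, model in candidates.items():
    model.fit(X_train, y_train)
    time_series.Metrics(y_test, model.predict(X_test)).report(name)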

Interpret results


In [ ]:
# Data frame to query for plotting
df_res = pd.DataFrame({'pred': pred, 'baseline': df_baseline.pred, 'y_test': y_test})
metrics = time_series.Metrics(df_res.y_test, df_res.pred)
ax = df_res.iloc[:].plot(y=[ 'pred', 'y_test'],
                 style=['b-','k-'],
                 figsize=(10,5))
ax.set_title('rmse: {:2.2f}'.format(metrics.rmse), size=16);
ax.set_ylim(20,)
df_res.head()

BigQuery modeling

We have observed that there is signal in our data and that our smaller, local model outperforms the baseline. Let's scale this out to the cloud and train a BigQuery ML (BQML) model on the full dataset.

Set up your GCP project

The following steps are required, regardless of your notebook environment.

  1. Select or create a GCP project.

  2. Make sure that billing is enabled for your project.

  3. BigQuery is automatically enabled in new projects. To activate BigQuery in a pre-existing project, go to Enable the BigQuery API.

  4. Enter your project ID in the cell below.

Authenticate your GCP account

If you are using AI Platform Notebooks, your environment is already authenticated. Skip this step.

If you are using Colab, run the cell below and follow the instructions when prompted to authenticate your account via OAuth.

Otherwise, follow these steps:

  1. In the GCP Console, go to the Create service account key page.

  2. From the Service account drop-down list, select New service account.

  3. In the Service account name field, enter a name.

  4. From the Role drop-down list, select BigQuery > BigQuery Admin and Storage > Storage Object Admin.

  5. Click Create. A JSON file that contains your key downloads to your computer.

  6. Enter the path to your service account key as the GOOGLE_APPLICATION_CREDENTIALS variable in the cell below.

Import libraries

Import supporting modules:


In [ ]:
# Import BigQuery module
from google.cloud import bigquery

# Import external custom module containing SQL queries
import scalable_time_series

In [ ]:
# Define hyperparameters
value_name = "med_sales_price"
downsample_size = 7 # 7 days into 1 week
window_size = 52
labels_size = 1
horizon = 1

In [ ]:
# Construct a BigQuery client object.
client = bigquery.Client()

# Set dataset_id to the ID of the dataset to create.
sink_dataset_name = "temp_forecasting_dataset"
dataset_id = "{}.{}".format(client.project, sink_dataset_name)

# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset.from_string(dataset_id)

# Specify the geographic location where the dataset should reside.
dataset.location = "US"

# Send the dataset to the API for creation.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
try:
    dataset = client.create_dataset(dataset)  # API request
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
except Exception as e:
    print("Dataset {}.{} already exists".format(
        client.project, dataset.dataset_id))

We need to create a date range table in BigQuery so that we can join our data against it to get the correct sequences.
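
The exact SQL is generated by scalable_time_series.create_date_range. One way to build such a table independently (a sketch only; the column names and the weekly interval here are illustrative assumptions, not necessarily what the module produces) is with GENERATE_TIMESTAMP_ARRAY:

date_range_sketch = """
SELECT
  ts AS period_start,
  TIMESTAMP_ADD(ts, INTERVAL 7 DAY) AS period_end
FROM
  UNNEST(GENERATE_TIMESTAMP_ARRAY(TIMESTAMP('2003-01-01'),
                                  CURRENT_TIMESTAMP(),
                                  INTERVAL 7 DAY)) AS ts
"""
# client.query(date_range_sketch).to_dataframe().head()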


In [ ]:
# Call BigQuery and examine in dataframe
source_dataset = "nyc_real_estate"
source_table_name = "all_sales"
query_create_date_range = scalable_time_series.create_date_range(
    client.project, source_dataset, source_table_name)
df = client.query(query_create_date_range + "LIMIT 100").to_dataframe()
df.head(5)

Execute query and write to BigQuery table.


In [ ]:
job_config = bigquery.QueryJobConfig()
# Set the destination table
table_name = "start_end_timescale_date_range"
table_ref = client.dataset(sink_dataset_name).table(table_name)
job_config.destination = table_ref
job_config.write_disposition = "WRITE_TRUNCATE"

# Start the query, passing in the extra configuration.
query_job = client.query(
    query=query_create_date_range,
    # Location must match that of the dataset(s) referenced in the query
    # and of the destination table.
    location="US",
    job_config=job_config)  # API request - starts the query

query_job.result()  # Waits for the query to finish
print("Query results loaded to table {}".format(table_ref.path))

Now that we have the date range table created we can create our training dataset for BQML.


In [ ]:
# Call BigQuery and examine in dataframe
sales_dataset_table = source_dataset + "." + source_table_name
query_bq_sub_sequences = scalable_time_series.bq_create_rolling_features_label(
    client.project, sink_dataset_name, table_name, sales_dataset_table,
    value_name, downsample_size, window_size, horizon, labels_size)
print(query_bq_sub_sequences[0:500])

In [ ]:
%%with_globals
%%bigquery --project {PROJECT}
{query_bq_sub_sequences}
LIMIT 100

Create BigQuery dataset

Prior to now we've just been reading an existing BigQuery table; now we're going to create our own, so we need some place to put it. In BigQuery parlance, a Dataset is a folder for tables.

We will take advantage of BigQuery's Python Client to create the dataset.


In [ ]:
bq = bigquery.Client(project = PROJECT)

dataset = bigquery.Dataset(bq.dataset("bqml_forecasting"))
try:
    bq.create_dataset(dataset) # will fail if dataset already exists
    print("Dataset created")
except:
    print("Dataset already exists")

Split the dataset into train and eval sets.


In [ ]:
# Feature columns price_ago_<window_size> .. price_ago_1 and label columns
# price_ahead_1 .. price_ahead_<labels_size>.
feature_list = ["price_ago_{time}".format(time=time)
                for time in range(window_size, 0, -1)]
label_list = ["price_ahead_{time}".format(time=time)
              for time in range(1, labels_size + 1)]
select_list = ",".join(feature_list + label_list)
select_string = "SELECT {select_list} FROM ({query})".format(
    select_list=select_list,
    query=query_bq_sub_sequences)

# Hash each example's window start/end dates for a repeatable split.
concat_vars = []
concat_vars.append("CAST(feat_seq_start_date AS STRING)")
concat_vars.append("CAST(lab_seq_end_date AS STRING)")
farm_finger = "FARM_FINGERPRINT(CONCAT({concat_vars}))".format(
    concat_vars=", ".join(concat_vars))
sampling_clause = "ABS(MOD({farm_finger}, 100))".format(
    farm_finger=farm_finger)

# Hash buckets 0-79 go to training, 80-99 to evaluation.
bqml_train_query = "{select_string} WHERE {sampling_clause} < 80".format(
    select_string=select_string, sampling_clause=sampling_clause)
bqml_eval_query = "{select_string} WHERE {sampling_clause} >= 80".format(
    select_string=select_string, sampling_clause=sampling_clause)
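
You can peek at the ends of the generated queries to confirm the FARM_FINGERPRINT split clause before training:

# Inspect the generated WHERE clauses (assumes the cell above has been run).
print(bqml_train_query[-300:])
print(bqml_eval_query[-300:])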

Create model

To create a model:

  1. Use CREATE MODEL and provide a destination table for the resulting model. Alternatively, we can use CREATE OR REPLACE MODEL, which allows overwriting an existing model.
  2. Use OPTIONS to specify the model type (linear_reg or logistic_reg). There are many more options we could specify, such as regularization and learning rate, but we'll accept the defaults.
  3. Provide the query which fetches the training data.

Have a look at Step Two of this tutorial to see another example.

The query will take about two minutes to complete.


In [ ]:
%%with_globals
%%bigquery --project {PROJECT}
CREATE OR REPLACE MODEL bqml_forecasting.nyc_real_estate
OPTIONS(model_type = "linear_reg",
        input_label_cols = ["price_ahead_1"]) AS
{bqml_train_query}

Get training statistics

Because the query uses a CREATE MODEL statement to create a model, you do not see query results. The output is an empty string.

To get the training results we use the ML.TRAINING_INFO function.

Have a look at Steps Three and Four of this tutorial to see a similar example.


In [ ]:
%%with_globals
%%bigquery --project {PROJECT}
SELECT
    *
FROM
    ML.TRAINING_INFO(MODEL `bqml_forecasting.nyc_real_estate`)

'eval_loss' is reported as mean squared error, so our RMSE is 291178. Your results may vary.
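
Since eval_loss is a mean squared error, converting it to an RMSE is just a square root. For example (the value below is a placeholder; substitute the eval_loss reported for your run):

import math

eval_loss = 9.0e10  # Placeholder MSE; use the eval_loss from ML.TRAINING_INFO.
print('rmse: {:,.0f}'.format(math.sqrt(eval_loss)))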


In [ ]:
%%with_globals
%%bigquery --project {PROJECT}
#standardSQL
SELECT
  *
FROM
  ML.EVALUATE(MODEL `bqml_forecasting.nyc_real_estate`, ({bqml_eval_query}))

Predict

To use our model to make predictions, we use ML.PREDICT. Let's use the nyc_real_estate model trained above to predict the median sales price on our data.

Have a look at Step Five of this tutorial to see another example.


In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
#standardSQL
SELECT
    predicted_price_ahead_1
FROM
    ML.PREDICT(MODEL `bqml_forecasting.nyc_real_estate`, ({bqml_eval_query}))

TensorFlow Sequence Model

If you want a more customized model, then Keras or TensorFlow may be helpful. Below we create a custom LSTM sequence-to-one model that reads our input data via CSV files and then trains and evaluates.
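
For orientation, a minimal sequence-to-one LSTM in tf.keras looks roughly like the sketch below. This is an illustrative assumption about the model shape, not the exact architecture defined in tf_module/trainer (which stacks the hidden units passed via --lstm_hidden_units):

import tensorflow as tf

SEQ_LEN = 52  # One year of weekly prices, matching WINDOW_SIZE above.

model = tf.keras.Sequential([
    # Each example is a window of SEQ_LEN weekly prices, one feature per timestep.
    tf.keras.layers.LSTM(32, input_shape=(SEQ_LEN, 1)),
    # Sequence-to-one: a single dense output predicts the next median price.
    tf.keras.layers.Dense(1)
])
model.compile(optimizer="adam", loss="mse")
model.summary()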

Create temporary BigQuery dataset


In [ ]:
# Construct a BigQuery client object.
client = bigquery.Client()

# Set dataset_id to the ID of the dataset to create.
sink_dataset_name = "temp_forecasting_dataset"
dataset_id = "{}.{}".format(client.project, sink_dataset_name)

# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset.from_string(dataset_id)

# Specify the geographic location where the dataset should reside.
dataset.location = "US"

# Send the dataset to the API for creation.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
try:
    dataset = client.create_dataset(dataset)  # API request
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
except:
    print("Dataset {}.{} already exists".format(
        client.project, dataset.dataset_id))

We need to create a date range table in BigQuery so that we can join our data against it to get the correct sequences.


In [ ]:
# Call BigQuery and examine in dataframe
source_dataset = "nyc_real_estate"
source_table_name = "all_sales"
query_create_date_range = scalable_time_series.create_date_range(
    client.project, source_dataset, source_table_name)
df = client.query(query_create_date_range + "LIMIT 100").to_dataframe()
df.head(5)

Execute query and write to BigQuery table.


In [ ]:
job_config = bigquery.QueryJobConfig()
# Set the destination table
table_name = "start_end_timescale_date_range"
table_ref = client.dataset(sink_dataset_name).table(table_name)
job_config.destination = table_ref
job_config.write_disposition = "WRITE_TRUNCATE"

# Start the query, passing in the extra configuration.
query_job = client.query(
    query=query_create_date_range,
    # Location must match that of the dataset(s) referenced in the query
    # and of the destination table.
    location="US",
    job_config=job_config)  # API request - starts the query

query_job.result()  # Waits for the query to finish
print("Query results loaded to table {}".format(table_ref.path))

Now that we have the date range table created we can create our training dataset.


In [ ]:
# Call BigQuery and examine in dataframe
sales_dataset_table = source_dataset + "." + source_table_name
downsample_size = 7
query_csv_sub_seqs = scalable_time_series.csv_create_rolling_features_label(
    client.project, sink_dataset_name, table_name, sales_dataset_table,
    value_name, downsample_size, window_size, horizon, labels_size)
df = client.query(query_csv_sub_seqs + "LIMIT 100").to_dataframe()
df.head(20)

Now let's write our train and eval tables to BigQuery so that we can export them to CSV for TensorFlow.


In [ ]:
job_config = bigquery.QueryJobConfig()
csv_select_list = "med_sales_price_agg, labels_agg"
for step in ["train", "eval"]:
    if step == "train":
        selquery = "SELECT {csv_select_list} FROM ({}) WHERE {} < 80".format(
            query_csv_sub_sequences, sampling_clause)
    else:
        selquery = "SELECT {csv_select_list} FROM ({}) WHERE {} >= 80".format(
            query_csv_sub_sequences, sampling_clause)
    # Set the destination table
    table_name = "nyc_real_estate_{}".format(step)
    table_ref = client.dataset(sink_dataset_name).table(table_name)
    job_config.destination = table_ref
    job_config.write_disposition = "WRITE_TRUNCATE"

    # Start the query, passing in the extra configuration.
    query_job = client.query(
        query=selquery,
        # Location must match that of the dataset(s) referenced in the query
        # and of the destination table.
        location="US",
        job_config=job_config)  # API request - starts the query

    query_job.result()  # Waits for the query to finish
    print("Query results loaded to table {}".format(table_ref.path))

Export BigQuery table to CSV in GCS.


In [ ]:
dataset_ref = client.dataset(
    dataset_id=sink_dataset_name, project=client.project)

for step in ["train", "eval"]:
    destination_uri = "gs://{}/{}".format(
        BUCKET, "forecasting/nyc_real_estate/data/{}*.csv".format(step))
    table_name = "nyc_real_estate_{}".format(step)
    table_ref = dataset_ref.table(table_name)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
    )  # API request
    extract_job.result()  # Waits for job to complete.

    print("Exported {}:{}.{} to {}".format(
        client.project, sink_dataset_name, table_name, destination_uri))

In [ ]:
!gsutil -m cp gs://{BUCKET}/forecasting/nyc_real_estate/data/*.csv .

In [ ]:
!head train*.csv

Train the TensorFlow model on Google Cloud AI Platform.


In [ ]:
import os
PROJECT = PROJECT # REPLACE WITH YOUR PROJECT ID
BUCKET = BUCKET # REPLACE WITH A BUCKET NAME
REGION = "us-central1" # REPLACE WITH YOUR REGION e.g. us-central1

# Import os environment variables
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] =  BUCKET
os.environ["REGION"] = REGION
os.environ["TF_VERSION"] = "1.13"
os.environ["SEQ_LEN"] = str(WINDOW_SIZE)

In [ ]:
%%bash
OUTDIR=gs://$BUCKET/forecasting/nyc_real_estate/trained_model
JOBNAME=nyc_real_estate$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
  --region=$REGION \
  --module-name=trainer.task \
  --package-path=$PWD/tf_module/trainer \
  --job-dir=$OUTDIR \
  --staging-bucket=gs://$BUCKET \
  --scale-tier=STANDARD_1 \
  --runtime-version=$TF_VERSION \
  -- \
  --train_file_pattern="gs://$BUCKET/forecasting/nyc_real_estate/data/train*.csv" \
  --eval_file_pattern="gs://$BUCKET/forecasting/nyc_real_estate/data/eval*.csv" \
  --output_dir=$OUTDIR \
  --job-dir=./tmp \
  --seq_len=$SEQ_LEN \
  --train_batch_size=32 \
  --eval_batch_size=32 \
  --train_steps=1000 \
  --learning_rate=0.01 \
  --start_delay_secs=60 \
  --throttle_secs=60 \
  --lstm_hidden_units="32,16,8"