In [ ]:
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Time-series forecasting problems are ubiquitous throughout the business world and can be posed as supervised machine learning problems.
A common approach to creating features and labels is to use a sliding window, where the features are historical entries and the label(s) represent entries in the future. As any data scientist who works with time series knows, this sliding-window approach can be tricky to get right.
In this notebook we share a workflow for tackling time-series problems.
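To make the sliding-window setup concrete, here is a minimal hand-rolled sketch (a toy series, not the helper functions used later in this notebook): each row's features are the previous few values and its label is a value further ahead.
# Toy sliding-window example: lagged values as features, a future value as label.
import pandas as pd
series = pd.Series(range(10))  # stand-in for a weekly price series
window, horizon = 3, 1
features = pd.concat(
    {'lag_{}'.format(i): series.shift(i) for i in range(1, window + 1)}, axis=1)
label = series.shift(-horizon).rename('label')
examples = pd.concat([features, label], axis=1).dropna()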
For this demo we will be using New York City real estate data obtained from nyc.gov. The data starts in 2003 and can be loaded into BigQuery with the following code:
# Read data. Data was collected from nyc open data repository.
import pandas as pd
dfr = pd.read_csv('https://storage.googleapis.com/asl-testing/data/nyc_open_data_real_estate.csv')
# Upload to BigQuery.
PROJECT = 'YOUR-PROJECT-HERE'
DATASET = 'nyc_real_estate'
TABLE = 'residential_sales'
dfr.to_gbq('{}.{}'.format(DATASET, TABLE), PROJECT)
The goal of this notebook is to show how to forecast using Pandas and BigQuery. The steps achieved in this notebook are the following:
1. Building a machine learning (ML) forecasting model locally
2. Building and scaling out an ML model using Google BigQuery
3. Building an advanced forecasting model using a recurrent neural network (RNN)
This tutorial uses billable components of Google Cloud Platform (GCP):
The BigQuery and Cloud Storage costs are < \$0.05 and the AI Platform training job uses approximately 0.68 ML units or ~\$0.33.
We have created a Pandas-based create_rolling_features_label
function that automatically creates the features/label setup. It is suitable for smaller datasets and for local testing before training on the Cloud. We have also created a BigQuery script that creates these rolling windows, suitable for large datasets.
This notebook is self-contained, so let's clone the training-data-analyst repo so we can have access to the feature and label creation functions in time_series.py and scalable_time_series.py. We'll be using the pandas_gbq package, so make sure that it is installed.
In [ ]:
!pip3 install pandas-gbq
In [13]:
%%bash
git clone https://github.com/GoogleCloudPlatform/training-data-analyst.git \
--depth 1
cd training-data-analyst/blogs/gcp_forecasting
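Note that the cd inside the %%bash cell only affects that shell, not the Python kernel. If this notebook is not already running inside blogs/gcp_forecasting, one option (a sketch, assuming the clone above ran in the notebook's working directory) is to add the cloned directory to sys.path so the imports below resolve:
In [ ]:
import sys
# Make time_series.py and scalable_time_series.py importable from the clone.
sys.path.append("training-data-analyst/blogs/gcp_forecasting")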
After cloning the above repo we can import pandas and our custom module time_series.py.
In [ ]:
%matplotlib inline
import pandas as pd
import pandas_gbq as gbq
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge
import time_series
In [ ]:
# Allow you to easily have Python variables in SQL query.
from IPython.core.magic import register_cell_magic
from IPython import get_ipython

@register_cell_magic('with_globals')
def with_globals(line, cell):
    contents = cell.format(**globals())
    if 'print' in line:
        print(contents)
    get_ipython().run_cell(contents)
For this demo we will be using New York City real estate data obtained from nyc.gov. This public dataset starts in 2003. The data can be loaded into BigQuery with the following code:
In [ ]:
dfr = pd.read_csv('https://storage.googleapis.com/asl-testing/data/nyc_open_data_real_estate.csv')
# Upload to BigQuery.
PROJECT = "[your-project-id]"
DATASET = 'nyc_real_estate'
TABLE = 'residential_sales'
BUCKET = "[your-bucket]" # Used later.
gbq.to_gbq(dfr, '{}.{}'.format(DATASET, TABLE), PROJECT, if_exists='replace')
Since we are just doing local modeling, let's just use a subsample of the data. Later we will train on all of the data in the cloud.
In [ ]:
SOURCE_TABLE = TABLE
FILTER = '''residential_units = 1 AND sale_price > 10000
AND sale_date > TIMESTAMP('2010-12-31 00:00:00')'''
In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
SELECT
borough,
neighborhood,
building_class_category,
tax_class_at_present,
block,
lot,
ease_ment,
building_class_at_present,
address,
apartment_number,
zip_code,
residential_units,
commercial_units,
total_units,
land_square_feet,
gross_square_feet,
year_built,
tax_class_at_time_of_sale,
building_class_at_time_of_sale,
sale_price,
sale_date,
price_per_sq_ft
FROM
{SOURCE_TABLE}
WHERE
{FILTER}
ORDER BY
sale_date
LIMIT
100
In [ ]:
df.head()
In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
SELECT
neighborhood,
COUNT(*) AS cnt
FROM
{SOURCE_TABLE}
WHERE
{FILTER}
GROUP BY
neighborhood
ORDER BY
cnt
Most sales come from the Upper West Side, Midtown West, and the Upper East Side.
In [ ]:
ax = df.set_index('neighborhood').cnt\
.tail(10)\
.plot(kind='barh');
ax.set_xlabel('total sales');
SOHO and Civic Center are the most expensive neighborhoods.
In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
SELECT
neighborhood,
APPROX_QUANTILES(sale_price, 100)[
OFFSET
(50)] AS median_price
FROM
{SOURCE_TABLE}
WHERE
{FILTER}
GROUP BY
neighborhood
ORDER BY
median_price
In [ ]:
ax = df.set_index('neighborhood').median_price\
.tail(10)\
.plot(kind='barh');
ax.set_xlabel('median price')
Let's create features for building a machine learning model:
In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
SELECT
sale_week,
APPROX_QUANTILES(sale_price, 100)[
OFFSET
(50)] AS median_price
FROM (
SELECT
TIMESTAMP_TRUNC(sale_date, week) AS sale_week,
sale_price
FROM
{SOURCE_TABLE}
WHERE
{FILTER})
GROUP BY
sale_week
ORDER BY
sale_week
In [ ]:
sales = pd.Series(df.median_price)
sales.index= pd.DatetimeIndex(df.sale_week.dt.date)
sales.head()
In [ ]:
ax = sales.plot(figsize=(8,4), label='median_price')
ax = sales.rolling(10).mean().plot(ax=ax, label='10 week rolling average')
ax.legend()
Let's create our features using the create_rolling_features_label function, which automatically creates the features/label setup.
Create the features and labels.
In [ ]:
WINDOW_SIZE = 52 * 1  # Weeks of history used as features (one year).
HORIZON = 4 * 6       # Predict the value this many weeks ahead (~6 months).
MONTHS = 0            # Month/week date-feature settings for add_date_features.
WEEKS = 1
LABELS_SIZE = 1       # Number of future values to predict per example.
In [ ]:
df = time_series.create_rolling_features_label(sales, window_size=WINDOW_SIZE, pred_offset=HORIZON)
df = time_series.add_date_features(df, df.index, months=MONTHS, weeks=WEEKS)
df.head()
Let's train our model on weekly median prices up to the end of 2015, then test its performance on prices from 2016 through 2018.
In [ ]:
# Features, label.
X = df.drop('label', axis=1)
y = df['label']
# Train/test split. Splitting on time.
train_ix = time_series.is_between_dates(y.index,
                                        end='2015-12-30')
test_ix = time_series.is_between_dates(y.index,
                                       start='2015-12-30',
                                       end='2018-12-30 08:00:00')
X_train, y_train = X.iloc[train_ix], y.iloc[train_ix]
X_test, y_test = X.iloc[test_ix], y.iloc[test_ix]
print(X_train.shape, X_test.shape)
Z-score normalize the features, using the training-set mean and standard deviation (to avoid leaking test-set information).
In [ ]:
mean = X_train.mean()
std = X_train.std()

def zscore(X):
    return (X - mean) / std

X_train = zscore(X_train)
X_test = zscore(X_test)
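As an optional sanity check (a sketch): because the scaler uses training-set statistics only, the scaled training features should be approximately zero-mean and unit-variance, while the test features will only be roughly so.
In [ ]:
# Train features should be ~0 mean / ~1 std; test features only approximately.
print(X_train.mean().mean(), X_train.std().mean())
print(X_test.mean().mean(), X_test.std().mean())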
In [ ]:
df_baseline = y_test.to_frame(name='label')
df_baseline['pred'] = y_train.mean()
In [ ]:
# Evaluate the global-mean baseline against the test labels.
baseline_global_metrics = time_series.Metrics(df_baseline.pred,
                                               df_baseline.label)
baseline_global_metrics.report("Global Baseline Model")
In [ ]:
# Train model. A few regressors are shown; the last one assigned is used
# (gradient boosting). Uncomment an alternative to try it instead.
# cl = RandomForestRegressor(n_estimators=500, max_features='sqrt',
#                            random_state=10, criterion='mse')
# cl = Ridge(alpha=100)
cl = GradientBoostingRegressor()
cl.fit(X_train, y_train)
pred = cl.predict(X_test)
regression_metrics = time_series.Metrics(y_test, pred)
regression_metrics.report("Regression Model")
The regression model performs 35% better than the baseline model.
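As a rough check of that figure (a sketch, assuming the improvement is measured as the relative reduction in RMSE versus the global-mean baseline):
In [ ]:
# Relative RMSE reduction of the regression model vs. the global baseline.
improvement = 1 - regression_metrics.rmse / baseline_global_metrics.rmse
print("RMSE reduction vs. baseline: {:.0%}".format(improvement))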
Observations:
In [ ]:
# Data frame of predictions vs. actuals for plotting.
df_res = pd.DataFrame({'pred': pred, 'baseline': df_baseline.pred, 'y_test': y_test})
metrics = time_series.Metrics(df_res.y_test, df_res.pred)
ax = df_res.plot(y=['pred', 'y_test'],
                 style=['b-', 'k-'],
                 figsize=(10, 5))
ax.set_title('rmse: {:2.2f}'.format(metrics.rmse), size=16)
ax.set_ylim(20,)
df_res.head()
The following steps are required, regardless of your notebook environment.
BigQuery is automatically enabled in new projects. To activate BigQuery in a pre-existing project, go to Enable the BigQuery API.
Enter your project ID in the cell below.
If you are using AI Platform Notebooks, your environment is already authenticated. Skip this step.
If you are using Colab, run the cell below and follow the instructions when prompted to authenticate your account via OAuth.
Otherwise, follow these steps:
1. In the GCP Console, go to the Create service account key page.
2. From the Service account drop-down list, select New service account.
3. In the Service account name field, enter a name.
4. From the Role drop-down list, select BigQuery > BigQuery Admin and Storage > Storage Object Admin.
5. Click Create. A JSON file that contains your key downloads to your computer.
6. Enter the path to your service account key as the GOOGLE_APPLICATION_CREDENTIALS variable in the cell below.
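A minimal sketch of that cell, covering both the Colab and the service-account routes (the key path is a placeholder):
In [ ]:
import os
import sys
# Colab: authenticate interactively.
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()
# Service account: point the client libraries at your downloaded key file.
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/your/key.json"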
In [ ]:
# Import BigQuery module
from google.cloud import bigquery
# Import external custom module containing SQL queries
import scalable_time_series
In [ ]:
# Windowing parameters for the scalable (BigQuery) feature/label pipeline.
value_name = "med_sales_price"
downsample_size = 7  # Aggregate 7 days into 1 week.
window_size = 52     # Weeks of history used as features.
labels_size = 1      # Number of future values per example.
horizon = 1          # Predict 1 week ahead.
In [ ]:
# Construct a BigQuery client object.
client = bigquery.Client()
# Set dataset_id to the ID of the dataset to create.
sink_dataset_name = "temp_forecasting_dataset"
dataset_id = "{}.{}".format(client.project, sink_dataset_name)
# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset.from_string(dataset_id)
# Specify the geographic location where the dataset should reside.
dataset.location = "US"
# Send the dataset to the API for creation.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
try:
    dataset = client.create_dataset(dataset)  # API request
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
except Exception as e:
    print("Dataset {}.{} already exists".format(
        client.project, dataset.dataset_id))
We need to create a date range table in BigQuery so that we can join our data to that to get the correct sequences.
In [ ]:
# Call BigQuery and examine the result in a dataframe.
source_dataset = "nyc_real_estate"
source_table_name = "all_sales"
query_create_date_range = scalable_time_series.create_date_range(
    client.project, source_dataset, source_table_name)
df = client.query(query_create_date_range + " LIMIT 100").to_dataframe()
df.head(5)
Execute query and write to BigQuery table.
In [ ]:
job_config = bigquery.QueryJobConfig()
# Set the destination table
table_name = "start_end_timescale_date_range"
table_ref = client.dataset(sink_dataset_name).table(table_name)
job_config.destination = table_ref
job_config.write_disposition = "WRITE_TRUNCATE"
# Start the query, passing in the extra configuration.
query_job = client.query(
query=query_create_date_range,
# Location must match that of the dataset(s) referenced in the query
# and of the destination table.
location="US",
job_config=job_config) # API request - starts the query
query_job.result() # Waits for the query to finish
print("Query results loaded to table {}".format(table_ref.path))
Now that we have the date range table created we can create our training dataset for BQML.
In [ ]:
# Call BigQuery and examine in dataframe
sales_dataset_table = source_dataset + "." + source_table_name
query_bq_sub_sequences = scalable_time_series.bq_create_rolling_features_label(
client.project, sink_dataset_name, table_name, sales_dataset_table,
value_name, downsample_size, window_size, horizon, labels_size)
print(query_bq_sub_sequences[0:500])
In [ ]:
%%with_globals
%%bigquery --project {PROJECT}
{query_bq_sub_sequences}
LIMIT 100
Prior to now we've just been reading an existing BigQuery table; now we're going to create our own, so we need some place to put it. In BigQuery parlance, a Dataset is a folder for tables.
We will take advantage of BigQuery's Python client to create the dataset.
In [ ]:
bq = bigquery.Client(project=PROJECT)
dataset = bigquery.Dataset(bq.dataset("bqml_forecasting"))
try:
    bq.create_dataset(dataset)  # will fail if dataset already exists
    print("Dataset created")
except:
    print("Dataset already exists")
Split dataset into a train and eval set.
In [ ]:
feature_list = ["price_ago_{time}".format(time=time)
for time in range(window_size, 0, -1)]
label_list = ["price_ahead_{time}".format(time=time)
for time in range(1, labels_size + 1)]
select_list = ",".join(feature_list + label_list)
select_string = "SELECT {select_list} FROM ({query})".format(
select_list=select_list,
query=query_bq_sub_sequences)
concat_vars = []
concat_vars.append("CAST(feat_seq_start_date AS STRING)")
concat_vars.append("CAST(lab_seq_end_date AS STRING)")
farm_finger = "FARM_FINGERPRINT(CONCAT({concat_vars}))".format(
concat_vars=", ".join(concat_vars))
sampling_clause = "ABS(MOD({farm_finger}, 100))".format(
farm_finger=farm_finger)
bqml_train_query = "{select_string} WHERE {sampling_clause} < 80".format(
select_string=select_string, sampling_clause=sampling_clause)
bqml_eval_query = "{select_string} WHERE {sampling_clause} >= 80".format(
select_string=select_string, sampling_clause=sampling_clause)
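Before training, it can help to eyeball the generated split. A quick, optional check that prints the hash-based sampling clause and the start of the training query:
In [ ]:
# The FARM_FINGERPRINT hash gives a deterministic ~80/20 train/eval split.
print(sampling_clause)
print(bqml_train_query[:300])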
To create a model we:
1. Use CREATE MODEL and provide a destination table for the resulting model. Alternatively we can use CREATE OR REPLACE MODEL, which allows overwriting an existing model.
2. Use OPTIONS to specify the model type (linear_reg or logistic_reg). There are many more options we could specify, such as regularization and learning rate, but we'll accept the defaults.
Have a look at Step Two of this tutorial to see another example.
The query will take about two minutes to complete.
In [ ]:
%%with_globals
%%bigquery --project {PROJECT}
CREATE OR REPLACE MODEL bqml_forecasting.nyc_real_estate
OPTIONS(model_type = "linear_reg",
        input_label_cols = ["price_ahead_1"]) AS
{bqml_train_query}
Because the query uses a CREATE MODEL statement to create a model, you do not see query results. The output is an empty string.
To get the training results we use the ML.TRAINING_INFO function.
Have a look at Step Three and Four of this tutorial to see a similar example.
In [ ]:
%%with_globals
%%bigquery --project {PROJECT}
SELECT
*
FROM
ML.TRAINING_INFO(MODEL `bqml_forecasting.nyc_real_estate`)
'eval_loss' is reported as mean squared error; taking the square root gives an RMSE of about 291,178. Your results may vary.
In [ ]:
%%with_globals
%%bigquery --project {PROJECT}
#standardSQL
SELECT
*
FROM
ML.EVALUATE(MODEL `bqml_forecasting.nyc_real_estate`, ({bqml_eval_query}))
To use our model to make predictions, we use ML.PREDICT. Let's use the nyc_real_estate model we trained above to infer the median sales price on our evaluation data.
Have a look at Step Five of this tutorial to see another example.
In [ ]:
%%with_globals
%%bigquery --project {PROJECT} df
#standardSQL
SELECT
predicted_price_ahead_1
FROM
ML.PREDICT(MODEL `bqml_forecasting.nyc_real_estate`, ({bqml_eval_query}))
Create temporary BigQuery dataset
In [ ]:
# Construct a BigQuery client object.
client = bigquery.Client()
# Set dataset_id to the ID of the dataset to create.
sink_dataset_name = "temp_forecasting_dataset"
dataset_id = "{}.{}".format(client.project, sink_dataset_name)
# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset.from_string(dataset_id)
# Specify the geographic location where the dataset should reside.
dataset.location = "US"
# Send the dataset to the API for creation.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
try:
    dataset = client.create_dataset(dataset)  # API request
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
except:
    print("Dataset {}.{} already exists".format(
        client.project, dataset.dataset_id))
We need to create a date range table in BigQuery so that we can join our data to that to get the correct sequences.
In [ ]:
# Call BigQuery and examine the result in a dataframe.
source_dataset = "nyc_real_estate"
source_table_name = "all_sales"
query_create_date_range = scalable_time_series.create_date_range(
    client.project, source_dataset, source_table_name)
df = client.query(query_create_date_range + " LIMIT 100").to_dataframe()
df.head(5)
Execute query and write to BigQuery table.
In [ ]:
job_config = bigquery.QueryJobConfig()
# Set the destination table
table_name = "start_end_timescale_date_range"
table_ref = client.dataset(sink_dataset_name).table(table_name)
job_config.destination = table_ref
job_config.write_disposition = "WRITE_TRUNCATE"
# Start the query, passing in the extra configuration.
query_job = client.query(
query=query_create_date_range,
# Location must match that of the dataset(s) referenced in the query
# and of the destination table.
location="US",
job_config=job_config) # API request - starts the query
query_job.result() # Waits for the query to finish
print("Query results loaded to table {}".format(table_ref.path))
Now that we have the date range table created we can create our training dataset.
In [ ]:
# Call BigQuery and examine the result in a dataframe.
sales_dataset_table = source_dataset + "." + source_table_name
downsample_size = 7
query_csv_sub_seqs = scalable_time_series.csv_create_rolling_features_label(
    client.project, sink_dataset_name, table_name, sales_dataset_table,
    value_name, downsample_size, window_size, horizon, labels_size)
df = client.query(query_csv_sub_seqs + " LIMIT 100").to_dataframe()
df.head(20)
Now let's write our train and eval tables to BigQuery so that we can export them to CSV for TensorFlow.
In [ ]:
job_config = bigquery.QueryJobConfig()
csv_select_list = "med_sales_price_agg, labels_agg"
for step in ["train", "eval"]:
    if step == "train":
        selquery = "SELECT {} FROM ({}) WHERE {} < 80".format(
            csv_select_list, query_csv_sub_seqs, sampling_clause)
    else:
        selquery = "SELECT {} FROM ({}) WHERE {} >= 80".format(
            csv_select_list, query_csv_sub_seqs, sampling_clause)
    # Set the destination table
    table_name = "nyc_real_estate_{}".format(step)
    table_ref = client.dataset(sink_dataset_name).table(table_name)
    job_config.destination = table_ref
    job_config.write_disposition = "WRITE_TRUNCATE"
    # Start the query, passing in the extra configuration.
    query_job = client.query(
        query=selquery,
        # Location must match that of the dataset(s) referenced in the query
        # and of the destination table.
        location="US",
        job_config=job_config)  # API request - starts the query
    query_job.result()  # Waits for the query to finish
    print("Query results loaded to table {}".format(table_ref.path))
Export BigQuery table to CSV in GCS.
In [ ]:
dataset_ref = client.dataset(
    dataset_id=sink_dataset_name, project=client.project)
for step in ["train", "eval"]:
    destination_uri = "gs://{}/{}".format(
        BUCKET, "forecasting/nyc_real_estate/data/{}*.csv".format(step))
    table_name = "nyc_real_estate_{}".format(step)
    table_ref = dataset_ref.table(table_name)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
    )  # API request
    extract_job.result()  # Waits for job to complete.
    print("Exported {}:{}.{} to {}".format(
        client.project, sink_dataset_name, table_name, destination_uri))
In [ ]:
!gsutil -m cp gs://{BUCKET}/forecasting/nyc_real_estate/data/*.csv .
In [ ]:
!head train*.csv
In [ ]:
import os
PROJECT = PROJECT # REPLACE WITH YOUR PROJECT ID
BUCKET = BUCKET # REPLACE WITH A BUCKET NAME
REGION = "us-central1" # REPLACE WITH YOUR REGION e.g. us-central1
# Import os environment variables
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION
os.environ["TF_VERSION"] = "1.13"
os.environ["SEQ_LEN"] = str(WINDOW_SIZE)
In [ ]:
%%bash
OUTDIR=gs://$BUCKET/forecasting/nyc_real_estate/trained_model
JOBNAME=nyc_real_estate$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
--region=$REGION \
--module-name=trainer.task \
--package-path=$PWD/tf_module/trainer \
--job-dir=$OUTDIR \
--staging-bucket=gs://$BUCKET \
--scale-tier=STANDARD_1 \
--runtime-version=$TF_VERSION \
-- \
--train_file_pattern="gs://$BUCKET/forecasting/nyc_real_estate/data/train*.csv" \
--eval_file_pattern="gs://$BUCKET/forecasting/nyc_real_estate/data/eval*.csv" \
--output_dir=$OUTDIR \
--job-dir=./tmp \
--seq_len=$SEQ_LEN \
--train_batch_size=32 \
--eval_batch_size=32 \
--train_steps=1000 \
--learning_rate=0.01 \
--start_delay_secs=60 \
--throttle_secs=60 \
--lstm_hidden_units="32,16,8"
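Once the job is submitted you can check on its progress from the command line. A short sketch (the describe/stream-logs lines are commented placeholders; substitute the JOBNAME echoed by the cell above):
In [ ]:
%%bash
# List recent AI Platform training jobs, then describe / stream logs for the one above.
gcloud ai-platform jobs list --limit=5
# gcloud ai-platform jobs describe <JOBNAME>
# gcloud ai-platform jobs stream-logs <JOBNAME>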