This notebook is meant to be executed in a notebook server hosted on KubeFlow. You can find instructions on how to deploy a KubeFlow cluster and how to access the KubeFlow UI and the hosted notebooks here: https://www.kubeflow.org/docs/pipelines/pipelines-quickstart/
Please install the KubeFlow Pipelines SDK using the following command:
In [ ]:
!pip3 install 'https://storage.googleapis.com/ml-pipeline/release/0.1.9/kfp.tar.gz'
This notebook generates a KubeFlow pipeline that runs the solution end to end. For more information on KubeFlow pipelines and how to run them on GCP, please visit https://github.com/kubeflow/pipelines
In [ ]:
import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook
# Modify the following values to match your GCP bucket, project, and Docker image name.
OUTPUT_DIR = 'gs://pipelinestest/out'
PROJECT_NAME = 'energy-forecasting'
EF_IMAGE='gcr.io/%s/energy:dev' % PROJECT_NAME
In [ ]:
%%docker {EF_IMAGE} {OUTPUT_DIR}
FROM tensorflow/tensorflow:1.10.0-py3
RUN apt-get update
RUN apt-get install -y git
RUN pip3 install --upgrade google-api-python-client
RUN pip3 install --upgrade pyarrow
RUN pip3 install --upgrade google-cloud-bigquery
RUN pip3 install --upgrade google-cloud-storage
RUN pip3 install --upgrade gitpython
In [ ]:
def copy_table(
dataset: str) -> str:
"""Retrieves raw data from competition website.
Retrieves raw data from the competition site and saves it in BigQuery.
Args:
dataset: String specifying the dataset in BigQuery to save the data in.
Returns:
        String specifying whether the component finished successfully.
"""
from google.cloud import bigquery
import requests
import pandas as pd
from io import StringIO
from io import BytesIO
import zipfile
bq_client = bigquery.Client()
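    # Download the semicolon-delimited market price CSV and load it into BigQuery.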
price_data = pd.read_csv(
StringIO(requests.get(
'http://complatt.smartwatt.net/assets/files/historicalRealData/RealMarketPriceDataPT.csv').text),
sep=';'
)
price_data.columns = ['date_utc', 'price']
bq_client.load_table_from_dataframe(
price_data,
bq_client.dataset(dataset).table(
'MarketPricePT')).result()
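    # Download the zipped historical weather data and load it into BigQuery.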
weather_zip = zipfile.ZipFile(
BytesIO(requests.get(
'http://complatt.smartwatt.net/assets/files/weatherHistoricalData/WeatherHistoricalData.zip').content))
weather_data = pd.read_csv(
weather_zip.open(
'WeatherHistoricalData/historical_weather.csv'))
bq_client.load_table_from_dataframe(
weather_data,
bq_client.dataset(dataset).table(
'historical_weather')).result()
    return 'success'
compiler.build_python_component(
component_func = copy_table,
staging_gcs_path = OUTPUT_DIR,
base_image=EF_IMAGE,
target_component_file='copy-table.component.yaml',
target_image = 'gcr.io/' + PROJECT_NAME + '/component-copy-table:latest')
In [ ]:
def export_table(
inp: str,
table: str,
file: str) -> str:
"""Exports table to csv.
Exports BigQuery table into CSV file.
Args:
inp: String containing the output from previous component.
table: String specifying the origin BigQuery table.
file: String specifying the path and name for the csv file.
Returns:
        String specifying whether the component finished successfully.
"""
from google.cloud import bigquery
bq_client = bigquery.Client()
bq_client.extract_table(
table,
file).result()
    return 'success'
compiler.build_python_component(
component_func = export_table,
staging_gcs_path = OUTPUT_DIR,
base_image=EF_IMAGE,
target_component_file='export-table.component.yaml',
target_image = 'gcr.io/' + PROJECT_NAME + '/component-export-table:latest')
In [ ]:
def run_git_python_script(
inp: str,
code_repo: str,
code_folder: str,
script: str,
script_args: str) -> str:
"""Runs Python script from git repository.
Args:
inp: String containing the output from previous component.
code_repo: String specifying the url to the git repository.
code_folder: String specifying the folder for the script.
script: String specifying the name of the script.
script_args: String specifying the arguments for the script.
Returns:
        String specifying whether the component finished successfully.
"""
import os
import git
git.Git('').clone(code_repo)
os.chdir(code_folder)
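    # os.system returns the command's exit status; 0 means the script succeeded.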
output = os.system(' '.join([
'python -m',
script,
script_args]))
if output == 0:
        return 'success'
raise Exception('Script failed. The exit status was: {}'.format(output))
compiler.build_python_component(
component_func = run_git_python_script,
staging_gcs_path = OUTPUT_DIR,
base_image=EF_IMAGE,
target_component_file='run-git-python-script.component.yaml',
target_image = 'gcr.io/' + PROJECT_NAME + '/component-run-git-python-script:latest')
In [ ]:
def train_git_cmle_model(
tr_inp: str,
va_inp: str,
code_repo: str,
code_folder: str,
project: str,
bucket: str,
package_folder: str,
cmle_folder: str,
scale_tier: str,
python_module: str,
region: str,
runtime_version: str,
cmle_args: str) -> str:
"""Executes CMLE training job.
Retrieves python file from git repo and launches training job in CMLE.
Args:
tr_inp: String containing the source for the training data.
va_inp: String containing the source for the validation data.
code_repo: String specifying the url to the git repository.
code_folder: String specifying the folder for the job code.
project: String specifying the GCP project where job will run.
bucket: String specifying the GCS bucket where to save the job's outputs.
package_folder: String specifying the python package to run for the job.
cmle_folder: String specifying the folder in GCS where to save outputs.
scale_tier: String specifying compute resources to use for training job.
python_module: String specifying the python module to run for the job.
region: String specifying the GCP region in which to run the job.
runtime_version: String specifying the CMLE version to use for the job.
        cmle_args: String specifying the arguments for the CMLE job.
Returns:
String containing output from running the training job in CMLE.
"""
import os
import git
import tarfile
import datetime
from google.cloud import storage
from googleapiclient import discovery
jobId = 'train' + datetime.datetime.today().strftime('%Y%m%d%H%M%S')
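    # Clone the training code and package it as a tarball for CMLE.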
git.Git('').clone(code_repo)
with tarfile.open('code.tar.gz', 'w:gz') as tar:
tar.add(
code_folder,
arcname=os.path.basename(code_folder))
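    # Upload the source package to GCS so CMLE can install it.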
gcs_client = storage.Client()
gcs_bucket = gcs_client.get_bucket(bucket)
blob = gcs_bucket.blob(package_folder + jobId + '.tar.gz')
blob.upload_from_filename('code.tar.gz')
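    # Assemble the training job specification.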
training_inputs = {
'scaleTier': scale_tier,
'pythonModule': python_module,
'args': cmle_args.split(' '),
'region': region,
'packageUris': [
'gs://'+ bucket + '/' + package_folder + jobId + '.tar.gz'],
'jobDir': 'gs://'+ bucket + '/' + cmle_folder + jobId,
'runtimeVersion': runtime_version}
job_spec = {
'jobId': jobId,
'trainingInput': training_inputs}
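    # Submit the job through the CMLE REST API; the call returns as soon as the job is accepted.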
cloudml = discovery.build('ml', 'v1')
project_id = 'projects/{}'.format(project)
request = cloudml.projects().jobs().create(
body=job_spec,
parent=project_id)
    return str(request.execute())
compiler.build_python_component(
component_func = train_git_cmle_model,
staging_gcs_path = OUTPUT_DIR,
base_image=EF_IMAGE,
target_component_file='train-git-cmle-model.component.yaml',
target_image = 'gcr.io/' + PROJECT_NAME + '/component-train-git-cmle-model:latest')
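Note that the component above only submits the training job: request.execute() returns as soon as CMLE accepts the job, so training keeps running after the pipeline step finishes. The following is a minimal sketch for checking a job's status afterwards, using the same discovery API; the project and job id values here are illustrative and should be replaced with your own (the job id appears in the component's output).
In [ ]:
from googleapiclient import discovery

# Illustrative values; substitute your project and the jobId returned by the component.
project = 'energy-forecasting'
job_id = 'train20190101000000'

cloudml = discovery.build('ml', 'v1')
# jobs().get returns the job resource, including its current state
# (e.g. QUEUED, RUNNING, SUCCEEDED, FAILED).
job = cloudml.projects().jobs().get(
    name='projects/{}/jobs/{}'.format(project, job_id)).execute()
print(job['state'])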
In [4]:
@dsl.pipeline(
name='Energy Price Forecasting',
description='Energy Price Forecasting')
def basic_bq_pipeline(
project = dsl.PipelineParam(
'project',
value='energy-forecasting'),
dataset = dsl.PipelineParam(
'dataset',
value='Energy'),
bucket = dsl.PipelineParam(
'bucket',
value='energyforecast'),
code_repo = dsl.PipelineParam(
'code-repo',
value='https://github.com/GoogleCloudPlatform/professional-services.git'),
code_folder = dsl.PipelineParam(
'code-folder',
value='professional-services/examples/cloudml-energy-price-forecasting'),
data_prep_script = dsl.PipelineParam(
'data-prep-script',
value='data_preparation.data_prep'),
data_prep_args = dsl.PipelineParam(
'data-prep-args',
value=' '.join([
'--dataset=Energy',
'--train_table=MLDataTrain',
'--valid_table=MLDataValid',
'--test_table=MLDataTest',
'--prepare_data_file=data_preparation/prepare_data.sql',
'--weather_mean_std_file=data_preparation/weather_mean_std.sql',
'--train_from_date="2015-01-05 00:00:00"',
'--train_to_date="2015-10-04 23:01:00"',
'--valid_from_date="2015-10-05 00:00:00"',
'--valid_to_date="2015-10-11 23:01:00"',
'--test_from_date="2015-10-12 00:00:00"',
'--test_to_date="2015-10-18 23:01:00"',
'--price_scaling=0.01',
'--mean_path=gs://energyforecast/data/pickle/mean.pkl',
'--std_path=gs://energyforecast/data/pickle/std.pkl'])),
package_folder = dsl.PipelineParam(
'package-folder',
value='package/'),
cmle_folder = dsl.PipelineParam(
'cmle-folder',
value='cmle/'),
cmle_args = dsl.PipelineParam(
'cmle-args',
value=' '.join([
'--training_path', 'gs://energyforecast/data/csv/MLDataTrain.csv',
'--validation_path', 'gs://energyforecast/data/csv/MLDataValid.csv',
'--mean_path', 'gs://energyforecast/data/pickle/mean.pkl',
'--std_path', 'gs://energyforecast/data/pickle/std.pkl',
'--dropout' , '0.2',
'--hour_embedding', '20',
'--day_embedding', '10',
'--first_layer_size', '100',
'--number_layers', '3',
'--layer_reduction_fraction', '0.5',
'--learning_rate', '0.01',
'--batch_size', '64',
'--eval_batch_size', '168',
'--max_steps', '5000'])),
scale_tier = dsl.PipelineParam(
'scale-tier',
value='BASIC'),
python_module = dsl.PipelineParam(
'python-module',
value='trainer.task'),
region = dsl.PipelineParam(
'region',
value='us-central1'),
runtime_version = dsl.PipelineParam(
'runtime-version',
value='1.10'),
train_table = dsl.PipelineParam(
'train-table',
value='Energy.MLDataTrain'),
valid_table = dsl.PipelineParam(
'valid-table',
value='Energy.MLDataValid'),
test_table = dsl.PipelineParam(
'test-table',
value='Energy.MLDataTest'),
train_file = dsl.PipelineParam(
'train-file',
value='gs://energyforecast/data/csv/MLDataTrain.csv'),
valid_file = dsl.PipelineParam(
'valid-file',
value='gs://energyforecast/data/csv/MLDataValid.csv'),
test_file = dsl.PipelineParam(
'test-file',
value='gs://energyforecast/data/csv/MLDataTest.csv')):
    CopyTableOp = kfp.components.load_component('copy-table.component.yaml')
ExpTableOp = kfp.components.load_component('export-table.component.yaml')
DataPrepOp = kfp.components.load_component('run-git-python-script.component.yaml')
TrainModelOp = kfp.components.load_component('train-git-cmle-model.component.yaml')
    ct_op = CopyTableOp(
        dataset).apply(gcp.use_gcp_secret('user-gcp-sa'))
dp_op = DataPrepOp(
ct_op.output,
code_repo,
code_folder,
data_prep_script,
data_prep_args).apply(gcp.use_gcp_secret('user-gcp-sa'))
tr_et_op = ExpTableOp(
dp_op.output,
train_table,
train_file).apply(gcp.use_gcp_secret('user-gcp-sa'))
va_et_op = ExpTableOp(
dp_op.output,
valid_table,
valid_file).apply(gcp.use_gcp_secret('user-gcp-sa'))
te_et_op = ExpTableOp(
dp_op.output,
test_table,
test_file).apply(gcp.use_gcp_secret('user-gcp-sa'))
tm_op = TrainModelOp(
tr_et_op.output,
va_et_op.output,
code_repo,
code_folder,
project,
bucket,
package_folder,
cmle_folder,
scale_tier,
python_module,
region,
runtime_version,
cmle_args).apply(gcp.use_gcp_secret('user-gcp-sa'))
compiler.Compiler().compile(basic_bq_pipeline, 'energy-forecasting.tar.gz')
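The compiled energy-forecasting.tar.gz package can be uploaded through the KubeFlow Pipelines UI, or submitted directly from this notebook. The following is a minimal sketch, assuming the notebook runs inside the cluster so that kfp.Client() can reach the pipeline service; the experiment and run names are illustrative.
In [ ]:
client = kfp.Client()
# Create an experiment to group pipeline runs.
experiment = client.create_experiment('energy-forecasting')
# Submit the compiled pipeline package as a new run.
run = client.run_pipeline(
    experiment.id,
    'energy-forecasting-run',
    'energy-forecasting.tar.gz')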
In [ ]: