TODO: Complete the #TODO sections in this lab notebook. You can consult the notebook in solutions/ for reference.
While Pandas is fine for experimenting, it is better to do preprocessing in Apache Beam when you operationalize your workflow. Apache Beam also supports streaming, which helps if you need to preprocess data in flight.
In [ ]:
# Ensure the right version of Tensorflow is installed.
!pip freeze | grep tensorflow==2.1
In [ ]:
%pip install apache-beam[gcp]==2.13.0
After installing Apache Beam, restart your kernel by selecting "Kernel" from the menu and clicking "Restart kernel..."
Make sure the Dataflow API is enabled by going to this link. Ensure that you've installed Beam by importing it and printing the version number.
In [ ]:
import apache_beam as beam
print(beam.__version__)
You may receive a UserWarning saying that the Apache Beam SDK for Python 3 is not yet fully supported. Don't worry about this.
In [ ]:
# change these to try this notebook out
BUCKET = 'cloud-training-demos-ml'
PROJECT = 'cloud-training-demos'
REGION = 'us-central1'
In [ ]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION
In [ ]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi
The data is natality data (records of births in the US). My goal is to predict the baby's weight given a number of factors about the pregnancy and the baby's mother. Later, we will want to split the data into training and eval datasets. The hash of the year-month will decide which dataset each record goes to.
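The idea of splitting on a hash of the year-month can be sketched in plain Python. This is only an illustration: `month_bucket` and `split_for` are hypothetical helper names, and `hashlib.md5` is a stand-in for BigQuery's `FARM_FINGERPRINT`, so the bucket assignments will not match what BigQuery produces. The 3-of-4 rule mirrors the `ABS(MOD(hashmonth, 4)) < 3` filter used in the pipeline later in this notebook.

```python
import hashlib


def month_bucket(year, month, num_buckets=4):
    """Map a (year, month) pair to a stable bucket in [0, num_buckets)."""
    # Mirrors CONCAT(CAST(year AS STRING), CAST(month AS STRING)).
    text = '{}{}'.format(year, month)
    # Assumption: md5 stands in for FARM_FINGERPRINT, which is not in the stdlib.
    digest = hashlib.md5(text.encode('utf-8')).digest()
    # Read the first 8 bytes as a signed 64-bit integer, since FARM_FINGERPRINT
    # also returns a signed INT64 that may be negative.
    value = int.from_bytes(digest[:8], byteorder='big', signed=True)
    # BigQuery's MOD keeps the sign of the dividend, hence the ABS in the query.
    # abs(value) % num_buckets gives the same result as ABS(MOD(value, num_buckets)).
    return abs(value) % num_buckets


def split_for(year, month):
    """Send buckets 0-2 to 'train' and bucket 3 to 'eval'."""
    return 'train' if month_bucket(year, month) < 3 else 'eval'


for year, month in [(2001, 1), (2001, 2), (2005, 12)]:
    print(year, month, split_for(year, month))
```

Because the bucket depends only on the year and month, every record from the same month lands in the same dataset, and rerunning the split gives the same answer.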
In [ ]:
# Create SQL query using natality data after the year 2000
query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(year AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
"""
In [ ]:
# Call BigQuery and examine in dataframe
from google.cloud import bigquery
df = bigquery.Client().query(query + " LIMIT 100").to_dataframe()
df.head()
Instead of using Beam/Dataflow, I had three other options:
However, in this case, I want to do some preprocessing, modifying data so that we can simulate what is known if no ultrasound has been performed. If I didn't need preprocessing, I could have used the web console. Also, I prefer to script it out rather than run queries on the user interface, so I am using Cloud Dataflow for the preprocessing.
Note that after you launch this, the actual processing happens in the cloud. Go to the Dataflow section of the GCP web console and monitor the running job. It took about 20 minutes for me.
If you wish to continue without doing this step, you can copy my preprocessed output:
gsutil -m cp -r gs://cloud-training-demos/babyweight/preproc gs://your-bucket/
In [ ]:
import datetime, os
def to_csv(rowdict):
    # Pull columns from BQ and create a line
    import hashlib
    import copy
    CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')

    # Create synthetic data where we assume that no ultrasound has been performed
    # and so we don't know the sex of the baby. Let's assume that we can tell the difference
    # between single and multiple, but that determining the exact number
    # is difficult in the absence of an ultrasound.
    no_ultrasound = copy.deepcopy(rowdict)
    w_ultrasound = copy.deepcopy(rowdict)

    # TODO create logic for no_ultrasound where we only know whether it's a single baby or multiple (but not how many multiple)
    no_ultrasound['is_male'] = 'Unknown'
    if # TODO create logic check for multiples
        no_ultrasound['plurality'] = 'Multiple(2+)'
    else: # TODO create logic check for single
        no_ultrasound['plurality'] = 'Single(1)'

    # Change the plurality column to strings
    w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]

    # Write out two rows for each input row, one with ultrasound and one without
    for result in [no_ultrasound, w_ultrasound]:
        data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
        key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
        yield str('{},{}'.format(data, key))

def preprocess(in_test_mode):
    import shutil, os, subprocess
    job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

    if in_test_mode:
        # Local run: write to a fresh local directory
        print('Launching local job ... hang on')
        OUTPUT_DIR = './preproc'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        os.makedirs(OUTPUT_DIR)
    else:
        # Cloud run: write to the bucket, clearing any previous output first
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
        try:
            subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
        except subprocess.CalledProcessError:
            # gsutil exits with an error when there is nothing to delete
            pass

    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'region': REGION,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'max_num_workers': 6
    }
    opts = beam.pipeline.PipelineOptions(flags = [], **options)
    if in_test_mode:
        RUNNER = 'DirectRunner'
    else:
        RUNNER = 'DataflowRunner'
    p = beam.Pipeline(RUNNER, options = opts)

    # Same query as above, with rows that have missing or invalid values filtered out
    query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(year AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
AND month > 0
"""

    if in_test_mode:
        query = query + ' LIMIT 100'

    # Three of the four hash buckets go to training, the remaining one to eval
    for step in ['train', 'eval']:
        if step == 'train':
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)
        else:
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)

        (p
         | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
         | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
         | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
        )

    job = p.run()
    if in_test_mode:
        # Only the local run blocks until the pipeline finishes
        job.wait_until_finish()
        print("Done!")

preprocess(in_test_mode = False)
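As a standalone illustration of the row-doubling logic in to_csv, here is a minimal sketch that runs without Beam. It fills in the plurality TODO in one possible way, assuming plurality arrives as a positive integer (the query filters on plurality > 0); the solutions/ notebook may do it differently. The names `to_csv_sketch` and `sample_row`, and the values in `sample_row`, are made up for this example.

```python
import copy
import hashlib

CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')
PLURALITY_NAMES = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)']


def to_csv_sketch(rowdict):
    """Yield two CSV lines per input row: one without and one with ultrasound."""
    no_ultrasound = copy.deepcopy(rowdict)
    w_ultrasound = copy.deepcopy(rowdict)

    # Without an ultrasound the sex is unknown, and we only know single vs. multiple.
    no_ultrasound['is_male'] = 'Unknown'
    if rowdict['plurality'] > 1:  # assumption: any count above 1 is "multiple"
        no_ultrasound['plurality'] = 'Multiple(2+)'
    else:
        no_ultrasound['plurality'] = 'Single(1)'

    # With an ultrasound the exact count is known, written as a string label.
    w_ultrasound['plurality'] = PLURALITY_NAMES[rowdict['plurality'] - 1]

    for result in [no_ultrasound, w_ultrasound]:
        data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
        key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
        yield '{},{}'.format(data, key)


# Made-up input row, shaped like one row returned by the BigQuery query.
sample_row = {
    'weight_pounds': 7.5,
    'is_male': True,
    'mother_age': 28,
    'plurality': 2,
    'gestation_weeks': 39,
    'hashmonth': 123,
}

rows = list(to_csv_sketch(sample_row))
for line in rows:
    print(line)
```

Note that hashmonth is dropped from the output because it is not in CSV_COLUMNS; it is only needed for the train/eval split.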
The Dataflow job launched by preprocess(in_test_mode = False) will take 20+ minutes. Go to the GCP web console, navigate to the Dataflow section and wait for the job to finish before you run the following step.
In [ ]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*
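Each line in the output shards holds the five CSV_COLUMNS values followed by the hash key. A minimal sketch of reading such lines back with the standard library is shown below; `parse_lines` is a hypothetical helper, and the two sample lines (including their shortened keys) are made up rather than copied from real output.

```python
import csv
import io

# Column order written by to_csv: the five CSV_COLUMNS, then the hash key.
COLUMNS = ['weight_pounds', 'is_male', 'mother_age', 'plurality', 'gestation_weeks', 'key']


def parse_lines(text):
    """Parse preprocessed CSV text into a list of dict records."""
    reader = csv.DictReader(io.StringIO(text), fieldnames=COLUMNS)
    return list(reader)


# Made-up sample lines; real keys are 56-character sha224 hex digests.
sample_text = (
    '7.5,Unknown,28,Multiple(2+),39,abc123\n'
    '7.5,True,28,Twins(2),39,def456\n'
)

records = parse_lines(sample_text)
for record in records:
    print(record['plurality'], float(record['weight_pounds']))
```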
Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License