In [1]:
# %%bash
# pip install tensorflow==1.7
# pip install google-cloud-dataflow==2.3
# pip install tensorflow-hub
The bigquery-public-data:hacker_news dataset contains all stories and comments from Hacker News since its launch in 2006. Each story includes a story id, the url, the title of the story, the author that made the post, when it was written, and the number of points the story received.
The objective is to build an ML model that, given the title of a story, predicts the source (website) the story was posted from.
This notebook illustrates how to build a Beam pipeline that uses tf.Transform to prepare ML 'train' and 'eval' datasets. The pipeline includes the following steps:
1. Read the raw data from BigQuery and extract the features.
2. Analyze and transform the training data with tf.Transform (tokenize the title and compute TF-IDF weights over a learned vocabulary).
3. Transform the eval data using the transform_fn produced while analyzing the training data.
4. Write the transformed train and eval datasets as TFRecords.
5. Save the transformation artefacts (transform_fn graph and metadata) for reuse at training and serving time.
In [2]:
import os
class Params:
    pass
# Set to run on GCP
Params.GCP_PROJECT_ID = 'ksalama-gcp-playground'
Params.REGION = 'europe-west1'
Params.BUCKET = 'ksalama-gcs-cloudml'
Params.PLATFORM = 'local' # local | GCP
Params.DATA_DIR = 'data/news' if Params.PLATFORM == 'local' else 'gs://{}/data/news'.format(Params.BUCKET)
Params.TRANSFORMED_DATA_DIR = os.path.join(Params.DATA_DIR, 'transformed')
Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'train')
Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'eval')
Params.TEMP_DIR = os.path.join(Params.DATA_DIR, 'tmp')
Params.MODELS_DIR = 'models/news' if Params.PLATFORM == 'local' else 'gs://{}/models/news'.format(Params.BUCKET)
Params.TRANSFORM_ARTEFACTS_DIR = os.path.join(Params.MODELS_DIR,'transform')
Params.TRANSFORM = True
In [3]:
import apache_beam as beam
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.coders as tft_coders
from tensorflow.contrib.learn.python.learn.utils import input_fn_utils
from tensorflow_transform.beam import impl
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.saved import saved_transform_io
In [4]:
bq_query = '''
SELECT
    key,
    REGEXP_REPLACE(title, '[^a-zA-Z0-9 $.-]', ' ') AS title,
    source
FROM
(
    SELECT
        ARRAY_REVERSE(SPLIT(REGEXP_EXTRACT(url, '.*://(.[^/]+)/'), '.'))[OFFSET(1)] AS source,
        title,
        ABS(FARM_FINGERPRINT(title)) AS key
    FROM
        `bigquery-public-data.hacker_news.stories`
    WHERE
        REGEXP_CONTAINS(REGEXP_EXTRACT(url, '.*://(.[^/]+)/'), '.com$')
        AND LENGTH(title) > 10
)
WHERE (source = 'github' OR source = 'nytimes' OR source = 'techcrunch')
'''
def get_source_query(step):
    if step == 'train':
        source_query = 'SELECT * FROM ({}) WHERE MOD(key,100) <= 75'.format(bq_query)
    else:
        source_query = 'SELECT * FROM ({}) WHERE MOD(key,100) > 75'.format(bq_query)
    return source_query
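To make the query's logic concrete, here is a small illustrative sketch (plain Python, not used by the pipeline) of its two tricks: extracting the source from the story url, and hashing the title into 100 buckets so that roughly 76% of rows (MOD <= 75) go to train and the rest to eval. zlib.crc32 only stands in for BigQuery's FARM_FINGERPRINT here, so the actual bucket assignments differ:
# Illustrative only -- mirrors the query's source extraction and hash-based split.
import re
import zlib

def extract_source(url):
    # same idea as ARRAY_REVERSE(SPLIT(REGEXP_EXTRACT(url, '.*://(.[^/]+)/'), '.'))[OFFSET(1)]
    host = re.search(r'.*://(.[^/]+)/', url)
    return host.group(1).split('.')[::-1][1] if host else None

def split_bucket(title):
    # stand-in for ABS(FARM_FINGERPRINT(title)); bucket <= 75 -> train, > 75 -> eval
    return abs(zlib.crc32(title.encode('utf-8'))) % 100

print(extract_source('https://www.nytimes.com/2016/some-story.html'))  # 'nytimes'
print(split_bucket('A sample Hacker News title'))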
In [5]:
RAW_HEADER = 'key,title,source'.split(',')
RAW_DEFAULTS = [['NA'],['NA'],['NA']]
TARGET_FEATURE_NAME = 'source'
TARGET_LABELS = ['github', 'nytimes', 'techcrunch']
TEXT_FEATURE_NAME = 'title'
KEY_COLUMN = 'key'
VOCAB_SIZE = 20000
TRAIN_SIZE = 73124
EVAL_SIZE = 23079
DELIMITERS = '.,!?() '
raw_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema({
    KEY_COLUMN: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
    TEXT_FEATURE_NAME: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
    TARGET_FEATURE_NAME: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
}))
In [6]:
def get_features(bq_row):
    CSV_HEADER = 'key,title,source'.split(',')
    input_features = {}
    for feature_name in CSV_HEADER:
        input_features[feature_name] = str(bq_row[feature_name]).lower()
    return input_features


def preprocessing_fn(input_features):
    # tokenize the title and map each token to an integer id in a vocabulary
    # of the VOCAB_SIZE most frequent tokens
    text = input_features[TEXT_FEATURE_NAME]
    text_tokens = tf.string_split(text, DELIMITERS)
    text_token_indices = tft.string_to_int(text_tokens, top_k=VOCAB_SIZE)

    # convert the token ids to a bag-of-words representation with TF-IDF weights
    bag_of_words_indices, text_weight = tft.tfidf(text_token_indices, VOCAB_SIZE + 1)

    output_features = {}
    output_features[TEXT_FEATURE_NAME] = input_features[TEXT_FEATURE_NAME]
    output_features['bow'] = bag_of_words_indices
    output_features['weight'] = text_weight
    output_features[TARGET_FEATURE_NAME] = input_features[TARGET_FEATURE_NAME]
    return output_features
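The pipeline only produces the 'bow'/'weight' pair; how a model consumes it belongs to the (separate) training code. As an illustration, and purely as an assumption about that training code, with the TF 1.x feature_column API the pair would typically be wired up as a weighted categorical column:
# Illustrative sketch -- how a model could consume the transformed features.
# 'bow' and 'weight' come from preprocessing_fn above; everything else is assumed.
bow_ids = tf.feature_column.categorical_column_with_identity(
    'bow', num_buckets=VOCAB_SIZE + 1)  # matches the VOCAB_SIZE + 1 used in tft.tfidf above

# attach the TF-IDF weights so each token id is weighted rather than just counted
weighted_bow = tf.feature_column.weighted_categorical_column(
    bow_ids, weight_feature_key='weight')

# dense representation for a DNN; a linear model could use weighted_bow directly
bow_embedding = tf.feature_column.embedding_column(weighted_bow, dimension=64)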
In [7]:
import apache_beam as beam
def run_pipeline(runner, opts):

    print("Sink train data files: {}".format(Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX))
    print("Sink eval data files: {}".format(Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX))
    print("Temporary directory: {}".format(Params.TEMP_DIR))
    print("")

    with beam.Pipeline(runner, options=opts) as pipeline:
        with impl.Context(Params.TEMP_DIR):

            ###### analyze & transform train #########################################################
            if runner == 'DirectRunner':
                print("")
                print("Transform training data....")
                print("")

            step = 'train'
            source_query = get_source_query(step)

            # read raw train data from BigQuery and clean it up
            raw_train_data = (
                pipeline
                | '{} - Read Data from BigQuery'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=source_query, use_standard_sql=True))
                | '{} - Extract Features'.format(step) >> beam.Map(get_features)
            )

            # create a train dataset from the data and schema
            raw_train_dataset = (raw_train_data, raw_metadata)

            # analyze and transform raw_train_dataset to produce transformed_train_dataset and transform_fn
            transformed_train_dataset, transform_fn = (
                raw_train_dataset
                | '{} - Analyze & Transform'.format(step) >> impl.AnalyzeAndTransformDataset(preprocessing_fn)
            )

            # get data and schema separately from the transformed_train_dataset
            transformed_train_data, transformed_metadata = transformed_train_dataset

            # write transformed train data to sink
            _ = (
                transformed_train_data
                | '{} - Write Transformed Data as tfrecords'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
                    file_path_prefix=Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX,
                    file_name_suffix=".tfrecords",
                    num_shards=25,
                    coder=tft_coders.example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
            )

            # #### TEST: write transformed train data as text to sink
            # _ = (
            #     transformed_train_data
            #     | '{} - Write Transformed Data as Text'.format(step) >> beam.io.textio.WriteToText(
            #         file_path_prefix=Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX,
            #         file_name_suffix=".csv")
            # )
            # ##################################################

            ###### transform eval ##################################################################
            if runner == 'DirectRunner':
                print("")
                print("Transform eval data....")
                print("")

            step = 'eval'
            source_query = get_source_query(step)

            # read raw eval data from BigQuery and clean it up
            raw_eval_data = (
                pipeline
                | '{} - Read Data from BigQuery'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=source_query, use_standard_sql=True))
                | '{} - Extract Features'.format(step) >> beam.Map(get_features)
            )

            # create an eval dataset from the data and schema
            raw_eval_dataset = (raw_eval_data, raw_metadata)

            # transform eval data based on the transform_fn produced by analyzing the train data
            transformed_eval_dataset = (
                (raw_eval_dataset, transform_fn)
                | '{} - Transform'.format(step) >> impl.TransformDataset()
            )

            # get data from the transformed_eval_dataset
            transformed_eval_data, _ = transformed_eval_dataset

            # write transformed eval data to sink
            _ = (
                transformed_eval_data
                | '{} - Write Transformed Data'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
                    file_path_prefix=Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX,
                    file_name_suffix=".tfrecords",
                    num_shards=10,
                    coder=tft_coders.example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
            )

            ###### write transformation metadata #######################################################
            if runner == 'DirectRunner':
                print("")
                print("Saving transformation artefacts ....")
                print("")

            # write transform_fn (the transformation graph) and its metadata
            _ = (
                transform_fn
                | 'Write Transform Artefacts' >> transform_fn_io.WriteTransformFn(Params.TRANSFORM_ARTEFACTS_DIR)
            )

    if runner == 'DataflowRunner':
        pipeline.run()
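WriteTransformFn is what makes the preprocessing reusable outside this pipeline: the transform_fn directory it writes contains a SavedModel of the transformation graph. As an illustration of why that matters (and of why saved_transform_io and input_fn_utils are imported above), a serving input function in the training code could re-apply it to raw titles roughly like this; the exact call follows the tf.transform 0.6-era API and is a sketch, not something this notebook runs:
# Sketch only (not executed here): re-apply the saved transform graph to raw
# features inside a serving input_fn, so serving-time preprocessing matches
# what was analyzed on the training data.
def example_serving_input_fn():
    raw_title = tf.placeholder(tf.string, [None])
    raw_features = {TEXT_FEATURE_NAME: raw_title}

    # 'transform_fn' is the sub-directory written by WriteTransformFn above
    _, transformed_features = saved_transform_io.partially_apply_saved_transform(
        os.path.join(Params.TRANSFORM_ARTEFACTS_DIR, 'transform_fn'),
        raw_features)

    return input_fn_utils.InputFnOps(
        transformed_features, None, {TEXT_FEATURE_NAME: raw_title})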
In [8]:
from datetime import datetime
import shutil
job_name = 'preprocess-hackernews-data' + '-' + datetime.utcnow().strftime('%y%m%d-%H%M%S')
options = {
    'region': Params.REGION,
    'staging_location': os.path.join(Params.TEMP_DIR, 'staging'),
    'temp_location': Params.TEMP_DIR,
    'job_name': job_name,
    'project': Params.GCP_PROJECT_ID
}

tf.logging.set_verbosity(tf.logging.ERROR)

opts = beam.pipeline.PipelineOptions(flags=[], **options)
runner = 'DirectRunner' if Params.PLATFORM == 'local' else 'DataflowRunner'

if Params.TRANSFORM:
    if Params.PLATFORM == 'local':
        shutil.rmtree(Params.TRANSFORMED_DATA_DIR, ignore_errors=True)
        shutil.rmtree(Params.TRANSFORM_ARTEFACTS_DIR, ignore_errors=True)
        shutil.rmtree(Params.TEMP_DIR, ignore_errors=True)

    print('Launching {} job {} ... hang on'.format(runner, job_name))
    run_pipeline(runner, opts)
    print('Pipeline completed.')
else:
    print('Transformation skipped!')
In [9]:
%%bash
echo "** transformed data:"
ls data/news/transformed
echo ""
echo "** transform artefacts:"
ls models/news/transform
echo ""
echo "** transform assets:"
ls models/news/transform/transform_fn/assets
echo ""
head models/news/transform/transform_fn/assets/vocab_string_to_int_uniques
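Beyond listing the files, a quick sanity check is to read one of the transformed records back and look at the bow/weight pairs. A minimal sketch, assuming the local run above produced the train-*.tfrecords files:
# Sketch: peek at one transformed train example from the local output.
import glob

train_files = sorted(glob.glob(Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX + '*.tfrecords'))
for record in tf.python_io.tf_record_iterator(train_files[0]):
    example = tf.train.Example.FromString(record)
    print(example.features.feature['source'].bytes_list.value)
    print(example.features.feature['bow'].int64_list.value[:10])
    print(example.features.feature['weight'].float_list.value[:10])
    break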
In [ ]: