Preprocessing using tf.transform and Dataflow

This notebook illustrates:

  1. Creating datasets for Machine Learning using tf.transform and Dataflow

While Pandas is fine for experimenting, it is better to do preprocessing in Apache Beam when you operationalize your workflow. Beam also supports streaming, which helps if you need to preprocess data in flight.

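    Only specific combinations of TensorFlow and Apache Beam are supported by tf.transform, so make sure to install one that is. (The cells below were run with tensorflow-transform 0.1.10, TensorFlow 1.2.0, and Apache Beam 2.1.1, which differs from the list that follows.)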

    • TFT 0.3
    • TF 1.3
    • Apache Beam [GCP] 2.1.1
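One way to catch a version mismatch early is to compare what is installed against the combination listed above. The following is a minimal sketch, not part of the original notebook: `find_version_mismatches` is a hypothetical helper, and the `installed` dictionary holds made-up values that you would normally fill in from `pip freeze`.

```python
def find_version_mismatches(installed, expected):
    """Return {package: (installed_version, expected_prefix)} for every mismatch."""
    mismatches = {}
    for package, prefix in expected.items():
        version = installed.get(package)
        # A version matches if it equals the prefix or extends it with a dot,
        # so '2.1.1' matches '2.1' but '0.10' does not match '0.1'.
        ok = version is not None and (
            version == prefix or version.startswith(prefix + '.'))
        if not ok:
            mismatches[package] = (version, prefix)
    return mismatches

# Expected combination, taken from the list above.
EXPECTED = {
    'tensorflow-transform': '0.3',
    'tensorflow': '1.3',
    'apache-beam': '2.1.1',
}

# Hypothetical installed versions (assumption: parsed from `pip freeze`).
installed = {
    'tensorflow-transform': '0.3.1',
    'tensorflow': '1.3.0',
    'apache-beam': '2.1.1',
}

print(find_version_mismatches(installed, EXPECTED))
```

An empty dictionary means every package matches the expected prefix; any entry names a package to reinstall before launching a job.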

    
    
    In [ ]:
    %%bash
    pip uninstall -y google-cloud-dataflow
    pip install --upgrade --force-reinstall tensorflow_transform==0.1.10 'apache-beam[gcp]'
    
    
    
    In [1]:
    %%bash
    pip freeze | grep -e 'flow\|beam'
    
    
    
    
    apache-beam==2.1.1
    tensorflow==1.2.0
    tensorflow-transform==0.1.10
    
    
    
    In [2]:
    import tensorflow as tf
    import apache_beam as beam
    print tf.__version__
    
    
    
    
    1.2.0
    
    No handlers could be found for logger "oauth2client.contrib.multistore_file"
    /usr/local/lib/python2.7/dist-packages/simplejson/encoder.py:291: DeprecationWarning: Interpreting naive datetime as local 2017-10-15 18:39:44.433221. Please add timezone info to timestamps.
      chunks = self.iterencode(o, _one_shot=True)
    
    
    
    In [3]:
    # change these to try this notebook out
    BUCKET = 'asl-ml-immersion-temp'
    PROJECT = 'asl-ml-immersion'
    REGION = 'us-central1'
    
    
    
    
    
    
    
    In [4]:
    import os
    os.environ['BUCKET'] = BUCKET
    os.environ['PROJECT'] = PROJECT
    os.environ['REGION'] = REGION
    
    
    
    
    
    
    
    In [5]:
    !gcloud config set project $PROJECT
    
    
    
    
    Updated property [core/project].
    
    
    
    
    In [4]:
    %%bash
    if ! gsutil ls | grep -q gs://${BUCKET}/; then
      gsutil mb -l ${REGION} gs://${BUCKET}
    fi
    

    Save the query from earlier

    The data is natality data (a record of births in the US). The goal is to predict a baby's weight given a number of factors about the pregnancy and the baby's mother. Later, we will split the data into training and eval datasets using a hash of the year and month, so that all births from the same month land in the same dataset.
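The hash-based split can be sketched in plain Python. This is an illustration under stated assumptions, not the notebook's actual mechanism: BigQuery's `FARM_FINGERPRINT` is not available in the standard library, so MD5 stands in for it here, and the helper names `hash_month` and `assign_split` are hypothetical. The 4-bucket, 3-for-training rule mirrors the `ABS(MOD(hashmonth, 4)) < 3` filter used in the pipeline cell later in this notebook.

```python
import hashlib

def hash_month(year, month):
    """Stand-in for FARM_FINGERPRINT(CONCAT(year, month)); uses MD5, not FarmHash."""
    key = '{}{}'.format(year, month).encode('utf-8')
    return int(hashlib.md5(key).hexdigest(), 16)

def assign_split(year, month, num_buckets=4, train_buckets=3):
    """Return 'train' for buckets 0..train_buckets-1 and 'eval' otherwise."""
    bucket = hash_month(year, month) % num_buckets
    return 'train' if bucket < train_buckets else 'eval'

# Two births from the same month always receive the same split.
rows = [(2001, 1), (2001, 1), (2003, 7), (2005, 12)]
for year, month in rows:
    print('{}-{:02d}: {}'.format(year, month, assign_split(year, month)))
```

Because the split depends only on the year and month, it is repeatable across runs and needs no stored random seed. The bucket assignments from MD5 will not match those that `FARM_FINGERPRINT` produces in BigQuery.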

    
    
    In [6]:
    query="""
    SELECT
      weight_pounds,
      is_male,
      mother_age,
      mother_race,
      plurality,
      gestation_weeks,
      mother_married,
      ever_born,
      cigarette_use,
      alcohol_use,
      FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
    FROM
      publicdata.samples.natality
    WHERE year > 2000
    """
    
    
    
    
    
    
    
    In [7]:
    import google.datalab.bigquery as bq
    df = bq.Query(query + " LIMIT 100").execute().result().to_dataframe()
    df.head()
    
    
    
    
    Out[7]:
       weight_pounds  is_male  mother_age  mother_race  plurality  gestation_weeks  mother_married  ever_born  cigarette_use  alcohol_use             hashmonth
    0       6.311835    False          18            3          1             39.0           False          2           None        False  -7146494315947640619
    1       7.500126    False          19            1          1             38.0            True          1           None        False   -774501970389208065
    2       7.936641    False          19            1          1             40.0           False          1           None        False   8904940584331855459
    3       6.999677    False          19            1          1             39.0           False          3           None        False  -7170969733900686954
    4       9.169025    False          19            1          1             40.0           False          2           None        False  -7170969733900686954
    

    Create ML dataset using tf.transform and Dataflow

    Let's use Cloud Dataflow to read in the BigQuery data and write it out as TFRecord files. Along the way, let's use tf.transform to scale and transform the features. tf.transform also saves the transformation metadata, which ensures that the same transformations are carried out during prediction.
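To see why saving the transformation metadata matters, consider this plain-Python sketch of the analyze-then-transform idea. It is not tf.transform's implementation: `analyze`, `center`, and `scale_to_0_1` are hypothetical helpers that mimic what `tft.mean` and `tft.scale_to_0_1` need, namely constants computed in a full pass over the training data, and the ages are made-up values.

```python
def analyze(train_values):
    """Full pass over the training data to compute the constants we need."""
    return {
        'mean': sum(train_values) / float(len(train_values)),
        'min': min(train_values),
        'max': max(train_values),
    }

def center(value, stats):
    """Subtract the training-set mean."""
    return value - stats['mean']

def scale_to_0_1(value, stats):
    """Map the training-set range [min, max] onto [0, 1]."""
    return (value - stats['min']) / float(stats['max'] - stats['min'])

train_ages = [18.0, 22.0, 30.0, 34.0]  # made-up training values
stats = analyze(train_ages)            # computed once, on training data only

# At prediction time the saved constants are reused, never recomputed.
new_age = 26.0
print(center(new_age, stats))        # 0.0
print(scale_to_0_1(new_age, stats))  # 0.5
```

If the constants were recomputed from whatever data arrives at prediction time, the same raw value would map to a different feature value than the one the model was trained on.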

    Note that after you launch this, the notebook won't show you progress. Go to the Dataflow section of the GCP web console to monitor the running job. It took about 30 minutes for me. If you wish to continue without doing this step, you can copy my preprocessed output:

    gsutil -m cp -r gs://asl-ml-immersion/babyweight/preproc_tft gs://your-bucket/
    

    
    
    In [8]:
    %%bash
    # pin the tensorflow_transform version so the Dataflow workers install the same one we use here
    pip freeze | grep tensorflow-transform > requirements.txt
    cat requirements.txt
    
    
    
    
    tensorflow-transform==0.1.10
    
    
    
    
    In [9]:
    import datetime
    import apache_beam as beam
    import tensorflow_transform as tft
    from tensorflow_transform.beam import impl as beam_impl
    
    def preprocess_tft(inputs):
        import copy
        def center(x):
            # subtract the mean computed over the whole dataset
            return x - tft.mean(x)
        result = copy.copy(inputs) # shallow copy
        result['mother_age_tft'] = center(inputs['mother_age'])
        # despite the column name, gestation_weeks is scaled to [0, 1], not centered
        result['gestation_weeks_centered'] = tft.scale_to_0_1(inputs['gestation_weeks'])
        # map each race string to an integer index
        result['mother_race_tft'] = tft.string_to_int(inputs['mother_race'])
        return result
    
    def cleanup(rowdict):
        import copy, hashlib
        CSV_COLUMNS = 'weight_pounds,is_male,mother_age,mother_race,plurality,gestation_weeks,mother_married,cigarette_use,alcohol_use'.split(',')
        STR_COLUMNS = 'key,is_male,mother_race,mother_married,cigarette_use,alcohol_use'.split(',')
        FLT_COLUMNS = 'weight_pounds,mother_age,plurality,gestation_weeks'.split(',')
        
        # add any missing columns, and correct the types
        def tofloat(value, ifnot):
          try:
            return float(value)
          except (ValueError, TypeError):
            return ifnot
    
        result = {
          k : str(rowdict[k]) if k in rowdict else 'None' for k in STR_COLUMNS
        }
        result.update({
            k : tofloat(rowdict[k], -99) if k in rowdict else -99 for k in FLT_COLUMNS
          })
        
        # modify opaque numeric race code into human-readable data
        races = dict(zip([1,2,3,4,5,6,7,18,28,39,48],
                         ['White', 'Black', 'American Indian', 'Chinese', 
                          'Japanese', 'Hawaiian', 'Filipino',
                          'Asian Indian', 'Korean', 'Samoan', 'Vietnamese']))
        if 'mother_race' in rowdict and rowdict['mother_race'] in races:
          result['mother_race'] = races[rowdict['mother_race']]
        else:
          result['mother_race'] = 'Unknown'    
        
        # cleanup: write out only the data that we want to train on
        if result['weight_pounds'] > 0 and result['mother_age'] > 0 and result['gestation_weeks'] > 0 and result['plurality'] > 0:
          data = ','.join([str(result[k]) for k in CSV_COLUMNS])
          result['key'] = hashlib.sha224(data).hexdigest()
          yield result 
      
    def preprocess(query, in_test_mode):
      import os
      import os.path
      import tempfile
      from apache_beam.io import tfrecordio
      from tensorflow_transform.coders import example_proto_coder
      from tensorflow_transform.tf_metadata import dataset_metadata
      from tensorflow_transform.tf_metadata import dataset_schema
      from tensorflow_transform.beam.tft_beam_io import transform_fn_io
    
      job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')    
      if in_test_mode:
        import shutil
        print 'Launching local job ... hang on'
        OUTPUT_DIR = './preproc_tft'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      else:
        print 'Launching Dataflow job {} ... hang on'.format(job_name)
        OUTPUT_DIR = 'gs://{0}/babyweight/preproc_tft/'.format(BUCKET)
        import subprocess
        subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
        
      options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'max_num_workers': 24,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'requirements_file': 'requirements.txt'
      }
      opts = beam.pipeline.PipelineOptions(flags=[], **options)
      if in_test_mode:
        RUNNER = 'DirectRunner'
      else:
        RUNNER = 'DataflowRunner'
    
      # set up metadata  
      raw_data_schema = {
        colname : dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
                       for colname in 'key,is_male,mother_race,mother_married,cigarette_use,alcohol_use'.split(',')
      }
      raw_data_schema.update({
          colname : dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())
                       for colname in 'weight_pounds,mother_age,plurality,gestation_weeks'.split(',')
        })
      raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))
    
      def read_rawdata(p, step, test_mode):
        # 3 of every 4 hash buckets go to training; the remaining one goes to eval
        if step == 'train':
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)
        else:
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)
        if test_mode:
            selquery = selquery + ' LIMIT 100'
        #print 'Processing {} data from {}'.format(step, selquery)
        return (p 
              | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True))
              | '{}_cleanup'.format(step) >> beam.FlatMap(cleanup)
                       )
      
      # run Beam  
      with beam.Pipeline(RUNNER, options=opts) as p:
        with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
    
          # analyze and transform training       
          raw_data = read_rawdata(p, 'train', in_test_mode)
          raw_dataset = (raw_data, raw_data_metadata)
          transformed_dataset, transform_fn = (
              raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
          transformed_data, transformed_metadata = transformed_dataset
          _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
              os.path.join(OUTPUT_DIR, 'train'),
              coder=example_proto_coder.ExampleProtoCoder(
                  transformed_metadata.schema))
          
          # transform eval data
          raw_test_data = read_rawdata(p, 'eval', in_test_mode)
          raw_test_dataset = (raw_test_data, raw_data_metadata)
          transformed_test_dataset = (
              (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
          transformed_test_data, _ = transformed_test_dataset
          _ = transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
              os.path.join(OUTPUT_DIR, 'eval'),
              coder=example_proto_coder.ExampleProtoCoder(
                  transformed_metadata.schema))
          _ = (transform_fn
               | 'WriteTransformFn' >>
               transform_fn_io.WriteTransformFn(os.path.join(OUTPUT_DIR, 'metadata')))
    
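      # No explicit p.run() is needed: leaving the `with beam.Pipeline(...)` block
      # already runs the pipeline and waits for it to finish.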
      
    preprocess(query, in_test_mode=False)
    
    
    
    
    /usr/local/lib/python2.7/dist-packages/scipy/ndimage/measurements.py:36: RuntimeWarning: numpy.dtype size changed, may indicate binary incompatibility. Expected 96, got 88
      from . import _ni_label
    /usr/local/lib/python2.7/dist-packages/scipy/ndimage/measurements.py:36: RuntimeWarning: numpy.ufunc size changed, may indicate binary incompatibility. Expected 192, got 176
      from . import _ni_label
    
    Launching Dataflow job preprocess-babyweight-features-171015-184021 ... hang on
    WARNING:tensorflow:From /usr/local/lib/python2.7/dist-packages/tensorflow_transform/mappers.py:305: string_to_index_table_from_tensor (from tensorflow.contrib.lookup.lookup_ops) is deprecated and will be removed after 2017-04-10.
    Instructions for updating:
    Use `index_table_from_tensor`.
    INFO:tensorflow:Assets added to graph.
    INFO:tensorflow:No assets to write.
    INFO:tensorflow:SavedModel written to: gs://asl-ml-immersion-temp/babyweight/preproc_tft/tmp/tftransform_tmp/d9c8204d8ebc439caa6853854675ee09/saved_model.pb
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: Any.
      warnings.warn('Using fallback coder for typehint: %r.' % typehint)
    /usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: Union[float, int, long, str, unicode].
      warnings.warn('Using fallback coder for typehint: %r.' % typehint)
    /usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: <type 'NoneType'>.
      warnings.warn('Using fallback coder for typehint: %r.' % typehint)
    
    INFO:tensorflow:Assets added to graph.
    INFO:tensorflow:No assets to write.
    INFO:tensorflow:SavedModel written to: gs://asl-ml-immersion-temp/babyweight/preproc_tft/tmp/tftransform_tmp/32d3edf033e94bbcb99757bf94791270/saved_model.pb
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: Dict[str, Union[SparseTensorValue, ndarray]].
      warnings.warn('Using fallback coder for typehint: %r.' % typehint)
    /usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: List[Any].
      warnings.warn('Using fallback coder for typehint: %r.' % typehint)
    
    
    CalledProcessErrorTraceback (most recent call last)
    <ipython-input-9-e25a9a22b6c2> in <module>()
        145   job = p.run()
        146 
    --> 147 preprocess(query, in_test_mode=False)
    
    <ipython-input-9-e25a9a22b6c2> in preprocess(query, in_test_mode)
        141       _ = (transform_fn
        142            | 'WriteTransformFn' >>
    --> 143            transform_fn_io.WriteTransformFn(os.path.join(OUTPUT_DIR, 'metadata')))
        144 
        145   job = p.run()
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in __exit__(self, exc_type, exc_val, exc_tb)
        333   def __exit__(self, exc_type, exc_val, exc_tb):
        334     if not exc_type:
    --> 335       self.run().wait_until_finish()
        336 
        337   def visit(self, visitor):
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api)
        326       finally:
        327         shutil.rmtree(tmpdir)
    --> 328     return self.runner.run(self)
        329 
        330   def __enter__(self):
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in run(self, pipeline)
        281     # Create the job
        282     result = DataflowPipelineResult(
    --> 283         self.dataflow_client.create_job(self.job), self)
        284 
        285     self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.pyc in wrapper(*args, **kwargs)
        166       while True:
        167         try:
    --> 168           return fun(*args, **kwargs)
        169         except Exception as exn:  # pylint: disable=broad-except
        170           if not retry_filter(exn):
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.pyc in create_job(self, job)
        421   def create_job(self, job):
        422     """Creates job description. May stage and/or submit for remote execution."""
    --> 423     self.create_job_description(job)
        424 
        425     # Stage and submit the job when necessary
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.pyc in create_job_description(self, job)
        444     """Creates a job described by the workflow proto."""
        445     resources = dependency.stage_job_resources(
    --> 446         job.options, file_copy=self._gcs_file_copy)
        447     job.proto.environment = Environment(
        448         packages=resources, options=job.options,
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc in stage_job_resources(options, file_copy, build_setup_args, temp_dir, populate_requirements_cache)
        325       os.makedirs(requirements_cache_path)
        326     populate_requirements_cache(
    --> 327         setup_options.requirements_file, requirements_cache_path)
        328     for pkg in  glob.glob(os.path.join(requirements_cache_path, '*')):
        329       file_copy(pkg, FileSystems.join(google_cloud_options.staging_location,
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc in _populate_requirements_cache(requirements_file, cache_dir)
        259       '--no-binary', ':all:']
        260   logging.info('Executing command: %s', cmd_args)
    --> 261   processes.check_call(cmd_args)
        262 
        263 
    
    /usr/local/lib/python2.7/dist-packages/apache_beam/utils/processes.pyc in check_call(*args, **kwargs)
         42   if force_shell:
         43     kwargs['shell'] = True
    ---> 44   return subprocess.check_call(*args, **kwargs)
         45 
         46 
    
    /usr/lib/python2.7/subprocess.pyc in check_call(*popenargs, **kwargs)
        539         if cmd is None:
        540             cmd = popenargs[0]
    --> 541         raise CalledProcessError(retcode, cmd)
        542     return 0
        543 
    
    CalledProcessError: Command '['/usr/bin/python', '-m', 'pip', 'install', '--download', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--no-binary', ':all:']' returned non-zero exit status 1
    
    
    
    In [ ]:
    %%bash
    gsutil ls gs://${BUCKET}/babyweight/preproc_tft/*-00000*
    

    Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.