Preprocessing using Dataflow

This notebook illustrates:

  1. Creating datasets for Machine Learning using Dataflow </ol>
  2. While Pandas is fine for experimenting, for operationalization of your workflow, it is better to do preprocessing in Apache Beam. This will also help if you need to preprocess data in flight, since Apache Beam also allows for streaming.

    !sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst
    pip install --user apache-beam[gcp]

    Run the command again if you are getting oauth2client error.

    Note: You may ignore the following responses in the cell output above:

    ERROR (in Red text) related to: witwidget-gpu, fairing

    WARNING (in Yellow text) related to: hdfscli, hdfscli-avro, pbr, fastavro, gen_client

    Restart the kernel before proceeding further.

    Make sure the Dataflow API is enabled by going to this link. Ensure that you've installed Beam by importing it and printing the version number.

    # Ensure the right version of Tensorflow is installed.
    !pip freeze | grep tensorflow==2.1
    import apache_beam as beam

    You may receive a UserWarning about the Apache Beam SDK for Python 3 as not being yet fully supported. Don't worry about this.

    # change these to try this notebook out
    BUCKET = 'cloud-training-demos-ml'
    PROJECT = 'cloud-training-demos'
    REGION = 'us-central1'
    import os
    os.environ['BUCKET'] = BUCKET
    os.environ['PROJECT'] = PROJECT
    os.environ['REGION'] = REGION
    if ! gsutil ls | grep -q gs://${BUCKET}/; then
      gsutil mb -l ${REGION} gs://${BUCKET}

    Create ML dataset using Dataflow

    Let's use Cloud Dataflow to read in the BigQuery data, do some preprocessing, and write it out as CSV files. In this case, I want to do some preprocessing, modifying data so that we can simulate what is known if no ultrasound has been performed. Note that after you launch this, the actual processing is happening on the cloud. Go to the GCP webconsole to the Dataflow section and monitor the running job. It took about 20 minutes for me.

    If you wish to continue without doing this step, you can copy my preprocessed output:

    gsutil -m cp -r gs://cloud-training-demos/babyweight/preproc gs://your-bucket/
    But if you do this, you also have to use my TensorFlow model since yours might expect the fields in a different order

    import datetime, os
    def to_csv(rowdict):
      import hashlib
      import copy
      # TODO #1:
      # Pull columns from BQ and create line(s) of CSV input
      CSV_COLUMNS = None
      # Create synthetic data where we assume that no ultrasound has been performed
      # and so we don't know sex of the baby. Let's assume that we can tell the difference
      # between single and multiple, but that the errors rates in determining exact number
      # is difficult in the absence of an ultrasound.
      no_ultrasound = copy.deepcopy(rowdict)
      w_ultrasound = copy.deepcopy(rowdict)
      no_ultrasound['is_male'] = 'Unknown'
      if rowdict['plurality'] > 1:
        no_ultrasound['plurality'] = 'Multiple(2+)'
        no_ultrasound['plurality'] = 'Single(1)'
      # Change the plurality column to strings
      w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]
      # Write out two rows for each input row, one with ultrasound and one without
      for result in [no_ultrasound, w_ultrasound]:
        data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
        key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
        yield str('{},{}'.format(data, key))
    def preprocess(in_test_mode):
      import shutil, os, subprocess
      job_name = 'preprocess-babyweight-features' + '-' +'%y%m%d-%H%M%S')
      if in_test_mode:
          print('Launching local job ... hang on')
          OUTPUT_DIR = './preproc'
          shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
          print('Launching Dataflow job {} ... hang on'.format(job_name))
          OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
            subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      options = {
          'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
          'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
          'job_name': job_name,
          'project': PROJECT,
          'region': REGION,
          'teardown_policy': 'TEARDOWN_ALWAYS',
          'no_save_main_session': True,
          'num_workers': 4,
          'max_num_workers': 5
      opts = beam.pipeline.PipelineOptions(flags = [], **options)
      if in_test_mode:
          RUNNER = 'DirectRunner'
          RUNNER = 'DataflowRunner'
      p = beam.Pipeline(RUNNER, options = opts)
      query = """
    WHERE year > 2000
    AND weight_pounds > 0
    AND mother_age > 0
    AND plurality > 0
    AND gestation_weeks > 0
    AND month > 0
      if in_test_mode:
        query = query + ' LIMIT 100' 
      for step in ['train', 'eval']:
        if step == 'train':
          selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)
          selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)
         ## TODO Task #2: Modify the Apache Beam pipeline such that the first part of the pipe reads the data from BigQuery
         | '{}_read'.format(step) >> None 
         | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
         | '{}_out'.format(step) >>, '{}.csv'.format(step))))
      job =
      if in_test_mode:
    # TODO  Task #3: Once you have verified that the files produced locally are correct, change in_test_mode to False
    #               to execute this in Cloud Dataflow
    preprocess(in_test_mode = False)

    The above step will take 20+ minutes. Go to the GCP web console, navigate to the Dataflow section and wait for the job to finish before you run the following step.

    gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*

