Lab: TfTransform

Learning Objectives

  1. Preproccess data and engineer new features using TfTransform
  2. Create and deploy Apache Beam pipeline
  3. Use processed data to train taxifare model locally then serve a prediction

Introduction

While Pandas is fine for experimenting, for operationalization of your workflow it is better to do preprocessing in Apache Beam. This will also help if you need to preprocess data in flight, since Apache Beam allows for streaming. In this lab we will pull data from BigQuery then use Apache Beam TfTransform to process the data.

Only specific combinations of TensorFlow/Beam are supported by tf.transform so make sure to get a combo that works. In this lab we will be using:

  • TFT 0.15.0
  • TF 2.0
  • Apache Beam [GCP] 2.16.0

In [ ]:
!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst

In [1]:
!pip install --user apache-beam[gcp]==2.16.0 
!pip install --user tensorflow-transform==0.15.0


Collecting apache-beam[gcp]==2.16.0
  Downloading https://files.pythonhosted.org/packages/0f/15/a8065042472311383f34d94fe3ff611cc7ab092b0bf502ad097acb7406e3/apache_beam-2.16.0-cp35-cp35m-manylinux1_x86_64.whl (2.9MB)
     |████████████████████████████████| 2.9MB 4.8MB/s eta 0:00:01
Requirement already satisfied: future<1.0.0,>=0.16.0 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]==2.16.0) (0.18.2)
Requirement already satisfied: grpcio<2,>=1.12.1 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]==2.16.0) (1.25.0)
Collecting hdfs<3.0.0,>=2.1.0
  Downloading https://files.pythonhosted.org/packages/82/39/2c0879b1bcfd1f6ad078eb210d09dbce21072386a3997074ee91e60ddc5a/hdfs-2.5.8.tar.gz (41kB)
     |████████████████████████████████| 51kB 7.2MB/s  eta 0:00:01
Collecting pyyaml<4.0.0,>=3.12
  Downloading https://files.pythonhosted.org/packages/9e/a3/1d13970c3f36777c583f136c136f804d70f500168edc1edea6daa7200769/PyYAML-3.13.tar.gz (270kB)
     |████████████████████████████████| 276kB 27.6MB/s eta 0:00:01
Collecting httplib2<=0.12.0,>=0.8
  Downloading https://files.pythonhosted.org/packages/ce/ed/803905d670b52fa0edfdd135337e545b4496c2ab3a222f1449b7256eb99f/httplib2-0.12.0.tar.gz (218kB)
     |████████████████████████████████| 225kB 49.4MB/s eta 0:00:01
Collecting oauth2client<4,>=2.0.1
  Downloading https://files.pythonhosted.org/packages/c0/7b/bc893e35d6ca46a72faa4b9eaac25c687ce60e1fbe978993fe2de1b0ff0d/oauth2client-3.0.0.tar.gz (77kB)
     |████████████████████████████████| 81kB 10.5MB/s eta 0:00:01
Collecting pymongo<4.0.0,>=3.8.0
  Downloading https://files.pythonhosted.org/packages/ba/9b/0b6989718669209b2e3451860ac098471891626f48636f8cca6a449e09ea/pymongo-3.10.0-cp35-cp35m-manylinux2014_x86_64.whl (459kB)
     |████████████████████████████████| 460kB 34.3MB/s eta 0:00:01
Requirement already satisfied: protobuf<4,>=3.5.0.post1 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]==2.16.0) (3.10.0)
Collecting crcmod<2.0,>=1.7
  Downloading https://files.pythonhosted.org/packages/6b/b0/e595ce2a2527e169c3bcd6c33d2473c1918e0b7f6826a043ca1245dd4e5b/crcmod-1.7.tar.gz (89kB)
     |████████████████████████████████| 92kB 10.2MB/s eta 0:00:01
Collecting dill<0.3.1,>=0.3.0
  Downloading https://files.pythonhosted.org/packages/39/7a/70803635c850e351257029089d38748516a280864c97cbc73087afef6d51/dill-0.3.0.tar.gz (151kB)
     |████████████████████████████████| 153kB 29.2MB/s eta 0:00:01
Collecting fastavro<0.22,>=0.21.4
  Downloading https://files.pythonhosted.org/packages/ac/7d/e63a1ba78326e42a69bda88b1fcfca22ddd773c4cc51ae85b3b869abcff2/fastavro-0.21.24-cp35-cp35m-manylinux1_x86_64.whl (1.2MB)
     |████████████████████████████████| 1.2MB 18.2MB/s eta 0:00:01
Requirement already satisfied: pydot<2,>=1.2.0 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]==2.16.0) (1.4.1)
Collecting pyarrow<0.15.0,>=0.11.1; python_version >= "3.0" or platform_system != "Windows"
  Downloading https://files.pythonhosted.org/packages/54/95/bcbe5658d6ac65af35996a80ed66d82c50f9c0b36424f4758cd54dd08d73/pyarrow-0.14.1-cp35-cp35m-manylinux2010_x86_64.whl (58.1MB)
     |████████████████████████████████| 58.1MB 127kB/s  eta 0:00:01     |█████████▋                      | 17.5MB 25.5MB/s eta 0:00:02
Requirement already satisfied: python-dateutil<3,>=2.8.0 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]==2.16.0) (2.8.1)
Collecting mock<3.0.0,>=1.0.1
  Downloading https://files.pythonhosted.org/packages/e6/35/f187bdf23be87092bd0f1200d43d23076cee4d0dec109f195173fd3ebc79/mock-2.0.0-py2.py3-none-any.whl (56kB)
     |████████████████████████████████| 61kB 8.9MB/s  eta 0:00:01
Requirement already satisfied: pytz>=2018.3 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]==2.16.0) (2019.3)
Collecting avro-python3<2.0.0,>=1.8.1; python_version >= "3.0"
  Downloading https://files.pythonhosted.org/packages/76/b2/98a736a31213d3e281a62bcae5572cf297d2546bc429accf36f9ee1604bf/avro-python3-1.9.1.tar.gz
Collecting google-cloud-bigquery<1.18.0,>=1.6.0; extra == "gcp"
  Downloading https://files.pythonhosted.org/packages/a4/96/1b9cf1d43869c47a205aad411dac7c3040df6093d63c39273fa4d4c45da7/google_cloud_bigquery-1.17.1-py2.py3-none-any.whl (142kB)
     |████████████████████████████████| 143kB 34.3MB/s eta 0:00:01
Requirement already satisfied: cachetools<4,>=3.1.0; extra == "gcp" in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]==2.16.0) (3.1.1)
Collecting google-apitools<0.5.29,>=0.5.28; extra == "gcp"
  Downloading https://files.pythonhosted.org/packages/7f/32/df3e36fd705a00092f1ffa9f41ce1df8dcb594ae313d239b87861a41fc2e/google-apitools-0.5.28.tar.gz (172kB)
     |████████████████████████████████| 174kB 34.5MB/s eta 0:00:01
Collecting google-cloud-pubsub<1.1.0,>=0.39.0; extra == "gcp"
  Downloading https://files.pythonhosted.org/packages/d3/91/07a82945a7396ea34debafd476724bb5fc267c292790fdf2138c693f95c5/google_cloud_pubsub-1.0.2-py2.py3-none-any.whl (118kB)
     |████████████████████████████████| 122kB 36.9MB/s eta 0:00:01
Requirement already satisfied: google-cloud-core<2,>=0.28.1; extra == "gcp" in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]==2.16.0) (1.0.3)
Collecting google-cloud-bigtable<1.1.0,>=0.31.1; extra == "gcp"
  Downloading https://files.pythonhosted.org/packages/95/af/0ef7d097a1d5ad0c843867600e86de915e8ab8864740f49a4636cfb51af6/google_cloud_bigtable-1.0.0-py2.py3-none-any.whl (232kB)
     |████████████████████████████████| 235kB 35.4MB/s eta 0:00:01
Collecting google-cloud-datastore<1.8.0,>=1.7.1; extra == "gcp"
  Downloading https://files.pythonhosted.org/packages/d0/aa/29cbcf8cf7d08ce2d55b9dce858f7c632b434cb6451bed17cb4275804217/google_cloud_datastore-1.7.4-py2.py3-none-any.whl (82kB)
     |████████████████████████████████| 92kB 11.3MB/s eta 0:00:01
Requirement already satisfied: six>=1.5.2 in /usr/local/lib/python3.5/dist-packages (from grpcio<2,>=1.12.1->apache-beam[gcp]==2.16.0) (1.13.0)
Collecting docopt
  Downloading https://files.pythonhosted.org/packages/a2/55/8f8cab2afd404cf578136ef2cc5dfb50baa1761b68c9da1fb1e4eed343c9/docopt-0.6.2.tar.gz
Requirement already satisfied: requests>=2.7.0 in /usr/local/lib/python3.5/dist-packages (from hdfs<3.0.0,>=2.1.0->apache-beam[gcp]==2.16.0) (2.22.0)
Requirement already satisfied: pyasn1>=0.1.7 in /usr/local/lib/python3.5/dist-packages (from oauth2client<4,>=2.0.1->apache-beam[gcp]==2.16.0) (0.4.7)
Requirement already satisfied: pyasn1-modules>=0.0.5 in /usr/local/lib/python3.5/dist-packages (from oauth2client<4,>=2.0.1->apache-beam[gcp]==2.16.0) (0.2.7)
Requirement already satisfied: rsa>=3.1.4 in /usr/local/lib/python3.5/dist-packages (from oauth2client<4,>=2.0.1->apache-beam[gcp]==2.16.0) (4.0)
Requirement already satisfied: setuptools in /usr/local/lib/python3.5/dist-packages (from protobuf<4,>=3.5.0.post1->apache-beam[gcp]==2.16.0) (41.6.0)
Requirement already satisfied: pyparsing>=2.1.4 in /usr/local/lib/python3.5/dist-packages (from pydot<2,>=1.2.0->apache-beam[gcp]==2.16.0) (2.4.5)
Requirement already satisfied: numpy>=1.14 in /usr/local/lib/python3.5/dist-packages (from pyarrow<0.15.0,>=0.11.1; python_version >= "3.0" or platform_system != "Windows"->apache-beam[gcp]==2.16.0) (1.17.4)
Collecting pbr>=0.11
  Downloading https://files.pythonhosted.org/packages/7a/db/a968fd7beb9fe06901c1841cb25c9ccb666ca1b9a19b114d1bbedf1126fc/pbr-5.4.4-py2.py3-none-any.whl (110kB)
     |████████████████████████████████| 112kB 42.9MB/s eta 0:00:01
Requirement already satisfied: google-resumable-media<0.5.0dev,>=0.3.1 in /usr/local/lib/python3.5/dist-packages (from google-cloud-bigquery<1.18.0,>=1.6.0; extra == "gcp"->apache-beam[gcp]==2.16.0) (0.4.1)
Collecting fasteners>=0.14
  Downloading https://files.pythonhosted.org/packages/18/bd/55eb2d6397b9c0e263af9d091ebdb756b15756029b3cededf6461481bc63/fasteners-0.15-py2.py3-none-any.whl
Requirement already satisfied: grpc-google-iam-v1<0.13dev,>=0.12.3 in /usr/local/lib/python3.5/dist-packages (from google-cloud-pubsub<1.1.0,>=0.39.0; extra == "gcp"->apache-beam[gcp]==2.16.0) (0.12.3)
Requirement already satisfied: google-api-core[grpc]<2.0.0dev,>=1.14.0 in /usr/local/lib/python3.5/dist-packages (from google-cloud-pubsub<1.1.0,>=0.39.0; extra == "gcp"->apache-beam[gcp]==2.16.0) (1.14.3)
Requirement already satisfied: chardet<3.1.0,>=3.0.2 in /usr/local/lib/python3.5/dist-packages (from requests>=2.7.0->hdfs<3.0.0,>=2.1.0->apache-beam[gcp]==2.16.0) (3.0.4)
Requirement already satisfied: idna<2.9,>=2.5 in /usr/local/lib/python3.5/dist-packages (from requests>=2.7.0->hdfs<3.0.0,>=2.1.0->apache-beam[gcp]==2.16.0) (2.8)
Requirement already satisfied: urllib3!=1.25.0,!=1.25.1,<1.26,>=1.21.1 in /usr/local/lib/python3.5/dist-packages (from requests>=2.7.0->hdfs<3.0.0,>=2.1.0->apache-beam[gcp]==2.16.0) (1.24.2)
Requirement already satisfied: certifi>=2017.4.17 in /usr/local/lib/python3.5/dist-packages (from requests>=2.7.0->hdfs<3.0.0,>=2.1.0->apache-beam[gcp]==2.16.0) (2019.9.11)
Collecting monotonic>=0.1
  Downloading https://files.pythonhosted.org/packages/ac/aa/063eca6a416f397bd99552c534c6d11d57f58f2e94c14780f3bbf818c4cf/monotonic-1.5-py2.py3-none-any.whl
Requirement already satisfied: googleapis-common-protos[grpc]<2.0.0dev,>=1.5.2 in /usr/local/lib/python3.5/dist-packages (from grpc-google-iam-v1<0.13dev,>=0.12.3->google-cloud-pubsub<1.1.0,>=0.39.0; extra == "gcp"->apache-beam[gcp]==2.16.0) (1.6.0)
Requirement already satisfied: google-auth<2.0dev,>=0.4.0 in /usr/local/lib/python3.5/dist-packages (from google-api-core[grpc]<2.0.0dev,>=1.14.0->google-cloud-pubsub<1.1.0,>=0.39.0; extra == "gcp"->apache-beam[gcp]==2.16.0) (1.7.0)
Building wheels for collected packages: hdfs, pyyaml, httplib2, oauth2client, crcmod, dill, avro-python3, google-apitools, docopt
  Building wheel for hdfs (setup.py) ... done
  Created wheel for hdfs: filename=hdfs-2.5.8-cp35-none-any.whl size=34732 sha256=a8448b854ff8d67f4f1abe0fc4b5717fab60b134da2cd08fb50c8619e918898b
  Stored in directory: /home/jupyter/.cache/pip/wheels/fe/a7/05/23e3699975fc20f8a30e00ac1e515ab8c61168e982abe4ce70
  Building wheel for pyyaml (setup.py) ... done
  Created wheel for pyyaml: filename=PyYAML-3.13-cp35-cp35m-linux_x86_64.whl size=43458 sha256=b37180f052fa3672e7e272c7b1aedc3a67ef22a5a353ed0499ef885a0b585d9d
  Stored in directory: /home/jupyter/.cache/pip/wheels/ad/da/0c/74eb680767247273e2cf2723482cb9c924fe70af57c334513f
  Building wheel for httplib2 (setup.py) ... done
  Created wheel for httplib2: filename=httplib2-0.12.0-cp35-none-any.whl size=95120 sha256=e52a79c0587dd9e96bf548abfc9d9b00473d1c03d358fa48f9eae7a919b3de5c
  Stored in directory: /home/jupyter/.cache/pip/wheels/6d/41/4b/2b369d6e2b7eaebcdd423516d3fb659c7658c16a2be8fd04ec
  Building wheel for oauth2client (setup.py) ... done
  Created wheel for oauth2client: filename=oauth2client-3.0.0-cp35-none-any.whl size=107377 sha256=bc805ed0b69d85d30d6528c26267b3729ede195b7a634fec54f1e9a74c84de64
  Stored in directory: /home/jupyter/.cache/pip/wheels/48/f7/87/b932f09c6335dbcf45d916937105a372ab14f353a9ca431d7d
  Building wheel for crcmod (setup.py) ... done
  Created wheel for crcmod: filename=crcmod-1.7-cp35-cp35m-linux_x86_64.whl size=37319 sha256=63377b1212868211802340731e4a89af6034028cbf348fb269651061ec048384
  Stored in directory: /home/jupyter/.cache/pip/wheels/50/24/4d/4580ca4a299f1ad6fd63443e6e584cb21e9a07988e4aa8daac
  Building wheel for dill (setup.py) ... done
  Created wheel for dill: filename=dill-0.3.0-cp35-none-any.whl size=79806 sha256=bc4a5f86d4392ab0ca222f7c0160e9867d0071463952899570a27640a799a9a6
  Stored in directory: /home/jupyter/.cache/pip/wheels/c9/de/a4/a91eec4eea652104d8c81b633f32ead5eb57d1b294eab24167
  Building wheel for avro-python3 (setup.py) ... done
  Created wheel for avro-python3: filename=avro_python3-1.9.1-cp35-none-any.whl size=44047 sha256=55a6db6d93eeca295cccf0c819089c883bd0cb2a74dd6d24661dc0bb4dfc17a1
  Stored in directory: /home/jupyter/.cache/pip/wheels/94/54/6f/a5df680fd3224aa45145686f3b1b02a878a90ea769fcf9daaf
  Building wheel for google-apitools (setup.py) ... done
  Created wheel for google-apitools: filename=google_apitools-0.5.28-cp35-none-any.whl size=131644 sha256=f9613af74d18c94dedec1af138b83a7104a6ac7b6933d52f2ccae964e3922b28
  Stored in directory: /home/jupyter/.cache/pip/wheels/d6/c2/92/837e8a4d649a209dff85b38d7fbb576b4b480738be70865f29
  Building wheel for docopt (setup.py) ... done
  Created wheel for docopt: filename=docopt-0.6.2-py2.py3-none-any.whl size=19851 sha256=2a769e99810177d6f22f43c010e7e3b689577d7adf8f4cc575038a7b8405be69
  Stored in directory: /home/jupyter/.cache/pip/wheels/9b/04/dd/7daf4150b6d9b12949298737de9431a324d4b797ffd63f526e
Successfully built hdfs pyyaml httplib2 oauth2client crcmod dill avro-python3 google-apitools docopt
ERROR: witwidget-gpu 1.5.0 requires tensorflow-gpu>=1.12.0, which is not installed.
ERROR: witwidget-gpu 1.5.0 requires tensorflow-serving-api-gpu>=1.12.0, which is not installed.
ERROR: witwidget-gpu 1.5.0 has requirement oauth2client>=4.1.3, but you'll have oauth2client 3.0.0 which is incompatible.
ERROR: fairing 0.5.3 has requirement oauth2client>=4.0.0, but you'll have oauth2client 3.0.0 which is incompatible.
Installing collected packages: docopt, hdfs, pyyaml, httplib2, oauth2client, pymongo, crcmod, dill, fastavro, pyarrow, pbr, mock, avro-python3, google-cloud-bigquery, monotonic, fasteners, google-apitools, google-cloud-pubsub, google-cloud-bigtable, google-cloud-datastore, apache-beam
  WARNING: The scripts hdfscli and hdfscli-avro are installed in '/home/jupyter/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
  WARNING: The script fastavro is installed in '/home/jupyter/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
  WARNING: The script plasma_store is installed in '/home/jupyter/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
  WARNING: The script pbr is installed in '/home/jupyter/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
  WARNING: The script gen_client is installed in '/home/jupyter/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed apache-beam-2.16.0 avro-python3-1.9.1 crcmod-1.7 dill-0.3.0 docopt-0.6.2 fastavro-0.21.24 fasteners-0.15 google-apitools-0.5.28 google-cloud-bigquery-1.17.1 google-cloud-bigtable-1.0.0 google-cloud-datastore-1.7.4 google-cloud-pubsub-1.0.2 hdfs-2.5.8 httplib2-0.12.0 mock-2.0.0 monotonic-1.5 oauth2client-3.0.0 pbr-5.4.4 pyarrow-0.14.1 pymongo-3.10.0 pyyaml-3.13
Collecting tensorflow-transform==0.15.0
  Downloading https://files.pythonhosted.org/packages/34/88/9ee55045a1ffbf44fb75b10a30c54609f58987987f69ace9b971938e750d/tensorflow-transform-0.15.0.tar.gz (222kB)
     |████████████████████████████████| 225kB 4.9MB/s eta 0:00:01
Requirement already satisfied: absl-py<0.9,>=0.7 in /usr/local/lib/python3.5/dist-packages (from tensorflow-transform==0.15.0) (0.8.1)
Requirement already satisfied: apache-beam[gcp]<3,>=2.16 in /home/jupyter/.local/lib/python3.5/site-packages (from tensorflow-transform==0.15.0) (2.16.0)
Requirement already satisfied: numpy<2,>=1.16 in /usr/local/lib/python3.5/dist-packages (from tensorflow-transform==0.15.0) (1.17.4)
Requirement already satisfied: protobuf<4,>=3.7 in /usr/local/lib/python3.5/dist-packages (from tensorflow-transform==0.15.0) (3.10.0)
Requirement already satisfied: pydot<2,>=1.2 in /usr/local/lib/python3.5/dist-packages (from tensorflow-transform==0.15.0) (1.4.1)
Requirement already satisfied: six<2,>=1.10 in /usr/local/lib/python3.5/dist-packages (from tensorflow-transform==0.15.0) (1.13.0)
Requirement already satisfied: tensorflow-metadata<0.16,>=0.15 in /usr/local/lib/python3.5/dist-packages (from tensorflow-transform==0.15.0) (0.15.1)
Requirement already satisfied: tensorflow<2.2,>=1.15 in /usr/local/lib/python3.5/dist-packages (from tensorflow-transform==0.15.0) (2.0.0)
Collecting tfx-bsl<0.16,>=0.15
  Downloading https://files.pythonhosted.org/packages/3b/0c/56d8e5b36532f33d631afb665f85181299d672e332ae309db49ea2023405/tfx_bsl-0.15.3-cp35-cp35m-manylinux2010_x86_64.whl (1.9MB)
     |████████████████████████████████| 1.9MB 48.0MB/s eta 0:00:01
Requirement already satisfied: pymongo<4.0.0,>=3.8.0 in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (3.10.0)
Requirement already satisfied: pyarrow<0.15.0,>=0.11.1; python_version >= "3.0" or platform_system != "Windows" in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.14.1)
Requirement already satisfied: dill<0.3.1,>=0.3.0 in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.3.0)
Requirement already satisfied: avro-python3<2.0.0,>=1.8.1; python_version >= "3.0" in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.9.1)
Requirement already satisfied: httplib2<=0.12.0,>=0.8 in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.12.0)
Requirement already satisfied: future<1.0.0,>=0.16.0 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.18.2)
Requirement already satisfied: grpcio<2,>=1.12.1 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.25.0)
Requirement already satisfied: python-dateutil<3,>=2.8.0 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (2.8.1)
Requirement already satisfied: fastavro<0.22,>=0.21.4 in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.21.24)
Requirement already satisfied: pytz>=2018.3 in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (2019.3)
Requirement already satisfied: hdfs<3.0.0,>=2.1.0 in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (2.5.8)
Requirement already satisfied: crcmod<2.0,>=1.7 in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.7)
Requirement already satisfied: pyyaml<4.0.0,>=3.12 in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (3.13)
Requirement already satisfied: oauth2client<4,>=2.0.1 in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (3.0.0)
Requirement already satisfied: mock<3.0.0,>=1.0.1 in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (2.0.0)
Requirement already satisfied: google-cloud-bigquery<1.18.0,>=1.6.0; extra == "gcp" in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.17.1)
Requirement already satisfied: google-cloud-core<2,>=0.28.1; extra == "gcp" in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.0.3)
Requirement already satisfied: google-cloud-bigtable<1.1.0,>=0.31.1; extra == "gcp" in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.0.0)
Requirement already satisfied: google-cloud-datastore<1.8.0,>=1.7.1; extra == "gcp" in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.7.4)
Requirement already satisfied: google-cloud-pubsub<1.1.0,>=0.39.0; extra == "gcp" in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.0.2)
Requirement already satisfied: cachetools<4,>=3.1.0; extra == "gcp" in /usr/local/lib/python3.5/dist-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (3.1.1)
Requirement already satisfied: google-apitools<0.5.29,>=0.5.28; extra == "gcp" in /home/jupyter/.local/lib/python3.5/site-packages (from apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.5.28)
Requirement already satisfied: setuptools in /usr/local/lib/python3.5/dist-packages (from protobuf<4,>=3.7->tensorflow-transform==0.15.0) (41.6.0)
Requirement already satisfied: pyparsing>=2.1.4 in /usr/local/lib/python3.5/dist-packages (from pydot<2,>=1.2->tensorflow-transform==0.15.0) (2.4.5)
Requirement already satisfied: googleapis-common-protos in /usr/local/lib/python3.5/dist-packages (from tensorflow-metadata<0.16,>=0.15->tensorflow-transform==0.15.0) (1.6.0)
Requirement already satisfied: astor>=0.6.0 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (0.8.0)
Requirement already satisfied: tensorboard<2.1.0,>=2.0.0 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (2.0.1)
Requirement already satisfied: tensorflow-estimator<2.1.0,>=2.0.0 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (2.0.0)
Requirement already satisfied: opt-einsum>=2.3.2 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (3.1.0)
Requirement already satisfied: google-pasta>=0.1.6 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (0.1.8)
Requirement already satisfied: wheel>=0.26 in /usr/lib/python3/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (0.29.0)
Requirement already satisfied: keras-preprocessing>=1.0.5 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (1.1.0)
Requirement already satisfied: gast==0.2.2 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (0.2.2)
Requirement already satisfied: termcolor>=1.1.0 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (1.1.0)
Requirement already satisfied: wrapt>=1.11.1 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (1.11.2)
Requirement already satisfied: keras-applications>=1.0.8 in /usr/local/lib/python3.5/dist-packages (from tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (1.0.8)
Collecting tensorflow-serving-api<3,>=1.15
  Downloading https://files.pythonhosted.org/packages/7b/91/2abebef1df17ee6d13aecf32e0f258dfcf62437ab989957b2a62d24f8630/tensorflow_serving_api-2.0.0-py2.py3-none-any.whl
Requirement already satisfied: psutil<6,>=5.6 in /usr/local/lib/python3.5/dist-packages (from tfx-bsl<0.16,>=0.15->tensorflow-transform==0.15.0) (5.6.5)
Requirement already satisfied: requests>=2.7.0 in /usr/local/lib/python3.5/dist-packages (from hdfs<3.0.0,>=2.1.0->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (2.22.0)
Requirement already satisfied: docopt in /home/jupyter/.local/lib/python3.5/site-packages (from hdfs<3.0.0,>=2.1.0->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.6.2)
Requirement already satisfied: rsa>=3.1.4 in /usr/local/lib/python3.5/dist-packages (from oauth2client<4,>=2.0.1->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (4.0)
Requirement already satisfied: pyasn1>=0.1.7 in /usr/local/lib/python3.5/dist-packages (from oauth2client<4,>=2.0.1->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.4.7)
Requirement already satisfied: pyasn1-modules>=0.0.5 in /usr/local/lib/python3.5/dist-packages (from oauth2client<4,>=2.0.1->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.2.7)
Requirement already satisfied: pbr>=0.11 in /home/jupyter/.local/lib/python3.5/site-packages (from mock<3.0.0,>=1.0.1->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (5.4.4)
Requirement already satisfied: google-resumable-media<0.5.0dev,>=0.3.1 in /usr/local/lib/python3.5/dist-packages (from google-cloud-bigquery<1.18.0,>=1.6.0; extra == "gcp"->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.4.1)
Requirement already satisfied: google-api-core<2.0.0dev,>=1.14.0 in /usr/local/lib/python3.5/dist-packages (from google-cloud-core<2,>=0.28.1; extra == "gcp"->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.14.3)
Requirement already satisfied: grpc-google-iam-v1<0.13dev,>=0.12.3 in /usr/local/lib/python3.5/dist-packages (from google-cloud-bigtable<1.1.0,>=0.31.1; extra == "gcp"->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.12.3)
Requirement already satisfied: fasteners>=0.14 in /home/jupyter/.local/lib/python3.5/site-packages (from google-apitools<0.5.29,>=0.5.28; extra == "gcp"->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (0.15)
Requirement already satisfied: google-auth-oauthlib<0.5,>=0.4.1 in /usr/local/lib/python3.5/dist-packages (from tensorboard<2.1.0,>=2.0.0->tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (0.4.1)
Requirement already satisfied: google-auth<2,>=1.6.3 in /usr/local/lib/python3.5/dist-packages (from tensorboard<2.1.0,>=2.0.0->tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (1.7.0)
Requirement already satisfied: markdown>=2.6.8 in /usr/local/lib/python3.5/dist-packages (from tensorboard<2.1.0,>=2.0.0->tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (3.1.1)
Requirement already satisfied: werkzeug>=0.11.15 in /usr/local/lib/python3.5/dist-packages (from tensorboard<2.1.0,>=2.0.0->tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (0.16.0)
Requirement already satisfied: h5py in /usr/local/lib/python3.5/dist-packages (from keras-applications>=1.0.8->tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (2.10.0)
Requirement already satisfied: idna<2.9,>=2.5 in /usr/local/lib/python3.5/dist-packages (from requests>=2.7.0->hdfs<3.0.0,>=2.1.0->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (2.8)
Requirement already satisfied: chardet<3.1.0,>=3.0.2 in /usr/local/lib/python3.5/dist-packages (from requests>=2.7.0->hdfs<3.0.0,>=2.1.0->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (3.0.4)
Requirement already satisfied: urllib3!=1.25.0,!=1.25.1,<1.26,>=1.21.1 in /usr/local/lib/python3.5/dist-packages (from requests>=2.7.0->hdfs<3.0.0,>=2.1.0->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.24.2)
Requirement already satisfied: certifi>=2017.4.17 in /usr/local/lib/python3.5/dist-packages (from requests>=2.7.0->hdfs<3.0.0,>=2.1.0->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (2019.9.11)
Requirement already satisfied: monotonic>=0.1 in /home/jupyter/.local/lib/python3.5/site-packages (from fasteners>=0.14->google-apitools<0.5.29,>=0.5.28; extra == "gcp"->apache-beam[gcp]<3,>=2.16->tensorflow-transform==0.15.0) (1.5)
Requirement already satisfied: requests-oauthlib>=0.7.0 in /usr/local/lib/python3.5/dist-packages (from google-auth-oauthlib<0.5,>=0.4.1->tensorboard<2.1.0,>=2.0.0->tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (1.3.0)
Requirement already satisfied: oauthlib>=3.0.0 in /usr/local/lib/python3.5/dist-packages (from requests-oauthlib>=0.7.0->google-auth-oauthlib<0.5,>=0.4.1->tensorboard<2.1.0,>=2.0.0->tensorflow<2.2,>=1.15->tensorflow-transform==0.15.0) (3.1.0)
Building wheels for collected packages: tensorflow-transform
  Building wheel for tensorflow-transform (setup.py) ... done
  Created wheel for tensorflow-transform: filename=tensorflow_transform-0.15.0-cp35-none-any.whl size=283295 sha256=470487a0e0d5f37d8aa99c0d96b9ac7842d0afd0fd1c4a834105a867d47b9181
  Stored in directory: /home/jupyter/.cache/pip/wheels/18/62/08/7b4aee4bd80bd969f9c9c653556b0c8732c9c1fbff18a2b26d
Successfully built tensorflow-transform
Installing collected packages: tensorflow-serving-api, tfx-bsl, tensorflow-transform
Successfully installed tensorflow-serving-api-2.0.0 tensorflow-transform-0.15.0 tfx-bsl-0.15.3

NOTE: You may ignore specific incompatibility errors and warnings. These components and issues do not impact your ability to complete the lab. Download .whl file for tensorflow-transform. We will pass this file to Beam Pipeline Options so it is installed on the DataFlow workers


In [2]:
!pip download tensorflow-transform==0.15.0 --no-deps


Collecting tensorflow-transform==0.15.0
  Using cached https://files.pythonhosted.org/packages/34/88/9ee55045a1ffbf44fb75b10a30c54609f58987987f69ace9b971938e750d/tensorflow-transform-0.15.0.tar.gz
  Saved ./tensorflow-transform-0.15.0.tar.gz
Successfully downloaded tensorflow-transform

Restart the kernel (click on the reload button above).


In [1]:
%%bash
pip freeze | grep -e 'flow\|beam'


apache-beam==2.16.0
tensorflow==2.0.0
tensorflow-datasets==1.3.0
tensorflow-estimator==2.0.0
tensorflow-hub==0.7.0
tensorflow-io==0.9.0
tensorflow-metadata==0.15.1
tensorflow-serving-api==2.0.0
tensorflow-transform==0.15.0

In [2]:
import tensorflow as tf
import tensorflow_transform as tft
import shutil
print(tf.__version__)


2.0.0

In [3]:
# change these to try this notebook out
BUCKET = 'cloud-example-labs'
PROJECT = 'project-id'
REGION = 'us-central1'

In [4]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [5]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION


Updated property [core/project].
Updated property [compute/region].

In [6]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

Input source: BigQuery

Get data from BigQuery but defer the majority of filtering etc. to Beam. Note that the dayofweek column is now strings.


In [7]:
from google.cloud import bigquery


def create_query(phase, EVERY_N):
    """Creates a query with the proper splits.

    Args:
        phase: int, 1=train, 2=valid.
        EVERY_N: int, take an example EVERY_N rows.

    Returns:
        Query string with the proper splits.
    """
    base_query = """
    WITH daynames AS
    (SELECT ['Sun', 'Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat'] AS daysofweek)
    SELECT
    (tolls_amount + fare_amount) AS fare_amount,
    daysofweek[ORDINAL(EXTRACT(DAYOFWEEK FROM pickup_datetime))] AS dayofweek,
    EXTRACT(HOUR FROM pickup_datetime) AS hourofday,
    pickup_longitude AS pickuplon,
    pickup_latitude AS pickuplat,
    dropoff_longitude AS dropofflon,
    dropoff_latitude AS dropofflat,
    passenger_count AS passengers,
    'notneeded' AS key
    FROM
    `nyc-tlc.yellow.trips`, daynames
    WHERE
    trip_distance > 0 AND fare_amount > 0
    """
    if EVERY_N is None:
        if phase < 2:
            # training
            query = """{0} AND ABS(MOD(FARM_FINGERPRINT(CAST
            (pickup_datetime AS STRING), 4)) < 2""".format(base_query)
        else:
            query = """{0} AND ABS(MOD(FARM_FINGERPRINT(CAST(
            pickup_datetime AS STRING), 4)) = {1}""".format(base_query, phase)
    else:
        query = """{0} AND ABS(MOD(FARM_FINGERPRINT(CAST(
        pickup_datetime AS STRING)), {1})) = {2}""".format(
            base_query, EVERY_N, phase)

    return query

query = create_query(2, 100000)

Let's pull this query down into a Pandas DataFrame and take a look at some of the statistics.


In [8]:
df_valid = bigquery.Client().query(query).to_dataframe()
display(df_valid.head())
df_valid.describe()


fare_amount dayofweek hourofday pickuplon pickuplat dropofflon dropofflat passengers key
0 4.5 Sat 0 -74.000292 40.728722 -73.995235 40.724961 1 notneeded
1 6.9 Sun 0 -73.986003 40.722688 -74.004549 40.718822 1 notneeded
2 16.5 Sun 0 -74.002155 40.740375 -73.967537 40.792845 5 notneeded
3 143.0 Sun 0 -73.990255 40.740407 -74.350245 40.663847 1 notneeded
4 19.0 Sun 0 -73.977255 40.754930 -73.917570 40.767272 1 notneeded
Out[8]:
fare_amount hourofday pickuplon pickuplat dropofflon dropofflat passengers
count 11181.000000 11181.000000 11181.000000 11181.000000 11181.000000 11181.000000 11181.000000
mean 11.242599 13.244075 -72.576852 39.973146 -72.748974 40.006091 1.722118
std 9.447462 6.548354 10.133452 5.777329 12.981577 5.664887 1.351062
min 2.500000 0.000000 -78.133333 -73.991278 -751.400000 -73.977970 0.000000
25% 6.000000 9.000000 -73.991849 40.734954 -73.991236 40.734008 1.000000
50% 8.500000 14.000000 -73.981824 40.752640 -73.980164 40.753427 1.000000
75% 12.500000 19.000000 -73.967418 40.766700 -73.964153 40.767832 2.000000
max 143.000000 23.000000 40.806487 41.366138 40.785400 41.366138 6.000000

Create ML dataset using tf.transform and Dataflow

Let's use Cloud Dataflow to read in the BigQuery data and write it out as TFRecord files. Along the way, let's use tf.transform to do scaling and transforming. Using tf.transform allows us to save the metadata to ensure that the appropriate transformations get carried out during prediction as well.

transformed_data is type pcollection.


In [9]:
import datetime
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
import tensorflow_metadata as tfmd
from tensorflow_transform.beam import impl as beam_impl


def is_valid(inputs):
    """Check to make sure the inputs are valid.

    Args:
        inputs: dict, dictionary of TableRow data from BigQuery.

    Returns:
        True if the inputs are valid and False if they are not.
    """
    try:
        pickup_longitude = inputs['pickuplon']
        dropoff_longitude = inputs['dropofflon']
        pickup_latitude = inputs['pickuplat']
        dropoff_latitude = inputs['dropofflat']
        hourofday = inputs['hourofday']
        dayofweek = inputs['dayofweek']
        passenger_count = inputs['passengers']
        fare_amount = inputs['fare_amount']
        return fare_amount >= 2.5 and pickup_longitude > -78 \
            and pickup_longitude < -70 and dropoff_longitude > -78 \
            and dropoff_longitude < -70 and pickup_latitude > 37 \
            and pickup_latitude < 45 and dropoff_latitude > 37 \
            and dropoff_latitude < 45 and passenger_count > 0
    except:
        return False


def preprocess_tft(inputs):
    """Preproccess the features and add engineered features with tf transform.

    Args:
        dict, dictionary of TableRow data from BigQuery.

    Returns:
        Dictionary of preprocessed data after scaling and feature engineering.
    """
    import datetime
    print(inputs)
    result = {}
    result['fare_amount'] = tf.identity(inputs['fare_amount'])
    # build a vocabulary
    result['dayofweek'] = tft.string_to_int(inputs['dayofweek'])
    result['hourofday'] = tf.identity(inputs['hourofday'])  # pass through
    # scaling numeric values
    result['pickuplon'] = (tft.scale_to_0_1(inputs['pickuplon']))
    result['pickuplat'] = (tft.scale_to_0_1(inputs['pickuplat']))
    result['dropofflon'] = (tft.scale_to_0_1(inputs['dropofflon']))
    result['dropofflat'] = (tft.scale_to_0_1(inputs['dropofflat']))
    result['passengers'] = tf.cast(inputs['passengers'], tf.float32)  # a cast
    # arbitrary TF func
    result['key'] = tf.as_string(tf.ones_like(inputs['passengers']))
    # engineered features
    latdiff = inputs['pickuplat'] - inputs['dropofflat']
    londiff = inputs['pickuplon'] - inputs['dropofflon']
    result['latdiff'] = tft.scale_to_0_1(latdiff)
    result['londiff'] = tft.scale_to_0_1(londiff)
    dist = tf.sqrt(latdiff * latdiff + londiff * londiff)
    result['euclidean'] = tft.scale_to_0_1(dist)
    return result


def preprocess(in_test_mode):
    """Sets up preprocess pipeline.

    Args:
        in_test_mode: bool, False to launch DataFlow job, True to run locally.
    """
    import os
    import os.path
    import tempfile
    from apache_beam.io import tfrecordio
    from tensorflow_transform.coders import example_proto_coder
    from tensorflow_transform.tf_metadata import dataset_metadata
    from tensorflow_transform.tf_metadata import dataset_schema
    from tensorflow_transform.beam import tft_beam_io
    from tensorflow_transform.beam.tft_beam_io import transform_fn_io

    job_name = 'preprocess-taxi-features' + '-'
    job_name += datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    if in_test_mode:
        import shutil
        print('Launching local job ... hang on')
        OUTPUT_DIR = './preproc_tft'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        EVERY_N = 100000
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://{0}/taxifare/preproc_tft/'.format(BUCKET)
        import subprocess
        subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
        EVERY_N = 10000

    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'num_workers': 1,
        'max_num_workers': 1,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'direct_num_workers': 1,
        'extra_packages': ['tensorflow-transform-0.15.0.tar.gz']
        }

    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    if in_test_mode:
        RUNNER = 'DirectRunner'
    else:
        RUNNER = 'DataflowRunner'

    # Set up raw data metadata
    raw_data_schema = {
        colname: dataset_schema.ColumnSchema(
            tf.string, [], dataset_schema.FixedColumnRepresentation())
        for colname in 'dayofweek,key'.split(',')
    }

    raw_data_schema.update({
        colname: dataset_schema.ColumnSchema(
            tf.float32, [], dataset_schema.FixedColumnRepresentation())
        for colname in
        'fare_amount,pickuplon,pickuplat,dropofflon,dropofflat'.split(',')
    })

    raw_data_schema.update({
        colname: dataset_schema.ColumnSchema(
            tf.int64, [], dataset_schema.FixedColumnRepresentation())
        for colname in 'hourofday,passengers'.split(',')
    })

    raw_data_metadata = dataset_metadata.DatasetMetadata(
        dataset_schema.Schema(raw_data_schema))

    # Run Beam
    with beam.Pipeline(RUNNER, options=opts) as p:
        with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
            # Save the raw data metadata
            (raw_data_metadata |
                'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
                    os.path.join(
                        OUTPUT_DIR, 'metadata/rawdata_metadata'), pipeline=p))

            # Read training data from bigquery and filter rows
            raw_data = (p | 'train_read' >> beam.io.Read(
                    beam.io.BigQuerySource(
                        query=create_query(1, EVERY_N),
                        use_standard_sql=True)) |
                        'train_filter' >> beam.Filter(is_valid))

            raw_dataset = (raw_data, raw_data_metadata)

            # Analyze and transform training data
            transformed_dataset, transform_fn = (
                raw_dataset | beam_impl.AnalyzeAndTransformDataset(
                    preprocess_tft))
            transformed_data, transformed_metadata = transformed_dataset

            # Save transformed train data to disk in efficient tfrecord format
            transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
                os.path.join(OUTPUT_DIR, 'train'), file_name_suffix='.gz',
                coder=example_proto_coder.ExampleProtoCoder(
                    transformed_metadata.schema))

            # Read eval data from bigquery and filter rows
            raw_test_data = (p | 'eval_read' >> beam.io.Read(
                beam.io.BigQuerySource(
                    query=create_query(2, EVERY_N),
                    use_standard_sql=True)) | 'eval_filter' >> beam.Filter(
                        is_valid))

            raw_test_dataset = (raw_test_data, raw_data_metadata)

            # Transform eval data
            transformed_test_dataset = (
                (raw_test_dataset, transform_fn) | beam_impl.TransformDataset()
                )
            transformed_test_data, _ = transformed_test_dataset

            # Save transformed train data to disk in efficient tfrecord format
            (transformed_test_data |
                'WriteTestData' >> tfrecordio.WriteToTFRecord(
                    os.path.join(OUTPUT_DIR, 'eval'), file_name_suffix='.gz',
                    coder=example_proto_coder.ExampleProtoCoder(
                        transformed_metadata.schema)))

            # Save transformation function to disk for use at serving time
            (transform_fn |
                'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
                    os.path.join(OUTPUT_DIR, 'metadata')))

# Change to True to run locally
preprocess(in_test_mode=False)


Launching Dataflow job preprocess-taxi-features-191217-213506 ... hang on
WARNING:tensorflow:From <ipython-input-9-609e78ab05aa>:124: ColumnSchema (from tensorflow_transform.tf_metadata.dataset_schema) is deprecated and will be removed in a future version.
Instructions for updating:
ColumnSchema is a deprecated, use from_feature_spec to create a `Schema`
WARNING:tensorflow:From <ipython-input-9-609e78ab05aa>:141: Schema (from tensorflow_transform.tf_metadata.dataset_schema) is deprecated and will be removed in a future version.
Instructions for updating:
Schema is a deprecated, use schema_utils.schema_from_feature_spec to create a `Schema`
WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
{'fare_amount': <tf.Tensor 'inputs/inputs/F_fare_amount_copy:0' shape=(None,) dtype=float32>, 'dayofweek': <tf.Tensor 'inputs/inputs/dayofweek_copy:0' shape=(None,) dtype=string>, 'hourofday': <tf.Tensor 'inputs/inputs/hourofday_copy:0' shape=(None,) dtype=int64>, 'dropofflat': <tf.Tensor 'inputs/inputs/dropofflat_copy:0' shape=(None,) dtype=float32>, 'pickuplat': <tf.Tensor 'inputs/inputs/pickuplat_copy:0' shape=(None,) dtype=float32>, 'passengers': <tf.Tensor 'inputs/inputs/passengers_copy:0' shape=(None,) dtype=int64>, 'dropofflon': <tf.Tensor 'inputs/inputs/dropofflon_copy:0' shape=(None,) dtype=float32>, 'pickuplon': <tf.Tensor 'inputs/inputs/pickuplon_copy:0' shape=(None,) dtype=float32>, 'key': <tf.Tensor 'inputs/inputs/key_copy:0' shape=(None,) dtype=string>}
WARNING:tensorflow:From <ipython-input-9-609e78ab05aa>:50: string_to_int (from tensorflow_transform.mappers) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tft.compute_and_apply_vocabulary()` instead.
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow_core/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: gs://cloud-example-labs/taxifare/preproc_tft/tmp/tftransform_tmp/6f94628a67354fca9c368e66a9c8c729/saved_model.pb
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: gs://cloud-example-labs/taxifare/preproc_tft/tmp/tftransform_tmp/b466732559fb44308ef359bcfffea2ff/saved_model.pb
WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
WARNING:tensorflow:Tensorflow version (2.0.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 

This will take 10-15 minutes. You cannot go on in this lab until your DataFlow job has succesfully completed.


In [10]:
%%bash
# ls preproc_tft
gsutil ls gs://${BUCKET}/taxifare/preproc_tft/


gs://cloud-example-labs/taxifare/preproc_tft/
gs://cloud-example-labs/taxifare/preproc_tft/eval-00000-of-00001.gz
gs://cloud-example-labs/taxifare/preproc_tft/train-00000-of-00004.gz
gs://cloud-example-labs/taxifare/preproc_tft/train-00001-of-00004.gz
gs://cloud-example-labs/taxifare/preproc_tft/train-00002-of-00004.gz
gs://cloud-example-labs/taxifare/preproc_tft/train-00003-of-00004.gz
gs://cloud-example-labs/taxifare/preproc_tft/metadata/
gs://cloud-example-labs/taxifare/preproc_tft/tmp/

Train off preprocessed data

Now that we have our data ready and verified it is in the correct location we can train our taxifare model locally.


In [12]:
%%bash
rm -r ./taxi_trained
export PYTHONPATH=${PYTHONPATH}:$PWD
python3 -m tft_trainer.task \
    --train_data_path="gs://${BUCKET}/taxifare/preproc_tft/train*" \
    --eval_data_path="gs://${BUCKET}/taxifare/preproc_tft/eval*"  \
    --output_dir=./taxi_trained \


rm: cannot remove './taxi_trained': No such file or directory
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_log_step_count_steps': 100, '_device_fn': None, '_service': None, '_model_dir': './taxi_trained', '_experimental_distribute': None, '_protocol': None, '_is_chief': True, '_task_id': 0, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f15433f00f0>, '_task_type': 'worker', '_evaluation_master': '', '_master': '', '_keep_checkpoint_every_n_hours': 10000, '_tf_random_seed': None, '_keep_checkpoint_max': 5, '_save_checkpoints_secs': 600, '_save_checkpoints_steps': None, '_num_ps_replicas': 0, '_train_distribute': None, '_save_summary_steps': 100, '_eval_distribute': None, '_experimental_max_worker_delay_secs': None, '_num_worker_replicas': 1, '_global_id_in_cluster': 0, '_session_creation_timeout_secs': 7200}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow_core/python/training/training_util.py:236: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow_estimator/python/estimator/head/regression_head.py:156: to_float (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.cast` instead.
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow_core/python/keras/optimizer_v2/adagrad.py:108: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
2019-12-17 21:48:33.822785: I tensorflow/core/platform/cpu_feature_guard.cc:145] This TensorFlow binary is optimized with Intel(R) MKL-DNN to use the following CPU instructions in performance critical operations:  AVX2 FMA
To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags.
2019-12-17 21:48:33.831904: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200000000 Hz
2019-12-17 21:48:33.832385: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x56544440cd00 executing computations on platform Host. Devices:
2019-12-17 21:48:33.832423: I tensorflow/compiler/xla/service/service.cc:175]   StreamExecutor device (0): Host, Default Version
2019-12-17 21:48:33.832940: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into ./taxi_trained/model.ckpt.
INFO:tensorflow:loss = 106.78464, step = 0
INFO:tensorflow:global_step/sec: 111.062
INFO:tensorflow:loss = 3.4875064, step = 100 (0.900 sec)
INFO:tensorflow:global_step/sec: 209.705
INFO:tensorflow:loss = 55.23517, step = 200 (0.477 sec)
INFO:tensorflow:Saving checkpoints for 300 into ./taxi_trained/model.ckpt.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2019-12-17T21:48:38Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from ./taxi_trained/model.ckpt-300
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [5/50]
INFO:tensorflow:Evaluation [10/50]
INFO:tensorflow:Evaluation [15/50]
INFO:tensorflow:Evaluation [20/50]
INFO:tensorflow:Evaluation [25/50]
INFO:tensorflow:Evaluation [30/50]
INFO:tensorflow:Evaluation [35/50]
INFO:tensorflow:Evaluation [40/50]
INFO:tensorflow:Evaluation [45/50]
INFO:tensorflow:Evaluation [50/50]
INFO:tensorflow:Finished evaluation at 2019-12-17-21:48:39
INFO:tensorflow:Saving dict for global step 300: average_loss = 7.591091, global_step = 300, label/mean = 5.318125, loss = 7.591091, prediction/mean = 2.9586215
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 300: ./taxi_trained/model.ckpt-300
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow_core/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
INFO:tensorflow:Signatures INCLUDED in export for Classify: None
INFO:tensorflow:Signatures INCLUDED in export for Eval: None
INFO:tensorflow:Signatures INCLUDED in export for Train: None
INFO:tensorflow:Signatures INCLUDED in export for Regress: None
INFO:tensorflow:Signatures INCLUDED in export for Predict: ['predict']
INFO:tensorflow:Signatures EXCLUDED from export because they cannot be be served via TensorFlow Serving APIs:
INFO:tensorflow:'regression' : Regression input must be a single string Tensor; got {'passengers': <tf.Tensor 'passengers:0' shape=(None,) dtype=float32>, 'pickuplon': <tf.Tensor 'pickuplon:0' shape=(None,) dtype=float32>, 'latdiff': <tf.Tensor 'sub_1:0' shape=(None,) dtype=float32>, 'hourofday': <tf.Tensor 'hourofday:0' shape=(None,) dtype=int64>, 'pickuplat': <tf.Tensor 'pickuplat:0' shape=(None,) dtype=float32>, 'dropofflon': <tf.Tensor 'dropofflon:0' shape=(None,) dtype=float32>, 'euclidean': <tf.Tensor 'Sqrt:0' shape=(None,) dtype=float32>, 'dayofweek': <tf.Tensor 'dayofweek:0' shape=(None,) dtype=int64>, 'dropofflat': <tf.Tensor 'dropofflat:0' shape=(None,) dtype=float32>, 'londiff': <tf.Tensor 'sub:0' shape=(None,) dtype=float32>}
INFO:tensorflow:'serving_default' : Regression input must be a single string Tensor; got {'passengers': <tf.Tensor 'passengers:0' shape=(None,) dtype=float32>, 'pickuplon': <tf.Tensor 'pickuplon:0' shape=(None,) dtype=float32>, 'latdiff': <tf.Tensor 'sub_1:0' shape=(None,) dtype=float32>, 'hourofday': <tf.Tensor 'hourofday:0' shape=(None,) dtype=int64>, 'pickuplat': <tf.Tensor 'pickuplat:0' shape=(None,) dtype=float32>, 'dropofflon': <tf.Tensor 'dropofflon:0' shape=(None,) dtype=float32>, 'euclidean': <tf.Tensor 'Sqrt:0' shape=(None,) dtype=float32>, 'dayofweek': <tf.Tensor 'dayofweek:0' shape=(None,) dtype=int64>, 'dropofflat': <tf.Tensor 'dropofflat:0' shape=(None,) dtype=float32>, 'londiff': <tf.Tensor 'sub:0' shape=(None,) dtype=float32>}
WARNING:tensorflow:Export includes no default signature!
INFO:tensorflow:Restoring parameters from ./taxi_trained/model.ckpt-300
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: ./taxi_trained/export/exporter/temp-b'1576619319'/saved_model.pb
INFO:tensorflow:Loss for final step: 36.147404.

In [13]:
!ls $PWD/taxi_trained/export/exporter


1576619319

Now let's create fake data in JSON format and use it to serve a prediction with gcloud ai-platform local predict


In [19]:
%%writefile /tmp/test.json
{"dayofweek":0, "hourofday":17, "pickuplon": -73.885262, "pickuplat": 40.773008, "dropofflon": -73.987232, "dropofflat": 40.732403, "passengers": 2.0}


Overwriting /tmp/test.json

In [20]:
%%bash
sudo find "/usr/lib/google-cloud-sdk/lib/googlecloudsdk/command_lib/ml_engine" -name '*.pyc' -delete

In [21]:
%%bash
model_dir=$(ls $PWD/taxi_trained/export/exporter/)
gcloud ai-platform local predict \
    --model-dir=./taxi_trained/export/exporter/${model_dir} \
    --json-instances=/tmp/test.json


PREDICTIONS
[20.746084213256836]
If the signature defined in the model is not serving_default then you must specify it via --signature-name flag, otherwise the command may fail.
WARNING: WARNING:tensorflow:From /usr/local/lib/python2.7/dist-packages/tensorflow_core/python/compat/v2_compat.py:65: disable_resource_variables (from tensorflow.python.ops.variable_scope) is deprecated and will be removed in a future version.
Instructions for updating:
non-resource variables are not supported in the long term
2019-12-17 21:50:07.411300: I tensorflow/core/platform/cpu_feature_guard.cc:145] This TensorFlow binary is optimized with Intel(R) MKL-DNN to use the following CPU instructions in performance critical operations:  AVX2 FMA
To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags.
2019-12-17 21:50:07.419152: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200000000 Hz
2019-12-17 21:50:07.419442: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x560ff33e0950 executing computations on platform Host. Devices:
2019-12-17 21:50:07.419471: I tensorflow/compiler/xla/service/service.cc:175]   StreamExecutor device (0): Host, Default Version
2019-12-17 21:50:07.419807: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
WARNING:tensorflow:From /usr/lib/google-cloud-sdk/lib/third_party/ml_sdk/cloud/ml/prediction/frameworks/tf_prediction_lib.py:230: load (from tensorflow.python.saved_model.loader_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.
WARNING:tensorflow:From /usr/lib/google-cloud-sdk/lib/third_party/ml_sdk/cloud/ml/prediction/frameworks/tf_prediction_lib.py:230: load (from tensorflow.python.saved_model.loader_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.

Copyright 2016-2018 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License